GroovyGrid: grid computing with Groovy and GridGain

Grid computing is a complicated field. Even with an excellent framework like GridGain, which helps a lot in developing and testing distributed applications, you still have to write some auxiliary Java code that mostly serves the framework itself rather than the application logic (for example, defining classes for tasks and jobs). Today I want to talk a bit about a small Groovy DSL for GridGain that I am developing to simplify this.

It is important to note that GroovyGrid is still an experimental project, and one of the main purposes of this post is to find out whether it could be useful to other people.

The task we are going to solve is a pretty standard one: given a directory (let us assume it is on a shared file system), calculate how many times each character occurs in all Groovy and Java source files in it, recursively. Solving this problem will let us see GroovyGrid in action.
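For comparison, here is roughly what the task looks like on a single machine, without any grid. This is a minimal sketch in plain Groovy. `countCharsIn` is a hypothetical helper, not part of GroovyGrid. Like the accumulator used in this post, it reads files line by line, so line terminators are not counted.

```groovy
import groovy.io.FileType
import java.nio.file.Files

// Hypothetical helper: counts how often each character occurs in all
// .java and .groovy files under dir, recursively, on one machine.
Map<Character, Integer> countCharsIn(File dir) {
    Map<Character, Integer> counts = [:]
    dir.eachFileRecurse(FileType.FILES) { File f ->
        if (f.name.endsWith('.java') || f.name.endsWith('.groovy')) {
            // read line by line, so line terminators are not counted
            f.eachLine { String line ->
                line.toCharArray().each { ch -> counts[ch] = (counts[ch] ?: 0) + 1 }
            }
        }
    }
    counts
}

// Usage: build a small temporary tree, count it, then clean up
File root = Files.createTempDirectory('chars').toFile()
new File(root, 'sub').mkdirs()
new File(root, 'A.java').text = 'aab\n'
new File(root, 'sub/B.groovy').text = 'bc\n'
new File(root, 'notes.txt').text = 'zzz\n'   // skipped: not a .java or .groovy file

def stats = countCharsIn(root)
root.deleteDir()
println stats
```

The grid version below distributes exactly this work: the directory walk stays on the node that starts the task, and the per-file counting moves into jobs.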

For the purpose of this post I wrote a trivial Groovy class that helps us accumulate statistics. It is a bit lengthy, but all it really does is define several ‘leftShift’ methods, which let us write something like statistic << data, where data can be a list of files, a file, a String, a char, or even another statistic accumulator. You can safely skip the following code.

class CharStatisticAccumulator extends HashMap {
    private CharStatisticAccumulator add (char ch, counter = 1) {
        if (!this[ch])
          this [ch]  = counter
        else
          this [ch] += counter
        this
    }

    CharStatisticAccumulator leftShift (char ch) {
        add ch
    }

    CharStatisticAccumulator leftShift (Map.Entry e) {
        add e.key, e.value
    }

    CharStatisticAccumulator leftShift (Collection coll) {
        coll.each {
            this << it
        }
        this
    }

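    CharStatisticAccumulator leftShift (String line) {
        // 'as List' turns the char[] into a list of Characters;
        // Arrays.asList(char[]) would yield a one-element list holding the array
        this << (line.toCharArray() as List)
    }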

    CharStatisticAccumulator leftShift (File file) {
        file.eachLine {
            this << it
        }
        this
    }

    CharStatisticAccumulator leftShift (CharStatisticAccumulator statistic) {
        this << statistic.entrySet ()
    }
}
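The ‘leftShift’ trick works because Groovy translates a << b into a.leftShift(b) and picks the overload from the runtime type of b. The following minimal sketch shows the same mechanism in isolation. `CharCounter` is a hypothetical, simplified stand-in for CharStatisticAccumulator. It keeps its counts in a field instead of extending HashMap.

```groovy
// Hypothetical, simplified stand-in for CharStatisticAccumulator:
// Groovy turns "a << b" into a.leftShift(b) and chooses the overload
// by the runtime type of b.
class CharCounter {
    final Map<Character, Integer> counts = [:]

    CharCounter leftShift(char ch) {
        counts[ch] = (counts[ch] ?: 0) + 1
        this
    }

    CharCounter leftShift(String text) {
        // convert char[] to a List of Characters, then reuse leftShift(Collection)
        this << (text.toCharArray() as List)
    }

    CharCounter leftShift(Collection items) {
        // each element is dispatched again according to its own runtime type
        items.each { this << it }
        this
    }

    CharCounter leftShift(CharCounter other) {
        other.counts.each { ch, n -> counts[ch] = (counts[ch] ?: 0) + n }
        this
    }
}

def stat = new CharCounter()
stat << "abba" << ['c', 'a']   // a String, then a List of one-character Strings
stat << (new CharCounter() << "bc")
println stat.counts
```

Every path ends in leftShift(char). A one-character String such as 'c' is handled by leftShift(String), which splits it into chars and passes them through leftShift(Collection).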

Now back to our main task of calculating the statistic. The standard map/reduce approach, nicely implemented by GridGain, tells us how to do it:
1) split the files into groups

2) assign the statistic calculation for each group as a job to one of the grid nodes

3) combine the results calculated by all jobs into the final statistic.
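The three steps can be sketched on a single JVM, with a thread pool from java.util.concurrent standing in for the grid nodes. This is not GridGain or GroovyGrid API. `countChars` and the sample strings are hypothetical. Partial results are merged in completion order, which resembles what the onJobResult callback in this post's GroovyGrid code does.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors

// Hypothetical helper: counts characters in one group of texts (one "job").
Map<Character, Integer> countChars(List<String> texts) {
    Map<Character, Integer> counts = [:]
    texts.each { String text ->
        text.toCharArray().each { ch -> counts[ch] = (counts[ch] ?: 0) + 1 }
    }
    counts
}

// Hypothetical sample inputs; in the post these would be source files.
def sources = ['class A {}', 'def x = 1', 'println x', 'int y;']

// 1) split the inputs into groups (2 per group here, 64 files in the post)
def groups = sources.collate(2)

// 2) submit one job per group; the pool plays the role of the grid nodes
def pool = Executors.newFixedThreadPool(2)
def completion = new ExecutorCompletionService<Map<Character, Integer>>(pool)
groups.each { group ->
    completion.submit({ -> countChars(group) } as Callable)
}

// 3) combine partial results into the final statistic as each job completes
Map<Character, Integer> total = [:]
groups.size().times {
    completion.take().get().each { ch, n -> total[ch] = (total[ch] ?: 0) + n }
}
pool.shutdown()

println total
```

Merging each result as soon as it arrives means the partial maps do not all have to be kept until the end. Collecting all futures first and folding them afterwards would correspond to doing the work in reduce instead.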

Here is the final code for calculating our statistic. It is richly commented and, I hope, self-explanatory. I am not trying to be lazy here; I have just found that commented code is probably the best way to show what is going on.

    /**
     * Recursively calculates number of character usage in
     * .java and .groovy files of given directory
     */
    static def calculateCharStatistic (File dir) {

        // In GridGain each task consists of one or several jobs.
        // A task has two main operations:
        // - map - splits the task into jobs and assigns the jobs to grid nodes
        // - reduce - combines the results of the jobs into the result of the task
        //
        // In GroovyGrid the body of the closure handles both.
        // It is called from the 'map' method, both to create jobs
        // and to define useful callbacks
        // like 'reduce', 'onJobResult', etc.
        GroovyGrid.mapReduce {

            // recursively collect all .java and .groovy files;
            // this is plain Groovy and has nothing to do with grid computing
            def files = []
            dir.eachFileRecurse { File innerFile ->
                if (!innerFile.directory
                 && ( innerFile.name.endsWith(".java")
                   || innerFile.name.endsWith(".groovy"))) {
                    files << innerFile
                }
            }

            // split the files into groups of 64 files each
            // (again, plain Groovy iteration/grouping)
            int groupIndex = 0
            Map groups = files.groupBy { (groupIndex++) >> 6 }

            
            // iterate groups
            groups.each { groupId, groupFiles ->
                // define grid job for each group
                // It is important to note that the closure below will be
                // called not immediately but later,
                // on some (possibly different) grid node.
                def job = job {
                    println "Job $groupId started"
                    CharStatisticAccumulator jobResult
                        = new CharStatisticAccumulator()
                    groupFiles.each { File jobFile ->
                        jobResult << jobFile
                        println "processed $jobFile.absolutePath"
                    }
                    println "Job $groupId ended"

                    // result of job execution
                    jobResult
                }

                // Usually it is not necessary to map jobs to grid nodes
                // manually, but if needed it can be done
                // by assigning the 'gridNode' property of the job.
                // The 'gridGrid' property of type Grid and the
                // 'gridLoadBalancer' property of type GridLoadBalancer
                // are injected into each task.
                //
                // In fact, this demonstration is the only purpose of the 'job'
                // variable; you usually don't need it in real life.
                if (groupId % 5 == 0)
                    job.gridNode = gridGrid.localNode
                else
                    job.gridNode = gridLoadBalancer.getBalancedNode (job)
            }

            // here we will collect the statistic
            def taskResult = new CharStatisticAccumulator()

            // defines the callback to be called when the result
            // of a job becomes available
            onJobResult { GridJobResult jobResult
                      /*, List<GroovyJobResult> receivedBefore */ ->
                // Interestingly, we are able
                // to access the 'groupId' of the job here
                println "Result of job $jobResult.job.groupId obtained"

                // Normally GridGain processes all partial results in the 'reduce' method.
                // This is possible with GroovyGrid as well, but here,
                // for demonstration purposes, we do it differently:
                // each partial result is added immediately to the result statistic
                taskResult << jobResult.data

                // as we don't need the job's result anymore, we can forget it
                jobResult.data = null
            }

            // defines the callback that calculates the task result
            // once the results of all jobs are available
            reduce { List<GridJobResult> results ->
                // Let us print the IDs of all jobs to make sure
                // that we have calculated everything.
                // Of course, here we can also access the 'groupId' of the job
                println results*.job*.groupId.sort ()

                // result of task calculation
                taskResult
            }

            // defines the callback to be called when the task finishes
            onTaskFinished { GridTaskFuture future ->
                println "Task completed"
            }
        }
    }
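The grouping line in calculateCharStatistic relies on (groupIndex++) >> 6. Shifting a non-negative int right by 6 bits is integer division by 64, so each run of 64 consecutive files shares one key. Here is a quick standalone check in plain Groovy, with made-up file names, that also compares the idiom with Groovy's built-in collate(64):

```groovy
// Hypothetical file names standing in for the collected source files
def files = (1..150).collect { "File${it}.groovy" }

// same idiom as in calculateCharStatistic: key = index / 64
int groupIndex = 0
Map groups = files.groupBy { (groupIndex++) >> 6 }

// prints [0:64, 1:64, 2:22]
println groups.collectEntries { key, group -> [key, group.size()] }

// collate(64) produces the same partition, without the mutable counter
assert (groups.values() as List) == files.collate(64)
```

The counter version depends on groupBy visiting each element exactly once and in order, which holds for a List. collate expresses the same intent without that side effect.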

I would appreciate comments and advice from everybody, especially from people familiar with both Groovy and GridGain.
