GroovyGrid DSL poll: what is the right way to implement branching control structures in Groovy?

December 12, 2008

It is well known that you can’t implement your own if/else-style control structures in Java. Continuations are another useful example of such a control structure. It is much less well known that you can implement this kind of structure in Groovy. In this short article I will show several ways of doing it. Our use case throughout will be checkpoint definition for GroovyGrid. The main question of the post is which option is best from a readability point of view. I would appreciate comments from Groovy users on which way they prefer.

Use case

GroovyGrid is a Groovy DSL for the GridGain framework. I am developing GroovyGrid with the idea of simplifying the coding of grid applications by using Groovy instead of Java. Our use case for this article will be checkpoint definition.

So what is a checkpoint? A checkpoint lets a computationally expensive job save its intermediate state, so that if the job fails and GridGain starts it again, it can continue from the previously saved state instead of from scratch. In general, the logic of such a job looks like the following pseudo-code:

Check if a saved state exists
If it doesn't
     Execute the first part of the job
     Save the result of the first part as a checkpoint
Execute the second part of the job using either the loaded or the calculated result of the first part
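To make the pseudo-code concrete, here is a minimal Groovy simulation of it. It assumes a plain in-memory map in place of GridGain’s checkpoint storage and fakes a node failure by throwing an exception right after the checkpoint is saved; ‘runJob’ and ‘checkpoints’ are hypothetical names used only for this illustration.

```groovy
// Hypothetical simulation: a plain map plays the role of GridGain's
// checkpoint storage, and the first attempt "fails" after saving its
// checkpoint, so the retry skips the expensive first part.
Map<String, Serializable> checkpoints = [:]
int firstPartRuns = 0

def runJob = { boolean failAfterCheckpoint ->
    Serializable state = checkpoints["checkpoint"]   // check if a saved state exists
    if (state == null) {
        firstPartRuns++
        state = (1..10).sum()                        // first part of the job
        checkpoints["checkpoint"] = state            // save it as a checkpoint
    }
    if (failAfterCheckpoint) {
        throw new IllegalStateException("simulated node failure")
    }
    state * 2                                        // second part of the job
}

try {
    runJob(true)
} catch (IllegalStateException expected) {
    // GridGain would restart the job here; we simply call it again
}
assert runJob(false) == 110
assert firstPartRuns == 1                            // first part ran only once
```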

Now we are ready to look at the implementation options.

Option I: Closure after closure

We can use the following syntax:

checkPoint ("checkpoint") {
     // first part calculation
}
{ state ->
    // second part calculation
}

As you can see, we put two closures one after another without any syntactic indication of a connection between them. Fortunately for us, the Groovy parser understands this construction as a call to the ‘checkPoint’ method with one String argument and two Closure arguments. So our implementation of ‘checkPoint’ is pretty trivial (leaving out exception handling and the like, of course):

def checkPoint(String name, Closure before, Closure after) {
    Serializable state = gridTaskSession.loadCheckpoint(name)
    if (state == null) {
       state = before ()
       gridTaskSession.saveCheckpoint(name, state)
    }
    after(state)
}
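To try Option I outside a grid, the following sketch assumes a hypothetical ‘InMemoryTaskSession’ class that offers only the two checkpoint calls used here, backed by a map, in place of GridGain’s real task session. It runs the nested example with both closures passed inside the parentheses (a form that does not depend on how the parser treats a closure starting on the next line), and it shows why each checkpoint has to be saved under its own ‘name’.

```groovy
// Hypothetical in-memory stand-in for GridGain's task session, providing
// only the two checkpoint calls that checkPoint needs.
class InMemoryTaskSession {
    final Map<String, Serializable> store = [:]
    Serializable loadCheckpoint(String name) { store[name] }
    void saveCheckpoint(String name, Serializable state) { store[name] = state }
}

class NestedCheckpointJob {
    InMemoryTaskSession gridTaskSession = new InMemoryTaskSession()

    // Option I: run 'before' only if no state is saved under 'name'
    def checkPoint(String name, Closure before, Closure after) {
        Serializable state = gridTaskSession.loadCheckpoint(name)
        if (state == null) {
            state = before()
            gridTaskSession.saveCheckpoint(name, state)
        }
        after(state)
    }

    // The nested example from the text, closures passed explicitly
    def run() {
        checkPoint("state2", {
            checkPoint("state1", { 10 }, { state1 -> state1 + 5 })  // step1, step2
        }, { state2 -> state2 * 2 })                                // step3
    }
}

def nestedJob = new NestedCheckpointJob()
assert nestedJob.run() == 30
// both checkpoints are kept, each under its own name
assert nestedJob.gridTaskSession.store == [state1: 10, state2: 15]
```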

What I like about this option is that it is extremely simple to implement and very straightforward. It is also important to note that if we want to use several checkpoints, they are very easy to nest:

checkPoint("state2") {
    checkPoint("state1") {
        // step1
    }
    { state1 ->
        // step2
    }
}
{ state2 ->
    // step3
}

Option II: Control object

What I don’t like about Option I is that the closures are not visually connected in the code. So here is an improved syntax:

checkPoint ("checkpoint") {
    // first part calculation
}.andContinue { state ->
    // second part calculation
}

What we gain is ‘andContinue’, which arguably improves readability. What we complicate is the implementation: ‘checkPoint’ now returns not a value but a control object, which has to be called to complete the calculation:

def checkPoint(String name, Closure before) {
    [
      andContinue : { Closure after ->
          Serializable state = gridTaskSession.loadCheckpoint(name)
          if (state == null) {
              state = before ()
              gridTaskSession.saveCheckpoint(name, state)
          }

          after (state)
      }
    ]
}
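A quick way to check that a plain Groovy map can act as this control object is the self-contained sketch below. It assumes a local map ‘checkpoints’ in place of ‘gridTaskSession’ and defines ‘checkPoint’ as a closure variable so that it runs as an ordinary script. It also shows that the first part does not run until ‘andContinue’ is called, since Groovy invokes a closure stored in a map when you call it with method syntax.

```groovy
// Hypothetical in-memory checkpoint storage standing in for gridTaskSession
def checkpoints = [:]

// Option II shape: calling checkPoint only captures 'name' and 'before';
// the work happens when the map's 'andContinue' closure is invoked.
def checkPoint = { String name, Closure before ->
    [
        andContinue: { Closure after ->
            def state = checkpoints[name]
            if (state == null) {
                state = before()
                checkpoints[name] = state
            }
            after(state)
        }
    ]
}

def control = checkPoint("checkpoint") { 20 }
assert checkpoints.isEmpty()                  // first part has not run yet
def result = control.andContinue { state -> state + 1 }
assert result == 21
assert checkpoints["checkpoint"] == 20
```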

There is a nice trick for a variation of the same idea: name the entry ‘rightShift’ or ‘rightShiftUnsigned’ instead of ‘andContinue’. This lets us write code like the following (the ‘rightShiftUnsigned’ case):

checkPoint ("checkpoint") {
    // first part calculation
}  >>> { state ->
    // second part calculation
}
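The operator variant works because Groovy compiles ‘a >>> b’ into a call to ‘a.rightShiftUnsigned(b)’. The sketch below assumes a small class (the hypothetical ‘CheckpointStep’) instead of the map, which makes that method explicit, and a plain map standing in for ‘gridTaskSession’.

```groovy
// Hypothetical control object: a small class instead of a map, so that
// the >>> operator resolves to an ordinary rightShiftUnsigned(Closure) method.
class CheckpointStep {
    final Map<String, Object> store      // stands in for gridTaskSession
    final String name
    final Closure before

    CheckpointStep(Map<String, Object> store, String name, Closure before) {
        this.store = store
        this.name = name
        this.before = before
    }

    // Groovy compiles a >>> b as a.rightShiftUnsigned(b)
    def rightShiftUnsigned(Closure after) {
        def state = store[name]
        if (state == null) {
            state = before()
            store[name] = state
        }
        after(state)
    }
}

def store = [:]
def checkPoint = { String name, Closure before -> new CheckpointStep(store, name, before) }

def result = checkPoint("checkpoint") { "first" } >>> { state -> state + " then second" }
assert result == "first then second"
```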

Again, nesting is pretty simple:

checkPoint("state2") {
    checkPoint("state1") {
        // step1
    } >>> { state1 ->
        // step2
    }
} >>> { state2 ->
    // step3
}

Option III: No after-branch at all

A careful reader might notice that, for our particular use case, special syntax for the ‘after’ branch is not necessary, because it is always executed. So we can simply write:

def state = checkPoint ("checkpoint") {
    // first part calculation
}
// second part calculation

The main reason I consider the other options here is that this article is more about methodology than about a particular use case. For example, if we wanted to execute the first part asynchronously on the grid or on a thread pool and continue with the second part only after the first one completes (without blocking, of course), this option would not work at all.
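Here is a rough sketch of that asynchronous case, to show why the two-closure shape survives it while Option III does not: the second part is attached to a future as a continuation instead of being run on a returned value. It assumes Java 8’s ‘CompletableFuture’ on a local thread pool rather than a GridGain grid, and an in-memory map in place of ‘gridTaskSession’; ‘checkPointAsync’ is a hypothetical name.

```groovy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.function.Function
import java.util.function.Supplier

// Hypothetical in-memory checkpoint storage standing in for gridTaskSession
def checkpoints = new ConcurrentHashMap<String, Object>()
ExecutorService pool = Executors.newFixedThreadPool(2)

// Same two-closure shape as Option I, but the first part runs on the pool
// and the second part is chained onto it: the caller gets a future back
// immediately and never waits for the first part itself.
def checkPointAsync = { String name, Closure before, Closure after ->
    CompletableFuture.supplyAsync({
        def state = checkpoints.get(name)
        if (state == null) {
            state = before()
            checkpoints.put(name, state)
        }
        state
    } as Supplier, pool).thenApply({ state -> after(state) } as Function)
}

def future = checkPointAsync("checkpoint", { 6 * 7 }, { state -> state + 1 })
// ... the caller is free to do other work here ...
assert future.get() == 43     // blocking here only to check the result
pool.shutdown()
```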

Anyway, for completeness, here is the implementation:

def checkPoint(String name, Closure before) {
    Serializable state = gridTaskSession.loadCheckpoint(name)
    if (state == null) {
       state = before ()
       gridTaskSession.saveCheckpoint(name, state)
    }
    state
}

Main questions

  1. Which option do you prefer?
  2. What are other options?

Please let me know, and enjoy Groovy!


GroovyGrid: grid computing with Groovy and GridGain

December 9, 2008

Grid computing is a complicated field. Even with the excellent GridGain framework, which helps a lot in developing and testing distributed applications, you still have to write some auxiliary Java code that mostly serves the framework itself rather than the application logic (like defining classes for tasks and jobs). Today I want to talk a bit about a small Groovy DSL for GridGain, which I am developing to simplify this.

It is important to note that GroovyGrid is still an experimental project, and one of the main purposes of this post is to find out whether it could be useful to other people.

The task we are going to solve is pretty standard: given a directory (let us assume it is on a shared file system), we want to calculate how many times each character occurs in all Groovy and Java source files under it, recursively. Solving this problem should let us see GroovyGrid in action.
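To pin down exactly what is being computed, here is the same statistic as a single-machine Groovy sketch with no grid involved; ‘charStatistic’ is a hypothetical name, and the GridGain version distributes exactly this work across nodes.

```groovy
// Counts how often each character occurs in all .java and .groovy files
// under dir, recursively, on a single machine (no grid involved).
// Line terminators are not counted, because eachLine strips them.
Map<Character, Integer> charStatistic(File dir) {
    Map<Character, Integer> counts = [:]
    dir.eachFileRecurse { File f ->
        if (f.file && (f.name.endsWith(".java") || f.name.endsWith(".groovy"))) {
            f.eachLine { String line ->
                for (char ch in line.toCharArray()) {
                    counts[ch] = (counts[ch] ?: 0) + 1
                }
            }
        }
    }
    counts
}
```

Usage is simply ‘charStatistic(new File("path/to/sources"))’, which returns a map from each character to its number of occurrences.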

For the purpose of this post I wrote a trivial Groovy class that will help us accumulate statistics. It is a bit lengthy, but all it really does is define several ‘leftShift’ methods, which let us write something like statistic << data, where data can be a list of files, a file, a String, a char, or even another statistic accumulator. You can safely skip the following code.

class CharStatisticAccumulator extends HashMap {
    private CharStatisticAccumulator add (char ch, counter = 1) {
        if (!this[ch])
          this [ch]  = counter
        else
          this [ch] += counter
        this
    }

    CharStatisticAccumulator leftShift (char ch) {
        add ch
    }

    CharStatisticAccumulator leftShift (Map.Entry e) {
        add e.key, e.value
    }

    CharStatisticAccumulator leftShift (Collection coll) {
        coll.each {
            this << it
        }
        this
    }

    CharStatisticAccumulator leftShift (String line) {
        // count each character of the line individually
        for (char ch in line.toCharArray())
            add ch
        this
    }

    CharStatisticAccumulator leftShift (File file) {
        file.eachLine {
            this << it
        }
        this
    }

    CharStatisticAccumulator leftShift (CharStatisticAccumulator statistic) {
        this << statistic.entrySet ()
    }
}

Now back to our main task of calculating the statistic. The standard map/reduce approach, which GridGain implements nicely, tells us how to do it:
1) split the files into groups
2) assign the calculation of the statistic for each group as a job to a different grid node
3) combine the results calculated by all jobs into the final statistic.
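The three steps can be sketched locally in plain Groovy, without GridGain, to show what each one does to the data. This is an illustration under simplifying assumptions: strings stand in for files, the per-group “jobs” run sequentially in the current thread, and the helper names (‘countChars’, ‘mergeCounts’, ‘mapReduceLocally’) are hypothetical.

```groovy
// Hypothetical local illustration of the three map/reduce steps.

// The work one job does for a single text: count its characters
Map<Character, Integer> countChars(String text) {
    Map<Character, Integer> counts = [:]
    for (char ch in text.toCharArray()) {
        counts[ch] = (counts[ch] ?: 0) + 1
    }
    counts
}

// Adds the counts of 'b' to a copy of 'a'
Map<Character, Integer> mergeCounts(Map<Character, Integer> a, Map<Character, Integer> b) {
    Map<Character, Integer> result = new HashMap<Character, Integer>(a)
    b.each { ch, n -> result[ch] = (result[ch] ?: 0) + n }
    result
}

Map<Character, Integer> mapReduceLocally(List<String> texts, int groupSize) {
    // 1) split the items into groups of groupSize, by position
    int index = 0
    Map<Integer, List<String>> groups = texts.groupBy { (index++).intdiv(groupSize) }

    // 2) one partial statistic per group (what each grid job would return)
    List<Map<Character, Integer>> partials = groups.values().collect { List<String> group ->
        group.inject([:]) { acc, text -> mergeCounts(acc, countChars(text)) }
    }

    // 3) combine the partial statistics into the final statistic
    partials.inject([:]) { acc, partial -> mergeCounts(acc, partial) }
}

def texts = ["ab", "b", "c", "aa"]
// the grouping changes how work is split, not the final result
assert mapReduceLocally(texts, 2) == mapReduceLocally(texts, 1)
```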

Here is the final code for calculating our statistic. It is richly commented and, I hope, self-explanatory. I am not trying to be lazy here; I just found that commented code is probably the best way to show what’s going on.

    /**
     * Recursively calculates number of character usage in
     * .java and .groovy files of given directory
     */
    static def calculateCharStatistic (File dir) {

        // In GridGain each task consists of one or several jobs.
        // The two main operations of a task are:
        // - map, which splits the task into jobs and assigns the jobs to grid nodes
        // - reduce, which combines the results of the jobs into the result of the task
        //
        // In GroovyGrid the body of the closure serves both purposes.
        // It is called from the 'map' method, both to create jobs
        // and to define useful callbacks
        // like 'reduce', 'onJobResult' etc.
        GroovyGrid.mapReduce {

            // recursively collect all .java and .groovy files;
            // this is plain Groovy and has nothing to do with grid computing
            def files = []
            dir.eachFileRecurse { File innerFile ->
                if (!innerFile.directory
                 && ( innerFile.name.endsWith(".java")
                   || innerFile.name.endsWith(".groovy"))) {
                    files << innerFile
                }
            }

            // split the files into groups of 64 files;
            // again, plain Groovy iteration/grouping
            int groupIndex = 0
            Map groups = files.groupBy { (groupIndex++) >> 6 }

            
            // iterate groups
            groups.each { groupId, groupFiles ->
                // define a grid job for each group.
                // Note that the closure below will be called
                // not immediately but later,
                // on some (possibly different) grid node.
                def job = job {
                    println "Job $groupId started"
                    CharStatisticAccumulator jobResult
                        = new CharStatisticAccumulator()
                    groupFiles.each { File jobFile ->
                        jobResult << jobFile
                        println "processed $jobFile.absolutePath"
                    }
                    println "Job $groupId ended"

                    // result of job execution
                    jobResult
                }

                // Usually it is not necessary to map jobs to grid nodes
                // manually, but if needed it can be done
                // by assigning the 'gridNode' property of the job.
                // The 'gridGrid' property of type Grid and the
                // 'gridLoadBalancer' property of type GridLoadBalancer
                // are injected into each task.
                //
                // In fact, this demonstration is the only purpose of the 'job' variable;
                // you usually don't do this in real life.
                if (groupId % 5 == 0)
                    job.gridNode = gridGrid.localNode
                else
                    job.gridNode = gridLoadBalancer.getBalancedNode (job)
            }

            // here we will collect statistic
            def taskResult = new CharStatisticAccumulator()

            // defines callback to be called when result
            // of a job becomes available
            onJobResult { GridJobResult jobResult
                      /*, List<GroovyJobResult> receivedBefore */ ->
                // Interestingly, we are able
                // to access the 'groupId' of the job here
                println "Result of job $jobResult.job.groupId obtained"

                // normally GridGain processes all partial results in the 'reduce' method.
                // This is possible with GroovyGrid as well, but here,
                // for demonstration purposes, we do it differently:
                // each partial result is added immediately to the result statistic
                taskResult << jobResult.data

                // as we don't need result of the job anymore we can forget it
                jobResult.data = null
            }

            // defines callback to be called to calculate task result
            // after results of all jobs became available
            reduce { List<GridJobResult> results ->
                // Let us print IDs of all jobs to make sure
                // that we've calculated everything
                // Of course, here we are also able to access 'groupId' of the job
                println results*.job*.groupId.sort ()

                // result of task calculation
                taskResult
            }

            // defines callback to be called when task finished
            onTaskFinished { GridTaskFuture future ->
                println "Task completed"
            }
        }
    }
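The comment at the top of ‘calculateCharStatistic’ says that a single closure both creates the jobs and registers callbacks such as ‘reduce’ and ‘onJobResult’. One common Groovy technique for building that kind of DSL is to run the closure with a delegate object that records what it declares. The sketch below is a hypothetical, local-only imitation of that shape (‘LocalMapReduce’ is not part of GroovyGrid or GridGain): jobs run sequentially in the calling thread, ‘job’ returns nothing, and results are plain values rather than ‘GridJobResult’ objects.

```groovy
// Hypothetical, local-only imitation of the DSL shape used above.
// NOT GroovyGrid's implementation.
class LocalMapReduce {
    private final List<Closure> jobs = []
    private Closure jobResultCallback = { result -> }
    private Closure reduceCallback = { List results -> results }

    // DSL methods, reached from the definition closure through its delegate
    void job(Closure body) { jobs << body }
    void onJobResult(Closure callback) { jobResultCallback = callback }
    void reduce(Closure callback) { reduceCallback = callback }

    static def mapReduce(Closure definition) {
        def dsl = new LocalMapReduce()
        Closure body = (Closure) definition.clone()
        body.delegate = dsl
        body.resolveStrategy = Closure.DELEGATE_FIRST
        body()                                    // "map": declare jobs and callbacks

        List results = dsl.jobs.collect { Closure jobBody ->
            def result = jobBody()                // a grid would run this remotely
            dsl.jobResultCallback.call(result)    // like onJobResult
            result
        }
        dsl.reduceCallback.call(results)          // "reduce": combine job results
    }
}

def total = LocalMapReduce.mapReduce {
    int sum = 0
    [1, 2, 3].each { n -> job { n * n } }         // one job per item
    onJobResult { r -> sum += r }                 // accumulate as results arrive
    reduce { results -> sum }                     // result of the whole "task"
}
assert total == 14
```

Using a cloned closure with ‘DELEGATE_FIRST’ keeps the caller’s closure untouched and lets nested closures (like the one passed to ‘each’) reach the DSL methods too.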

I would appreciate comments and advice from everybody, especially from people familiar with both Groovy and GridGain.