Communication Between Lift Comets

Lift has a great implementation of the comet model, in which long-held HTTP requests let the server notify clients almost immediately that something has happened. Under the hood, Lift comets are actors that live outside of the HTTP request/response cycle. Even though comet-to-browser communication is trivial, comet-to-comet (or comet-to-actor) communication can be a bit problematic, especially when two comets live in separate sessions. In this post, I would like to describe the comet-to-comet and comet-to-backend communication patterns available in Lift and present a new approach for which we have just released a Lift module.

LiftSession

LiftSession lets you retrieve a reference to a comet, or send a message to one, with these methods:

def findComet(theType: String)
def findComet(theType: String, name: Box[String])
def sendCometActorMessage(theType: String, name: Box[String], msg: Any)

There is one important limitation here – the comet you want to refer to must live in the same LiftSession as your calling code.
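
For illustration, here is a minimal sketch of sending a message to a comet living in the current session from snippet code (ChatMessage and the comet name are made-up for the example):

import net.liftweb.common.Full
import net.liftweb.http.S

case class ChatMessage(text: String)

def notifyChatComet(text: String): Unit =
  S.session.foreach { session =>
    session.sendCometActorMessage("ChatComet", Full("chat-comet"), ChatMessage(text))
  }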

If all you need is to send a message between two or more of a user's comets, these methods may be sufficient. However, the most frequent scenario is that you want to send a message to all active comets interested in some activity, no matter which LiftSession they live in. As an example, let's consider a web chat application. When a user posts a message in a chat room, we want all other users in this room to see the message immediately. Polling the database every second from every comet is less than optimal. What we need is a way to ping all active comets about the new message or, even better, send them a notification containing this message.

CometListener and ListenerManager

Lift's ListenerManager provides the sendListenersMessage(msg: Any) method, which sends a message to all comets extending the CometListener trait that have registered with the given ListenerManager instance using the registerWith method:

class CometListenerExample extends NamedCometActorTrait with CometListener {

  override protected def registerWith = ListenerManagerExample

  override def mediumPriority = {
    case NewChatMessage(msg) =>
      partialUpdate(...)
  }

  override def render: RenderOut = ...
}
object ListenerManagerExample extends LiftActor with ListenerManager {
  case object Tick

  // just a dummy placeholder - it's required by ListenerManager
  val createUpdate: Any = "nothing"

  LAPinger.schedule(this, Tick, 5 seconds)

  override def mediumPriority = {
    case Tick =>
      sendListenersMessage(NewChatMessage(...))
      LAPinger.schedule(this, Tick, 5 seconds)
  }
}

ListenerManagerExample will send a NewChatMessage to all CometListenerExample instances every 5 seconds. The createUpdate method is required by the ListenerManager trait – it is used to create the first message that a newly subscribed CometListener will receive. In the example above it is just a dummy, unhandled message. It does not matter which session a comet instance lives in – all comets registered with ListenerManagerExample will receive the message.
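
In a real application, createUpdate usually returns the current state, so that a freshly registered listener has something to render immediately. A rough sketch (ChatServer and its message protocol are assumed names, not part of the sample code):

import net.liftweb.actor.LiftActor
import net.liftweb.http.ListenerManager

object ChatServer extends LiftActor with ListenerManager {
  private var messages: List[String] = Nil

  // delivered automatically to every newly registered CometListener
  def createUpdate = messages

  override def lowPriority = {
    case msg: String =>
      messages ::= msg
      updateListeners() // push the new state to all registered listeners
  }
}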

NamedCometListener

In the case of NamedCometListener, the comet receiving messages does not have to extend any trait. NamedCometListener lets you retrieve the message dispatchers for a given comet name via the getDispatchersFor method:

class NamedCometListenerExample extends CometActor {

  override def mediumPriority = {
    case NewChatMessage(msg) =>
      partialUpdate(...)
  }
}
object NamedCometListenerExample extends LiftActor {
  case object Tick

  LAPinger.schedule(this, Tick, 2 seconds)

  override protected def messageHandler: PartialFunction[Any, Unit] = {
    case Tick =>
      val message = NewChatMessage(...)
      NamedCometListener.getDispatchersFor(Full("chat-comet")).map { dispatcher =>
        dispatcher.map(_ ! message)
      }

      LAPinger.schedule(this, Tick, 2 seconds)
  }
}

When you insert a comet into a page, you may provide its name. For example:

<lift:comet type="ChatComet" name="chat-comet" />

or you can use NamedCometActorSnippet for that:

class Chat extends NamedCometActorSnippet {
  val cometClass = "ChatComet"

  val name = "chat-comet"
}

NamedCometListener retrieves message dispatchers for all comets (from all users' sessions) with the given name. This approach may be a bit handier than CometListener + ListenerManager when you need to send a message directly between two comets.

MessageBus

MessageBus is a new approach that has been released as a Lift module. You can include it in your project by adding it as a dependency. For example, in SBT:

"net.liftmodules" %% "messagebus_3.0" % "1.0" % "compile"

MessageBus facilitates communication between any two or more LiftActors (under the hood, LiftCometActors are LiftActors).

Actors subscribe to Topics – abstractions that specify which types of messages a given actor is interested in. Topic is a trait with just one method, def name: String. An example Topic implementation looks as follows:

case class ChatRoomTopic(val name: String) extends Topic

In order to subscribe an actor to a given topic, you send a Subscribe message. On the flip side, an actor unsubscribes from a topic by sending an Unsubscribe message. For comets, this will usually be done in the localSetup and localShutdown methods.

Actors listen for messages from MessageBus the same way they listen for any other type of message – in the messageHandler method for LiftActors and the high/medium/lowPriority methods for CometActors:

class MessageBusExample extends CometActor {

  override def localSetup = {
    super.localSetup
    MessageBus ! Subscribe(this, ChatRoomTopic(...))
  }

  override def localShutdown = {
    super.localShutdown
    MessageBus ! Unsubscribe(this, ChatRoomTopic(...))
  }

  override def mediumPriority = {
    case NewChatMessage(msg) =>
      partialUpdate(...)
  }
}

There are two ways to send a message using MessageBus: For and ForAll. The payload of a For message will be delivered by MessageBus to all LiftActors that subscribed to the given Topic (topics are compared for equality). The payload of a ForAll message will be delivered by MessageBus to all LiftActors that subscribed to any Topic of the given type.

For example:

object MessageBusExample extends LiftActor {
  case object Tick

  LAPinger.schedule(this, Tick, 2 seconds)

  override protected def messageHandler: PartialFunction[Any, Unit] = {
    case Tick =>
      MessageBus ! For(ChatRoomTopic("chat-comet-europe"), NewChatMessage(...)) // first message
      MessageBus ! For(ChatRoomTopic("chat-comet-north-america"), NewChatMessage(...)) // second message
      MessageBus ! ForAll[ChatRoomTopic](NewChatMessage(...)) // third message

      LAPinger.schedule(this, Tick, 2 seconds)
  }
}
  • The first message will be delivered to all LiftActors that subscribed to ChatRoomTopic("chat-comet-europe"),
  • The second message will be delivered to all LiftActors that subscribed to ChatRoomTopic("chat-comet-north-america"),
  • The third message will be delivered to all LiftActors that subscribed to any instance of the ChatRoomTopic type. That is, both actors that subscribed to ChatRoomTopic("chat-comet-europe") and actors that subscribed to ChatRoomTopic("chat-comet-north-america") will receive this message.

As always, an example application using all inter-session communication mechanisms described here is available on GitHub: https://github.com/pdyraga/lift-samples/tree/master/comet-messages

You can find more information about MessageBus module in its repository: https://github.com/pdyraga/lift-message-bus

Special thanks go to Antonio Salazar Cardozo, who significantly contributed to the idea and the code.

Improved Radio Buttons for Lift

Sometimes, even the simplest things in a programmer's work can lead to unexpected issues. A perfect example is radio buttons in Lift. What can go wrong with such a fundamental HTML input type? There are two complaints that are often deal breakers.

First of all, both SHtml.radio and SHtml.radioElem replace the entire element, removing all attributes one might select on for styling and breaking the accessibility of labels. Secondly, they wrap radios in span elements and add a hidden field to the markup, which breaks the + (adjacent sibling) CSS selector.
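
To make the second complaint concrete, here is a typical styling rule relying on the adjacent sibling combinator (a minimal illustration, not taken from any particular project):

/* bold the label of the checked radio – the hidden input that Lift
   injects between the radio and its label breaks this rule */
input[type="radio"]:checked + label {
  font-weight: bold;
}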

Let’s see an example.

When I work with radio buttons I always define them in HTML as follows:

<form>
  <div id="country-selection">
    <input id="poland" type="radio">
  <label for="poland">Poland</label>
    <input id="germany" type="radio">
    <label for="germany">Germany</label>
    <input id="france" type="radio">
    <label for="france">France</label>
  </div>
  <button id="submit" type="submit">Submit</button>
</form>

The input and label combination makes radio buttons more convenient for application users. Both the label and the radio button are clickable, as opposed to the following HTML template, where only the radio button itself is clickable:

<form>
  <div id="country-selection">
    <input id="poland" type="radio"> Poland
    <input id="germany" type="radio"> Germany
    <input id="france" type="radio"> France
  </div>
  <button id="submit" type="submit">Submit</button>
</form>

So let's take the HTML template with the input+label combination and use Lift's SHtml.radioElem to bind the country selection radios. There are a couple of ways to achieve this. The first one is to bind SHtml.radioElem to the parent container of the radios:

<form>
  <div id="country-selection">
    <input id="poland" type="radio">
    <label for="poland">Poland</label>
    <input id="germany" type="radio">
    <label for="germany">Germany</label>
    <input id="france" type="radio">
    <label for="france">France</label>
  </div>
</form>
"#country-selection"  #> SHtml.radioElem[Country](
  Seq(Poland, Germany, France),
  countrySelected
)(countrySelected = _).toForm &

This is what Lift produces:

<form>
  <span>
    <input type="radio" value="F1323037495100XKESH2" name="F13230374951042C4KF3" checked="checked">
    <input type="hidden" name="F13230374951042C4KF3" value="F1323037495103V3RMIR">&nbsp;Poland<br>
  </span>
  <span>
    <input type="radio" value="F13230374951012PM4PR" name="F13230374951042C4KF3">&nbsp;Germany<br>
  </span>
  <span>
    <input type="radio" value="F1323037495102KQBHXP" name="F13230374951042C4KF3">&nbsp;France<br>
  </span>
</form>

As you can see, the original template is completely gone. In fact, the entire country-selection div is gone. It has been replaced by a completely different piece of HTML which, as already mentioned, wraps the radio buttons in span elements and adds a hidden field.

Another possibility is to have a single radio element in the template and bind SHtml.radioElem to it:

<form>
  <div id="country-selection">
    <input class="country-selection-radio" type="radio">
  </div>
</form>
".country-selection-radio"  #> SHtml.radioElem[Country](
  Seq(Poland, Germany, France),
  countrySelected
)(countrySelected = _).toForm &

This keeps the country-selection div in the template, but the inner HTML is replaced completely, just like in the previous example. All radio element attributes, including IDs and CSS classes (if defined), are gone. What we get is completely new HTML again, which in this particular example (only one radio in the template) is quite obvious:

<form>
  <div id="country-selection">
    <span>
      <input type="radio" value="F56827428863CMAAI" name="F5682742890YJ54AH" checked="checked">
      <input type="hidden" name="F5682742890YJ54AH" value="F5682742889GT4SHD">&nbsp;Poland<br>
    </span>
    <span>
      <input type="radio" value="F5682742887D3ECFH" name="F5682742890YJ54AH">&nbsp;Germany<br>
    </span>
    <span>
      <input type="radio" value="F56827428884PVURO" name="F5682742890YJ54AH">&nbsp;France<br>
    </span>
  </div>
</form>

There is a third option that lets us preserve most of our HTML structure:

<form>
  <div id="country-selection">
    <input id="poland" type="radio">
    <label for="poland">Poland</label>
    <input id="germany" type="radio">
    <label for="germany">Germany</label>
    <input id="france" type="radio">
    <label for="france">France</label>
  </div>
</form>
"#country-selection" #> {
  val radios = SHtml.radioElem[Country](
    Seq(Poland, Germany, France),
    countrySelected
  )(countrySelected = _)

  "#poland" #> radios(0) &
  "#germany" #> radios(1) &
  "#france" #> radios(2)
}

This approach lets us leave the HTML template almost in its original form:

<form>
  <div id="country-selection">
    <input type="radio" value="F931933611488EWEJCC" name="F931933611492KN41Q3" checked="checked">
    <input type="hidden" name="F931933611492KN41Q3" value="F931933611491OF4M2T">
    <label for="poland">Poland</label>
    <input type="radio" name="F931933611492KN41Q3" value="F931933611489DYCOLU" id="germany">
    <label for="germany">Germany</label>
    <input type="radio" name="F931933611492KN41Q3" value="F931933611490D4XWWP" id="france">
    <label for="france">France</label>
  </div>
</form>

There are still two problems: a hidden field is added after the first radio (breaking the + rule, if defined) and the element ID is gone for the first radio. In order to bring the ID back, we'd need to do some manual XHTML manipulation tricks in the snippet.
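
For example, one such trick might look like this (just a sketch – there are other ways to do it):

import scala.xml.{Elem, NodeSeq, Null, UnprefixedAttribute}

// Re-add the lost id to the radio input generated for the first choice,
// leaving the injected hidden input untouched.
def withId(ns: NodeSeq, id: String): NodeSeq = ns.map {
  case e: Elem if e.label == "input" && (e \@ "type") == "radio" =>
    e % new UnprefixedAttribute("id", id, Null)
  case other => other
}

"#poland" #> withId(radios(0).xhtml, "poland")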

We can still do better but we need to forget about SHtml.radio and SHtml.radioElem.

Here is the code we came up with together with Antonio Salazar Cardozo from Elemica. Thanks to Matt Farmer, it will be included in Lift 3 as radioCssSel, and maybe at some point it will replace radioElem completely.

package com.ontheserverside.snippet

import net.liftweb.common.Box
import net.liftweb.http.S
import net.liftweb.util.CssSel
import net.liftweb.util.Helpers._

object Radio {

  /**
   * @param initialValue initial value or Empty if there should be no initial value set
   * @param onSubmit function to execute on form submission
   * @param cssSelToValue mapping between CSS selectors of radio input nodes and values assigned to them
   */
  def radioElem[T](initialValue: Box[T], onSubmit: Box[T] => Any)(cssSelToValue: (String, T)*): CssSel = {
    val radioOptions = cssSelToValue.map(_._2 -> nextFuncName).toMap

    def selectionHandler(selection: String) = {
      onSubmit(radioOptions.find(_._2 == selection).map(_._1))
    }

    S.fmapFunc(selectionHandler _)(funcName => {
      cssSelToValue.map { case (cssSel, value) =>
        s"$cssSel [name]" #> funcName &
        s"$cssSel [value]" #> radioOptions(value) &
        s"$cssSel [checked]" #> {
          if (initialValue === value)
            Some("true")
          else
            None
        }
      }.reduceLeft(_ & _)
    })
  }
}

Let’s use this code to bind radios from our original template:

<form>
  <div id="country-selection">
    <input id="poland" type="radio">
    <label for="poland">Poland</label>
    <input id="germany" type="radio">
    <label for="germany">Germany</label>
    <input id="france" type="radio">
    <label for="france">France</label>
  </div>
</form>
"#country-selection" #> Radio.radioElem[Country](
  countrySelected,
  countrySelected = _
)(
  "#poland" -> Poland,
  "#germany" -> Germany,
  "#france" -> France
)

This is what Lift produces:

<form>
  <div id="country-selection">
    <input checked="true" name="F641774276613OHEWTW" value="F6417742766101VCFW1" type="radio" id="poland">
    <label for="poland">Poland</label>
    <input name="F641774276613OHEWTW" value="F641774276611INMSWO" type="radio" id="germany">
    <label for="germany">Germany</label>
    <input name="F641774276613OHEWTW" value="F641774276612BJKPPL" type="radio" id="france">
    <label for="france">France</label>
  </div>
</form>

As you can see, the HTML structure is the same as in the original template, and all element attributes (IDs in this example) remain unchanged.

I strongly recommend using this approach in your projects. It leaves the HTML structure unchanged and facilitates Lift's "view first" development approach.

I've put a sample project on GitHub comparing SHtml.radioElem and Radio.radioElem bindings: https://github.com/pdyraga/lift-samples/tree/master/radio-buttons

Untangling Spring Batch Flow Control

Spring Batch provides a neat mechanism for job execution flow control. At first glance there is nothing really complicated here – just a couple of configuration instructions controlling step transitions based on the execution result of the previous step. Nevertheless, I often find developers confusing some basic concepts and making mistakes that may lead to serious problems, especially in job recovery.

In this post, I'd like to untangle the most common misunderstandings about Spring Batch flow control that I have observed.

Before going any further, we must realise the difference between BatchStatus and ExitStatus. To make a long story short, BatchStatus represents the status of a step or job from the "technical" side. It is an enumeration, so it has a fixed number of possible values: ABANDONED, COMPLETED, FAILED, STARTED, STARTING, STOPPED, STOPPING and UNKNOWN. Depending on the BatchStatus, the framework may or may not let you restart a failed job. ExitStatus represents the business logic outcome. It is a class, thus its value can be customised. It comes with a set of predefined values (COMPLETED, EXECUTING, FAILED, NOOP, STOPPED, UNKNOWN), but it allows you to provide a custom status value depending on business logic needs.

When we define a conditional flow, the on attribute refers to the ExitStatus. The BatchStatus is determined automatically by the framework, based either on the ExitStatus or on the conditional flow settings. For instance, the following instruction will set the BatchStatus to FAILED when the step ends with ExitStatus COMPLETED_WITH_ERRORS:

<batch:fail on="COMPLETED_WITH_ERRORS" />
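
Where does a custom ExitStatus such as COMPLETED_WITH_ERRORS come from in the first place? One common place is an afterStep callback. A sketch (the status name and the skip-count criterion are assumptions for the example):

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public final class ErrorAwareStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // nothing to do before the step
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // business outcome: the step completed, but some items were skipped
        if (stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED_WITH_ERRORS");
        }
        return stepExecution.getExitStatus();
    }
}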

Order of transitions defined in XML does not matter

The following transition:

<batch:next on="COMPLETED" to="onCompletedStep" />
<batch:next on="COMPLETED_WITH_ERRORS to="onCompletedWithErrorsStep" />
<batch:fail on="FAILED" />

is exactly the same as:

<batch:fail on="FAILED" />
<batch:next on="COMPLETED" to="onCompletedStep" />
<batch:next on="COMPLETED_WITH_ERRORS to="onCompletedWithErrorsStep" />

Also, the following transition:

<batch:next on="COMPLETED" to="nextStep" />
<batch:fail on="*" />

is the same as:

<batch:fail on="*" />
<batch:next on="COMPLETED" to="nextStep" />

Spring Batch sorts state transition definitions by decreasing specificity of the pattern, counting wildcards. If the wildcard counts are equal, it falls back to lexicographic order. This way, COMPLETED or FAILED transitions are always checked before the * (asterisk) transition.

It is good practice to keep the most specific patterns on top, but violating this rule has no impact on the processing flow.

There is no need to cover all available exit statuses

When defining step transitions, it is not necessary to cover all available ExitStatus values. For instance, if your step can only end up COMPLETED or FAILED, you don't have to define a wildcard (*) transition for it. The Spring Batch reference guide says that "all step's transitions must be defined explicitly", but that doesn't mean you need to cover all available ExitStatus values – define only those values that are actually possible for your step. For the previously mentioned step with two possible exit codes, COMPLETED and FAILED, it is enough to define:

<batch:next on="COMPLETED" to="onCompletedStep" />
<batch:fail on="FAILED" />

One important thing to bear in mind is that if your step ends up with an ExitStatus for which no transition is defined, Spring throws JobExecutionException: Flow execution ended unexpectedly caused by FlowExecutionException: Next state not found, and automatically marks the step as FAILED. That's why it is good practice to put the

<batch:fail on="FAILED" />

transition on each step that should be recoverable and whose execution can result in an exception. With this statement, the step fails with the exception that is the actual reason for its failure, instead of failing because of FlowExecutionException: Next state not found when an exception occurs.

Job should fail at the exact point of failure

If some undesirable behaviour is detected in one of the job's steps – for example, an IOException is thrown when trying to rename a file – that particular step should fail. The rule of thumb is to fail the step which has actually failed its execution, instead of routing the flow to some common jobHasFailedLogAndAlertSomeAdult step and failing the job there. To better illustrate this situation, please review the examples below.

The following example presents an ANTI-PATTERN that should be avoided – this construction is invalid and prevents the job from being correctly recovered:

<batch:step id="myTestStep">
  <batch:tasklet ref="myTasklet" />
  <batch:next on="COMPLETED" to="someAnotherStep" />
  <batch:next on="FAILED" to="jobFailedStep" />
</batch:step>

<batch:step id="jobFailedStep">
  <batch:tasklet ref="callAnAdult" />
  <batch:fail on="*" />
</batch:step>

If myTasklet throws an exception, Spring Batch routes processing to jobFailedStep, which calls some tasklet and fails the job. Why is this an anti-pattern? When job recovery is triggered, it iterates through the job's steps and re-runs the first step that failed. In this particular case it would be jobFailedStep instead of myTestStep, so myTasklet would get no chance to execute again. Instead of giving myTestStep a chance to execute once again, the job fails in jobFailedStep straight after recovery is run (this is the first step with ExitStatus.FAILED). Every time job recovery is run on this job, the same thing happens – there is no chance to actually recover.

What should be done instead is to fail the job at the exact point of failure. If myTasklet throws an exception, it is myTestStep that should fail:

<batch:step id="myTestStep">
  <batch:tasklet ref="myTasklet"/>
  <batch:next on="COMPLETED" to="someAnotherStep" />
  <batch:fail on="FAILED" />
</batch:step>

This way, when job recovery starts, it picks up myTestStep and executes the tasklet once again, giving it a chance to complete successfully this time.

If a step failed its execution, fail the job instead of ending it

When a step execution has failed for some reason and you want to make the job eligible for recovery, you need to use <batch:fail> instead of <batch:end>. The former sets the BatchStatus to FAILED and thus makes the job eligible for recovery. The latter sets the BatchStatus to COMPLETED, and the job cannot be started again – if you try to do so, a JobInstanceAlreadyCompleteException will be thrown.

For example, the following transition ends the job with ExitStatus.FAILED, but it does NOT make it recoverable (its BatchStatus is set to COMPLETED):

<batch:step id="myTestStep">
  <batch:tasklet ref="myTasklet"/>
  <batch:next on="COMPLETED" to="onCompletedStep" />
  <batch:end on="FAILED" exit-code="FAILED" />
</batch:step>

To make the job recoverable at this step, it is required to fail processing instead of ending it. The following transition sets both BatchStatus and ExitStatus to FAILED.

<batch:step id="myTestStep">
  <batch:tasklet ref="myTasklet"/>
  <batch:next on="COMPLETED" to="onCompletedStep" />
  <batch:fail on="FAILED" />
</batch:step>

Binding Futures in Lift 2

In the Lift project I'm currently working on, we have a long-running background task whose outcomes need to be displayed on screen once computed. I would like to describe a simple enhancement for Lift bindings that allows binding a Future[T] using CSS selectors the same way as any other simple type.

The traditional request-response lifecycle model assumes that the client side is responsible for initiating the interaction by sending an HTTP request. The client's request is processed by the server, which prepares and sends back an HTTP response. At this point the interaction is over, unless the client sends another request that starts a completely new request/response lifecycle.

In the case of Future[T], this traditional model is not enough. It is the server side that knows when the Future evaluation has completed, so, theoretically, the server side should initiate the interaction. This behaviour can be simulated using Comet. The Comet model is based on a long-running HTTP request that allows the server to push data to the client. When the client receives data from the server, a new long-running HTTP request is triggered. This way, we can simulate a server side that is capable of initiating the interaction and may send notifications to the client at any time.

Using Comet is sometimes not desirable, though. In my case, there is a very simple Lift snippet with only some part of the DOM being lazily loaded. This lazy-loaded part is computed by a background task that runs relatively long compared to the page load time. Once computed, the information is never updated, so only one "server push" is required.

Because I didn't want to use Comet, I had to come up with my own solution. What I wanted to achieve was the possibility to bind Futures and LAFutures (Lift's wrapper for Future) using CSS selectors, just like simple types:

1
2
"#scala-future *" #> Future { Thread.sleep(5000); "hello" } &
"#lift-lafuture *" #> LAFuture.build { Thread.sleep(6000); "world" }

An important requirement was that the code containing Future bindings might be rendered using an IdMemoize transform. It means there was absolutely no guarantee that Futures were bound to static DOM elements visible at first render time and never updated. The part of the DOM containing nodes to which Futures were bound could be re-rendered at any time, and after each such operation Lift should attempt to resolve these Futures once again.
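
To illustrate, such a binding might sit inside an IdMemoize block like this (a sketch with assumed names – ReportSnippet and computeReport are not part of the sample project):

import net.liftweb.common.{Box, Empty, Full}
import net.liftweb.http.SHtml
import net.liftweb.util.Helpers._
import net.liftweb.util.IdMemoizeTransform

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import com.ontheserverside.lib.FutureBinds._

class ReportSnippet {
  private var memo: Box[IdMemoizeTransform] = Empty // kept so the block can be re-rendered later

  def render = SHtml.idMemoize { outer =>
    memo = Full(outer)
    // every re-render starts a fresh Future and a fresh polling cycle
    "#report *" #> Future { computeReport() }
  }

  private def computeReport(): String = "..." // placeholder for the long-running task
}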

The code below is all you need to make it happen – the only thing required is to import FutureBinds._ into your snippet, and you are free to bind Futures using CSS selectors just like in the example code above.

FutureBinds.scala (GitHub)
package com.ontheserverside.lib

import net.liftweb.actor.LAFuture
import net.liftweb.http.SHtml
import net.liftweb.http.js.JsCmd
import net.liftweb.http.js.JsCmds.{After, OnLoad, Replace, Script}
import net.liftweb.util.Helpers._
import net.liftweb.util._

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.xml.NodeSeq

object FutureBinds {

  private def futureTransform[FutureType, T](
    innerTransform: CanBind[T],
    futureCompleted_? : (FutureType) => Boolean,
    resolveFuture: (FutureType) => T
  ): CanBind[FutureType] = new CanBind[FutureType] {

    def apply(future: => FutureType)(ns: NodeSeq): Seq[NodeSeq] = {

      List(BindHelpers.findOrCreateId { id =>
        val concreteFuture = future
        lazy val updateFunc = SHtml.ajaxInvoke(() => resolveAndUpdate).exp.cmd

        def resolveAndUpdate: JsCmd  = {
          if (futureCompleted_?(concreteFuture)) {
            Replace(id, innerTransform(resolveFuture(concreteFuture))(ns).flatten)
          } else {
            After(1 seconds, updateFunc)
          }
        }

        _ => <div id={id} class="loading"><img src="/images/ajax-loader.gif" alt="Loading"/></div> ++ Script(OnLoad(updateFunc))
      }(ns))
    }
  }

  implicit def futureTransform[T](implicit innerTransform: CanBind[T], executionContext: ExecutionContext): CanBind[Future[T]] = {
    futureTransform[Future[T],T](innerTransform, (future) => future.isCompleted, (future) => Await.result(future, Duration.Inf))
  }

  implicit def lafutureTransform[T](implicit innerTransform: CanBind[T]): CanBind[LAFuture[T]] = {
    futureTransform[LAFuture[T],T](innerTransform, (future) => future.complete_?, (future) => future.get)
  }
}

Every second, the client asks the server: "has the Future completed?". If so, the server tells the client to update its DOM with the Future's evaluation result. Otherwise, the server says: "Not yet. Please ask me again later."

An alternative approach to binding Futures has been presented by Antonio in his lift-future-canbind-example on GitHub. It is a very clever idea that takes advantage of Lift's <lift:lazy-load> mechanism. The FutureBinds code in that example looks much simpler, but there is one important limitation. Under the hood, lazy load uses Lift's comet support, and comets can't be sent down to the client via AJAX at the moment – that's just a framework limitation. So even if we agree to use comet to handle Future bindings internally, we will not be able to render code containing Future bindings dynamically with IdMemoize.

A sample application with my FutureBinds implementation is available on GitHub.

Horizontal and Vertical Scaling Strategies for Batch Applications

In this article I would like to describe possible strategies for horizontal and vertical scaling of batch applications. The code samples presented here are based on the most popular batch processing framework – Spring Batch – but all strategies are conceptually compatible with the JSR-352 specification (Batch Applications for the Java Platform).

All of the code snippets come from a sample bank transaction processing application performing payment sanctions screening that was developed for the sake of this article. Also, I assume that the reader has at least intermediate knowledge of batch processing concepts.

Problem definition – the sample application

To better illustrate the concepts described in this article, I created a sample bank transaction processing application that is available on GitHub. As already mentioned, all code samples come from this application, so let me first give a brief overview of the problem it solves.

Financial institutions are often seen as the front line in the fight against terrorist organizations and money laundering. Banks are obligated to have adequate systems and controls in place to ensure international sanctions compliance. Payment sanction screening algorithms may be very complex to implement, and penalties for failure to comply can be severe. There are a number of different bodies that impose financial sanctions – the European Union, the United Nations and local governments. As a result, payments are usually screened against more than 30 public sanction lists.

The application developed for the sake of this article is responsible for processing Elixir0 messages. Elixir0 is a Polish domestic bank message format that represents standard credit transfer transactions. All payments contained in an Elixir0 message are screened against OFAC's Specially Designated Nationals List. The screening algorithm performs fuzzy matching of the beneficiary's and ordering party's name, address and alternate name against every SDN entity. If the algorithm detects that sanctions might be imposed on the ordering or receiving party, placement of the particular transaction is suspended pending manual approval.

Please bear in mind that the implementation of the payment sanction screening mechanism used in the sample application is not the most optimal or performant algorithm for this task. Also, it should not be considered a complete solution.

The entire transaction import process consists of three steps:

  1. Load transactions from the Elixir0 message into the database,
  2. Perform fuzzy string matching of the name, address and alternate name of the beneficiary and ordering party against all registered SDN entities. If the similarity metric is high enough, a potential sanction match is registered,
  3. Accept transactions without potential SDN matches registered; suspend transactions that have at least one SDN match registered.

Unsurprisingly, the second step is the most costly one, and the scaling strategies described in this article are explained using this step.

Source code of the sample application can be found at:
https://github.com/pdyraga/spring-batch-samples/

Detailed information on how to compile and run it is available in the README file.

Overview of available scaling strategies

There are three strategies available for parallel processing: split flows, chunking and partitioning. Two of them – chunking and partitioning – can implement both horizontal and vertical scaling. The chunking and partitioning strategies implementing horizontal scaling are usually called "remote chunking" and "remote partitioning". The most important difference from their vertical equivalents is that the work to be done is delegated to remote nodes instead of to threads within the same virtual machine.

No additional library dependencies are needed for split flows, local chunking and local partitioning – spring-batch-core is perfectly sufficient. However, remote chunking and remote partitioning require an additional dependency on the spring-batch-integration module. What's more, this module has a transitive dependency on spring-integration-core, so you need to be very careful if you already have it on your dependency list – some compatibility issues may arise.
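
For reference, the extra dependency expressed in Maven coordinates (the version is a placeholder – align it with your spring-batch-core version):

<dependency>
  <groupId>org.springframework.batch</groupId>
  <artifactId>spring-batch-integration</artifactId>
  <!-- placeholder: use the version matching your spring-batch-core -->
  <version>${spring.batch.version}</version>
</dependency>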

Local chunking

JSR-352 specifies the chunk-oriented processing style as its primary pattern, and it plays a very important role in Spring Batch as well. Every non-tasklet step usually consists of three components: an item reader, an item processor and an item writer. Unsurprisingly, the item reader is responsible for reading data from various sources and delivering it to the item processor. The item processor does all the necessary work on an item and passes the processed data to the item writer, which is responsible for writing it out. Because creating and committing a transaction is generally expensive, it is expected that a transaction handles a number of items grouped in chunks, instead of handling each item separately.

In the 1.x versions of Spring Batch, the item writer was expected to do some kind of buffering of items, and the step implementation decided when to flush the buffer based on the commit-interval property. This approach reflects an item-oriented strategy rather than a chunk-oriented strategy: the entire step processing was organized around the item being processed. The ItemReader and ItemWriter interfaces were quite complicated, because they supported transaction rollbacks by clearing the writer buffer, resetting the reader to the previously marked position and processing the failed chunk once again, now with only one item per chunk (to detect and handle the problematic item depending on the skip/retry strategy).

Spring Batch 2.0 brought a new approach to this problem, named chunk-oriented processing. All methods used to control the underlying source (mark, reset, flush, clear) have been removed from the ItemReader and ItemWriter interfaces. The ItemReader works in forward-only mode, the ItemWriter accepts an entire chunk of items (instead of items one by one), and it is the responsibility of the framework to buffer the list to be written.
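
Conceptually, one chunk iteration boils down to the loop below. This is a simplified sketch, not the actual framework code – it ignores transactions, skip/retry policies and restart handling:

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;

public final class ChunkLoopSketch {

    public static <I, O> void processOneChunk(ItemReader<I> reader,
                                              ItemProcessor<I, O> processor,
                                              ItemWriter<O> writer,
                                              int commitInterval) throws Exception {
        // the framework, not the writer, buffers the chunk being built
        List<I> chunk = new ArrayList<>();
        for (int i = 0; i < commitInterval; i++) {
            I item = reader.read(); // forward-only; null signals end of input
            if (item == null) {
                break;
            }
            chunk.add(item);
        }

        List<O> outputs = new ArrayList<>();
        for (I item : chunk) {
            outputs.add(processor.process(item));
        }

        // the writer receives the whole chunk and writes it in one transaction
        writer.write(outputs);
    }
}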

The difference between the item-oriented and the chunk-oriented approach is well illustrated in the images below.

In the item-oriented approach, once an item has been successfully read and processed, it is immediately passed to the writer. It is the writer's responsibility to buffer the items before writing them out. This way the writer avoids creating and committing a transaction for each single item:

Item oriented processing

In the chunk-oriented approach, the writer accepts an entire data chunk prepared by the framework. The item writer creates and commits a transaction for each chunk it has received. The writer no longer needs to buffer the list of items internally – it is now the responsibility of the code controlling the step to prepare the chunk:

Chunk oriented processing

A chunk processing step can be vertically scaled by letting items be read, processed and written in separate threads of execution. All step components are accessed by multiple threads concurrently:

Local chunking

A single-threaded chunk processing step can be converted into a multithreaded one by adding a reference to a multithreaded TaskExecutor to the step definition. The number of threads operating within the step can be controlled using the throttle-limit attribute.

An example of a vertically scaled chunk-oriented step is available in the elixir0ImportJob.xml file under the paymentSanctionScreeningStep.chunking identifier:

elixir0ImportJob.xml (GitHub, line 26)
<step id="paymentSanctionScreeningStep.chunking">
  <tasklet task-executor="executor" throttle-limit="8">
    <chunk reader="paymentSanctionScreeningReader.chunking" processor="paymentSanctionScreeningProcessor"
           writer="hibernateItemWriter" commit-interval="5"/>
  </tasklet>
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
  <listeners>
    <listener ref="paymentSanctionScreeningRecoveryCleaner" />
  </listeners>
</step>

The task executor bean is defined in applicationContext.xml:

applicationContext.xml (GitHub, line 24)
<task:executor id="executor" pool-size="50" />

The configuration above allows a maximum of 8 threads (see the throttle-limit attribute) to operate on the step's reader, processor and writer simultaneously. All these threads are pooled by the task executor.

Because the reader, processor and writer components are accessed concurrently, adding a reference to a task executor may not be enough to make the step successfully process items in multiple threads. Many ItemReader and ItemWriter implementations are stateful and not safe for multithreaded usage.

One example of state that is often held internally by an item reader is the count of items that have been read – see org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader and its subclasses. The number of items that have been read is stored in the execution context in order to restart the step from exactly the same position (the same item) it was at before the failure. Also, all implementations of the org.springframework.batch.item.ItemStream interface are potentially vulnerable to concurrency issues – the same stream can be opened or updated from multiple threads. The reader should be protected against such situations; otherwise you can, for example, end up with an OptimisticLockingFailureException.

The first problem (regarding the restartability of the step in case of failure) can be solved by disabling the reader's state persistence – setting the ItemReader's saveState property to false. Of course, this doesn't mean we have to give up restartability completely. There are two patterns that can help to properly recover a failed step without the reader storing the read-items count. The first one involves a complete cleanup of stale data when re-entering the step, and the second one involves adding a special column that works as a marker indicating whether a particular entry has been processed or not. Both of these strategies deserve a separate article – for now, let's assume that the cleanup of stale data fits the step we are currently discussing better.

The reader used by the step based on the chunking scaling strategy is defined as follows:

elixir0ImportJob.xml (GitHub, line 100)
<bean id="paymentSanctionScreeningReader.chunking" class="com.ontheserverside.batch.bank.processing.SynchronizedItemReaderDecorator">
  <constructor-arg>
    <bean class="org.springframework.batch.item.database.HibernateCursorItemReader">
      <property name="sessionFactory" ref="sessionFactory" />
      <property name="useStatelessSession" value="false" />
      <property name="saveState" value="false" />
      <property name="queryString">
        <value>
          SELECT NEW com.ontheserverside.batch.bank.screening.SanctionScreeningContext (tx, entity)
          FROM Elixir0Transaction tx, SDNEntity entity
          WHERE tx.status = :txStatus
          ORDER BY tx.id
        </value>
      </property>
      <property name="parameterValues">
        <map>
          <entry key="txStatus" value="#{T(com.ontheserverside.batch.bank.tx.TransactionStatus).LOADED}"/>
        </map>
      </property>
    </bean>
  </constructor-arg>
</bean>

First of all, please notice that the reader does not save its internal state – the saveState property is set to false. In case of any failure, the step will be restarted from the very beginning, so no items that should be processed can be lost. The only problem here is to make sure that step outcomes do not get duplicated when the step is restarted. For instance, if the same transaction were processed twice, two identical SanctionMatch objects would be created (a SanctionMatch says that the particular transaction is suspicious, because some entity on the SDN list may be involved in it – see the SanctionScreeningProcessor class). This problem is solved by a before-step execution listener:

elixir0ImportJob.xml (GitHub, line 33)
<listeners>
  <listener ref="paymentSanctionScreeningRecoveryCleaner" />
</listeners>

that cleans up all stale step outcomes (all SanctionMatch objects for Elixir0Transactions with LOADED status). Of course, at this point we have to assume that there is only one import process running at any given time.

Please bear in mind that the StepExecutionListener is executed outside of the chunk's transaction. In fact, there is no transaction available in a StepExecutionListener unless explicitly declared. That's why the SanctionScreeningRecoveryCleaner listener has its beforeStep method marked as @Transactional.
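
A sketch of what such a listener might look like (the actual SanctionScreeningRecoveryCleaner in the sample repository may differ in details; the HQL is an assumption):

import org.hibernate.SessionFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.transaction.annotation.Transactional;

import com.ontheserverside.batch.bank.tx.TransactionStatus;

public class SanctionScreeningRecoveryCleaner implements StepExecutionListener {

    private SessionFactory sessionFactory;

    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    // step listeners run outside of the chunk's transaction, hence @Transactional
    @Transactional
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // drop stale outcomes: matches registered for transactions that are
        // still in LOADED status (i.e. not yet accepted or suspended)
        sessionFactory.getCurrentSession()
                .createQuery("DELETE FROM SanctionMatch m WHERE m.transaction.status = :status")
                .setParameter("status", TransactionStatus.LOADED)
                .executeUpdate();
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return stepExecution.getExitStatus();
    }
}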

Regarding concurrency problems when accessing the ItemReader from multiple threads: all methods from the ItemReader and ItemStream interfaces should be synchronized. This is done by SynchronizedItemReaderDecorator:

SynchronizedItemReaderDecorator.java (GitHub)
package com.ontheserverside.batch.bank.processing;

import org.springframework.batch.item.*;

public final class SynchronizedItemReaderDecorator<T> implements ItemStream, ItemReader<T> {

    private final ItemReader<T> delegate;

    public SynchronizedItemReaderDecorator(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return delegate.read();
    }

    @Override
    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public synchronized void update(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public synchronized void close() throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }
}

To sum up, some key facts to remember about multithreaded chunk processing:

  • a single-threaded chunk processing step can be converted to a multithreaded one by adding a reference to a multithreaded task executor
  • most ItemReader implementations are not safe for multithreaded usage – you need to synchronize them manually
  • keep in mind that allowing multiple threads to operate simultaneously has a serious impact on step restartability.

Local partitioning

In the partitioning scaling strategy, the framework creates identical copies of the step, including all of its components (reader, processor and writer). The items to be processed are divided into partitions and assigned to the step's clones according to a user-defined strategy. Usually, each step copy has one thread operating on it. This is quite a reasonable approach, and it is rarely required to increase the number of threads operating on a single partition (bear in mind that if there are 8 copies of the step, the same number of threads can operate simultaneously).

Local partitioning

A step based on the partitioning strategy is defined as follows:

elixir0ImportJob.xml (GitHub, line 38)
<step id="paymentSanctionScreeningStep.partitioning">
  <partition partitioner="moduloPartitioner">
    <step>
      <tasklet throttle-limit="1">
        <chunk reader="paymentSanctionScreeningReader.partitioning" processor="paymentSanctionScreeningProcessor"
               writer="hibernateItemWriter" commit-interval="5"/>
      </tasklet>
    </step>
    <handler task-executor="executor" grid-size="8" />
  </partition>
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
</step>

The first thing to note here is that the partition consists of a step definition and a partition handler. Because this is a local partitioning step, each partition will be handled by a separate thread of execution within the same virtual machine. That's why the handler references the same multithreaded task executor as the previously discussed local chunking-based step. The number of step copies (and the number of threads that will operate on them) is controlled by the grid-size property on the partition handler. Because, as already stated, we usually want one thread operating per partition (no multithreaded chunking within a partition), the throttle-limit attribute of the tasklet is set to 1.

Please note that there is no before-step execution listener defined here, as there was for the chunk processing step, where the before-step listener performed the stale data cleanup. Also, the paymentSanctionScreeningReader.partitioning item reader is configured to remember the number of items read. That's because there is only one thread operating per partition (throttle-limit=1) and there is a separate ItemReader instance per partition, so the number of items that have been read can be safely stored. Moreover, the framework remembers the number of partitions created and which of them have failed. When the step is recovered, only the failed partitions are restarted. What's more, recovered partitions always receive the same input parameters, so if the reader remembers the number of items that have been successfully processed, these items are not read and processed again. The crucial thing to make this work properly is to always read items in the same order, which is achieved using the ORDER BY tx.id query modifier.

Let’s look at the item reader declaration:

elixir0ImportJob.xml (GitHub, line 142)
<bean id="paymentSanctionScreeningReader.partitioning" scope="step" class="org.springframework.batch.item.database.HibernateCursorItemReader">
  <property name="sessionFactory" ref="sessionFactory" />
  <property name="useStatelessSession" value="true" />
  <property name="saveState" value="false" />
  <property name="queryString">
    <value>
      SELECT NEW com.ontheserverside.batch.bank.screening.SanctionScreeningContext (tx, entity)
      FROM Elixir0Transaction tx, SDNEntity entity
      WHERE tx.status = :txStatus
      AND mod(tx.id, :modDivisor) = :modRemainder
      ORDER BY tx.id
    </value>
  </property>
  <property name="parameterValues">
    <map>
      <entry key="modDivisor" value="#{stepExecutionContext['mod.divisor']}" />
      <entry key="modRemainder" value="#{stepExecutionContext['mod.remainder']}" />
      <entry key="txStatus" value="#{T(com.ontheserverside.batch.bank.tx.TransactionStatus).LOADED}"/>
    </map>
  </property>
</bean>

There are two mysterious query parameters here: modDivisor and modRemainder. Basically, they are used to divide the list of all transactions into partitions. It is important to note that it is the responsibility of the reader to pick up the items that belong to its particular partition. The item reader described here achieves this by computing the modulus of the transaction identifier and the specified modulo divisor, and then comparing the result to the expected modulo remainder. The modulo divisor is always equal to the number of partitions, so for a grid consisting of 8 step copies, we will always get results ranging from 0 to 7. The expected modulo remainder is specific to the particular partition: partition 0 always receives transactions for which the remainder of the transaction ID divided by the grid size is equal to 0, and partition 1 always receives transactions for which this remainder is equal to 1.

The modulo divisor and remainder are read from the step execution context. The component that puts them there is the partitioner, defined under the moduloPartitioner identifier (see the step's definition).

The responsibility of the partitioner is to produce a separate ExecutionContext for each partition. The partitioner used in the example makes sure that partition number 0 receives an ExecutionContext with the modulo remainder set to 0, partition number 1 receives an ExecutionContext with the modulo remainder set to 1, and so on. Also, the partitioner puts the value of the modulo divisor (the grid size) into each ExecutionContext. Both of these values are used later in the ItemReader query string.

ModuloPartitioner.java (GitHub)
package com.ontheserverside.batch.bank.processing;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;

public final class ModuloPartitioner implements Partitioner {

    public static final String MOD_DIVISOR_KEY = "mod.divisor";
    public static final String MOD_REMAINDER_KEY = "mod.remainder";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        final Map<String, ExecutionContext> contextMap = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt(MOD_DIVISOR_KEY, gridSize);
            context.putInt(MOD_REMAINDER_KEY, i);
            contextMap.put(getContextName(i), context);
        }

        return contextMap;
    }

    private String getContextName(int index) {
        return String.format("partition-%d", index);
    }
}

As long as there is one thread operating per partition, there is no need to synchronize the reader as there was in the local chunking case. Each thread of execution has its own reader instance, so even if it stores its state internally, there is no threat of concurrency problems.

As is easy to notice, in terms of restartability, partitioning is usually the better scaling strategy. However, there is one important difference that sometimes makes chunking preferred over partitioning.

Because it is the responsibility of the item reader to pick up only those items that are assigned to the particular partition, the reader must be aware of the structure of the input data, and there must be some way to effectively distribute input items between partitions (in the described example, transaction identifiers determine partition assignment). What's more, the item reader must be able to access items at arbitrary positions in the input data collection. This is required in order to allow multiple threads to read and process items from different positions in the input dataset simultaneously. For instance, if the items to be processed are already stored in a database, it is relatively easy to define a query that picks up only the items assigned to a certain partition; and because the reader can access items at random positions in the input data (a database provides random access to all items), multiple threads can operate simultaneously in an efficient way (no bottleneck on reading items). However, if we consider reading and parsing data from an input file, chunking is the much better strategy – there is usually no efficient way to divide the input data into partitions and read/parse them from random positions simultaneously (the data is streamed).

To sum up, here are key facts about partitioning scaling strategy:

  • identical copies of the step, including all of its components (reader, processor, writer), are created and assigned to each partition
  • it is the responsibility of the ItemReader to pick up only those items that have been assigned to the particular partition
  • if a single thread operates per partition, it is safe to let the reader remember the processed items count; thus, partitioning is usually the better choice in terms of restartability
  • not all steps can use the partitioning scaling strategy (input data cannot always be divided into partitions effectively)

Remote chunking

Remote chunking is a horizontal scaling strategy and can be viewed as an implementation of the standard producer-consumer pattern. The producer reads items from a data source and sends them through some middleware to consumers, which process the received items. Clearly, this pattern should be used when the item processing part is expensive (transporting items through the middleware introduces some overhead) and item reading is not a bottleneck (consumers should not get starved).

Remote chunking

In the sample application for this article, JMS is used as the messaging middleware and the consumers are just concurrent threads receiving messages from the queue. This is a simplification of remote chunking, introduced only to make the sample application simpler to use. In order to have consumers working on separate nodes, it is enough to remove the producer-related components from the Spring application context (or move them to some inactive profile) and deploy the application on a separate node with access to the same JMS broker as the producer (master node).

The definition of a remote chunking-based step looks the same as the definition of a single-threaded step (note that there is no reference to a task executor):

elixir0ImportJob.xml (GitHub, line 52)
<step id="paymentSanctionScreeningStep.remoteChunking">
  <tasklet>
    <chunk reader="paymentSanctionScreeningReader.remoteChunking" processor="paymentSanctionScreeningProcessor"
           writer="hibernateItemWriter" commit-interval="5" />
  </tasklet>
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
  <listeners>
    <listener ref="paymentSanctionScreeningRecoveryCleaner" />
  </listeners>
</step>

The same goes for the item reader used by this step – no special declarations are needed to make it remote chunking-aware:

elixir0ImportJob.xml (GitHub, line 123)
<bean id="paymentSanctionScreeningReader.remoteChunking" class="org.springframework.batch.item.database.HibernateCursorItemReader">
  <property name="sessionFactory" ref="sessionFactory" />
  <property name="useStatelessSession" value="false" />
  <property name="saveState" value="false" />
  <property name="queryString">
    <value>
      SELECT NEW com.ontheserverside.batch.bank.screening.SanctionScreeningContext (tx, entity)
      FROM Elixir0Transaction tx, SDNEntity entity
      WHERE tx.status = :txStatus
      ORDER BY tx.id
    </value>
  </property>
  <property name="parameterValues">
    <map>
      <entry key="txStatus" value="#{T(com.ontheserverside.batch.bank.tx.TransactionStatus).LOADED}"/>
    </map>
  </property>
</bean>

The entire magic that transforms this single-threaded step definition into a remote chunking step is performed under the hood by separate components from the spring-batch-integration module.

The first of them is a RemoteChunkHandlerFactoryBean – a FactoryBean implementation that swaps the step's processor with a special component writing the items that have been read into the messaging middleware. The business processing logic that would normally be performed by the original processor is moved to the handler produced by this factory (actually, the handler forwards processing requests to the original ItemProcessor implementation), and this handler should be installed as a consumer from the messaging middleware.

RemoteChunkHandlerFactoryBean definition looks as follows:

elixir0ImportJob.xml (GitHub, line 187)
<bean id="chunkHandler" class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
  <property name="chunkWriter" ref="chunkWriter" />
  <property name="step" ref="paymentSanctionScreeningStep.remoteChunking" />
</bean>

It contains a reference to the step that is going to be transformed and a reference to an ItemWriter implementation (the chunkWriter property). This ItemWriter is not the writer declared in the original step definition, but a separate writer responsible for writing input items to the messaging middleware (JMS in this case):

elixir0ImportJob.xml (GitHub, starting at line 192)
<bean id="chunkWriter" class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter" scope="step">
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="screening.requests.chunking" />
      <property name="receiveTimeout" value="1000" />
    </bean>
  </property>
  <property name="replyChannel" ref="screening.replies.chunking" />
  <property name="maxWaitTimeouts" value="10" />
</bean>

The ChunkMessageChannelItemWriter declaration above references two Spring Integration channels: screening.requests.chunking and screening.replies.chunking. As their names suggest, the former is used to send items to remote consumers for processing, and the latter is used to aggregate information about chunk executions (not the processed items themselves) from remote consumers.

The screening.requests.chunking channel has a JMS outbound channel adapter as its endpoint; it is declared in the jmsContext.xml file (this part of the file belongs to the master node configuration):

jmsContext.xml (GitHub, starting at line 29)
<!-- master -->
<int:channel id="screening.requests.chunking" />
<int-jms:outbound-channel-adapter connection-factory="JMSConnectionFactory" channel="screening.requests.chunking"
                                  destination-name="queue-screening-requests-chunking" />

Later on, in the same file, there is a declaration of a JMS listener receiving messages from the queue containing items to be processed. This is part of the slave node configuration.

jmsContext.xml (GitHub, starting at line 35)
<!-- slave -->
<jms:listener-container connection-factory="JMSConnectionFactory" transaction-manager="transactionManager"
                        acknowledge="transacted" concurrency="8">
  <jms:listener destination="queue-screening-requests-chunking" response-destination="queue-screening-replies-chunking"
                ref="chunkHandler" method="handleChunk" />
</jms:listener-container>

The JMS listener declares the bean produced by the previously configured RemoteChunkHandlerFactoryBean as the handler for received messages (the handleChunk method). When a new message is received from the queue, the handler passes it to the original ItemProcessor, and afterwards to the original ItemWriter. The key fact here is that the ItemProcessor and ItemWriter work in the same thread on the slave node. Because the concurrency level of the JMS listener container is set to 8, there are 8 consumer threads processing and writing items. When a chunk has been written (the commit interval has been reached) or when chunk processing has failed, information about the chunk execution is returned to the master node (producer). This is required to inform the flow-controlling mechanism about step outcomes (a StepContribution instance), just as it would happen for a locally running step.
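To make the consumer-side flow more tangible, here is a minimal, hypothetical sketch of what such a handler does conceptually. It is not the actual spring-batch-integration implementation (which also tracks sequence numbers and reports a StepContribution back), but it shows the essential process-then-write loop running in a single consumer thread:

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;

// Simplified, hypothetical stand-in for the handler installed on the slave node.
public class SimplifiedChunkHandler<I, O> {

  private final ItemProcessor<I, O> processor; // the step's original processor
  private final ItemWriter<O> writer;          // the step's original writer

  public SimplifiedChunkHandler(ItemProcessor<I, O> processor, ItemWriter<O> writer) {
    this.processor = processor;
    this.writer = writer;
  }

  // Invoked for every chunk message taken from the request queue; the boolean
  // result models the execution summary sent back on the reply queue.
  public boolean handleChunk(List<I> items) throws Exception {
    List<O> processed = new ArrayList<O>();
    for (I item : items) {
      O result = processor.process(item); // processing happens on the slave...
      if (result != null) {               // (a null result filters the item out)
        processed.add(result);
      }
    }
    writer.write(processed);              // ...and writing in the same thread
    return true;
  }
}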

The JMS listener is configured to send responses to the destination named queue-screening-replies-chunking. The master node has a message-driven channel adapter configured to read messages from this queue:

jmsContext.xml (GitHub, starting at line 43)
<!-- master -->
<int-jms:message-driven-channel-adapter connection-factory="JMSConnectionFactory" destination-name="queue-screening-replies-chunking"
                                        channel="screening.incoming.chunking" />

<int:channel id="screening.incoming.chunking" />
<int:transformer input-channel="screening.incoming.chunking" output-channel="screening.replies.chunking" method="extract">
  <bean class="org.springframework.batch.integration.chunk.JmsRedeliveredExtractor"/>
</int:transformer>

<int:channel id="screening.replies.chunking">
  <int:queue/>
</int:channel>

When the message-driven channel adapter receives a message from the queue-screening-replies-chunking destination, it passes this message to a transformer component invoking JmsRedeliveredExtractor. This class is responsible for failing the job when a redelivered JMS message is detected – such a message may come from a previous, failed step execution or indicate that the step is timing out. In both cases there is clearly something wrong with the job execution and the current processing state may be inconsistent.

When the transformer has finished its work, the message is passed to the screening.replies.chunking channel, configured as the reply channel of the ChunkMessageChannelItemWriter that connects the RemoteChunkHandlerFactoryBean with the messaging middleware. Such a reply message contains a chunk execution summary (ChunkResponse class) and is later used to compute the step outcome (StepContribution class).

A very important fact to remember is that items to be processed are passed in serialized form through the messaging middleware. If these items are Hibernate entities, they need to be re-attached to a session on the consumer nodes. That is why all relationships are eagerly fetched before the producer puts them into the queue.
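As an illustration, lazy associations can be force-loaded while the Hibernate session is still open, for example with Hibernate.initialize(). The helper below is a hypothetical sketch (the sample application achieves the same effect with an eager fetch in the reader query):

import org.hibernate.Hibernate;

// Hypothetical helper: force-load lazy associations before the entity graph
// is serialized onto the queue, so it is complete on the consumer side.
public final class EagerFetchSupport {

  private EagerFetchSupport() {}

  // Each argument is a lazy proxy or persistent collection reachable from the
  // entity about to be queued; Hibernate.initialize() triggers its loading.
  public static void initializeForTransfer(Object... lazyAssociations) {
    for (Object association : lazyAssociations) {
      Hibernate.initialize(association);
    }
  }
}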

Because there is a single thread reading and sending data to the messaging middleware, one might assume that the reader can safely update its read item count. Things are more complicated, though. Although the producer sends chunks in the same order they were read (you can track this by looking at the sequence number of a ChunkRequest), there is no guarantee that response messages will be put into the response queue in the same order. As soon as information about a successfully completed chunk arrives, the read item count is updated. This problem is similar to the one with local chunk processing – no completion order is guaranteed, yet the read item count is updated (and potentially overwritten) each time a chunk completes successfully. Because of that, the remote chunking step uses the same stale-data cleanup listener as its vertical equivalent, and the ItemReader is configured not to persist its state internally.
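The following self-contained snippet illustrates the problem (a hypothetical demonstration, not framework code): chunks are sent with increasing sequence numbers, but replies may arrive out of order, so a naive read-count update derived from the most recent reply can go backwards:

// Replies for chunks 0..3 arrive out of order; a naive update derived from
// the sequence number of the most recent reply ends up understating the count.
public class ReplyOrderingDemo {

  public static void main(String[] args) {
    int chunkSize = 5;
    int[] replySequences = {0, 2, 3, 1}; // arrival order of chunk replies

    int readCount = 0;
    for (int seq : replySequences) {
      readCount = (seq + 1) * chunkSize; // naive: trust the latest reply
      System.out.println("after reply " + seq + " -> read count " + readCount);
    }
    // prints 5, 15, 20, 10 - the final value understates what was really read
  }
}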

Key facts to remember about remote chunking:

  • remote chunking is an implementation of the producer-consumer pattern in which the producer and consumers communicate through messaging middleware
  • there is a single producer thread reading items and feeding the consumers
  • the declaration of a remote chunking-based step looks the same as the declaration of a single-threaded chunk-processing step; the entire transformation magic is performed under the hood by separate components from the spring-batch-integration module

Remote partitioning

Remote partitioning is similar to its local (vertical) equivalent in that identical copies of the step, including all its components, are created. However, with remote partitioning these step copies are executed on separate nodes. Just like with remote chunking, separate components from the spring-batch-integration module are responsible for configuring the step as remote partitioning-based. In the sample application, partitions are handled by threads within the same virtual machine (just to make deployment easier), but it is enough to extract the slave components into a separate application context (or to define master/slave Spring profiles) to have partitions executed on separate nodes.


Usually, a remote partitioning step declaration consists of two parts – the master step and the partition step:

elixir0ImportJob.xml (GitHub, starting at line 64)
<step id="paymentSanctionScreeningStep.remotePartitioning.MASTER">
  <partition partitioner="moduloPartitioner" handler="JMSPartitionHandler" step="paymentSanctionScreeningStep.remotePartitioning.PARTITION" />
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
</step>
elixir0ImportJob.xml (GitHub, starting at line 77)
<step id="paymentSanctionScreeningStep.remotePartitioning.PARTITION" xmlns="http://www.springframework.org/schema/batch">
  <tasklet throttle-limit="1">
  <chunk reader="paymentSanctionScreeningReader.partitioning" processor="paymentSanctionScreeningProcessor"
         writer="hibernateItemWriter" commit-interval="5" />
  </tasklet>
</step>

As a side note, it is good to remember that even if it is not configured as explicitly as in the code snippet above, local partitioning steps also follow this pattern. With both local and remote partitioning, there is always one master and multiple partitions controlled by the master, which is responsible for coordinating the work.

Just like with local partitioning, the same modulo partitioner component is used – it is the responsibility of the remote node to pick up only those items that have been assigned to the partition it executes.
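In a simplified, hypothetical form, such a modulo partitioner could look as follows (the context key names partition.mod and partition.count are illustrative; the sample application’s moduloPartitioner may use different ones):

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

// Creates one execution context per partition; each context carries the
// partition's remainder and the total partition count, so a reader can
// select only the rows satisfying id % count = mod.
public class ModuloPartitioner implements Partitioner {

  public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> partitions = new HashMap<String, ExecutionContext>();
    for (int i = 0; i < gridSize; i++) {
      ExecutionContext context = new ExecutionContext();
      context.putInt("partition.mod", i);          // this partition's remainder
      context.putInt("partition.count", gridSize); // total number of partitions
      partitions.put("partition-" + i, context);
    }
    return partitions;
  }
}

The next snippet defines the partition handler that distributes the partitions created this way to remote nodes: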

elixir0ImportJob.xml (GitHub, starting at line 203)
<bean id="JMSPartitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="paymentSanctionScreeningStep.remotePartitioning.PARTITION" />
  <property name="gridSize" value="8" />
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="screening.requests.partitioning"/>
    </bean>
  </property>
  <property name="replyChannel" ref="screening.replies.partitioning" />
</bean>

The partition handler component configured above is responsible for sending instructions to remote nodes (information about the partitions to be created and their execution contexts) as well as receiving their responses. Although I named it JMSPartitionHandler, it can actually use any other messaging middleware – a Spring Integration channel provides a nice abstraction for plugging in various protocols here. MessageChannelPartitionHandler takes a reference to the step that will be executed on remote nodes (paymentSanctionScreeningStep.remotePartitioning.PARTITION in this case), the grid size, and references to the Spring Integration channels that will be used as the request and response transfer mediums.

A couple of lines below, there is another important bean defined – StepExecutionRequestHandler:

elixir0ImportJob.xml (GitHub, starting at line 214)
<bean id="stepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer" />
  <property name="stepLocator">
    <bean class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
  </property>
</bean>

This bean is responsible for handling partition execution requests; it belongs to the remote node part of the configuration.

Just like with remote chunking, communication between the master and remote nodes is organized using JMS queues, although it is possible to plug in other communication protocols. Actually, in the case of remote partitioning, communication with the remote workers does not need to be transactional or have guaranteed delivery.

The master node sends partition execution requests using an outbound channel adapter attached to the channel previously declared as the default channel of the MessagingTemplate used by the JMSPartitionHandler bean:

jmsContext.xml (GitHub, starting at line 57)
<!-- master -->
<int:channel id="screening.requests.partitioning" />
<int-jms:outbound-channel-adapter connection-factory="JMSConnectionFactory" channel="screening.requests.partitioning"
                                  destination-name="queue-screening-requests-partitioning" />

An important difference between remote partitioning and remote chunking is that with remote partitioning the channel (JMS queue) does not transfer the items to be processed. The master node sends step execution requests with execution context parameters set to those produced by the partitioner. The remote node is responsible for instantiating the partition, and the partition itself (the ItemReader, actually) is responsible for picking up only those items that have been assigned to it.
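How the reader on the remote node learns which items belong to it is a matter of late binding: values put into the partition’s execution context by the partitioner are injected into step-scoped components. The sketch below uses annotation-style configuration for brevity and the same hypothetical context keys as the partitioner sketch above:

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Step-scoped component resolving its partition parameters from the step
// execution context; a partition-aware reader can use it to filter input rows.
@Component
@StepScope
public class PartitionFilter {

  @Value("#{stepExecutionContext['partition.mod']}")
  private int mod;

  @Value("#{stepExecutionContext['partition.count']}")
  private int count;

  // True when the given database identifier belongs to this partition.
  public boolean accepts(long id) {
    return id % count == mod;
  }
}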

A message-driven channel adapter receives messages on the slave node and publishes them to the screening.handler.in.partitioning channel, which has a service activator assigned. This service activator calls the handle method of the previously mentioned stepExecutionRequestHandler, responsible for the instantiation, configuration, execution and aggregation of results of the partition. When partition processing is finished, an outbound channel adapter sends the aggregated partition execution results back to the master node.

jmsContext.xml (GitHub, starting at line 63)
<!-- slave -->
<int-jms:message-driven-channel-adapter connection-factory="JMSConnectionFactory"
                                        destination-name="queue-screening-requests-partitioning"
                                        channel="screening.handler.in.partitioning"
                                        concurrent-consumers="8"/>
<int:channel id="screening.handler.in.partitioning" />
<int:service-activator input-channel="screening.handler.in.partitioning" output-channel="screening.handler.out.partitioning"
        ref="stepExecutionRequestHandler" method="handle" />

<int:channel id="screening.handler.out.partitioning" />
<int-jms:outbound-channel-adapter connection-factory="JMSConnectionFactory"
                                  destination-name="queue-screening-replies-partitioning"
                                  channel="screening.handler.out.partitioning" />

In the sample application, we use 8 concurrent consumers simulating separate nodes. If you instead placed the remote worker configuration on 8 separate machines and set the concurrency level to 1, the result would be exactly the same (except that you would get “real” remote workers, not just threads within the same JVM).

The responsibility of the master node is to wait for all slaves to finish their work and to aggregate the results from all partitions. The configuration below sets a timeout of one hour for the execution of all partitions. This does not mean, however, that if all partitions finish in one minute we would still wait an hour – it is just an upper bound; when the last partition finishes processing, job execution on the master node continues. The component implementing the Spring Integration aggregator contract is the already mentioned JMSPartitionHandler bean (MessageChannelPartitionHandler class):

jmsContext.xml (GitHub, starting at line 78)
<!-- master -->
<int-jms:message-driven-channel-adapter connection-factory="JMSConnectionFactory" channel="screening.staging.partitioning"
                                        destination-name="queue-screening-replies-partitioning" />

<int:channel id="screening.staging.partitioning" />

<int:aggregator ref="JMSPartitionHandler" method="aggregate"
                input-channel="screening.staging.partitioning" output-channel="screening.replies.partitioning" send-timeout="3600000"/> <!-- 1h in [ms] -->

<int:channel id="screening.replies.partitioning">
  <int:queue />
</int:channel>

In terms of restartability, remote partitioning behaves the same as its local equivalent. We do not need to clean up stale data because, as long as there is one thread operating per partition, there is no threat of concurrency issues (each partition operates on its own copy of the step).

Just like local partitioning, remote partitioning requires knowledge of the input data – there must be some way to distribute items between partitions effectively.

Key facts about remote partitioning:

  • each slave node contains an identical copy of the step, including all its components (reader, processor, writer)
  • it is the responsibility of the ItemReader to pick up only those items that have been assigned to the particular partition
  • items are not transferred through the messaging middleware – only partition execution requests and partition execution summaries are sent
  • not all steps can use the remote partitioning scaling strategy (input data cannot always be divided into partitions effectively)

Author


Piotr Dyraga
Senior software engineering consultant experienced in a wide range of projects (banking, logistics, computer networks and others). Please feel free to contact me if you are looking for top-notch development services for your project.
