Horizontal and Vertical Scaling Strategies for Batch Applications

In this article I describe possible strategies for horizontal and vertical scaling of batch applications. The code samples presented here are based on the most popular batch processing framework, Spring Batch, but all strategies are conceptually compatible with the JSR-352 specification (Batch Applications for the Java Platform).

All of the code snippets come from a sample bank transaction processing application that performs payment sanctions screening and was developed for the sake of this article. I also assume that the reader has at least intermediate knowledge of batch processing concepts.

Problem definition – the sample application

To better illustrate the concepts described in this article, I created a sample bank transaction processing application that is available on GitHub. As already mentioned, all code samples come from this application, so let me first give a brief overview of the problem it solves.

Financial institutions are often seen as the front line in the fight against terrorist organizations and money laundering. Banks are obligated to have adequate systems and controls in place to ensure compliance with international sanctions. Payment sanction screening algorithms may be very complex to implement, and penalties for failure to comply can be severe. There are a number of different bodies that impose financial sanctions: the European Union, the United Nations and local governments. As a result, payments are usually screened against more than 30 public sanction lists.

The application developed for this article is responsible for processing Elixir0 messages. Elixir0 is a domestic Polish bank message format that represents standard credit transfer transactions. All payments contained in an Elixir0 message are screened against OFAC’s Specially Designated Nationals (SDN) List. The screening algorithm performs fuzzy matching of the name, address and alternate name of the beneficiary and the ordering party against every SDN entity. If the algorithm detects that sanctions might be imposed on the ordering or receiving party, placement of the particular transaction is suspended and waits for manual approval.

Please bear in mind that the payment sanction screening mechanism implemented in the sample application is not an optimal or particularly performant algorithm for this task. Also, it should not be considered a complete solution.

The entire transaction import process consists of three steps:

  1. Load transactions from Elixir0 message into the database,
  2. Perform fuzzy string matching of the name, address and alternate name of the beneficiary and the ordering party against all registered SDN entities. If the similarity metric is high enough, a potential sanction match is registered,
  3. Accept transactions without potential SDN matches registered, suspend transactions that have at least one SDN match registered.
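
The matching in step 2 can be pictured with a minimal sketch. The similarity metric actually used by the sample application is not described in this article, so the normalized Levenshtein similarity and the 0.85 threshold below are assumptions chosen purely for illustration, and the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical illustration only: not the matcher used by the sample application.
final class FuzzyNameMatchSketch {

    // Assumed threshold; a real screening system would tune this value carefully.
    static final double THRESHOLD = 0.85;

    // Classic dynamic-programming Levenshtein edit distance (two rolling rows).
    static int levenshtein(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j;
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i;
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    // Similarity in [0, 1]; 1.0 means identical after trimming and case normalization.
    static double similarity(String a, String b) {
        String x = a.trim().toLowerCase(Locale.ROOT);
        String y = b.trim().toLowerCase(Locale.ROOT);
        int longest = Math.max(x.length(), y.length());
        return longest == 0 ? 1.0 : 1.0 - (double) levenshtein(x, y) / longest;
    }

    // A potential match is registered when the similarity reaches the threshold.
    static boolean isPotentialMatch(String partyName, String sdnName) {
        return similarity(partyName, sdnName) >= THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(isPotentialMatch("John Smith", "Jon Smith"));
        System.out.println(isPotentialMatch("John Smith", "Alice Brown"));
    }
}
```

Because every transaction is compared with every SDN entity, the cost of this step grows with the product of both counts, which is why it is the natural candidate for scaling.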

Unsurprisingly, the second step is the most costly one, so the scaling strategies described in this article are illustrated using this step.

Source code of the sample application can be found at:
https://github.com/pdyraga/spring-batch-samples/

Detailed information on how to compile and run it is available in the README file.

Overview of available scaling strategies

There are three strategies available for parallel processing: split flows, chunking and partitioning. Two of them, chunking and partitioning, can implement both horizontal and vertical scaling. The chunking and partitioning strategies that implement horizontal scaling are usually called “remote chunking” and “remote partitioning”. The most important difference between them and their vertical equivalents is that the work is delegated to remote nodes instead of to threads within the same virtual machine.

No additional library dependencies are needed for split flows, local chunking and local partitioning: spring-batch-core is enough. Remote chunking and remote partitioning, however, require the additional spring-batch-integration module. What’s more, this module has a transitive dependency on spring-integration-core, so be careful if you already have it on your dependency list, because compatibility issues may arise.

Local chunking

JSR-352 specifies chunk-oriented processing as its primary pattern, and it plays a very important role in Spring Batch as well. Every non-tasklet step usually consists of three components: an item reader, an item processor and an item writer. The item reader reads data from various sources and delivers it to the item processor. The item processor does all the necessary work on each item and passes the processed data to the item writer, which is responsible for writing it out. Because creating and committing a transaction is generally expensive, a transaction is expected to handle a number of items grouped into a chunk instead of handling each item separately.

In the 1.x versions of Spring Batch, the item writer was expected to buffer items itself, and the step implementation decided when to flush the buffer based on the commit-interval property. This approach reflects an item-oriented strategy rather than a chunk-oriented one: the entire step processing was organized around the single item being processed. The ItemReader and ItemWriter interfaces were quite complicated because they had to support transaction rollbacks by clearing the writer buffer, resetting the reader to the previously marked position and processing the failed chunk once again, this time with only one item per chunk (to detect and handle the problematic item according to the skip/retry strategy).

Spring Batch 2.0 brought a new approach to this problem, named chunk-oriented processing. All methods used to control the underlying source (mark, reset, flush, clear) were removed from the ItemReader and ItemWriter interfaces. ItemReader works in forward-only mode, ItemWriter accepts an entire chunk of items (instead of items one by one), and it is the framework’s responsibility to buffer the list of items to be written.

The difference between the item-oriented and chunk-oriented approaches is illustrated by the figures below.

In the item-oriented approach, once an item has been successfully read and processed, it is immediately passed to the writer. It is the writer’s responsibility to buffer the items before writing them out. This way the writer avoids creating and committing a transaction for every single item:

[Figure: Item-oriented processing]

In the chunk-oriented approach, the writer accepts an entire chunk of data prepared by the framework. One transaction is created and committed for each chunk the writer receives. The writer no longer needs to buffer items internally; preparing the chunk is now the responsibility of the code controlling the step:

[Figure: Chunk-oriented processing]
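
The chunk-oriented control flow can be sketched in plain Java. This is a simplified model under stated assumptions, not Spring Batch’s actual implementation: an Iterator stands in for the ItemReader, a Function for the ItemProcessor and a Consumer of lists for the ItemWriter, and transactions are left out entirely. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of a chunk-oriented step; not Spring Batch's implementation.
final class ChunkLoopSketch {

    // Reads up to commitInterval items, processes them, then hands the whole
    // chunk to the writer. Returns the number of chunks written (in the real
    // framework each of them would correspond to one transaction).
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int commitInterval) {
        int chunks = 0;
        while (reader.hasNext()) {
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval && reader.hasNext()) {
                inputs.add(reader.next());
            }
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                outputs.add(processor.apply(item));
            }
            writer.accept(outputs); // the loop, not the writer, buffered the items
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        int chunks = ChunkLoopSketch.<Integer, String>run(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7).iterator(),
                n -> "tx-" + n,
                chunk -> System.out.println("writing " + chunk),
                3);
        System.out.println(chunks + " chunks");
    }
}
```

Note that the writer here holds no state between calls, which is exactly the simplification that the chunk-oriented contract brings.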

A chunk processing step can be scaled vertically by letting each chunk of items be read, processed and written in a separate thread of execution. All step components are then accessed by multiple threads concurrently:

[Figure: Local chunking]

A single-threaded chunk processing step can be converted into a multithreaded one by adding a reference to a multithreaded TaskExecutor to the step definition. The number of threads operating within the step can be controlled with the throttle-limit attribute.

An example of a vertically scaled chunk-oriented step is available in the elixir0ImportJob.xml file under the paymentSanctionScreeningStep.chunking identifier:

elixir0ImportJob.xml (GitHub, starting at line 26)
<step id="paymentSanctionScreeningStep.chunking">
  <tasklet task-executor="executor" throttle-limit="8">
    <chunk reader="paymentSanctionScreeningReader.chunking" processor="paymentSanctionScreeningProcessor"
           writer="hibernateItemWriter" commit-interval="5"/>
  </tasklet>
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
  <listeners>
    <listener ref="paymentSanctionScreeningRecoveryCleaner" />
  </listeners>
</step>

The task executor bean is defined in applicationContext.xml:

applicationContext.xml (GitHub, starting at line 24)
<task:executor id="executor" pool-size="50" />

With the configuration above, at most 8 threads (see the throttle-limit attribute) operate on the step’s reader, processor and writer simultaneously. All these threads are pooled by the task executor.
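
The relationship between the pool size and the throttle limit can be pictured with plain java.util.concurrent classes. The sketch below is only an analogy and does not show how Spring Batch implements throttling: a semaphore with 8 permits (the assumed throttle limit) sits in front of a 50-thread pool, so no more than 8 tasks are ever in flight. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Analogy only: a semaphore caps concurrency below the size of a larger pool.
final class ThrottleLimitSketch {

    // Runs taskCount short tasks on a pool of poolSize threads while allowing at
    // most throttleLimit of them at once. Returns the highest concurrency seen.
    static int maxObservedConcurrency(int poolSize, int throttleLimit, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore permits = new Semaphore(throttleLimit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquire(); // blocks the submitter once the limit is reached
                pool.execute(() -> {
                    try {
                        maxSeen.accumulateAndGet(running.incrementAndGet(), Math::max);
                        Thread.sleep(2); // stand-in for read/process/write work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        permits.release();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(maxObservedConcurrency(50, 8, 100));
    }
}
```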

Because the reader, processor and writer components are accessed concurrently, adding a reference to a task executor may not be enough to make the step process items successfully in multiple threads. Many ItemReader and ItemWriter implementations are stateful and are not safe for multithreaded use.

One example of state that is often held internally by an item reader is the count of items that have been read (see org.springframework.batch.item.support.AbstractItemCountingItemStreamReader and its subclasses). The number of items read is stored in the execution context so that the step can be restarted from exactly the same position (the same item) it had reached before the failure. Also, all implementations of the org.springframework.batch.item.ItemStream interface are potentially vulnerable to concurrency issues, because the same stream can be opened or updated from multiple threads. The reader should be protected against such situations. Otherwise, you can end up with, for example, an OptimisticLockingFailureException.

The first problem (restartability of the step after a failure) can be solved by disabling the reader’s state persistence, that is, by setting the ItemReader’s saveState property to false. Of course, this doesn’t mean that we have to give up restartability completely. There are two patterns that help to recover the failed step properly without the reader storing the count of items read. The first involves a complete cleanup of stale data when re-entering the step; the second involves adding a special column that works as a marker indicating whether a particular entry has been processed. Both strategies deserve a separate article. For now, let’s assume that cleaning up stale data is a better fit for the step we are discussing.

The reader used by the step that applies the chunking scaling strategy is defined as follows:

elixir0ImportJob.xml (GitHub, starting at line 100)
<bean id="paymentSanctionScreeningReader.chunking" class="com.ontheserverside.batch.bank.processing.SynchronizedItemReaderDecorator">
  <constructor-arg>
    <bean class="org.springframework.batch.item.database.HibernateCursorItemReader">
      <property name="sessionFactory" ref="sessionFactory" />
      <property name="useStatelessSession" value="false" />
      <property name="saveState" value="false" />
      <property name="queryString">
        <value>
          SELECT NEW com.ontheserverside.batch.bank.screening.SanctionScreeningContext (tx, entity)
          FROM Elixir0Transaction tx, SDNEntity entity
          WHERE tx.status = :txStatus
          ORDER BY tx.id
        </value>
      </property>
      <property name="parameterValues">
        <map>
          <entry key="txStatus" value="#{T(com.ontheserverside.batch.bank.tx.TransactionStatus).LOADED}"/>
        </map>
      </property>
    </bean>
  </constructor-arg>
</bean>

First of all, please notice that the reader does not save its internal state: the saveState property is set to false. After any failure, the step will be restarted from the very beginning, so no items that should be processed can be lost. The only problem is to make sure that step outcomes are not duplicated when the step is restarted. For instance, if the same transaction were processed twice, two identical SanctionMatch objects would be created (a SanctionMatch says that the particular transaction is suspicious because some entity on the SDN list may be involved in it; see the SanctionScreeningProcessor class). This problem is solved by a before-step execution listener:

elixir0ImportJob.xml (GitHub, starting at line 33)
<listeners>
  <listener ref="paymentSanctionScreeningRecoveryCleaner" />
</listeners>

that cleans up all stale step outcomes (all SanctionMatch objects for Elixir0Transactions with LOADED status). Of course, at this point we have to assume that only one import process is running at any given time.
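
The effect of this cleanup on a restart can be shown with a small in-memory model. It is a sketch under stated assumptions and not the code of the sample application: a list of strings stands in for the table of SanctionMatch rows, every transaction is assumed to yield exactly one match, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory model of "clean up stale outcomes, then rerun from the beginning".
final class RestartCleanupSketch {

    // Stand-in for the table of registered matches, encoded as "txId:sdnId".
    final List<String> matches = new ArrayList<>();

    // Screens every LOADED transaction. failAfter simulates a crash after that
    // many transactions have been handled (a negative value means no crash).
    void screen(List<Long> loadedTxIds, int failAfter) {
        int done = 0;
        for (long txId : loadedTxIds) {
            if (done == failAfter) {
                throw new IllegalStateException("simulated failure");
            }
            matches.add(txId + ":sdn-1"); // assumption: one match per transaction
            done++;
        }
    }

    // What the before-step cleanup does: drop outcomes that belong to
    // transactions still in LOADED status.
    void cleanStale(List<Long> loadedTxIds) {
        matches.removeIf(m -> loadedTxIds.contains(Long.valueOf(m.substring(0, m.indexOf(':')))));
    }

    public static void main(String[] args) {
        RestartCleanupSketch store = new RestartCleanupSketch();
        List<Long> loaded = Arrays.asList(1L, 2L, 3L);
        try {
            store.screen(loaded, 2);
        } catch (IllegalStateException e) {
            System.out.println("first run failed, matches so far: " + store.matches);
        }
        store.cleanStale(loaded); // restart: cleanup first, then full rerun
        store.screen(loaded, -1);
        System.out.println("after restart: " + store.matches);
    }
}
```

Without the cleanStale call, the rerun would register the matches of the first two transactions a second time.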

Please bear in mind that a StepExecutionListener is executed outside of the chunk’s transaction. In fact, no transaction is available in a StepExecutionListener unless one is explicitly declared. That’s why the SanctionScreeningRecoveryCleaner listener has its beforeStep method marked as @Transactional.

As for concurrency problems when an ItemReader is accessed from multiple threads, all methods of the ItemReader and ItemStream interfaces should be synchronized. This is done by SynchronizedItemReaderDecorator:

SynchronizedItemReaderDecorator.java (GitHub)
package com.ontheserverside.batch.bank.processing;

import org.springframework.batch.item.*;

public final class SynchronizedItemReaderDecorator<T> implements ItemStream, ItemReader<T> {

    private final ItemReader<T> delegate;

    public SynchronizedItemReaderDecorator(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return delegate.read();
    }

    @Override
    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public synchronized void update(ExecutionContext executionContext) throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public synchronized void close() throws ItemStreamException {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }
}

To sum up, some key facts to remember about multithreaded chunk processing:

  • a single-threaded chunk processing step can be converted to a multithreaded one by adding a reference to a multithreaded task executor
  • most ItemReader implementations are not safe for multithreaded use, so you need to synchronize them manually
  • keep in mind that allowing multiple threads to operate simultaneously has a serious impact on step restartability.

Local partitioning

With the partitioning scaling strategy, the framework creates identical copies of the step, including all its components (reader, processor and writer). The items to be processed are divided into partitions and assigned to the step’s clones according to a user-defined strategy. Usually, each step copy has one thread operating on it. This is a reasonable approach, and it is rarely necessary to increase the number of threads operating on a single partition (bear in mind that if there are 8 copies of the step, 8 threads can already operate simultaneously).

[Figure: Local partitioning]

A step based on the partitioning strategy is defined as follows:

elixir0ImportJob.xml (GitHub, starting at line 38)
<step id="paymentSanctionScreeningStep.partitioning">
  <partition partitioner="moduloPartitioner">
    <step>
      <tasklet throttle-limit="1">
        <chunk reader="paymentSanctionScreeningReader.partitioning" processor="paymentSanctionScreeningProcessor"
               writer="hibernateItemWriter" commit-interval="5"/>
      </tasklet>
    </step>
    <handler task-executor="executor" grid-size="8" />
  </partition>
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
</step>

The first thing to note is that a partition consists of a step definition and a partition handler. Because this is a local partitioning step, each partition is handled by a separate thread of execution within the same virtual machine. That’s why the handler references the same multithreaded task executor as the previously discussed local chunking step. The number of step copies (and the number of threads that operate on them) is controlled by the grid-size property of the partition handler. Because, as already stated, we usually want one thread per partition (no multithreaded chunking within a partition), the throttle-limit attribute of the tasklet is set to 1.

Please note that no before-step execution listener is defined here, unlike the chunk processing step, where a before-step listener performed the stale data cleanup. Also, the paymentSanctionScreeningReader.partitioning item reader is configured to remember the number of items read. This is safe because only one thread operates on each partition (throttle-limit=1) and each partition has its own ItemReader instance, so the number of items read can be stored reliably. Moreover, the framework remembers how many partitions were created and which of them failed. When the step is recovered, only the failed partitions are restarted. What’s more, recovered partitions always receive the same input parameters, so if the reader remembers the number of items that were successfully processed, these items are not read and processed again. The crucial requirement is that items are always read in the same order, which is achieved with the ORDER BY tx.id clause.
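
Why a stable ordering makes the saved count sufficient for a restart can be seen in a minimal model of a counting reader. This is a sketch and not Spring Batch’s implementation: the savedCount constructor argument plays the role of the read count restored from the execution context, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of a restartable counting reader for a single partition.
final class CountingReaderSketch {

    private final List<Long> orderedIds; // must come back in the same order on every run
    private int position;

    // savedCount stands in for the read count restored from the execution context.
    CountingReaderSketch(List<Long> orderedIds, int savedCount) {
        this.orderedIds = orderedIds;
        this.position = savedCount; // jump past the items handled before the failure
    }

    // Returns the next id, or null when the input is exhausted
    // (the same end-of-input convention that ItemReader uses).
    Long read() {
        return position < orderedIds.size() ? orderedIds.get(position++) : null;
    }

    // The value that would be stored in the execution context.
    int itemsRead() {
        return position;
    }

    public static void main(String[] args) {
        List<Long> partitionIds = Arrays.asList(1L, 9L, 17L, 25L);
        CountingReaderSketch firstRun = new CountingReaderSketch(partitionIds, 0);
        firstRun.read();
        firstRun.read();
        int saved = firstRun.itemsRead(); // the failure happens here
        CountingReaderSketch restarted = new CountingReaderSketch(partitionIds, saved);
        System.out.println("restart continues with id " + restarted.read());
    }
}
```

If the second run returned the ids in a different order, skipping the first two entries would skip the wrong transactions, which is why the ordering clause matters.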

Let’s look at the item reader declaration:

elixir0ImportJob.xml (GitHub, starting at line 142)
<bean id="paymentSanctionScreeningReader.partitioning" scope="step" class="org.springframework.batch.item.database.HibernateCursorItemReader">
  <property name="sessionFactory" ref="sessionFactory" />
  <property name="useStatelessSession" value="true" />
  <property name="saveState" value="true" />
  <property name="queryString">
    <value>
      SELECT NEW com.ontheserverside.batch.bank.screening.SanctionScreeningContext (tx, entity)
      FROM Elixir0Transaction tx, SDNEntity entity
      WHERE tx.status = :txStatus
      AND mod(tx.id, :modDivisor) = :modRemainder
      ORDER BY tx.id
    </value>
  </property>
  <property name="parameterValues">
    <map>
      <entry key="modDivisor" value="#{stepExecutionContext['mod.divisor']}" />
      <entry key="modRemainder" value="#{stepExecutionContext['mod.remainder']}" />
      <entry key="txStatus" value="#{T(com.ontheserverside.batch.bank.tx.TransactionStatus).LOADED}"/>
    </map>
  </property>
</bean>

There are two mysterious query parameters here: modDivisor and modRemainder. They are used to divide the list of all transactions into partitions. It is important to note that it is the reader’s responsibility to pick up the items that belong to a particular partition. The item reader described here achieves this by computing the transaction identifier modulo the specified divisor and comparing the result with the expected remainder. The modulo divisor is always equal to the number of partitions, so for a grid of 8 step copies the results always range from 0 to 7. The expected remainder is specific to each partition: partition 0 always receives the transactions whose ID modulo the grid size equals 0, partition 1 always receives those whose ID modulo the grid size equals 1, and so on.

The modulo divisor and remainder are read from the step execution context. The component that puts them there is the partitioner defined under the moduloPartitioner identifier (see the step’s definition).

The responsibility of the partitioner is to produce a separate ExecutionContext for each partition. The partitioner used in the example makes sure that partition number 0 receives an ExecutionContext with the modulo remainder set to 0, partition number 1 receives one with the modulo remainder set to 1, and so on. The partitioner also puts the modulo divisor (equal to the grid size) into every ExecutionContext. Both values are used later in the ItemReader query string.
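
The assignment rule can be checked with a few lines of plain Java. The sketch mirrors the query predicate for non-negative identifiers only; it does not touch the database, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java mirror of the predicate mod(tx.id, :modDivisor) = :modRemainder.
final class ModuloAssignmentSketch {

    // True when the transaction belongs to the partition with the given remainder.
    // Assumes non-negative identifiers.
    static boolean belongsTo(long txId, int modDivisor, int modRemainder) {
        return txId % modDivisor == modRemainder;
    }

    // Selects the ids a single partition would read, preserving input order.
    static List<Long> idsForPartition(List<Long> allIds, int gridSize, int partition) {
        List<Long> selected = new ArrayList<>();
        for (long id : allIds) {
            if (belongsTo(id, gridSize, partition)) {
                selected.add(id);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 8L, 9L, 16L, 17L);
        for (int partition = 0; partition < 8; partition++) {
            System.out.println("partition " + partition + ": " + idsForPartition(ids, 8, partition));
        }
    }
}
```

Every identifier lands in exactly one partition, so no transaction is screened twice or skipped, provided that all partitions use the same divisor.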

ModuloPartitioner.java (GitHub)
package com.ontheserverside.batch.bank.processing;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;

public final class ModuloPartitioner implements Partitioner {

    public static final String MOD_DIVISOR_KEY = "mod.divisor";
    public static final String MOD_REMAINDER_KEY = "mod.remainder";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        final Map<String, ExecutionContext> contextMap = new HashMap<>();

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt(MOD_DIVISOR_KEY, gridSize);
            context.putInt(MOD_REMAINDER_KEY, i);
            contextMap.put(getContextName(i), context);
        }

        return contextMap;
    }

    private String getContextName(int index) {
        return String.format("partition-%d", index);
    }
}

As long as one thread operates per partition, there is no need to synchronize the reader as was necessary for local chunking. Each thread of execution has its own reader instance, so even if the reader stores its state internally, there is no risk of concurrency problems.

As is easy to see, partitioning is usually the better scaling strategy in terms of restartability. However, there is one important difference that sometimes makes chunking preferable to partitioning.

Because it is the item reader’s responsibility to pick up only the items assigned to a particular partition, the reader must be aware of the structure of the input data, and there must be some way to distribute input items between partitions effectively (in the described example, transaction identifiers determine partition assignment). What’s more, the item reader must be able to access items at arbitrary positions in the input data collection, so that multiple threads can read and process items from different positions in the input dataset simultaneously.

For instance, if the items to be processed are already stored in a database, it is relatively easy to define a query that picks up only the items assigned to a certain partition. Because the database provides random access to all items of the input data, multiple threads can also operate simultaneously in an efficient way (reading items is not a bottleneck). However, when data is read and parsed from an input file, chunking is a much better strategy: usually there is no efficient way to divide the input into partitions and read and parse them from random positions simultaneously (the data is streamed).

To sum up, here are the key facts about the partitioning scaling strategy:

  • identical copies of the step, including all its components (reader, processor, writer), are created and assigned to each partition
  • it is the ItemReader’s responsibility to pick up only the items that have been assigned to its partition
  • if a single thread operates per partition, it is safe to let the reader remember the processed item count, so partitioning is usually the better choice in terms of restartability
  • not all steps can use the partitioning scaling strategy (input data cannot always be divided into partitions effectively)

Remote chunking

Remote chunking is a horizontal scaling strategy and can be viewed as an implementation of the standard producer-consumer pattern. The producer reads items from the data source and sends them through some middleware to consumers that process the received items. Clearly, this pattern should be used when item processing is expensive (transporting items through middleware introduces some overhead) and item reading is not a bottleneck (consumers should not be starved).

[Figure: Remote chunking]
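
The producer-consumer shape of remote chunking can be reproduced in a single process. The sketch below is an analogy under stated assumptions: a LinkedBlockingQueue stands in for the messaging middleware, consumer threads stand in for remote nodes, and "processing" is just summing squares. It uses no Spring Batch or JMS classes, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// In-process analogy of remote chunking: a queue replaces the middleware.
final class ProducerConsumerSketch {

    // Poison pill that tells a consumer to stop; compared by identity.
    private static final List<Integer> END = new ArrayList<>();

    // A single producer sends chunks of chunkSize items; each consumer takes
    // chunks from the queue and adds the squares of their items to the result.
    static long run(List<Integer> items, int chunkSize, int consumers) {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        AtomicLong result = new AtomicLong();
        List<Thread> workers = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread worker = new Thread(() -> {
                try {
                    for (List<Integer> chunk = queue.take(); chunk != END; chunk = queue.take()) {
                        for (int item : chunk) {
                            result.addAndGet((long) item * item);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }
        try {
            for (int i = 0; i < items.size(); i += chunkSize) {
                int end = Math.min(i + chunkSize, items.size());
                queue.put(new ArrayList<>(items.subList(i, end)));
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(END); // one pill per consumer
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 2, 3));
    }
}
```

The single producer is the only component that touches the input, which is why this pattern pays off only when processing dominates reading.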

In the sample application, JMS is used as the messaging middleware and the consumers are simply concurrent threads receiving messages from the queue. This is a simplification of remote chunking, introduced only to make the sample application simpler to use. To have consumers working on separate nodes, it is enough to remove the producer-related components from the Spring application context (or move them to an inactive profile) and deploy the application on a separate node that has access to the same JMS broker as the producer (master node).

The definition of a remote chunking step looks the same as the definition of a single-threaded step (note that there is no reference to a task executor):

elixir0ImportJob.xml (GitHub, starting at line 52)
<step id="paymentSanctionScreeningStep.remoteChunking">
  <tasklet>
    <chunk reader="paymentSanctionScreeningReader.remoteChunking" processor="paymentSanctionScreeningProcessor"
           writer="hibernateItemWriter" commit-interval="5" />
  </tasklet>
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
  <listeners>
    <listener ref="paymentSanctionScreeningRecoveryCleaner" />
  </listeners>
</step>

The same goes for the item reader used by this step: no special declarations are needed to make it aware of remote chunking:

elixir0ImportJob.xml (GitHub, starting at line 123)
<bean id="paymentSanctionScreeningReader.remoteChunking" class="org.springframework.batch.item.database.HibernateCursorItemReader">
  <property name="sessionFactory" ref="sessionFactory" />
  <property name="useStatelessSession" value="false" />
  <property name="saveState" value="false" />
  <property name="queryString">
    <value>
      SELECT NEW com.ontheserverside.batch.bank.screening.SanctionScreeningContext (tx, entity)
      FROM Elixir0Transaction tx, SDNEntity entity
      WHERE tx.status = :txStatus
      ORDER BY tx.id
    </value>
  </property>
  <property name="parameterValues">
    <map>
      <entry key="txStatus" value="#{T(com.ontheserverside.batch.bank.tx.TransactionStatus).LOADED}"/>
    </map>
  </property>
</bean>

All the magic that transforms this single-threaded step definition into a remote chunking step is performed under the hood by separate components from the spring-batch-integration module.

The first of them is RemoteChunkHandlerFactoryBean, a FactoryBean implementation that replaces the step’s chunk processor (the component that wraps the item processor and item writer) with a special component that writes the items that have been read into the messaging middleware. The business logic normally performed by the original processor is moved to the handler produced by this factory (in fact, the handler forwards processing requests to the original ItemProcessor implementation), and this handler should be installed as a consumer of the messaging middleware.

RemoteChunkHandlerFactoryBean definition looks as follows:

elixir0ImportJob.xml (GitHub, starting at line 187)
<bean id="chunkHandler" class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
  <property name="chunkWriter" ref="chunkWriter" />
  <property name="step" ref="paymentSanctionScreeningStep.remoteChunking" />
</bean>

It contains a reference to the step that is going to be transformed and a reference to an ItemWriter implementation (the chunkWriter property). This ItemWriter is not the writer declared in the original step definition, but a separate writer responsible for writing input items to the messaging middleware (JMS in this case):

elixir0ImportJob.xml (GitHub, starting at line 192)
<bean id="chunkWriter" class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter" scope="step">
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="screening.requests.chunking" />
      <property name="receiveTimeout" value="1000" />
    </bean>
  </property>
  <property name="replyChannel" ref="screening.replies.chunking" />
  <property name="maxWaitTimeouts" value="10" />
</bean>

The ChunkMessageChannelItemWriter declaration above references two Spring Integration channels: screening.requests.chunking and screening.replies.chunking. As their names suggest, the former is used to send items to be processed to remote consumers, and the latter is used to collect information about chunk execution (not the processed items themselves) from remote consumers.

The screening.requests.chunking channel has a JMS outbound channel adapter as its endpoint, declared in the jmsContext.xml file (this part of the file belongs to the master node configuration):

jmsContext.xml (GitHub, starting at line 29)
<!-- master -->
<int:channel id="screening.requests.chunking" />
<int-jms:outbound-channel-adapter connection-factory="JMSConnectionFactory" channel="screening.requests.chunking"
                                  destination-name="queue-screening-requests-chunking" />

Later in the same file, there is a declaration of the JMS listener that receives messages from the JMS queue containing the items to be processed. This is part of the slave node configuration.

jmsContext.xml (GitHub, starting at line 35)
<!-- slave -->
<jms:listener-container connection-factory="JMSConnectionFactory" transaction-manager="transactionManager"
                        acknowledge="transacted" concurrency="8">
  <jms:listener destination="queue-screening-requests-chunking" response-destination="queue-screening-replies-chunking"
                ref="chunkHandler" method="handleChunk" />
</jms:listener-container>

The JMS listener declares the bean produced by the previously configured RemoteChunkHandlerFactoryBean as the handler for received messages (the handleChunk method). When a new message is received from the queue, the handler passes it to the original ItemProcessor and then to the original ItemWriter. The key fact here is that the ItemProcessor and ItemWriter work in the same thread on the slave node. Because the concurrency level of the JMS listener container is set to 8, there are 8 consumer threads processing and writing items. When a chunk has been written (the commit interval has been reached) or when chunk processing has failed, information about the chunk execution is returned to the master node (producer). This is required to inform the flow control mechanism about step outcomes (a StepContribution instance), just as it would be for a locally running step instance.

The JMS listener is configured to send responses to the destination named queue-screening-replies-chunking. The master node has a message-driven channel adapter configured to read messages from this queue:

jmsContext.xml (GitHub, starting at line 43)
<!--master -->
<int-jms:message-driven-channel-adapter connection-factory="JMSConnectionFactory" destination-name="queue-screening-replies-chunking"
                                        channel="screening.incoming.chunking" />

<int:channel id="screening.incoming.chunking" />
<int:transformer input-channel="screening.incoming.chunking" output-channel="screening.replies.chunking" method="extract">
  <bean class="org.springframework.batch.integration.chunk.JmsRedeliveredExtractor"/>
</int:transformer>

<int:channel id="screening.replies.chunking">
  <int:queue/>
</int:channel>

When the message-driven channel adapter receives a message from the queue-screening-replies-chunking destination, it passes the message to the transformer component, which invokes JmsRedeliveredExtractor. This class is responsible for failing the job when a redelivered JMS message is detected: such a message may come from a previous, failed step execution or indicate that the step is timing out. In both cases there is clearly something wrong with the job execution, and the current processing state may be inconsistent.

When the transformer has finished its work, the message is passed to the screening.replies.chunking channel, which is configured as the reply channel of the ChunkMessageChannelItemWriter that connects RemoteChunkHandlerFactoryBean with the messaging middleware. Such a reply message contains a chunk execution summary (the ChunkResponse class) and is used later to compute the step outcome state (the StepContribution class).

A very important fact to remember is that the items to be processed are passed through the messaging middleware in serialized form. If these items are Hibernate entities, they will need to be re-attached to a session on consumer nodes. That’s why all relationships are eagerly fetched before the producer puts the items into the queue: a detached entity cannot lazily load them later.
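
To make the serialization point concrete, here is a minimal sketch using plain Java serialization. The Payment class and its fields are hypothetical and stand in for the real entity, and the round trip through a byte array only imitates what a broker does with a message payload. The point is that the consumer receives a detached copy containing only the data that was already loaded when the item was sent.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class ItemSerializationSketch {

    /** Hypothetical item type; a real entity would carry Hibernate mappings as well. */
    static class Payment implements Serializable {
        private static final long serialVersionUID = 1L;
        final String beneficiary;
        final List<String> alternateNames; // must be fully loaded before the item is sent

        Payment(String beneficiary, List<String> alternateNames) {
            this.beneficiary = beneficiary;
            this.alternateNames = new ArrayList<>(alternateNames);
        }
    }

    /** Serializes and deserializes the item, imitating a trip through the message queue. */
    static Payment roundTrip(Payment payment) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(payment);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Payment) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("item could not be serialized", e);
        }
    }

    public static void main(String[] args) {
        Payment sent = new Payment("ACME Ltd", List.of("ACME", "A.C.M.E."));
        Payment received = roundTrip(sent);
        // The consumer works on a copy, not on the producer's instance.
        System.out.println(received != sent);
        System.out.println(received.alternateNames);
    }
}
```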

Because a single thread reads the data and sends it to the messaging middleware, it may seem that the reader can safely update its read item count. The reality is more complicated. Although the producer sends chunks in the order they were read (you can track this by looking at the sequence number of ChunkRequest), there is no guarantee that response messages will be put into the response queue in the same order. The read item count is updated as soon as there is information about a successfully completed chunk. This problem is similar to the one with local chunk processing: the completion order is not guaranteed, but the read item count is updated (and potentially overwritten) each time a chunk completes successfully. Because of that, the remote chunking step uses the same stale data cleanup listener as its vertical equivalent. Also, the ItemReader is configured not to persist its state internally.
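
The overwrite problem can be illustrated with a deliberately simplified model. This is not Spring Batch code: the method below is a hypothetical "last reply wins" update, and the numbers assume four chunks of five items whose replies come back out of order.

```java
class ReadCountSketch {

    /**
     * Each element is the reader position recorded for one chunk, listed in the
     * order the replies arrived. Every successful reply overwrites the persisted value.
     */
    static int persistedReadCount(int[] readCountsInArrivalOrder) {
        int persisted = 0;
        for (int readCount : readCountsInArrivalOrder) {
            persisted = readCount; // last reply wins, regardless of chunk sequence
        }
        return persisted;
    }

    public static void main(String[] args) {
        // Chunks were sent with read counts 5, 10, 15, 20,
        // but the reply for the second chunk arrived last.
        int[] arrivalOrder = {5, 15, 20, 10};
        System.out.println(persistedReadCount(arrivalOrder));
    }
}
```

In this model all twenty items were processed, yet the stored position is the one belonging to whichever reply happened to arrive last. A restart based on that value would read already processed items again, which is why the stored reader state cannot be trusted here.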

Key facts to remember about remote chunking:

  • remote chunking is an implementation of the producer-consumer pattern in which producer and consumers communicate through messaging middleware
  • there is a single producer thread reading items and feeding consumers
  • the declaration of a remote chunking-based step looks the same as the declaration of a single-threaded chunk processing step; the entire transformation magic is performed under the hood by separate components from the spring-batch-integration module
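
The producer-consumer shape summarized above can be sketched in plain Java. This is only an in-memory analogy under stated assumptions: a BlockingQueue stands in for the JMS request queue, the END_OF_INPUT marker is a hypothetical way to stop consumers, and "processing and writing" is reduced to counting items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RemoteChunkingSketch {

    // Hypothetical marker telling a consumer that no more chunks will arrive.
    private static final List<Integer> END_OF_INPUT = new ArrayList<>();

    /** One producer reads itemCount items and sends them in chunks; returns the number written. */
    static int run(int itemCount, int chunkSize, int consumers) {
        BlockingQueue<List<Integer>> requests = new LinkedBlockingQueue<>();
        AtomicInteger written = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        List<Integer> chunk = requests.take();
                        if (chunk == END_OF_INPUT) {
                            return;
                        }
                        // Processing and writing happen on the same consumer thread.
                        written.addAndGet(chunk.size());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Single producer: the only thread that reads items.
        List<Integer> chunk = new ArrayList<>();
        for (int item = 0; item < itemCount; item++) {
            chunk.add(item);
            if (chunk.size() == chunkSize) {
                requests.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            requests.add(chunk);
        }
        for (int c = 0; c < consumers; c++) {
            requests.add(END_OF_INPUT);
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return written.get();
    }

    public static void main(String[] args) {
        System.out.println(run(23, 5, 8));
    }
}
```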

Remote partitioning

Remote partitioning is similar to its local (vertical) equivalent in that identical copies of the step, including all its components, are created. However, with remote partitioning these step copies are executed on separate nodes. Just as with remote chunking, separate components from the spring-batch-integration module are responsible for configuring the step as remote partitioning-based. In the sample application, partitions are handled by threads within the same virtual machine (just to make deployment easier), but it is enough to extract the slave components to a separate application context (or to define master/slave Spring profiles) to have the partitions executed on separate nodes.

Usually, a remote partitioning step declaration consists of two parts, a master step and a partition step:

elixir0ImportJob.xml (from line 64):
<step id="paymentSanctionScreeningStep.remotePartitioning.MASTER">
  <partition partitioner="moduloPartitioner" handler="JMSPartitionHandler" step="paymentSanctionScreeningStep.remotePartitioning.PARTITION" />
  <next on="COMPLETED" to="updateScreenedTransactionStatusStep" />
  <fail on="FAILED" />
</step>

elixir0Import.xml (from line 77):
<step id="paymentSanctionScreeningStep.remotePartitioning.PARTITION" xmlns="http://www.springframework.org/schema/batch">
  <tasklet throttle-limit="1">
    <chunk reader="paymentSanctionScreeningReader.partitioning" processor="paymentSanctionScreeningProcessor"
           writer="hibernateItemWriter" commit-interval="5" />
  </tasklet>
</step>

As a side note, it is good to remember that local partitioning steps also follow this pattern, even when it is not configured as explicitly as in the snippet above. In both local and remote partitioning there is always one master that coordinates the work and multiple partitions controlled by it.

Just as with local partitioning, the same modulo partitioner component is used. It is the responsibility of the remote node to pick up only those items that have been assigned to the partition executed by this node.
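
A minimal sketch of the modulo idea follows, in plain Java with no Spring dependencies. The context keys mod.divisor and mod.remainder, the partition names and the use of a numeric id are assumptions made for illustration; the sample application's partitioner may use different names. In Spring Batch the first method would correspond to Partitioner.partition(int gridSize), and the filtering would more likely be done in the reader's query than in memory.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ModuloPartitioningSketch {

    // Hypothetical execution context keys.
    static final String DIVISOR = "mod.divisor";
    static final String REMAINDER = "mod.remainder";

    /** Master side: builds one context per partition; no items are involved. */
    static Map<String, Map<String, Integer>> partition(int gridSize) {
        Map<String, Map<String, Integer>> contexts = new LinkedHashMap<>();
        for (int i = 0; i < gridSize; i++) {
            Map<String, Integer> context = new LinkedHashMap<>();
            context.put(DIVISOR, gridSize);
            context.put(REMAINDER, i);
            contexts.put("partition" + i, context);
        }
        return contexts;
    }

    /** Worker side: the reader keeps only the ids assigned to its own partition. */
    static List<Long> select(List<Long> ids, Map<String, Integer> context) {
        List<Long> mine = new ArrayList<>();
        for (Long id : ids) {
            if (id % context.get(DIVISOR) == context.get(REMAINDER)) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> contexts = partition(8);
        List<Long> ids = List.of(1L, 2L, 3L, 9L, 10L, 17L);
        System.out.println(select(ids, contexts.get("partition1")));
    }
}
```

Every id lands in exactly one partition, so workers never compete for the same item, but the split is only even when the ids themselves are evenly distributed.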

elixir0ImportJob.xml (from line 203):
<bean id="JMSPartitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="paymentSanctionScreeningStep.remotePartitioning.PARTITION" />
  <property name="gridSize" value="8" />
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="screening.requests.partitioning"/>
    </bean>
  </property>
  <property name="replyChannel" ref="screening.replies.partitioning" />
</bean>

The partition handler configured above is responsible for sending instructions to remote nodes (information about the partitions to be created and their execution contexts) and for receiving their responses. Although I named it JMSPartitionHandler, it can use any other messaging middleware: the Spring Integration channel provides a nice abstraction for plugging in various protocols. MessageChannelPartitionHandler takes the name of the step that will be executed on remote nodes (paymentSanctionScreeningStep.remotePartitioning.PARTITION in this case), the grid size, and references to the Spring Integration channels used to transfer requests and responses.

A couple of lines below, another important bean is defined, StepExecutionRequestHandler:

elixir0ImportJob.xml (from line 214):
<bean id="stepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer" />
  <property name="stepLocator">
    <bean class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
  </property>
</bean>

This bean is responsible for handling partition execution requests and belongs to the remote node configuration.

Just as with remote chunking, communication between the master and remote nodes is organized using JMS queues. Other communication protocols can be plugged in here, though. In fact, with remote partitioning, communication with the remote workers does not need to be transactional or have guaranteed delivery.

The master node sends partition execution requests using an outbound channel adapter attached to the channel declared previously as the default channel of the MessagingTemplate used by the JMSPartitionHandler bean:

jmsContext.xml (from line 57):
<!-- master -->
<int:channel id="screening.requests.partitioning" />
<int-jms:outbound-channel-adapter connection-factory="JMSConnectionFactory" channel="screening.requests.partitioning"
                                  destination-name="queue-screening-requests-partitioning" />

An important difference between remote partitioning and remote chunking is that with remote partitioning the channel (JMS queue) does not transfer the items to be processed. The master node sends step execution requests whose execution context parameters are set to those produced by the partitioner. The remote node is responsible for instantiating the partition, and the partition itself (its ItemReader, actually) is responsible for picking up only those items that have been assigned to it.

A message-driven channel adapter receives messages on the slave node and publishes them to the screening.handler.in.partitioning channel, which has a service activator assigned. This service activator calls the handle method of the previously mentioned stepExecutionRequestHandler, which is responsible for instantiating, configuring and executing the partition and for aggregating its results. When partition processing is finished, an outbound channel adapter sends the aggregated partition execution results back to the master node.
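
A rough back-of-the-envelope comparison shows what this difference means for queue traffic. The input size and the commit interval used for the chunking case are hypothetical; the grid size of 8 matches the partition handler configuration shown earlier.

```java
class MessageCountSketch {

    /** Remote chunking: one request per chunk, each carrying the items themselves. */
    static long chunkingRequests(long itemCount, int commitInterval) {
        return (itemCount + commitInterval - 1) / commitInterval; // ceiling division
    }

    /** Remote partitioning: one request per partition, independent of the item count. */
    static long partitioningRequests(int gridSize) {
        return gridSize;
    }

    public static void main(String[] args) {
        long items = 1_000_000; // hypothetical input size
        System.out.println("chunking requests:     " + chunkingRequests(items, 5));
        System.out.println("partitioning requests: " + partitioningRequests(8));
    }
}
```

With chunking, the number of request messages grows with the input and each message carries serialized items. With partitioning, the number of requests stays fixed at the grid size and each request carries only execution metadata.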

jmsContext.xml (from line 63):
<!-- slave -->
<int-jms:message-driven-channel-adapter connection-factory="JMSConnectionFactory"
                                        destination-name="queue-screening-requests-partitioning"
                                        channel="screening.handler.in.partitioning"
                                        concurrent-consumers="8"/>
<int:channel id="screening.handler.in.partitioning" />
<int:service-activator input-channel="screening.handler.in.partitioning" output-channel="screening.handler.out.partitioning"
        ref="stepExecutionRequestHandler" method="handle" />

<int:channel id="screening.handler.out.partitioning" />
<int-jms:outbound-channel-adapter connection-factory="JMSConnectionFactory"
                                  destination-name="queue-screening-replies-partitioning"
                                  channel="screening.handler.out.partitioning" />

The sample application uses 8 concurrent consumers that simulate separate nodes. If you instead placed the remote worker configuration on 8 separate machines and set the concurrency level to 1, the result would be exactly the same (except that you would get “real” remote workers, not just threads within the same JVM).

The responsibility of the master node is to wait for all slaves to finish their work and to aggregate the results from all partitions. The configuration below sets a timeout of one hour for the execution of all partitions. This does not mean that if all partitions finish within one minute we would wait one hour anyway. It is just an upper bound: when the last partition finishes processing, job execution on the master node continues. The component that acts as the Spring Integration aggregator is the already mentioned JMSPartitionHandler bean (MessageChannelPartitionHandler class):
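
The "upper bound, not a fixed delay" behavior is the usual semantics of a bounded wait. The sketch below shows the general idea with a CountDownLatch; it is an analogy only and does not reproduce how MessageChannelPartitionHandler or the aggregator are implemented. All names in it are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AggregationTimeoutSketch {

    /** Returns true if every partition replied before the timeout elapsed. */
    static boolean awaitPartitions(int partitions, long workMillis, long timeoutMillis) {
        CountDownLatch replies = new CountDownLatch(partitions);
        ExecutorService workers = Executors.newFixedThreadPool(partitions);
        try {
            for (int i = 0; i < partitions; i++) {
                workers.submit(() -> {
                    try {
                        Thread.sleep(workMillis); // simulated partition execution
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;                   // no reply from an interrupted worker
                    }
                    replies.countDown();          // reply delivered to the master
                });
            }
            // Returns as soon as the last reply arrives; the timeout is only a ceiling.
            return replies.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean completed = awaitPartitions(8, 100, 3_600_000L); // one-hour ceiling
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(completed + " after roughly " + elapsedMillis + " ms");
    }
}
```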

jmsContext.xml (from line 78):
<!-- master -->
<int-jms:message-driven-channel-adapter connection-factory="JMSConnectionFactory" channel="screening.staging.partitioning"
                                        destination-name="queue-screening-replies-partitioning" />

<int:channel id="screening.staging.partitioning" />

<int:aggregator ref="JMSPartitionHandler" method="aggregate"
                input-channel="screening.staging.partitioning" output-channel="screening.replies.partitioning" send-timeout="3600000"/> <!-- 1h in [ms] -->

<int:channel id="screening.replies.partitioning">
  <int:queue />
</int:channel>

In terms of restartability, remote partitioning behaves the same as its local equivalent. We don’t need to clean up stale data because, as long as one thread operates per partition, there is no risk of concurrency issues (each partition operates on its own clone of the step).

Just as with local partitioning, remote partitioning requires knowledge of the input data, and there must be some way to distribute items between partitions effectively.

Key facts about remote partitioning:

  • each slave node contains an identical copy of the step, including all its components (reader, processor, writer)
  • it is the responsibility of the ItemReader to pick up only those items that have been assigned to the particular partition
  • items are not transferred through the messaging middleware; only partition execution requests and partition execution summaries are sent
  • not all steps can use the remote partitioning scaling strategy (input data cannot always be divided into partitions effectively)

Author

Piotr Dyraga
Software engineering consultant experienced in a wide range of projects (banking, logistics, computer networks and others). Please feel free to contact me if you are looking for development services for your project.
