Splitting a message into many (Intermediate)
Messages passed to integration applications are not always at the right granularity to work with. In many cases they are composite messages consisting of multiple elements, each of which has to be processed individually. This is where the Splitter pattern helps: it splits an incoming message into a series of messages that are easier to work with. In this tutorial, we will create a route that splits an incoming XML message into multiple messages.
Getting ready
The complete source code for this tutorial is located under the project camel-message-routing-examples/splitting-message.
How to do it...
In the tutorial, we assume that the incoming message is an XML message in the following format:
    <invoice>
      <item><product name="widget" quantity="2"/></item>
      <item><product name="gadget" quantity="1"/></item>
    </invoice>
At the point in the Camel route where the message has to be split into sub messages, add the Splitter with an XPath expression:
    from("direct:start")
        .split(xpath("//invoice/item/product"))
            .to("mock:products")
        .end()
        .to("mock:result");
How it works...
The only required configuration for the Splitter is the expression used to split the message. It can be written in any supported expression language, so we are free to choose from the Simple language, XPath, XQuery, and many others, depending mainly on the message type and ease of splitting. In our example, we split the incoming XML with an XPath expression: it is applied to the main message, and each matching node becomes a sub message that is sent to the next endpoint(s).
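To see what the XPath expression selects, independent of Camel, the following minimal sketch evaluates the same expression with the JDK's javax.xml.xpath API and lists one entry per node that would become a sub message. The class and method names (XPathSplitSketch, splitProductNames) are made up for illustration. The sketch only mimics the selection step and is not how Camel implements the Splitter.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of Camel: shows which nodes the
// split expression from the route would select.
class XPathSplitSketch {

    // Evaluates the expression against the XML and returns the "name"
    // attribute of every matching element, in document order.
    static List<String> splitProductNames(String xml, String expression) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xpath.evaluate(
                    expression,
                    new InputSource(new StringReader(xml)),
                    XPathConstants.NODESET);
            List<String> names = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                names.add(((Element) nodes.item(i)).getAttribute("name"));
            }
            return names;
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    public static void main(String[] args) {
        String invoice = "<invoice>"
                + "<item><product name=\"widget\" quantity=\"2\"/></item>"
                + "<item><product name=\"gadget\" quantity=\"1\"/></item>"
                + "</invoice>";
        // Two matching product elements, so two sub messages.
        System.out.println(splitProductNames(invoice, "//invoice/item/product"));
    }
}
```

For the sample invoice, the expression matches two product elements, so the route would send two sub messages to mock:products.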
There are a couple of options to customize the way the newly created Exchanges are processed. With onPrepareRef it is possible to reference a Processor in the Registry that will be called just before a sub message is sent to the child endpoints. Setting parallelProcessing causes the new Exchanges to be processed in parallel instead of sequentially. Setting stopOnException stops processing of the remaining sub messages as soon as an Exception is thrown and returns the error immediately, whereas leaving it at the default value of false finishes processing all sub messages and returns the error at the end. The Splitter also supports splitting messages using tokenizer expressions and splitting streamed messages, which is useful for chunking large files with a low memory footprint.
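The difference between the two stopOnException settings can be modelled in a few lines of plain Java. This is only a sketch of the behavior described above, for sequential processing; the class StopOnExceptionSketch, its Outcome holder, and the run method are hypothetical names and do not exist in Camel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of sequential sub message processing, not Camel code.
class StopOnExceptionSketch {

    // What a run produced: the sub messages processed successfully
    // and the first error that occurred, if any.
    static final class Outcome {
        final List<String> processed = new ArrayList<>();
        RuntimeException error;
    }

    static Outcome run(List<String> subMessages, Consumer<String> processor,
                       boolean stopOnException) {
        Outcome outcome = new Outcome();
        for (String subMessage : subMessages) {
            try {
                processor.accept(subMessage);
                outcome.processed.add(subMessage);
            } catch (RuntimeException e) {
                if (outcome.error == null) {
                    outcome.error = e; // remember the first failure
                }
                if (stopOnException) {
                    break; // skip the remaining sub messages
                }
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Consumer<String> processor = body -> {
            if (body.equals("bad")) {
                throw new IllegalStateException("cannot process " + body);
            }
        };
        List<String> subMessages = Arrays.asList("a", "bad", "c");
        System.out.println(run(subMessages, processor, true).processed);  // [a]
        System.out.println(run(subMessages, processor, false).processed); // [a, c]
    }
}
```

In both runs the error is reported. The setting only decides whether the sub message after the failing one is still processed.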
There's more...
In addition to splitting messages, the Splitter can also aggregate the messages back into one. This turns Splitter into a very powerful and compact construct used for splitting and then aggregating messages without a need for an additional Aggregator.
Aggregating results
When a message is passed to the Splitter, it is split and the sub messages are sent to the child endpoints, but the processing is not over yet. If the incoming MEP is InOut, the Splitter has to reply to the caller, or pass a message to the next processor in the route if there is one. This is where the strategyRef option comes into play. It allows us to reference a custom implementation of the AggregationStrategy interface from the Registry, which is used for aggregating the results from the sub Exchanges. This interface has only one method, which accepts the Exchange aggregated so far (null on the first call, when nothing has been aggregated yet) and the new Exchange returned from the sub Exchange that has just been processed:
Exchange aggregate(Exchange oldExchange, Exchange newExchange)
A typical implementation gets the result from newExchange, aggregates it into oldExchange, and returns it. This accumulates the sub Exchange results in a single Exchange, which is finally returned as the result of the Splitter. If no custom AggregationStrategy is used, the default strategy simply returns the original incoming Exchange without performing any aggregation.
AggregationStrategy is the same interface used in the Aggregator pattern (which we will see next), and having it as part of the Splitter turns the Splitter into a pattern with an embedded lightweight Aggregator.
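The aggregation contract is essentially a fold over the sub message results. The sketch below mimics it in plain Java with String bodies standing in for Exchanges, so it runs without Camel on the classpath. AggregationFoldSketch and its methods are illustrative names only, and joining with a comma is an arbitrary choice. The null check reflects that, in the Splitter, the first call has no previously aggregated result.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an AggregationStrategy, using String bodies
// instead of Camel Exchanges.
class AggregationFoldSketch {

    // Mirrors the shape of aggregate(oldExchange, newExchange):
    // "oldBody" is the result accumulated so far, "newBody" the latest result.
    static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody; // first sub message: nothing accumulated yet
        }
        return oldBody + "," + newBody;
    }

    // Calls aggregate once per sub message result, the way the Splitter
    // would call the strategy after each sub Exchange completes.
    static String splitAndAggregate(List<String> subResults) {
        String aggregated = null;
        for (String result : subResults) {
            aggregated = aggregate(aggregated, result);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        // prints "widget,gadget"
        System.out.println(splitAndAggregate(Arrays.asList("widget", "gadget")));
    }
}
```

A real strategy would implement Camel's AggregationStrategy interface and read and write message bodies on the Exchange objects instead of plain strings.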
Threading model
When the parallelProcessing option is set, the Splitter creates a new thread pool (java.util.concurrent.ExecutorService) and uses it to process the sub messages in parallel. Instead of creating a new thread pool, it is possible to share and reuse an existing one by referencing it from the Registry with the executorServiceRef option.
Parallel processing with thread pools is also used in other EIPs such as Aggregator, Multicast, Wire Tap, and Recipient List, and in some components. Camel creates a new thread pool based on thread pool profiles. The default thread pool profile looks like the following:
    <threadPoolProfile id="defaultThreadPoolProfile"
                       defaultProfile="true"
                       poolSize="10"
                       maxPoolSize="20"
                       maxQueueSize="1000"
                       rejectedPolicy="CallerRuns"/>
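In plain java.util.concurrent terms, these settings correspond roughly to the ThreadPoolExecutor built in the sketch below. This is an approximation for illustration, not the code Camel runs: the class and method names are hypothetical, and the 60 second keep-alive time is an assumption because the profile shown above does not state one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Camel code.
class DefaultProfilePoolSketch {

    // poolSize=10, maxPoolSize=20, maxQueueSize=1000, rejectedPolicy=CallerRuns.
    // The keep-alive of 60 seconds is an assumption (not in the profile above).
    static ThreadPoolExecutor newPoolLikeDefaultProfile() {
        return new ThreadPoolExecutor(
                10, 20, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Processes each sub message on the pool and collects the results
    // in the order the sub messages were submitted.
    static List<String> processInParallel(List<String> subMessages) {
        ThreadPoolExecutor pool = newPoolLikeDefaultProfile();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String subMessage : subMessages) {
                Callable<String> task = subMessage::toUpperCase;
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    public static void main(String[] args) {
        System.out.println(processInParallel(Arrays.asList("widget", "gadget")));
    }
}
```

With the CallerRuns policy, a task that cannot be queued because the pool and its queue are full is executed by the submitting thread instead of being rejected, which slows the producer down rather than dropping work.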
In the Java DSL, thread pool profiles are set through the ExecutorServiceManager of CamelContext. If we use executorServiceRef, Camel looks in the Registry for a thread pool with the given ID. If there is no such thread pool, it looks in the Registry for a thread pool profile with the same ID. If there is a thread pool profile with that ID, Camel uses it to create the new thread pool. If there is no such thread pool profile, the default profile is used. All thread pools created by Camel are properly shut down when CamelContext is stopped, so we don't need to worry about thread leaks. More details about the Camel threading model can be found at http://camel.apache.org/threading-model.html.