Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Save more on your purchases! discount-offer-chevron-icon
Savings automatically calculated. No voucher code required.
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Newsletter Hub
Free Learning
Arrow right icon
timer SALE ENDS IN
0 Days
:
00 Hours
:
00 Minutes
:
00 Seconds
Arrow up icon
GO TO TOP
Instant Apache Camel Message Routing

You're reading from   Instant Apache Camel Message Routing Route, transform, split, multicast messages, and do much more with Camel

Arrow left icon
Product type Paperback
Published in Aug 2013
Publisher Packt
ISBN-13 9781783283477
Length 62 pages
Edition 1st Edition
Arrow right icon
Author (1):
Arrow left icon
Bilgin Ibryam Bilgin Ibryam
Author Profile Icon Bilgin Ibryam
Bilgin Ibryam
Arrow right icon
View More author details
Toc

Splitting a message into many (Intermediate)


Messages passed to integration applications are not always in the right granularity to work with. In many cases they are composite messages consisting of multiple elements each of which has to be processed individually. This is when a Splitter pattern can help us to split incoming messages into series of messages, which are easier to work with. In this tutorial, we will create a route that splits an incoming XML message into multiple messages.

Getting ready

The complete source code for this tutorial is located under the project camel-message-routing-examples/splitting-message.

How to do it...

  1. In the tutorial, we assume that the incoming message is an XML message in the following format:

    <invoice>
        <item><product name="widget" quantity="2"/></item>
        <item><product name="gadget" quantity="1"/></item>
    </invoice>
  2. At the point in the Camel route where the message has to split into sub messages add the Splitter with the XPath expression:

    from("direct:start")
        .split(xpath("//invoice/item/product"))
            .to("mock:products")
        .end()
        .to("mock:result");

How it works...

The only required configuration for Splitter is the criteria used to split the messages. It can be any expression language, so we have the freedom to choose from Simple language, XPath, XQuery, and many others, depending mainly on the message type and ease of splitting. In our example, to split the incoming XML we chose to use XPath expression that will be applied to the main message, and the sub messages will be sent to next endpoint(s).

There are a couple of options to customize the way newly created Exchanges are processed. With onPrepareRef it is possible to reference a Processor in the Registry that will be called just before sending the sub message to child endpoints. Setting parallelProcessing will cause new Exchanges to be processed in parallel instead of sequentially; stopOnException will interrupt further processing remaining sub messages if an Exception is thrown and return the error, whereas leaving the default false value will finish processing all sub messages and return the error at the end. Splitter also supports splitting messages using tokenizer expressions and splitting streamed messages, which is useful for chunking large files with a low memory footprint.

There's more...

In addition to splitting messages, the Splitter can also aggregate the messages back into one. This turns Splitter into a very powerful and compact construct used for splitting and then aggregating messages without a need for an additional Aggregator.

Aggregating results

When a message is passed to Splitter, it is split and the sub messages are sent to child endpoints, however, the processing is not over yet. If the incoming MEP is InOut, then Splitter has to reply to the caller or pass a message to the next processor in the route if there is one. This is where the strategyRef option comes into play. It allows us to reference a custom implementation of the AggregationStrategy interface from Registry used for aggregating results from the sub Exchanges. This interface only has one method that accepts the original incoming Exchange and the new Exchange returned from each sub Exchange that is processed:

Exchange aggregate(Exchange oldExchange, Exchange newExchange)

A typical implementation usually gets the result from newExchange, aggregates it into oldExchange and returns it. This allows accumulation of the sub Exchange results in the original Exchange and finally returning that as the final result from the Splitter. If no custom AggregationStrategy is used, the default strategy simply returns the original incoming Exchange without performing any aggregation.

AggregationStrategy is the same interface used in the Aggregator pattern (which we will see next), and having it as part of Splitter turns Splitter into a pattern having an embedded lightweight Aggregator.

Threading model

When parallelProcessing option is set, Splitter will create a new thread pool (java.util.concurrent.ExecutorService) and using this will process sub messages in parallel. Instead of creating a new thread pool, it is possible to share and reuse an existing thread pool by referencing it from the Registry using the executorServiceRef option.

Parallel processing with thread pools are also used in other EIPs such us Aggregator, multicast, WireTap, Recipient List, and some other components. Camel creates a new thread pool based on the thread pool profiles. The default thread pool profile looks like the following:

<threadPoolProfile id="defaultThreadPoolProfile" defaultProfile="true" poolSize="10" maxPoolSize="20" maxQueueSize="1000" rejectedPolicy="CallerRuns"/>

In the Java DSL, thread pool profiles are set using the ExecutorServiceManager field of CamelContext. If we use executorServiceRef, Camel will look up in Registry for a thread pool with the given ID. If there is no such thread pool, it will look up in Registry for a thread pool profile with the same ID. In case there is a thread pool profile with that ID, Camel will use it to create the new thread pool. If there is no such thread pool profile, default profile will be used. Also, all thread pools created by Camel will be properly shutdown when CamelContext is stopped, so we shouldn't worry about thread leaks. More details about the Camel threading model can be found on this page http://camel.apache.org/threading-model.html.

You have been reading a chapter from
Instant Apache Camel Message Routing
Published in: Aug 2013
Publisher: Packt
ISBN-13: 9781783283477
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at $19.99/month. Cancel anytime
Banner background image