Instant Apache Camel Message Routing

Reorganizing messages (Intermediate)


In an asynchronous distributed application, messages can easily get out of order. Sometimes this is due to a limitation of an endpoint; for example, the Amazon Simple Queue Service does not guarantee in-order delivery of messages. At other times it is a conscious architectural decision, for example to increase throughput: perhaps a Splitter or a Content-Based Router is used to hand long-running tasks off to separate threads, losing the original message order along the way. Whatever the reason, when messages have to be (re)ordered, the Resequencer is the pattern to use.

Getting ready

The complete source code for this tutorial is located under the project camel-message-routing-examples/reorganizing-messages.

How to do it...

Let's start with a route that supposedly receives messages in random order, which we want to put back in order. Here is how a Resequencer definition looks in the Java DSL:

from("direct:start")
    .resequence(header("message_index"))
    .batch().size(100).timeout(1000L)
    .to("mock:result");

The main piece of information the Resequencer needs is the criterion used to order the messages. It can be the message body itself, a header, or a property, and it is specified as an expression. In addition to the header used for ordering the messages, our example sets two other options: batchSize and batchTimeout. These options control how many messages the Resequencer collects and how long it waits before sorting them and letting them go.
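
To see the reordering in action, you can send a few messages with out-of-order message_index headers and assert the resulting order on the mock endpoint. The following is a minimal test-style sketch, assuming the route above is running in a CamelContext and that template and getMockEndpoint come from a CamelTestSupport-based test:

// Expect the bodies in sequence order on the mock endpoint
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("one", "two", "three");

// Send the messages out of order; the Resequencer sorts them by message_index
template.sendBodyAndHeader("direct:start", "three", "message_index", 3);
template.sendBodyAndHeader("direct:start", "one", "message_index", 1);
template.sendBodyAndHeader("direct:start", "two", "message_index", 2);

result.assertIsSatisfied();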

How it works...

In the previous example, the Resequencer will collect up to 100 messages or wait up to one second (whichever happens first); then, if any messages have been batched, it sorts them by the value of the message_index header and passes them further down the pipeline. When used in the default batching mode, the Resequencer can also sort in reverse order, allow duplicate messages, and reject old messages or messages with an invalid sequence number. The downside of the batch processing algorithm is that it always collects messages up to a certain count or timeout, which reduces the overall throughput of the system.

There is also a stream-based algorithm, which reorders messages continuously by detecting gaps between messages rather than relying on a fixed batch size. To use the stream-based Resequencer, the messages must carry a unique sequence number for which the predecessor and successor are known. For example, the message sequence 1, 2, 4 has a gap: the Resequencer will let messages 1 and 2 go instantly without any batching, but retain message 4 until message 3 arrives or the timeout occurs. Configuring a stream-based Resequencer is very similar to configuring a batch-based one, except that it takes the timeout and capacity options:

from("direct:start")
    .resequence(header("message_index"))
    .stream().capacity(100).timeout(1000L)
    .to("mock:result");

More information about the Resequencer pattern and how to use it can be found at http://camel.apache.org/resequencer.html.

There's more...

There are two other patterns that act on the message flow without modifying the message content. Let's have a look at the Throttler and Delayer patterns.

Limiting flow rate with Throttler

The Throttler is a simple pattern useful for organizing messages in time. It works by throttling messages down to a specified maximum rate in order to protect a target endpoint from getting overloaded. For example, some APIs do not allow frequent calls, or incur extra charges when a certain rate is exceeded. A snippet like the following will prevent the mock:result endpoint from receiving more than three requests per 10 seconds during peak load times:

<route>
    <from uri="seda:a"/>
    <throttle maximumRequestsPerPeriod="3" timePeriodMillis="10000">
        <to uri="mock:result"/>
    </throttle>
</route>

The maximumRequestsPerPeriod option doesn't have to be hardcoded in the route definition; it can also be calculated at runtime using an expression.
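
For example, in the Java DSL the maximum rate can be taken from a message header. The following is a minimal sketch, assuming a hypothetical throttleRate header that carries the allowed number of requests per period:

from("seda:a")
    // resolve the allowed requests per period from the (hypothetical) throttleRate header
    .throttle(header("throttleRate")).timePeriodMillis(10000)
    .to("mock:result");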

The Throttler can also work in non-blocking mode: when the number of messages exceeds the maximum rate, instead of blocking the caller thread, it schedules a task to be executed in the future on a separate thread. This lets the caller keep processing incoming messages while the maximum rate after the Throttler is still honored.
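
In the Java DSL this non-blocking behavior is enabled with the asyncDelayed option; a minimal sketch:

from("seda:a")
    .throttle(3).timePeriodMillis(10000)
    // messages exceeding the rate are rescheduled on a separate thread
    // instead of blocking the caller
    .asyncDelayed()
    .to("mock:result");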

Delaying messages

The Delayer pattern functions very similarly to the Throttler, but instead of introducing a delay only when the maximum rate is exceeded, it always delays the messages by the specified amount of time. One good use case for this pattern is route testing and simulating long-running processes:

<route>
    <from uri="direct:start"/>
    <delay asyncDelayed="true">
        <constant>1000</constant>
    </delay>
    <to uri="mock:result"/>
</route>

As with the Throttler, it is possible to specify the delay using an expression or to make it non-blocking using the asyncDelayed option.
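
The same idea expressed in the Java DSL; a minimal sketch, assuming a hypothetical delayMillis header that carries the delay per message:

from("direct:start")
    // delay each message by the value of the (hypothetical) delayMillis header,
    // without blocking the caller thread
    .delay(header("delayMillis")).asyncDelayed()
    .to("mock:result");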
