Reorganizing messages (Intermediate)
In an asynchronous distributed application, messages can easily get out of order. Sometimes this is due to a limitation of an endpoint; for example, the Amazon Simple Queue Service does not guarantee in-order delivery of messages. Sometimes it is a conscious architectural decision, for example to increase throughput: a Splitter or Content-Based Router may be used to split long-running tasks into separate threads, which loses the original message order. Whatever the reason, when messages have to be (re)ordered, Resequencer is the pattern to use.
Getting ready
The complete source code for this tutorial is located under the project camel-message-routing-examples/reorganizing-messages.
How to do it...
Let's start with a route that supposedly receives messages in random order, and we want to put them back in order. Here is how a Resequencer definition looks in the Java DSL:
from("direct:start")
    .resequence(header("message_index"))
    .batch().size(100).timeout(1000L)
    .to("mock:result");
The main piece of information the Resequencer needs is the criterion used to order the messages. This can be the message body itself, a header, or a property, and it is specified using an expression. In addition to the header used for ordering messages, our example sets two other options: batchSize and batchTimeout (set through size() and timeout() in the Java DSL). These options control how long the Resequencer waits and collects messages before sorting and releasing them.
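To make the role of the two batch options more concrete, here is a minimal plain-Java sketch of the collect, sort, and release cycle. It is only an illustration and not Camel's implementation: the BatchResequencerSketch class, its Message type, and the offer() and flush() methods are hypothetical names, and the timeout is modelled as an explicit flush() call rather than a real timer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration of batch resequencing; this is NOT Camel's implementation.
final class BatchResequencerSketch {

    // Hypothetical stand-in for a message carrying a "message_index" header.
    static final class Message {
        final long index;
        final String body;

        Message(long index, String body) {
            this.index = index;
            this.body = body;
        }
    }

    private final int batchSize;
    private final List<Message> buffer = new ArrayList<>();

    BatchResequencerSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers the message; returns the sorted batch once it is full, otherwise an empty list.
    List<Message> offer(Message message) {
        buffer.add(message);
        return buffer.size() >= batchSize ? flush() : new ArrayList<>();
    }

    // Stands in for the batch timeout firing: releases whatever has been collected, sorted by index.
    List<Message> flush() {
        List<Message> out = new ArrayList<>(buffer);
        buffer.clear();
        out.sort(Comparator.comparingLong((Message m) -> m.index));
        return out;
    }
}
```

With a batch size of 3, offering messages with indexes 3, 1, and 2 returns nothing for the first two calls and the full batch, sorted by index, on the third. A real implementation would also run a timer that calls flush() when the timeout elapses.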
How it works...
In the previous example, the Resequencer collects up to 100 messages or waits up to one second (whichever happens first). Then, if any messages have been batched, it sorts them by the message_index header value and sends them further down the pipeline. When used in the default batching mode, Resequencer can also do reverse sorting, allow duplicate messages, and reject old messages or messages with an invalid sequence number. The downside of the batch processing algorithm is that it always collects messages up to a certain number or timeout, which reduces the overall throughput of the system.
There is also a stream-based algorithm, which reorders messages continuously based on the detection of gaps between messages rather than on a fixed batch size. To use the stream-based Resequencer, the messages must contain a unique sequence number for which the predecessor and successor are known. For example, the message sequence 1, 2, 4 has a gap, so the Resequencer lets messages 1 and 2 go instantly without any batching, but retains message 4 until message 3 arrives or the timeout occurs. Configuring a stream-based Resequencer is very similar to configuring a batch-based one, except that it has the timeout and capacity options:
from("direct:start")
    .resequence(header("message_index"))
    .stream().capacity(100).timeout(1000L)
    .to("mock:result");
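The gap detection behind the stream-based mode can be sketched in plain Java as follows. This is a simplified illustration under stated assumptions, not Camel's implementation: the StreamResequencerSketch class and its offer() and onTimeout() methods are hypothetical, sequence numbers are assumed to be consecutive long values, the capacity limit is left out, and the timeout is modelled as an explicit method call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java illustration of gap-based (stream) resequencing; NOT Camel's implementation.
final class StreamResequencerSketch {

    // Messages held back because a predecessor is missing, keyed by sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long nextExpected;

    StreamResequencerSketch(long firstSequenceNumber) {
        this.nextExpected = firstSequenceNumber;
    }

    // Accepts one message and returns every body that can now be delivered in order.
    List<String> offer(long sequenceNumber, String body) {
        List<String> deliverable = new ArrayList<>();
        if (sequenceNumber < nextExpected) {
            return deliverable; // arrived after its slot was passed; this sketch simply drops it
        }
        pending.put(sequenceNumber, body);
        drain(deliverable);
        return deliverable;
    }

    // Stands in for the gap timeout: stop waiting for the missing numbers and continue
    // from the smallest sequence number that is currently held back.
    List<String> onTimeout() {
        List<String> deliverable = new ArrayList<>();
        if (!pending.isEmpty()) {
            nextExpected = pending.firstKey();
            drain(deliverable);
        }
        return deliverable;
    }

    // Releases messages while the head of the buffer is exactly the next expected number.
    private void drain(List<String> out) {
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            out.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
    }
}
```

Starting at sequence number 1 and offering 1, 2, 4, 3 delivers messages 1 and 2 immediately, holds back 4, and releases 3 and 4 together once 3 arrives, which mirrors the example in the text.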
More information about the Resequencer pattern and how to use it can be found at http://camel.apache.org/resequencer.html.
There's more...
There are two other patterns that work on the message flow without modifying the message content. Let's have a look at the Throttler and Delayer patterns.
Limiting flow rate with Throttler
Throttler is a simple pattern useful for organizing messages in time. It works by throttling messages down to a specified maximum rate in order to protect a target endpoint from being overloaded. For example, some APIs do not allow frequent calls, or incur extra charges when a certain rate is exceeded. A snippet like the following prevents the mock:result endpoint from receiving more than three requests per 10 seconds during peak load times:
<route>
  <from uri="seda:a"/>
  <throttle maximumRequestsPerPeriod="3" timePeriodMillis="10000">
    <to uri="mock:result"/>
  </throttle>
</route>
The maximumRequestsPerPeriod option doesn't have to be hardcoded in the route definition; it can also be calculated at runtime using an expression.
Throttler can also work in non-blocking mode, so when the number of messages exceeds the maximum rate, instead of blocking the caller thread, it will schedule a task to be executed in the future using a separate thread. This lets the caller process further incoming messages while still honoring the maximum rate after the Throttler.
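The rate-limiting idea itself can be sketched in a few lines of plain Java. This is a minimal illustration, not Camel's implementation: the ThrottlerSketch class and its reserve() method are hypothetical, and the current time is passed in as a parameter so that the behaviour is easy to follow. It assumes calls arrive with non-decreasing timestamps.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java illustration of "at most N requests per period"; NOT Camel's implementation.
final class ThrottlerSketch {

    private final int maxRequestsPerPeriod;
    private final long periodMillis;
    // Admission times of the most recent requests, oldest first (at most maxRequestsPerPeriod entries).
    private final Deque<Long> admitted = new ArrayDeque<>();

    ThrottlerSketch(int maxRequestsPerPeriod, long periodMillis) {
        this.maxRequestsPerPeriod = maxRequestsPerPeriod;
        this.periodMillis = periodMillis;
    }

    // Reserves a slot for a request arriving at nowMillis and returns how long it has to wait.
    long reserve(long nowMillis) {
        long admitAt = nowMillis;
        if (admitted.size() == maxRequestsPerPeriod) {
            // A slot frees up one full period after the oldest tracked admission.
            admitAt = Math.max(nowMillis, admitted.pollFirst() + periodMillis);
        }
        admitted.addLast(admitAt);
        return admitAt - nowMillis;
    }
}
```

With a limit of three requests per 10,000 milliseconds, the first three calls to reserve(0) return 0 and the fourth returns 10000. A blocking caller would sleep for the returned time, whereas a non-blocking caller could hand the work to a java.util.concurrent.ScheduledExecutorService with that delay and return immediately.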
Delaying messages
The Delayer pattern works much like Throttler, but instead of introducing a delay only when the maximum rate is exceeded, it always delays messages by the specified amount of time. One good use case for this pattern is route testing and simulating long-running processes:
<route>
  <from uri="direct:start"/>
  <delay asyncDelayed="true">
    <constant>1000</constant>
  </delay>
  <to uri="mock:result"/>
</route>
As with Throttler, it is possible to specify the delay using an expression, or to make the Delayer non-blocking using the asyncDelayed option.
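The difference between a blocking and a non-blocking delay can be illustrated in plain Java. This is only a sketch of the general idea, not of how Camel implements asyncDelayed: the DelayerSketch class and its two methods are hypothetical names, and the scheduler is supplied by the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration of blocking versus non-blocking delays; NOT Camel's implementation.
final class DelayerSketch {

    // Blocking variant: the calling thread sleeps for the whole delay.
    static String delayBlocking(String body, long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);
        return body;
    }

    // Non-blocking variant: returns at once; the future completes on a scheduler thread after the delay.
    static CompletableFuture<String> delayAsync(String body, long delayMillis,
                                                ScheduledExecutorService scheduler) {
        CompletableFuture<String> result = new CompletableFuture<>();
        scheduler.schedule(() -> { result.complete(body); }, delayMillis, TimeUnit.MILLISECONDS);
        return result;
    }
}
```

In the non-blocking variant the caller gets the future back immediately and is free to accept the next message, while the delayed message continues on a scheduler thread once the delay has passed.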