Instant Apache Camel Message Routing

Route, transform, split, multicast messages, and do much more with Camel

Product type: Paperback
Published in: Aug 2013
Publisher: Packt
ISBN-13: 9781783283477
Length: 62 pages
Edition: 1st Edition
Author: Bilgin Ibryam

Aggregating multiple messages into one (Intermediate)


The Aggregator pattern is the opposite of the Splitter pattern; it combines multiple messages into one. In a message-driven application, it usually appears after the Splitter or the Recipient List pattern, where a number of messages have been produced out of one, and the goal of the Aggregator is to merge the related messages back together. In this tutorial, we will create a route that merges messages belonging to the same group into one.

Getting ready

The complete source code for this tutorial is located under the project camel-message-routing-examples/aggregating-messages.

How to do it...

  1. When using Aggregator, the first thing we have to decide is how the messages will be merged together. This is very specific to the actual message content and business logic of the application. We provide that logic by implementing the AggregationStrategy interface:

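    import java.math.BigDecimal;

    import org.apache.camel.Exchange;
    // Camel 2.x location; Camel 3 moved this interface to org.apache.camel.AggregationStrategy
    import org.apache.camel.processor.aggregate.AggregationStrategy;

    // Sums the invoiceItemTotal header of all exchanges that belong to the same group.
    public class InvoiceTotalAggregator implements AggregationStrategy {
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            // First message of a group: there is nothing to merge with yet.
            if (oldExchange == null) {
                return newExchange;
            }
            BigDecimal currentTotal = oldExchange.getIn().getHeader("invoiceItemTotal", BigDecimal.class);
            BigDecimal itemTotal = newExchange.getIn().getHeader("invoiceItemTotal", BigDecimal.class);
            // Keep the running total on the exchange that is carried forward.
            oldExchange.getIn().setHeader("invoiceItemTotal", currentTotal.add(itemTotal));
            return oldExchange;
        }
    }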
  2. Then, we add the Aggregator to the route by referencing our custom aggregation strategy:

    <bean id="invoiceTotalAggregator" class="org.apache.camel.howto.InvoiceTotalAggregator"/>
    
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="invoiceTotalAggregator" completionTimeout="3000">
                <correlationExpression>
                    <simple>${header.invoiceId}</simple>
                </correlationExpression>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

Besides the aggregation strategy, the Aggregator has two other mandatory settings: the correlation expression and the completion condition. The first one is evaluated on each incoming message to generate a correlation key. The key is used to determine which messages should be grouped together. In our example, the key is the value of the invoiceId header, so all invoice item messages belonging to the same invoice will be merged into one message. The completion condition tells Camel when the aggregation is complete and the currently built message should be sent further down the route. In our example, it is a completionTimeout of 3000 milliseconds.

How it works...

The Aggregator has an internal repository (in memory by default) that contains the current aggregated Exchange for each correlation key. When a new message comes in, the first thing done by the Aggregator is to evaluate the correlation expression and get the correlation key. Then the correlation key is used to look up the repository for an existing Exchange with the same key. The next step is to pass the old Exchange (if there is one) and the new Exchange to the AggregationStrategy to merge them. After the aggregation, the completion condition is checked, and if it is satisfied, the aggregated Exchange is sent to the next processor in the route. If the aggregation has not completed yet, the repository is updated with the aggregated Exchange, and the Aggregator continues waiting for the next Exchange to complete it or for a timeout to occur.
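The steps above can be sketched in plain Java without any Camel dependency. This is only a simplified model under stated assumptions, not Camel's actual implementation: SimpleAggregator and InvoiceItem are hypothetical names, a BinaryOperator stands in for the AggregationStrategy, and a fixed message count is used as the completion condition.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Entry point: feeds three invoice items through the sketch.
final class SimpleAggregatorDemo {
    public static void main(String[] args) {
        SimpleAggregator<InvoiceItem> aggregator = new SimpleAggregator<InvoiceItem>(
            item -> item.invoiceId,
            (oldItem, newItem) -> oldItem == null
                ? newItem
                : new InvoiceItem(oldItem.invoiceId, oldItem.total.add(newItem.total)),
            2);
        aggregator.onMessage(new InvoiceItem("A", new BigDecimal("10.00")));
        aggregator.onMessage(new InvoiceItem("B", new BigDecimal("1.50")));
        aggregator.onMessage(new InvoiceItem("A", new BigDecimal("5.25")));
        for (InvoiceItem done : aggregator.completed()) {
            System.out.println(done.invoiceId + " -> " + done.total);
        }
    }
}

// Hypothetical message type standing in for an Exchange with two headers.
final class InvoiceItem {
    final String invoiceId;
    final BigDecimal total;

    InvoiceItem(String invoiceId, BigDecimal total) {
        this.invoiceId = invoiceId;
        this.total = total;
    }
}

// Simplified model of the Aggregator loop (assumption: size-based completion only).
final class SimpleAggregator<M> {
    private final Function<M, String> correlationExpression;      // yields the correlation key
    private final BinaryOperator<M> strategy;                     // merges old and new message
    private final int completionSize;                             // messages needed per group
    private final Map<String, M> repository = new HashMap<>();    // current aggregate per key
    private final Map<String, Integer> counts = new HashMap<>();  // messages seen per key
    private final List<M> completed = new ArrayList<>();          // stands in for the next processor

    SimpleAggregator(Function<M, String> correlationExpression,
                     BinaryOperator<M> strategy, int completionSize) {
        this.correlationExpression = correlationExpression;
        this.strategy = strategy;
        this.completionSize = completionSize;
    }

    void onMessage(M incoming) {
        String key = correlationExpression.apply(incoming);   // 1. evaluate the correlation key
        M existing = repository.get(key);                     // 2. look up the repository
        M merged = strategy.apply(existing, incoming);        // 3. merge (existing may be null)
        int seen = counts.merge(key, 1, Integer::sum);
        if (seen >= completionSize) {                         // 4. check the completion condition
            repository.remove(key);
            counts.remove(key);
            completed.add(merged);                            //    complete: send downstream
        } else {
            repository.put(key, merged);                      // 5. not complete: keep waiting
        }
    }

    List<M> completed() {
        return completed;
    }
}
```

With a completion size of 2, only invoice A completes in this run, so the demo prints `A -> 15.25`, while the single item for invoice B stays in the repository.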

There are several ways to specify the completion condition, and at least one must be present: completionPredicate is a Predicate that must evaluate to true for the aggregated Exchange to complete; completionSize states how many messages must be aggregated to complete the aggregation, either as a fixed number or as a value retrieved dynamically from the Exchange with an expression; completionTimeout is based on an inactivity period, in milliseconds, and completes the aggregated Exchange if no further messages arrive for it within that period; completionInterval is similar, but completes all currently aggregated messages after each time interval, in milliseconds. The latter two are asynchronous conditions: they are not evaluated on each incoming message but run in a background thread, and whenever one is satisfied, the corresponding aggregated message starts its journey on the route.
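The difference between the two time-based conditions can be illustrated with a small plain-Java sketch. It is a deterministic model only: TimedCompletionSketch is a hypothetical class, and the current time is passed in explicitly instead of being checked by a background thread as Camel does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Entry point: group A goes quiet at t=0, group B is still active at t=2000.
final class TimedCompletionDemo {
    public static void main(String[] args) {
        TimedCompletionSketch sketch = new TimedCompletionSketch();
        sketch.onMessage("A", 0L);
        sketch.onMessage("B", 2000L);
        // Timeout check at t=3000 with a 3000 ms inactivity period: only A qualifies.
        System.out.println(sketch.completeIdle(3000L, 3000L));
        // Interval tick: every group that is still held completes.
        System.out.println(sketch.completeAll());
    }
}

// Hypothetical model of completionTimeout and completionInterval.
final class TimedCompletionSketch {
    // Correlation key -> time (ms) of the last message received for that group.
    private final Map<String, Long> lastActivity = new HashMap<>();

    void onMessage(String key, long nowMillis) {
        lastActivity.put(key, nowMillis); // each new message resets the inactivity period
    }

    // completionTimeout: complete only groups idle for at least timeoutMillis.
    List<String> completeIdle(long nowMillis, long timeoutMillis) {
        List<String> done = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastActivity.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                done.add(entry.getKey());
                it.remove();
            }
        }
        return done;
    }

    // completionInterval: complete every group currently held, active or not.
    List<String> completeAll() {
        List<String> done = new ArrayList<>(lastActivity.keySet());
        lastActivity.clear();
        return done;
    }
}
```

In this run the timeout check completes only group A, because group B received a message 1000 ms earlier, whereas the interval tick that follows completes group B regardless of its recent activity.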

There's more...

The Aggregator is usually used together with other patterns. Here are two common integration patterns that the Aggregator takes part in:

Composed message processor

This pattern is useful for maintaining the overall message flow while processing each element of a composite message in a separate flow.

In the preceding routing diagram, we can see how the incoming composite message is split up into sub messages and then each sub message is routed to an appropriate destination using a Content-Based Router. After each sub message has been processed differently based on its type, they are aggregated back into a single message using the Aggregator. Notice that the composed message processor pattern can also be achieved without using the Aggregator explicitly. As you may remember from the previous tutorial, the Splitter has an embedded Aggregator in the form of an AggregationStrategy. Using that strategy, the Splitter can aggregate the results from each sub message and have the same effect as having an explicit Aggregator.
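The split, route, and aggregate stages of this pattern can be sketched in plain Java as follows. This is an illustration under assumptions, not Camel code: the comma-separated order format, the widget and gadget item types, and the class name are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Camel-free model of the composed message processor.
final class ComposedMessageProcessorSketch {
    public static void main(String[] args) {
        System.out.println(process("widget:2, gadget:1"));
    }

    // Router step: each sub message is handled according to its (assumed) type prefix.
    static String route(String item) {
        if (item.startsWith("widget:")) {
            return "widget-inventory checked " + item;
        }
        if (item.startsWith("gadget:")) {
            return "gadget-inventory checked " + item;
        }
        return "unrouted " + item;
    }

    static String process(String compositeOrder) {
        List<String> results = new ArrayList<>();
        for (String item : compositeOrder.split(",")) { // Splitter: one sub message per item
            results.add(route(item.trim()));            // Router: type-specific processing
        }
        return String.join("; ", results);              // Aggregator: back to a single message
    }
}
```

The caller of process sees one message going in and one message coming out, which is the point of the pattern: the splitting and per-type routing stay hidden inside the flow.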

Scatter-gather

Here, a message is broadcast to multiple channels using the Recipient List pattern or the publish-subscribe mechanism. The difference in this case is that there is no Splitter involved, so each recipient gets a copy of the same incoming message.

After the message has been processed by multiple recipients, the Aggregator receives the results and produces one single result. In this scenario, the Aggregator usually does not merge the results; instead, it picks only one of them, because they are alternative answers to the same request. In the example in the preceding diagram, this is the "Best" Quote.
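A selecting strategy of this kind can be sketched in plain Java. The sketch assumes that the "Best" Quote is the one with the lowest price, which the text does not state; Quote and BestQuoteSketch are hypothetical names, and pickBest only mirrors the old/new contract of an aggregation strategy rather than implementing Camel's interface.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the gather step in scatter-gather.
final class BestQuoteSketch {
    public static void main(String[] args) {
        List<Quote> replies = Arrays.asList(
            new Quote("vendorA", new BigDecimal("19.90")),
            new Quote("vendorB", new BigDecimal("17.50")),
            new Quote("vendorC", new BigDecimal("18.25")));
        System.out.println(gather(replies).vendor);
    }

    // Keeps one of the two quotes instead of merging them.
    // Assumption: "best" means the lowest price.
    static Quote pickBest(Quote oldQuote, Quote newQuote) {
        if (oldQuote == null) {
            return newQuote; // first reply: nothing to compare with yet
        }
        return newQuote.price.compareTo(oldQuote.price) < 0 ? newQuote : oldQuote;
    }

    // Applies pickBest to each reply in turn, the way an Aggregator feeds its strategy.
    static Quote gather(List<Quote> replies) {
        Quote best = null;
        for (Quote reply : replies) {
            best = pickBest(best, reply);
        }
        return best;
    }
}

// Hypothetical reply type returned by each recipient.
final class Quote {
    final String vendor;
    final BigDecimal price;

    Quote(String vendor, BigDecimal price) {
        this.vendor = vendor;
        this.price = price;
    }
}
```

For the three replies in the demo, the sketch keeps the quote from vendorB, since 17.50 is the lowest price.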
