




















































In this article by Chandan Pandey, the author of Spring Integration Essentials, we explore the out-of-the-box capabilities that the Spring Integration framework provides for a seamless flow of messages across heterogeneous components, and see what it offers for real-world integration challenges. We cover Spring Integration's support for external components; in this excerpt, that means aggregators, FTP/FTPS, Twitter, and JMS.
Aggregators are the opposite of splitters: they combine multiple messages and present them as a single message to the next endpoint. This is a complex operation, so let's start with a real-life scenario. A news channel might have many correspondents who can upload articles and related images. The text of an article might arrive much sooner than the associated images, but the article must be sent for publishing only when all relevant images have also arrived. This scenario throws up several challenges: partial articles must be stored somewhere, there must be a way to correlate incoming components with existing ones, and there must be a way to identify when a message is complete. Aggregators handle all of these aspects, and the relevant concepts are MessageStore, CorrelationStrategy, and ReleaseStrategy. Let's start with a code sample and then explore each of these concepts in detail:
<int:aggregator input-channel="fetchedFeedChannelForAggregatior"
    output-channel="aggregatedFeedChannel"
    ref="aggregatorSoFeedBean"
    method="aggregateAndPublish"
    release-strategy="sofeedCompletionStrategyBean"
    release-strategy-method="checkCompleteness"
    correlation-strategy="soFeedCorrelationStrategyBean"
    correlation-strategy-method="groupFeedsBasedOnCategory"
    message-store="feedsMySqlStore"
    expire-groups-upon-completion="true">
  <int:poller fixed-rate="1000"/>
</int:aggregator>
Hmm, a pretty big declaration! And why not—a lot of things combine together to act as an aggregator. Let's quickly glance at all the tags used:
A Java class can be defined as an aggregator and, as described in the previous bullet points, the method and ref attributes decide which method of the bean (referred to by ref) is invoked once messages have been aggregated as per CorrelationStrategy and released after fulfilment of ReleaseStrategy. In the following example, the method simply returns the messages unchanged, leaving room for pre-processing before they are passed on to the next consumer in the chain:
import java.util.List;
// SyndEntry is the feed entry type from the ROME library

public class SoFeedAggregator {
    public List<SyndEntry> aggregateAndPublish(List<SyndEntry> messages) {
        // Do some pre-processing before passing on to the next channel
        return messages;
    }
}
Let's get to the details of the three most important components that complete the aggregator.
The aggregator needs to group the messages, but how does it decide the groups? In simple words, CorrelationStrategy decides how to correlate the messages. The default is based on a header named CORRELATION_ID: all messages having the same value for the CORRELATION_ID header are placed in the same group. Alternatively, we can designate any Java class and one of its methods as a custom correlation strategy, or implement the Spring Integration framework's CorrelationStrategy interface. If the CorrelationStrategy interface is implemented, the getCorrelationKey() method must be provided. Let's see our correlation strategy in the feeds example:
public class CorrelationStrategy {
    public Object groupFeedsBasedOnCategory(Message<?> message) {
        if (message != null) {
            SyndEntry entry = (SyndEntry) message.getPayload();
            List<SyndCategoryImpl> categories = entry.getCategories();
            if (categories != null && categories.size() > 0) {
                for (SyndCategoryImpl category : categories) {
                    // For simplicity, let's consider only the first category
                    return category.getName();
                }
            }
        }
        return null;
    }
}
So how are we correlating our messages? We are correlating the feeds based on the category name. The method must return an object that can be used for correlating the messages. If a user-defined object is returned, it must satisfy the requirements for a key in a map, such as defining hashCode() and equals(). The return value must not be null.
Alternatively, if we had wanted to implement it using the framework's interface, it would have looked like this (the interface is referenced by its fully qualified name because the class shares its simple name):
public class CorrelationStrategy
        implements org.springframework.integration.aggregator.CorrelationStrategy {
    public Object getCorrelationKey(Message<?> message) {
        if (message != null) {
            …
                    return category.getName();
            …
        }
        return null;
    }
}
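Returning to the requirement that a user-defined correlation key must behave as a map key, here is a minimal plain-Java sketch. FeedKey, FeedKeyDemo, and the category/author fields are hypothetical names chosen for illustration; they are not part of the article's project or of Spring Integration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite correlation key: category plus author.
final class FeedKey {
    private final String category;
    private final String author;

    FeedKey(String category, String author) {
        this.category = Objects.requireNonNull(category);
        this.author = Objects.requireNonNull(author);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof FeedKey)) {
            return false;
        }
        FeedKey that = (FeedKey) other;
        return category.equals(that.category) && author.equals(that.author);
    }

    @Override
    public int hashCode() {
        return Objects.hash(category, author);
    }
}

final class FeedKeyDemo {
    // Groups titles under their key, much as a correlation key groups messages.
    // Each row is {category, author, title}.
    static Map<FeedKey, List<String>> group(String[][] rows) {
        Map<FeedKey, List<String>> groups = new HashMap<>();
        for (String[] row : rows) {
            FeedKey key = new FeedKey(row[0], row[1]);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row[2]);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"java", "alice", "Generics"},
            {"java", "alice", "Streams"},
            {"spring", "bob", "Aggregators"}
        };
        System.out.println(group(rows).size() + " groups");
    }
}
```

Without equals() and hashCode(), the two "java/alice" rows would produce two distinct keys and never end up in the same group.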
We have been grouping messages based on the correlation strategy, but when will we release them to the next component? This is decided by the release strategy. As with the correlation strategy, any Java POJO can define the release strategy, or we can implement the framework's ReleaseStrategy interface. Here is an example using a Java POJO class:
public class CompletionStrategy {
    public boolean checkCompleteness(List<SyndEntry> messages) {
        if (messages != null) {
            if (messages.size() > 2) {
                return true;
            }
        }
        return false;
    }
}
The method's argument must be a collection type, and the method must return a boolean indicating whether or not to release the accumulated messages. For simplicity, we have just checked the number of messages from the same category: if it is greater than two, we release the messages.
Until a group of messages fulfils the release criteria, the aggregator needs to store them temporarily. This is where message stores come into the picture. Message stores can be of two types: in-memory and persistent. The default is in-memory, and if this is to be used, there is no need to declare the attribute at all. If a persistent message store needs to be used, it must be declared and its reference given to the message-store attribute. A MySQL message store can be declared and referenced as follows:
<bean id="feedsMySqlStore"
    class="org.springframework.integration.jdbc.JdbcMessageStore">
  <property name="dataSource" ref="feedsSqlDataSource"/>
</bean>
The data source is the Spring framework's standard JDBC data source. The greatest advantage of using a persistent store is recoverability: when the system recovers from a crash, the partially aggregated messages are not lost, as they would be with an in-memory store. Another advantage is capacity: memory is limited and can accommodate only a limited number of messages for aggregation, whereas a database can offer much more space.
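To see how correlation, release, and storage fit together, here is a plain-Java sketch of the idea. It is a simplified model under stated assumptions, not Spring Integration's implementation: MiniAggregator and its methods are hypothetical names, the HashMap stands in for the in-memory message store, and messages are plain strings of the form "category:title".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Simplified model of an aggregator; all names here are hypothetical.
final class MiniAggregator<T> {
    private final Function<T, Object> correlationStrategy;
    private final Predicate<List<T>> releaseStrategy;
    // Plays the role of the in-memory message store.
    private final Map<Object, List<T>> store = new HashMap<>();

    MiniAggregator(Function<T, Object> correlationStrategy,
                   Predicate<List<T>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Stores the message in its group and returns the group once it is complete.
    Optional<List<T>> accept(T message) {
        Object key = correlationStrategy.apply(message);
        if (key == null) {
            throw new IllegalArgumentException("correlation key must not be null");
        }
        List<T> group = store.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (releaseStrategy.test(group)) {
            // Forget the completed group so the same key can start a new one.
            store.remove(key);
            return Optional.of(group);
        }
        return Optional.empty();
    }

    int pendingGroups() {
        return store.size();
    }
}

final class MiniAggregatorDemo {
    public static void main(String[] args) {
        MiniAggregator<String> aggregator = new MiniAggregator<String>(
            message -> message.substring(0, message.indexOf(':')), // correlate by category
            group -> group.size() > 2);                            // same rule as checkCompleteness
        String[] feeds = {"java:Generics", "spring:Aggregators", "java:Streams", "java:Lambdas"};
        for (String feed : feeds) {
            aggregator.accept(feed).ifPresent(group -> System.out.println("Released " + group));
        }
        System.out.println("Groups still waiting: " + aggregator.pendingGroups());
    }
}
```

Swapping the HashMap for a database-backed structure is what a persistent message store does conceptually: the grouping and release logic stays the same, only the place where pending groups live changes.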
FTP, or File Transfer Protocol, is used to transfer files across networks. FTP communication involves two parties: a server and a client. The client establishes a session with the server, after which it can download or upload files. Spring Integration provides components that act as a client and connect to the FTP server to communicate with it. What about the server: which server will it connect to? If you have access to any public or hosted FTP server, use it. Otherwise, the easiest way to try out the examples in this section is to set up a local FTP server instance. FTP setup is outside the scope of this article.
To use Spring Integration components for FTP/FTPS, we need to add a namespace to our configuration file and then add the Maven dependency entry in the pom.xml file. The following entries should be made:
<bean id="ftpClientSessionFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
  <property name="host" value="localhost"/>
  <property name="port" value="21"/>
  <property name="username" value="testuser"/>
  <property name="password" value="testuser"/>
</bean>
The DefaultFtpSessionFactory class is at work here, and it takes the following parameters:
A session pool for the factory is maintained and an instance is returned when required. Spring takes care of validating that a stale session is never returned.
Inbound adapters can be used to read the files from the server. The most important aspect is the session factory that we just discussed in the preceding section. The following code snippet configures an FTP inbound adapter that downloads a file from a remote directory and makes it available for processing:
<int-ftp:inbound-channel-adapter channel="ftpOutputChannel"
    session-factory="ftpClientSessionFactory"
    remote-directory="/"
    local-directory="C:\Chandan\Projects\siexample\ftp\ftplocalfolder"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    local-filename-generator-expression="#this.toLowerCase() + '.trns'">
  <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
Let's quickly go through the tags used in this code:
An inbound adapter is a special listener that watches the remote directory for events, for example, the creation of a new file. At that point, it initiates the file transfer, creates a message of type Message<File>, and puts it on the output channel. By default, the filename is retained and a file with the same name as the remote file is created in the local directory. This can be overridden by using local-filename-generator-expression.
On the remote server, there could be files that are still in the process of being written. Typically, their extension is different, for example, filename.actualext.writing. The best way to avoid reading incomplete files is to use a filename pattern that copies only those files that have been written completely.
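The effect of the filename-pattern and local-filename-generator-expression settings can be sketched in plain Java. This is only an illustration: RemoteFileSelection and its methods are hypothetical names, and java.nio glob matching is used here as a stand-in for the adapter's own pattern matching, which may differ in detail.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper mirroring the two adapter settings discussed above.
final class RemoteFileSelection {
    // Same pattern as filename-pattern="*.txt" in the configuration.
    private static final PathMatcher TXT_ONLY =
        FileSystems.getDefault().getPathMatcher("glob:*.txt");

    // Keeps only names that end in .txt, skipping files such as report.txt.writing.
    static List<String> selectCompleted(List<String> remoteNames) {
        List<String> selected = new ArrayList<>();
        for (String name : remoteNames) {
            if (TXT_ONLY.matches(Paths.get(name))) {
                selected.add(name);
            }
        }
        return selected;
    }

    // Java equivalent of the expression #this.toLowerCase() + '.trns'.
    static String localName(String remoteName) {
        return remoteName.toLowerCase(Locale.ROOT) + ".trns";
    }

    public static void main(String[] args) {
        List<String> remote = Arrays.asList("Report.txt", "data.txt.writing", "image.png");
        for (String name : selectCompleted(remote)) {
            System.out.println(name + " -> " + localName(name));
        }
    }
}
```

Because the pattern requires the name to end in .txt, a file that is still being uploaded as data.txt.writing is ignored until the writer renames it.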
Outbound adapters can be used to write files to the server. The following code snippet reads a message from a specified channel and writes it inside the FTP server's remote directory. The remote server session is determined as usual by the session factory. Make sure the username configured in the session object has the necessary permission to write to the remote directory. The following configuration sets up an FTP adapter that can upload files to the specified directory:
<int-ftp:outbound-channel-adapter channel="ftpOutputChannel"
    remote-directory="/uploadfolder"
    session-factory="ftpClientSessionFactory"
    auto-create-directory="true">
</int-ftp:outbound-channel-adapter>
Here is a brief description of the tags used:
The payload on the channel need not necessarily be a file type; it can be one of the following:
Files on the remote server must be made available only when they have been written completely, not while they are still partial. Spring first writes the file under a temporary name and makes it available under its final name only when it has been completely written. By default, the temporary suffix is .writing, but it can be changed using the temporary-file-suffix property. The mechanism can be disabled completely by setting use-temporary-file-name to false.
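The write-then-rename idea is easy to demonstrate on a local filesystem. The following sketch assumes nothing about how the FTP adapter is implemented internally; TemporarySuffixWriter is a hypothetical class that writes under a temporary name and renames only once the content is complete.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical local illustration of the temporary-suffix technique.
final class TemporarySuffixWriter {
    // Writes content to fileName + temporarySuffix, then renames it to fileName.
    static Path write(Path directory, String fileName, String content,
                      String temporarySuffix) throws IOException {
        Path target = directory.resolve(fileName);
        Path temporary = directory.resolve(fileName + temporarySuffix);
        Files.write(temporary, content.getBytes(StandardCharsets.UTF_8));
        // Readers filtering on the final name never see a half-written file.
        return Files.move(temporary, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path directory = Files.createTempDirectory("upload");
        Path written = write(directory, "article.txt", "hello", ".writing");
        System.out.println("Available as " + written.getFileName());
    }
}
```

A consumer that only picks up *.txt files, as in the inbound adapter shown earlier, pairs naturally with a producer that uploads this way.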
A gateway, by definition, is a two-way component: it accepts input and provides a result for further processing. So what are the input and output in the case of FTP? The gateway issues commands to the FTP server and returns the result of the command. The following configuration issues an ls command with the -1 option to the server. The result is a list of String objects, each containing the name of a file, which is put on the reply-channel. The code is as follows:
<int-ftp:outbound-gateway id="ftpGateway"
    session-factory="ftpClientSessionFactory"
    request-channel="commandInChannel"
    command="ls"
    command-options="-1"
    reply-channel="commandOutChannel"/>
The tags are pretty simple:
For FTPS support, all that is needed is to change the factory class—an instance of org.springframework.integration.ftp.session.DefaultFtpsSessionFactory should be used. Note the s in DefaultFtpsSessionFactory. Once the session is created with this factory, it's ready to communicate over a secure channel. Here is an example of a secure session factory configuration:
<bean id="ftpSClientFactory"
    class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
  <property name="host" value="localhost"/>
  <property name="port" value="22"/>
  <property name="username" value="testuser"/>
  <property name="password" value="testuser"/>
</bean>
Although it is obvious, I would remind you that the FTP server must be configured to support a secure connection and open the appropriate port.
Any application in today's context is incomplete if it does not provide support for social messaging. Spring Integration provides built-in support for many social interfaces such as e-mail, Twitter feeds, and so on. Let's discuss the implementation of Twitter in this section. Prior to version 2.1, Spring Integration depended on the Twitter4J API for Twitter support, but now it leverages the Spring Social module for Twitter integration. Spring Integration provides an interface for receiving and sending tweets as well as for searching and publishing the search results as messages. Twitter uses OAuth for authentication. An app must be registered before we start Twitter development on it.
Let's look at the steps that need to be completed before we can use a Twitter component in our Spring Integration example:
After performing all these steps, and with the required keys and access token, we are ready to use Twitter. Let's store these in the twitterauth.properties property file:
twitter.oauth.apiKey=lnrDlMXSDnJumKLFRym02kHsy
twitter.oauth.apiSecret=6wlriIX9ay6w2f6at6XGQ7oNugk6dqNQEAArTsFsAU6RU8F2Td
twitter.oauth.accessToken=158239940-FGZHcbIDtdEqkIA77HPcv3uosfFRnUM30hRix9TI
twitter.oauth.accessTokenSecret=H1oIeiQOlvCtJUiAZaachDEbLRq5m91IbP4bhg1QPRDeh
The next step towards Twitter integration is the creation of a Twitter template. This is similar to the datasource or connection factory for databases, JMS, and so on. It encapsulates details to connect to a social platform. Here is the code snippet:
<context:property-placeholder location="classpath:twitterauth.properties"/>
<bean id="twitterTemplate"
    class="org.springframework.social.twitter.api.impl.TwitterTemplate">
  <constructor-arg value="${twitter.oauth.apiKey}"/>
  <constructor-arg value="${twitter.oauth.apiSecret}"/>
  <constructor-arg value="${twitter.oauth.accessToken}"/>
  <constructor-arg value="${twitter.oauth.accessTokenSecret}"/>
</bean>
As I mentioned, the template encapsulates all the values. Here is the order of the arguments:
With all the setup in place, let's now do some real work:
<int-twitter:inbound-channel-adapter twitter-template="twitterTemplate"
    channel="twitterChannel">
</int-twitter:inbound-channel-adapter>
The components in this code are covered in the following bullet points:
These adapters are also available for other purposes, such as searching messages, retrieving direct messages, and retrieving tweets that mention your account. Let's have a quick look at the code snippets for these adapters. I will not go into detail for each one; they are very similar to what has been discussed previously.
<int-twitter:search-inbound-channel-adapter id="testSearch"
    twitter-template="twitterTemplate"
    query="#springintegration"
    channel="twitterSearchChannel">
</int-twitter:search-inbound-channel-adapter>
<int-twitter:dm-inbound-channel-adapter id="testdirectMessage"
    twitter-template="twitterTemplate"
    channel="twitterDirectMessageChannel">
</int-twitter:dm-inbound-channel-adapter>
<int-twitter:mentions-inbound-channel-adapter id="testmentionMessage"
    twitter-template="twitterTemplate"
    channel="twitterMentionMessageChannel">
</int-twitter:mentions-inbound-channel-adapter>
The Twitter support also includes outbound adapters to send messages. Here is a code sample:
<int-twitter:outbound-channel-adapter twitter-template="twitterTemplate" channel="twitterSendMessageChannel"/>
Whatever message is put on the twitterSendMessageChannel channel is tweeted by this adapter. Just as there is an inbound adapter for receiving direct messages, there is an outbound adapter for sending them. Here is a simple example:
<int-twitter:dm-outbound-channel-adapter twitter-template="twitterTemplate" channel="twitterSendDirectMessage"/>
Any message that is put on the twitterSendDirectMessage channel is sent to the user directly. But where is the name of the user to whom the message will be sent? It is decided by the TwitterHeaders.DM_TARGET_USER_ID header of the message. This must be populated either programmatically, or by using enrichers or SpEL. For example, it can be added programmatically as follows:
Message<String> message = MessageBuilder.withPayload("Chandan")
    .setHeader(TwitterHeaders.DM_TARGET_USER_ID, "test_id")
    .build();
Alternatively, it can be populated by using a header enricher, as follows:
<int:header-enricher input-channel="twitterIn" output-channel="twitterOut">
  <int:header name="twitter_dmTargetUserId" value="test_id"/>
</int:header-enricher>
As gateways provide a two-way window, the search outbound gateway can be used to issue dynamic search commands and receive the results as a collection. If no result is found, the collection is empty. Let's configure a search outbound gateway, as follows:
<int-twitter:search-outbound-gateway id="twitterSearch"
    request-channel="searchQueryChannel"
    twitter-template="twitterTemplate"
    search-args-expression="'#springintegration'"
    reply-channel="searchQueryResultChannel"/>
And here is what the tags covered in this code mean:
This gives us enough to get started with the social integration aspects of the Spring framework.
The enterprise landscape is incomplete without JMS; it is one of the most commonly used mediums of enterprise integration. Spring provides very good support for it. Spring Integration builds on that support and provides adapters and gateways to send and receive messages through many middleware brokers such as ActiveMQ, RabbitMQ, Redis, and so on.
Spring Integration provides inbound and outbound adapters to send and receive messages along with gateways that can be used in a request/reply scenario. Let's walk through these implementations in a little more detail. A basic understanding of the JMS mechanism and its concepts is expected. It is not possible to cover even the introduction of JMS here. Let's start with the prerequisites.
To use Spring Integration's JMS components, the relevant namespace and the following Maven dependency should be added:
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-jms</artifactId>
  <version>${spring.integration.version}</version>
</dependency>
After adding these two dependencies, we are ready to use the components. But before we can use an adapter, we must configure an underlying message broker. Let's configure ActiveMQ. Add the following in pom.xml:
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>${activemq.version}</version>
  <exclusions>
    <exclusion>
      <artifactId>spring-context</artifactId>
      <groupId>org.springframework</groupId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
  <version>${spring.version}</version>
  <scope>compile</scope>
</dependency>
After this, we are ready to create a connection factory and a JMS queue that will be used by the adapters to communicate. First, create a connection factory. As you will notice, it is wrapped in Spring's CachingConnectionFactory, but the underlying provider is ActiveMQ:
<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="vm://localhost"/>
    </bean>
  </property>
</bean>
Let's create a queue that can be used to retrieve and put messages:
<bean id="feedInputQueue" class="org.apache.activemq.command.ActiveMQQueue">
  <constructor-arg value="queue.input"/>
</bean>
Now, we are ready to send and retrieve messages from the queue. Let's look into each adapter one by one.
Spring Integration provides two ways of receiving messages: polling and event listener. Both of them are based on the underlying Spring framework's comprehensive support for JMS. JmsTemplate is used by the polling adapter, while MessageListener is used by the event-driven adapter. As the name suggests, a polling adapter keeps polling the queue for the arrival of new messages and puts the message on the configured channel if it finds one. On the other hand, in the case of the event-driven adapter, it's the responsibility of the server to notify the configured adapter.
Let's start with a code sample:
<int-jms:inbound-channel-adapter connection-factory="connectionFactory"
    destination="feedInputQueue"
    channel="jmsProcessedChannel">
  <int:poller fixed-rate="1000"/>
</int-jms:inbound-channel-adapter>
This code snippet contains the following components:
There is a poller element, so it's obvious that it is a polling-based adapter. It can be configured in one of two ways: by providing a JMS template or by using a connection factory along with a destination. I have used the latter approach. The preceding adapter polls the queue named in the destination attribute and, once it gets a message, puts it on the channel configured in the channel attribute.
Similar to polling adapters, event-driven adapters need either a reference to a subclass of AbstractMessageListenerContainer or a connection factory and destination. Again, I will use the latter approach. Here is a sample configuration:
<int-jms:message-driven-channel-adapter connection-factory="connectionFactory" destination="feedInputQueue" channel="jmsProcessedChannel"/>
There is no poller sub-element here. As soon as a message arrives at its destination, the adapter is invoked, which puts it on the configured channel.
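The difference between the two inbound styles can be sketched in plain Java. This is a toy model, not JMS or Spring code: ToyQueue and AdapterStyles are hypothetical names, and a plain List stands in for the message channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a JMS queue.
final class ToyQueue {
    private final Queue<String> pending = new ArrayDeque<>();
    private Consumer<String> listener;

    // Event-driven style: the queue pushes messages to the registered listener.
    void setListener(Consumer<String> listener) {
        this.listener = listener;
        String backlog;
        while ((backlog = pending.poll()) != null) {
            listener.accept(backlog);
        }
    }

    void send(String message) {
        if (listener != null) {
            listener.accept(message);
        } else {
            pending.add(message);
        }
    }

    // Polling style: the caller asks; null means nothing is waiting.
    String receive() {
        return pending.poll();
    }
}

final class AdapterStyles {
    // Polling adapter: invoked by a poller, moves at most one message per call.
    static boolean pollOnce(ToyQueue queue, List<String> channel) {
        String message = queue.receive();
        if (message == null) {
            return false;
        }
        channel.add(message);
        return true;
    }

    // Event-driven adapter: registers once and is called back on arrival.
    static void listen(ToyQueue queue, List<String> channel) {
        queue.setListener(channel::add);
    }

    public static void main(String[] args) {
        ToyQueue polled = new ToyQueue();
        List<String> polledChannel = new ArrayList<>();
        polled.send("feed-1");
        System.out.println("Before poll: " + polledChannel);
        pollOnce(polled, polledChannel);
        System.out.println("After poll: " + polledChannel);

        ToyQueue pushed = new ToyQueue();
        List<String> pushedChannel = new ArrayList<>();
        listen(pushed, pushedChannel);
        pushed.send("feed-2");
        System.out.println("Pushed on arrival: " + pushedChannel);
    }
}
```

With polling, a message waits in the queue until the next poll; with the listener, it reaches the channel as soon as it is sent, which is why the event-driven adapter needs no poller element.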
Outbound adapters convert messages on the channel to JMS messages and put them on the configured queue. To convert Spring Integration messages to JMS messages, the outbound adapter uses JmsSendingMessageHandler, which is an implementation of MessageHandler. Outbound adapters should be configured either with a JmsTemplate or with a connection factory and destination queue. In keeping with the preceding examples, we will take the latter approach, as follows:
<int-jms:outbound-channel-adapter connection-factory="connectionFactory" channel="jmsChannel" destination="feedInputQueue"/>
This adapter receives the Spring Integration message from jmsChannel, converts it to a JMS message, and puts it on the destination.
Gateway provides a request/reply behavior instead of a one-way send or receive. For example, after sending a message, we might expect a reply or we may want to send an acknowledgement after receiving a message.
Inbound gateways provide an alternative to inbound adapters when request-reply capabilities are expected. An inbound gateway is an event-based implementation that listens for a message on the queue, converts it to a Spring Message, and puts it on the channel. Here is a code sample:
<int-jms:inbound-gateway request-destination="feedInputQueue" request-channel="jmsProcessedChannel"/>
However, this is what an inbound adapter does; even the configuration is similar, except for the element name. So, what is the difference? The difference lies in replying to the reply destination. Once the message is put on the channel, it is propagated down the line, and at some stage a reply is generated and sent back as an acknowledgement. The inbound gateway, on receiving this reply, creates a JMS message and puts it on the reply destination queue. Then, where is the reply destination? The reply destination is decided in one of the following ways:
The gateway throws an exception if the reply destination cannot be determined in either of the preceding two ways.
Outbound gateways should be used in scenarios where a reply is expected for the sent messages. Let's start with an example:
<int-jms:outbound-gateway request-channel="jmsChannel" request-destination="feedInputQueue" reply-channel="jmsProcessedChannel" />
The preceding configuration will send messages to request-destination. When an acknowledgement is received, it can be fetched from the configured reply-destination. If reply-destination has not been configured, JMS TemporaryQueues will be created.
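The request/reply exchange with a temporary reply queue can be modelled in plain Java. This sketch uses in-memory BlockingQueue instances rather than JMS destinations; RequestReplySketch, Request, and the "ack:" reply format are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of a request/reply gateway pair.
final class RequestReplySketch {
    // A request carries the queue where its reply must be delivered.
    static final class Request {
        final String payload;
        final BlockingQueue<String> replyTo;

        Request(String payload, BlockingQueue<String> replyTo) {
            this.payload = payload;
            this.replyTo = replyTo;
        }
    }

    // Outbound side: no reply queue configured, so a temporary one is created per request.
    static String sendAndReceive(BlockingQueue<Request> requestQueue, String payload,
                                 long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> temporaryReplyQueue = new LinkedBlockingQueue<>();
        requestQueue.put(new Request(payload, temporaryReplyQueue));
        // Returns null if no reply arrives within the timeout.
        return temporaryReplyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Inbound side: takes each request and answers on the queue named by the request.
    static Thread startResponder(BlockingQueue<Request> requestQueue) {
        Thread responder = new Thread(() -> {
            try {
                while (true) {
                    Request request = requestQueue.take();
                    request.replyTo.put("ack:" + request.payload);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();
        return responder;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
        startResponder(requestQueue);
        System.out.println(sendAndReceive(requestQueue, "feed-1", 2000));
    }
}
```

Because every request names its own reply queue, replies cannot be mixed up between concurrent callers, which is the convenience that temporary queues offer when no reply destination is configured.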
In this article, we covered out-of-the-box components provided by the Spring Integration framework, such as the aggregator. This article also showcased the simplicity and abstraction that Spring Integration provides when it comes to handling complicated integrations, be it file-based, HTTP, JMS, or any other integration mechanism.