Spring Cloud Stream
We will not focus on Spring Cloud in this part; we will do that in Part 2 of the book, from Chapter 8, Introduction to Spring Cloud, to Chapter 14, Understanding Distributed Tracing. However, we will bring in one of the modules that’s part of Spring Cloud: Spring Cloud Stream. Spring Cloud Stream provides a streaming abstraction over messaging, based on the publish and subscribe integration pattern. Spring Cloud Stream currently comes with built-in support for Apache Kafka and RabbitMQ. A number of separate projects exist that provide integration with other popular messaging systems. See https://github.com/spring-cloud?q=binder for more details.
The core concepts in Spring Cloud Stream are as follows:
- Message: A data structure that’s used to describe data sent to and received from a messaging system.
- Publisher: Sends messages to the messaging system, also known as a Supplier.
- Subscriber: Receives messages from the messaging system, also known as a Consumer.
- Destination: Used to communicate with the messaging system. Publishers use output destinations and subscribers use input destinations. Destinations are mapped by the specific binders to queues and topics in the underlying messaging system.
- Binder: A binder provides the actual integration with a specific messaging system, similar to what a JDBC driver does for a specific type of database.
The actual messaging system to be used is determined at runtime, depending on what is found on the classpath. Spring Cloud Stream comes with opinionated conventions on how to handle messaging. These conventions can be overridden by specifying a configuration for messaging features such as consumer groups, partitioning, persistence, durability, and error handling; for example, retries and dead letter queue handling.
Code examples for sending and receiving messages
To better understand how all this fits together, let’s look at some source code examples.
Spring Cloud Stream comes with two programming models: one older and nowadays deprecated model based on the use of annotations (for example, @EnableBinding
, @Output
, and @StreamListener
) and one newer model based on writing functions. In this book, we will use functional implementations.
To implement a publisher, we only need to implement the java.util.function.Supplier
functional interface as a Spring Bean. For example, the following is a publisher that publishes messages as a String:
@Bean
public Supplier<String> myPublisher() {
return () -> new Date().toString();
}
A subscriber is implemented as a Spring Bean implementing the java.util.function.Consumer
functional interface. For example, the following is a subscriber that consumes messages as Strings:
@Bean
public Consumer<String> mySubscriber() {
return s -> System.out.println("ML RECEIVED: " + s);
}
It is also possible to define a Spring Bean that processes messages, meaning that it both consumes and publishes messages. This can be done by implementing the java.util.function.Function
functional interface. For example, a Spring Bean that consumes incoming messages and publishes a new message after some processing (both messages are Strings in this example):
@Bean
public Function<String, String> myProcessor() {
return s -> "ML PROCESSED: " + s;
}
To make Spring Cloud Stream aware of these functions, we need to declare them using the spring.cloud.function.definition
configuration property. For example, for the three functions defined previously, this would look as follows:
spring.cloud.function:
definition: myPublisher;myProcessor;mySubscriber
Finally, we need to tell Spring Cloud Stream what destination to use for each function. To connect our three functions so that our processor consumes messages from our publisher and our subscriber consumes messages from the processor, we can supply the following configuration:
spring.cloud.stream.bindings:
myPublisher-out-0:
destination: myProcessor-in
myProcessor-in-0:
destination: myProcessor-in
myProcessor-out-0:
destination: myProcessor-out
mySubscriber-in-0:
destination: myProcessor-out
This will result in the following message flow:
myPublisher → myProcessor → mySubscriber
A supplier is triggered by Spring Cloud Stream by default every second, so we could expect output like the following if we start a Spring Boot application including the functions and configuration described previously:
ML RECEIVED: ML PROCESSED: Wed Mar 09 16:28:30 CET 2021
ML RECEIVED: ML PROCESSED: Wed Mar 09 16:28:31 CET 2021
ML RECEIVED: ML PROCESSED: Wed Mar 09 16:28:32 CET 2021
ML RECEIVED: ML PROCESSED: Wed Mar 09 16:28:33 CET 2021
In cases where the supplier should be triggered by an external event instead of using a timer, the StreamBridge
helper class can be used. For example, if a message should be published to the processor when a REST API, sampleCreateAPI
, is called, the code could look like the following:
@Autowired
private StreamBridge streamBridge;
@PostMapping
void sampleCreateAPI(@RequestBody String body) {
streamBridge.send("myProcessor-in-0", body);
}
Now that we understand the various Spring APIs, let’s learn a bit about Docker and containers in the next section.