The basic concepts and initial setup have already been covered, so let's jump in and send the first messages!
First, let's take a look at the order_taxi method, which is in charge of sending messages for the initial car request:
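def order_taxi(taxi, exchange)
  payload = "example-message"
  # Random identifier, converted to a string since message_id is a string property
  message_id = rand.to_s
  exchange.publish(payload,
                   routing_key: taxi,
                   content_type: "application/json",
                   content_encoding: "UTF-8",
                   persistent: true,
                   message_id: message_id)
end

# on_start returns the exchange declared during the initial setup
exchange = on_start(channel)
order_taxi("taxi.1", exchange)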
order_taxi is going to be called every time a user wants to order a taxi. There is no guarantee that the addressee has ever logged into the system, so as far as the sender is concerned, it's impossible to be sure the destination queue exists. The safest path is to declare the queue on every message sent, bearing in mind that this declare operation is idempotent, so it will not do anything if the queue already exists. This may seem strange at first, but it's the sender's responsibility to ensure the addressee's queue exists if they want to be sure the message will not get lost.
This is a common pattern with AMQP when there is no strong happens-before relationship between events. Re-declaration is the way to go. Conversely, the check-then-act pattern is discouraged; trying to check the pre-existence of an exchange or a queue does not guarantee success in the typical distributed environment where AMQP is used.
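As a minimal sketch of the re-declaration pattern, the publisher can declare and bind the addressee's queue right before publishing. The method name order_taxi_safely is hypothetical, and channel and exchange are assumed to be a Bunny channel and the direct exchange used for taxi orders. The queue attributes are assumed to match the ones used by the consumer, since the broker rejects the re-declaration of an existing queue with different attributes:

```ruby
# Sketch only: `order_taxi_safely` is a hypothetical name. `channel` and
# `exchange` are assumed to behave like Bunny::Channel and Bunny::Exchange.
def order_taxi_safely(channel, exchange, taxi, payload)
  # Declaring is idempotent: nothing happens if the queue already exists
  # with the same attributes.
  queue = channel.queue(taxi, durable: true)

  # Binding is idempotent too, so repeating it on every send is harmless.
  queue.bind(exchange, routing_key: taxi)

  # Only now is it safe to assume the message has somewhere to go.
  exchange.publish(payload, routing_key: taxi, persistent: true)
end
```

Note that there is no check for the queue's existence before declaring it; the declaration itself is the safe operation.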
Publishing a message is simple: call publish on the exchange, passing an array of bytes that represents the actual message payload and, as per direct routing, the queue name as the routing key. It's also possible to add some optional message properties, which could include the following:
- content_type (string): A message is published and consumed as a byte array, but nothing really says what these bytes represent. In the current situation, both publishers and consumers are in the same system, so the content type could simply be assumed. That being said, always specify the content type so that messages are self-contained; whichever system ends up receiving or introspecting a message will know for sure what the byte array it contains represents.
- content_encoding (string): A specific encoding (UTF-8) is used when serializing string messages into byte arrays so that they can be published. Again, in order for the messages to be self-describing, provide all the necessary meta-information to allow them to be read.
- message_id (string): As demonstrated later in this book, message identifiers are an important aspect of traceability in messaging and distributed applications. In the example, a random message ID is generated.
- persistent (boolean): Specifies whether the message should be persisted to disk.
Note that persistent messages in a non-durable queue will be gone after a broker restart, because the queue itself does not survive the restart.
Ensure that messages are not lost by declaring the queue as durable and setting the message delivery mode to persistent.
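To make these properties more concrete, here is a small sketch that builds a JSON payload together with a matching set of properties. The helper name build_taxi_order and the pickup field are assumptions made for illustration, and SecureRandom.uuid is swapped in for rand simply to get a string identifier:

```ruby
require "json"
require "securerandom"

# Hypothetical helper: returns the payload and the publish properties
# for a taxi order. The order fields are illustrative only.
def build_taxi_order(taxi, order)
  payload = JSON.generate(order) # the body really is JSON, as content_type claims
  properties = {
    routing_key: taxi,                # queue name, as per direct routing
    content_type: "application/json", # what the bytes represent
    content_encoding: "UTF-8",        # how the string was serialized
    persistent: true,                 # ask the broker to persist the message
    message_id: SecureRandom.uuid     # string identifier for traceability
  }
  [payload, properties]
end

payload, properties = build_taxi_order("taxi.1", { pickup: "Main St 1" })
puts payload
puts properties[:message_id]
# With a Bunny exchange at hand: exchange.publish(payload, properties)
```

Keeping the property-building code separate from the publish call makes it easy to inspect a message before it is sent.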
But what would happen if sending the message fails, such as when the connection with RabbitMQ is broken?
Before going any further, let's take a look at the structure of an AMQP message.
AMQP message structure
The following screenshot illustrates the structure of an AMQP message and includes the four AMQP message properties just used, plus a few new ones. Note that this diagram uses the specification name of the fields and that each language implementation renames them slightly so that they can be valid names. For example, content-type becomes contentType in Java, and content_type in Ruby:
Except for reserved, all these properties are free to use and, unless otherwise specified, are ignored by the AMQP broker. In the case of RabbitMQ, the broker validates the user-id field to ensure it matches the name of the broker user that established the connection; a few other fields, such as delivery-mode, expiration, and priority, also influence how the broker handles a message. Notice how the headers property allows us to add extra key-value pairs in case none of the standard properties fit the requirements.
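As an illustration of how standard properties and custom headers can sit side by side, the following sketch extends a set of publish properties with an application identifier, a timestamp, and free-form headers. The helper name with_metadata, the cc-orders application name, and the order_source header are all hypothetical:

```ruby
# Hypothetical helper: returns a copy of the publish properties, extended
# with two standard properties and some custom headers.
def with_metadata(properties, app_id:, headers: {})
  properties.merge(
    app_id: app_id,           # standard property: the publishing application
    timestamp: Time.now.to_i, # standard property: seconds since the epoch
    headers: headers          # custom key-value pairs
  )
end

base = { routing_key: "taxi.1", content_type: "application/json" }
props = with_metadata(base, app_id: "cc-orders",
                            headers: { "order_source" => "mobile" })
puts props.inspect
# With a Bunny exchange at hand: exchange.publish(payload, props)
```

Since merge returns a new hash, the original properties are left untouched and can be reused for other messages.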
The next section explains how messages are consumed.
Consuming messages
Now, let's turn our attention to the method in charge of retrieving messages, which is step 4 in the main architecture of CC, as described in the section The application architecture behind CC.
One option is for the taxi application to check the queue for new messages at regular intervals. This is the so-called synchronous approach. It would mean holding the application thread in charge of dealing with the poll requests until all pending messages have been removed from the queue, as illustrated in the following diagram:
A frontend regularly polling the backend for messages would soon start to take its toll in terms of load, meaning that the overall solution would begin to suffer from performance degradation.
Instead, CC wisely decides to build the solution around a server-push approach, where the broker pushes messages to the clients. Luckily, RabbitMQ offers two ways to receive messages: the polling-based basic.get method and the push-based basic.consume method. As illustrated in the following diagram, messages are pushed to the consumer:
The subscribe method adds a consumer to the queue, which then subscribes to receive message deliveries.
With subscribe, the messages are delivered to the client from the broker when new messages are ready and the client has availability. This allows, in general, the smooth processing of messages. Additionally, using subscribe means that a consumer is connected as long as the channel it was declared on is available or until the client cancels it.
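The difference between the two approaches can be sketched with Bunny, where Queue#pop issues a basic.get and Queue#subscribe issues a basic.consume. The method names drain_by_polling and consume_by_push are hypothetical, and queue is assumed to behave like a Bunny::Queue:

```ruby
# Polling (basic.get): the client asks for one message at a time and has
# to come back later to check for new ones.
def drain_by_polling(queue)
  payloads = []
  loop do
    _delivery_info, _properties, payload = queue.pop
    break if payload.nil? # an empty queue yields nil for all three values
    payloads << payload
  end
  payloads
end

# Push (basic.consume): the client registers a handler once and the broker
# delivers messages to it as they become available.
def consume_by_push(queue, &handler)
  queue.subscribe(manual_ack: false) do |_delivery_info, _properties, payload|
    handler.call(payload)
  end
end
```

With polling, the client decides when to look for messages; with subscribe, the handler simply runs whenever the broker has something to deliver.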
The message flow runs smoothly and effortlessly, almost as if nothing is happening! That is, of course, until the consumer has to tell the broker, through acknowledgments and negative acknowledgments, whether a message was processed as expected or not.
Acknowledgment and negative acknowledgment
RabbitMQ needs to know when a message can be considered successfully delivered to the consumer. The broker deletes a message from the queue once it receives that response; otherwise, the queue would overflow. The client can reply to the broker by acking (acknowledging) the message either when it receives it or when the consumer has completely processed it. In either situation, once the message has been acked, it's removed from the queue.
Therefore, it's up to the consumer to acknowledge a message if and only if it is done with processing, or if it is certain that there is no risk of losing the message if it is processed asynchronously.
To avoid a situation where a message could be forever lost (for example, worker crashed, exceptions, and so on), the consuming application should not acknowledge a message until it is completely finished with it.
A message is rejected when the application indicates to the broker that processing has failed or cannot be accomplished at the time. Nack, or negative acknowledgment, tells RabbitMQ that a message was not handled as instructed. Nacked messages can be sent back to the queue for another try, depending on the requeue flag passed by the client.
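The following is a minimal sketch of manual acknowledgment with Bunny, acking a message only once processing has finished and nacking it with a requeue request when processing raises an error. The helper names handle_delivery and subscribe_with_manual_acks are hypothetical, and channel and queue are assumed to behave like a Bunny::Channel and a Bunny::Queue:

```ruby
# Hypothetical helper: processes one delivery and reports the outcome
# to the broker.
def handle_delivery(channel, delivery_tag, payload)
  begin
    yield payload
  rescue StandardError => e
    warn "Processing failed: #{e.message}"
    # Arguments: delivery tag, multiple = false, requeue = true
    channel.nack(delivery_tag, false, true)
    return :nacked
  end
  # Processing is complete, so the broker can now delete the message.
  channel.ack(delivery_tag)
  :acked
end

# Hypothetical helper: subscribes with manual acknowledgment turned on.
def subscribe_with_manual_acks(channel, queue, &processor)
  queue.subscribe(manual_ack: true) do |delivery_info, _properties, payload|
    handle_delivery(channel, delivery_info.delivery_tag, payload, &processor)
  end
end
```

Bear in mind that requeuing a message that can never be processed makes it come back over and over again, so a real application would need a strategy for such messages.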
Acknowledgments will be covered in detail in Chapter 3, Sending Messages to Multiple Taxi Drivers.
Ready? Set? Time to RUN, Rabbit!
Running the code
Now, it's time to set up some code for the consumer. You'll be able to recognize most of this code from the previous section, Sending the first messages:
- Require client libraries.
- Read RABBITMQ_URI from ENV.
- Start a communication session with RabbitMQ.
- Declare a queue for a given taxi.
- Declare a direct exchange, taxi-direct.
- Bind the queue to the exchange.
- Subscribe to the queue.
What follows is the code that's required for the initial consumer setup:
# example_consumer.rb
# 1. Require client library
require "bunny"

# 2. Read RABBITMQ_URI from ENV
connection = Bunny.new ENV["RABBITMQ_URI"]

# 3. Start a communication session with RabbitMQ
connection.start
channel = connection.create_channel

# Method for the processing
def process_order(info)
  puts "Handling taxi order"
  puts info
  sleep 5.0
  puts "Processing done"
end

def taxi_subscribe(channel, taxi)
  # 4. Declare a queue for a given taxi
  queue = channel.queue(taxi, durable: true)

  # 5. Declare a direct exchange, taxi-direct
  exchange = channel.direct("taxi-direct", durable: true, auto_delete: true)

  # 6. Bind the queue to the exchange
  queue.bind(exchange, routing_key: taxi)

  # 7. Subscribe to the queue
  queue.subscribe(block: true, manual_ack: false) do |delivery_info, properties, payload|
    process_order(payload)
  end
end

taxi = "taxi.1"
taxi_subscribe(channel, taxi)
Here, two flags were added to the subscribe method that need to be explained. Let's look at them in detail:
- block (Boolean, default false): Should the call block the calling thread? This option can be useful for keeping the main thread of a script alive. It is incompatible with automatic connection recovery and is not generally recommended.
- manual_ack (Boolean, default false): In CC's case, since the risk of losing a message is acceptable during this phase, the system does not manually acknowledge messages. Instead, it informs the broker to consider them as acknowledged as soon as it fetches them (more on manual acknowledgment later in this book).
And that's it! CC now has a working order request inbox ready to be tested. Next, we'll look at the management console when activated taxis are running.
Running the application
With the application running and a server connected to RabbitMQ, the following established connections can be seen from the management console:
Notice how the upstream and downstream network throughputs are clearly represented, and that the channels that get opened and closed very quickly are hard to see from the management console. So, let's look at the following exchanges:
The user exchange and the rate of messages coming in and out are shown in the management console. The fact that they are being consumed as fast as they come in is a good sign that the current architecture is sufficient for CC's needs and that messages are not piling up. But what are all these other exchanges that haven't been created by code and where are they coming from? The nameless exchange represented as (AMQP default) and all the exchanges with names that start with amq. are defined by the AMQP specification and, as such, must be provided by default by RabbitMQ. Now, what about queues? Let's have a look:
As expected, there is one queue per taxi and some nifty usage statistics. Notice how the ack column is empty, which is no surprise, given how message acknowledgment works. The consumer is receiving messages while letting RabbitMQ know it won't be acknowledging them, so there is no activity related to acknowledging messages.
With enough RAM, RabbitMQ can deal with hundreds of queues and bindings without a problem, so multiple queues are not an issue.
Confident about its architecture and implementation, CC rolls out the client-to-taxi ordering subsystem. The client can send the request and the taxi can handle the request.
CC quickly expands the company with two new environmentally friendly cars. As in the previous solution, a client needs to send an order request message to a certain driver. Now, a new feature was requested – the capacity to send a message to a group of taxi cars. It should be possible for clients to select a normal taxi or an environmentally friendly taxi. Let's see how CC will implement this new feature through the power of RabbitMQ.