CC's application allows its taxis to organize themselves into groups by registering their topics of interest. The new feature to roll out will allow clients to send an order request to all taxis within a particular topic. It turns out that this feature matches a specific exchange routing rule, not surprisingly called topic. This type of exchange routes a message to every queue that has been bound with a routing pattern matching the routing key of the message. So, unlike the direct exchange, which delivers only on an exact routing key match, the topic exchange can match a single message against many different binding patterns and deliver it to all the corresponding queues. Two other examples of where topic-based routing could be applied are location-specific data, such as traffic warning broadcasts, and trip price updates.
A routing pattern consists of several words separated by dots. A best practice to follow is to structure routing keys from the most general element to the most specific one, such as news.economy.usa or europe.sweden.stockholm.
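The general-to-specific convention can be enforced with a small helper. The following is a minimal sketch rather than part of CC's application: the routing_key method is hypothetical, and rejecting the wildcard characters in published keys is a design choice made here, not a broker requirement. The 255-byte limit reflects the fact that AMQP 0-9-1 carries routing keys as short strings.

```ruby
# Hypothetical helper: joins words from the most general to the most specific.
def routing_key(*words)
  words = words.map(&:to_s)
  # Assumption: words in a published key should not contain the separator
  # or the wildcard characters reserved for binding patterns.
  if words.empty? || words.any? { |word| word.empty? || word.match?(/[.*#]/) }
    raise ArgumentError, "each word must be non-empty and free of '.', '*' and '#'"
  end

  key = words.join(".")
  raise ArgumentError, "routing key exceeds 255 bytes" if key.bytesize > 255

  key
end

puts routing_key("europe", "sweden", "stockholm")
```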
The topic exchange supports strict routing key matching and will also perform wildcard matching using * and # as placeholders for exactly one word and zero or more words, respectively.
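To make the wildcard rules concrete, here is a minimal sketch of the matching logic in plain Ruby. The broker performs this matching itself; the topic_match? and match_words? methods are hypothetical names used only for illustration and do not reflect how RabbitMQ implements it internally.

```ruby
# Returns true when a binding pattern matches a routing key under the
# topic exchange rules: "*" is exactly one word, "#" is zero or more words.
def topic_match?(pattern, routing_key)
  match_words?(pattern.split(".", -1), routing_key.split(".", -1))
end

def match_words?(pattern, words)
  # An exhausted pattern only matches an exhausted routing key
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when "#"
    # "#" either absorbs nothing, or absorbs one word and stays active
    match_words?(rest, words) ||
      (!words.empty? && match_words?(pattern, words.drop(1)))
  when "*"
    # "*" must consume exactly one word
    !words.empty? && match_words?(rest, words.drop(1))
  else
    # A literal word must be identical
    !words.empty? && words.first == head && match_words?(rest, words.drop(1))
  end
end

puts topic_match?("taxi.eco", "taxi.eco")   # strict match
puts topic_match?("taxi.*", "taxi.eco")     # one-word wildcard
puts topic_match?("taxi.#", "taxi.eco.3")   # multi-word wildcard
puts topic_match?("taxi.eco.3", "taxi.eco") # more specific pattern, no match
```

Note that a pattern without wildcards behaves like a direct exchange binding: it only matches a routing key that is identical to it, word for word.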
The following diagram illustrates how the topic exchange will be used in CC's application. Notice how the single inbox queue remains unchanged and simply gets connected to the topic exchange via extra bindings, each of them reflecting the interest of a user:
Because the same inbox is used for everything, the code that's already in place for fetching messages doesn't need to be changed. In fact, this whole feature can be implemented with only a few additions. The first of these additions takes care of declaring the topic exchange in the existing on_start method, as follows:
def on_start(channel)
  # Declare and return the topic exchange, taxi-topic
  channel.topic("taxi-topic", durable: true, auto_delete: true)
end
There's nothing really new or fancy here; the main difference is that this exchange is called taxi-topic and is a topic type of exchange. Sending a message is even simpler than with the client-to-taxi feature because there is no attempt to create the addressee's queue. It wouldn't make sense for the sender to iterate through all the users to create and bind their queues, as only users already subscribed to the target topic at the time of sending will get the message, which is exactly the expected functionality. The order_taxi method is listed here:
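require "securerandom"

# Publishing an order to the exchange
def order_taxi(type, exchange)
  # Placeholder payload; a real order would be a JSON document
  payload = "example-message"
  # A UUID string gives each message a unique, well-formed identifier
  message_id = SecureRandom.uuid
  exchange.publish(payload,
                   routing_key: type,
                   content_type: "application/json",
                   content_encoding: "UTF-8",
                   persistent: true,
                   message_id: message_id)
end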
exchange = on_start(channel)
# Order will go to any eco taxi
order_taxi('taxi.eco', exchange)
# Order will go to any taxi
order_taxi('taxi', exchange)
The difference is that messages are now published to the taxi-topic exchange. The rest of the code that creates and publishes the message is exactly the same as in the client-to-taxi messaging. Lastly, bindings need to be added when a taxi subscribes to a topic, and removed when it unsubscribes:
# example_consumer.rb
def taxi_topic_subscribe(channel, taxi, type)
  # Declare a queue for a given taxi
  queue = channel.queue(taxi, durable: true)
  # Declare the topic exchange with the same arguments as the publisher
  exchange = channel.topic('taxi-topic', durable: true, auto_delete: true)
  # Bind the queue to the exchange for the taxi's topic of interest
  queue.bind(exchange, routing_key: type)
  # Bind the queue to the exchange to make sure the taxi will get any order
  queue.bind(exchange, routing_key: 'taxi')
  # Subscribe to the queue; block: true keeps the calling thread waiting for orders
  queue.subscribe(block: true, manual_ack: false) do |delivery_info, properties, payload|
    process_order(payload)
  end
end
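taxi = "taxi.3"
# Bind with the same key that clients publish eco orders with; a more
# specific key such as "taxi.eco.3" would not match "taxi.eco" messages
taxi_topic_subscribe(channel, taxi, "taxi.eco")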
taxi.3 is the new environmentally friendly taxi, now ready to receive orders from clients that want an environmentally friendly car.
The AMQP specification does not provide any means to introspect the current bindings of a queue, so it is not possible to iterate them and remove the ones not needed anymore in order to reflect a change in a taxi's topics of interest. This is not a terrible concern because the application is required to maintain this state anyway.
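Since the application has to remember the bindings itself, one way to keep that state is sketched below. This is a minimal in-memory illustration under stated assumptions: the TopicRegistry class and the taxi.xl topic are hypothetical, and a real deployment would need to persist this state somewhere durable.

```ruby
require "set"

# Hypothetical helper: remembers which routing keys each taxi's queue is bound with.
class TopicRegistry
  def initialize
    @topics = Hash.new { |hash, taxi| hash[taxi] = Set.new }
  end

  # Records the new set of topics for a taxi and returns what has to change
  # on the broker: the keys to bind and the keys to unbind.
  def update(taxi, wanted)
    wanted = Set.new(wanted)
    current = @topics[taxi]
    changes = {
      bind: (wanted - current).to_a.sort,
      unbind: (current - wanted).to_a.sort
    }
    @topics[taxi] = wanted
    changes
  end
end

registry = TopicRegistry.new
p registry.update("taxi.3", ["taxi", "taxi.eco"])
p registry.update("taxi.3", ["taxi", "taxi.xl"])
```

The returned keys can then be applied to the taxi's queue with queue.bind(exchange, routing_key: key) for each entry under :bind and queue.unbind(exchange, routing_key: key) for each entry under :unbind.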
With this new code in place, everything works as expected. No code changes are needed to retrieve the new client-to-taxi orders because they arrive in the same inbox queue as the previous messages. Topical messages are sent and received correctly by the taxis, and all this happens with a minimal change and no increase in the number of queues. When connected to the management console, click on the Exchanges tab; the only visible difference is the new topic exchange, taxi-topic.