RabbitMQ Partial Order Implementation using Consistent Hash Exchange. Golang.

Messaging and queues in general

Today, almost any system that is more than a TODO list app can hardly live without a messaging system. There are many reasons to introduce queues into your system. Let’s go through the most popular ones:

  • Reliability
  • Resiliency
  • Scaling
  • Asynchronous communication
  • Delayed/long-running jobs
  • Decoupling
  • Load Control
  • Etc.

Reliability

Imagine an API that receives orders for some products. Ordering a product is not a simple operation. It can include checking the warehouse to see if the product is still in stock, processing the payment, initiating delivery, sending emails, etc. Naturally, multiple databases and even 3rd party services will be involved in this “simple” flow.

Resiliency

This one is directly related to the concept of reliability. To be resilient, you must be reliable first. Once you have a message in the queue, you know that if some service is not available right now, you can always try later. You can apply different strategies for processing the message.

Scaling

Think of the API service as a set of lightweight endpoints that validate requests and send messages. Those operations are very fast. You can send thousands of messages per second to the queue.

Asynchronous communication

Imagine you order a pizza and have to stay on the phone until the delivery guy arrives at your apartment. Sounds weird…

Delayed/long-running jobs

Queues are often used as a solution for delayed and long-running job processing.

Decoupling

By introducing a queue in the system, you are actually decoupling the part of the system that creates messages from the part of the system that consumes messages.

Load Control

The queue can be used to control the load and provide predictable behavior.

Summary on queues

Which data can be sent to the queues? It can be analytics data that are streamed in real-time directly from the websites, data received from IoT or mobile devices, business events, and many more.

What is RabbitMQ?

RabbitMQ is one of the most popular open-source message brokers. Think of it as a very smart buffer that sits between the sender (producer) and the receiver (consumer) and knows how to deliver messages from producers to consumers according to some rules, decoupling your system as a side effect.

Simplest RabbitMQ topology with multiple producers and multiple consumers

Basic RabbitMQ concepts

RabbitMQ has 3 main concepts: exchanges, queues, and bindings.

Exchanges

In RabbitMQ, messages are not published directly to a queue. Instead, producers send messages to an exchange and know nothing about the queues and consumers.

Queues

Unlike producers, consumers read messages from specific queues. The consumer opens a TCP connection to RabbitMQ, opens a channel (a lightweight logical connection multiplexed over that TCP connection), and starts receiving messages from the queue. By using multiple consumers on the same queue, our system can scale.

Bindings

How does the exchange know where it should send the message? This is where bindings do their job. Bindings are virtual connections between exchanges and queues or other exchanges.

Basic RabbitMQ Exchange Types

Fanout Exchange

This type of exchange is the dumbest and fastest exchange RabbitMQ provides us. A fanout exchange just copies and routes a received message to all queues that are bound to it regardless of routing keys.

Direct Exchange

This exchange delivers messages to queues by comparing the provided routing key with the existing bindings. If multiple queues are bound with the same key, messages with this routing key will be copied to all of them.

Topic Exchange

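Topic exchanges are similar to direct exchanges but much smarter and more flexible. A direct exchange only knows how to compare keys for equality, while a topic exchange lets you define a kind of “regex” matcher: routing keys are dot-separated words, and in a binding pattern * matches exactly one word and # matches zero or more words.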

  • eu.order.regular
  • usa.order.vip
  • eu.order.
  • .order.vip
  • usa.order.vip.hello
  • order.vip.hello.world
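
To make the matching rules more concrete, here is a minimal sketch of a topic matcher in Go. It is not RabbitMQ's implementation, just an illustration of the wildcard rules (* for exactly one word, # for zero or more words). The routing keys are taken from the list above; the binding patterns used in main are assumptions chosen for the demo.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether routingKey matches the binding pattern.
// Words are dot-separated; "*" matches exactly one word and "#" matches
// zero or more words.
func matchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" either consumes nothing, or consumes one word and stays in place.
		if matchWords(pattern[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pattern, key[1:])
	case "*":
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func main() {
	keys := []string{"eu.order.regular", "usa.order.vip", "usa.order.vip.hello", "order.vip.hello.world"}
	// Hypothetical binding patterns, chosen only for this demo.
	for _, p := range []string{"eu.order.*", "*.order.vip", "#.vip.#"} {
		for _, k := range keys {
			fmt.Printf("%-12s %-22s %v\n", p, k, matchTopic(p, k))
		}
	}
}
```

The recursive form is easy to read but not optimized; a real broker uses a more efficient structure for the same rules.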

Header Exchange

A headers exchange routes messages based on the values in the headers that can be assigned to each message before publishing. It’s a kind of topic exchange on steroids.
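
As a rough illustration, a headers binding can be thought of as a set of required header values plus an x-match argument: "all" (the default) requires every listed header to match, "any" requires at least one. The sketch below models that rule with plain string maps. Real AMQP headers are typed values, and the header names here (region, tier) are made up for the example.

```go
package main

import "fmt"

// matchHeaders sketches how a headers-exchange binding is evaluated.
// binding holds the required header values plus the optional "x-match"
// argument; headers are the headers of the published message.
func matchHeaders(binding, headers map[string]string) bool {
	mode, required, matched := "all", 0, 0
	for key, want := range binding {
		if key == "x-match" {
			mode = want
			continue
		}
		required++
		if got, ok := headers[key]; ok && got == want {
			matched++
		}
	}
	if mode == "any" {
		return matched > 0
	}
	return matched == required
}

func main() {
	msg := map[string]string{"region": "eu", "tier": "regular"} // hypothetical headers
	all := map[string]string{"x-match": "all", "region": "eu", "tier": "vip"}
	any := map[string]string{"x-match": "any", "region": "eu", "tier": "vip"}
	fmt.Println("all:", matchHeaders(all, msg))
	fmt.Println("any:", matchHeaders(any, msg))
}
```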

Message Ordering

When we talk about message ordering, we basically mean one of two types of ordering:

  • Global ordering
  • Partial ordering

Example events:

  • ProductAdded
  • ProductTitleChangeRequest
  • ProductDeleted
  • OrderCreated
  • OrderUpdated
  • OrderCancelled
  • OrderDeleted

Note the different product and order IDs in the payload.

Partial Ordering over Global Ordering

So, what can we do?

The expected behavior is that messages for the same product or order are processed serially, in the order they were published. We do lose global ordering, but who cares?
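
The idea can be sketched without a broker at all: pick a queue by hashing the entity ID, so everything that belongs to one product or order lands in the same queue and keeps its relative order. This is a simplified model (plain hash modulo the number of queues), not what RabbitMQ does internally. The Event type and the IDs are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Event is a hypothetical message: EntityID is the product or order ID.
type Event struct {
	EntityID string
	Name     string
}

// queueFor maps an entity ID to a queue index; the same ID always
// gets the same index.
func queueFor(entityID string, queues int) int {
	h := fnv.New32a()
	h.Write([]byte(entityID))
	return int(h.Sum32() % uint32(queues))
}

// partition distributes events over queues while keeping the publish
// order inside each queue.
func partition(events []Event, queues int) [][]Event {
	out := make([][]Event, queues)
	for _, e := range events {
		q := queueFor(e.EntityID, queues)
		out[q] = append(out[q], e)
	}
	return out
}

func main() {
	events := []Event{
		{"product-1", "ProductAdded"},
		{"order-7", "OrderCreated"},
		{"product-1", "ProductTitleChangeRequest"},
		{"order-7", "OrderUpdated"},
		{"product-1", "ProductDeleted"},
		{"order-7", "OrderCancelled"},
	}
	for i, q := range partition(events, 3) {
		fmt.Printf("queue %d: %v\n", i, q)
	}
}
```

Events of different entities may interleave differently across queues (global order is gone), but each entity's own events stay in publish order.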

RabbitMQ Consistent Hash Exchange Type to the rescue

RabbitMQ ships with a plugin named rabbitmq_consistent_hash_exchange (it has to be enabled), which can be used to achieve our goal. In simple words, this plugin applies a hash function to the routing key (or to a chosen header value) and routes all messages with the same value to the same queue.
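
To get a feeling for what consistent hashing buys us, here is a small hash ring in Go. It is only a sketch of the general technique, not the plugin's actual algorithm. With the plugin, the binding key of each queue is a number (as a string) that acts as a weight; the sketch models that weight as the number of points a queue owns on the ring. The queue names and the FNV hash are assumptions made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// binding mirrors a queue bound to the exchange with a numeric weight.
type binding struct {
	Queue  string
	Weight int
}

type ring struct {
	points []uint32          // sorted positions on the ring
	owner  map[uint32]string // position -> queue name
}

func hashOf(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newRing gives every queue Weight points on the ring.
func newRing(bindings []binding) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, b := range bindings {
		for i := 0; i < b.Weight; i++ {
			p := hashOf(fmt.Sprintf("%s/%d", b.Queue, i))
			if _, taken := r.owner[p]; taken {
				continue // very unlikely collision: keep the first owner
			}
			r.owner[p] = b.Queue
			r.points = append(r.points, p)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// route returns the queue that owns the first point at or after the
// key's hash, wrapping around at the end of the ring.
func (r *ring) route(routingKey string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashOf(routingKey)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

func main() {
	r := newRing([]binding{{"orders.q1", 10}, {"orders.q2", 10}})
	for _, key := range []string{"order-1", "order-2", "order-1", "product-9"} {
		fmt.Printf("%s -> %s\n", key, r.route(key))
	}
}
```

The useful property is that when a queue is added, a key either stays where it was or moves to the new queue; it never jumps between two old queues.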

But…

There is always a but…

RabbitMQ Single Active Consumer feature

According to the documentation, single active consumer allows only one consumer at a time to consume from a queue, and fails over to another registered consumer if the active one is cancelled or dies.
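
In RabbitMQ the feature is switched on per queue, with the x-single-active-consumer argument set to true when the queue is declared. The toy model below only illustrates the behavior: several consumers register, one receives messages, and another takes over when the active one goes away. Picking the next consumer in registration order is an assumption of this sketch, and the consumer names are made up.

```go
package main

import "fmt"

// sacQueue is a toy model of a queue with single active consumer enabled.
// consumers are kept in registration order; consumers[0] is the active one.
type sacQueue struct {
	consumers []string
}

func (q *sacQueue) register(name string) {
	q.consumers = append(q.consumers, name)
}

// cancel removes a consumer, for example after it was cancelled or died.
func (q *sacQueue) cancel(name string) {
	for i, c := range q.consumers {
		if c == name {
			q.consumers = append(q.consumers[:i], q.consumers[i+1:]...)
			return
		}
	}
}

// active returns the only consumer that currently receives messages.
func (q *sacQueue) active() (string, bool) {
	if len(q.consumers) == 0 {
		return "", false
	}
	return q.consumers[0], true
}

func main() {
	q := &sacQueue{}
	q.register("consumer-a")
	q.register("consumer-b")
	name, _ := q.active()
	fmt.Println("active:", name)

	q.cancel("consumer-a") // the active consumer goes away
	name, _ = q.active()
	fmt.Println("active after failover:", name)
}
```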

Solution

We will implement a consumer that can subscribe to multiple queues.
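
A minimal sketch of the idea, using plain Go channels instead of a real RabbitMQ subscription: one goroutine per queue, so messages of the same queue are handled strictly one after another, while different queues are processed in parallel. With the Go AMQP clients a subscription is also delivered as a Go channel, so the shape is similar, but the function names, queue names and message type here are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll starts one goroutine per queue. Messages from the same queue are
// handled serially, in arrival order; different queues run in parallel.
func consumeAll(queues map[string]<-chan string, handle func(queue, msg string)) {
	var wg sync.WaitGroup
	for name, deliveries := range queues {
		wg.Add(1)
		go func(name string, deliveries <-chan string) {
			defer wg.Done()
			for msg := range deliveries {
				handle(name, msg)
			}
		}(name, deliveries)
	}
	wg.Wait() // returns once every delivery channel has been closed
}

// feed builds a pre-filled, closed channel that stands in for a subscription.
func feed(msgs ...string) <-chan string {
	ch := make(chan string, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return ch
}

// collect consumes all queues and records what was handled, per queue.
func collect(queues map[string]<-chan string) map[string][]string {
	var mu sync.Mutex
	seen := make(map[string][]string)
	consumeAll(queues, func(queue, msg string) {
		mu.Lock()
		defer mu.Unlock()
		seen[queue] = append(seen[queue], msg)
	})
	return seen
}

func main() {
	seen := collect(map[string]<-chan string{
		"orders.q1": feed("order-1 created", "order-1 updated"),
		"orders.q2": feed("order-2 created", "order-2 cancelled"),
	})
	fmt.Println("orders.q1:", seen["orders.q1"])
	fmt.Println("orders.q2:", seen["orders.q2"])
}
```

Keeping exactly one goroutine per queue is the design choice that matters here: adding more workers per queue would speed things up but break the per-queue order.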

Project high-level overview

To emulate deployments locally we will use Nomad.

Project Structure

The project will consist of multiple executables:

  • Manager — cli
  • Producer — will generate dummy data and publish messages to exchange
  • Consumer — will subscribe to at least one queue and process messages
  • Abstraction over RabbitMQ driver

We will implement

  • Creating exchanges/queues/bindings
  • Producer with auto-reconnect
  • Consumer that subscribes to multiple queues
  • Graceful shutdown
  • Manager — cli
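
For the graceful shutdown item, one possible shape (a sketch, not the project's actual code) is a worker that stops picking up new messages once a stop channel is closed, but always finishes the message it is already handling. The stop channel is closed when the process receives SIGINT or SIGTERM. The deliveries channel stands in for a real subscription, and all names are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// worker handles deliveries one at a time until the channel is closed or stop
// is closed. A message that is already being handled is always finished.
func worker(stop <-chan struct{}, deliveries <-chan string, handle func(string)) {
	for {
		select { // give a pending stop request priority over new work
		case <-stop:
			return
		default:
		}
		select {
		case <-stop:
			return
		case msg, ok := <-deliveries:
			if !ok {
				return
			}
			handle(msg)
		}
	}
}

func main() {
	stop := make(chan struct{})
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		close(stop) // tell the worker to stop picking up new messages
	}()

	deliveries := make(chan string, 3)
	for _, m := range []string{"m1", "m2", "m3"} {
		deliveries <- m
	}
	close(deliveries) // stand-in for a subscription that ends on its own

	worker(stop, deliveries, func(msg string) { fmt.Println("processed", msg) })
	fmt.Println("shut down cleanly")
}
```

In a real consumer, the step after the worker returns would be closing the channel and the connection, so that unacknowledged messages go back to the queue.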

Test dataset

I’ll use a free dataset in the producer with thousands of messages for testing our POC.

Enough with the theory!

I won’t show all the source code here, just the important parts. You can find the link to the full source code at the bottom.

Topology

Our POC will support an architecture where multiple teams are interested in the same messages and each of them wants to preserve Partial Ordering.

Producer

The producer will connect to RabbitMQ, create a channel, declare the main fanout exchange, initialize listeners for connection errors, and set up teardown.

Initialize listeners for returned messages and network errors
Sending message
Publishing message to RabbitMQ
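
The auto-reconnect part can be sketched as a retry loop with exponential backoff. This is an illustration only, not the project's code: dial is a hypothetical function standing in for "connect to RabbitMQ and re-create the channel", and in a real producer the loop would typically be triggered by the client library's connection-closed notification.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a made-up error used by the demo dialer.
var errUnavailable = errors.New("broker unavailable")

// connectWithRetry calls dial until it succeeds or the attempts run out,
// doubling the pause between attempts.
func connectWithRetry(dial func() error, attempts int, backoff time.Duration) error {
	var err error
	for i := 1; i <= attempts; i++ {
		if err = dial(); err == nil {
			return nil
		}
		fmt.Printf("attempt %d failed: %v\n", i, err)
		if i < attempts {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	dial := func() error { // hypothetical dialer: fails twice, then succeeds
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	}
	if err := connectWithRetry(dial, 5, 10*time.Millisecond); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("connected after", calls, "attempts")
}
```

Messages published while the connection is down still need to be buffered or rejected by the caller; the sketch does not cover that part.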

Consumer

After the connection to RabbitMQ has been established, the consumer will try to declare the same main fanout exchange as the producer does. This operation is idempotent as long as the exchange is declared with the same arguments.

Init exchanges and queues
Example of handler

RUNNING THE PROJECT

If you want to play with it locally, you need:

  • Go (tested with go version go1.14.1 darwin/amd64)
  • Docker

NOMAD SETUP

Install Nomad (more in the official docs) and start an agent in dev mode:

brew tap hashicorp/tap
brew install hashicorp/tap/nomad
nomad agent -dev -bind 0.0.0.0 -log-level DEBUG -dc local
cd ./gorabbit/infra/cli
go run cli.go SystemUp
Username: user
Password: bitnami
go run cli.go up
infra/nomad/jobs/orders_marketing.nomad
infra/nomad/jobs/orders_processing.nomad
infra/cli/manifest.json
In the case of orders_marketing, each instance is subscribed to exactly one queue.
In the case of orders_marketing consumers, each instance is subscribed to two different queues. This is how we scale the instances.
export RABBITMQ_CONNECTION_STRING=amqp://user:bitnami@localhost:5672
export WORKING_DIRECTORY=/Users/alexander/git/gorabbit
export TOPIC=orders
producer
In production, before enabling the new feature, make sure the consumers have been deployed and all the exchanges and queues have been set up.
go run cli.go rescale
The nacked message will be received by the newly subscribed consumer.
go run cli.go shutdown
go run cli.go SystemShutDown

Source code

Enjoy :)

Disclaimer

The main purpose of the code is to show some ideas on how to use this feature. It's far from being production-ready, has no tests and I’m sure has bugs 😬

Alexander Ravikovich

In GO we trust. Software Engineer. @Isreal