SOA - using AMQP

Wed, Sep 3, 2014
Service Oriented Design with Ruby and Rails - Paul Dix

In the introductory article we talked about why we might implement a Service Oriented architecture in order to:

We also talked a little bit of theory on what an SOA looks like, how to define the boundaries for our services, and the practical implications of building a distributed system.

In the previous article we showed how to build an SOA using HTTP that implements the 3 main communication patterns of a distributed system: synchronous request/response, an asynchronous worker queue, and asynchronous publish/subscribe.

This article will show how to implement the same 3 communication patterns using a single technology, an AMQP message broker such as RabbitMQ.

About RabbitMQ

Using the RabbitMQ broker requires the use of a language-specific client library. For Ruby we have a couple of options:

For most purposes, the Bunny gem is the simplest option. Indeed, the RabbitMQ tutorials themselves are implemented using the Bunny gem:

Installing RabbitMQ

Install a RabbitMQ broker (more details):

$ sudo apt-get install rabbitmq-server

… along with a Ruby client library (more details):

$ sudo gem install bunny

Synchronous Request/Response

Just like with HTTP, we need to host a service and make requests from a client.

Hosting the Service

For a simple service we can implement a RabbitMQ consumer that subscribes to a named queue to receive incoming requests and then publishes the response to an exclusive reply queue for that client.

For example, consider a service that returns the current (random) weather for a given location, implemented in weather_service.rb:

#!/usr/bin/env ruby

require 'bunny'

module WeatherService

  #-------------------------------------------

  QUEUE = "weather"

  #-------------------------------------------

  def self.run

    conn = Bunny.new
    conn.start

    channel  = conn.create_channel
    exchange = channel.default_exchange
    queue    = channel.queue(QUEUE)

    queue.subscribe(:block => true) do |delivery, properties, location|
      puts "Weather requested for #{location}"
      response = WeatherService.forecast(location)
      exchange.publish(response, {
        :routing_key    => properties.reply_to,
        :correlation_id => properties.correlation_id
      })
    end

  end

  #-------------------------------------------

  def self.forecast(location)
    "It's #{['hot', 'warm', 'cold', 'freezing'].sample} in #{location}"
  end

  #-------------------------------------------
    
end

WeatherService.run

We can run the service from the command line:

$ ruby weather_service.rb

NOTE: In the previous article we discussed using Unicorn and Sinatra to host our HTTP services in order to provide a robust production environment including daemonization, clustering, logfile/pidfile management, etc. If we choose AMQP instead of HTTP as our SOA transport, there are not (yet) equally robust service hosting applications. For a sneak preview of one that is in development (and will be introduced in the next article in this series), you can check out the RackRabbit project.

Making Client Requests

Where HTTP is designed for request/response, a message broker is designed primarily for asynchronous messaging, so request/response using a message broker is actually a little more complex. In order to wait for a response, the client must subscribe to an exclusive reply queue, and since the subscription handler runs in a different thread, this involves thread synchronization in order to block the primary thread while waiting for the response.
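
The synchronization step on its own can be sketched without a broker. The PendingReply class below is a hypothetical helper (it is not part of Bunny): a subscription thread would call deliver, while the primary thread blocks in wait. Waiting in a loop on a flag, with a timeout, is one way to avoid hanging if the reply arrives early or never arrives at all.

```ruby
require "securerandom"

# Hypothetical helper (not part of Bunny): tracks one outstanding request.
class PendingReply
  def initialize(correlation_id)
    @correlation_id = correlation_id
    @lock           = Mutex.new
    @condition      = ConditionVariable.new
    @done           = false
    @body           = nil
  end

  # Called from the subscription thread. Ignores replies for other requests.
  def deliver(correlation_id, body)
    return false unless correlation_id == @correlation_id
    @lock.synchronize do
      @body = body
      @done = true
      @condition.signal
    end
    true
  end

  # Called from the primary thread. Returns the body, or nil on timeout.
  def wait(timeout = 5)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @condition.wait(@lock, remaining)
      end
      @body
    end
  end
end

# Simulate a reply arriving on another thread
message_id = SecureRandom.uuid
pending    = PendingReply.new(message_id)
replier    = Thread.new { sleep 0.1; pending.deliver(message_id, "It's warm in London") }
puts pending.wait(2)
replier.join
```

Checking the flag inside the lock means a reply that lands before the primary thread starts waiting is still seen.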

We can implement a weather.rb client executable script as follows:

#!/usr/bin/env ruby

require "bunny"
require "securerandom"

QUEUE = "weather"

conn = Bunny.new
conn.start

location    = ARGV[0] || "London"
channel     = conn.create_channel
exchange    = channel.default_exchange
reply_queue = channel.queue("", :exclusive => true)
message_id  = SecureRandom.uuid
lock        = Mutex.new
condition   = ConditionVariable.new
response    = nil

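# PREPARE the response handler
reply_queue.subscribe do |delivery_info, properties, body|
  if properties[:correlation_id] == message_id
    # set the response while holding the lock so the waiter cannot miss it
    lock.synchronize do
      response = body
      condition.signal
    end
  end
end

# PUBLISH the request
exchange.publish(location, {
  :routing_key    => QUEUE,
  :correlation_id => message_id,
  :reply_to       => reply_queue.name
})

# WAIT for the response (looping guards against a reply that arrives
# before we start waiting, and against spurious wakeups)
lock.synchronize { condition.wait(lock) until response }
puts response

conn.close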

… and test it from the command line:

$ ruby weather.rb Seattle
It's hot in Seattle

$ ruby weather.rb Boston
It's cold in Boston

NOTE: This request/response dance using an exclusive reply queue and a thread synchronization mechanism is fairly complex, and is boilerplate framework code that should be extracted out into a separate library. For a sneak preview of a library that is in development (and will be introduced in the next article in this series), you can check out the RackRabbit client library.

Asynchronous Worker Queue

To implement a worker queue with RabbitMQ we create a consumer process that subscribes to a named queue. We can run any number of instances of the process and RabbitMQ will distribute the requests among the active consumers.
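
The "each request goes to exactly one consumer" behaviour can be illustrated with an in-process analogy. This sketch does not talk to RabbitMQ at all: Ruby's thread-safe Queue stands in for the named queue and threads stand in for worker processes, so it only models the idea, not the broker's actual dispatch order.

```ruby
# In-process analogy only: Queue plays the role of the named RabbitMQ queue,
# and each thread plays the role of one worker process.
tasks   = Queue.new
handled = Queue.new

workers = 3.times.map do |id|
  Thread.new do
    # Each pop hands a task to exactly one waiting worker
    while (task = tasks.pop) != :stop
      handled << [id, task]
    end
  end
end

6.times { |i| tasks << "task-#{i}" }
workers.size.times { tasks << :stop }   # one stop marker per worker
workers.each(&:join)

results = []
results << handled.pop until handled.empty?
results.sort_by(&:last).each { |id, task| puts "worker #{id} handled #{task}" }
```

Which worker picks up which task will vary between runs, but every task is handled once and only once.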

The following example, worker.rb, shows a worker process subscribing to a named queue and extracting its arguments from serialized JSON in the message body:

#!/usr/bin/env ruby

require "bunny"
require "json"

module MyWorker

  #----------------------------------------------

  QUEUE = "tasks"

  #----------------------------------------------

  def self.run

    conn = Bunny.new
    conn.start

    channel = conn.create_channel
    queue   = channel.queue(QUEUE)

    queue.subscribe(:block => true) do |delivery, properties, body|
      perform(JSON.parse(body))
    end

  end

  #----------------------------------------------

  def self.perform(args)
    action = args["action"]
    amount = args["amount"]
    amount.times do |i|
      puts "#{action} for #{i} seconds"
      sleep 1
    end
  end

  #----------------------------------------------

end

MyWorker.run

We can run the worker from the command line:

$ ruby worker.rb

A client process can enqueue tasks for the worker by publishing a serialized JSON message into the RabbitMQ queue. The following executable script, enqueue.rb, shows an example:

#!/usr/bin/env ruby

require "bunny"
require "json"

QUEUE = "tasks"

conn = Bunny.new
conn.start

channel = conn.create_channel
queue   = channel.queue(QUEUE)
action  =  ARGV[0] || "waiting"
amount  = (ARGV[1] || 5).to_i
args    = { :action => action, :amount => amount }

channel.default_exchange.publish(args.to_json, {
  :routing_key => queue.name
})

conn.close
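
One detail worth seeing in isolation is the JSON round trip between the two scripts. The client builds its arguments with symbol keys, but after serialization the worker receives string keys, which is why the worker reads args["action"] rather than args[:action]. A minimal sketch using only the standard json library:

```ruby
require "json"

# What the client serializes (symbol keys)
args = { :action => "reading", :amount => 2 }
body = args.to_json

# What the worker sees after parsing (string keys)
parsed = JSON.parse(body)
puts parsed["action"]          # the value is found under the string key
puts parsed[:action].inspect   # the symbol key no longer exists

# Optional: ask the parser for symbol keys instead
symbolized = JSON.parse(body, :symbolize_names => true)
puts symbolized[:amount]
```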

Tasks can now be enqueued from the command line:

$ ruby enqueue.rb reading  2
$ ruby enqueue.rb sleeping 3
$ ruby enqueue.rb eating   4

… and we should see output in our other terminal where the worker process is running:

$ ruby worker.rb
reading for 0 seconds
reading for 1 seconds
sleeping for 0 seconds
sleeping for 1 seconds
sleeping for 2 seconds
eating for 0 seconds
eating for 1 seconds
eating for 2 seconds
eating for 3 seconds

Asynchronous Publish/Subscribe

To implement publish/subscribe in RabbitMQ the client publishes to an exchange instead of directly to a specific named queue. Subscribers can bind to the exchange in a variety of ways. The simplest solution is to use a fanout exchange. RabbitMQ will route the message to all subscribers of a fanout exchange.

More selective routing can be performed by using a topic exchange instead of a fanout exchange. See AMQP concepts for more information on the different types of exchange, and RabbitMQ Tutorials for details on how to use them.

For example, we can implement a subscriber in subscribe.rb:

#!/usr/bin/env ruby

require "bunny"

module Subscriber

  EXCHANGE = "events"

  def self.run(name)
    conn = Bunny.new
    conn.start
    channel  = conn.create_channel
    exchange = channel.fanout(EXCHANGE)
    queue    = channel.queue("", :exclusive => true)
    queue.bind(exchange)
    queue.subscribe(:block => true) do |delivery_info, properties, event|
      puts "#{name} saw #{event}"
    end
  end

end

Subscriber.run(ARGV[0] || "A Subscriber")

We can run multiple subscribers (in different terminals):

$ ruby subscribe.rb FOO
$ ruby subscribe.rb BAR

We can publish an event with a publish.rb script:

#!/usr/bin/env ruby

require "bunny"

EXCHANGE = "events"

conn = Bunny.new
conn.start

event    = ARGV[0] || "An Event"
channel  = conn.create_channel
exchange = channel.fanout(EXCHANGE)

exchange.publish(event)

conn.close

Finally, we can test the system from the command line:

$ ruby publish.rb eggs
$ ruby publish.rb sausages
$ ruby publish.rb pancakes

… and we should see output in our other terminals where the subscriber processes are running:

$ ruby subscribe.rb FOO
FOO saw eggs
FOO saw sausages
FOO saw pancakes
$ ruby subscribe.rb BAR
BAR saw eggs
BAR saw sausages
BAR saw pancakes
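
For the topic exchange mentioned earlier in this section, the broker compares each binding pattern against the message's dot-separated routing key, where * stands for exactly one word and # stands for zero or more words. The matcher below is a plain Ruby illustration of those rules, not RabbitMQ's implementation, and the topic_match? name and the routing keys are made up for the example.

```ruby
# Illustration of AMQP topic matching rules (not RabbitMQ's own code).
#   *  matches exactly one word
#   #  matches zero or more words
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?
  head, *rest = pattern
  case head
  when "#"
    # try letting '#' absorb 0, 1, 2, ... of the remaining words
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when "*"
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?("weather.*", "weather.london")
puts topic_match?("weather.*", "weather.uk.london")
puts topic_match?("weather.#", "weather.uk.london")
```

With Bunny, the subscriber would declare the exchange with channel.topic instead of channel.fanout and pass a :routing_key pattern when binding its queue.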

Up Next…

This article has shown how to implement the 3 primary communication patterns of a distributed system using RabbitMQ and the Bunny client library.

If you are new to RabbitMQ and Bunny then the examples in this article may seem complex, especially compared to more well-known mechanisms such as HTTParty, Rack, Redis and Resque. However, it is important to recognize that much of the RabbitMQ/Bunny code is boilerplate code and could easily be extracted into a common library and exposed with a much simpler interface.

We could imagine exposing a simpler interface in our client side code, something like:

rabbit.request("queue",    "message")  # synchronous request/response
rabbit.enqueue("queue",    "message")  # asynchronous worker queue
rabbit.publish("exchange", "message")  # asynchronous pub/sub
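
As a rough idea of how little code the two asynchronous calls need, here is a minimal sketch. SimpleRabbit is a hypothetical name invented for this example (it is not RackRabbit and not part of Bunny); it assumes it is handed an already opened Bunny channel and only uses the channel calls shown earlier in this article. The synchronous request method is left out because it needs the reply queue and thread synchronization from the weather client.

```ruby
# Hypothetical wrapper for illustration only; not a real library.
class SimpleRabbit
  # channel: an open Bunny channel (or any object with the same methods)
  def initialize(channel)
    @channel = channel
  end

  # asynchronous worker queue: publish straight to a named queue
  def enqueue(queue, message)
    @channel.default_exchange.publish(message, :routing_key => queue)
  end

  # asynchronous pub/sub: publish to a fanout exchange
  def publish(exchange, message)
    @channel.fanout(exchange).publish(message)
  end
end

# Usage against a real broker would look something like:
#
#   conn = Bunny.new
#   conn.start
#   rabbit = SimpleRabbit.new(conn.create_channel)
#   rabbit.enqueue("tasks",  '{"action":"reading","amount":2}')
#   rabbit.publish("events", "eggs")
#   conn.close
```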

We could also imagine exposing a simpler mechanism for building and hosting consumer services, since ultimately they all simply subscribe to a queue. Perhaps a Unicorn-style server that subscribes to a queue instead of listening on a socket… perhaps it can automatically wrap the inbound message in a Rack environment and hand it off to any Rack application… perhaps one built with Sinatra…

… well, in fact there is such a project in development. It’s called RackRabbit, and it will be introduced in the next article in this series.
