Introduction to AMQP and RabbitMQ
RabbitMQ is a message broker that implements the Advanced Message Queuing Protocol (AMQP). Other software can connect to an AMQP server to send and receive messages. There are several things that make message brokers useful:
- If you’re developing a web app and need to do some heavy processing (e.g. the user uploads a video and you want to prepare low resolution versions of it), doing the work while processing the HTTP POST would take too long. Instead, you do it asynchronously by posting a “please prepare low res versions of this video” message to a queue.
- A nice side-effect is that you can implement message senders and receivers in different languages (there are decent quality AMQP libraries for most languages).
- The message broker comes with a lot of tools that can be used for monitoring and administration of the queues. In the example above, if several video conversion workers service the same queue you can monitor the queue length and dynamically spin up new worker instances if the queue becomes too long.
- Other than the “work queue” example above, you can also run queues with publisher/subscriber (pubsub) behavior so that a single message is copied out to N subscribers. For example, if a user in a webapp hits “Delete my account” or similar then a number of backend systems/processes could be listening for this message and take different actions. To some extent this “loose coupling” allows different systems/processes to communicate/cooperate without having hardcoded references to each other.
- Finally, you can configure your queues/messages to be persistent (on disk) so that a message always remains in the queue until it has been fully processed, even if the worker or even RabbitMQ itself crashes (RabbitMQ is implemented in Erlang, where achieving fault tolerance by being restartable is common).
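The monitoring point above can be sketched in a few lines of Python. This is only an illustration: the function names, the "10 messages per worker" ratio and the worker limits are made-up assumptions, and the channel argument is expected to be a Pika 1.x blocking channel (a passive queue_declare only inspects an existing queue and reports how many messages are ready in it).

```python
import math


def queue_length(channel, queue_name):
    """Return the number of ready messages in an existing queue.

    passive=True asks the broker about the queue without creating or
    modifying it; the reply carries a message_count field.
    """
    result = channel.queue_declare(queue=queue_name, passive=True)
    return result.method.message_count


def desired_worker_count(length, messages_per_worker=10,
                         min_workers=1, max_workers=8):
    """Hypothetical scaling rule: one worker per N queued messages,
    clamped to the range [min_workers, max_workers]."""
    wanted = math.ceil(length / messages_per_worker)
    return max(min_workers, min(max_workers, wanted))
```

A supervisor process could call queue_length() every few seconds and start or stop workers until their number matches desired_worker_count(). How workers are actually started is outside the scope of this sketch.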
It should be noted that there are several message brokers available that implement AMQP, and there is also a bit of a split between AMQP 0.9.1 and AMQP 1.0 because the former specifies transport + model while the latter only specifies the transport protocol (leaving the users locked in to a specific AMQP implementation/vendor). There is also STOMP, an alternative wire protocol that is implemented by many brokers. At any rate, this is certainly much better than the old JavaEE JMS which standardized the API rather than the wire protocol, which was a really bad idea.
Even sticking to AMQP, there are a lot of different brokers to choose from and they all differ in terms of ease of use, scalability, tool support etc. There are also brokerless alternatives: ZeroMQ, for example, is implemented in C++ and links as a library directly into the sending and receiving processes, eliminating the need for a separate message broker server. This nets you much better performance, but it’s more work to get started, especially for more advanced setups that include load balancing, persistent messages and routing.
If you need to handle a very large number of messages per second and you don’t need a lot of advanced routing then you might want to look into Apache Kafka as a potential alternative. If you need millions of workers on a single machine (i.e. creating a thread per worker won’t work) you might want to build something on top of an Actor-model solution such as Akka instead of using AMQP.
How to set up RabbitMQ
To install RabbitMQ on a Debian/Ubuntu system do sudo apt-get install rabbitmq-server. After that, if you list the open ports using for example sudo netstat -tuplen you will see that a process called beam.smp has opened port 5672; beam is the Erlang runtime and 5672 is used for non-TLS AMQP traffic. By default, AMQP clients can log in using guest/guest. Even if the socket is bound on all interfaces, the default credentials only work if you connect from localhost. I tried to get it to bind to localhost only (both for the AMQP port and the higher cluster port) but I ran into some issues there and I didn’t want to sink too much time into it.
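If you would rather check from a script than read netstat output, a plain TCP connect is enough to see whether the broker is listening. The helper below is a minimal sketch using only the standard library; the function name is my own and it says nothing about whether the thing listening on the port actually speaks AMQP.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection() raises OSError (e.g. connection refused,
        # timeout) when nothing accepts the connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, port_is_open("localhost", 5672) should return True once rabbitmq-server is running with its default configuration.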
Anyway, once installed you can interact with RabbitMQ using the rabbitmqctl command, for example:
sudo rabbitmqctl list_users
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_queues
sudo rabbitmqctl change_password USER PASS
sudo rabbitmqctl purge_queue FOO
It’s kind of sloppy that sudo rabbitmqctl change_password USER doesn’t prompt for a password (you don’t want to accidentally store it in bash history or expose it via w on multi-user systems) and that, instead of printing command line usage info, it actually hit SIGSEGV (not impressive!).
I suppose most people install the web based management interface anyway; this can be done by running sudo rabbitmq-plugins enable rabbitmq_management. It opens an HTTP server on port 15672 (easy to remember since it’s the AMQP port with a “1” in front of it).
Once inside the web based management interface, click the “Admin” tab, add a new user and put “administrator” in the tags field. After the user is created it will be marked with “No access”; to fix that, click the username and then click the button called “Set permission” to give it access to the “/” vhost (vhosts are similar to virtual hosts in Apache: they allow you to run several broker setups on a single machine so that each setup has its own set of queues etc). Finally, delete the guest/guest account and create a new non-admin account (just leave ‘Tags’ blank) and use this account when authenticating from scripts/software that needs to send/receive messages.
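The same clicks can in principle be scripted, because the management plugin also exposes an HTTP API on port 15672. The sketch below only builds the requests and does not send them. The endpoint paths and JSON fields reflect my understanding of that API (PUT /api/users/NAME, PUT /api/permissions/VHOST/NAME, DELETE /api/users/NAME, with the “/” vhost URL-encoded as %2F), so treat them as assumptions and check the API documentation that ships with the plugin. All function names are made up for this example.

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request

API_BASE = "http://localhost:15672/api"  # assumes the default management port


def _request(method, path, admin_user, admin_password, payload=None):
    """Build (but do not send) an authenticated management API request."""
    token = base64.b64encode(
        ("%s:%s" % (admin_user, admin_password)).encode()).decode()
    data = json.dumps(payload).encode() if payload is not None else None
    req = Request(API_BASE + path, data=data, method=method)
    req.add_header("Authorization", "Basic " + token)
    if data is not None:
        req.add_header("Content-Type", "application/json")
    return req


def create_user_request(admin_user, admin_password, name, password, tags=""):
    # tags="administrator" for an admin, "" for an ordinary account
    return _request("PUT", "/users/" + quote(name, safe=""),
                    admin_user, admin_password,
                    {"password": password, "tags": tags})


def grant_permissions_request(admin_user, admin_password, name, vhost="/"):
    # ".*" grants configure/write/read on every resource in the vhost
    path = "/permissions/%s/%s" % (quote(vhost, safe=""), quote(name, safe=""))
    return _request("PUT", path, admin_user, admin_password,
                    {"configure": ".*", "write": ".*", "read": ".*"})


def delete_user_request(admin_user, admin_password, name):
    return _request("DELETE", "/users/" + quote(name, safe=""),
                    admin_user, admin_password)
```

Each returned object can be passed to urllib.request.urlopen() to actually perform the call. Note that the password still travels over plain HTTP here, so this is only reasonable against localhost.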
Sending and receiving messages
In Python, you can send and receive messages to/from RabbitMQ using a library called Pika (on Debian/Ubuntu you can install it using sudo apt-get install python-pika, or you might prefer to use virtualenv + pip).
Below are two scripts for sending and receiving messages using Pika. These scripts are roughly similar to what you’d use to set up a work queue (pubsub would be slightly different).
The first script posts messages to a queue. RabbitMQ tutorials and documentation refer to such programs as “producers”:
#!/usr/bin/env python
# Producer: posts one message (taken from the command line) to a durable queue.
# Written against the Pika 0.x API, where basic_publish() returns a boolean in
# confirm mode; Pika 1.0+ raises an exception on failure instead.
import os
import sys
import pika
import logging

AMQP_PORT = 5672
QUEUE_NAME = "my_queue"
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
# Two lines: username on the first, password on the second.
CREDENTIALS_FILE = os.path.join(SCRIPT_PATH, ".amqp-credentials")

def main():
    logging.basicConfig()
    with open(CREDENTIALS_FILE) as f:
        (username, password) = f.read().splitlines()
    credentials = pika.PlainCredentials(username, password)
    params = pika.ConnectionParameters('localhost', AMQP_PORT, '/', credentials)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # Ask the broker to confirm every published message.
    channel.confirm_delivery()
    channel.queue_declare(queue=QUEUE_NAME, durable=True)
    message = sys.argv[1]
    was_delivered = channel.basic_publish(
        mandatory=True,
        exchange='',
        routing_key=QUEUE_NAME,
        # delivery_mode=2 marks the message as persistent
        properties=pika.BasicProperties(delivery_mode=2),
        body=message)
    if was_delivered:
        print('message "%s" has been delivered to queue "%s"' % (message, QUEUE_NAME))
    else:
        print("ERROR: message could not be sent!")
    # Closing the connection also closes its channels.
    connection.close()

if __name__ == '__main__':
    main()
The example above is pretty straightforward, but there are a couple of things worth commenting on:
- The durable=True flag passed to queue_declare means that the queue will remain even if the message broker is restarted (intentionally or because it crashed). Note, however, that the actual messages in the queue will be lost unless each message is marked as persistent (which the sender does by passing a properties=pika.BasicProperties(delivery_mode=2) parameter to basic_publish).
- queue_declare() creates a queue if it doesn’t exist or reuses an existing one as long as it has the same properties as the queue being declared (e.g. whether the queue is durable or not). If the properties differ, an exception is raised.
- The call to channel.confirm_delivery() puts the channel into confirm mode, which means that the broker will send back a confirmation once it has delivered the message to a queue (or concluded that the message should be discarded silently). The latter is, in fact, a common use case for pubsub where a stream of messages is routed to N subscribers; if N is currently zero, the messages should simply be discarded.
- The flag mandatory=True passed to basic_publish means that basic_publish will report an error if the message could not be routed to at least one queue. Combined with confirm mode, a persistent message routed to a durable queue will be written to disk before the confirmation is sent.
- Finally, setting exchange='' instructs RabbitMQ to deliver the message directly to the queue specified in the routing_key parameter. For pubsub you need to create your own exchange though.
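To make the last point a bit more concrete, here is a rough sketch of what the pubsub variant could look like with a fanout exchange. It assumes the Pika 1.x API and an already opened channel. The exchange name and the function names are hypothetical and not part of the scripts in this post.

```python
EXCHANGE_NAME = "account_events"  # hypothetical exchange name


def publish_event(channel, body):
    """Publish body to every queue currently bound to the fanout exchange."""
    channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type="fanout")
    # A fanout exchange ignores the routing key, so it is left empty.
    channel.basic_publish(exchange=EXCHANGE_NAME, routing_key="", body=body)


def subscribe(channel, callback):
    """Create a private queue, bind it to the exchange and consume from it."""
    channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type="fanout")
    # An empty queue name lets the broker pick a unique one; exclusive=True
    # makes the queue go away when this subscriber disconnects.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    return queue_name
```

Each subscriber gets its own queue, so every subscriber receives a copy of every event. If nobody has subscribed, the exchange has no bound queues and the message is dropped, which is the “N is currently zero” case mentioned above.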
Once we’ve posted our message to a queue we need another script that connects to the message broker and receives the messages (a consumer in RabbitMQ parlance):
#!/usr/bin/env python
# Consumer: waits for messages on the queue and acknowledges each one.
# Written against the Pika 0.x API; Pika 1.0+ expects
# basic_consume(queue=..., on_message_callback=...) instead.
import os
import pika
import logging

AMQP_PORT = 5672
QUEUE_NAME = 'my_queue'
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
# Two lines: username on the first, password on the second.
CREDENTIALS_FILE = os.path.join(SCRIPT_PATH, ".amqp-credentials")

def main():
    logging.basicConfig()
    with open(CREDENTIALS_FILE) as f:
        (username, password) = f.read().splitlines()
    credentials = pika.PlainCredentials(username, password)
    params = pika.ConnectionParameters('localhost', AMQP_PORT, '/', credentials)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # Declaring the queue here too lets the consumer start before the producer.
    channel.queue_declare(queue=QUEUE_NAME, durable=True)

    def callback(ch, method, properties, body):
        print("received message: %r" % body)
        # Acknowledge only once the work is done.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Fair dispatch: at most one unacknowledged message per consumer.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue=QUEUE_NAME)
    print('waiting for new messages on queue: ' + QUEUE_NAME)
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        pass
The consumer sets up the channel and declares the queue in the exact same way as the producer did. After that it calls start_consuming() which will block and wait for new messages. Whenever a new message arrives, the callback passed to basic_consume() is called.
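Since the acknowledgement is what tells the broker that a message has been fully processed, it is worth being deliberate about when it is sent. The sketch below wraps an arbitrary handler function so that the message is acked only if the handler returns normally, and handed back to the broker otherwise. It assumes the Pika 1.x API (basic_consume with on_message_callback), and make_callback, start_worker and handler are names invented for this example.

```python
def make_callback(handler):
    """Wrap handler(body) so the message is acked only after it succeeds."""
    def callback(ch, method, properties, body):
        try:
            handler(body)
        except Exception:
            # Processing failed: give the message back to the broker so it
            # can be redelivered (to this or another consumer).
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback


def start_worker(channel, queue_name, handler):
    """Declare the queue and block while dispatching messages to handler."""
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=make_callback(handler))
    channel.start_consuming()
```

One caveat with requeue=True is that a message which always makes the handler fail will be redelivered over and over, so a real worker would probably want to give up after a number of attempts.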
The call to channel.basic_qos(prefetch_count=1) enables “fair dispatch” and needs some explanation. By default RabbitMQ will assign incoming messages to available consumers in a round-robin fashion without regard to which consumers are idle/busy. This means that some workers (consumers) might end up with a long backlog even while other workers are idle, which is quite unintuitive.
For example, a 2-worker round-robin scheduling of a workload where every odd task is heavy and every even task is easy will become very unfair: one of the workers will get all the easy tasks (and won’t be busy much) while the other worker will consistently get all the heavy tasks. If the heavy work takes longer than the time until the next task arrives, the second worker’s backlog will keep growing, all while the first worker is mostly idle.
The call to channel.basic_qos(prefetch_count=1) instead makes RabbitMQ hold back new work from a consumer until that consumer has acknowledged the previous message it was assigned.
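The difference is easy to see in a toy simulation that doesn’t involve RabbitMQ at all. The sketch below assumes that all tasks are already waiting in the queue and that a task’s duration is known; it then compares how long it takes until everything is done under round-robin assignment versus “give the next task to whichever worker becomes free first”, which is roughly what prefetch_count=1 achieves. The function names are my own.

```python
import heapq


def round_robin_makespan(task_durations, worker_count):
    """Time until all work is done when task i goes to worker i % N."""
    busy = [0] * worker_count
    for index, duration in enumerate(task_durations):
        busy[index % worker_count] += duration
    return max(busy)


def fair_dispatch_makespan(task_durations, worker_count):
    """Time until all work is done when each task goes to the worker
    that becomes free first."""
    # Heap of (time at which the worker is free, worker id).
    free_at = [(0, worker) for worker in range(worker_count)]
    heapq.heapify(free_at)
    for duration in task_durations:
        ready_time, worker = heapq.heappop(free_at)
        heapq.heappush(free_at, (ready_time + duration, worker))
    return max(ready_time for ready_time, _ in free_at)
```

With two workers and the alternating workload [10, 1, 10, 1, 10, 1, 10, 1], round-robin needs 40 time units because one worker receives all four heavy tasks, whereas fair dispatch finishes in 22.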
If you want to dig deeper, rabbitmq.com has several excellent tutorials available (written for multiple different programming languages).