DEV Community

codewitgabi
codewitgabi

Posted on

Messaging queues (Rabbitmq as broker)

Since backend development goes beyond building CRUD applications, I am here to discuss one of the core concepts of backend development which is message queueing.

In DSA, queues are basically a structure that follow the FIFO (First In First Out) rule; meaning that the first element to be entered is the first to be processed just like queue in banks, shoprite etc.

A message queue is a form of asynchronous service-to-service communication used in serverless and microservices architectures. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer AWS.

Today, I will discuss some things that need to be considered to have a kind of reliable server when using Rabbitmq as a message broker.

  • Message acknowledgement: Doing a task can take a few seconds, you may wonder what happens if a consumer starts a long task and it terminates before it completes. Basically, once our producer delivers a message to the consumer, it marks it for deletion. In this case, if you terminate a worker, the message it was just processing is lost. The messages that were dispatched to this particular worker but were not yet handled are also lost. If a worker dies, we'd like the task to be delivered to another worker. In order to prevent this from happening, Rabbitmq provides message acknowledgement. If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

To achieve this, do this

def callback(ch, method, properties, body):
    # your code
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)
Enter fullscreen mode Exit fullscreen mode
  • Message durability: We have learnt how to handle tasks when a worker is stopped. How do we handle them if our broker is stopped? When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable. This should be done both for the producer and consumer (or worker).
channel.queue_declare(queue='queue_name', durable=True)
Enter fullscreen mode Exit fullscreen mode

If you have already created a queue that's not durable, updating it won't change that so it's better to do that at the beginning

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with the value of pika.DeliveryMode.Persistent

channel.basic_publish(
    exchange="",
    routing_key="queue_name",
    body=message,
    properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)
Enter fullscreen mode Exit fullscreen mode
  • Fair dispatch: By default, if we have multiple workers/consumers running, rabbitmq tries to share tasks to these workers evenly using the round-robin algorithm (RRA) but what if one worker is already done processing its task and the others are not done but because of RRA, it's not its turn so it becomes idle and the request is added to another worker that is processing a task. To fix this issue, you should use a fair dispatch in the worker.
channel.basic_qos(prefetch_count=1)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)