DEV Community

Sridhar Subramani
Sridhar Subramani

Posted on

Build a Distributed Task Scheduler Using RabbitMQ and Redis

Build a Distributed Task Scheduler Using RabbitMQ and Redis

Delay Task Execution Using RabbitMQ deadLetterExchange

Scheduler

Are you interested in building a task scheduler using RabbitMQ?

You may wonder why one should build a task scheduler using RabbitMQ given that it’s a message broker and has no reason to behave as a scheduler.

Well, a year ago, I was working on a hobby project where I wanted to use RabbitMQ and I happened to have a requirement of executing a piece of code after a certain time. A delayed task execution we can call it.

This is where I got curious, wondering if it is possible to delay the message execution. In other words, let’s say a message has been sent to a queue at x time and I want to consume it at x+y time, where y is configurable per message/task.

I was able to achieve this using deadLetterExchange. This is a RabbitMQ feature. By using this RabbitMQ feature as core logic, I was able to specify how much time should elapse before the consumer can consume the task or message.
But it’s not a task scheduler yet, it’s simply a delayed task execution. A task scheduler should be able to schedule and cancel tasks. Well, this is where Redis comes into play to make this into a task scheduler.

I hope the above content helps to set the context here. Let’s dive in.


Prerequisites

As part of this post, we are not going to discuss the introduction of RabbitMQ and Redis. It is assumed that you have at least a basic understanding of RabbitMQ and Redis. The sample project which is discussed here is written on NodeJS. Knowledge of NodeJS would be helpful, but it is not required.


How to achieve delayed task execution

RabbitMQ will forward the messages to the corresponding queue as soon as they are available. There is no direct way we can tell RabbitMQ to keep the message somewhere and send it to the expected queue after y time.
Let’s see how we can achieve this by using deadLetterExchange .

Dead Letter Exchanges

Messages from a queue can be dead-lettered (i.e., republished to an exchange) when any of the following events occurs:

  • The message is negatively acknowledged by a consumer.

  • The message expires due to per-message TTL.

  • The message was dropped because its queue exceeded a length limit.

We are going to use the second option (i.e per-message TTL) to trigger the deadLetterExchange. Let’s see it in detail.

TTL stands for Time To Live

Let’s set a TTL for each of our messages and send them to a queue. Let’s call it an intermediate queue. When declaring an intermediate queue, we set an option that when messages in this queue expire, they should be delivered to another specified exchange. Let’s call it the final exchange.

The message TTL time is the time we want to delay. The final exchange is where our consumers will consume messages from a specified queue.

So, once the message TTL expires, our consumer receives the message.

Now the question is, how do we let the message expire? It’s simple. We are sending the message to the intermediate queue right. Let’s not allow anyone to consume it from here. If the messages in this intermediate queue are not consumed till the specified TTL, then they are dead-lettered and the message will be delivered to our consumer who is happily consuming messages from the queue of final exchange.

By setting this TTL as the delay we require, and sending the message to an intermediate queue where there are no consumers, we can achieve a delayed task execution on RabbitMQ.

Below is the UML diagram to achieve delayed task execution on RabbitMQ using dead letter exchange.

RabbitMQ delayed task execution using dead letter exchangeRabbitMQ delayed task execution using dead letter exchange

But delayed task execution is not a task scheduler, right. Let’s see how we can build a task scheduler that will provide the option to schedule and cancel tasks using RabbitMQ and Redis.


Task Scheduler

The Role of Redis

Redis is used to track the validity of the scheduled task. Using this, we can set any scheduled task as invalid if required, and when it’s time to execute the task, we can check whether the task is valid to execute or not.

Since we would be using Redis to only store task validity information, it can be replaced with any data storage tool.

Architecture

task scheduler architecturetask scheduler architecture

The list entities we’d use to build this task scheduler are as follows.

  • Task → Task definition.

  • Producer → Base class provides option to send delayed message.

  • Consumer → Base class to consume messages from final queue.

  • Task Repository → Repository class to check the task validity status.

  • Task Scheduler → Class provides the option to schedule and invalidate tasks.

  • Task Consumer → Class to consumes tasks from specified task type queue.

Task

    class Task {
      public taskId: string;
      public taskType: string; // queue name
      public ttlInSeconds: number;
      public payload: string;
    }
Enter fullscreen mode Exit fullscreen mode
  • taskId → Unique id to identity this task.

  • taskType → Type of this task. this is used as a queue name so both producer and consumer can infer name the from the task type.

  • ttlInSeconds → The time delay after which we want this task to get executed.

  • payload → A JSON string describing the task; The taskType will come in handy to parse this json as per the different task types.

The taskType plays an important role. It acts as a queue name, so when a task scheduler wants to schedule a task, it can use taskType as a queue name, and whoever wants to consume the particular task type can consume it from that queue.

Following are some of the examples of taskType send-sms, send-offer-notification, send-greetings etc

Producer

    public sendDelayedMessageToQueue(taskType: string, delayInMills: number, data: string): Promise<void> {
        const INTERMEDIATE_QUEUE = `${taskType}_INTERMEDIATE_QUEUE`;
        const INTERMEDIATE_EXCHANGE = `${taskType}_INTERMEDIATE_EXCHANGE`;
        const INTERMEDIATE_EXCHANGE_TYPE = "fanout";

        const FINAL_QUEUE = taskType;
        const FINAL_EXCHANGE = `${taskType}_FINAL_EXCHANGE`;
        const FINAL_EXCHANGE_TYPE = "fanout";

        return new Promise((resolve, reject) => {
          this.assertExchange(INTERMEDIATE_EXCHANGE, INTERMEDIATE_EXCHANGE_TYPE)
            .then((_) => this.assertExchange(FINAL_EXCHANGE, FINAL_EXCHANGE_TYPE))
            .then((_) => this.assertQueue(INTERMEDIATE_QUEUE, { deadLetterExchange: FINAL_EXCHANGE }))
            .then((_) => this.assertQueue(FINAL_QUEUE, {}))
            .then((_) => this.bindQueue(INTERMEDIATE_QUEUE, INTERMEDIATE_EXCHANGE, ""))
            .then((_) => this.bindQueue(FINAL_QUEUE, FINAL_EXCHANGE, ""))
            .then((_) => {
              this.channel?.sendToQueue(INTERMEDIATE_QUEUE, Buffer.from(data), {
                expiration: delayInMills, // The delay after which we want to send this message
              });
              resolve();
            })
            .catch(reject);
        });
      }
Enter fullscreen mode Exit fullscreen mode

As we discussed earlier, we need two queues to achieve this delayed task execution.

  1. Intermediate queue → Holds the task till expiration; post expiration forwards the task to final exchange.

  2. Final queue → This is associated with the final exchange. Our task consumer will listen to this queue.

Both the queue and its associated exchange names are formed from taskType to bring in some sort of contract between scheduler and consumer.

From the above code, we can see the intermediate queue is associated with the final exchange as a dead letter exchange.

    this.assertQueue(
        INTERMEDIATE_QUEUE, { deadLetterExchange: FINAL_EXCHANGE }
    )
Enter fullscreen mode Exit fullscreen mode

Once the task expiration time has elapsed, the task will be forwarded to the final exchange, and the final exchange will route the task to the final queue.

    sendToQueue(INTERMEDIATE_QUEUE, Buffer.from(data), {
        expiration: delayInMills,
    });
Enter fullscreen mode Exit fullscreen mode

Consumer

      public consume(taskType: string, handler: (payload: string) => void) {
        const FINAL_QUEUE = taskType;
        const FINAL_EXCHANGE = `${taskType}_FINAL_EXCHANGE`;
        const FINAL_EXCHANGE_TYPE = "fanout";

        this.assertExchange(FINAL_EXCHANGE, FINAL_EXCHANGE_TYPE)
          .then((_) => this.assertQueue(FINAL_QUEUE, {}))
          .then((_) => this.bindQueue(FINAL_QUEUE, FINAL_EXCHANGE, ""))
          .then((_) => {
            this.channel?.consume(
              FINAL_QUEUE,
              (msg: Message | null) => {
                handler(msg?.content ? msg?.content.toString() : "");
              },
              { noAck: true }
            );
          });
      }
Enter fullscreen mode Exit fullscreen mode

A consumer will consume messages from a specific queue (i.e. taskType).

Task Repository

    interface TaskRepository {
      createTask(taskId: string, ttlInSeconds: number): Promise<void>;
      deleteTask(taskId: string): Promise<void>;
      isTaskValid(taskId: string): Promise<boolean>;
    }
Enter fullscreen mode Exit fullscreen mode

This provides options to create, delete, and validate tasks.

Task Scheduler

    scheduleTask(delayInMilliseconds: number, task: Task): Promise<string> {
       return new Promise((resolve: (taskId: string) => void, reject: (error: Error) => void) => {
         this.taskRepository
           .createTask(task.taskId, task.ttlInSeconds)
           .then(() => this.producer.sendDelayedMessageToQueue(task.taskType, delayInMilliseconds, task.toJson()))
           .then(() => resolve(task.taskId))
           .catch((error) => reject(error));
       });
    }


    invalidateTask(taskId: string): Promise<void> {
        return this.taskRepository.deleteTask(taskId);
    }
Enter fullscreen mode Exit fullscreen mode

The Task Scheduler will create a task entry in Redis when scheduling a task.
When we no longer want the task to be executed, the task can be deleted to mark it as invalid. The scheduleTask returns a taskId. The same can be used to invalidate the task if required.

Task Consumer

    consume(taskType: string, handler: (task: Task) => void): void {
        this.consumer.consume(taskType.toString(), async (payload: string) => {
          const task = Task.fromJson(payload);
          const isTaskValid = await this.taskRepository.isTaskValid(task.taskId);

          if (isTaskValid) handler(task);
        });
    }
Enter fullscreen mode Exit fullscreen mode

The Task Consumer will listen for a specific taskType and execute it if the task is still valid.
Even after we invalidate the task by deleting it. It will be consumed by the consumer at the scheduled time. It will be the Task Consumer responsibility to check the task validity before executing the task using isTaskValid function from the Task Repository.


Demo

The demo includes two consumers and two producers.
Consider consumer 1 and 2 as email services, each microservice responsible for sending greetings and offer notification to customers.
The greeting and offer-notification are two different task types which will be produced by Greeter-Service and Offer-Notification-Service producers respectively.

The demo shows that Email-Service-1 (i.e., consumer 1) will process both greet and offer-notification tasks, where as Email-Service-2 (i.e., consumer 2) will process only offer-notification tasks.

Due to brevity, the task invalidation is not shown on the demo but is explained in the project Readme file.

I tried my best to animate and highlight log messages to indicate the event flow between scheduler and consumer. The log message includes a timestamp, which can be used to assert whether the consumer is getting a task notification at the expected scheduled time or not. I have highlighted the timestamp on both the scheduler and the consumer whenever the consumer receives a task.


Source Code

https://github.com/sridhar-sp/task-scheduler


Reference

Top comments (0)