Build a Distributed Task Scheduler Using RabbitMQ and Redis
Delay Task Execution Using RabbitMQ deadLetterExchange
Are you interested in building a task scheduler using RabbitMQ?
You may wonder why one should build a task scheduler using RabbitMQ given that it’s a message broker and has no reason to behave as a scheduler.
Well, a year ago, I was working on a hobby project where I wanted to use RabbitMQ, and I happened to need a piece of code to run after a certain amount of time. We can call it delayed task execution.
This is where I got curious, wondering if it is possible to delay the message execution. In other words, let’s say a message has been sent to a queue at x time and I want to consume it at x+y time, where y is configurable per message/task.
I was able to achieve this using deadLetterExchange, a RabbitMQ feature. With it as the core logic, I was able to specify how much time should elapse before the consumer can consume the task or message.
But that’s not a task scheduler yet; it’s simply delayed task execution. A task scheduler should be able to schedule and cancel tasks. This is where Redis comes into play to turn it into a task scheduler.
I hope the above content helps to set the context here. Let’s dive in.
Prerequisites
This post does not introduce RabbitMQ or Redis; it is assumed that you have at least a basic understanding of both. The sample project discussed here is written for NodeJS. Knowledge of NodeJS would be helpful, but it is not required.
How to achieve delayed task execution
RabbitMQ will forward the messages to the corresponding queue as soon as they are available. There is no direct way we can tell RabbitMQ to keep the message somewhere and send it to the expected queue after y time.
Let’s see how we can achieve this by using deadLetterExchange.
Dead Letter Exchanges
Messages from a queue can be dead-lettered (i.e., republished to an exchange) when any of the following events occurs:
The message is negatively acknowledged by a consumer with requeue set to false.
The message expires due to per-message TTL.
The message was dropped because its queue exceeded a length limit.
We are going to use the second option (i.e., per-message TTL) to trigger the deadLetterExchange. Let’s see it in detail.
TTL stands for Time To Live.
Let’s set a TTL for each of our messages and send them to a queue. Let’s call it the intermediate queue. When declaring the intermediate queue, we set an option so that when messages in this queue expire, they are delivered to another specified exchange. Let’s call it the final exchange.
The message TTL is the delay we want. The final exchange routes messages to the queue our consumers read from.
So, once the message TTL expires, our consumer receives the message.
Now the question is, how do we let the message expire? It’s simple. We are sending the message to the intermediate queue, right? Let’s not allow anyone to consume it from there. If a message in the intermediate queue is not consumed before its TTL runs out, it is dead-lettered and delivered to our consumer, who is happily consuming messages from the queue bound to the final exchange.
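By setting the TTL to the delay we require, and sending the message to an intermediate queue that has no consumers, we can achieve delayed task execution on RabbitMQ. One thing to keep in mind: RabbitMQ expires a message only when it reaches the head of its queue, so a message with a short TTL that sits behind one with a longer TTL waits until the earlier message expires.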
Below is the UML diagram for delayed task execution on RabbitMQ using a dead letter exchange.
RabbitMQ delayed task execution using dead letter exchange
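To make the flow in the diagram concrete, here is a toy in-memory model of it in TypeScript. It does not talk to RabbitMQ at all: DelayPipeline, publish, tick, and drainFinalQueue are hypothetical names invented for this sketch, and the clock is passed in explicitly so the behaviour stays deterministic. Like RabbitMQ with per-message TTL, the model only looks at the message at the head of the intermediate queue.

```typescript
// Toy model only: hypothetical names, no RabbitMQ involved.
interface PendingMessage {
  body: string;
  expiresAt: number; // absolute time (ms) at which the per-message TTL runs out
}

class DelayPipeline {
  private intermediateQueue: PendingMessage[] = []; // nobody consumes from here
  private finalQueue: string[] = []; // consumers read from here

  // Publish a message to the intermediate queue with its own TTL.
  publish(body: string, ttlMs: number, nowMs: number): void {
    this.intermediateQueue.push({ body, expiresAt: nowMs + ttlMs });
  }

  // Advance the clock: expired messages at the head are "dead-lettered"
  // into the final queue, in order.
  tick(nowMs: number): void {
    let head = this.intermediateQueue[0];
    while (head !== undefined && head.expiresAt <= nowMs) {
      this.intermediateQueue.shift();
      this.finalQueue.push(head.body);
      head = this.intermediateQueue[0];
    }
  }

  // What a consumer of the final queue would receive so far.
  drainFinalQueue(): string[] {
    const delivered = this.finalQueue;
    this.finalQueue = [];
    return delivered;
  }
}

// Usage: a 5-second delay, observed just before and exactly at expiry.
const demoPipeline = new DelayPipeline();
demoPipeline.publish("send-greetings", 5000, 0);
demoPipeline.tick(4999);
const tooEarly = demoPipeline.drainFinalQueue(); // still empty
demoPipeline.tick(5000);
const onTime = demoPipeline.drainFinalQueue(); // contains "send-greetings"
```

The consumer never polls or sleeps in this model; the delay comes entirely from the message sitting unconsumed until its TTL runs out.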
But delayed task execution is not a task scheduler, right? Let’s see how we can build a task scheduler that provides the option to schedule and cancel tasks using RabbitMQ and Redis.
Task Scheduler
The Role of Redis
Redis is used to track the validity of the scheduled task. Using this, we can set any scheduled task as invalid if required, and when it’s time to execute the task, we can check whether the task is valid to execute or not.
Since we would be using Redis to only store task validity information, it can be replaced with any data storage tool.
Architecture
The entities we’d use to build this task scheduler are as follows.
Task → Task definition.
Producer → Base class that provides the option to send a delayed message.
Consumer → Base class to consume messages from the final queue.
Task Repository → Repository class to check the task validity status.
Task Scheduler → Class that provides the option to schedule and invalidate tasks.
Task Consumer → Class to consume tasks from the queue of a specified task type.
Task
class Task {
  public taskId: string;
  public taskType: string; // queue name
  public ttlInSeconds: number;
  public payload: string;
}
taskId → Unique id to identify this task.
taskType → Type of this task. It is used as a queue name, so both producer and consumer can infer the queue name from the task type.
ttlInSeconds → The time delay after which we want this task to be executed.
payload → A JSON string describing the task. The taskType comes in handy for parsing this JSON according to the task type.
The taskType plays an important role. It acts as a queue name, so when a task scheduler wants to schedule a task, it can use taskType as a queue name, and whoever wants to consume the particular task type can consume it from that queue.
Following are some examples of taskType: send-sms, send-offer-notification, send-greetings, etc.
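The scheduler and consumer code later in this post call task.toJson() and Task.fromJson(), which the class above does not show. A minimal sketch of what they could look like follows; the method bodies and the validation are assumptions made for illustration, not the project's actual implementation.

```typescript
// Hypothetical fuller version of Task; toJson/fromJson bodies are assumptions.
class Task {
  taskId: string;
  taskType: string; // doubles as the queue name
  ttlInSeconds: number;
  payload: string; // JSON string whose shape depends on taskType

  constructor(taskId: string, taskType: string, ttlInSeconds: number, payload: string) {
    this.taskId = taskId;
    this.taskType = taskType;
    this.ttlInSeconds = ttlInSeconds;
    this.payload = payload;
  }

  // Serialize the task so it can travel as a message body.
  toJson(): string {
    return JSON.stringify({
      taskId: this.taskId,
      taskType: this.taskType,
      ttlInSeconds: this.ttlInSeconds,
      payload: this.payload,
    });
  }

  // Rebuild a task from a message body, rejecting anything malformed.
  static fromJson(json: string): Task {
    const raw = JSON.parse(json);
    if (
      raw === null ||
      typeof raw !== "object" ||
      typeof raw.taskId !== "string" ||
      typeof raw.taskType !== "string" ||
      typeof raw.ttlInSeconds !== "number" ||
      typeof raw.payload !== "string"
    ) {
      throw new Error("Malformed task JSON");
    }
    return new Task(raw.taskId, raw.taskType, raw.ttlInSeconds, raw.payload);
  }
}

// Usage: the payload stays an opaque string until a handler for this taskType parses it.
const smsTask = new Task("task-1", "send-sms", 60, JSON.stringify({ message: "hello" }));
const roundTripped = Task.fromJson(smsTask.toJson());
```

Validating in fromJson means a corrupted message fails loudly at the consumer instead of producing a half-filled task.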
Producer
public sendDelayedMessageToQueue(taskType: string, delayInMills: number, data: string): Promise<void> {
  // Every name is derived from taskType so that producer and consumer agree on the topology.
  const INTERMEDIATE_QUEUE = `${taskType}_INTERMEDIATE_QUEUE`;
  const INTERMEDIATE_EXCHANGE = `${taskType}_INTERMEDIATE_EXCHANGE`;
  const INTERMEDIATE_EXCHANGE_TYPE = "fanout";
  const FINAL_QUEUE = taskType;
  const FINAL_EXCHANGE = `${taskType}_FINAL_EXCHANGE`;
  const FINAL_EXCHANGE_TYPE = "fanout";
  // Return the chain directly; a failure in any step rejects the returned promise.
  return this.assertExchange(INTERMEDIATE_EXCHANGE, INTERMEDIATE_EXCHANGE_TYPE)
    .then((_) => this.assertExchange(FINAL_EXCHANGE, FINAL_EXCHANGE_TYPE))
    // Expired messages in the intermediate queue are republished to the final exchange.
    .then((_) => this.assertQueue(INTERMEDIATE_QUEUE, { deadLetterExchange: FINAL_EXCHANGE }))
    .then((_) => this.assertQueue(FINAL_QUEUE, {}))
    .then((_) => this.bindQueue(INTERMEDIATE_QUEUE, INTERMEDIATE_EXCHANGE, ""))
    .then((_) => this.bindQueue(FINAL_QUEUE, FINAL_EXCHANGE, ""))
    .then((_) => {
      this.channel?.sendToQueue(INTERMEDIATE_QUEUE, Buffer.from(data), {
        expiration: delayInMills, // Per-message TTL in milliseconds: the delay we want
      });
    });
}
As we discussed earlier, we need two queues to achieve this delayed task execution.
Intermediate queue → Holds the task until it expires; after expiration, the task is forwarded to the final exchange.
Final queue → Bound to the final exchange. Our task consumer listens to this queue.
Both the queue names and their associated exchange names are derived from taskType, which acts as a contract between scheduler and consumer.
From the above code, we can see that the intermediate queue is declared with the final exchange as its dead letter exchange.
this.assertQueue(
  INTERMEDIATE_QUEUE, { deadLetterExchange: FINAL_EXCHANGE }
)
Once the task expiration time has elapsed, the task will be forwarded to the final exchange, and the final exchange will route the task to the final queue.
sendToQueue(INTERMEDIATE_QUEUE, Buffer.from(data), {
  expiration: delayInMills,
});
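Since the producer and the consumer each rebuild the queue and exchange names from taskType, one way to keep the two sides from drifting apart is to derive the names in a single place. The sketch below assumes a hypothetical helper, buildDelayTopology, and a hypothetical DelayTopology type; neither is part of the project, but the naming scheme is the one used in the producer code above.

```typescript
// Hypothetical helper: one source of truth for the names derived from taskType.
interface DelayTopology {
  intermediateExchange: string;
  intermediateQueue: string;
  finalExchange: string;
  finalQueue: string;
  // Options for declaring the intermediate queue: expired messages go to finalExchange.
  intermediateQueueOptions: { deadLetterExchange: string };
}

function buildDelayTopology(taskType: string): DelayTopology {
  const finalExchange = `${taskType}_FINAL_EXCHANGE`;
  return {
    intermediateExchange: `${taskType}_INTERMEDIATE_EXCHANGE`,
    intermediateQueue: `${taskType}_INTERMEDIATE_QUEUE`,
    finalExchange,
    finalQueue: taskType, // consumers subscribe to the queue named after the task type
    intermediateQueueOptions: { deadLetterExchange: finalExchange },
  };
}

// Usage: both sides would call this with the same taskType.
const smsTopology = buildDelayTopology("send-sms");
```

With a helper like this, a typo in a queue name shows up in one function rather than as a message that silently never arrives.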
Consumer
public consume(taskType: string, handler: (payload: string) => void) {
  const FINAL_QUEUE = taskType;
  const FINAL_EXCHANGE = `${taskType}_FINAL_EXCHANGE`;
  const FINAL_EXCHANGE_TYPE = "fanout";
  // Declare the same final exchange and queue as the producer, so start-up order does not matter.
  this.assertExchange(FINAL_EXCHANGE, FINAL_EXCHANGE_TYPE)
    .then((_) => this.assertQueue(FINAL_QUEUE, {}))
    .then((_) => this.bindQueue(FINAL_QUEUE, FINAL_EXCHANGE, ""))
    .then((_) => {
      this.channel?.consume(
        FINAL_QUEUE,
        (msg: Message | null) => {
          // msg is null when the broker cancels this consumer; there is nothing to handle then.
          if (msg === null) return;
          handler(msg.content.toString());
        },
        { noAck: true } // messages are acknowledged automatically on delivery
      );
    });
}
A consumer consumes messages from a specific queue (i.e., the queue named after the taskType).
Task Repository
interface TaskRepository {
  createTask(taskId: string, ttlInSeconds: number): Promise<void>;
  deleteTask(taskId: string): Promise<void>;
  isTaskValid(taskId: string): Promise<boolean>;
}
This provides options to create, delete, and validate tasks.
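As a rough illustration of the contract, here is an in-memory stand-in for the repository. InMemoryTaskRepository and its injectable clock are hypothetical and exist only for this sketch; it also assumes that ttlInSeconds is the lifetime of the validity record. A Redis-backed version could plausibly map the three methods to SET with an expiry, DEL, and EXISTS, though the project's actual commands may differ.

```typescript
interface TaskRepository {
  createTask(taskId: string, ttlInSeconds: number): Promise<void>;
  deleteTask(taskId: string): Promise<void>;
  isTaskValid(taskId: string): Promise<boolean>;
}

// Hypothetical in-memory stand-in for the Redis-backed repository.
class InMemoryTaskRepository implements TaskRepository {
  private expiresAtByTaskId = new Map<string, number>();
  private now: () => number;

  // The clock is injectable so expiry can be exercised without waiting.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Assumption: the record lives for ttlInSeconds, like a key with an expiry.
  async createTask(taskId: string, ttlInSeconds: number): Promise<void> {
    this.expiresAtByTaskId.set(taskId, this.now() + ttlInSeconds * 1000);
  }

  // Deleting the record is what "invalidating" a task means here.
  async deleteTask(taskId: string): Promise<void> {
    this.expiresAtByTaskId.delete(taskId);
  }

  // A task is valid only while its record exists and has not expired.
  async isTaskValid(taskId: string): Promise<boolean> {
    const expiresAt = this.expiresAtByTaskId.get(taskId);
    if (expiresAt === undefined) return false;
    if (expiresAt <= this.now()) {
      this.expiresAtByTaskId.delete(taskId);
      return false;
    }
    return true;
  }
}
```

Because the rest of the system only sees the TaskRepository interface, a stand-in like this can replace Redis in tests, which matches the earlier point that any data store would do.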
Task Scheduler
scheduleTask(delayInMilliseconds: number, task: Task): Promise<string> {
  // Record the task as valid first, then publish the delayed message.
  return this.taskRepository
    .createTask(task.taskId, task.ttlInSeconds)
    .then(() => this.producer.sendDelayedMessageToQueue(task.taskType, delayInMilliseconds, task.toJson()))
    .then(() => task.taskId);
}
invalidateTask(taskId: string): Promise<void> {
  // Deleting the record marks the task as invalid; the queued message itself is not removed.
  return this.taskRepository.deleteTask(taskId);
}
The Task Scheduler creates a task entry in Redis when scheduling a task.
When we no longer want the task to be executed, the entry can be deleted to mark the task as invalid. scheduleTask returns a taskId, which can be used to invalidate the task if required.
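The schedule-then-invalidate flow can be sketched with async/await and stand-in collaborators. The interface names TaskStore, DelayedProducer, and SchedulableTask are hypothetical and only mirror the method signatures shown in this post; the class below is an illustration of the flow, not the project's code.

```typescript
// Minimal shapes the scheduler depends on (hypothetical names, signatures taken from this post).
interface TaskStore {
  createTask(taskId: string, ttlInSeconds: number): Promise<void>;
  deleteTask(taskId: string): Promise<void>;
}
interface DelayedProducer {
  sendDelayedMessageToQueue(taskType: string, delayInMills: number, data: string): Promise<void>;
}
interface SchedulableTask {
  taskId: string;
  taskType: string;
  ttlInSeconds: number;
  toJson(): string;
}

class TaskScheduler {
  private taskRepository: TaskStore;
  private producer: DelayedProducer;

  constructor(taskRepository: TaskStore, producer: DelayedProducer) {
    this.taskRepository = taskRepository;
    this.producer = producer;
  }

  // Write the validity record first, then publish the delayed message.
  async scheduleTask(delayInMilliseconds: number, task: SchedulableTask): Promise<string> {
    await this.taskRepository.createTask(task.taskId, task.ttlInSeconds);
    await this.producer.sendDelayedMessageToQueue(task.taskType, delayInMilliseconds, task.toJson());
    return task.taskId;
  }

  // Cancelling only removes the validity record; the message stays queued.
  invalidateTask(taskId: string): Promise<void> {
    return this.taskRepository.deleteTask(taskId);
  }
}
```

Creating the record before publishing means a consumer can never receive a message whose record has not been written yet. If createTask uses ttlInSeconds as the record's expiry, as the name suggests, it presumably needs to be at least as long as the delay; otherwise the record would disappear before the message arrives and the task would look cancelled.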
Task Consumer
consume(taskType: string, handler: (task: Task) => void): void {
  this.consumer.consume(taskType.toString(), async (payload: string) => {
    const task = Task.fromJson(payload);
    const isTaskValid = await this.taskRepository.isTaskValid(task.taskId);
    if (isTaskValid) handler(task);
  });
}
The Task Consumer will listen for a specific taskType and execute the task if it is still valid.
Even after we invalidate a task by deleting it, its message will still be consumed by the consumer at the scheduled time. It is the Task Consumer’s responsibility to check the task’s validity before executing it, using the isTaskValid function from the Task Repository.
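The cancel semantics can be shown with a small synchronous model: the message for a cancelled task still arrives, but its handler is skipped. ValidityGate and its methods are hypothetical names for this sketch, standing in for the Redis-backed repository and the RabbitMQ delivery.

```typescript
// Toy model of the cancel flow (hypothetical names, no RabbitMQ or Redis involved).
class ValidityGate {
  private validTaskIds = new Set<string>();

  // The scheduler creates a validity record when it schedules a task.
  schedule(taskId: string): void {
    this.validTaskIds.add(taskId);
  }

  // Cancelling deletes the record; the delayed message is left untouched.
  invalidate(taskId: string): void {
    this.validTaskIds.delete(taskId);
  }

  // Called when the delayed message finally reaches the consumer.
  // Returns true if the handler ran, false if the task had been cancelled.
  deliver(taskId: string, handler: (taskId: string) => void): boolean {
    if (!this.validTaskIds.has(taskId)) return false;
    handler(taskId);
    return true;
  }
}

// Usage: two tasks are scheduled, one is cancelled before its message arrives.
const gate = new ValidityGate();
const executedTaskIds: string[] = [];
gate.schedule("greet-1");
gate.schedule("offer-1");
gate.invalidate("greet-1");
const greetRan = gate.deliver("greet-1", (id) => executedTaskIds.push(id)); // false: cancelled
const offerRan = gate.deliver("offer-1", (id) => executedTaskIds.push(id)); // true: still valid
```

The design trades a wasted delivery for simplicity: nothing has to search the intermediate queue for the message to remove.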
Demo
The demo includes two consumers and two producers.
Consider consumers 1 and 2 as email services, each a microservice responsible for sending greetings and offer notifications to customers.
Greeting and offer-notification are two different task types, which are produced by the Greeter-Service and Offer-Notification-Service producers respectively.
The demo shows that Email-Service-1 (i.e., consumer 1) processes both greeting and offer-notification tasks, whereas Email-Service-2 (i.e., consumer 2) processes only offer-notification tasks.
For brevity, task invalidation is not shown in the demo, but it is explained in the project’s README file.
I tried my best to animate and highlight log messages to indicate the event flow between scheduler and consumer. Each log message includes a timestamp, which can be used to check whether the consumer receives a task notification at the expected scheduled time. I have highlighted the timestamp on both the scheduler and the consumer whenever the consumer receives a task.
Source Code
https://github.com/sridhar-sp/task-scheduler
Reference
https://github.com/sridhar-sp/draw-server (Multiplayer drawing game using this task-scheduler)
https://play.google.com/store/apps/details?id=com.gandiva.draw (Production app using this task-scheduler)