Building a Robust Task Queue with BullMQ and Qstash
Hello guys, this is my first content. I would like to start my first article by creating a real use case using BullMQ and Qstash. I hope it will be useful.
BullMQ and Qstash are actually seen as competitors of each other, but I wanted to bring you a new perspective and show that they can also be used as complements to each other.
Task queues are essential for handling asynchronous operations in modern applications, especially when the operations are time-consuming or need to be processed in the background. Combining BullMQ with Qstash provides a powerful and scalable solution to this problem. I will explain how to integrate BullMQ with Qstash for a real-world use case: processing user-uploaded image files.
Why BullMQ and Qstash?
BullMQ is a Node.js-based library for managing job queues. It is feature-rich, reliable, and provides support for delayed, repeatable, and prioritized jobs. Under the hood, it uses Redis for persistence and messaging.
Qstash, on the other hand, is a serverless messaging and queuing system provided by Upstash. It allows you to publish messages to endpoints securely and reliably without the need to manage infrastructure.
Together, BullMQ and Qstash can handle scenarios where tasks originate from a public-facing API or service (via Qstash) and are processed by a secure backend (via BullMQ).
Use Case: Processing User-Uploaded Images
Imagine you run a service where users upload images that need to be processed (e.g., resized, converted, or analyzed). A typical workflow might involve:
- Receiving the upload request.
- Publishing the task to a queue.
- Processing the image asynchronously.
- Storing the results and notifying the user.
Here’s how I can implement this using BullMQ and Qstash.
Setting Up the Environment
Prerequisites
Before starting, make sure you have the following:
- Node.js (v14 or later)
- An Upstash Qstash account
- A Redis instance (preferably hosted, e.g., by Upstash)
- BullMQ installed (
npm install bullmq
) - Axios installed for HTTP requests (
npm install axios
)
Qstash Setup
- Create a Qstash Token: Go to the Qstash dashboard and generate an API token.
-
Create an Endpoint: Define an endpoint in your application that Qstash will send messages to. For our use case, it could be
/process-image
.
Implementation
1. Setting Up the Publisher (Qstash)
First, let me create a function to publish messages to Qstash. Each message will contain metadata about the uploaded image.
const axios = require('axios');
const QSTASH_URL = 'https://qstash.upstash.io/v1/publish';
const QSTASH_TOKEN = 'your_qstash_token';
async function publishToQstash(endpoint, payload) {
const url = `${QSTASH_URL}/${encodeURIComponent(endpoint)}`;
try {
const response = await axios.post(url, payload, {
headers: {
Authorization: `Bearer ${QSTASH_TOKEN}`,
},
});
console.log('Message published:', response.data);
} catch (error) {
console.error('Failed to publish message:', error);
}
}
// Example usage
publishToQstash('/process-image', { imageId: '12345', userId: '67890' });
2. Setting Up the Worker (BullMQ)
Next, I create a BullMQ worker to handle the incoming tasks from Qstash. The worker will process the image asynchronously.
const { Worker, Queue } = require('bullmq');
const Redis = require('ioredis');
const connection = new Redis('your_redis_url');
const queueName = 'image-processing';
// Define the job processor
const worker = new Worker(
queueName,
async (job) => {
console.log(`Processing job ${job.id} with data:`, job.data);
// Simulate image processing
await new Promise((resolve) => setTimeout(resolve, 3000));
console.log(`Job ${job.id} completed.`);
},
{ connection }
);
worker.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err);
});
// To add jobs from Qstash
const queue = new Queue(queueName, { connection });
async function addJobToQueue(data) {
await queue.add('process', data);
console.log('Job added to queue:', data);
}
3. Creating the Endpoint to Bridge Qstash and BullMQ
Finally, I’ll set up an endpoint to receive messages from Qstash and enqueue jobs for BullMQ.
const express = require('express');
const bodyParser = require('body-parser');
const app = express();
const PORT = 3000;
app.use(bodyParser.json());
app.post('/process-image', async (req, res) => {
const { imageId, userId } = req.body;
console.log('Received task:', req.body);
try {
await addJobToQueue({ imageId, userId });
res.status(200).send('Task accepted');
} catch (error) {
console.error('Failed to enqueue job:', error);
res.status(500).send('Internal Server Error');
}
});
app.listen(PORT, () => {
console.log(`Server running on http://localhost:${PORT}`);
});
Testing the Integration
- Start your server:
node server.js
- Publish a message to Qstash using the
publishToQstash
function. - Verify that the BullMQ worker processes the job and logs the output.
The Workflow Of Building A Task Queue
I want to visualize the workflow of the use case. Here is the steps:
- User Uploads Image: The user sends an image via a front-end application.
- Backend Validates Image Format and Size: The backend ensures the image meets expected criteria.
- Metadata Extracted from Image: Extract necessary metadata such as dimensions, format, and file size.
- Qstash Publishes Task with Metadata: The task containing metadata is sent to Qstash.
- Backend Endpoint Authenticates Qstash Request: The endpoint ensures the request comes from Qstash.
- Logs the Received Task for Monitoring: Task details are logged for future reference.
- BullMQ Enqueues Job in Redis Queue: A job is created and added to the Redis-backed BullMQ queue.
- Redis Notifies Worker: Redis alerts the worker about the new job.
- Worker Retrieves Job from Queue: The worker fetches the job from the queue.
- Worker Downloads Image for Processing: The worker retrieves the image file for further operations.
- Image Processing Begins: Operations like resizing, converting, or analyzing the image commence.
- Processing Logs Captured for Debugging: Any processing logs are captured for monitoring.
- Results Saved to Persistent Storage: The processed image is saved to a storage system (e.g., S3, database).
- Worker Notifies Backend of Completion: The worker informs the backend that processing is complete.
- Backend Updates Job Status in Database: Job status is updated in the system database.
- Backend Sends User Notification: The user is notified via email or other means.
- Task Marked as Complete in System: The system marks the task as completed.
- Monitoring System Logs Task Metrics: Task performance and metrics are logged for analysis.
Conclusion
By combining BullMQ and Qstash, you can build a robust, scalable, and decoupled task queue. BullMQ handles the heavy lifting for job processing, while Qstash simplifies the messaging layer, allowing you to integrate with external systems securely. This architecture is particularly well-suited for scenarios like processing user uploads, where reliability and scalability are paramount.
That's all. I am open to any feedback and questions. Thanks!
Top comments (0)