In the realm of modern application development, efficiently managing background tasks is crucial. Redis, renowned for its versatile data structures and high performance, serves as an excellent foundation for building a task queue system. In this post, we'll delve into creating a Redis-based queue with configurable concurrency.
Working at various startups, I have had the liberty to solve complex problems with quick but creative solutions. In this post, we will discuss one situation where I had to design a task queue that was easy to configure and manage, and cost-effective. Task queues let applications perform work, called tasks, asynchronously outside of a user request: if an app needs to execute work in the background, it adds tasks to task queues, and the tasks are executed later by worker services. Now that we know what task queues are, let's dive into the implementation.
✔️ Prerequisites
- Before we begin, we assume that you have some working knowledge of Redis and Celery
- We will be using a Redis and Celery setup in Python, forked from my other project, Flask Boilerplate
Let's roll in! 🚀
Overview
The task queue system leverages Redis lists to store tasks and processes them asynchronously. We will create two Celery functions: one adds tasks to the queue, and the other processes them in batches, running up to the configured concurrency limit in parallel.
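Throughout the post, the snippets assume a shared setup along these lines. This is a minimal sketch: the broker URL, key names' values, and the concurrency value are illustrative assumptions, not prescribed by the project.

# core/tasks.py -- assumed shared setup (illustrative values)
import logging

import redis
from celery import Celery, group

logger = logging.getLogger(__name__)

# Celery app with Redis as the broker (example URL)
celery = Celery('core', broker='redis://localhost:6379/0')

# plain Redis client used for the queue list and the lock key
redis_client = redis.Redis(host='localhost', port=6379, db=0)

TASK_EXEC_LIST_KEY = 'task_exec_list'      # Redis list holding queued task ids
TASK_EXECUTOR_KEY = 'task_executor_lock'   # lock key so only one callback picks tasks
TASK_CONCURRENCY_LIMIT = 3                 # how many tasks run in parallel per batch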
1. queue_task_executor
This function does two things:
- adds the task to the queue
- processes the task when invoked by the callback (the second function below)
@celery.task(name='core.tasks.queue_task_executor')
def queue_task_executor(**kwargs):
    # fetch the task id (request_id)
    request_id = kwargs.get('request_id')
    is_called_by_callback = kwargs.get('is_called_by_callback', False)
    if not is_called_by_callback:
        # push this task to the tail of the queue
        logger.info(f'pushing id: {request_id} to queue')
        redis_client.rpush(TASK_EXEC_LIST_KEY, request_id)
        # call the callback, which will pick it up for execution
        queue_task_executor_callback.apply_async(kwargs={})
        return
    # execute the task here
    # time.sleep(5)
    logger.info(f'task id:{request_id} execution completed successfully!')
It simply checks whether it was called by the callback: if so, it executes the task; otherwise, it adds the task to the tail of the Redis list (queue) and triggers the callback. The time.sleep(5) statement is just a test line that can be replaced with the actual task call/logic. Based on the request_id passed as metadata when queuing the task, we can dispatch to the appropriate task function.
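As a rough illustration of that dispatch, the execution branch could look like the following. Note that fetch_request, send_email, and generate_report are hypothetical helpers standing in for your app's own lookup and handlers; they are not part of the project.

# inside queue_task_executor, in place of time.sleep(5) -- hypothetical dispatch
request = fetch_request(request_id)        # hypothetical lookup of task metadata
if request['type'] == 'send_email':
    send_email(request['payload'])         # hypothetical task handlers
elif request['type'] == 'generate_report':
    generate_report(request['payload'])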
2. queue_task_executor_callback
This callback function acts as the brain of the system. It performs the following:
- Picks the next batch of tasks to run in parallel from the Redis list
- Uses Celery groups to queue the tasks with a self-callback
@celery.task(name='core.tasks.queue_task_executor_callback')
def queue_task_executor_callback(**kwargs):
    logger.debug(f'queue_task_executor_callback called {kwargs}')
    if kwargs and kwargs.get('pick_next'):
        # only delete this key if called after completing a batch of tasks
        redis_client.delete(TASK_EXECUTOR_KEY)
    if redis_client.setnx(TASK_EXECUTOR_KEY, 1):
        # retrieve the first x elements from the queue
        next_tasks = redis_client.lrange(TASK_EXEC_LIST_KEY, 0, TASK_CONCURRENCY_LIMIT - 1)
        tasks = []
        if len(next_tasks):
            tasks = [
                queue_task_executor.signature(
                    kwargs={
                        'request_id': int(val),
                        'is_called_by_callback': True
                    }
                )
                for val in next_tasks
            ]
            # delete x elements from the queue
            redis_client.ltrim(TASK_EXEC_LIST_KEY, TASK_CONCURRENCY_LIMIT, -1)
            job = group(tasks)
            job.link(queue_task_executor_callback.signature(kwargs={'pick_next': True}))
            _ = job.apply_async()
        else:
            # this means this was the last callback; release the lock
            redis_client.delete(TASK_EXECUTOR_KEY)
    else:
        logger.debug(f'queue_task_executor_callback already in progress {redis_client.get(TASK_EXECUTOR_KEY)}')
In this function, we retrieve from the Redis list the tasks that were added by the first function. The number of tasks to retrieve depends on the concurrency limit we set as TASK_CONCURRENCY_LIMIT. We then leverage Celery groups to execute the tasks in parallel, linking a callback that calls this function again with an argument to pick the next batch. To avoid duplicate calls, we wrap this whole logic under a lock key, TASK_EXECUTOR_KEY, ensuring that only one Celery task executes queue_task_executor_callback to pick the next batch from the Redis list.
That's pretty much it! Now, we can add tasks to the queue by calling the queue_task_executor Celery task.
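For example, application code could enqueue a few tasks like this (the request ids here are illustrative):

# each call pushes a request id onto the Redis list and nudges the callback,
# which then executes tasks in batches of TASK_CONCURRENCY_LIMIT
for request_id in (101, 102, 103):
    queue_task_executor.apply_async(kwargs={'request_id': request_id})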
💡 Advantages of This Redis Queue Setup vs. Cloud Infrastructure
- Cost-Effectiveness: Cloud queuing services charge based on usage (e.g., number of requests, message size, or data transfer). With Redis, we pay only for hosting our own Redis server, which can be significantly cheaper.
- Simplicity: We only need Redis as a dependency. There's no need to set up additional accounts, APIs, or permissions as we would with cloud services, and hence, it's easy to run and debug locally.
- Configurable Concurrency: This Redis-based setup lets us control concurrency directly via the TASK_CONCURRENCY_LIMIT used in queue_task_executor_callback (see the sketch after this list).
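For instance, the limit could be read from an environment variable so concurrency is tunable per deployment; this wiring is an assumption, not something the project mandates.

import os

# batch size for parallel execution; the default of 3 is illustrative
TASK_CONCURRENCY_LIMIT = int(os.environ.get('TASK_CONCURRENCY_LIMIT', 3))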
📢 Final Words
This Redis-based queue system offers a lightweight, flexible, and cost-effective alternative to cloud-based queuing solutions. It empowers developers with full control over task execution, low-latency performance, and the ability to run independently of external vendors.
👉 You can check out my GitHub repository for a complete working example of this approach 👈
idris-rampurawala / redis-queue
Redis Queue
The Redis Queue project is aimed at creating a queue based on Redis and Celery using Redis data structures. We can control the task concurrency as per our needs.
Note: This project is forked from one of my projects, Flask Boilerplate, to quickly get started.
Contributing
We encourage you to contribute to this project! Please check out the Contributing guidelines on how to proceed.
Getting Started
Prerequisites
- Python 3.11.3 or higher
- An up-and-running Redis server
Project setup
# clone the repo
$ git clone git@github.com:idris-rampurawala/redis-queue.git
# move to the project folder
$ cd redis-queue
If you want to install Redis via Docker:
$ docker run -d --name="redis-queue" -p 6379:6379 redis
Creating virtual environment
- Set up the correct Python version using pyenv
pyenv install 3.11.3
- Install pipenv as a global Python package: pip install pipenv
- Create a virtual environment for this project
# activating the pipenv environment
…