John Owolabi Idogun

Posted on • Originally published at johnowolabiidogun.dev

Consuming paginated API using periodic Celery task in a Django Application

Introduction

API (Application Programming Interface) consumption is now the norm, as many software products have decoupled their backend and frontend codebases. Backend engineers are tasked with writing consumable APIs for their frontend counterparts, and in some cases backend engineers themselves consume other API services to accomplish their tasks.

Some services expose enormously large datasets, and making them available in a single API call would be impractical. Pagination comes to the rescue: many APIs return only a fraction of the data per request, and accessing the remaining fractions requires some extra work.

This article demonstrates how to set up Celery background tasks to consume paginated APIs periodically. We'll explore iterative and recursive approaches for APIs paginated using page parameters and those using next URLs. The fetched data will be stored in a Django model, overwriting previous data. Note that persisting historical data is outside the scope of this article but will be addressed in a future post on building a data warehouse.

Prerequisite

Basic familiarity with Django is assumed. Refer to the Django tutorial for an introduction.

Source code

Sirneij / django_excel

Exporting Django model data as excel file (.xlsx) using openpyxl library and Google Spreadsheet API


This repository accompanies this tutorial on dev.to. It has been deployed to Heroku and can be accessed live via this link.

NOTE: If you sign up for CoinGecko's API using my referral code, CGSIRNEIJ, I earn some commission. That's a good way to support me.

Run Locally

To run the project locally:

  1. Create a virtual environment using venv, poetry, virtualenv, or pipenv. I recommend virtualenv.

  2. Activate the virtual environment.

  3. Install dependencies:

    ```bash
    pip install -r requirements.txt
    ```
  4. Migrate the database:

    ```bash
    python manage.py migrate
    ```
  5. Run the project:

    ```bash
    python manage.py runserver
    ```
  6. In another terminal, issue this command to start Celery (the -B flag embeds the beat scheduler in the worker):

    ```bash
    celery -A django_excel worker -l info -B
    ```

Run Tests Locally

To run the tests:

```bash
pytest --nomigrations --reuse-db -W error::RuntimeWarning --cov=core --cov-report=html tests/
```



Implementation

Step 1: Set up a Django project and app with Celery

Create a Django project named django_excel within a virtual environment. Ensure that django and celery are installed in your environment.

```bash :Terminal:
virtualenv➜  django_excel git:(main) django-admin startproject django_excel .
```

Create an app named core:

```bash :Terminal:
virtualenv➜  django_excel git:(main) python manage.py startapp core
```

Register your application in settings.py.

```python :django_excel/settings.py:
...
# Application definition

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    'core.apps.CoreConfig',  # our new app
]
...
```

It is time to set up our application to utilize Celery. To do this, create a file aptly named celery.py in your project's directory and paste the following snippet:

```python :django_excel/celery.py:
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_excel.settings')

app = Celery('django_excel')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
```

That was lifted directly from the Celery Django documentation. Ensure you modify the DJANGO_SETTINGS_MODULE value and the Celery app name to reflect your project's name. The namespace='CELERY' argument means all Celery-related configurations in your settings.py file must be prefixed with CELERY_, such as CELERY_BROKER_URL.
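Once a worker is running (see the Run Locally section above), you can sanity-check the wiring from python manage.py shell. A quick smoke test, assuming the project layout above:

```python
>>> from django_excel.celery import debug_task
>>> debug_task.delay()  # the worker's log should print the request info
```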

Note: Capitalization of Celery-Related Configurations

Because you are effectively providing constants, the Celery-related configurations in your settings.py file are capitalized. For instance, Celery's beat_schedule configuration becomes CELERY_BEAT_SCHEDULE in Django.

Next, open your project's __init__.py and append the following:

```python :django_excel/__init__.py:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```

To conclude celery-related configurations, let's set the following in settings.py:

```python :django_excel/settings.py:
...
from celery.schedules import crontab
from decouple import Csv, config
...
CELERY_BROKER_URL: str = config('REDIS_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND: str = config('REDIS_URL', default='redis://localhost:6379/0')
CELERY_ACCEPT_CONTENT: list[str] = ['application/json']
CELERY_TASK_SERIALIZER: str = 'json'
CELERY_RESULT_SERIALIZER: str = 'json'
```

We are using redis as our broker. You can opt for RabbitMQ, which is supported out of the box by Celery.
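If you go the RabbitMQ route instead, the broker URL takes the AMQP form. A minimal sketch, assuming RabbitMQ's default guest account on localhost:

```python :django_excel/settings.py:
...
CELERY_BROKER_URL: str = 'amqp://guest:guest@localhost:5672//'
...
```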

In the settings above, I am linking CELERY_BROKER_URL to an environment variable named REDIS_URL, which normally looks like redis://127.0.0.1:6379 on a Linux system. That means I could have hardcoded CELERY_BROKER_URL and CELERY_RESULT_BACKEND as:

```python :django_excel/settings.py:
...
CELERY_BROKER_URL: str = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND: str = 'redis://127.0.0.1:6379'
...
```

Note that CELERY_RESULT_BACKEND is optional, as are CELERY_ACCEPT_CONTENT, CELERY_TASK_SERIALIZER, and CELERY_RESULT_SERIALIZER. However, leaving the last three unset might result in runtime serialization errors, especially when dealing with databases or broadcasting emails asynchronously with Celery.

The stage is now set; let's set up our database. We will be consuming CoinGecko's API and saving some of its data.

Step 2: Define FullCoin model

Our model will look like this:

```python :core/models.py:
from django.db import models

class FullCoin(models.Model):
    coin_id = models.CharField(max_length=100, primary_key=True)
    symbol = models.CharField(max_length=10, null=True, blank=True)
    name = models.CharField(max_length=100, null=True, blank=True)
    image = models.URLField(null=True, blank=True)
    current_price = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    market_cap = models.BigIntegerField(null=True, blank=True)
    market_cap_rank = models.IntegerField(null=True, blank=True)
    fully_diluted_valuation = models.BigIntegerField(null=True, blank=True)
    total_volume = models.BigIntegerField(null=True, blank=True)
    high_24h = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    low_24h = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    price_change_24h = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    price_change_percentage_24h = models.DecimalField(max_digits=10, decimal_places=5, null=True, blank=True)
    market_cap_change_24h = models.BigIntegerField(null=True, blank=True)
    market_cap_change_percentage_24h = models.DecimalField(max_digits=10, decimal_places=5, null=True, blank=True)
    circulating_supply = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    total_supply = models.DecimalField(max_digits=20, decimal_places=2, null=True)
    max_supply = models.DecimalField(max_digits=20, decimal_places=2, null=True)
    ath = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    ath_change_percentage = models.DecimalField(max_digits=10, decimal_places=5, null=True, blank=True)
    ath_date = models.DateTimeField(null=True, blank=True)
    atl = models.DecimalField(max_digits=20, decimal_places=2, null=True, blank=True)
    atl_change_percentage = models.DecimalField(max_digits=20, decimal_places=5, null=True, blank=True)
    atl_date = models.DateTimeField(null=True, blank=True)
    last_updated = models.DateTimeField(null=True, blank=True)

    def __str__(self):
        # `symbol` is nullable, so guard against `None`
        symbol = self.symbol.upper() if self.symbol else ''
        return f"{self.name} ({symbol})"
```

These are all the fields taken directly from CoinGecko's public API for coin markets:

```json
[
  {
    "id": "bitcoin",
    "symbol": "btc",
    "name": "Bitcoin",
    "image": "https://assets.coingecko.com/coins/images/1/large/bitcoin.png?1696501400",
    "current_price": 70187,
    "market_cap": 1381651251183,
    "market_cap_rank": 1,
    "fully_diluted_valuation": 1474623675796,
    "total_volume": 20154184933,
    "high_24h": 70215,
    "low_24h": 68060,
    "price_change_24h": 2126.88,
    "price_change_percentage_24h": 3.12502,
    "market_cap_change_24h": 44287678051,
    "market_cap_change_percentage_24h": 3.31157,
    "circulating_supply": 19675987,
    "total_supply": 21000000,
    "max_supply": 21000000,
    "ath": 73738,
    "ath_change_percentage": -4.77063,
    "ath_date": "2024-03-14T07:10:36.635Z",
    "atl": 67.81,
    "atl_change_percentage": 103455.83335,
    "atl_date": "2013-07-06T00:00:00.000Z",
    "roi": null,
    "last_updated": "2024-04-07T16:49:31.736Z"
  }
]
```

Note: Commission from referral

If you sign up for CoinGecko's API using my referral code, CGSIRNEIJ, I earn some commission. That's a good way to support me.

null=True makes a column nullable in SQL:

```sql
CREATE TABLE IF NOT EXISTS full_coin(
    symbol VARCHAR(10) NULL,
    ...
)
```

null=False or leaving it unset makes the column non-nullable:

```sql
CREATE TABLE IF NOT EXISTS full_coin(
    symbol VARCHAR(10) NOT NULL,
    ...
)
```

blank=True allows the field to be optional in forms and the admin page.
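To see the difference in practice: null is enforced at the database level while blank is enforced by Django's validation layer. A quick illustration with the model above (every field except total_supply and max_supply is blank=True):

```python
coin = FullCoin(coin_id='bitcoin')
coin.save()        # fine: all the other columns are nullable in the database
coin.full_clean()  # raises ValidationError for total_supply and max_supply,
                   # which are null=True but not blank=True
```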

Speaking of the admin site, let's register the model:

```python :core/admin.py:
from django.contrib import admin

from core.models import FullCoin

@admin.register(FullCoin)
class FullCoinAdmin(admin.ModelAdmin):
    list_display = (
        'coin_id',
        'symbol',
        'name',
        'current_price',
        'market_cap',
        'market_cap_rank',
        'fully_diluted_valuation',
        'total_volume',
        'high_24h',
        'low_24h',
        'price_change_24h',
        'price_change_percentage_24h',
        'market_cap_change_24h',
        'market_cap_change_percentage_24h',
        'circulating_supply',
        'total_supply',
        'max_supply',
        'ath',
        'ath_change_percentage',
        'ath_date',
        'atl',
        'atl_change_percentage',
        'atl_date',
        'last_updated',
    )
```
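This lists every field in the changelist, which can get unwieldy. If you'd rather trim it down, a variant of the same FullCoinAdmin with search and filtering might look like this (a sketch; pick whichever columns matter to you):

```python :core/admin.py:
@admin.register(FullCoin)
class FullCoinAdmin(admin.ModelAdmin):
    list_display = ('coin_id', 'symbol', 'name', 'current_price', 'market_cap')
    search_fields = ('name', 'symbol')  # adds a search box
    list_filter = ('last_updated',)     # adds a sidebar filter
```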

With this, you can migrate your database:

```bash :Terminal:
virtualenv➜  django_excel git:(main) python manage.py makemigrations # create migration file(s)
virtualenv➜  django_excel git:(main) python manage.py migrate # create the tables in the db
```

Optionally, you can create a superuser:

```bash :Terminal:
virtualenv➜  django_excel git:(main) python manage.py createsuperuser
```

Follow the prompts.

Step 3: Create and Register Periodic Tasks

Here is the juicy part:

```python :core/tasks.py:
import logging
import time
from collections.abc import Generator

import requests
from celery import shared_task
from django.conf import settings

from core.models import FullCoin

logger = logging.getLogger(__name__)


def build_api_url(page: int) -> str:
    """Build the API URL."""
    market_currency_order = 'markets?vs_currency=usd&order=market_cap_desc&'
    per_page = f'per_page=50&page={page}&sparkline=false'
    return f'{settings.BASE_API_URL}/coins/{market_currency_order}{per_page}'


def fetch_coins_iteratively() -> Generator[dict, None, None]:
    """Fetch coins data from API using generator."""
    page = 1
    while True:
        try:
            url = build_api_url(page)
            response = requests.get(url)
            coin_data = response.json()

            # Check for rate limit response
            if isinstance(coin_data, dict) and coin_data.get('status', {}).get('error_code') == 429:
                logger.warning("Rate limit exceeded. Waiting 60 seconds...")
                time.sleep(60)
                continue

            # Check for empty response (end of pagination)
            if not coin_data:
                break

            yield from coin_data
            logger.info(f"Fetched page {page} with {len(coin_data)} coins")
            page += 1

            time.sleep(1)  # Be nice to the API

        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed on page {page}: {e}")
            raise


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=600,
    max_retries=5,
)
def get_full_coin_data_iteratively_for_page(self) -> None:
    """Get full coin data iteratively for each page."""
    try:
        # Collect coins in batches before writing them to the database
        batch_size = 100
        coins_batch = []

        for coin in fetch_coins_iteratively():
            coins_batch.append(coin)

            if len(coins_batch) >= batch_size:
                logger.info(f"Processing batch of {len(coins_batch)} coins")
                store_data(coins_batch)
                coins_batch = []

        # Process remaining coins
        if coins_batch:
            logger.info(f"Processing final batch of {len(coins_batch)} coins")
            store_data(coins_batch)

    except Exception as e:
        logger.error(f"Failed to process coins: {e}")
        raise self.retry(exc=e)
```
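The task hands each batch to a store_data helper that isn't shown above. A minimal sketch of what it could look like, assuming we simply overwrite previously stored rows as stated in the introduction (the API's id key maps to our coin_id primary key, and extra keys such as roi are dropped):

```python :core/tasks.py:
def store_data(coins: list[dict]) -> None:
    """Upsert a batch of coins, overwriting any previously stored values."""
    model_fields = {field.name for field in FullCoin._meta.get_fields()}

    for coin in coins:
        coin_id = coin.pop('id')
        # Keep only keys that exist on the model (drops `roi`, for example).
        defaults = {key: value for key, value in coin.items() if key in model_fields}
        FullCoin.objects.update_or_create(coin_id=coin_id, defaults=defaults)
```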

The build_api_url helper builds the CoinGecko API URL for the supplied page number. The BASE_API_URL is:

```python :django_excel/settings.py:
...
BASE_API_URL: str = 'https://api.coingecko.com/api/v3'
```
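With that base URL, build_api_url(1), for instance, produces:

```python
>>> build_api_url(1)
'https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=50&page=1&sparkline=false'
```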
`fetch_coins_iteratively` is the core of the program. It starts at the first page and runs an "infinite" loop that breaks only when the API returns no data. Being a generator, it yields coins one at a time rather than accumulating every page in memory.

Its recursive alternative is:



```python :core/tasks.py:
def fetch_coins_recursively(page: int = 1) -> Generator[dict, None, None]:
    """Fetch coins data from API recursively using generator."""
    try:
        url = build_api_url(page)
        response = requests.get(url)
        coin_data = response.json()

        # Check for rate limit response
        if isinstance(coin_data, dict) and coin_data.get('status', {}).get('error_code') == 429:
            logger.warning("Rate limit exceeded. Waiting 60 seconds...")
            time.sleep(60)
            yield from fetch_coins_recursively(page)
            return

        # Base case: empty response (end of pagination)
        if not coin_data:
            return

        # Process current page
        yield from coin_data
        logger.info(f"Fetched page {page} with {len(coin_data)} coins")

        # Be nice to the API
        time.sleep(1)

        # Recursive case: fetch next page
        yield from fetch_coins_recursively(page + 1)

    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed on page {page}: {e}")
        raise
```

Note that each recursive call adds a stack frame, so for very deep paginations the iterative version is safer with respect to Python's recursion limit.

Then there is get_full_coin_data_iteratively_for_page, which is decorated with shared_task (for task autodiscovery). We supplied some parameters:

  • bind=True to access the task instance via self
  • autoretry_for=(Exception,) to auto-retry on exceptions
  • retry_backoff=True for exponential backoff between retries
  • retry_backoff_max=600 to cap the backoff delay at 600 seconds
  • max_retries=5 to limit retries to 5

For this task to be periodic, we must add it to the CELERY_BEAT_SCHEDULE in settings.py:

```python :django_excel/settings.py:
...
CELERY_BEAT_SCHEDULE: dict[str, dict[str, Any]] = {
    ...
    'get_full_coin_data_iteratively_for_page': {
        'task': 'core.tasks.get_full_coin_data_iteratively_for_page',
        'schedule': crontab(minute='*/3'),
    },
}
```

It schedules this task to run every 3 minutes ('*/3') using crontab.
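crontab accepts the usual cron fields, so the cadence is easy to change. For example, a variant that runs once a day at midnight instead:

```python :django_excel/settings.py:
...
CELERY_BEAT_SCHEDULE: dict[str, dict[str, Any]] = {
    ...
    'get_full_coin_data_iteratively_for_page': {
        'task': 'core.tasks.get_full_coin_data_iteratively_for_page',
        'schedule': crontab(minute=0, hour=0),  # every day at midnight
    },
}
```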

These implementations were written with performance in mind, but there is still room for improvement; batching database writes (for instance with bulk_create) or reusing a single requests.Session across pages are obvious next steps.

Step 4: Bonus

There are APIs whose pagination is not page-based but relies on a next URL (the default DRF pagination strategy). For these systems, the last page has a null next field, and that's the stopping condition:

```python
from typing import Any

import requests
from django.conf import settings


def get_api_data(url: str) -> None:
    """Make recursive requests, following the `next` URL."""
    headers = {'Authorization': f'Token {settings.API_TOKEN}'}
    response = requests.get(url, headers=headers)
    data: dict[str, Any] = response.json()

    # Process data['results'] here, e.g. save it to a model.

    if data.get('next') is not None:
        get_api_data(data['next'])
```
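For very deep paginations, the same traversal can be done iteratively to avoid Python's recursion limit. A sketch under the same assumptions (an API_TOKEN setting and a DRF-style next key):

```python
def get_api_data_iteratively(url: str) -> None:
    """Follow `next` URLs in a loop until the last page."""
    headers = {'Authorization': f'Token {settings.API_TOKEN}'}
    while url:
        response = requests.get(url, headers=headers)
        data: dict[str, Any] = response.json()
        # Process data['results'] here.
        url = data.get('next')
```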

That's it! I hope you enjoyed it.

Outro

Enjoyed this article? I'm a Software Engineer, Technical Writer, and Technical Support Engineer actively seeking new opportunities, particularly in areas related to web security, finance, healthcare, and education. If you think my expertise aligns with your team's needs, let's chat! You can find me on LinkedIn and X. I am also an email away.

If you found this article valuable, consider sharing it with your network to help spread the knowledge!
