Jolver Ardila

Reducing Orchestration Costs Through Cloud Task And Cloud Scheduler

If you are working for a small company and they ask you to come up with a smart, simple solution to orchestrate different kinds of workflows that is "not too expensive" (which is, in most cases, the main requirement), then you're welcome to explore some interesting alternatives.

Before navigating through the alternatives, let's clarify some concepts to understand the context.

Orchestration

Orchestration refers to the automated arrangement, coordination, and management of complex systems, services, or workflows. It involves ensuring that tasks or services are executed in the correct order, at the right time, and with the proper dependencies or conditions satisfied.

Workflows

A workflow is a defined series of steps or tasks that must be performed to complete a process. It often represents a business or technical process, where each step has dependencies or prerequisites.

Endpoint Callback

An endpoint callback refers to a mechanism where a system or service sends a response to a pre-defined URL (the callback endpoint) after completing a specific task or event.

Task Scheduling

Task scheduling is the process of planning and executing tasks or jobs at specified times or intervals. It ensures that operations are performed automatically without manual intervention.

If you are an experienced software developer, you might already be thinking of some products that can help with this. Solutions like Apache Airflow are powerful but often excessive for simpler workflows. Additionally, running Airflow on Cloud Composer comes with higher operational costs, making it inaccessible for businesses with tighter budgets.

Another alternative for scheduling tasks is Python with Celery, but this requires a message broker such as Redis or RabbitMQ, and again, that is not cheap and will require a bit more coding to control everything.
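
To make that concrete, here is a minimal sketch of what the Celery route involves, assuming a Redis instance running locally as the broker (the task name and payload are placeholders):

from celery import Celery

# Celery needs a broker (Redis here) just to queue the work
celery_app = Celery("tasks", broker="redis://localhost:6379/0")

@celery_app.task
def late_process(payload: dict):
    # Placeholder for your asynchronous logic (emails, reports, validations, ...)
    print(f"Processing payload: {payload}")

# From your API code, enqueue the task on the broker:
# late_process.delay({"user_id": 123})

On top of the broker, you would also need to keep a separate worker process running (celery -A tasks worker), which is precisely the extra infrastructure this post tries to avoid.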

Before moving on to the Cloud Scheduler alternative, be aware that this post focuses mainly on simple solutions for orchestrating workflows that do not require complex validations or conditional paths between steps.

Now it's time to deep dive into an effective alternative that involves Google Cloud Scheduler and Google Cloud Tasks.

A real life job scenario
In this example, let's suppose you are working for a company that requires you to implement an architecture that asynchronously executes some process after an endpoint is called by the mobile application. In addition, the product owner wants a scheduled process that validates the status of certain records in the database every 4 hours.

Let's create the endpoint the mobile applications will be calling

pip install fastapi uvicorn google-cloud-tasks
import json
from datetime import datetime, timedelta

from fastapi import FastAPI, HTTPException
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

app = FastAPI()

@app.post("/trigger-task")
def trigger_task(data: dict):
    """
    Synchronous endpoint called by the mobile app to trigger an asynchronous process.
    """
    # Implement your own logic here
    try:
        # Cloud Tasks client configuration
        queue_name = "projects/my-project-id/locations/us-central1/queues/my-queue"
        client = tasks_v2.CloudTasksClient()

        # Define the task payload
        task = {
            "http_request": {
                "http_method": tasks_v2.HttpMethod.POST,
                "url": "https://my-service-url.com/late-process",  # Cloud Tasks callback URL
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps(data.get("payload", {})).encode(),
            }
        }

        # Optional: schedule the task with a delay (e.g., 30 seconds)
        schedule_time = datetime.utcnow() + timedelta(seconds=30)
        timestamp = timestamp_pb2.Timestamp()
        timestamp.FromDatetime(schedule_time)
        task["schedule_time"] = timestamp

        # Create the task in the specified queue
        response = client.create_task(parent=queue_name, task=task)
        return {"message": "Task triggered successfully", "task_name": response.name}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

When a user of the app consumes the /trigger-task endpoint, your backend performs its logic and then creates a new Cloud Task, which will consume another endpoint as a callback.

Now we'll create the endpoint for the callback

@app.post("/late-process") 
def async_process(payload: dict):
    """
    Callback endpoint for Cloud Tasks to execute the asynchronous process.
    """
    # Your logic here
    # Think something like email, generate a report, validate something else, etc...
    try:
        # Simulate processing
        print(f"Processing async task with payload: {payload}")
        # Logic for processing (e.g., generating a report)
        return {"status": "success", "message": "Task processed"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Well, now we have an endpoint to trigger the Cloud Task and another endpoint to execute the callback from Cloud Tasks.
But at this point you may have noticed that we are missing something. Yes, we still have to create the resources in Google Cloud to make all of this possible.
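
Before creating the GCP resources, you can run the API locally to check that the endpoints respond. A minimal example, assuming the code above lives in main.py and using the uvicorn we installed earlier (note that /trigger-task will still try to create a real Cloud Task, so it needs GCP credentials and an existing queue):

# Start the FastAPI app locally on port 8080
uvicorn main:app --host 0.0.0.0 --port 8080

# Quick manual test with a hypothetical payload
curl -X POST http://localhost:8080/trigger-task \
    -H "Content-Type: application/json" \
    -d '{"payload": {"user_id": 123}}'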

The first thing we have to do is enable the APIs in GCP:

PROJECT_ID="my-project-id"
gcloud services enable --project="${PROJECT_ID}" \
    cloudscheduler.googleapis.com \
    cloudtasks.googleapis.com

Now we have to create the task queue:

REGION="us-central1" # your preferred region
gcloud tasks queues create my-queue --location="${REGION}"
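
If the callback endpoint is not public (for example, a Cloud Run service that requires authentication), the task's HTTP request can carry an OIDC token so Cloud Tasks can authenticate the call. A minimal sketch, assuming a hypothetical invoker service account that has been granted permission to call the service:

from google.cloud import tasks_v2

task = {
    "http_request": {
        "http_method": tasks_v2.HttpMethod.POST,
        "url": "https://my-service-url.com/late-process",
        "headers": {"Content-Type": "application/json"},
        "body": b"{}",  # your payload, as in the endpoint above
        # Hypothetical service account; Cloud Tasks attaches a signed OIDC token to the request
        "oidc_token": {
            "service_account_email": "tasks-invoker@my-project-id.iam.gserviceaccount.com"
        },
    }
}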

We are almost there: the backend endpoints are created, the GCP APIs are enabled, and the Cloud Tasks queue is created and ready to receive tasks.

But there's another requirement we haven't completed yet. The company is asking for a way to schedule a task that validates the status of certain records periodically. Now it's time to use another powerful product from GCP: Cloud Scheduler.

Let's create the scheduler job with the specification received from the company, which is "every 4 hours":

gcloud scheduler jobs create http validate-records-job \
    --schedule="0 */4 * * *" \
    --http-method=POST \
    --uri="https://my-service-url.com/validate-records" \
    --headers="Content-Type=application/json" \
    --time-zone="UTC"

With this command we are telling Cloud Scheduler that we want a new request to the endpoint https://my-service-url.com/validate-records every 4 hours (0 */4 * * *), using the HTTP method POST and the UTC time zone. If you want to know a little bit more about crontab expressions, you can experiment with this page: https://crontab.guru/
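
To verify the job without waiting four hours, you can trigger it manually from the CLI (job name and region as assumed above):

gcloud scheduler jobs run validate-records-job --location="${REGION}"

If the target endpoint requires authentication, the job can also be created with the --oidc-service-account-email flag so Cloud Scheduler attaches an OIDC token to each request.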

Another alternative for you to consider here (not the main scope of this post) is the use of a Pub/Sub topic and a Cloud Function to decouple the different parts. It's a great option if you want to react whenever a certain event is triggered; a minimal publishing sketch is shown below.
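
For reference, publishing an event to a topic only takes a few lines. A minimal sketch, assuming a hypothetical topic named my-events in the same project (the subscriber, e.g. a Cloud Function, is out of scope here):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project-id", "my-events")

# Publish the event; the payload must be a bytestring
future = publisher.publish(topic_path, data=json.dumps({"record_id": 42}).encode("utf-8"))
print(f"Published message {future.result()}")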

Finally, we need to create our last endpoint to receive the call that validates the status of the records.

@app.post("/validate-records")
def validate_records():
    """
    Endpoint to validate records in the database, triggered by Cloud Scheduler.
    """
    try:
        # Simulate database validation
        print("Validating records in the database...")
        # Logic to validate records (e.g., check statuses, update fields)
        return {"status": "success", "message": "Records validated"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Summary
There it is: we have created an entire orchestration workflow using no more than three endpoints and two GCP products (Cloud Tasks and Cloud Scheduler).
If this solution does not make you feel smart just because you are not using Apache Airflow, that's totally fine. The purpose of this post is not to reject Apache Airflow, but to leverage cheaper alternatives for small companies that cannot afford Cloud Composer and an entire ecosystem just to orchestrate a couple of flows.

When you compare the costs of Cloud Composer and Cloud Scheduler, you'll notice that the difference is enormous: a Composer environment runs (and is billed) continuously, while Cloud Scheduler charges per job and Cloud Tasks charges per operation, both with free tiers.

Feel free to share your thoughts about this alternative and complement this solution so others can have a better guide to help the companies they are working for.

Some useful resources:
Cloud Tasks Documentation -> https://cloud.google.com/tasks/docs
Cloud Scheduler Documentation -> https://cloud.google.com/scheduler/docs
