If you are working for a small company that needs a smart, simple solution to orchestrate different kinds of workflows, one that is "not too expensive" (which, in most cases, is the main requirement), then you're welcome to explore some interesting alternatives.
Before navigating the alternatives, let's clarify a few concepts to understand the context.
Orchestration
Orchestration refers to the automated arrangement, coordination, and management of complex systems, services, or workflows. It involves ensuring that tasks or services are executed in the correct order, at the right time, and with the proper dependencies or conditions satisfied.
Workflows
A workflow is a defined series of steps or tasks that must be performed to complete a process. It often represents a business or technical process, where each step has dependencies or prerequisites.
Endpoint Callback
An endpoint callback refers to a mechanism where a system or service sends a response to a pre-defined URL (the callback endpoint) after completing a specific task or event.
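As a toy illustration of this pattern, here is a minimal, framework-free sketch in Python where an in-memory registry stands in for the HTTP layer (all names here are made up for illustration, not part of any real API):

```python
# Maps a "callback URL" to the handler that should receive the result.
callbacks = {}

def register_endpoint(url, handler):
    callbacks[url] = handler

def run_task_then_callback(task_payload, callback_url):
    # 1. Do the work.
    result = {"status": "done", "input": task_payload}
    # 2. "POST" the result to the pre-defined callback endpoint.
    return callbacks[callback_url](result)

received = []
register_endpoint("/late-process", lambda body: received.append(body) or "ok")
run_task_then_callback({"user": 42}, "/late-process")
```

In the real architecture below, Cloud Tasks plays the role of the caller that delivers the result to the callback URL.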
Task Scheduling
Task scheduling is the process of planning and executing tasks or jobs at specified times or intervals. It ensures that operations are performed automatically without manual intervention.
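To make this concrete, Python's standard-library sched module can run functions at planned times; a minimal sketch (delays shortened to fractions of a second for illustration):

```python
import sched
import time

s = sched.scheduler(time.monotonic, time.sleep)
ran = []

# Schedule two jobs: one immediately, one shortly after.
s.enter(0.0, 1, lambda: ran.append("first"))
s.enter(0.01, 1, lambda: ran.append("second"))

s.run()  # blocks until both scheduled tasks have executed, in time order
```

Cloud Scheduler provides the same idea as a managed service, with cron expressions instead of in-process timers.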
If you are an experienced software developer, you might already be thinking of some products that can help with this. Solutions like Apache Airflow are powerful but often excessive for simpler workflows. Additionally, platforms like Airflow on Cloud Composer come with higher operational costs, making them inaccessible for businesses with tighter budgets.
Another alternative for scheduling tasks is Python with Celery, but this requires a message broker such as Redis or RabbitMQ, and again, this is not cheap and will require a bit more coding to control everything.
Before moving on to the Cloud Scheduler alternative, be aware that this post focuses mainly on simple solutions for orchestrating workflows that do not require complex validations or conditional paths between steps.
Now it's time to dive into an effective alternative that combines Google Cloud Scheduler and Google Cloud Tasks.
A real-life job scenario
In this example, let's suppose you are working for a company that requires you to implement an architecture that executes a process asynchronously after an endpoint is called by the mobile application. On top of that, the product owner wants a scheduled process that validates the status of certain records in the database every 4 hours.
Let's create the endpoint the mobile application will be calling:
pip install fastapi uvicorn google-cloud-tasks
from datetime import datetime, timedelta, timezone

from fastapi import FastAPI, HTTPException
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

app = FastAPI()

@app.post("/trigger-task")
def trigger_task(data: dict):
    """
    Synchronous endpoint called by the mobile app to trigger an asynchronous process.
    """
    # Implement your logic here
    try:
        # Cloud Tasks client configuration
        client = tasks_v2.CloudTasksClient()
        queue_name = "projects/my-project-id/locations/us-central1/queues/my-queue"

        # Define the task payload
        task = {
            "http_request": {
                "http_method": tasks_v2.HttpMethod.POST,
                "url": "https://my-service-url.com/late-process",  # Cloud Tasks callback URL
                "headers": {"Content-Type": "application/json"},
                "body": data.get("payload", "{}").encode(),
            }
        }

        # Optional: schedule the task with a delay (e.g., 30 seconds)
        schedule_time = timestamp_pb2.Timestamp()
        schedule_time.FromDatetime(datetime.now(timezone.utc) + timedelta(seconds=30))
        task["schedule_time"] = schedule_time

        # Create the task in the specified queue
        response = client.create_task(parent=queue_name, task=task)
        return {"message": "Task triggered successfully", "task_name": response.name}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
When a user of the app calls the /trigger-task endpoint, your backend performs its logic and then creates a new Cloud Task that consumes another endpoint as a callback.
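To illustrate the contract between the two sides, here is a hypothetical request body a mobile client might send (the field names are made up for this example): the "payload" field is a JSON string that the endpoint forwards verbatim as the body of the Cloud Tasks callback request.

```python
import json

# Hypothetical body a mobile client might POST to /trigger-task.
request_body = {"payload": json.dumps({"user_id": 42, "action": "generate_report"})}

# This mirrors what the endpoint encodes into the task's HTTP body:
task_body = request_body.get("payload", "{}").encode()
```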
Now we'll create the endpoint for the callback
@app.post("/late-process")
def async_process(payload: dict):
    """
    Callback endpoint for Cloud Tasks to execute the asynchronous process.
    """
    # Your logic here: think of something like sending an email,
    # generating a report, validating something else, etc.
    try:
        # Simulate processing
        print(f"Processing async task with payload: {payload}")
        # Logic for processing (e.g., generating a report)
        return {"status": "success", "message": "Task processed"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
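One practical detail: Cloud Tasks attaches identifying headers (such as X-CloudTasks-QueueName and X-CloudTasks-TaskName) to every request it dispatches, so the callback can do a basic sanity check that traffic actually came through the queue. A simplified sketch of such a guard (this is not real authentication; for production, prefer OIDC tokens on the task):

```python
# Headers Cloud Tasks adds to the requests it dispatches.
CLOUD_TASKS_HEADERS = ("X-CloudTasks-QueueName", "X-CloudTasks-TaskName")

def is_from_cloud_tasks(headers: dict) -> bool:
    # Sanity check only: these headers can be spoofed by a determined caller.
    return all(h in headers for h in CLOUD_TASKS_HEADERS)

from_queue = is_from_cloud_tasks(
    {"X-CloudTasks-QueueName": "my-queue", "X-CloudTasks-TaskName": "1234"}
)
direct_call = is_from_cloud_tasks({"User-Agent": "curl/8.0"})
```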
Well, now we have one endpoint to trigger the Cloud Task and another to execute the callback from Cloud Tasks.
But at this point, you may have noticed that we are missing something. Yes, we still have to create the resources in Google Cloud that make all of this possible.
The first thing we have to do is enable the APIs in GCP:
PROJECT_ID="my-project-id"
gcloud services enable --project="${PROJECT_ID}" \
cloudscheduler.googleapis.com \
cloudtasks.googleapis.com
Now we have to create the task queue
REGION="us-central1" # your preferred region
gcloud tasks queues create my-queue --location="${REGION}"
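If you want, the queue's retry and throughput behavior can also be tuned after creation; for example (the flag values here are illustrative, not recommendations):

```shell
# Illustrative only: cap retries and concurrency so a failing callback
# doesn't hammer your backend with retried tasks.
gcloud tasks queues update my-queue \
    --location="${REGION}" \
    --max-attempts=5 \
    --max-concurrent-dispatches=10
```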
We are almost there: you've created the backend endpoints, enabled the GCP APIs, and created a Cloud Tasks queue that is ready to receive tasks.
But there's another requirement we haven't covered yet. The company wants a scheduled task that validates the status of certain records periodically. Now is the time to use another powerful GCP product: Cloud Scheduler.
Let's create the scheduler job with the specification we received from the company, which is "every 4 hours":
gcloud scheduler jobs create http validate-records-job \
    --location="${REGION}" \
    --schedule="0 */4 * * *" \
    --http-method=POST \
    --uri="https://my-service-url.com/validate-records" \
    --headers="Content-Type=application/json" \
    --time-zone="UTC"
With this command we are telling Cloud Scheduler that we want a new request sent to the endpoint https://my-service-url.com/validate-records every 4 hours (0 */4 * * *), using the HTTP method POST and the UTC time zone. If you want to learn a bit more about crontab syntax, you can experiment on this page: https://crontab.guru/
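To make the schedule concrete: minute field 0 combined with hour field */4 means "at minute 0 of every hour divisible by 4". A quick sanity check in Python:

```python
# Hours of the day matched by the hour field "*/4" (every 4th hour, starting at 0).
fire_hours = [h for h in range(24) if h % 4 == 0]
# Combined with minute "0", the job fires at
# 00:00, 04:00, 08:00, 12:00, 16:00, and 20:00 UTC.
```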
Another alternative to consider here (not the main scope of this post) is using a Pub/Sub topic and a Cloud Function to decouple the different parts. It's a great option if you want to react whenever a certain event is triggered.
Finally, we need to create our last endpoint, the one that receives the call to validate the status of some records.
@app.post("/validate-records")
def validate_records():
    """
    Endpoint to validate records in the database, triggered by Cloud Scheduler.
    """
    try:
        # Simulate database validation
        print("Validating records in the database...")
        # Logic to validate records (e.g., check statuses, update fields)
        return {"status": "success", "message": "Records validated"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
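Since scheduler-driven jobs can occasionally overlap or be retried, it pays to make the validation idempotent: only touch records that are still pending, so a re-run is harmless. A sketch with an in-memory table standing in for the database (all names here are illustrative):

```python
# Stand-in for a database table of records to validate.
records = [
    {"id": 1, "status": "pending"},
    {"id": 2, "status": "validated"},
    {"id": 3, "status": "pending"},
]

def validate_pending(rows):
    """Validate only pending rows; re-running is a no-op (idempotent)."""
    updated = 0
    for row in rows:
        if row["status"] == "pending":
            row["status"] = "validated"  # stand-in for a real DB update
            updated += 1
    return updated

first = validate_pending(records)   # updates the two pending records
second = validate_pending(records)  # nothing left to do
```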
Summary
There it is: we have created an entire orchestration workflow using no more than 3 endpoints and 2 GCP products (Cloud Tasks and Cloud Scheduler).
If this solution doesn't make you feel smart just because you are not using Apache Airflow, that's totally fine. The purpose of this post is not to reject Apache Airflow, but to leverage cheaper alternatives for small companies that can't afford Cloud Composer and an entire ecosystem just to orchestrate a couple of flows.
When you compare the costs of Cloud Composer and Cloud Scheduler, you'll notice the difference between them is enormous.
Feel free to share your thoughts about this alternative and to complement this solution so others have a better guide to help the companies they work for.
Some useful resources:
Cloud Tasks Documentation -> https://cloud.google.com/tasks/docs
Cloud Scheduler -> https://cloud.google.com/scheduler/docs