Last week, we discussed a quick weekend project on building a word game. That project was sparked by a conversation suggesting I integrate LLM functionality into my experimental chatbot, BigMeow. Currently, I’m working on another chatbot project, and I’m reusing some code from BigMeow. Back then, I struggled a lot, especially with the lack of information on asynchronous programming in Python. Using the Telegram Bot API as an example, I hope to provide some helpful insights for those interested in this area.
A cute photo generated by Copilot on the topic
From Synchronous Simplicity to Asynchronous Challenges
The previous incarnation of BigMeow was built with the synchronous version of python-telegram-bot. It was a simple bot that reported the current petrol price in Malaysia, updated every week. Because it was synchronous, it was very simple to build; I just needed to copy-paste from the tutorial, then add handlers to fetch and send petrol prices.
I don’t follow the trend of language development closely. Pre-ES6 JavaScript was my first experience writing code where things happen asynchronously. We could set a piece of code to run on a timer with setTimeout or setInterval, or fetch a page with jquery.ajax (in the pre-fetch API era). However, with callbacks alone there was no way for the surrounding code to know when an operation was done, nor to access the result directly upon completion.
I began async programming with JavaScript. Photo by Claudio Schwarz on Unsplash
It took me a while to rewire my brain to get used to this style of asynchronous execution, where things may happen outside the normal flow and extra care is needed when using returned values. Code simply has to be written in another way, with a different mindset. Eventually, convenient constructs such as Promise were built into the language, and async-await syntax followed.
By the time async-await was introduced to JavaScript (with ES2017, following Promise in ES6), I had already moved on to Python full-time. Most libraries I use daily are still synchronous to this day, though some have started exploring the possibility. The python-telegram-bot library I mentioned earlier was the first one I used that switched completely to the asynchronous world.
Async-await syntax brings the familiarity of synchronous programming, so it shouldn’t be a big deal, right?
Diving into Asynchronous Development: Webhooks, Queues, and AsyncIO
Messages to be put in a queue. Photo by Joanna Kosinska on Unsplash
That could be the case in other languages, but not so much in Python. The experience does get better over time, but I still found myself suddenly having to learn about things like asyncio, the event loop, coroutines, etc. To be fair, I had learned about greenlets at work before this, but never really had a chance to apply that knowledge. This new asyncio world felt completely different.
Anyway, enough reminiscing about the past. This is not intended to be the ultimate guide on asynchronous programming, but the pragmatic quick-start guide I wish I had back then. Assuming we are in a properly managed project (through tools like poetry or uv), let’s start with a new module telegram.py for our telegram bot, placed inside a package (e.g. yourapp) so that it does not shadow the installed telegram library. Remember to add the python-telegram-bot dependency to the project.
import asyncio
from threading import Event

from telegram import Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

application = ApplicationBuilder().token("Your telegram token").build()


async def run(exit_event: Event) -> None:
    async with application:
        await setup()

        # Starting the bot
        await application.start()

        # Receive messages

        # Wait for exit signal
        await asyncio.to_thread(exit_event.wait)

        # Stopping the bot
        await application.stop()


async def setup() -> None:
    pass
We will build a web application to host a webhook that receives messages from users interacting with our bot. Before that, however, we need a queue to relay messages from the webhook to the bot. Considering we will be referencing this queue in both the bot and the web application, let’s define it in a separate settings.py. We also need a secret token so that only legitimate requests from Telegram are accepted; let’s define that here too.
import multiprocessing
telegram_queue = multiprocessing.Queue()
TELEGRAM_SECRET_TOKEN = "Some random token"
The web application and the bot will run in separate processes, hence we are using a process-safe queue. (Note that sharing the queue through a module import like this relies on the fork start method, the default on Linux; on platforms that spawn fresh interpreters, each process would re-import settings and create its own queue.) Next, we will use FastAPI (add fastapi[standard] to the list of dependencies) to build the web application, with the following code in web.py.
import asyncio
from functools import partial
from threading import Event
from typing import Annotated

import uvicorn
from fastapi import FastAPI, Header, Request

import settings

app = FastAPI()


async def run(exit_event: Event) -> None:
    # Configure the server
    server = uvicorn.Server(
        uvicorn.Config(
            "yourapp.web:app",
            host="0.0.0.0",
            port=80,
            log_level="info",
        )
    )

    # Start the webserver
    asyncio.create_task(server.serve())
    await asyncio.to_thread(exit_event.wait)

    # Stop the webserver
    await server.shutdown()
Both FastAPI and python-telegram-bot are asynchronous, thus most of our functions are marked async. The run functions are the entry points to the bot and the web application respectively, and both expect an Event object that serves as a cue to exit when needed.
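This exit cue pattern can be tried standalone; the sketch below is illustrative, with the event set from within the program rather than by an external signal:

```python
import asyncio
import threading


async def run(exit_event: threading.Event) -> str:
    # Pretend start-up work happens here, then park until told to exit;
    # to_thread keeps the blocking wait() off the event loop
    await asyncio.to_thread(exit_event.wait)
    return "clean exit"


async def main() -> str:
    exit_event = threading.Event()
    task = asyncio.create_task(run(exit_event))

    await asyncio.sleep(0.1)  # the application "runs" for a while
    exit_event.set()          # cue the shutdown

    return await task         # run() returns promptly


result = asyncio.run(main())
print(result)
```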
Next, we implement the actual webhook in web.py:
@app.post("/webhook/telegram", include_in_schema=False)
async def telegram_webhook(
    request: Request,
    x_telegram_bot_api_secret_token: Annotated[
        str, Header(pattern=settings.TELEGRAM_SECRET_TOKEN, strict=True)
    ],
) -> None:
    # Pass the message to the relay queue
    asyncio.create_task(
        asyncio.to_thread(
            partial(settings.telegram_queue.put, await request.json()),
        )
    )
In the code above, to authenticate webhook requests, we ensure the token defined in settings.py is present in the request header X-Telegram-Bot-Api-Secret-Token. Once that check passes, we pass the incoming data to our process-safe queue. As putting data into the queue is a blocking operation, we use asyncio.to_thread to execute it in a separate thread.
Having seen the code so far, we can now talk a little about asynchronous programming in Python. As shown above, async-await brings the familiarity of synchronous programming. In fact, if we wanted to, we could simply put an await keyword in front of every async function call and the code would still work, albeit less efficiently.
The beauty of asynchronous programming is that we can schedule certain parts of the program to run concurrently. Putting the await keyword before a function call causes execution to wait for the result to return. However, there are times when we only want to ensure the function gets called, or we only care about its progress much later. That is when we schedule it to run with asyncio.create_task.
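The difference is easy to demonstrate with a small stdlib-only timing sketch; the delays are arbitrary:

```python
import asyncio
import time


async def work() -> None:
    await asyncio.sleep(0.2)  # stand-in for some I/O-bound operation


async def sequential() -> float:
    start = time.perf_counter()
    await work()  # the second call only starts
    await work()  # after the first one finishes
    return time.perf_counter() - start


async def concurrent() -> float:
    start = time.perf_counter()
    first = asyncio.create_task(work())   # scheduled immediately
    second = asyncio.create_task(work())  # runs alongside `first`
    await first
    await second
    return time.perf_counter() - start


seq = asyncio.run(sequential())
con = asyncio.run(concurrent())
print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```

Awaiting each call takes roughly the sum of the delays; scheduling both as tasks first takes roughly the longest single delay.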
The web application part is practically done, so we can now move on to the bot in telegram.py. Previously, we relayed the messages sent by users to a queue. To consume the queue, we will implement the following coroutines (a coroutine is what you get by calling a function defined with async def).
import queue
from contextlib import suppress
from functools import partial
from typing import Awaitable, Callable

import settings


async def loop_queue(coro_func: Callable[[], Awaitable[None]]) -> None:
    while True:
        await coro_func()


async def message_consume() -> None:
    with suppress(queue.Empty):
        await application.update_queue.put(
            Update.de_json(
                await asyncio.to_thread(
                    partial(
                        settings.telegram_queue.get,
                        timeout=10,
                    )
                ),
                application.bot,
            )
        )
In message_consume, we first retrieve a message from settings.telegram_queue. A timeout is set so the call does not stall indefinitely, which allows other async operations to run concurrently. Should we receive nothing before the timeout, we suppress the queue.Empty error and move on. Otherwise, we parse the message with Update.de_json and pass it to the application queue for further processing. The helper coroutine loop_queue was written to avoid excessive nesting in message_consume.
We just want to ensure message_consume is scheduled, therefore we wrap it with asyncio.create_task in the run coroutine, as follows:
async def run(exit_event: Event) -> None:
    async with application:
        await setup()

        # Starting the bot
        await application.start()

        # Receive messages
        asyncio.create_task(loop_queue(message_consume))

        # Wait for exit signal
        await asyncio.to_thread(exit_event.wait)

        # Stopping the bot
        await application.stop()
Next, we want to define how received messages are handled. Let’s revise the setup coroutine.
async def setup() -> None:
    # Initializing application
    asyncio.create_task(
        application.bot.set_webhook(
            "https://the/address/to/the/webhook/telegram/",
            allowed_updates=Update.ALL_TYPES,
            secret_token=settings.TELEGRAM_SECRET_TOKEN,
        )
    )

    application.add_handlers(
        (
            CommandHandler("start", update_handle_welcome),
            # Other handlers
        )
    )
The first statement registers the webhook; it requires the URL of the endpoint, as well as the secret token used to authenticate web requests. The second statement registers the behaviour of the bot. For now, we want to respond to the /start command. The implementation of the handler is shown below:
import logging

logger = logging.getLogger(__name__)


async def update_handle_welcome(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    try:
        assert update.effective_chat

        await context.bot.send_message(update.effective_chat.id, "Welcome to my bot")
    except AssertionError as e:
        logger.error(e)
Now every time a user starts a conversation with our bot, it responds with a friendly welcome message. Adding another handler is easy; for instance, to echo the same message users send us, we simply append to the tuple in the application.add_handlers statement.
async def setup() -> None:
    ...
    application.add_handlers(
        (
            CommandHandler("start", update_handle_welcome),
            # Other handlers
            MessageHandler(filters.TEXT, update_handle_text),
        )
    )
    ...
async def update_handle_text(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    try:
        assert update.effective_chat
        assert update.message
        assert update.message.from_user
        assert update.message.text

        await context.bot.send_message(
            update.effective_chat.id,
            update.message.text,
            reply_to_message_id=update.message.id,
            allow_sending_without_reply=True,
        )
    except AssertionError as e:
        logger.exception(e)
Finally, we write the code that runs both the bot and the web application in parallel, each in a separate process. For that, we schedule them with a ProcessPoolExecutor:
import asyncio
import threading
from concurrent.futures import ProcessPoolExecutor

from yourapp.telegram import run as tg_run
from yourapp.web import run as web_run


def process_run(func, pexit_event: threading.Event) -> None:
    asyncio.run(func(pexit_event))


with ProcessPoolExecutor() as executor:
    executor.submit(process_run, tg_run, exit_event)
    executor.submit(process_run, web_run, exit_event)
Ah, I almost forgot about the exit_event we passed to the run coroutines. Both applications are expected to run indefinitely. In the past, I have seen tutorials append an infinite loop at the end so the application does not exit prematurely. However, we can instead pass a process-safe Event object and prompt the applications to exit safely when required, for example when we kill the process with CTRL+C.
import multiprocessing
import signal
from functools import partial


def shutdown_handler(
    _signum, _frame, exit_event: threading.Event
) -> None:
    exit_event.set()


manager = multiprocessing.Manager()
exit_event = manager.Event()

for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
    signal.signal(
        s, partial(shutdown_handler, exit_event=exit_event)
    )

with ProcessPoolExecutor() as executor:
    executor.submit(process_run, tg_run, exit_event)
    executor.submit(process_run, web_run, exit_event)
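The signal wiring can be exercised in isolation. The sketch below uses a plain threading.Event and SIGUSR1 (assuming a POSIX system) instead of the manager event, since everything runs in one process here:

```python
import signal
from functools import partial
from threading import Event


def shutdown_handler(_signum, _frame, exit_event: Event) -> None:
    exit_event.set()


exit_event = Event()  # a plain Event suffices within a single process
signal.signal(signal.SIGUSR1, partial(shutdown_handler, exit_event=exit_event))

# Deliver the signal to ourselves, as if the OS had sent it
signal.raise_signal(signal.SIGUSR1)
print(exit_event.is_set())
```

The partial trick is what lets the same two-argument handler signature carry the extra exit_event argument.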
The final steps are to register the bot with BotFather, replace the placeholders in the code (the webhook URL and the Telegram bot token), and then the bot is ready to go.
Lessons Learned and Architectural Insights
Better QOL developing AsyncIO apps now. Photo by Usman Yousaf on Unsplash
The quality of life for application development with asyncio is definitely improving. For instance, we no longer have to explicitly set up and manage the event loop in recent Python releases. However, it can still be challenging to find tutorials on building applications that involve multiple asyncio libraries, such as this one.
BigMeow started with a goal: I wanted to run multiple chatbots within the same application. I started with Telegram and Discord. The earlier iterations were done purely by brute force, without knowing much about async programming in Python. However, it worked, and I just left it be for a while.
It started to break when I attempted to switch the telegram bot to receive user messages via a webhook. I couldn’t find a way to properly host a web application within the mess I had made. That’s when I started looking around for articles and tutorials on the topic. One remarkable series of articles I found was written by Lynn Root.
The series is very well-written, and provides a good introduction to asyncio. It definitely helped me to properly learn how to do asynchronous programming in Python. Though it was written for earlier versions of Python, it is still a useful guide for newcomers.
Both BigMeow and my current chatbot project owe quite a lot to Root’s tutorial. Hopefully my contribution through this article helps clear doubts for people who are interested in asynchronous programming.
Some ideas for improvement. Photo by ameenfahmy on Unsplash
The bot we covered today is not perfect, but it is a good enough start for further exploration. Concurrency does not mean parallelism, so cramming both the bot and the web application into one single process may not be a great idea. Clever scheduling reduces the risk of blocking, but if too many tasks are scheduled, lag is inevitable.
Keeping them in separate processes also allows us to split them into individual programs without much change to the code in the future. One of those changes would likely be replacing the process-safe queue with something that allows communication across program boundaries, or even a message queue system.
While this tutorial is about a telegram bot and a web application, the general design can be adapted to other applications with multiple components. If the application does not need to run indefinitely, the exit_event part can be omitted, and the program ends upon completion.
The article is quite a bit longer than I initially expected, so I should probably end here. It is not hard to build a chatbot, and with services offering LLM APIs becoming more accessible these days, it is easier than ever to integrate chatbots with LLMs.
Lastly, thanks for reading, and I shall write again, next week.
While a large language model provided editorial assistance to refine the structure and clarity of this article, all ideas, concepts, and code are my own. For project collaborations or job opportunities, please connect with me here on Medium or via my LinkedIn profile.