Python 3.5 introduced two new keywords: async and await. These seemingly magic keywords enable thread-like concurrency without any threads at all. In this tutorial we will motivate why async programming exists and illustrate how Python's async/await keywords work internally by building our own mini asyncio-like framework.
Why async programming?
To understand the motivation for async programming, we first need to understand what limits how fast our code can run. Ideally we'd like our code to run at light speed, jumping through its instructions without any delay. In reality, however, code runs much slower than that because of two factors:
- CPU time (time for the processor to execute instructions)
- IO time (time waiting for network requests or storage reads/writes)
When our code is waiting for IO, the CPU is essentially idle, waiting on some external device to respond. Typically the kernel will detect this and immediately switch to executing other threads in the system. So, if we want to speed up processing a set of IO-bound tasks, we can create one thread for each task. When one of the threads halts, waiting for IO, the kernel will switch to another thread to continue processing.
This works quite well in practice, but there are two downsides:
- Threads have an overhead (especially so in Python)
- We can't control when the kernel chooses to switch between threads
For instance, if we wanted to execute 10,000 tasks, we would either have to create 10,000 threads which would take a lot of RAM, or we would need to create a smaller number of worker threads and execute the tasks with less concurrency. Additionally, initially spawning these threads would take CPU time.
Since the kernel can choose to switch between threads at any time, race conditions can occur at any point in our code.
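To make that second downside concrete, here is a classic race-condition sketch (my own, not from the original post): four threads increment a shared counter, and because counter += 1 compiles to separate read and write steps, a thread switch between them can lose updates.

import threading

counter = 0

def work():
    global counter
    for _ in range(100_000):
        counter += 1  # read-modify-write: a switch between the read and the write loses an update

threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Depending on the Python version and timing, this often prints less than 400000.
print(counter)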
Introducing async
In traditional synchronous thread-based code, the kernel must detect when a thread is IO-bound, and it chooses when to switch between threads at will. With Python async, the programmer marks IO-bound lines of code with the await keyword, which explicitly gives permission for other tasks to execute. For example, consider this code that performs a web request:
import asyncio

async def request_google():
    reader, writer = await asyncio.open_connection('google.com', 80)
    writer.write(b'GET / HTTP/1.1\r\nHost: google.com\r\nConnection: close\r\n\r\n')
    await writer.drain()
    response = await reader.read()
    return response.decode()
Here we see this code awaits in two places. While waiting for our bytes to be sent to the server (writer.drain()), and while waiting for the server to reply with some bytes (reader.read()), we know that other code might execute and global variables might change. However, from the start of the function until the first await, we can be sure that our code runs line by line without ever switching to run other code in our program. This is the beauty of async.
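As a small illustration of that guarantee (my own sketch, using asyncio.sleep as a stand-in for IO, and asyncio.gather and asyncio.run, both covered just below): everything between two awaits runs without interleaving, so a read-modify-write on shared state is safe there, while anything spanning an await is not.

import asyncio

counter = 0

async def increment():
    global counter
    current = counter       # safe: no other task can run between these two lines
    counter = current + 1
    await asyncio.sleep(0)  # but here, other tasks may run and change counter

async def main():
    await asyncio.gather(*[increment() for _ in range(1000)])
    print(counter)  # always 1000: the increments never interleave

asyncio.run(main())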
asyncio is a standard-library module that lets us do genuinely interesting things with these asynchronous functions. For instance, if we wanted to perform two requests to Google at once, we could do:
async def request_google_twice():
    response_1, response_2 = await asyncio.gather(request_google(), request_google())
    return response_1, response_2
When we call request_google_twice(), the magical asyncio.gather will start one function call, but when we hit await writer.drain(), it will start executing the second function call, so that both requests happen concurrently. Then, it waits for either the first or second request's writer.drain() call to complete and continues that function's execution.
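To see this concurrency in action, here is a quick timing experiment (my own, with asyncio.sleep standing in for the network, and asyncio.run, explained next, driving it): two one-second waits run side by side and finish in about one second total, not two.

import asyncio
import time

async def slow(name):
    await asyncio.sleep(1.0)  # stand-in for a one-second network request
    return name

async def main():
    start = time.time()
    results = await asyncio.gather(slow('a'), slow('b'))
    print(results, round(time.time() - start, 1))  # ['a', 'b'] 1.0

asyncio.run(main())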
Finally, there is one important detail that was left out: asyncio.run. To actually call an asynchronous function from a regular (synchronous) Python function, we wrap the call in asyncio.run(...):
async def async_main():
    r1, r2 = await request_google_twice()
    print('Response one:', r1)
    print('Response two:', r2)
    return 12

return_val = asyncio.run(async_main())
Notice that if we just call async_main() without await ... or asyncio.run(...), nothing happens: we merely get a coroutine object back, and none of the function's body runs. This is expected, simply by the nature of how async works, as we'll see below.
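We can verify this in a REPL (my own snippet): calling the function just builds a coroutine object, and Python even warns us if we throw it away unawaited.

coro = async_main()
print(coro)  # <coroutine object async_main at 0x...> -- nothing has executed yet
# If coro is garbage-collected without being awaited, Python emits
# "RuntimeWarning: coroutine 'async_main' was never awaited"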
So, how exactly does async work, and what do these magical asyncio.run and asyncio.gather functions do? Read below to find out.
How async works
To understand the magic of async, we first need to understand a simpler Python construct: the generator.
Generators
Generators are Python functions that lazily return a sequence of values one by one (an iterator). For example:
def get_numbers():
    print("|| get_numbers begin")
    print("|| get_numbers Giving 1...")
    yield 1
    print("|| get_numbers Giving 2...")
    yield 2
    print("|| get_numbers Giving 3...")
    yield 3
    print("|| get_numbers end")
print("| for begin")
for number in get_numbers():
print(f"| Got {number}.")
print("| for end")
| for begin
|| get_numbers begin
|| get_numbers Giving 1...
| Got 1.
|| get_numbers Giving 2...
| Got 2.
|| get_numbers Giving 3...
| Got 3.
|| get_numbers end
| for end
So we see that each iteration of the for loop steps the generator forward once. We can perform this iteration even more explicitly using Python's next() function:
In [3]: generator = get_numbers()
In [4]: next(generator)
|| get_numbers begin
|| get_numbers Giving 1...
Out[4]: 1
In [5]: next(generator)
|| get_numbers Giving 2...
Out[5]: 2
In [6]: next(generator)
|| get_numbers Giving 3...
Out[6]: 3
In [7]: next(generator)
|| get_numbers end
---------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-154-323ce5d717bb> in <module>
----> 1 next(generator)
StopIteration:
This is very similar to the behavior of an async function. Just as async functions execute code contiguously from the start of the function until the first await, the first time we call next(), a generator will execute from the top of the function to the first yield statement. However, right now we are just returning numbers from the generator. We'll use this same idea, but return something different, to create async-like functions using generators.
Using generators for async
Let's use generators to make our own mini async-like framework. For simplicity, let's replace actual IO with sleeping (i.e. time.sleep). Let's consider an application that needs to send updates at regular intervals:
import time

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        time.sleep(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
So if we call send_updates(3, 1.0), it will output these three messages, 1 second apart each:
[1.0] Sending update 1/3.
[1.0] Sending update 2/3.
[1.0] Sending update 3/3.
Now, let's say we want to run this for a few different intervals at the same time, say send_updates(10, 1.0), send_updates(5, 2.0), and send_updates(4, 3.0). We could do this using threads as follows:
import threading

threads = [
    threading.Thread(target=send_updates, args=(10, 1.0)),
    threading.Thread(target=send_updates, args=(5, 2.0)),
    threading.Thread(target=send_updates, args=(4, 3.0))
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
This works, completing in around 12 seconds, but it uses threads, which have the downsides mentioned previously. Let's build the same thing using generators.
In our example demonstrating generators, we returned integers. To get async-like behavior, instead of returning an arbitrary value, we want to return some object that describes the IO we want to wait on. In our case, our "IO" is simply a timer that will wait for some duration of time. So, let's create a timer object that we will use for this purpose:
class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration
Now, let's yield this from our function instead of calling time.sleep:
def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        yield AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
Now, each time we call next(...) on a call to send_updates(...), we will get an AsyncTimer object that tells us the time until which we are supposed to wait:
generator = send_updates(3, 1.5)
timer = next(generator)  # runs up to the first yield; nothing is printed yet
print(timer.done_time - time.time())  # 1.498...
Since our code doesn't actually call time.sleep anymore, we can execute another send_updates invocation at the same time.
So, to put this all together, we need to take a step back and realize a few things:
- Generators are like partially executed functions, waiting on some IO (a timer)
- Each partially executed function has some IO (timer) that it is waiting on before it can continue execution
- So the current state of our program is a list of pairs of each partially executed function (generator) and the IO this function is waiting on (a timer)
- Now, to run our program, we just need to wait until some IO is ready (i.e. one of our timers has expired), and then execute the corresponding function one step forward, getting a new IO that is blocking the function.
Implementing this logic gives us the following:
# Initialize each generator with a timer of 0 so it immediately executes
generator_timer_pairs = [
    (send_updates(10, 1.0), AsyncTimer(0)),
    (send_updates(5, 2.0), AsyncTimer(0)),
    (send_updates(4, 3.0), AsyncTimer(0))
]

while generator_timer_pairs:
    pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
    generator, min_timer = pair

    # Wait until this timer is ready
    time.sleep(max(0, min_timer.done_time - time.time()))
    generator_timer_pairs.remove(pair)

    try:  # Execute one more step of this function
        new_timer = next(generator)
        generator_timer_pairs.append((generator, new_timer))
    except StopIteration:  # When the function is complete
        pass
And with that, we have ourselves a working example of async-like functions using generators. Notice that when a generator is done it raises StopIteration, and when we have no more partially executed functions (generators), our program is done.
Now, we just wrap this in a function and we have something roughly similar to asyncio.run combined with asyncio.gather:
def async_run_all(*generators):
    generator_timer_pairs = [
        (generator, AsyncTimer(0))
        for generator in generators
    ]

    while generator_timer_pairs:
        pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
        generator, min_timer = pair

        time.sleep(max(0, min_timer.done_time - time.time()))
        generator_timer_pairs.remove(pair)

        try:
            new_timer = next(generator)
            generator_timer_pairs.append((generator, new_timer))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)
Using async/await for async
The final step to achieving our caveman's version of asyncio is to support the async/await syntax introduced in Python 3.5. await behaves similarly to yield, except that instead of yielding the provided value directly, it yields the items of (...).__await__(), much like a yield from. And async functions return "coroutines", which behave like generators but need to be stepped with .send(None) instead of next(). (Notice that, just as generators don't execute any code when they are initially called, async functions don't do anything until they are stepped through, which explains what we mentioned earlier.)
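To make this concrete, here is a tiny hand-rolled demonstration (my own, not from the post) of driving a coroutine with .send(None):

class Ready:
    def __await__(self):
        yield 'io-token'  # handed out to whoever is driving the coroutine

async def example():
    await Ready()
    return 42

coro = example()
print(coro.send(None))  # 'io-token': ran up to the yield inside __await__
try:
    coro.send(None)     # resume; the function finishes...
except StopIteration as e:
    print(e.value)      # ...and its return value, 42, arrives here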
So, given this information, we only have to make a few adjustments to convert our example to async/await. Here's the final result:
import time

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration

    def __await__(self):
        yield self

async def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        await AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

def _wait_until_io_ready(ios):
    min_timer = min(ios, key=lambda x: x.done_time)
    time.sleep(max(0, min_timer.done_time - time.time()))
    return ios.index(min_timer)

def async_run_all(*coroutines):
    coroutine_io_pairs = [
        (coroutine, AsyncTimer(0))
        for coroutine in coroutines
    ]

    while coroutine_io_pairs:
        ios = [io for cor, io in coroutine_io_pairs]
        ready_index = _wait_until_io_ready(ios)
        coroutine, _ = coroutine_io_pairs.pop(ready_index)

        try:
            new_io = coroutine.send(None)
            coroutine_io_pairs.append((coroutine, new_io))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)
There we have it: our mini async example is complete, using async/await. Now, you may have noticed that I renamed timer to io and extracted the logic for finding the minimum timer into a function called _wait_until_io_ready. This is intentional, to connect this example with the final topic: real IO.
Real IO (instead of just timers)
So, all these examples are great, but how do they actually relate to real asyncio, where we want to wait on actual IO like TCP sockets and file reads/writes? Well, the beauty is in that _wait_until_io_ready function. All we have to do to get real IO working is create some new AsyncReadFile object, similar to AsyncTimer, that contains a file descriptor. Then, the set of AsyncReadFile objects that we are waiting on corresponds to a set of file descriptors. Finally, we can use the select() syscall to wait until one of these file descriptors is ready. And since TCP/UDP sockets are implemented using file descriptors, this covers network requests too.
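Here is a rough sketch of what that could look like (my own, assuming a Unix-like system; the post itself stops at the idea): an AsyncReadFile awaitable plus a version of _wait_until_io_ready that mixes timers and file descriptors using select.

import select
import time

class AsyncReadFile:
    def __init__(self, fd: int):
        self.fd = fd  # the file descriptor we want to read from

    def __await__(self):
        yield self

def _wait_until_io_ready(ios):
    timers = [io for io in ios if isinstance(io, AsyncTimer)]
    reads = [io for io in ios if isinstance(io, AsyncReadFile)]

    # Block for at most as long as the nearest timer, waking early
    # if any file descriptor becomes readable.
    timeout = None
    if timers:
        timeout = max(0, min(t.done_time for t in timers) - time.time())
    readable, _, _ = select.select([r.fd for r in reads], [], [], timeout)

    if readable:  # a file descriptor became ready first
        return next(i for i, io in enumerate(ios)
                    if isinstance(io, AsyncReadFile) and io.fd == readable[0])
    # otherwise, the nearest timer expired
    min_timer = min(timers, key=lambda t: t.done_time)
    return ios.index(min_timer)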
Conclusion
So there we have it, Python async from scratch. While we went in-depth, there are still many nuances we didn't cover. For instance, to call a generator-based async-like function from another generator function we would use yield from, and we can pass values back into async functions through .send(...). There are a whole host of other topics on asyncio-specific constructs, and a great bunch of additional subtleties with things like async generators and cancelling tasks, but we'll leave all that for another day.
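As a quick taste of that last point (a hypothetical sketch of mine, not from the post): the value passed to .send(...) becomes the result of the yield inside __await__, which is how an event loop hands results back to an await.

class GetValue:
    def __await__(self):
        received = yield self  # whatever the driver .send()s arrives here
        return received        # ...and becomes the result of the await

async def wants_data():
    value = await GetValue()
    print('Got', value)

coro = wants_data()
coro.send(None)         # run to the yield, handing GetValue to the driver
try:
    coro.send('hello')  # resume the await with a result; prints: Got hello
except StopIteration:
    pass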
Let me know if you found this interesting and would like a followup that goes more in-depth. Also, I only started using Python async a few weeks ago, so do let me know if I've gotten anything wrong. Anyways, if you're still reading by this point, I admire your dedication and award you a golden internet-token of my appreciation.
Happy holidays and have a great new year!
- Matthew