Recently I ran into a batch job that processed thousands of images from S3. The processing itself was relatively lightweight, so most of the time was spent downloading and uploading images, that is, on I/O. Such I/O-bound tasks are a great fit for multithreading, in contrast to CPU-bound tasks, which are better suited for multiprocessing with all its quirks around interprocess communication. In this post, I'd like to share a small example of how to run tasks in a thread pool.
We'll use ThreadPoolExecutor to execute tasks in a configurable number of worker threads. One option would be to submit tasks to the pool with executor.submit(). This method returns a concurrent.futures.Future object, to which one can add callbacks with add_done_callback(). However, callbacks are evil and it's best to avoid them if possible. With asyncio, we can instead use awaitable asyncio.Future objects and avoid callbacks.
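For contrast, here's a minimal sketch of what the callback style looks like (the hello function here is just a placeholder, not the task we'll define below):

import concurrent.futures

def hello(ind):
    # Stand-in task; our real toy task is defined below.
    return ind

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(hello, 1)
    # add_done_callback() invokes the callback with the completed future;
    # here we just print its result.
    future.add_done_callback(lambda f: print("Got result:", f.result()))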
Our toy task is defined as follows:
def execute_hello(ind):
    print(f"Thread {threading.current_thread().name}: Starting task: {ind}...")
    time.sleep(1)
    print(f"Thread {threading.current_thread().name}: Finished task {ind}!")
    return ind
The task sleeps for one second between printing informative messages to the standard output. It takes an integer argument so that we can keep track of which task is executing.
The entrypoint to our program looks as follows:
import asyncio
import concurrent.futures
import threading
import time

def execute_hello(ind):
    ...

async def main(tasks=20):
    ...

if __name__ == "__main__":
    asyncio.run(main())
The asyncio.run function executes the coroutine main by starting an event loop. Note that you need Python 3.7 or newer to use asyncio.run.
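On older Python 3 versions, one can drive the event loop manually instead; a rough equivalent:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()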
In the coroutine, we'll declare the ThreadPoolExecutor with, for example, four worker threads:
MAX_WORKERS = 4

async def main(tasks=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        ...
Using ThreadPoolExecutor as a context manager ensures that the pool is shut down when the context manager exits.
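Exiting the with block is roughly equivalent to calling shutdown() in a finally block ourselves:

pool = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
try:
    ...  # submit work to the pool
finally:
    # Wait for pending tasks to finish and free the worker threads.
    pool.shutdown(wait=True)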
Now that we have the pool, we can submit tasks to it. Instead of using pool.submit, we'll use loop.run_in_executor() from asyncio:
async def main(tasks=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(pool, execute_hello, task)
            for task in range(tasks)
        ]
The function asyncio.get_running_loop() returns the running event loop. We then create a future for each of the 20 tasks we want to run using loop.run_in_executor(pool, execute_hello, task).
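One gotcha worth noting: loop.run_in_executor() only forwards positional arguments to the task. If execute_hello took keyword arguments, we could bind them with functools.partial first, for example:

import functools

# run_in_executor() only passes positional arguments, so keyword
# arguments are bound beforehand with functools.partial.
future = loop.run_in_executor(
    pool, functools.partial(execute_hello, ind=task)
)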
So far, so good. If we now execute the script, we can see that the pool executes four tasks at a time and, as expected, takes around five seconds to complete: 20 one-second tasks in batches of four. However, we're still missing clean-up code and error handling.
To gather the results from the tasks, we'll use asyncio.gather. This function waits for multiple awaitables to finish, which lets us add error handling like this:
async def main(tasks=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(pool, execute_hello, task)
            for task in range(tasks)
        ]
        try:
            results = await asyncio.gather(*futures, return_exceptions=False)
        except Exception as ex:
            print("Caught error executing task", ex)
            raise
        print(f"Finished processing, got results: {results}")
When our script now finishes, it prints the result from every task in order:
Finished processing, got results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
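As an aside, had we passed return_exceptions=True, gather would not raise but instead return exceptions alongside the successful results, letting us inspect them afterwards, something like:

results = await asyncio.gather(*futures, return_exceptions=True)
for ind, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Task {ind} failed:", result)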
Let's now test our error handling by modifying the task as follows:
def execute_hello(ind):
    print(f"Thread {threading.current_thread().name}: Starting task: {ind}...")
    time.sleep(1)
    if ind == 2:
        print("BOOM!")
        raise Exception("Boom!")
    print(f"Thread {threading.current_thread().name}: Finished task {ind}!")
    return ind
If we now run the script, we notice that execution continues until all tasks have run, even though the third task in the first batch fails. This may or may not be what we want. In my case, I needed to avoid running new tasks after any single task failed. This can be accomplished by setting a global _shutdown flag to True when the first task fails and skipping the work in any remaining tasks if _shutdown is True:
_shutdown = False

def execute_hello(ind):
    if _shutdown:
        print(f"Thread {threading.current_thread().name}: Skipping task {ind} as shutdown was requested")
        return None
    ...

async def main(tasks=20):
    global _shutdown
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(pool, execute_hello, task)
            for task in range(tasks)
        ]
        try:
            results = await asyncio.gather(*futures, return_exceptions=False)
        except Exception as ex:
            print("Caught error executing task", ex)
            _shutdown = True
            raise
        print(f"Finished processing, got results: {results}")
Now any tasks that hadn't started by the time of the first exception skip their work, while tasks that had already started are still awaited until they finish.
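As a side note, a threading.Event would express the same idea a bit more idiomatically than a bare global boolean, and no global statement is needed since we only call methods on the event. A sketch of that variant:

import threading

_shutdown = threading.Event()

def execute_hello(ind):
    if _shutdown.is_set():
        print(f"Thread {threading.current_thread().name}: Skipping task {ind} as shutdown was requested")
        return None
    ...

# In main(), the except block would call _shutdown.set() instead of
# assigning to a global variable.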
If you have any questions or comments, please leave one!