A new-ish Python design pattern

Or: How to improve your asyncio code!

So uh... yeah, this is just a definition, and I'll quickly explain why it matters. I should also note that I didn't come up with this idea on my own; it's essentially something I found online. Still, a lot of people who write asyncio code don't use this pattern, and spreading awareness of it will help a lot!

(I modeled this design document after the ones on oodesign.com, except y'know, I use Python code instead of Java)

Producer-Consumer-Worker pattern

Intent

When working in asynchronous code, usage of the producer-consumer pattern is common. The Producer-Consumer-Worker pattern builds on it: instead of doing the work itself, the consumer hands each queued task to a separate worker task, so a failing task cannot take the consumer down with it.

Implementation

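This UML is probably wrong, but at its simplest: the producer puts tasks on the queue, the consumer takes them off, and the consumer starts a worker for each task.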

Applicability and examples

The reasoning for this is that the normal producer-consumer pattern is fragile in asynchronous code, as it is not resistant to exceptions. One of the benefits of asynchronous code is that even if one task on the event loop crashes, the rest of the loop can keep running. In a traditional producer-consumer pattern, however, this means that if the consumer crashes, the producer will aimlessly keep adding tasks to the queue with no consumer left to pick them up.
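To make the failure mode concrete, here is a minimal sketch of the traditional pattern going wrong. The names `fragile_consumer` and `demo`, and the `"bad"` item that triggers the crash, are made up for illustration and are not part of the example later in this post.

```python
import asyncio


async def producer(queue, items):
    # Keeps queueing work; it has no idea whether anyone is still consuming.
    for item in items:
        await queue.put(item)


async def fragile_consumer(queue):
    # Traditional consumer: does the work inline, so one bad item ends the loop.
    while True:
        item = await queue.get()
        if item == "bad":
            raise RuntimeError("consumer crashed")
        print("handled", item)


async def demo():
    queue = asyncio.Queue()
    consumer_task = asyncio.create_task(fragile_consumer(queue))
    await producer(queue, ["a", "bad", "b", "c"])
    # Wait for the consumer to stop; return_exceptions swallows its crash here.
    await asyncio.gather(consumer_task, return_exceptions=True)
    # The consumer is gone, but the items behind the bad one are still queued.
    return consumer_task.done(), queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the consumer handles `"a"`, dies on `"bad"`, and `"b"` and `"c"` stay in the queue with nobody left to read them.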

Specific problem and implementation

Example involving possibly crashing code

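import asyncio
import random

# Shared queue between the producer and the consumer.
# (On Python 3.10+ the queue binds to the running loop on first use.)
queue = asyncio.Queue()

async def producer():
    # Randomly decide whether the worker for this task should crash.
    task = {}
    task["crash"] = bool(random.getrandbits(1))
    await queue.put(task)

async def consumer():
    while True:
        task = await queue.get()
        # Run the task in its own worker so a crash cannot stop this loop.
        asyncio.create_task(worker(task))

async def worker(task):
    try:
        if task["crash"]:
            raise Exception("Crashed!")
        print("Didn't crash!")
    finally:
        # Mark the item as processed, even on failure, so queue.join() returns.
        queue.task_done()

async def main():
    consumer_task = asyncio.create_task(consumer())
    # Start 10 producers and wait until each has queued its task.
    await asyncio.gather(*(producer() for _ in range(10)))
    # Without this wait, main() would return before any worker had run.
    await queue.join()
    consumer_task.cancel()

asyncio.run(main())

Explanation of methods and objects involved

queue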

This is an asynchronous Queue, as provided by the asyncio standard library. It is the channel through which the producer hands tasks to the consumer.

producer()

This asynchronous function decides at random whether the worker should crash, and then adds the resulting task to the queue.

consumer()

This asynchronous function is started by main() and waits in a loop for new input on the queue (this is what the await queue.get() call is for).

Once an item arrives, it wraps a worker call for that item in a new asyncio task and schedules it on the event loop, then goes straight back to waiting on the queue.
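One thing worth knowing when spawning workers this way: the asyncio documentation advises keeping a reference to tasks created with `asyncio.create_task`, since the event loop only holds weak references to them. Below is a minimal sketch of one way to do that. The `background_tasks` set and the `spawn_worker` helper are hypothetical names, and the worker here is a stand-in that just returns an id.

```python
import asyncio

# Hypothetical registry that keeps strong references to in-flight workers.
background_tasks = set()


async def worker(task):
    await asyncio.sleep(0)  # stand-in for real work
    return task["id"]


def spawn_worker(task):
    # Must be called while an event loop is running.
    worker_task = asyncio.create_task(worker(task))
    background_tasks.add(worker_task)
    # Drop the reference again once the worker has finished.
    worker_task.add_done_callback(background_tasks.discard)
    return worker_task


async def demo():
    spawned = [spawn_worker({"id": n}) for n in range(3)]
    in_flight = len(background_tasks)
    results = await asyncio.gather(*spawned)
    await asyncio.sleep(0)  # give any pending done callbacks a turn
    return in_flight, results, len(background_tasks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A consumer could call something like `spawn_worker(task)` in place of a bare `create_task` call; whether that is worth the extra lines depends on how long-lived the workers are.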

worker(task)

This asynchronous function is wrapped in a task and started by the consumer. It simply checks whether it should crash, and if so, it raises a generic Exception. Otherwise, it prints a message.
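Since nothing awaits the worker, its exception would otherwise only show up as an asyncio log message. If you want to react to failures, one option is a done callback. This is a sketch under my own assumptions: `report_failure` and the `failures` list are hypothetical, and a real program would probably log or retry instead of appending to a list.

```python
import asyncio

failures = []  # hypothetical place to collect worker failures


async def worker(task):
    if task["crash"]:
        raise Exception("Crashed!")
    return "ok"


def report_failure(finished):
    # Task.exception() raises CancelledError for cancelled tasks, so check first.
    if finished.cancelled():
        return
    error = finished.exception()
    if error is not None:
        failures.append(str(error))


async def demo():
    failures.clear()
    tasks = []
    for crash in (False, True, False):
        worker_task = asyncio.create_task(worker({"crash": crash}))
        worker_task.add_done_callback(report_failure)
        tasks.append(worker_task)
    # asyncio.wait only waits; it does not re-raise the workers' exceptions.
    await asyncio.wait(tasks)
    await asyncio.sleep(0)  # give any pending done callbacks a turn
    return list(failures)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `exception()` in the callback also counts as retrieving the exception, so asyncio no longer complains about it being unhandled.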

main()

The main asynchronous function and the entry point. It starts the consumer and creates 10 producers.

asyncio.run

Standard-library function that creates an event loop and runs main() on it until it completes.

Conclusion

Without the worker, a crash in the asyncio consumer halts the consumer portion entirely. (The crash is arbitrary in this example, but in practice it could come from a web API that isn't always online.) The event loop would seemingly continue fine, but since the consumer is no longer running, the producer would just keep adding tasks to the queue without them ever being run.

This also improves concurrency: because each worker is its own task on the event loop, the consumer can continuously keep retrieving tasks from the queue without being blocked by the execution of any one task.
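The concurrency point can be checked with a rough timing sketch. Everything here is an assumption for illustration: `handle` fakes a slow call with a 0.1 second sleep, and `inline_consumer`, `spawning_consumer`, and `timed` are hypothetical names.

```python
import asyncio
import time


async def handle(item):
    await asyncio.sleep(0.1)  # stand-in for a slow call such as a web request


async def inline_consumer(queue):
    # Traditional consumer: the next item waits until this one is finished.
    while True:
        item = await queue.get()
        await handle(item)
        queue.task_done()


async def handle_and_mark(queue, item):
    try:
        await handle(item)
    finally:
        queue.task_done()


async def spawning_consumer(queue):
    # Worker variant: hand the item off and go straight back to the queue.
    workers = set()
    while True:
        item = await queue.get()
        worker_task = asyncio.create_task(handle_and_mark(queue, item))
        workers.add(worker_task)
        worker_task.add_done_callback(workers.discard)


async def timed(consumer, count=5):
    queue = asyncio.Queue()
    for n in range(count):
        queue.put_nowait(n)
    consumer_task = asyncio.create_task(consumer(queue))
    started = time.perf_counter()
    await queue.join()  # returns once every item has been marked done
    elapsed = time.perf_counter() - started
    consumer_task.cancel()
    await asyncio.gather(consumer_task, return_exceptions=True)
    return elapsed


if __name__ == "__main__":
    print("inline:  ", asyncio.run(timed(inline_consumer)))
    print("spawning:", asyncio.run(timed(spawning_consumer)))
```

With five items, the inline version should take roughly five sleeps back to back, while the spawning version should take about as long as a single sleep, since the workers overlap.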