Note: An mdbook version, organized into chapters, can be found here. Its content is licensed under CC (Creative Commons), and if you want to modify it to fit your own use, you can use the GitHub repository.
Outline
- Concurrency in a single thread
- Generators
- Coroutines
- Scheduling coroutines
Concurrency in a single thread
Traditionally, concurrency has been a responsibility handed down to the operating system. A single process can create multiple threads and the OS will run these threads across its (multiple) cores as it sees fit. It will interrupt a running thread, store its state, and load another previously interrupted thread for the computer to work on.
Threads are relatively heavyweight, though: each one needs its own stack, and every switch between threads goes through the kernel, so for I/O-bound workloads this machinery is often not worth it. Moreover, writing correct multi-threaded programs is quite hard, as data races and deadlocks are easy to miss. We can solve this problem by adding another level of indirection.
Generators
Python has generators, functions that use the yield statement. They are commonly used to build iterators over some sequence.
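The code block for this example was lost in extraction. Judging from the output below, it was likely something close to this sketch (the function name `letters` is my invention):

```python
def letters():
    # a generator: each "yield" hands one value to the caller
    yield "a"
    yield "b"
    yield "c"

for letter in letters():
    print(letter)
```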
Output:
a
b
c
You can also use generators to make infinite sequences, which you can stop when you want.
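This code block was also lost; reconstructed from the output below, an infinite-sequence generator probably looked something like this (`naturals` is a guessed name):

```python
def naturals():
    # an infinite sequence: yields 1, 2, 3, ... forever
    n = 1
    while True:
        yield n
        n += 1

numbers = naturals()
for _ in range(3):  # stop whenever we want
    print(next(numbers))
```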
Output:
1
2
3
You can similarly build a Fibonacci sequence with this.
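The Fibonacci block did not survive either; a minimal sketch of the usual generator version:

```python
def fibonacci():
    # infinite Fibonacci sequence as a generator
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

fib = fibonacci()
for _ in range(6):
    print(next(fib))  # 0 1 1 2 3 5
```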
@anandology took infinite sequences one step further, adding specialised operations for them, and making Python evaluate lazily. The result is something very interesting, more like Haskell than Python. It is out of scope of this blog, but you can check it out in this GitHub repository.
Coroutines
Coroutines are functions that can be paused and resumed. Generators do just that. The yield keyword is called so because the function yields control of the CPU; this control is given back to the caller of the generator.

All Python generators have a .send() method. In addition to continuing the generator like next() does, this method accepts an argument that is sent to the generator. This value can be accessed as the result of a yield expression.
We can build a worker that computes the square of its input.
def square(x):
    return x * x

def get_worker():
    x = yield
    while True:
        x = yield square(x)

def main():
    worker = get_worker()
    worker.send(None)  # "prime" the coroutine
    tasks = [1, 2, 3, 4, 5]
    for task in tasks:
        result = worker.send(task)
        print(f"square({task}) = {result}")

main()
Output:
square(1) = 1
square(2) = 4
square(3) = 9
square(4) = 16
square(5) = 25
Aside: A real-world example
I used this pattern recently to build a ChatGPT chatbot that takes prompts from my terminal.
import openai

# configure openai
openai.organization = ...
openai.api_key = ...

def get_chatgpt():
    messages = []
    user_input = yield
    while True:
        # the chat API expects "content" keys and a model name
        messages.append({"role": "user", "content": user_input})
        response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages)
        chatgpt_message = response["choices"][0]["message"]  # chatgpt api detail
        messages.append(chatgpt_message)
        user_input = yield chatgpt_message["content"]

def main():
    chatgpt = get_chatgpt()
    chatgpt.send(None)
    while True:
        # user_input need not come from "input()" only.
        # for example, it could be over a websocket connection
        user_input = input("Prompt:")
        result = chatgpt.send(user_input)
        print("ChatGPT says:", result)
Coroutines with return values
Coroutines (and generators) can return values too. This value can be accessed with
the .value
attribute of the StopIteration
exception that is raised when the
coroutine completes (the function returns).
We have two methods that translate a number to either its English word or its Tamil
word. There is do_io_work()
at the start where this coroutine yields.
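The code block here was lost. Based on the description above, the two translators likely looked something like this sketch (the word dictionaries and names are my guesses; do_io_work is still a normal function at this point, with the coroutine itself yielding):

```python
ENGLISH_WORDS = {1: "one", 2: "two", 3: "three"}
TAMIL_WORDS = {1: "onnu", 2: "rendu", 3: "moonu"}

def do_io_work():
    # a normal function for now; stands in for some blocking I/O
    pass

def number_to_english(x):
    do_io_work()
    yield
    return ENGLISH_WORDS[x]

def number_to_tamil(x):
    do_io_work()
    yield
    return TAMIL_WORDS[x]
```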
We can get the return value of each like this:
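This block was lost too; driving a coroutine to completion and catching its return value probably looked like this (the translator is repeated here so the snippet runs on its own):

```python
TAMIL_WORDS = {1: "onnu", 2: "rendu", 3: "moonu"}

def number_to_tamil(x):
    yield  # pretend to do some I/O first
    return TAMIL_WORDS[x]

coro = number_to_tamil(1)
try:
    while True:
        coro.send(None)
except StopIteration as e:
    # the return value rides on the StopIteration exception
    print(f"1 in Tamil is {e.value}")
```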
Output:
1 in Tamil is onnu
Running two coroutines concurrently
Let’s spice up do_io_work()
a bit. Instead of letting it be a normal function, we’ll
have it be a generator itself, which yield
s a random number of times. This will give
code the uncertainty of real-life I/O operations.
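The code block did not survive; a generator version of do_io_work consistent with the description might be:

```python
import random

def do_io_work():
    # yield a random number of times, mimicking I/O that takes
    # an unpredictable amount of time to complete
    for _ in range(random.randint(1, 10)):
        yield
```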
Let's use this with our number_to_tamil coroutine, along with the number_to_english coroutine.
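The lost block presumably rewrote the translators to delegate to the new generator with yield from, along these lines (dictionaries and names are my guesses):

```python
import random

ENGLISH_WORDS = {1: "one", 2: "two", 3: "three"}
TAMIL_WORDS = {1: "onnu", 2: "rendu", 3: "moonu"}

def do_io_work():
    # yields an unpredictable number of times, like real I/O
    for _ in range(random.randint(1, 10)):
        yield

def number_to_english(x):
    yield from do_io_work()
    return ENGLISH_WORDS[x]

def number_to_tamil(x):
    yield from do_io_work()
    return TAMIL_WORDS[x]
```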
I used yield from, which takes a generator and propagates all of its yields to the caller. yield from coro is roughly the same as for x in coro: yield x, except that yield from also forwards values sent with .send() and evaluates to the sub-generator's return value.
We'll hook all three up in our main() function, to print the English and Tamil translations of numbers from 1 to 3.
def main():
    for x in range(1, 4):
        tamil_coro, english_coro = number_to_tamil(x), number_to_english(x)
        tamil_word, english_word = None, None
        while True:
            if tamil_word is not None and english_word is not None:
                break
            if tamil_word is None:
                try:
                    tamil_coro.send(None)
                except StopIteration as e:
                    tamil_word = e.value
                    print(f"{x} in Tamil is {tamil_word}")
            if english_word is None:
                try:
                    english_coro.send(None)
                except StopIteration as e:
                    english_word = e.value
                    print(f"{x} in English is {english_word}")

main()
Output (could have a different order):
1 in English is one
1 in Tamil is onnu
2 in Tamil is rendu
2 in English is two
3 in Tamil is moonu
3 in English is three
Notice the different order of English/Tamil translations in each iteration. We just ran two coroutines asynchronously.
Aside: Compatibility with asyncio
The code that we wrote is not a metaphor for what asyncio does; it is exactly how asyncio works. We only need to wrap our coroutines in the asyncio.coroutine decorator to make them work with async/await. (Note that this decorator was deprecated and removed in Python 3.11; it is used here only to make the point.)

import asyncio

@asyncio.coroutine
def number_to_tamil(x):
    ...

@asyncio.coroutine
def number_to_english(x):
    ...

async def main():
    for x in range(1, 4):
        tamil_word = await number_to_tamil(x)
        english_word = await number_to_english(x)
        print(f"{x} in Tamil is {tamil_word}")
        print(f"{x} in English is {english_word}")

asyncio.run(main())
Output:
1 in Tamil is onnu
1 in English is one
2 in Tamil is rendu
2 in English is two
3 in Tamil is moonu
3 in English is three
This is still sequential, because we are waiting for number_to_tamil(x) to return before we start executing number_to_english(x). We can change that by converting each coroutine to a task.

While the await coro statement will add coro to the queue and also yield, asyncio.create_task(coro) will only add the coroutine as a task to the queue, without yielding control. That's why we can keep adding more tasks until we call an await on something.

But to see this in action, we will need to modify our code slightly, moving the translation and printing into a translate coroutine.
import asyncio
import random

...

async def translate(language, number):
    match language:
        case "English":
            result = await number_to_english(number)
        case "Tamil":
            result = await number_to_tamil(number)
        case _:
            raise ValueError(f"Language {language} is not supported")
    print(f"{number} in {language} is {result}")
Now let's modify our main() coroutine to use this.
async def main():
    for x in range(1, 4):
        tamil_task = asyncio.create_task(translate("Tamil", x))
        english_task = asyncio.create_task(translate("English", x))

        # wait for both tasks to finish
        await tamil_task
        await english_task

asyncio.run(main())
Output:
1 in Tamil is onnu
1 in English is one
2 in English is two
2 in Tamil is rendu
3 in English is three
3 in Tamil is moonu
Scheduling coroutines
We saw a pattern when we were running the two coroutines concurrently. We tried running the first one, tried running the second one, and returned only when both had finished running.
We can set up a scheduler with a queue that runs tasks one by one.
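The scheduler's code block was lost; a minimal sketch matching the description (a queue of coroutines, run round-robin, return values dropped for now) might look like this:

```python
from collections import deque

class Scheduler:
    def __init__(self):
        self.queue = deque()

    def add_coro(self, coro):
        self.queue.append(coro)

    def run(self):
        # run tasks one by one until the queue is empty
        while self.queue:
            coro = self.queue.popleft()
            try:
                coro.send(None)
            except StopIteration:
                # coroutine finished; its return value is dropped for now
                continue
            # not finished yet: put it at the back of the queue
            self.queue.append(coro)
```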
Notice that we aren’t storing the returned value anywhere. We’ll come to it later.
To test this out now, we will have to print the results of the number_to_tamil and number_to_english coroutines as soon as they return. We'll create a new translate function to do this.
Notice how the yield from syntax makes the code a lot more straightforward.
Let's use this in main() now.
We can also add all the tasks first, and then run all the tasks at once:
Output (order may vary):
3 in English is three
1 in Tamil is onnu
1 in English is one
2 in Tamil is rendu
2 in English is two
3 in Tamil is moonu
We have built a scheduler that runs coroutines concurrently!
Keeping track of return value
We did it, but we aren’t keeping track of the return values of tasks. To fix this problem, we add another level of indirection.
All problems in computer science can be solved by another level of indirection.
We build a Task wrapper for our coroutines. This will add two things on top of coroutines:
- a .status attribute, which can be None, "queued", "running", or "finished"
- a .value attribute, which will store the coroutine's return value

task.value is only valid when task.status is "finished". The scheduler will be responsible for setting .status and .value.
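The Task code block was lost; given the two attributes described above, a minimal sketch would be:

```python
class Task:
    def __init__(self, coro):
        self.coro = coro
        self.status = None  # None, "queued", "running", or "finished"
        self.value = None   # only valid once status == "finished"
```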
We change the scheduler to work with tasks instead of coroutines:
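The updated scheduler's code block is missing; a sketch that sets .status and .value as described (Task repeated so the snippet runs standalone):

```python
from collections import deque

class Task:
    def __init__(self, coro):
        self.coro = coro
        self.status = None
        self.value = None

class Scheduler:
    def __init__(self):
        self.queue = deque()

    def add_task(self, task):
        task.status = "queued"
        self.queue.append(task)
        return task

    def run(self):
        while self.queue:
            task = self.queue.popleft()
            task.status = "running"
            try:
                task.coro.send(None)
            except StopIteration as e:
                # the coroutine returned; record its value
                task.status = "finished"
                task.value = e.value
                continue
            task.status = "queued"
            self.queue.append(task)
```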
Let’s see this working in an example:
We added a new function translate_big_number, which translates a number with multiple digits.
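The code blocks for this example were lost. A self-contained sketch consistent with the output below: translate_big_number spawns one task per digit, yields until all of them finish, then joins their values (number_to_word and the scheduler setup are my guesses at the missing details):

```python
import random
from collections import deque

ENGLISH_WORDS = {1: "one", 2: "two", 3: "three"}
TAMIL_WORDS = {1: "onnu", 2: "rendu", 3: "moonu"}

def do_io_work():
    for _ in range(random.randint(1, 10)):
        yield

class Task:
    def __init__(self, coro):
        self.coro = coro
        self.status = None
        self.value = None

class Scheduler:
    def __init__(self):
        self.queue = deque()

    def add_task(self, task):
        task.status = "queued"
        self.queue.append(task)
        return task

    def run(self):
        while self.queue:
            task = self.queue.popleft()
            task.status = "running"
            try:
                task.coro.send(None)
            except StopIteration as e:
                task.status = "finished"
                task.value = e.value
                continue
            task.status = "queued"
            self.queue.append(task)

scheduler = Scheduler()

def number_to_word(language, digit):
    # translate a single digit in the given language
    yield from do_io_work()
    words = ENGLISH_WORDS if language == "English" else TAMIL_WORDS
    return words[digit]

def translate_big_number(language, number):
    digits = [int(d) for d in str(number)]
    tasks = [scheduler.add_task(Task(number_to_word(language, d))) for d in digits]
    # wait for every digit task to finish, yielding control in between
    while any(task.status != "finished" for task in tasks):
        yield
    result = " ".join(task.value for task in tasks)
    print(f"{number} in {language} is {result}")

def main():
    for x in [12, 133, 121]:
        scheduler.add_task(Task(translate_big_number("English", x)))
        scheduler.add_task(Task(translate_big_number("Tamil", x)))
    scheduler.run()

main()
```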
Output:
12 in English is one two
12 in Tamil is onnu rendu
133 in English is one three three
133 in Tamil is onnu moonu moonu
121 in English is one two one
121 in Tamil is onnu rendu onnu
Bonus: Can you build a coroutine sleep(duration) that sleeps for a certain duration while allowing other tasks to run?
In asyncio, the scheduler is known as the event loop. Besides tasks, it also has "Futures", which are analogous to JavaScript promises. Read more about them here.
Conclusion
We looked at how to build coroutines, how they can be run concurrently, and how tasks can wait for other tasks to complete.
To get the most out of this, we need to be able to actually perform I/O asynchronously. This can be done purely in Python with the selectors module, but that is out of the scope of this blog.
Programming language-level support for lightweight threads has a lot of benefits to offer. Besides better performance on blocking I/O, it simplifies writing concurrent code which is notoriously hard to write correctly and debug. Coroutines need not run on a single OS thread. In other languages like Go and OCaml, they can and do run on multiple OS threads as well.