Note: An mdbook version, organized into chapters, can be found here. Its content is licensed under CC (Creative Commons), and if you want to modify it to fit your own use, you can use the GitHub repository.

Outline

  • Concurrency in a single thread
  • Generators
  • Coroutines
  • Scheduling coroutines

Concurrency in a single thread

Traditionally, concurrency has been a responsibility handed down to the operating system. A single process can create multiple threads and the OS will run these threads across its (multiple) cores as it sees fit. It will interrupt a running thread, store its state, and load another previously interrupted thread for the computer to work on.
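As a point of reference, here is a minimal sketch of OS-level threading in Python (the worker function and thread names are purely illustrative):

import threading

def worker(name):
    # the OS decides when this thread runs and when it gets interrupted
    print(f"hello from {name}")

threads = [threading.Thread(target=worker, args=(f"thread-{i}",)) for i in range(3)]

for t in threads:
    t.start()
for t in threads:
    t.join()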

Handing this off to the OS is often not worth it: creating threads and switching between them has overhead, especially for I/O-bound work. Moreover, writing correct multi-threaded programs is quite hard, as data races and deadlocks are easy to miss. We can solve this problem by adding another level of indirection: cooperative concurrency within a single thread.


Generators

Python has generators, which are created by using the yield statement inside a function. They are commonly used to build iterators over a sequence.

def abc_generator():
    yield "a"
    yield "b"
    yield "c"

abc = abc_generator()
for letter in abc:
    print(letter)

Output:

a
b
c

You can also use generators to build infinite sequences, which you can stop consuming whenever you want.

def natural_numbers_generator():
    i = 1
    while True:
        yield i
        i = i + 1

natural_numbers = natural_numbers_generator()
print(next(natural_numbers))
print(next(natural_numbers))
print(next(natural_numbers))

Output:

1
2
3

You can similarly build a generator for the Fibonacci sequence.

def fib():
    a, b = 0, 1
    while True:
        yield b
        a, b = b, a + b

# sequence: 1, 1, 2, 3, 5, ...
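For example, one way to take just the first few values of this infinite sequence (a usage sketch with itertools.islice, not part of the original snippet):

from itertools import islice

fib_numbers = fib()
print(list(islice(fib_numbers, 8)))  # first 8 values
# [1, 1, 2, 3, 5, 8, 13, 21]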

@anandology took infinite sequences one step further, adding specialised operations for them and making Python evaluate them lazily. The result is something very interesting, more like Haskell than Python. It is out of the scope of this post, but you can check it out in this GitHub repository.


Coroutines

Coroutines are functions that can be paused and resumed. Generators do just that.

The yield keyword is so named because the function yields control of the CPU. This control is given back to the caller of the generator.

All Python generators have a .send() method. In addition to advancing the generator like next() does, this method accepts an argument that is sent into the generator. The sent value becomes the value of the yield expression inside the generator.

We can build a worker that computes the square of its input.

def square(x):
    return x * x

def get_worker():
    x = yield
    while True:
        x = yield square(x)

def main():
    worker = get_worker()
    worker.send(None)  # "prime" the coroutine
    tasks = [1, 2, 3, 4, 5]
    for task in tasks:
        result = worker.send(task)
        print(f"square({task}) = {result}")

main()

Output:

square(1) = 1
square(2) = 4
square(3) = 9
square(4) = 16
square(5) = 25

Aside: A real-world example

I used this pattern recently to build a ChatGPT chatbot that takes prompts from my terminal.

import openai

# configure openai
openai.organization = ...
openai.api_key = ...

def get_chatgpt():
    messages = []
    user_input = yield
    while True:
        messages.append({"role": "user", "content": user_input})
        # model name here is just an example
        response = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=messages)
        chatgpt_message = response["choices"][0]["message"]["content"]  # chatgpt api detail
        messages.append({"role": "assistant", "content": chatgpt_message})
        user_input = yield chatgpt_message

def main():
    chatgpt = get_chatgpt()
    chatgpt.send(None)

    while True:
        # user_input need not come from "input()" only.
        # for example, it could be over a websocket connection
        user_input = input("Prompt:")
        result = chatgpt.send(user_input)
        print("ChatGPT says:", result)

Coroutines with return values

Coroutines (and generators) can return values too. This value can be accessed with the .value attribute of the StopIteration exception that is raised when the coroutine completes (the function returns).

We will write two functions that translate a number to either its English word or its Tamil word (we will see the English one shortly). Each calls do_io_work() at the start, and that is where the coroutine yields.

def number_to_tamil(x):
    yield do_io_work()
    match x:
        case 1: return "onnu"
        case 2: return "rendu"
        case 3: return "moonu"
        case _: raise ValueError("x must be in [1,3]")
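For this snippet to run on its own, do_io_work() can be any ordinary function for now; here is a hypothetical stub (we will replace it with a generator shortly):

def do_io_work():
    # stand-in for a blocking I/O call; becomes a generator in the next section
    pass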

We can get the return value of each like this:

tamil = number_to_tamil(1)
tamil.send(None)  # "prime" the coroutine

try:
    while True:
        tamil.send(None)
except StopIteration as e:
    print("1 in Tamil is", e.value)

Output:

1 in Tamil is onnu

Running two coroutines concurrently

Let’s spice up do_io_work() a bit. Instead of letting it be a normal function, we’ll make it a generator itself, which yields a random number of times. This gives the code the unpredictable timing of real-life I/O operations.

import random

def do_io_work():
    # yield a random number of times
    for i in range(random.randint(1, 5)):
        yield

Let’s use this with our number_to_tamil coroutine, along with the number_to_english coroutine.

def number_to_tamil(x):
    yield from do_io_work()
    match x:
        case 1: return "onnu"
        case 2: return "rendu"
        case 3: return "moonu"
        case _: raise ValueError("x must be in [1,3]")

def number_to_english(x):
    yield from do_io_work()
    match x:
        case 1: return "one"
        case 2: return "two"
        case 3: return "three"
        case _: raise ValueError("x must be in [1,3]")

I used yield from, which takes a coroutine, then propagates all the yields from it to the caller.

yield from coro is roughly for x in coro: yield x, except that it also forwards values sent into the generator and evaluates to the subgenerator’s return value.
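A small illustration of that last point (inner and outer are hypothetical names used only for this demo):

def inner():
    yield "working"
    return "done"

def outer():
    # the yield in inner() is passed through to our caller,
    # and inner()'s return value is captured into result
    result = yield from inner()
    print("inner returned:", result)

gen = outer()
print(next(gen))    # prints "working"
try:
    gen.send(None)  # resumes inner(), which then returns "done"
except StopIteration:
    pass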

We’ll hook all three up in our main() function, to print the English and Tamil translations of numbers from 1 to 3.

def main():
    for x in range(1, 4):
        tamil_coro, english_coro = number_to_tamil(x), number_to_english(x)
        tamil_word, english_word = None, None
        while True:
            if tamil_word is not None and english_word is not None:
                break

            if tamil_word is None:
                try:
                    tamil_coro.send(None)
                except StopIteration as e:
                    tamil_word = e.value
                    print(f"{x} in Tamil is {tamil_word}")

            if english_word is None:
                try:
                    english_coro.send(None)
                except StopIteration as e:
                    english_word = e.value
                    print(f"{x} in English is {english_word}")

main()

Output (could have a different order):

1 in English is one
1 in Tamil is onnu
2 in Tamil is rendu
2 in English is two
3 in Tamil is moonu
3 in English is three

Notice that the order of the English/Tamil translations differs between iterations. We just ran two coroutines concurrently.


Aside: Compatibility with asyncio

The code that we wrote is not a metaphor for what asyncio does; it is exactly how asyncio works. We only need to wrap our coroutines in the asyncio.coroutine decorator to make them work with async/await.

import asyncio

@asyncio.coroutine
def number_to_tamil(x):
    ...

@asyncio.coroutine
def number_to_english(x):
    ...

async def main():
    for x in range(1, 4):
        tamil_word = await number_to_tamil(x)
        english_word = await number_to_english(x)

        print(f"{x} in Tamil is {tamil_word}")
        print(f"{x} in English is {english_word}")

asyncio.run(main())

Output:

1 in Tamil is onnu
1 in English is one
2 in Tamil is rendu
2 in English is two
3 in Tamil is moonu
3 in English is three
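Note that asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11; on current versions you would write native coroutines instead. A rough sketch, with await asyncio.sleep(0) standing in for the bare yields of do_io_work:

import asyncio
import random

async def do_io_work():
    # hand control back to the event loop a random number of times
    for _ in range(random.randint(1, 5)):
        await asyncio.sleep(0)

async def number_to_tamil(x):
    await do_io_work()
    match x:
        case 1: return "onnu"
        case 2: return "rendu"
        case 3: return "moonu"
        case _: raise ValueError("x must be in [1,3]")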

This is still sequential, because we are waiting for number_to_tamil(x) to return before we start executing number_to_english(x). We can change that by converting each coroutine to a task.

await coro runs coro right away and hands control to the event loop whenever coro yields, so the caller cannot continue until coro finishes. asyncio.create_task(coro), on the other hand, schedules the coroutine on the event loop as a task and returns immediately, without yielding control. That’s why we can keep creating tasks and only give up control when we await something.

But to see this in action, we will need to modify our code slightly to print values as soon as they are computed. Let’s add this logic in a new coroutine we’ll call translate.

import asyncio
import random

...

async def translate(language, number):
    match language:
        case "English":
            result = await number_to_english(number)
        case "Tamil":
            result = await number_to_tamil(number)
        case _:
            raise ValueError(f"Language {language} is not supported")

    print(f"{number} in {language} is {result}")

Now let’s modify our main() coroutine to use this.

async def main():
    for x in range(1, 4):
        tamil_task = asyncio.create_task(translate("Tamil", x))
        english_task = asyncio.create_task(translate("English", x))

        # wait for both tasks to finish
        await tamil_task
        await english_task

asyncio.run(main())

Output:

1 in Tamil is onnu
1 in English is one
2 in English is two
2 in Tamil is rendu
3 in English is three
3 in Tamil is moonu

Scheduling coroutines

We saw a pattern when we were running the two coroutines concurrently: we tried advancing the first one, then the second one, and returned only when both had finished.

We can set up a scheduler with a queue that runs the tasks one by one.

import queue

class Scheduler:
    def __init__(self):
        self.tasks = queue.Queue()

    def add_task(self, coro):
        self.tasks.put(coro)

    def run(self):
        while not self.tasks.empty():
            task = self.tasks.get()
            try:
                task.send(None)
            except StopIteration:
                pass
            else:
                # task didn't raise StopIteration, so put it back in the queue
                self.tasks.put(task)

Notice that we aren’t storing the return value anywhere. We’ll come back to that later. To test this out now, we will have to print the results of the number_to_tamil and number_to_english coroutines as soon as they return.

We’ll create a new translate function to do this.

def translate(language, number):
    match language:
        case "English":
            result = yield from number_to_english(number)
        case "Tamil":
            result = yield from number_to_tamil(number)
        case _:
            raise ValueError(f"Language {language} is not supported")

    print(f"{number} in {language} is {result}")

Notice how the yield from syntax makes the code a lot more straightforward.

Let’s use this in main() now.

def main():
    scheduler = Scheduler()
    for x in range(1, 4):
        scheduler.add_task(translate("Tamil", x))
        scheduler.add_task(translate("English", x))
        scheduler.run()

main()

We can also add all the tasks first, and then run them all at once:

8
def main():
    scheduler = Scheduler()
    for x in range(1, 4):
        scheduler.add_task(translate("Tamil", x))
        scheduler.add_task(translate("English", x))
    scheduler.run()

main()

Output (order may vary):

3 in English is three
1 in Tamil is onnu
1 in English is one
2 in Tamil is rendu
2 in English is two
3 in Tamil is moonu

We have built a scheduler that runs tasks concurrently!

Keeping track of return value

We did it, but we aren’t keeping track of the return values of tasks. To fix this problem, we add another level of indirection.

All problems in computer science can be solved by another level of indirection.

We build a Task wrapper for our coroutines. This will add two things on top of coroutines:

  • a .status attribute which can be None, queued, running, or finished.
  • a .value attribute which will store its return value

task.value is only valid when task.status is "finished".

The scheduler will be responsible for setting .status and .value.

class Task:
    def __init__(self, coro, status=None):
        self.coro = coro
        self.status = status
        self.value = None

    def get_value(self):
        if self.status == "finished":
            return self.value
        raise Exception("Task has not finished")

Change the scheduler to work with tasks instead of coroutines:

class Scheduler:
    def __init__(self):
        self.task_q = queue.Queue()

    def add_task(self, task):
        assert isinstance(task, Task), "task must be an instance of Task"

        task.status = "queued"
        self.task_q.put(task)

    def run(self):
        while not self.task_q.empty():
            task = self.task_q.get()
            task.status = "running"
            try:
                task.coro.send(None)
            except StopIteration as e:
                task.value = e.value
                task.status = "finished"
            else:
                self.task_q.put(task)
                task.status = "queued"

Let’s see this working in an example:

def translate_big_number(language, n):
    match language:
        case "English":
            translator = number_to_english
        case "Tamil":
            translator = number_to_tamil
        case _:
            raise ValueError(f"Language {language} is unsupported")

    digits = [int(d) for d in str(n)]
    result = ""
    for digit in digits:
        result += yield from translator(digit)
        result += " "

    return result

The new translate_big_number function translates a multi-digit number one digit at a time. Let’s drive it from main():

def main():
    scheduler = Scheduler()

    for x in [12, 133, 121]:
        task1 = Task(translate_big_number("English", x))
        task2 = Task(translate_big_number("Tamil", x))

        scheduler.add_task(task1)
        scheduler.add_task(task2)
        scheduler.run()

        print(f"{x} in English is {task1.value}")
        print(f"{x} in Tamil is {task2.value}")

Output:

12 in English is one two
12 in Tamil is onnu rendu
133 in English is one three three
133 in Tamil is onnu moonu moonu
121 in English is one two one
121 in Tamil is onnu rendu onnu

Bonus: Can you build a coroutine sleep(duration) that sleeps for a given duration while allowing other tasks to run in the meantime?
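One possible sketch (one answer among many, relying on the fact that our Scheduler re-sends to a task every time it yields):

import time

def sleep(duration):
    # keep yielding back to the scheduler until the deadline has passed
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        yield

# usage inside another coroutine:
#     yield from sleep(0.5)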

In asyncio, the scheduler is known as the event loop. Besides tasks, it also has “Futures”, which are analogous to JavaScript promises. Read more about it here.
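A tiny illustration of a Future (a sketch using standard asyncio calls): it is a placeholder for a value that a coroutine can await until someone sets its result.

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()                      # an empty placeholder
    loop.call_later(0.1, fut.set_result, "hello")   # fulfilled later by a callback
    print(await fut)                                # suspends until the result is set

asyncio.run(main())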


Conclusion

We looked at how to build coroutines, how they can be run concurrently, and how tasks can wait for other tasks to complete.

To get the most out of this, we need to be able to actually perform I/O asynchronously. This can be done purely in Python with the selectors module. This is out of scope for this blog.

Programming-language-level support for lightweight threads has a lot of benefits to offer. Besides better performance on blocking I/O, it simplifies writing concurrent code, which is notoriously hard to write correctly and debug. Coroutines need not run on a single OS thread; in other languages like Go and OCaml, they can and do run across multiple OS threads as well.