Thread pool with coroutines: Threads (1/3)

Introduction

In this short series of articles, we will see how to implement a thread pool that can be used with coroutines. The series contains the following articles:

  1. Creating a Thread
  2. Creating the pool
  3. Using future with the pool

The final objective is to be able to write something like this:

Choice of implementation for the thread pool

We will use the well-known work-stealing algorithm inside our thread pool. This means that each thread has its own task queue and that idle threads can steal tasks from the queues of the others. Since these queues are shared between threads, we must be careful with data races.

To deal with data races, I decided to write a few homemade helpers inspired by the Rust programming language.

Mutex

Here is the first helper I have made. A mutex protects a resource from data races, so we can write a class template that owns the value it protects. The protected value is only reachable through a callback, which runs while the lock is held.

Why do I use a shared mutex? Because multiple readers are not an issue: a std::shared_mutex lets several readers hold the lock at the same time, while a writer gets exclusive access.
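A minimal sketch of such a helper is given below. The names Mutex, locked and demo_mutex are illustrative assumptions and not necessarily those of the original code. Writers go through the non-const overload and take an exclusive lock; readers go through the const overload and take a shared lock.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: owns a value and only exposes it inside a callback
// that runs while the lock is held.
template <typename T>
class Mutex {
public:
    Mutex() = default;
    explicit Mutex(T value) : m_value(std::move(value)) {}

    // Writers: exclusive lock, the callback receives a mutable reference.
    template <typename F>
    decltype(auto) locked(F&& f) {
        std::lock_guard lock{m_mutex};
        return std::forward<F>(f)(m_value);
    }

    // Readers: shared lock, the callback receives a const reference.
    template <typename F>
    decltype(auto) locked(F&& f) const {
        std::shared_lock lock{m_mutex};
        return std::forward<F>(f)(m_value);
    }

protected:
    T m_value{};
    mutable std::shared_mutex m_mutex;
};

// Usage: four writers increment a shared counter, then one reader fetches it.
int demo_mutex() {
    Mutex<int> counter{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) {
        workers.emplace_back([&counter] {
            for (int j = 0; j < 1000; ++j)
                counter.locked([](int& value) { ++value; });
        });
    }
    for (auto& worker : workers) worker.join();

    const int total =
        std::as_const(counter).locked([](const int& value) { return value; });
    assert(total == 4 * 1000);  // every increment was applied exactly once
    return total;
}
```

The callback should not return a reference to the protected value, otherwise the reference would outlive the lock.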

Condition variable

What are the events that can occur within a thread?

  1. The thread can be requested to stop
  2. The thread can have a new task to perform

To avoid using CPU resources when the thread is not fed (i.e., there is no task to run), I decided to use a condition variable. The idea is simple: the waiting thread blocks in the wait function and only leaves it once the predicate is satisfied, while another thread notifies the condition variable to wake it up.

Since a condition variable is generally used with a mutex, I decided to join them together through inheritance. Hence, a condition variable behaves like a mutex but can also be waited on.

You may wonder what std::stop_token is. It is a C++20 facility that std::jthread passes to the thread function, and it spares us from managing an atomic boolean by hand. Put simply, when a std::jthread that is still joinable is destroyed, it does two things:

  1. It calls request_stop() on its std::stop_source, which every associated std::stop_token can observe
  2. It joins the thread

An Awaiter

With coroutines, the task will not be a function, but a coroutine_handle which will be resumed. Hence, we need an object that manages this handle.

Note that we destroy the coroutine only if it was not resumed. The type is move-only, so exactly one object owns the handle at any time.

A thread safe queue

Now that we have our Awaiter object, we need a thread-safe queue to store them. New tasks are pushed into the queue, and the thread pool pops them one by one.

Since the queue may be empty, the pop operation can return nothing, which is represented by std::nullopt.

Three operations are possible:

  1. Push: this operation enqueues a new task and notifies the condition variable
  2. Pop: this operation dequeues a task to be executed in the current thread
  3. Wait for an element: this operation makes the current thread idle until a new task arrives (signalled by the push function)

The thread

It is time to design our thread class.

The thread class will be built on top of the std::jthread class. It will also embed a thread-safe queue of Awaiters.

Thus, we can lay out the class:

First, let us list the operations our thread must support:

  1. Add tasks
  2. Schedule an operation (thanks to the co_await operator)
  3. Run a background infinite loop that pops tasks and executes them

There is nothing complicated here: the run method just waits for an element, pops the awaiters, and executes them if they are valid.

The co_await operator simply pushes the coroutine_handle onto the thread's queue through the Awaitable object.

Using this thread

We schedule the operations thanks to the co_await operator.
Here is an example. The task type is a basic promise that never suspends, which means that the coroutine frame is destroyed automatically when the coroutine finishes.

The code after the first co_await runs on the first thread, and the code after the second co_await runs on the second thread. Really simple.

Conclusion

This concludes the first article about creating a thread pool using coroutines. We introduced some utility classes and designed a concurrent queue. If you want to try it, you can find the full code here.

Thanks to Nir Friedman for helping me design the mutex and condition variable in a better way :).