Tag: coroutine

  • Thread pool with coroutines: Threads (1/3)

    Introduction

    In this short series of articles, we will see how to implement a thread pool that can be used with coroutines. The series contains three articles:

    1. Creating a Thread
    2. Creating the pool
    3. Using future with the pool

    The final goal is to be able to write something like this:

    ThreadPool threadPool;
    
    co_await threadPool;
    // Here we run on a threadPool thread
    
    auto future = schedule_on(threadPool, function, args...);
    
    co_await future;
    // Here we run on the asynchronous function thread

    Choice of implementation for the thread pool

    We will use the well-known work-stealing algorithm inside our thread pool: each thread has its own task queue, and a thread whose queue is empty can steal tasks from the queues of the other threads. As a result, several threads share data structures, so we must be careful about data races.
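    To make the work-stealing idea concrete, here is a deliberately simplified, single-threaded sketch of the scheduling policy only. The function nextTask is hypothetical. Taking work from the front of one's own deque and stealing from the back of another's is one common variant, assumed here for illustration. There is no locking, so this is not the thread-safe version the series builds.

```cpp
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

// Illustration only (not thread-safe): each worker owns a deque of task ids.
// The owner takes from the front of its own deque; when that deque is empty,
// it steals from the back of another worker's deque.
std::optional<int> nextTask(std::vector<std::deque<int>> &queues, std::size_t self) {
  if (!queues[self].empty()) {
    int task = queues[self].front();
    queues[self].pop_front();
    return task;
  }
  // Visit the other workers in order, starting with the next one.
  for (std::size_t offset = 1; offset < queues.size(); ++offset) {
    auto &victim = queues[(self + offset) % queues.size()];
    if (!victim.empty()) {
      int task = victim.back();
      victim.pop_back();
      return task;
    }
  }
  return std::nullopt; // no work left anywhere
}
```

    For example, with queues {1, 2}, {} and {3, 4}, worker 1 has nothing of its own and steals task 4 from worker 2, while worker 0 keeps taking its own tasks from the front.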

    To deal with data races, I decided to write some homemade helpers inspired by the Rust programming language.

    Mutex

    Here is the first helper. A mutex protects a resource from data races, so we can write a class template that owns the protected value, whose type is the template argument. The value can only be reached through a callback, which is invoked while the lock is held.

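    #include <functional>
    #include <mutex>
    #include <shared_mutex>
    #include <utility>
    
    #define FWD(x) std::forward<decltype(x)>(x)
    
    template <typename T> class Mutex {
    public:
      // Exclusive access: only one thread at a time may modify the value.
      template <typename F> decltype(auto) with_lock(F &&f) {
        std::lock_guard lock{m_mutex};
        return std::invoke(FWD(f), m_value);
      }
      // Shared access: several readers may hold the lock at the same time.
      template <typename F> decltype(auto) with_lock(F &&f) const {
        std::shared_lock lock{m_mutex};
        return std::invoke(FWD(f), m_value);
      }
    
    protected:
      T m_value{}; // value-initialized, so a Mutex<int> starts at 0
      mutable std::shared_mutex m_mutex;
    };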

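    Why a shared mutex? Because concurrent readers do not race with each other. The const overload of with_lock takes a shared lock, so several readers can proceed in parallel, while the non-const overload takes an exclusive lock.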

    Condition variable

    What events can occur within a thread?

    1. The thread can be requested to stop
    2. The thread can have a new task to perform

    To avoid wasting CPU time when the thread has nothing to do (i.e., there is no task to run), I decided to use a condition variable. The idea is simple: the waiting thread blocks until a predicate is satisfied, and another thread notifies the condition variable to wake it up so that the predicate is checked again.

    Since a condition variable is generally used together with a mutex, I decided to combine them through inheritance. Hence, a ConditionVariable behaves like a Mutex, but can also be waited on.

    #include <condition_variable>
    #include <stop_token>
    
    template <typename T> class ConditionVariable : public Mutex<T> {
    public:
      void notifyOne() { m_cond.notify_one(); }
      void notifyAll() { m_cond.notify_all(); }
    
      // Blocks until f(value) returns true.
      template <typename F> void wait(F f) {
        auto lock = std::unique_lock{this->m_mutex};
        m_cond.wait(lock, [&, this] { return std::invoke(f, this->m_value); });
      }
    
      // Same, but also returns as soon as a stop is requested through st.
      template <typename F> void wait(F f, std::stop_token st) {
        auto lock = std::unique_lock{this->m_mutex};
        m_cond.wait(lock, st, [&, this] { return std::invoke(f, this->m_value); });
      }
    
    private:
      // condition_variable_any works with std::shared_mutex and accepts stop tokens.
      std::condition_variable_any m_cond;
    };

    You may wonder what std::stop_token is. It is a C++20 facility, used by std::jthread, that spares you from polling an atomic boolean yourself. Put simply, when a std::jthread is destroyed, it does two things:

    1. It calls request_stop() on its std::stop_source, which notifies the associated std::stop_token (and wakes up a condition_variable_any wait that was given that token)
    2. It joins the thread

    An Awaiter

    With coroutines, a task is not a function but a coroutine_handle that will be resumed. Hence, we need an object that owns this handle.

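    #include <coroutine>
    #include <utility>
    
    struct Awaiter {
    public:
      Awaiter() = default;
    
      // Implicit on purpose, so that a handle can be passed as {handle}.
      // Any std::coroutine_handle<P> converts to std::coroutine_handle<>.
      Awaiter(std::coroutine_handle<> handle) : m_handle{handle} {}
    
      Awaiter(const Awaiter &) = delete;
      Awaiter(Awaiter &&a) noexcept : m_handle{std::exchange(a.m_handle, nullptr)} {}
    
      // Destroy the coroutine frame only if it was never resumed.
      ~Awaiter() {
        if (m_handle)
          m_handle.destroy();
      }
    
      // Give up ownership, then resume the coroutine.
      void resume() { std::exchange(m_handle, nullptr).resume(); }
    
    private:
      std::coroutine_handle<> m_handle = nullptr;
    };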
    

    Note that the destructor destroys the coroutine only if it was never resumed: once resume() has been called, the Awaiter no longer owns the frame. Awaiter is a move-only type.

    A thread safe queue

    Now that we have our Awaiter type, we need to store awaiters in a thread-safe queue: new tasks are pushed into the queue, and the thread pops them one by one.

    Since the queue may be empty, pop returns a std::optional, which is std::nullopt when there was nothing to pop.

    #include <optional>
    #include <queue>
    #include <stop_token>
    
    template <typename T> class ThreadSafeQueue {
    public:
      void push(T t) {
        m_queue.with_lock([&](auto &queue) { queue.push(std::move(t)); });
        m_queue.notifyOne(); // wake up one waiting thread
      }
    
      std::optional<T> pop() {
        std::optional<T> x; // stays empty if there is nothing to pop
        m_queue.with_lock([&](auto &queue) {
          if (!queue.empty()) {
            x.emplace(std::move(queue.front()));
            queue.pop();
          }
        });
        return x;
      }
    
      void waitForAnElement(std::stop_token st) {
        auto hasElement = [](const auto &x) { return !x.empty(); };
        m_queue.wait(hasElement, st); // also returns on a stop request
      }
    
    private:
      ConditionVariable<std::queue<T>> m_queue;
    };

    Three operations are available:

    1. Push: enqueues a new task and notifies the condition variable.
    2. Pop: dequeues a task to be executed on the current thread.
    3. Wait for an element: puts the current thread to sleep until a new task arrives (signaled by push) or a stop is requested.

    The thread

    It is time to design our thread class.

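    The thread class is built on top of std::jthread, and it embeds a thread-safe queue of Awaiters.

    Thus, we can start with the following skeleton. The member order matters: members are destroyed in reverse declaration order, so m_thread is joined before m_awaiters is destroyed.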

    class Thread {
    private:
      ThreadSafeQueue<Awaiter> m_awaiters;
      std::jthread m_thread;
    };

    Let us first list the operations our thread must support:

    1. Adding tasks
    2. Scheduling (thanks to the co_await operator)
    3. A background loop that pops tasks and executes them until a stop is requested

    Here are the corresponding members of the Thread class:

    public:
      Thread() {
        m_thread = std::jthread([this](std::stop_token st) { run(st); });
      }
    
      void addAwaiter(Awaiter &&awaiter) { m_awaiters.push(std::move(awaiter)); }
    
      auto id() { return m_thread.get_id(); }
    
      Awaitable operator co_await() { return {*this}; }
    
    private:
      void run(std::stop_token st) {
        while (!st.stop_requested()) {
          m_awaiters.waitForAnElement(st);
    
          auto awaiter = m_awaiters.pop();
    
          if (awaiter)
            awaiter->resume();
        }
      }

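    Nothing complicated here: the run method waits for an element, pops an awaiter, and resumes it if there is one. The loop ends as soon as a stop is requested; any awaiter still in the queue is destroyed along with the thread.

    The co_await operator returns an Awaitable object whose await_suspend simply pushes the coroutine_handle onto the thread's queue.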

      struct Awaitable {
        Thread &thread;
        bool await_ready() { return false; }
    
        void await_suspend(std::coroutine_handle<> handle) {
          thread.addAwaiter({handle});
        }
    
        void await_resume() {}
      };

    Using this thread

    We schedule operations with the co_await operator.
    Here is an example. task is a basic coroutine type whose promise never suspends, neither at the start nor at the end. This means that the coroutine starts running immediately, and its frame is destroyed automatically when the function ends.

    struct task {
      struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() noexcept {}
    
        ~promise_type() {}
      };
    
      ~task() {}
    };
    
    std::atomic_int x = 0;
    
    std::atomic_int done = 0;
    
    task f(Thread &thread1, Thread &thread2) {
      co_await thread1;
      ++x;
    
      co_await thread2;
      ++x;
      ++done;
    }

    The code after the first co_await runs on thread1, and the code after the second co_await runs on thread2. Really simple. Note that x and done are atomics because they are modified from two different threads.

    Conclusion

    This concludes the first article about creating a thread pool with coroutines. We introduced some utility classes and designed a concurrent queue. If you want to try it, you can find the full code here.

    Thanks to Nir Friedman for helping me design the mutex and condition variable in a better way :).

  • Create a future with coroutine in C++

    I have not written on this blog for years. Here is what I am doing right now: I am a C++ software engineer at Diagdev, a hematology-related company based in the south of France, near Montpellier. I also developed a small library, Little Type Library, whose goal is to bring ranges and concept-like features to C++17. It also brings other functional features, such as the pipe notation and the >> notation. In this article, we will see how to create a future using coroutines.

    What is a coroutine?

    A coroutine, despite its intimidating name, is a simple thing.
    Functions begin, do things, and end. A coroutine is a generalization of functions: it can be suspended and then resumed, either on the same thread or, better, on another one.

    A coroutine can be suspended at its beginning (initial_suspend), in the middle of its processing (at a co_await), or at its end (final_suspend).

    What is the problem with conventional futures?

    Let’s say we have this code.

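    #include <future>
    
    // Placeholder for the work done on the main thread.
    void doSomething() {}
    
    int main() {
        auto performSomething = []{};
        // std::launch::async forces a new thread; the default policy may defer the call.
        auto future = std::async(std::launch::async, performSomething);
        
        doSomething();
        
        future.wait(); // blocks the main thread until performSomething is done
    }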

    If performSomething and doSomething take about the same time, there is no issue. But if performSomething takes longer, future.wait() blocks the main thread: it sits idle, doing nothing useful until the result is ready, and we lose performance.

    Monadic expressions are one way to solve this. The idea is to give the future a callback that is automatically called when the asynchronous function ends. If you want to know more about this, I wrote an article a few years ago.

    How coroutines can solve the problem

    My solution using coroutines is similar to a monadic expression. However, instead of passing a callback, you write the continuation as ordinary sequential code.

    Here is the code we want to achieve.

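    #include <iostream>
    #include <thread>
    
    // Global worker thread used by the future's awaitable.
    std::jthread *thread = nullptr;
    
    int square(int x) {
      return x * x;
    }
    
    task f() {
      auto squared6 = co_await async(square, 6);
    
      std::cout << squared6 << std::endl;
    }
    
    int main() {
      std::jthread thread;
      ::thread = &thread;
    
      f();
    
      doSomethingHeavy(); // placeholder for the main thread's own work
      return 0;
    }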

    To keep things simple, I do not manage my threads cleanly: a single global std::jthread pointer is used, and I chose std::jthread over std::thread for its automatic join. The code itself is simple. main calls f, which is a coroutine. f calls async with square and 6 as arguments, and async returns a future, which f awaits. This co_await launches square on another thread; f saves its state and returns to main. Once the other thread has computed square, it resumes f, which prints the squared value.

    Yes, you read that correctly: std::cout << squared6 << std::endl is executed on the same thread as square.

    So, let’s create a future using coroutine !

    Async coroutine

    The job of the async coroutine is to return a future, and to run the given function only when the future is co_awaited. Here is the code:

    template <typename F, typename... Args>
    future<std::invoke_result_t<F, Args...>> async(F f, Args... args) {
      std::cout << "async" << std::endl;
      co_return f(args...);
    }

    Since it uses the co_return keyword, async is a coroutine. We do not want f to run immediately, so the coroutine must suspend at its beginning. It must also suspend at its end. Otherwise its frame, including the promise, would be destroyed as soon as it finishes, and reading the result afterwards would be a use after free. The result is produced inside the coroutine but read later by the awaiting side, so the promise stores it. Finally, the future holds the coroutine handle, so that it can resume the coroutine and destroy it when the future goes away.

    Here is the full code of future:

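    #include <coroutine>
    #include <utility>
    
    template <typename T> struct future {
      struct AwaitableFuture; // nested awaitable, shown below
    
      struct promise_type {
        T value; // holds the result until the awaiter reads it
        future get_return_object() {
          return {std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept { // do not run f yet
          return {};
        }
        std::suspend_always final_suspend() noexcept { // keep the frame (and value) alive
          return {};
        }
        void return_value(T x) {
          value = std::move(x);
        }
        void unhandled_exception() noexcept {}
      };
    
      std::coroutine_handle<promise_type> coro;
    
      future(std::coroutine_handle<promise_type> coro) : coro{coro} {}
      // Move-only: a copy would destroy the same frame twice.
      future(future &&other) noexcept : coro{std::exchange(other.coro, nullptr)} {}
    
      ~future() {
        if (coro)
          coro.destroy();
      }
    
      AwaitableFuture operator co_await() {
        return {*this};
      }
    };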

    The future has an operator co_await, which must return an awaitable object. An awaitable has three methods:

    • await_ready: tells whether the coroutine can skip suspension (true means the result is already available).
    • await_suspend: called once the coroutine has been suspended; this is where we launch the other thread.
    • await_resume: returns the result to the caller of co_await.

    For our future, the awaitable looks like this:

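      struct AwaitableFuture {
        future &m_future;
        // Always suspend: the value is only computed on the worker thread.
        bool await_ready() const noexcept { return false; }
    
        void await_suspend(std::coroutine_handle<> handle) {
          // Replace the global worker with a new thread that does both steps.
          *thread = std::jthread([this, handle] {
            m_future.coro.resume(); // 1. run async's body, which stores the value
            handle.resume();        // 2. resume the awaiting coroutine (f)
          });
        }
    
        // Called in f once it is resumed: hand over the computed value.
        T await_resume() {
          return m_future.coro.promise().value;
        }
      };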

    Why do we need to call resume twice? Let us recall how the future is used:

    task f() {
      auto squared6 = co_await async(square, 6);
    
      std::cout << squared6 << std::endl;
    }

    f is a coroutine, but so is async! The first resume(), on m_future.coro, runs the body of async: it calls the function and stores the result in the promise. The second, on handle, resumes f. The order matters: otherwise f would resume and read the value before it has been computed.

    Since f is a coroutine, its return type needs a promise type too. Here is the simple task used for it:

    
    struct task {
      struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() noexcept {}
      };
    };

    Here is the full code, with an online compiler, if you want to try it.

    Conclusion

    I hope this article was clear, and if you have any questions, don’t hesitate to ask :). In a future article, I could write about how to create a when_all using coroutines, or how to use a thread pool with coroutines.

    Thanks for reading!