Category: coroutines

  • C++ error handling, let’s abuse the co_await operator

    Introduction

    Sometimes (very rarely :-p), errors may happen. They can be due to misuse by the user, a system error, a corrupted or missing file. Errors can pop up from everywhere. There are several ways to handle them. In this article, we are going to see how coroutines may be used for error handling, so let’s abuse the co_await operator.

    Old ways to handle errors

    One of the first ways used to handle errors (AFAIK) was the error-code return. The idea is simple: you return OK when the function executes properly, else you return an error code, like OUT_OF_RANGE or FILE_NOT_EXIST.

    You ended with code like:

    #include <iostream>

    enum class error_code { OK, ERROR };
    
    error_code g(bool success, int &value) {
      if (!success)
        return error_code::ERROR;
      value = 5;
      return error_code::OK;
    }
    
    error_code f(bool success, int &value) {
      if (g(success, value) == error_code::OK) {
        value *= 2;
        return error_code::OK;
      }
      return error_code::ERROR;
    }
    
    int main() {
      int value;
    
      if (f(false, value) == error_code::ERROR) {
        std::cout << "error!" << std::endl;
      }
    
      if (f(true, value) == error_code::OK) {
        std::cout << value << std::endl;
      }
    }
    

    As expected, the code will write error! and 10.

    The advantage of this approach is that it is really explicit: you know which functions can fail. A disadvantage is that there is no propagation of errors; you must handle the error in the same place where you handle the correct path. Another problem is that it can lead to severe issues if the client of your function does not check whether it failed. Believe me, programmers are somewhat lazy; they may forget to check the return value and process the result as if it were correct. For example, imagine you developed an initialize() function: it fails, but you use the object as if it were initialized anyway. Sooner or later, that will lead to a severe failure.

    Another way to process errors is the exception. When you detect an error, you throw it, and when you want to process the error, you catch it. This solves both problems we saw before: error propagation and the use of non-initialized objects. The code will look like this:

    #include <iostream>
    #include <stdexcept>
    
    int g(bool success) {
      if (!success)
        throw std::runtime_error("Error !");
      return 5;
    }
    
    int f(bool success) { return g(success) * 2; }
    
    int main() {
      try {
        std::cout << f(true) << std::endl;
        std::cout << f(false) << std::endl;
      } catch (const std::exception &exception) {
        std::cout << exception.what() << std::endl;
      }
    }

    The code is shorter, and the error handling is done where you need it. However, it is difficult to know whether the function f can fail if we don’t have the code of g(). We can suspect it because f is not noexcept, but that’s all, and it does not give us much information about the possible error.

    A modern way for error-handling thanks to ADT

    There is a proposal for an expected object for C++23. Basically, std::expected is a template class which takes 2 arguments.

    1. The result type
    2. The error type

    It is a sum type, so it can contain either the result or the error. However, handling errors with std::expected is a bit like using error codes: you don’t get automatic propagation. You may, however, use a functional approach that simulates error propagation:

    expected<int, std::string> g(bool success) {
        if(success)
            return 5;
        return "Error!";
    }
    
    expected<int, std::string> f(bool success) {
        return g(success).map([](auto x){return x * 2;});   
    }
    
    int main() {
        auto a = f(false);
        auto b = f(true);
        
        if(a.is_error())
            std::cout << a.error() << std::endl;
        if(b.is_result())
            std::cout << b.result() << std::endl;
    }

    The map function will be executed only if g(success) is not an error; if it is an error, the error will be propagated to the caller.

    All this lambda machinery is very good: it works perfectly, it is pretty fast and readable. However, in some cases, it can become cumbersome.

    In the Rust programming language, we would write something like:

    fn f(success: bool) -> Result<i32, String> {
        let value = g(success)?;
        Ok(value * 2)
    }

    Note the presence of the ? operator. It means: if g(success) succeeds, the execution continues; otherwise, the function stops and propagates the error to the caller.

    Does this story of stopping and continuing execution remind you of something?

    Let’s abuse the co_await operator !

    The objective will be to be able to write something like:

    Expected<int, std::string> g(bool success) {
      if (!success)
        return "Error !!";
      return 5;
    }
    
    Expected<int, std::string> f(bool success) {
      int result = co_await g(success);
      co_return result * 2;
    }

    You can even imagine a try or TRY macro to make things even better :p. But be careful if you are using exceptions :).

    Let’s design a simple Expected class.

    template <typename Result, typename Error> class Expected {
    public:
      template <typename T> Expected(T t) : m_value{std::move(t)} {}
    
      bool is_result() const { return m_value.index() == 1; }
      Result result() const { return *std::get_if<1>(&m_value); }
    
      bool is_error() const { return m_value.index() == 2; }
      Error error() const { return *std::get_if<2>(&m_value); }
    
      template <typename T> Expected &operator=(T t) {
        m_value = std::move(t);
        return *this;
      }
    
    private:
      std::variant<std::monostate, Result, Error> m_value;
    };

    I didn’t use reference-qualified member functions for the sake of simplicity. However, in production code, to get the best performance, you should use them to avoid useless copies.

    I use a std::variant with std::monostate because we are going to need it later. So, basically, we have a class that represents either a result or an error, functions to ask which value is carried by the Expected, and functions to retrieve the result or the error.

    As we said before, Expected is meant to be used with coroutines, so it must have a nested promise_type.

    The promise_type

    We remind that the promise_type must have 5 member functions.

    1. get_return_object() which will return an expected
    2. return_value() / return_void() which will handle the co_return operator.
    3. initial_suspend and final_suspend that handle the beginning and the end of the coroutine
    4. unhandled_exception that handles unhandled exceptions.

    In our example, unhandled_exception will do nothing for the sake of simplicity. initial_suspend and final_suspend will return std::suspend_never because when we launch the function, we want it not to be paused, and when we exit the function, we expect everything to be cleaned up properly.

    Let’s talk about get_return_object() and return_value(). We are going to begin with return_value(). Its prototype will be something like void return_value(Expected result). We could write different overloads for Result and Error and their reference-qualified friends, but for the sake of simplicity, again, I chose to have only the Expected overload :-).

    We must do something with this result: we must set the current Expected to this value. To do that, I decided to use a pointer to the current Expected instance.

    For the get_return_object function, things are not that easy. You must be able to construct an Expected without an error or a result. Moreover, you must initialize the pointer to the Expected in the promise_type.

    Then, I added a private constructor to the Expected object.

    private:
      Expected(promise_type &promise) noexcept { promise.expected = this; }
    

    The promise_type is as we described prior.

      struct promise_type {
        Expected *expected;
        Expected get_return_object() { return {*this}; }
        void return_value(Expected result) { *expected = std::move(result); }
    
        std::suspend_never initial_suspend() const noexcept { return {}; }
        std::suspend_never final_suspend() const noexcept { return {}; }
    
        void unhandled_exception() noexcept {}
      };

    However, be careful with your get_return_object function. Here it works because of guaranteed copy elision. If there were no elision, you would get a segmentation fault (in the best case) because the Expected address would not be the same 🙂

    Our Expected object can be co_returned from a coroutine, but it cannot be co_awaited yet. So, let’s abuse the co_await operator!

    Awaiter

    To make our Expected class awaitable, we must define an Awaiter class.

    As a reminder, an awaiter must have three functions.

    1. await_ready: which returns a bool to know if we can continue the execution, or suspend it.
    2. await_resume: which returns the type wanted from co_await x.
    3. await_suspend: which is called when a coroutine is suspended.

    The await_ready function is really simple: if the expected is an error, we suspend; else, we continue. The await_resume function just returns the result. The await_suspend function is the most complicated! It is called when we have an error: it must give the error to the expected returned by get_return_object. Moreover, it must destroy the current coroutine.

    Hence, here is the code for Awaiter class and the operator co_await:

      struct Awaiter {
        Expected expected;
    
        bool await_ready() { return expected.is_result(); }
        Result await_resume() { return expected.result(); }
        void await_suspend(std::coroutine_handle<promise_type> handle) {
          *handle.promise().expected = expected;
          handle.destroy();
        }
      };
    
      Awaiter operator co_await() { return Awaiter{*this}; }

    Again, I do not provide reference-qualified methods. You should do it in production code. Here is the full code if you want it.

    Performance?

    It is purely a hypothesis, but I do believe that, in the future, compilers could optimize this kind of code out. Indeed, on cppreference, we can read that the heap allocation of the coroutine state can be elided when:

    • The lifetime of the coroutine state is strictly nested within the lifetime of the caller, and
    • the size of coroutine frame is known at the call site

    The first condition is obvious. The second, I think, holds here, but I am not sure; that is why it remains a hypothesis.

    Thanks for reading :).

  • Thread pool with coroutines: Threads (1/3)

    Introduction

    In this little series of articles, we are going to see how to implement a thread pool usable with coroutines. This series will contain these articles :

    1. Creating a Thread
    2. Creating the pool
    3. Using future with the pool

    The final objective will be to be able to write something like that:

    ThreadPool threadPool;
    
    co_await threadPool;
    // Here we run on a threadPool thread
    
    auto future = schedule_on(threadPool, function, args...);
    
    co_await future;
    // Here we run on the asynchronous function thread

    Choice of implementation for the thread pool

    We will use the well-known work-stealing algorithm inside our thread pool. It implies that each thread has its own task queue and threads can steal tasks from each other. It will lead to concurrency with shared variables between threads, hence, we must be careful with data races.

    To deal with data races, I decided to make some homemade helpers inspired by the Rust programming language.

    Mutex

    Here is the first helper I have made. A mutex protects a resource from data races, so we can make a template class that protects its template argument. We use a callback to operate on the protected variable.

    #define FWD(x) std::forward<decltype(x)>(x)
    
    template <typename T> class Mutex {
    public:
      template <typename F> decltype(auto) with_lock(F &&f) {
        std::lock_guard lock{m_mutex};
        return std::invoke(FWD(f), m_value);
      }
      template <typename F> decltype(auto) with_lock(F &&f) const {
        std::shared_lock lock{m_mutex};
        return std::invoke(FWD(f), m_value);
      }
    
    protected:
      T m_value;
      mutable std::shared_mutex m_mutex;
    };

    Why do I use a shared mutex? Because multiple concurrent readers are not an issue: the const overload takes a std::shared_lock, so reads can proceed in parallel, while the non-const overload takes an exclusive lock for writes.

    Condition variable

    What are the events that can occur within a thread?

    1. The thread can be requested to stop
    2. The thread can have a new task to perform

    To avoid using CPU resources when the thread is not fed (i.e., there is no task to run), I decided to use a condition variable. The idea is simple: the waiting thread waits for an event and leaves the wait function when the predicate is satisfied; another thread notifies the condition variable to wake it up.

    Since a condition variable is generally used with a mutex, I decided to join them together through inheritance. Hence, a condition variable behaves like a mutex but can also be waited on.

    template <typename T> class ConditionVariable : public Mutex<T> {
    public:
      void notifyOne() { m_cond.notify_one(); }
      void notifyAll() { m_cond.notify_all(); }
    
      template <typename F> void wait(F f) {
        auto lock = std::unique_lock{this->m_mutex};
        m_cond.wait(lock, [&, this] { return std::invoke(f, this->m_value); });
      }
    
      template <typename F> void wait(F f, std::stop_token st) {
        auto lock = std::unique_lock{this->m_mutex};
        m_cond.wait(lock, st, [&, this] { return std::invoke(f, this->m_value); });
      }
    
    private:
      std::condition_variable_any m_cond;
    };

    You may wonder what std::stop_token is. It is simply a C++20 feature, provided by std::jthread, that saves the user from having to wait on an atomic boolean. Put simply, when a std::jthread is destroyed, it does two things:

    1. It calls request_stop on its std::stop_source, which notifies the std::stop_token
    2. It joins the thread

    An Awaiter

    With coroutines, the task will not be a function, but a coroutine_handle which will be resumed. Hence, we need to have an object that manages this handle.

    struct Awaiter {
    public:
      Awaiter() {}
    
      ~Awaiter() {
        if (m_handle)
          m_handle.destroy();
      }
    
      template <typename... Args>
      Awaiter(std::coroutine_handle<Args...> handle) : m_handle{handle} {}
    
      Awaiter(const Awaiter &) = delete;
      Awaiter(Awaiter &&a) : m_handle{a.m_handle} { a.m_handle = nullptr; }
    
    
      void resume() {
        m_handle();
        m_handle = nullptr;
      }
    
    private:
      std::coroutine_handle<> m_handle = nullptr;
    };
    

    One will observe that we destroy the coroutine only if it was not resumed (resume() resets the handle). It is a move-only type.

    A thread safe queue

    Now that we have our Awaiter object, we must push them into a thread-safe queue. The new tasks will be pushed into the queue, and the thread pool will pop them one by one.

    Since the queue may be empty, the pop operation can return nothing, represented by a std::nullopt.

    template <typename T> class ThreadSafeQueue {
    public:
      void push(T t) {
        m_queue.with_lock([&](auto &queue) { queue.push(std::move(t)); });
        m_queue.notifyOne();
      }
    
      std::optional<T> pop() {
        std::optional<T> x;
        m_queue.with_lock([&](auto &queue) {
          if (!queue.empty()) {
            x.emplace(std::move(queue.front()));
            queue.pop();
          }
        });
        return x;
      }
    
      void waitForAnElement(std::stop_token st) {
        auto hasElement = [](const auto &x) { return !x.empty(); };
        m_queue.wait(hasElement, st);
      }
    
    private:
      ConditionVariable<std::queue<T>> m_queue;
    };

    We have 3 operations possible.

    1. Push: this operation enqueues a new task and notifies the condition variable.
    2. Pop: this operation dequeues a task to be executed in the current thread.
    3. Wait for an element: this operation makes the current thread idle until a new task arrives (notified by the push function).

    The thread

    It is time to design our thread class.

    The thread class will be designed over the std::jthread class. It will also embed a thread-safe queue of Awaiters.

    Thus, we can start with the data members:

    class Thread {
    private:
      ThreadSafeQueue<Awaiter> m_awaiters;
      std::jthread m_thread;
    };

    First, we can imagine what operation our thread must do:

    1. Adding tasks
    2. Schedule operation (thanks to the co_await operator)
    3. A background infinite loop that will pop tasks and execute them.
    Let’s implement these operations inside the Thread class:

    public:
      Thread() {
        m_thread = std::jthread([this](std::stop_token st) { run(st); });
      }
    
      void addAwaiter(Awaiter &&awaiter) { m_awaiters.push(std::move(awaiter)); }
    
      auto id() { return m_thread.get_id(); }
    
      Awaitable operator co_await() { return {*this}; }
    
    private:
      void run(std::stop_token st) {
        while (!st.stop_requested()) {
          m_awaiters.waitForAnElement(st);
    
          auto awaiter = m_awaiters.pop();
    
          if (awaiter)
            awaiter->resume();
        }
      }

    There is nothing complicated: the run method just waits for an element, pops awaiters, executes them if they are valid, and that’s all.

    The co_await operator will just push the coroutine_handle to the thread thanks to the Awaitable object.

      struct Awaitable {
        Thread &thread;
        bool await_ready() { return false; }
    
        void await_suspend(std::coroutine_handle<> handle) {
          thread.addAwaiter({handle});
        }
    
        void await_resume() {}
      };

    Using this thread

    We schedule the operations thanks to the co_await operator.
    Here is an example. The task type has a basic promise that never suspends, which means the coroutine frame is destroyed at the end of the function.

    struct task {
      struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() noexcept {}
    
        ~promise_type() {}
      };
    
      ~task() {}
    };
    
    std::atomic_int x = 0;
    
    std::atomic_int done = 0;
    
    task f(Thread &thread1, Thread &thread2) {
      co_await thread1;
      ++x;
    
      co_await thread2;
      ++x;
      ++done;
    }

    The operation behind the first co_await runs on the first thread, the operation behind the second co_await runs on the second thread. Really simple.

    Conclusion

    We finished the first article about creating a thread pool using coroutines. We introduced some utility classes and designed a concurrent queue. If you want to try, you can find a full code here.

    Thanks to Nir Friedman for helping me design the mutex and the condition variable in a better way :).