Blog

  • Thread pool with coroutines: Threads (1/3)

    Introduction

    In this little series of articles, we are going to see how to implement a thread pool usable with coroutines. The series contains these articles:

    1. Creating a Thread
    2. Creating the pool
    3. Using future with the pool

    The final goal is to be able to write something like this:

    ThreadPool threadPool;
    
    co_await threadPool;
    // Here we run on a threadPool thread
    
    auto future = schedule_on(threadPool, function, args...);
    
    co_await future;
    // Here we run on the asynchronous function thread

    Choice of implementation for the thread pool

    We will use the well-known work-stealing algorithm inside our thread pool. This implies that each thread has its own task queue and that threads can steal tasks from each other. It leads to variables shared between threads; hence, we must be careful with data races.

    To deal with data races, I decided to make some homemade helpers inspired by the Rust programming language.

    Mutex

    Here is the first helper I made. A mutex protects a resource from data races, so we can make a class template that protects a value of its template argument type. To operate on the protected value, we pass a callback that is invoked while the lock is held.

    #include <functional>
    #include <mutex>
    #include <shared_mutex>
    #include <utility>
    
    #define FWD(x) std::forward<decltype(x)>(x)
    
    template <typename T> class Mutex {
    public:
      template <typename F> decltype(auto) with_lock(F &&f) {
        std::lock_guard lock{m_mutex};
        return std::invoke(FWD(f), m_value);
      }
      template <typename F> decltype(auto) with_lock(F &&f) const {
        std::shared_lock lock{m_mutex};
        return std::invoke(FWD(f), m_value);
      }
    
    protected:
      T m_value;
      mutable std::shared_mutex m_mutex;
    };

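    Why do I use a shared mutex? Because multiple readers are not an issue: the const overload of with_lock takes a shared lock, so several threads can read at the same time, while the non-const overload takes an exclusive lock.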

    Condition variable

    What events can occur within a thread?

    1. The thread can be requested to stop
    2. The thread can have a new task to perform

    To avoid using CPU resources when the thread is not fed (i.e., when there is no task to run), I decided to use a condition variable. The idea is simple: the waiting thread waits for an event and leaves the wait function only when the predicate is satisfied, while another thread notifies the condition variable to wake it up.

    Since a condition variable is generally used with a mutex, I decided to join them together through inheritance. Hence, a ConditionVariable behaves like a Mutex but can also be waited on.

    #include <condition_variable>
    #include <stop_token>
    
    template <typename T> class ConditionVariable : public Mutex<T> {
    public:
      void notifyOne() { m_cond.notify_one(); }
      void notifyAll() { m_cond.notify_all(); }
    
      template <typename F> void wait(F f) {
        auto lock = std::unique_lock{this->m_mutex};
        m_cond.wait(lock, [&, this] { return std::invoke(f, this->m_value); });
      }
    
      template <typename F> void wait(F f, std::stop_token st) {
        auto lock = std::unique_lock{this->m_mutex};
        m_cond.wait(lock, st, [&, this] { return std::invoke(f, this->m_value); });
      }
    
    private:
      std::condition_variable_any m_cond;
    };

    You may wonder what std::stop_token is. It is a C++20 feature, provided by std::jthread, that spares the user from waiting on an atomic boolean. Simply put, when a std::jthread is destroyed, it does two things:

    1. It calls request_stop on its std::stop_source, which notifies the associated std::stop_token
    2. It joins the thread

    An Awaiter

    With coroutines, a task is not a function but a coroutine_handle that will be resumed. Hence, we need an object that manages this handle.

    struct Awaiter {
    public:
      Awaiter() {}
    
      ~Awaiter() {
        if (m_handle)
          m_handle.destroy();
      }
    
      template <typename... Args>
      Awaiter(std::coroutine_handle<Args...> handle) : m_handle{handle} {}
    
      Awaiter(const Awaiter &) = delete;
      Awaiter(Awaiter &&a) : m_handle{a.m_handle} { a.m_handle = nullptr; }
    
    
      void resume() {
        m_handle();
        m_handle = nullptr;
      }
    
    private:
      std::coroutine_handle<> m_handle = nullptr;
    };
    

    Note that we destroy the coroutine only if it was never resumed: after resume(), the handle is reset to nullptr. It is a move-only type.

    A thread safe queue

    Now that we have our Awaiter object, we must push awaiters into a thread-safe queue. New tasks are pushed into the queue, and the thread pool pops them one by one.

    Since the queue may be empty, the pop operation can return nothing, represented by a std::nullopt.

    template <typename T> class ThreadSafeQueue {
    public:
      void push(T t) {
        m_queue.with_lock([&](auto &queue) { queue.push(std::move(t)); });
        m_queue.notifyOne();
      }
    
      std::optional<T> pop() {
        std::optional<T> x;
        m_queue.with_lock([&](auto &queue) {
          if (!queue.empty()) {
            x.emplace(std::move(queue.front()));
            queue.pop();
          }
        });
        return x;
      }
    
      void waitForAnElement(std::stop_token st) {
        auto hasElement = [](const auto &x) { return !x.empty(); };
        m_queue.wait(hasElement, st);
      }
    
    private:
      ConditionVariable<std::queue<T>> m_queue;
    };

    Three operations are possible:

    1. Push: enqueues a new task and notifies the condition variable.
    2. Pop: dequeues a task to be executed in the current thread.
    3. Wait for an element: makes the current thread idle until a new task arrives (notified by the push function).

    The thread

    It is time to design our thread class.

    The thread class will be designed over the std::jthread class. It will also embed a thread-safe queue of Awaiters.

    Thus, we can start with:

    class Thread {
    private:
      ThreadSafeQueue<Awaiter> m_awaiters;
      std::jthread m_thread;
    };

    First, let’s list the operations our thread must support:

    1. Adding tasks
    2. Scheduling operations (thanks to the co_await operator)
    3. A background infinite loop that pops tasks and executes them.

    public:
      Thread() {
        m_thread = std::jthread([this](std::stop_token st) { run(st); });
      }
    
      void addAwaiter(Awaiter &&awaiter) { m_awaiters.push(std::move(awaiter)); }
    
      auto id() { return m_thread.get_id(); }
    
      Awaitable operator co_await() { return {*this}; }
    
    private:
      void run(std::stop_token st) {
        while (!st.stop_requested()) {
          m_awaiters.waitForAnElement(st);
    
          auto awaiter = m_awaiters.pop();
    
          if (awaiter)
            awaiter->resume();
        }
      }

    There is nothing complicated here: the run method waits for an element, pops an awaiter, and resumes it if there is one. That’s all.

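    The co_await operator simply pushes the coroutine_handle onto the thread’s queue through the Awaitable object. Since operator co_await returns an Awaitable, this nested struct must be declared before operator co_await inside the Thread class.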

      struct Awaitable {
        Thread &thread;
        bool await_ready() { return false; }
    
        void await_suspend(std::coroutine_handle<> handle) {
          thread.addAwaiter({handle});
        }
    
        void await_resume() {}
      };

    Using this thread

    We schedule operations with the co_await operator.
    Here is an example. task is a basic coroutine type whose promise never suspends, neither at the beginning nor at the end. This means that the coroutine frame is destroyed automatically at the end of the function.

    struct task {
      struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() noexcept {}
    
        ~promise_type() {}
      };
    
      ~task() {}
    };
    
    std::atomic_int x = 0;
    
    std::atomic_int done = 0;
    
    task f(Thread &thread1, Thread &thread2) {
      co_await thread1;
      ++x;
    
      co_await thread2;
      ++x;
      ++done;
    }

    The code after the first co_await runs on thread1, and the code after the second co_await runs on thread2. Really simple.

    Conclusion

    This concludes the first article about creating a thread pool using coroutines. We introduced some utility classes and designed a concurrent queue. If you want to try it, you can find the full code here.

    Thanks to Nir Friedman for helping me design the mutex and condition variable in a better way :).

  • Create a future with coroutine in C++

    I haven’t written on this blog for years. Here is what I am doing right now: I am a C++ software engineer at Diagdev, a hematology-related company based in the south of France, near Montpellier. I also developed a little library: Little Type Library. The goal of this library is to add ranges and concept-like features to C++17. It also brings other functional features, with the pipe notation or the >> notation. In this article, we will see how to create a future using coroutines.

    What is a coroutine?

    A coroutine, even if the name is complicated, is a simple thing.
    Functions begin, do things, and end. A coroutine is a kind of generalization of functions: it can be stopped and resumed, in the same thread or, better, in another thread.

    A coroutine can be stopped either at the beginning, during the processing, or at the end.

    What is the problem with conventional futures?

    Let’s say we have this code.

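    #include <future>
    
    void doSomething() { /* work done on the main thread */ }
    
    int main() {
        auto performSomething = [] { /* work done asynchronously */ };
        // std::launch::async forces a new thread; the default policy may defer the call until wait()
        auto future = std::async(std::launch::async, performSomething);
        
        doSomething();
        
        future.wait(); // blocks main until performSomething has finished
    }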

    If performSomething and doSomething last the same duration, there is no issue. But if performSomething lasts longer, future.wait() makes the main thread stall for a while: one thread does nothing, and we lose performance.

    Monadic expressions are a way to solve this. The idea is to give the future a callback that is automatically called when the asynchronous function ends. If you want to know more about this, I wrote an article a few years ago.

    How coroutines can solve the problem

    Actually, my solution using coroutines is similar to monadic expressions. However, instead of giving a callback, you write the function normally.

    Here is the code we want to achieve.

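    #include <iostream>
    #include <thread>
    
    std::jthread *thread = nullptr; // global thread used by AwaitableFuture (see below)
    void doSomethingHeavy() { /* heavy work on the main thread */ }
    
    int square(int x) {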
      return x * x;
    }
    
    task f() {
      auto squared6 = co_await async(square, 6);
    
      std::cout << squared6 << std::endl;
    }
    
    int main() {
      std::jthread thread;
      ::thread = &thread;
    
      f();
    
      doSomethingHeavy();
      return 0;
    }

    To keep things simple, I don’t manage my threads in a clean way, and I use std::jthread instead of std::thread for its automatic join. This code is simple. The main function calls f, which is a coroutine. The coroutine f calls async with square and 6 as arguments. async returns a future, which is awaited in f. The co_await launches square in another thread, while f saves its state and returns to main. Once the thread finishes computing square, it resumes the coroutine f, which prints the squared value.

    Yes, you read that correctly: std::cout << squared6 << std::endl is executed in the same thread as square.

    So, let’s create a future using coroutines!

    Async coroutine

    The job of the async coroutine is to return a future and to launch the given function when we co_await the future. Here is the code:

    template <typename F, typename... Args>
    future<std::invoke_result_t<F, Args...>> async(F f, Args... args) {
      std::cout << "async" << std::endl;
      co_return f(args...);
    }

    Since we use the co_return keyword, async is a coroutine. We don’t want to execute f directly, so we need to stop the coroutine at the beginning (initial_suspend returns std::suspend_always). We also need to stop it at the end to avoid a use after free: the result is stored in the promise, and suspending at final_suspend keeps the coroutine frame, and thus the promise, alive until the future reads the value. Finally, the future owns the handle of the coroutine so that it can resume it, and destroys it in its destructor.

    Here is the full code of future:

    template <typename T> struct future {
      struct promise_type {
        T value;
        future get_return_object() {
          return {std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept {
          return {};
        }
        std::suspend_always final_suspend() noexcept {
          return {};
        }
        void return_value(T x) {
          value = std::move(x);
        }
        void unhandled_exception() noexcept {}
      };
    
      std::coroutine_handle<promise_type> coro;
    
      future(std::coroutine_handle<promise_type> coro) : coro{coro} {}
    
      ~future() {
        if (coro)
          coro.destroy();
      }
    
      AwaitableFuture operator co_await() {
        return {*this};
      }
    };

    We see that the future has a co_await operator. This operator must return an awaitable object. An awaitable has 3 methods:

    • await_ready: tells whether the coroutine can continue without being suspended (returning false means it will be suspended)
    • await_suspend: called when the coroutine is suspended; this is where we launch the other thread.
    • await_resume: returns the result to the caller of co_await.

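    For our future, the awaitable (a struct nested in future, which must be declared before operator co_await since that operator returns it) looks like this: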

      struct AwaitableFuture {
        future &m_future;
        bool await_ready() const noexcept { return false; }
    
        void await_suspend(std::coroutine_handle<> handle) {
          *thread = std::jthread([this, handle] {
            m_future.coro.resume();
            handle.resume();
          });
        }
    
        T await_resume() {
          return m_future.coro.promise().value;
        }
      };

    Why do we need to call resume twice? Let’s recall how the future is used:

    task f() {
      auto squared6 = co_await async(square, 6);
    
      std::cout << squared6 << std::endl;
    }

    f is a coroutine, but so is async! The first call, m_future.coro.resume(), resumes the async coroutine, which computes the value; the second one, handle.resume(), resumes f. The order is important: otherwise, f would be resumed before the value is computed.

    Since f is a coroutine, here is the simple code for the task:

    
    struct task {
      struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() noexcept {}
      };
    };

    Here is the full code with an online compiler if you want to try it.

    Conclusion

    I hope you understood this article well, and if you have any questions, don’t hesitate to ask :). In the future, I could write an article about how to create a when_all using coroutines, or how to use a thread pool with coroutines.

    Thanks for reading!

  • Multithreading in an expressive way with monadic expression

    Hi, it has been a long time since I last wrote anything, sorry for that! I plan to write several articles over the next few weeks. This article deals with multithreading.

    Introduction

    In a lot of articles, we can read that multithreading is not easy, and other things like that. In this article, we will learn how to write multi-threaded applications in a simple and fun way.
    The idea is to write something like this:

    int main() {
        auto test = when_all(
            [] {return 8; },    // return 8
            [] {return 25; }    // return 25
        ) // tuple(8, 25)
        .then(
            [](auto a, auto b) {return std::make_tuple(a, b); }, // return tuple(8, 25)
            [](auto a, auto b) {std::cout << a << "-" << b << " = "; }, // return nothing
            [](auto a, auto b) {return a - b; } // return -17
        ) // tuple(tuple(8, 25), -17)
        .deferredExecutor();
    
        // Launch the chain of function in an asynchronous way
        test.execute(asynchronous_scheduler{});
        // Do some work very expensive
        auto result = test.waitAndGet();
        auto first = std::get<0>(result); // tuple(8, 25)
        auto second = std::get<1>(result); // -17
        std::cout << second << std::endl;
    
        return 0;
    }

    If we draw a little scheme, it gives us this:

    Multithreading in action

    Obviously, when the function f3 starts, functions f1 and f2 must have completed. The same applies to getResult: it must wait for f3, f4 and f5 to complete.

    Does it really exist?

    Actually, it does: the Concurrency TS provides when_all and future::then, which were expected to reach the standard with C++20 (they were ultimately not included). However, these features suffer from different problems that are explained in Vittorio Romeo’s article. In fact, this article can be seen as a continuation of Vittorio’s series of articles.

    Multithreading with Monadic Expression: Implementation

    I hope you love reading hard-to-understand C++ code, because this code is full of templates and metaprogramming. I recommend having some basics in really modern C++, or reading the series from Vittorio I mentioned before. We are going to see a lot of things, both in multithreading and in metaprogramming.

    Why do we need a continuation of the series from Vittorio?

    Firstly, the code provided by Vittorio does not work on Visual Studio, and I really want my code to work on all compilers and all operating systems. Secondly, in his article, he only provides a version using a function waitAndGet, which prevents us from easily inserting code into the main thread.
    The third issue is that a group of functions returns a std::tuple, which the next function receives as a single argument.

    For example, you have to write something like this:

    when_all(
        []{return 8;},
        []{return 25;}
    ).then([](auto tuple) {auto [a, b] = tuple; return a - b;});
    
    // Instead of
    when_all(
        []{return 8;},
        []{return 25;}
    ).then([](auto a, auto b) {return a - b;});

    In reality, some people will prefer the first version and others the second one. But what happens if a function does not return anything? In my opinion, the second version is more natural.

    Forwarding

    Because we are going to use a lot of metaprogramming, we need to perfectly forward some arguments. Here are two macros that perform perfect forwarding: one for a variable with a named type, and another one for auto variables. I was not able to use the same one for both cases because the form ::std::forward<decltype(variable)>(variable) is sometimes too difficult for Visual Studio; that is why I provide both declarations.

    #pragma once
    #include <utility>
    
    #define FWD(T, x) ::std::forward<T>(x)
    #define FWD_AUTO(x) ::std::forward<decltype(x)>(x)

    This code does not really need any explanations.

    A Latch

    A latch is useful when you want to know whether a job is finished or not. Let’s say you have two threads, one of which waits for the result coming from the other: the first one needs to wait for the second one to finish.

    Latch

    The following code shows how to implement a simple “bool_latch”:

    #pragma once
    #include <mutex>
    #include <condition_variable>
    
    class bool_latch {
    public:
        /**
         * Function to call when the job is finished
         */
        void job_done() {
            {
                std::scoped_lock<std::mutex> lock(mMutex);
                mDone = true;
            }
            mConditionVariable.notify_one();
        }
    
        /**
         * Function to call when you want to wait for the job to be done
         */
        void wait_for_job_done() {
            std::unique_lock<std::mutex> lock(mMutex);
            mConditionVariable.wait(lock, [this] {return mDone; });
        }
    
    private:
        std::mutex mMutex;
        std::condition_variable mConditionVariable;
        bool mDone{ false };
    };
    

    The code is based on a condition_variable, a mutex, and a boolean variable.
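    A short usage sketch, repeating bool_latch so the snippet compiles on its own: a worker thread produces a value and then calls job_done(), while the calling thread blocks in wait_for_job_done(). produceAndWait is a hypothetical name. Since C++20, std::latch (header <latch>, with count_down() and wait()) offers a similar mechanism based on a counter instead of a boolean.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class bool_latch {
public:
  void job_done() {
    {
      std::scoped_lock<std::mutex> lock(mMutex);
      mDone = true;
    }
    mConditionVariable.notify_one();
  }

  void wait_for_job_done() {
    std::unique_lock<std::mutex> lock(mMutex);
    mConditionVariable.wait(lock, [this] { return mDone; });
  }

private:
  std::mutex mMutex;
  std::condition_variable mConditionVariable;
  bool mDone{false};
};

// Hypothetical demo: the result is written before job_done(), so it is visible after the wait.
int produceAndWait() {
  bool_latch latch;
  int result = 0;
  std::thread worker([&] {
    result = 8 - 25;
    latch.job_done();
  });
  latch.wait_for_job_done();
  int seen = result; // safe: the mutex orders the write before the end of the wait
  worker.join();
  return seen;
}
```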

    The return type

    Each function may return one value. But what does a group of functions return? The best choice is a std::tuple. Thus, if you have three functions returning an int at the same level (running in 3 parallel threads), the result is a std::tuple<int, int, int>.

    Functions that return void.

    The simplest way to handle this case is to create an empty type that we will name nothing. Its declaration is straightforward.

    struct nothing{};

    Instead of returning void, the function must return the type nothing.
    Let’s say you have three functions, f1, f2, f3. f1 and f3 return an int and f2 returns nothing. You will get a std::tuple<int, nothing, int>.
    How do we return nothing instead of void? The following function does it in a straightforward way:

    namespace detail {
        template<typename F, typename ...Args>
        /**
         This function returns f(args...) when decltype(f(args...)) is not void. Returns nothing{} else.
         @param f - The function
         @param args... - Arguments to give to the function
         @result - returns f(args...) if possible, or nothing{} else.
        */
        inline decltype(auto) callThatReturnsNothingInsteadOfVoid(F &&f, Args &&...args) {
            if constexpr(std::is_void_v<std::invoke_result_t<F, Args...>>) {
                f(FWD(Args, args)...);
                return nothing{};
            }
    
            else
                return f(FWD(Args, args)...);
        }
    }
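    A quick usage check, repeating nothing, the FWD macro and the helper so that the snippet compiles on its own. The three lambdas of the hypothetical callThree function play the role of f1, f2 and f3, and the second one returns void.

```cpp
#include <cassert>
#include <tuple>
#include <type_traits>
#include <utility>

#define FWD(T, x) ::std::forward<T>(x)

struct nothing {};

namespace detail {
template <typename F, typename... Args>
inline decltype(auto) callThatReturnsNothingInsteadOfVoid(F &&f, Args &&...args) {
  if constexpr (std::is_void_v<std::invoke_result_t<F, Args...>>) {
    f(FWD(Args, args)...);
    return nothing{}; // void becomes the empty type
  } else
    return f(FWD(Args, args)...);
}
} // namespace detail

inline auto callThree() {
  auto f1 = [] { return 8; };
  auto f2 = [] {}; // returns void
  auto f3 = [] { return 25; };
  return std::make_tuple(detail::callThatReturnsNothingInsteadOfVoid(f1),
                         detail::callThatReturnsNothingInsteadOfVoid(f2),
                         detail::callThatReturnsNothingInsteadOfVoid(f3));
}
```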

    Pass a tuple as an argument

    Now, say you have a std::tuple<int, double, std::string> and you want to pass it to a function with a prototype like returnType function(int, double, std::string).
    One way to do that is to use std::apply.
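    For reference, std::apply (C++17, header <tuple>) calls a callable with the elements of a tuple as separate arguments. The function combine below is just an illustration with the prototype shape described above.

```cpp
#include <cassert>
#include <string>
#include <tuple>

// Illustrative function with a prototype like returnType function(int, double, std::string).
inline int combine(int i, double d, std::string s) {
  return i + static_cast<int>(d) + static_cast<int>(s.size());
}

inline int applyExample() {
  std::tuple<int, double, std::string> args{3, 2.5, "abc"};
  return std::apply(combine, args); // same as combine(3, 2.5, "abc")
}
```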

    Here there is no problem, but assume that your tuple contains a nothing, like std::tuple<int, nothing, int>.
    When you apply this tuple to the function, you also pass the nothing.

    To avoid this problem, you must filter the arguments and capture the remaining ones inside lambda functions, one argument at a time.

    namespace detail {
        template<typename F>
        inline decltype(auto) callWithoutNothingAsArgument_Impl(F &&f) {
            return callThatReturnsNothingInsteadOfVoid(FWD(F, f));
        }
    
        template<typename F, typename T, typename ...Args>
        inline decltype(auto) callWithoutNothingAsArgument_Impl(F &&f, T &&t, Args &&...args) {
            return callWithoutNothingAsArgument_Impl([&f, &t](auto &&...xs) -> decltype(auto) {
                if constexpr(std::is_same_v < nothing, std::decay_t<T>>) {
                    return f(FWD_AUTO(xs)...);
                }
    
                else {
                    return f(FWD(T, t), FWD_AUTO(xs)...);
                }
            }, FWD(Args, args)...);
        }
    }
    
    template<typename F, typename ...Args>
    inline decltype(auto) callWithoutNothingAsArgument(F &&f, std::tuple<Args...> tuple) {
        return std::apply([&f](auto ...xs)->decltype(auto) {
            return detail::callWithoutNothingAsArgument_Impl(f, xs...);
        }, tuple);
    }
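    The recursive lambdas above forward each argument. As a point of comparison, here is a different technique (a sketch, not the article’s code): turn every element into a one-element or an empty tuple, concatenate them with std::tuple_cat, then call std::apply. It is shorter but copies the values (std::decay_t) instead of forwarding references; keepUnlessNothing, dropNothing and callWithoutNothing are hypothetical names.

```cpp
#include <cassert>
#include <tuple>
#include <type_traits>
#include <utility>

struct nothing {};

// One element becomes a 1-tuple, or an empty tuple when it is a `nothing`.
template <typename T> auto keepUnlessNothing(T &&t) {
  if constexpr (std::is_same_v<std::decay_t<T>, nothing>)
    return std::tuple<>{};
  else
    return std::tuple<std::decay_t<T>>{std::forward<T>(t)};
}

// Removes every `nothing` from the tuple.
template <typename... Args> auto dropNothing(const std::tuple<Args...> &tuple) {
  return std::apply([](const auto &...xs) { return std::tuple_cat(keepUnlessNothing(xs)...); },
                    tuple);
}

// Calls f with the remaining elements as separate arguments.
template <typename F, typename... Args>
decltype(auto) callWithoutNothing(F &&f, const std::tuple<Args...> &tuple) {
  return std::apply(std::forward<F>(f), dropNothing(tuple));
}
```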

    Architecture

    This part is the most difficult part of the article. We will see how the architecture works. It is an architecture made of nodes of three kinds: the root node, which is the beginning of the chain of functions; the when_all node, which owns one or several functions that run in parallel; and the result_getter node, which represents the end of the chain of functions.

    Overview

    Overview of multithreading

    As you may have noticed, there is no type erasure: each node inherits from its parent, and the object owned by the caller is the result_getter. Therefore, when the result_getter executes all the functions, it must first go back up to the root, and to do that, it must go through the when_all nodes.

    Now let’s explain the whole code!

    root

    #pragma once
    #include <tuple>
    #include "fwd.h"
    
    namespace detail {
        class root {
            template<typename Parent, typename... F>
            friend class when_all;
    
            // The root produces nothing (an empty tuple)
            using output_type = std::tuple<>;
    
        private:
            // Once you are at the root, you can call the execute function
            template<typename Scheduler, typename Child, typename ...GrandChildren>
            void goToRootAndExecute(Scheduler &&s, Child &directChild, GrandChildren &... grandChildren) & {
                execute(FWD(Scheduler, s), directChild, grandChildren...);
            }
    
            // You must use the scheduler
            template<typename Scheduler, typename Child, typename ...GrandChildren>
            void execute(Scheduler &&s, Child &directChild, GrandChildren &... grandChildren) & {
                s([&]() -> decltype(auto) {return directChild.execute(FWD(Scheduler, s), output_type{}, grandChildren...); });
            }
        };
    }

    There is nothing difficult here. when_all is a friend. There are two functions: the first one is called by the children, and its only purpose is to reach the top. Then, execute runs the child functions through the scheduler.
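    In this design, a scheduler is simply a callable that receives a nullary function and decides where and when to run it: root::execute calls s(...) with a lambda. The two schedulers below are hypothetical examples, not the asynchronous_scheduler used earlier: inline_scheduler runs the job immediately on the calling thread, while deferred_scheduler stores jobs and runs them later, which makes the order of execution easy to observe.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical scheduler: runs the job right away, on the calling thread.
struct inline_scheduler {
  template <typename F> void operator()(F &&f) const { std::forward<F>(f)(); }
};

// Hypothetical scheduler: stores jobs; runAll() executes them later, in FIFO order.
struct deferred_scheduler {
  std::vector<std::function<void()>> jobs;

  template <typename F> void operator()(F &&f) { jobs.emplace_back(std::forward<F>(f)); }

  void runAll() {
    for (std::size_t i = 0; i < jobs.size(); ++i) {
      auto job = std::move(jobs[i]); // move out first: a job may schedule new jobs
      job();
    }
    jobs.clear();
  }
};

std::string demoSchedulers() {
  std::string trace;
  inline_scheduler now;
  now([&] { trace += "a"; }); // runs immediately
  deferred_scheduler later;
  later([&] { trace += "c"; }); // only stored
  trace += "b";
  later.runAll(); // the stored job runs now
  return trace;
}
```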

    when_all

    #pragma once
    #include <atomic>
    #include <tuple>
    #include <type_traits>
    #include "enumerate_args.h"
    #include "result_getter.h"
    
    namespace detail {
        struct movable_atomic_size_t : std::atomic_size_t {
            using std::atomic_size_t::atomic;
            movable_atomic_size_t(movable_atomic_size_t &&v) : std::atomic_size_t(v.load(std::memory_order_acquire)) {}
        };
    
        template<typename Parent, typename... Fs>
        class when_all : Parent, Fs... {
            friend class root;
    
            template<typename, typename...>
            friend class ::detail::when_all;
    
            template<typename>
            friend class result_getter;
    
            using input_type = typename Parent::output_type;
            using output_type = std::tuple<decltype(callWithoutNothingAsArgument(std::declval<Fs&>(), std::declval<input_type>()))...>;
    
        public:
            /** There is SFINAE here because of something "kind of weird".
                Let's say you have to build something like Parent(std::move(parent)). Instead of reaching the move constructor,
                the compiler tries to instantiate this constructor with FsFWD = empty, whereas Fs = {f1, f2, f3...}.
                The SFINAE forbids this error, so that Parent(parent) reaches the move constructor. **/
            template<typename ParentFWD, typename... FsFWD, typename = std::enable_if_t<(sizeof...(FsFWD) > 0)>>
            when_all(ParentFWD &&parent, FsFWD && ...f) : Parent(FWD(ParentFWD, parent)), Fs(FWD(FsFWD, f))... {}
    
            template<typename... FFwd>
            decltype(auto) then(FFwd &&...ffwd) && {
                static_assert(sizeof...(FFwd) > 0, "Must have several functions objects");
                return make_when_all(std::move(*this), FWD(FFwd, ffwd)...);
            }
    
            decltype(auto) deferredExecutor() && {
                return make_result_getter(std::move(*this));
            }
    
            template<typename Scheduler>
            decltype(auto) executeWaitAndGet(Scheduler &&s) && {
                // deferredExecutor() is &&-qualified, so *this must be turned into an rvalue
                auto f = std::move(*this).deferredExecutor();
                f.execute(FWD(Scheduler, s));
                return f.waitAndGet();
            }
    
        private:
            Parent &getParent() {
                return static_cast<Parent&>(*this);
            }
    
            template<typename Scheduler, typename... Children>
            void goToRootAndExecute(Scheduler &&s, Children&... children) & {
                // this is for ADL
                this->getParent().goToRootAndExecute(FWD(Scheduler, s), *this, children...);
            }
    
            template<typename Scheduler, typename Child, typename... GrandChildren>
            void execute(Scheduler &&s, input_type r, Child &directChild, GrandChildren &...grandChildren) & {
                auto exec = [&, r](auto i, auto &f) {
                    ::std::get<i>(mResult) = callWithoutNothingAsArgument(f, r);
    
                    // acq_rel: publishes this result, and lets the last thread see all the other ones
                    if (mLeft.fetch_sub(1, std::memory_order_acq_rel) == 1)
                        directChild.execute(FWD(Scheduler, s), std::move(mResult), grandChildren...);
                };
    
                auto executeOneFunction = [&s, &exec](auto i, auto f) {
                    if constexpr(i == sizeof...(Fs)-1)
                        exec(i, f);
    
                    else {
                        s([exec, i, f]() {
                            exec(i, f);
                        });
                    }
                };
    
                enumerate_args(executeOneFunction, static_cast<Fs&>(*this)...);
            }
    
        private:
            output_type mResult;
            movable_atomic_size_t mLeft{ sizeof...(Fs) };
        };
    
        template<typename Parent, typename... F>
        inline auto make_when_all(Parent &&parent, F&& ...f) {
            return when_all<std::decay_t<Parent>, std::decay_t<F>...>{FWD(Parent, parent), FWD(F, f)...};
        }
    }
    
    template<typename... F>
    auto when_all(F&& ...f) {
        static_assert(sizeof...(F) > 0, "Must have several functions objects");
        return ::detail::make_when_all(detail::root{}, FWD(F, f)...);
    }

    This is the most difficult part.
    However, not everything is difficult. Only two things are: the output_type and the execute function.

    using input_type = typename Parent::output_type;
    using output_type = std::tuple<decltype(callWithoutNothingAsArgument(std::declval<Fs&>(), std::declval<input_type>()))...>;
    

    The input_type is straightforward: we take the output type of the parent. The output_type is a bit trickier. The idea is to call each function with the input (in an unevaluated context, through decltype) and to collect each result type into a tuple.
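    The same decltype/declval trick can be tried in isolation. This sketch assumes the input is a std::tuple<int, int> that is unpacked with std::apply, instead of going through callWithoutNothingAsArgument, so none of the functions may return void here; outputOf, MakePair, Difference and Ratio are hypothetical names.

```cpp
#include <cassert>
#include <tuple>
#include <type_traits>
#include <utility>

// For each function type in Fs, compute the type of std::apply(f, input), then gather them in a tuple.
template <typename Input, typename... Fs>
using outputOf = std::tuple<decltype(std::apply(std::declval<Fs &>(), std::declval<Input>()))...>;

struct MakePair {
  auto operator()(int a, int b) const { return std::make_tuple(a, b); }
};
struct Difference {
  int operator()(int a, int b) const { return a - b; }
};
struct Ratio {
  double operator()(int a, int b) const { return static_cast<double>(a) / b; }
};

using Output = outputOf<std::tuple<int, int>, MakePair, Difference, Ratio>;
static_assert(std::is_same_v<Output, std::tuple<std::tuple<int, int>, int, double>>);
```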

    template<typename Scheduler, typename Child, typename... GrandChildren>
    void execute(Scheduler &&s, input_type r, Child &directChild, GrandChildren &...grandChildren) & {
        auto exec = [&, r](auto i, auto &f) {
            ::std::get < i >(mResult) = callWithoutNothingAsArgument(f, r);
    
            if (mLeft.fetch_sub(1, std::memory_order_acq_rel) == 1)
                directChild.execute(FWD(Scheduler, s), std::move(mResult), grandChildren...);
        };
    
        auto executeOneFunction = [&s, &exec](auto i, auto f) {
            if constexpr(i == sizeof...(Fs)-1)
                exec(i, f);
    
            else {
                s([exec, i, f]() {
                    exec(i, f);
                });
            }
        };
        enumerate_args(executeOneFunction, static_cast<Fs&>(*this)...);
    }
    

    There are several things to think about.
    The exec lambda is the function that is executed in a parallel thread. It stores the value at the right position in the tuple. Once all functions have finished, it calls the execute function of the directChild. This condition is checked with mLeft: fetch_sub returns the previous value, so only the last thread sees 1.

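    The executeOneFunction lambda decides whether the function must be launched through the scheduler or not. The last function runs directly on the current thread, which the scheduler already provided (through the root or the previous node), while the other ones are launched through the scheduler. In particular, if there is only one function, there is no need to launch it through the scheduler because the root already did that.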

    enumerate_args calls the function with each argument, and gives the index as an argument as well.

    Here is enumerate_args:

    #pragma once
    
    #include <type_traits>
    #include "fwd.h"
    
    namespace detail {
        template<typename F, std::size_t ...Is, typename ...Args>
        void enumerate_args_impl(F &&f, std::index_sequence<Is...>, Args && ...args) {
            using expander = int[];
            expander{ 0, ((void)f(std::integral_constant<std::size_t, Is>{}, FWD(Args, args)), 0)... };
        }
    }
    
    template<typename F, typename ...Args, typename Indices = std::index_sequence_for<Args...>>
    void enumerate_args(F &&f, Args &&...args) {
        detail::enumerate_args_impl(FWD(F, f), Indices{}, FWD(Args, args)...);
    }
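    Since C++17, the int[] expander trick can be replaced by a comma fold expression. Here is a sketch of an equivalent function, named enumerate_args_fold (a hypothetical name, to avoid clashing with the article’s enumerate_args):

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>
#include <utility>

namespace detail {
template <typename F, std::size_t... Is, typename... Args>
void enumerate_args_fold_impl(F &&f, std::index_sequence<Is...>, Args &&...args) {
  // Comma fold: calls f(index, arg) for each argument, from left to right.
  ((void)f(std::integral_constant<std::size_t, Is>{}, std::forward<Args>(args)), ...);
}
} // namespace detail

template <typename F, typename... Args> void enumerate_args_fold(F &&f, Args &&...args) {
  detail::enumerate_args_fold_impl(std::forward<F>(f), std::index_sequence_for<Args...>{},
                                   std::forward<Args>(args)...);
}

std::string enumerateExample() {
  std::string out;
  enumerate_args_fold(
      [&](auto i, const std::string &value) {
        // i is a std::integral_constant, so i.value is a compile-time index
        out += std::to_string(i.value) + "=" + value + ";";
      },
      std::string("a"), std::string("b"), std::string("c"));
  return out;
}
```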
    

    result_getter

    By now, you may already be able to build your own result_getter:

    #pragma once
    #include <tuple>
    #include <type_traits>
    #include <utility>
    #include "bool_latch.h"
    #include "fwd.h"
    #include "nothing.h"
    
    namespace detail {
    
        template<typename Parent>
        class result_getter : Parent {
            using input_type = typename Parent::output_type;
            using result_type = epurated_tuple_without_nothing_t<input_type>;
    
            template<typename, typename...>
            friend class when_all;
    
        public:
            template<typename ParentFWD>
            result_getter(ParentFWD &&parent) : Parent(FWD(ParentFWD, parent)) {}
    
            template<typename Scheduler>
            void execute(Scheduler &&s) & {
                // Climb up to the root of the chain and start the execution from there
                this->getParent().goToRootAndExecute(FWD(Scheduler, s), *this);
            }
    
            result_type waitAndGet() & {
                mLatch.wait_for_job_done();
                return mResult;
            }
    
        private:
            Parent &getParent() {
                return static_cast<Parent&>(*this);
            }
    
            template<typename Scheduler>
            void execute(Scheduler &&, input_type r) & {
                auto setResult = [this](auto ...xs) {
                    if constexpr (sizeof...(xs) == 1)
                        mResult = result_type{ xs... };
    
                    else
                        mResult = std::make_tuple(xs...);
    
                    mLatch.job_done();
                };
    
                callWithoutNothingAsArgument(setResult, r);
            }
    
        private:
            result_type mResult;
            bool_latch mLatch;
        };
    
        // Ensure move semantic
        template<typename Parent, typename = std::enable_if_t<std::is_rvalue_reference_v<Parent&&>>>
        inline auto make_result_getter(Parent &&parent) {
            return result_getter<std::decay_t<Parent>>{std::move(parent)};
        }
    }
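    result_getter relies on bool_latch from bool_latch.h, which is not reproduced in this section. As an assumption about what it has to provide, here is a minimal one-shot latch with the two member functions used above, job_done and wait_for_job_done, built on std::mutex and std::condition_variable. The real header may well be implemented differently.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot latch exposing the two calls result_getter uses.
class bool_latch {
public:
  // Called once by the thread that produced the result.
  void job_done() {
    {
      std::lock_guard<std::mutex> lock{m_mutex};
      m_done = true;
    }
    m_cv.notify_all();
  }

  // Blocks until job_done() has been called; returns at once afterwards.
  void wait_for_job_done() {
    std::unique_lock<std::mutex> lock{m_mutex};
    m_cv.wait(lock, [this] { return m_done; }); // predicate guards against spurious wakeups
  }

private:
  std::mutex m_mutex;
  std::condition_variable m_cv;
  bool m_done = false;
};

// Usage: a producer thread publishes a value, the caller waits for it.
int wait_for_answer() {
  bool_latch latch;
  int value = 0;
  std::thread producer([&] { value = 42; latch.job_done(); });
  latch.wait_for_job_done(); // the mutex hand-off makes `value` visible here
  producer.join();
  return value;
}
```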
    

    The idea is to start the whole chain with execute(), then block in waitAndGet() until the latch is released, at which point the result is ready. The type computations for the "epurated" (purged) tuple are done here:

    #include <tuple>
    #include <type_traits>
    #include "nothing.h"
    
    namespace detail {
        template<typename...>
        struct tuple_without_nothing_impl;
    
        template<typename Tuple>
        struct tuple_without_nothing_impl<Tuple> {
            using type = Tuple;
        };
    
        template<typename ...PreviousArgs, typename T, typename ...NextArgs>
        struct tuple_without_nothing_impl<std::tuple<PreviousArgs...>, T, NextArgs...> {
            using type = std::conditional_t<
                std::is_same_v<T, nothing>,
                typename tuple_without_nothing_impl<std::tuple<PreviousArgs...>, NextArgs...>::type,
                typename tuple_without_nothing_impl<std::tuple<PreviousArgs..., T>, NextArgs...>::type>;
        };
    }
    
    template<typename Tuple>
    struct tuple_without_nothing;
    
    template<typename ...Args>
    struct tuple_without_nothing<std::tuple<Args...>> {
        using type = typename detail::tuple_without_nothing_impl<std::tuple<>, Args...>::type;
    };
    
    
    template<typename Tuple>
    using tuple_without_nothing_t = typename tuple_without_nothing<Tuple>::type;
    
    template<typename Tuple>
    struct epurated_tuple {
        using type = std::conditional_t<
            std::tuple_size_v<Tuple> == 1,
            std::tuple_element_t<0, Tuple>,
            tuple_without_nothing_t<Tuple>
        >;
    };
    
    template<>
    struct epurated_tuple<std::tuple<>> {
        using type = std::tuple<>;
    };
    
    template<typename Tuple>
    using epurated_tuple_without_nothing_t = typename epurated_tuple<tuple_without_nothing_t<Tuple>>::type;

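    tuple_without_nothing_t removes every nothing from the tuple; epurated_tuple_without_nothing_t then replaces a tuple holding a single type by that type. For example, std::tuple<int, nothing, double> becomes std::tuple<int, double>, std::tuple<nothing, int> becomes int, and std::tuple<nothing> becomes std::tuple<>.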
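    The same type computation can also be written without explicit recursion. The sketch below is an alternative technique, not the article's code: keep_t, without_nothing_t, unwrap_single and purged_t are hypothetical names, and nothing is a stand-in for the type from nothing.h. Each type is wrapped in a one-element tuple (or an empty one for nothing), and std::tuple_cat concatenates them inside decltype, so no code runs at run time.

```cpp
#include <tuple>
#include <type_traits>
#include <utility>

struct nothing {}; // stand-in for the type from nothing.h

// Maps nothing to an empty tuple and any other T to std::tuple<T>.
template <typename T>
using keep_t = std::conditional_t<std::is_same_v<T, nothing>, std::tuple<>, std::tuple<T>>;

// Concatenating those tuples drops every nothing in a single step.
template <typename... Ts>
using without_nothing_t = decltype(std::tuple_cat(std::declval<keep_t<Ts>>()...));

// A tuple of exactly one type is unwrapped to that type.
template <typename Tuple> struct unwrap_single { using type = Tuple; };
template <typename T> struct unwrap_single<std::tuple<T>> { using type = T; };

template <typename... Ts>
using purged_t = typename unwrap_single<without_nothing_t<Ts...>>::type;

static_assert(std::is_same_v<purged_t<int, nothing, double>, std::tuple<int, double>>);
static_assert(std::is_same_v<purged_t<nothing, int>, int>);
```

    One small advantage of this form is that the empty case needs no dedicated std::tuple<> specialization: std::tuple_cat with no arguments already yields std::tuple<>.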

    Conclusion

    In this article, we saw how to write safe, easy-to-read multi-threaded functions. There is still a lot to do, but the result is already usable, and anyone who reads your code later should be able to follow it.

    If you want to try it online, you can run it on Wandbox.

    Reference

    Vittorio Romeo