#include #include #include #include #include #include #include #include #include /// A simple thread pool executor. /// Not at all optimized, and probably terrible for latency. struct ThreadPool { ThreadPool() = default; // shorthand to call launch(nrThreads) automatically inline explicit ThreadPool(std::size_t nrThreads) { launch(nrThreads); } // move could be allowed, I guess ThreadPool(const ThreadPool&) = delete; ThreadPool(ThreadPool&&) = delete; inline ~ThreadPool() { shutdown(); } // takes anything that is callable with void() signature // This includes capturable lambdas, so be careful or make sure you're locking state! template void add_task(Callable&& cb) { auto worker = PickWorker(); //printf("picked worker %zu\n", worker); // N.B: These wrappers still allow the thread to progress if(QueueLength(worker) >= 4) { //std::printf("queue for worker %zu too large. Blocking until it is empty\n", worker); while(!QueueEmpty(worker)) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } // add it to the task queue for that thread { std::unique_lock lk(this->taskQueues[worker].lock); taskQueues[worker].queue.push_front(cb); } // Wake threads up if they are waiting for work queueCv.notify_all(); } void launch(std::size_t nrThreads); // Shutdown the thread pool void shutdown(); private: // could just use unique_ptr for both of these, // or an analogue, since they will only increase or decrease in size on a call to launch() std::vector threads {}; // or analogue std::size_t nrThreads = 0; struct TaskQueue { std::mutex lock {}; std::deque> queue {}; }; TaskQueue* taskQueues {}; // Used to notify threads when work is available or to shutdown std::condition_variable queueCv {}; /// Used to notify on shutdown std::atomic_bool threadsShouldShutdown { false }; // implement these out of line std::size_t QueueLength(std::size_t worker) const { std::unique_lock lk(this->taskQueues[worker].lock); return this->taskQueues[worker].queue.size(); } bool QueueEmpty(std::size_t worker) const { std::unique_lock lk(this->taskQueues[worker].lock); return this->taskQueues[worker].queue.empty(); } std::size_t PickWorker() const { return std::rand() % nrThreads; } static void ThreadEntry(ThreadPool* pPool, std::size_t myIndex); };