92 lines
No EOL
2.5 KiB
C++
92 lines
No EOL
2.5 KiB
C++
#include <atomic>
|
|
#include <condition_variable>
|
|
#include <cstddef>
|
|
#include <cstdlib>
|
|
#include <deque>
|
|
#include <functional>
|
|
#include <mutex>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
/// A simple thread pool executor.
|
|
/// Not at all optimized, and probably terrible for latency.
|
|
struct ThreadPool {
|
|
ThreadPool() = default;
|
|
|
|
// shorthand to call launch(nrThreads) automatically
|
|
inline explicit ThreadPool(std::size_t nrThreads) { launch(nrThreads); }
|
|
|
|
// move could be allowed, I guess
|
|
ThreadPool(const ThreadPool&) = delete;
|
|
ThreadPool(ThreadPool&&) = delete;
|
|
|
|
inline ~ThreadPool() { shutdown(); }
|
|
|
|
// takes anything that is callable with void() signature
|
|
// This includes capturable lambdas, so be careful or make sure you're locking state!
|
|
template <class Callable>
|
|
void add_task(Callable&& cb) {
|
|
auto worker = PickWorker();
|
|
|
|
//printf("picked worker %zu\n", worker);
|
|
|
|
// N.B: These wrappers still allow the thread to progress
|
|
if(QueueLength(worker) >= 4) {
|
|
//std::printf("queue for worker %zu too large. Blocking until it is empty\n", worker);
|
|
while(!QueueEmpty(worker)) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
}
|
|
}
|
|
|
|
// add it to the task queue for that thread
|
|
{
|
|
std::unique_lock lk(this->taskQueues[worker].lock);
|
|
taskQueues[worker].queue.push_front(cb);
|
|
}
|
|
|
|
// Wake threads up if they are waiting for work
|
|
queueCv.notify_all();
|
|
}
|
|
|
|
void launch(std::size_t nrThreads);
|
|
// Shutdown the thread pool
|
|
void shutdown();
|
|
|
|
private:
|
|
// could just use unique_ptr<T[]> for both of these,
|
|
// or an analogue, since they will only increase or decrease in size on a call to launch()
|
|
std::vector<std::thread> threads {}; // or analogue
|
|
|
|
std::size_t nrThreads = 0;
|
|
|
|
struct TaskQueue {
|
|
std::mutex lock {};
|
|
std::deque<std::function<void()>> queue {};
|
|
};
|
|
|
|
TaskQueue* taskQueues {};
|
|
|
|
// Used to notify threads when work is available or to shutdown
|
|
std::condition_variable queueCv {};
|
|
|
|
/// Used to notify on shutdown
|
|
std::atomic_bool threadsShouldShutdown { false };
|
|
|
|
// implement these out of line
|
|
|
|
std::size_t QueueLength(std::size_t worker) const {
|
|
std::unique_lock lk(this->taskQueues[worker].lock);
|
|
return this->taskQueues[worker].queue.size();
|
|
}
|
|
|
|
bool QueueEmpty(std::size_t worker) const {
|
|
std::unique_lock lk(this->taskQueues[worker].lock);
|
|
return this->taskQueues[worker].queue.empty();
|
|
}
|
|
|
|
std::size_t PickWorker() const {
|
|
return std::rand() % nrThreads;
|
|
}
|
|
|
|
static void ThreadEntry(ThreadPool* pPool, std::size_t myIndex);
|
|
}; |