vxorg/threadpool.hpp

92 lines
2.5 KiB
C++

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>
/// A simple thread pool executor.
/// Not at all optimized, and probably terrible for latency.
///
/// Each worker has its own task queue guarded by its own mutex. add_task()
/// picks a worker pseudo-randomly and enqueues the task on that worker's
/// queue. launch(), shutdown() and ThreadEntry() are defined out of line.
struct ThreadPool {
    /// Creates a pool with no worker threads; call launch() before add_task().
    ThreadPool() = default;

    /// Shorthand that calls launch(nrThreads) automatically.
    explicit ThreadPool(std::size_t nrThreads) { launch(nrThreads); }

    // Move could be allowed, I guess. For now the pool is neither copyable
    // nor movable (workers are handed a ThreadPool* in ThreadEntry).
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    /// Calls shutdown().
    ~ThreadPool() { shutdown(); }

    /// Queues a task on one of the workers.
    ///
    /// Takes anything that is callable with a void() signature and can be
    /// stored in a std::function<void()>. This includes capturing lambdas,
    /// so be careful or make sure you're locking state!
    ///
    /// If the chosen worker already has kMaxQueuedTasks or more pending
    /// tasks, this call blocks (polling every kFullQueuePollInterval) until
    /// that worker's queue is empty.
    ///
    /// @param cb  The task. Rvalues are moved into the queue; lvalues are copied.
    /// @throws std::logic_error if the pool has no worker threads / queues
    ///         (i.e. it was never launched). Previously this was undefined
    ///         behaviour (modulo by zero and a null queue dereference).
    template <class Callable>
    void add_task(Callable&& cb) {
        if (nrThreads == 0 || taskQueues == nullptr) {
            throw std::logic_error("ThreadPool::add_task: no worker threads; call launch() first");
        }

        const std::size_t worker = PickWorker();

        // N.B: QueueLength()/QueueEmpty() only hold the queue lock for the
        // duration of the check, so the worker can still make progress
        // between polls.
        if (QueueLength(worker) >= kMaxQueuedTasks) {
            while (!QueueEmpty(worker)) {
                std::this_thread::sleep_for(kFullQueuePollInterval);
            }
        }

        // Add it to the task queue for that thread. The callable is forwarded
        // so that temporaries are moved instead of copied.
        // NOTE(review): tasks are inserted at the front; whether that yields
        // FIFO or LIFO execution depends on which end ThreadEntry pops from.
        {
            std::lock_guard lk(taskQueues[worker].lock);
            taskQueues[worker].queue.emplace_front(std::forward<Callable>(cb));
        }

        // Wake threads up if they are waiting for work.
        // NOTE(review): this notification is not issued under a mutex tied to
        // queueCv; whether a wakeup can be missed depends on how ThreadEntry
        // waits - confirm against the out-of-line implementation.
        queueCv.notify_all();
    }

    /// Starts nrThreads workers (defined out of line).
    void launch(std::size_t nrThreads);

    /// Shutdown the thread pool (defined out of line).
    void shutdown();

private:
    /// Pending-task count at which add_task() starts applying back-pressure.
    static constexpr std::size_t kMaxQueuedTasks = 4;

    /// How long add_task() sleeps between checks while waiting for a full
    /// queue to drain.
    static constexpr std::chrono::milliseconds kFullQueuePollInterval { 100 };

    // could just use unique_ptr<T[]> for both of these,
    // or an analogue, since they will only increase or decrease in size on a call to launch()
    std::vector<std::thread> threads {}; // or analogue
    std::size_t nrThreads = 0;

    /// One worker's pending tasks plus the mutex guarding them.
    struct TaskQueue {
        std::mutex lock {};
        std::deque<std::function<void()>> queue {};
    };

    /// Per-worker queues, indexed by worker number. Null until set up out of
    /// line (presumably by launch() - confirm).
    TaskQueue* taskQueues {};

    // Used to notify threads when work is available or to shutdown
    std::condition_variable queueCv {};

    /// Used to notify on shutdown
    std::atomic_bool threadsShouldShutdown { false };

    // Small locking helpers, implemented inline.

    /// Number of tasks currently queued for `worker` (takes the queue lock).
    std::size_t QueueLength(std::size_t worker) const {
        std::lock_guard lk(taskQueues[worker].lock);
        return taskQueues[worker].queue.size();
    }

    /// True when `worker` has no queued tasks (takes the queue lock).
    bool QueueEmpty(std::size_t worker) const {
        std::lock_guard lk(taskQueues[worker].lock);
        return taskQueues[worker].queue.empty();
    }

    /// Picks a worker index in [0, nrThreads) using std::rand().
    /// Precondition: nrThreads != 0 (add_task() checks this before calling).
    /// NOTE(review): std::rand() is not guaranteed to be thread-safe; calling
    /// add_task() concurrently from several threads may need a different
    /// source of indices.
    std::size_t PickWorker() const {
        return static_cast<std::size_t>(std::rand()) % nrThreads;
    }

    /// Worker thread entry point (defined out of line).
    static void ThreadEntry(ThreadPool* pPool, std::size_t myIndex);
};