use threadpool for stuff

This commit is contained in:
Lily Tsuru 2024-10-22 00:49:49 -04:00
parent a997a356d2
commit 9af77c6159
4 changed files with 188 additions and 13 deletions

View file

@ -5,7 +5,7 @@ all: vxorg tree_test
clean: clean:
rm vxorg tree_test *.o rm vxorg tree_test *.o
vxorg: vxorg.o vxheaven_parse.o vxorg: vxorg.o vxheaven_parse.o threadpool.o
$(CXX) $^ -o $@ $(CXX) $^ -o $@
tree_test: tree_test.o tree_test: tree_test.o
@ -13,3 +13,10 @@ tree_test: tree_test.o
%.o: %.cpp %.o: %.cpp
$(CXX) -c $< -o $@ $(CXX) -c $< -o $@
# dep rules
# I feel like it's 1970 again
tree_test.o: tree.hpp
vxorg.o: tree.hpp vxheaven_parse.hpp threadpool.hpp
vxheaven_parse.o: tree.hpp vxheaven_parse.hpp

70
threadpool.cpp Normal file
View file

@ -0,0 +1,70 @@
#include "threadpool.hpp"
void ThreadPool::ThreadEntry(ThreadPool* pPool, std::size_t myIndex) {
auto& pool = *pPool;
// set a cutesy name
#ifdef __linux__
pthread_setname_np(pthread_self(), "PoolWorker");
#endif
// The thread loop
while(true) {
{
// wait for at least a single task, or shutdown notification (one of the two)
std::unique_lock lk(pool.taskQueues[myIndex].lock);
pool.queueCv.wait(lk, [&]() {
if(pool.threadsShouldShutdown)
return true;
return !pool.taskQueues[myIndex].queue.empty();
});
}
// Exit if the pool is to shutdown
if(pool.threadsShouldShutdown && pPool->QueueEmpty(myIndex))
break;
// pop and run tasks until we run out of tasks to run
{
std::unique_lock lk(pool.taskQueues[myIndex].lock);
// TODO: Work-steal from other threads.
while(!pool.taskQueues[myIndex].queue.empty()) {
auto& cb = pool.taskQueues[myIndex].queue.back();
cb();
pool.taskQueues[myIndex].queue.pop_back();
}
}
}
}
void ThreadPool::launch(std::size_t nrThreads) {
threadsShouldShutdown = false;
this->nrThreads = nrThreads;
threads.resize(this->nrThreads);
taskQueues = new TaskQueue[this->nrThreads];
for(std::size_t i = 0; i < this->nrThreads; ++i)
threads.emplace_back(std::thread(&ThreadEntry, this, i));
}
// Shutdown the thread pool
void ThreadPool::shutdown() {
if(!threadsShouldShutdown)
threadsShouldShutdown = true;
queueCv.notify_all();
// join all the threads (if possible) to make sure they all exit
for(auto& thread : threads)
if(thread.joinable())
thread.join();
nrThreads = 0;
delete[] taskQueues;
taskQueues = nullptr;
}

90
threadpool.hpp Normal file
View file

@ -0,0 +1,90 @@
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
/// A simple thread pool executor.
/// Not at all optimized, and probably terrible for latency.
struct ThreadPool {
ThreadPool() = default;
// shorthand to call launch(nrThreads) automatically
inline explicit ThreadPool(std::size_t nrThreads) { launch(nrThreads); }
// move could be allowed, I guess
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
inline ~ThreadPool() { shutdown(); }
// takes anything that is callable with void() signature
// This includes capturable lambdas, so be careful or make sure you're locking state!
template <class Callable>
void add_task(Callable&& cb) {
auto worker = PickWorker();
//printf("picked worker %zu\n", worker);
// N.B: These wrappers still allow the thread to progress
if(QueueLength(worker) >= 4) {
//std::printf("queue for worker %zu too large. Blocking until it is empty\n", worker);
while(!QueueEmpty(worker)) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// add it to the task queue for that thread
{
std::unique_lock lk(this->taskQueues[worker].lock);
taskQueues[worker].queue.push_front(cb);
}
// Wake threads up if they are waiting for work
queueCv.notify_all();
}
void launch(std::size_t nrThreads);
// Shutdown the thread pool
void shutdown();
private:
// could just use unique_ptr<T[]> for both of these,
// or an analogue, since they will only increase or decrease in size on a call to launch()
std::vector<std::thread> threads {}; // or analogue
std::size_t nrThreads = 0;
struct TaskQueue {
std::mutex lock {};
std::deque<std::function<void()>> queue {};
};
TaskQueue* taskQueues {};
// Used to notify threads when work is available or to shutdown
std::condition_variable queueCv {};
/// Used to notify on shutdown
std::atomic_bool threadsShouldShutdown { false };
std::size_t QueueLength(std::size_t worker) const {
std::unique_lock lk(this->taskQueues[worker].lock);
return this->taskQueues[worker].queue.size();
}
bool QueueEmpty(std::size_t worker) const {
std::unique_lock lk(this->taskQueues[worker].lock);
return this->taskQueues[worker].queue.empty();
}
std::size_t PickWorker() const {
return std::rand() % nrThreads;
}
static void ThreadEntry(ThreadPool* pPool, std::size_t myIndex);
};

View file

@ -5,6 +5,7 @@
#include <string> #include <string>
#include "indicators/terminal_size.hpp" #include "indicators/terminal_size.hpp"
#include "threadpool.hpp"
#include "tree.hpp" #include "tree.hpp"
#include "vxheaven_parse.hpp" #include "vxheaven_parse.hpp"
@ -14,6 +15,9 @@ int main() {
std::ifstream ifs("./testdata/samples.sort"); std::ifstream ifs("./testdata/samples.sort");
vxorg::VxHeavenTree tree; vxorg::VxHeavenTree tree;
// used for os filesystem ops
ThreadPool fsPool(4);
vxorg::parse_into_tree(tree, ifs); vxorg::parse_into_tree(tree, ifs);
std::filesystem::path root = std::filesystem::current_path() / "testdata"; std::filesystem::path root = std::filesystem::current_path() / "testdata";
@ -66,7 +70,6 @@ int main() {
#endif #endif
#if 1 #if 1
if(!node->is_root()) { if(!node->is_root()) {
if(data.is_sample) { if(data.is_sample) {
std::string sample_name = vxorg::get_sample_name(node); std::string sample_name = vxorg::get_sample_name(node);
@ -79,23 +82,28 @@ int main() {
std::printf("WARNING: sample %s/%s in tree (source disk file %s) does not exist\n", path.string().c_str(), sample_name.c_str(), std::printf("WARNING: sample %s/%s in tree (source disk file %s) does not exist\n", path.string().c_str(), sample_name.c_str(),
source_path.string().c_str()); source_path.string().c_str());
} else { } else {
fsPool.add_task([path, source_path, sample_name, &bar]() {
bar.set_option(ind::option::PostfixText { std::format("Moving {}", sample_name) });
auto dest_path = path / sample_name;
// possibly TOCTOUable but it should:tm: be fine?
if(!std::filesystem::exists(path)) { if(!std::filesystem::exists(path)) {
std::filesystem::create_directories(path); std::filesystem::create_directories(path);
} }
bar.set_option(ind::option::PostfixText { std::format("Moving {}", sample_name) }); if(std::filesystem::exists(dest_path)) {
std::filesystem::remove(dest_path);
// FIXME Make this a move
std::filesystem::copy_file(source_path, path / vxorg::get_sample_name(node));
bar.tick();
} }
// std::printf("sample %s/%s in tree (source disk file %s)\n", path.string().c_str(), sample_name.c_str(), std::filesystem::rename(source_path, dest_path);
// source_path.string().c_str()); bar.tick();
});
}
} }
} }
#endif #endif
}); });
return 0; return 0;
} }