diff --git a/Makefile b/Makefile index 7ef94aa..1bb520f 100644 --- a/Makefile +++ b/Makefile @@ -5,11 +5,18 @@ all: vxorg tree_test clean: rm vxorg tree_test *.o -vxorg: vxorg.o vxheaven_parse.o +vxorg: vxorg.o vxheaven_parse.o threadpool.o $(CXX) $^ -o $@ tree_test: tree_test.o $(CXX) $^ -o $@ %.o: %.cpp - $(CXX) -c $< -o $@ \ No newline at end of file + $(CXX) -c $< -o $@ + + +# dep rules +# I feel like it's 1970 again +tree_test.o: tree.hpp +vxorg.o: tree.hpp vxheaven_parse.hpp threadpool.hpp +vxheaven_parse.o: tree.hpp vxheaven_parse.hpp \ No newline at end of file diff --git a/threadpool.cpp b/threadpool.cpp new file mode 100644 index 0000000..1c5b4c8 --- /dev/null +++ b/threadpool.cpp @@ -0,0 +1,70 @@ +#include "threadpool.hpp" + +void ThreadPool::ThreadEntry(ThreadPool* pPool, std::size_t myIndex) { + auto& pool = *pPool; + + // set a cutesy name +#ifdef __linux__ + pthread_setname_np(pthread_self(), "PoolWorker"); +#endif + + // The thread loop + while(true) { + { + // wait for at least a single task, or shutdown notification (one of the two) + std::unique_lock lk(pool.taskQueues[myIndex].lock); + pool.queueCv.wait(lk, [&]() { + if(pool.threadsShouldShutdown) + return true; + return !pool.taskQueues[myIndex].queue.empty(); + }); + } + + // Exit if the pool is to shutdown + if(pool.threadsShouldShutdown && pPool->QueueEmpty(myIndex)) + break; + + // pop and run tasks until we run out of tasks to run + { + std::unique_lock lk(pool.taskQueues[myIndex].lock); + + // TODO: Work-steal from other threads. + + while(!pool.taskQueues[myIndex].queue.empty()) { + auto& cb = pool.taskQueues[myIndex].queue.back(); + cb(); + pool.taskQueues[myIndex].queue.pop_back(); + } + } + } + +} + +void ThreadPool::launch(std::size_t nrThreads) { + threadsShouldShutdown = false; + this->nrThreads = nrThreads; + + threads.resize(this->nrThreads); + taskQueues = new TaskQueue[this->nrThreads]; + + for(std::size_t i = 0; i < this->nrThreads; ++i) + threads.emplace_back(std::thread(&ThreadEntry, this, i)); +} + +// Shutdown the thread pool +void ThreadPool::shutdown() { + if(!threadsShouldShutdown) + threadsShouldShutdown = true; + + queueCv.notify_all(); + + // join all the threads (if possible) to make sure they all exit + for(auto& thread : threads) + if(thread.joinable()) + thread.join(); + + nrThreads = 0; + + delete[] taskQueues; + taskQueues = nullptr; +} \ No newline at end of file diff --git a/threadpool.hpp b/threadpool.hpp new file mode 100644 index 0000000..9eb06ac --- /dev/null +++ b/threadpool.hpp @@ -0,0 +1,90 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/// A simple thread pool executor. +/// Not at all optimized, and probably terrible for latency. +struct ThreadPool { + ThreadPool() = default; + + // shorthand to call launch(nrThreads) automatically + inline explicit ThreadPool(std::size_t nrThreads) { launch(nrThreads); } + + // move could be allowed, I guess + ThreadPool(const ThreadPool&) = delete; + ThreadPool(ThreadPool&&) = delete; + + inline ~ThreadPool() { shutdown(); } + + // takes anything that is callable with void() signature + // This includes capturable lambdas, so be careful or make sure you're locking state! + template + void add_task(Callable&& cb) { + auto worker = PickWorker(); + + //printf("picked worker %zu\n", worker); + + // N.B: These wrappers still allow the thread to progress + if(QueueLength(worker) >= 4) { + //std::printf("queue for worker %zu too large. Blocking until it is empty\n", worker); + while(!QueueEmpty(worker)) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + + // add it to the task queue for that thread + { + std::unique_lock lk(this->taskQueues[worker].lock); + taskQueues[worker].queue.push_front(cb); + } + + // Wake threads up if they are waiting for work + queueCv.notify_all(); + } + + void launch(std::size_t nrThreads); + // Shutdown the thread pool + void shutdown(); + + private: + // could just use unique_ptr for both of these, + // or an analogue, since they will only increase or decrease in size on a call to launch() + std::vector threads {}; // or analogue + + std::size_t nrThreads = 0; + + struct TaskQueue { + std::mutex lock {}; + std::deque> queue {}; + }; + + TaskQueue* taskQueues {}; + + // Used to notify threads when work is available or to shutdown + std::condition_variable queueCv {}; + + /// Used to notify on shutdown + std::atomic_bool threadsShouldShutdown { false }; + + std::size_t QueueLength(std::size_t worker) const { + std::unique_lock lk(this->taskQueues[worker].lock); + return this->taskQueues[worker].queue.size(); + } + + bool QueueEmpty(std::size_t worker) const { + std::unique_lock lk(this->taskQueues[worker].lock); + return this->taskQueues[worker].queue.empty(); + } + + std::size_t PickWorker() const { + return std::rand() % nrThreads; + } + + static void ThreadEntry(ThreadPool* pPool, std::size_t myIndex); +}; \ No newline at end of file diff --git a/vxorg.cpp b/vxorg.cpp index f7c3b8a..e08ff6a 100644 --- a/vxorg.cpp +++ b/vxorg.cpp @@ -5,6 +5,7 @@ #include #include "indicators/terminal_size.hpp" +#include "threadpool.hpp" #include "tree.hpp" #include "vxheaven_parse.hpp" @@ -14,6 +15,9 @@ int main() { std::ifstream ifs("./testdata/samples.sort"); vxorg::VxHeavenTree tree; + // used for os filesystem ops + ThreadPool fsPool(4); + vxorg::parse_into_tree(tree, ifs); std::filesystem::path root = std::filesystem::current_path() / "testdata"; @@ -66,7 +70,6 @@ int main() { #endif #if 1 - if(!node->is_root()) { if(data.is_sample) { std::string sample_name = vxorg::get_sample_name(node); @@ -79,23 +82,28 @@ int main() { std::printf("WARNING: sample %s/%s in tree (source disk file %s) does not exist\n", path.string().c_str(), sample_name.c_str(), source_path.string().c_str()); } else { - if(!std::filesystem::exists(path)) { - std::filesystem::create_directories(path); - } + fsPool.add_task([path, source_path, sample_name, &bar]() { + bar.set_option(ind::option::PostfixText { std::format("Moving {}", sample_name) }); - bar.set_option(ind::option::PostfixText { std::format("Moving {}", sample_name) }); + auto dest_path = path / sample_name; - // FIXME Make this a move - std::filesystem::copy_file(source_path, path / vxorg::get_sample_name(node)); + // possibly TOCTOUable but it should:tm: be fine? + if(!std::filesystem::exists(path)) { + std::filesystem::create_directories(path); + } - bar.tick(); + if(std::filesystem::exists(dest_path)) { + std::filesystem::remove(dest_path); + } + + std::filesystem::rename(source_path, dest_path); + bar.tick(); + }); } - - // std::printf("sample %s/%s in tree (source disk file %s)\n", path.string().c_str(), sample_name.c_str(), - // source_path.string().c_str()); } } #endif }); + return 0; } \ No newline at end of file