diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a1f6d8..bc402bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,7 +14,7 @@ option(ENABLE_SHARED "Build shared library" ON) option(ENABLE_PROFILING "Enable asan" OFF) mark_as_advanced(ENABLE_PROFILING) -set(SOURCE_LIST "src/filerd.cpp") +set(SOURCE_LIST "src/filerd.cpp" "src/threadpool.cpp") add_library(ensure OBJECT "src/ensure.cpp") target_compile_options(ensure PRIVATE -Wall -Wextra -pedantic -std=c++17) if(ENABLE_SHARED) @@ -29,7 +29,7 @@ if(ENABLE_PROFILING) target_link_options(rexy PRIVATE -fsanitize=address -fno-omit-frame-pointer -fno-optimize-sibling-calls) endif() -set(LIBREXY_PUBLIC_HEADERS "include/rexy/traits.hpp" "include/rexy/steal.hpp" "include/rexy/binary.hpp" "include/rexy/expression.hpp" "include/rexy/binary.tpp" "include/rexy/string_base.hpp" "include/rexy/string.hpp" "include/rexy/filerd.hpp" "include/rexy/string_base.tpp") +set(LIBREXY_PUBLIC_HEADERS "include/rexy/traits.hpp" "include/rexy/steal.hpp" "include/rexy/binary.hpp" "include/rexy/expression.hpp" "include/rexy/binary.tpp" "include/rexy/string_base.hpp" "include/rexy/string.hpp" "include/rexy/filerd.hpp" "include/rexy/string_base.tpp" "include/rexy/threadpool.hpp") target_compile_options(rexy PRIVATE -Wall -Wextra -pedantic -std=c++17) install(TARGETS rexy diff --git a/include/rexy/threadpool.hpp b/include/rexy/threadpool.hpp new file mode 100644 index 0000000..0f50913 --- /dev/null +++ b/include/rexy/threadpool.hpp @@ -0,0 +1,70 @@ +#ifndef REXY_THREADPOOL_HPP +#define REXY_THREADPOOL_HPP + +#include +#include +#include +#include //unique_lock, scoped_lock +#include //shared_mutex, shared_lock +#include //atomic_bool +#include //shared_ptr +#include //future, packaged_task +#include //function, bind +#include //move, forward + +namespace rexy{ + + class threadpool + { + private: + using mutex_t = std::shared_mutex; + using write_lock_t = std::unique_lock; + using read_lock_t = std::shared_lock; + + private: + mutable read_lock_t m_ctor_lock; + mutable std::condition_variable_any m_qcv; + mutable mutex_t m_qlk; + + std::vector m_workers; + std::queue> m_jobs; + std::atomic_bool m_valid = true; + + public: + explicit threadpool(int numthreads = std::thread::hardware_concurrency()); + threadpool(const threadpool&); + threadpool(threadpool&&); + ~threadpool(void); + + threadpool& operator=(const threadpool&) = delete; + threadpool& operator=(threadpool&&) = delete; + + template + auto add_job(Func&& f, Args&&... args) -> std::future(f)(std::forward(args)...))>; + + private: + void worker_loop(void); + }; + + template + auto threadpool::add_job(Func&& f, Args&&... args) -> std::future(f)(std::forward(args)...))>{ + using return_t = decltype(std::forward(f)(std::forward(args)...)); + using task_t = std::packaged_task; + + //shared pointer to a packaged task which takes no arguments in operator() + std::shared_ptr task_ptr = std::make_shared(std::bind(std::forward(f), std::forward(args)...)); + + { + write_lock_t lk(m_qlk); + //make a copy of the shared pointer by capturing by-value + m_jobs.emplace([task_ptr]{(*task_ptr)();}); + } + + //wakeup a worker to run the job + m_qcv.notify_one(); + return task_ptr->get_future(); + } + +} + +#endif diff --git a/src/threadpool.cpp b/src/threadpool.cpp new file mode 100644 index 0000000..ecd05f8 --- /dev/null +++ b/src/threadpool.cpp @@ -0,0 +1,58 @@ +#include "rexy/threadpool.hpp" + +#include //move, forward + +namespace rexy{ + + threadpool::threadpool(int numthreads){ + for(int i = 0;i < numthreads;++i){ + m_workers.emplace_back(&threadpool::worker_loop, this); + } + } + void threadpool::worker_loop(void){ + while(m_valid){ + write_lock_t lk(m_qlk); + //wait for wakeup, continue if the threadpool is dead or there is a job to run + m_qcv.wait(lk, [this]{return !m_valid || !m_jobs.empty();}); + + //if the threadpool is dead, kill ourself + if(!m_valid){ + return; + } + + //steal job from queue + auto job = std::move(m_jobs.front()); + m_jobs.pop(); + + lk.unlock(); + job(); + } + } + threadpool::threadpool(const threadpool& other): + m_ctor_lock(other.m_qlk), + m_jobs(other.m_jobs), + m_valid(true) + { + for(size_t i = 0;i < other.m_workers.size();++i){ + m_workers.emplace_back(&threadpool::worker_loop, this); + } + m_qcv.notify_all(); + } + threadpool::threadpool(threadpool&& other): + m_ctor_lock(other.m_qlk), + m_workers(std::move(other.m_workers)), + m_jobs(std::move(other.m_jobs)), + m_valid(other.m_valid.exchange(false)) + { + m_ctor_lock.unlock(); + } + threadpool::~threadpool(void){ + m_valid = false; + //wakeup all workers and end them + m_qcv.notify_all(); + for(auto& thread : m_workers){ + thread.join(); + } + } + +}