// SONIC ROBO BLAST 2 //----------------------------------------------------------------------------- // Copyright (C) 2023 by Ronald "Eidolon" Kinard // // This program is free software distributed under the // terms of the GNU General Public License, version 2. // See the 'LICENSE' file for more details. //----------------------------------------------------------------------------- #ifndef __SRB2_CORE_THREAD_POOL_H__ #define __SRB2_CORE_THREAD_POOL_H__ #include #ifdef __cplusplus #include #include #include #include #include #include #include #include #include "spmc_queue.hpp" namespace srb2 { class ThreadPool { public: struct Task { void (*thunk)(void*); void (*deleter)(void*); std::shared_ptr> pseudosema; std::array>)> raw; }; using Queue = SpMcQueue; class Sema { std::shared_ptr> pseudosema_; explicit Sema(std::shared_ptr> sema) : pseudosema_(sema) {} friend class ThreadPool; public: Sema() = default; }; private: std::shared_ptr> pool_alive_; std::vector> worker_ready_mutexes_; std::vector> worker_ready_condvars_; std::vector> work_queues_; std::vector threads_; size_t next_queue_index_ = 0; std::shared_ptr> cur_sema_; bool immediate_mode_ = false; bool sema_begun_ = false; public: ThreadPool(); explicit ThreadPool(size_t threads); ThreadPool(const ThreadPool&) = delete; ThreadPool(ThreadPool&&); ~ThreadPool(); ThreadPool& operator=(const ThreadPool&) = delete; ThreadPool& operator=(ThreadPool&&); void begin_sema(); ThreadPool::Sema end_sema(); /// Enqueue but don't notify template void schedule(T&& thunk); /// Notify threads after several schedules void notify(); void notify_sema(const Sema& sema); void wait_idle(); void wait_sema(const Sema& sema); void shutdown(); }; extern std::unique_ptr g_main_threadpool; template void callable_caller(F* f) { (*f)(); } template void callable_destroyer(F* f) { f->~F(); } template void ThreadPool::schedule(T&& thunk) { static_assert(sizeof(T) <= sizeof(std::declval().raw)); if (immediate_mode_) { (thunk)(); return; } if (sema_begun_) { if (cur_sema_ == nullptr) { cur_sema_ = std::make_shared>(0); } cur_sema_->fetch_add(1, std::memory_order_relaxed); } size_t qi = next_queue_index_; { std::shared_ptr q = work_queues_[qi]; Task task; task.thunk = reinterpret_cast(callable_caller); task.deleter = reinterpret_cast(callable_destroyer); task.pseudosema = cur_sema_; new (reinterpret_cast(task.raw.data())) T(std::move(thunk)); q->push(std::move(task)); } // worker_ready_condvars_[qi]->notify_one(); next_queue_index_ += 1; if (next_queue_index_ >= threads_.size()) { next_queue_index_ = 0; } } } // namespace srb2 extern "C" { #endif // __cplusplus typedef void (*srb2cthunk_t)(void*); void I_ThreadPoolInit(void); void I_ThreadPoolShutdown(void); void I_ThreadPoolSubmit(srb2cthunk_t thunk, void* data); void I_ThreadPoolWaitIdle(void); #ifdef __cplusplus } // extern "C" #endif #endif // __SRB2_CORE_THREAD_POOL_H__