From 61a021ce4ad80832dd11ad34ecbed18d8f7e3732 Mon Sep 17 00:00:00 2001
From: Eidolon
Date: Sat, 14 Oct 2023 15:25:47 -0500
Subject: [PATCH] Add SPMC queue + thread pool job executor

---
 src/core/CMakeLists.txt  |   3 +
 src/core/spmc_queue.hpp  | 192 ++++++++++++++++++++++
 src/core/thread_pool.cpp | 345 +++++++++++++++++++++++++++++++++++++++
 src/core/thread_pool.h   | 162 ++++++++++++++++++
 4 files changed, 702 insertions(+)
 create mode 100644 src/core/spmc_queue.hpp
 create mode 100644 src/core/thread_pool.cpp
 create mode 100644 src/core/thread_pool.h

diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt
index 1a779b961..a5d0b521a 100644
--- a/src/core/CMakeLists.txt
+++ b/src/core/CMakeLists.txt
@@ -1,5 +1,8 @@
 target_sources(SRB2SDL2 PRIVATE
 	memory.cpp
 	memory.h
+	spmc_queue.hpp
 	static_vec.hpp
+	thread_pool.cpp
+	thread_pool.h
 )
diff --git a/src/core/spmc_queue.hpp b/src/core/spmc_queue.hpp
new file mode 100644
index 000000000..9441422d9
--- /dev/null
+++ b/src/core/spmc_queue.hpp
@@ -0,0 +1,192 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/.
+ */
+
+// This class is derived from Conor Williams' implementation of a concurrent deque
+// https://github.com/ConorWilliams/ConcurrentDeque
+// Copyright (C) 2021 Conor Williams
+
+// The original version was for C++23. This one has been shrunk slightly and adapted to our conventions.
+
+#ifndef __SRB2_CORE_SPMC_QUEUE_HPP__
+#define __SRB2_CORE_SPMC_QUEUE_HPP__
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include <tracy/Tracy.hpp>
+
+#include "../cxxutil.hpp"
+
+namespace srb2
+{
+
+template <typename T>
+class SpMcQueue
+{
+	struct RingBuff {
+	public:
+		explicit RingBuff(int64_t cap) : _cap{cap}, _mask{cap - 1}
+		{
+			SRB2_ASSERT(cap && (!(cap & (cap - 1))) && "Capacity must be a power of 2!");
+			_buff = std::unique_ptr<T[]>(new T[_cap]);
+		}
+
+		std::int64_t capacity() const noexcept { return _cap; }
+
+		// Store (move) at modulo index
+		void store(int64_t i, T&& x) noexcept
+		{
+			_buff[i & _mask] = std::move(x);
+		}
+
+		// Load (copy) at modulo index
+		T load(int64_t i) const noexcept
+		{
+			return _buff[i & _mask];
+		}
+
+		// Allocates and returns a new ring buffer, copies elements in range [t, b) into the new buffer.
+		RingBuff* resize(std::int64_t b, std::int64_t t) const
+		{
+			ZoneScoped;
+			RingBuff* ptr = new RingBuff{2 * _cap};
+			for (std::int64_t i = t; i != b; ++i) {
+				ptr->store(i, load(i));
+			}
+			return ptr;
+		}
+
+	private:
+		int64_t _cap;  // Capacity of the buffer
+		int64_t _mask; // Bit mask to perform modulo capacity operations
+
+		std::unique_ptr<T[]> _buff;
+	};
+
+	alignas(64) std::atomic<std::int64_t> bottom_;
+	alignas(64) std::atomic<std::int64_t> top_;
+	alignas(64) std::atomic<RingBuff*> buffer_;
+
+	std::vector<std::unique_ptr<RingBuff>> garbage_;
+
+public:
+	SpMcQueue(size_t capacity) : bottom_(0), top_(0), buffer_(new RingBuff(capacity))
+	{
+		garbage_.reserve(32);
+	}
+	~SpMcQueue() noexcept
+	{
+		delete buffer_.load(std::memory_order_relaxed);
+	};
+
+	size_t size() const noexcept
+	{
+		int64_t bottom = bottom_.load(std::memory_order_relaxed);
+		int64_t top = top_.load(std::memory_order_relaxed);
+		return static_cast<size_t>(bottom >= top ? bottom - top : 0);
+	}
+
+	bool empty() const noexcept
+	{
+		return size() == 0;
+	}
+
+	void push(T&& v) noexcept
+	{
+		int64_t bottom = bottom_.load(std::memory_order_relaxed);
+		int64_t top = top_.load(std::memory_order_acquire);
+		RingBuff* buf = buffer_.load(std::memory_order_relaxed);
+
+		if (buf->capacity() < (bottom - top) + 1)
+		{
+			// Queue is full, build a new one
+			RingBuff* newbuf = buf->resize(bottom, top);
+			garbage_.emplace_back(buf);
+			buffer_.store(newbuf, std::memory_order_relaxed);
+			buf = newbuf;
+		}
+
+		// Construct new object, this does not have to be atomic as no one can steal this item until after we
+		// store the new value of bottom, ordering is maintained by surrounding atomics.
+		buf->store(bottom, std::move(v));
+
+		std::atomic_thread_fence(std::memory_order_release);
+		bottom_.store(bottom + 1, std::memory_order_relaxed);
+	}
+
+	std::optional<T> pop() noexcept
+	{
+		int64_t bottom = bottom_.load(std::memory_order_relaxed) - 1;
+		RingBuff* buf = buffer_.load(std::memory_order_relaxed);
+
+		bottom_.store(bottom, std::memory_order_relaxed); // Stealers can no longer steal
+
+		std::atomic_thread_fence(std::memory_order_seq_cst);
+		int64_t top = top_.load(std::memory_order_relaxed);
+
+		if (top <= bottom)
+		{
+			// Non-empty deque
+			if (top == bottom)
+			{
+				// The last item could get stolen, by a stealer that loaded bottom before our write above
+				if (!top_.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
+				{
+					// Failed race, thief got the last item.
+					bottom_.store(bottom + 1, std::memory_order_relaxed);
+					return std::nullopt;
+				}
+				bottom_.store(bottom + 1, std::memory_order_relaxed);
+			}
+
+			// Can delay load until after acquiring slot as only this thread can push(), this load is not
+			// required to be atomic as we are the exclusive writer.
+			return buf->load(bottom);
+		}
+		else
+		{
+			bottom_.store(bottom + 1, std::memory_order_relaxed);
+			return std::nullopt;
+		}
+	}
+
+	std::optional<T> steal() noexcept
+	{
+		int64_t top = top_.load(std::memory_order_acquire);
+		std::atomic_thread_fence(std::memory_order_seq_cst);
+		int64_t bottom = bottom_.load(std::memory_order_acquire);
+
+		if (top < bottom)
+		{
+			// Must load *before* acquiring the slot as slot may be overwritten immediately after acquiring.
+			// This load is NOT required to be atomic even though it may race with an overwrite as we only
+			// return the value if we win the race below, guaranteeing we had no race during our read. If we
+			// lose the race then 'x' could be corrupt due to a read-during-write race but as T is trivially
+			// destructible this does not matter.
+			T x = buffer_.load(std::memory_order_consume)->load(top);
+
+			if (!top_.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
+			{
+				return std::nullopt;
+			}
+
+			return x;
+		}
+		else
+		{
+			return std::nullopt;
+		}
+	}
+};
+
+} // namespace srb2
+
+#endif // __SRB2_CORE_SPMC_QUEUE_HPP__
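Usage sketch for the queue above (illustrative only; the include path and function names are assumptions, not part of the patch). SpMcQueue is single-producer/multi-consumer: only the thread that owns the queue may call push() and pop(), while any other thread may call steal(). The constructor capacity must be a power of two.

    #include "core/spmc_queue.hpp" // assumed include path

    srb2::SpMcQueue<int> queue(1024); // capacity must be a power of two

    // Owner (producer) thread only:
    void producer()
    {
        queue.push(42);
        if (std::optional<int> mine = queue.pop())
        {
            // use *mine; pop() takes from the LIFO end
        }
    }

    // Any other (worker) thread:
    void worker()
    {
        if (std::optional<int> stolen = queue.steal())
        {
            // use *stolen; steal() takes from the FIFO end and returns nullopt when empty or when it loses a race
        }
    }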
diff --git a/src/core/thread_pool.cpp b/src/core/thread_pool.cpp
new file mode 100644
index 000000000..30dbd5dad
--- /dev/null
+++ b/src/core/thread_pool.cpp
@@ -0,0 +1,345 @@
+// SONIC ROBO BLAST 2
+//-----------------------------------------------------------------------------
+// Copyright (C) 2023 by Ronald "Eidolon" Kinard
+//
+// This program is free software distributed under the
+// terms of the GNU General Public License, version 2.
+// See the 'LICENSE' file for more details.
+//-----------------------------------------------------------------------------
+
+#include "thread_pool.h"
+
+#include <algorithm>
+#include <exception>
+#include <optional>
+#include <string>
+#include <system_error>
+#include <vector>
+
+#include <fmt/format.h>
+#include <tracy/Tracy.hpp>
+
+#include "../cxxutil.hpp"
+#include "../m_argv.h"
+
+using namespace srb2;
+
+static void do_work(ThreadPool::Task& work)
+{
+	try
+	{
+		ZoneScoped;
+		(work.thunk)(work.raw.data());
+	}
+	catch (...)
+	{
+		// can't do anything
+	}
+
+	(work.deleter)(work.raw.data());
+	if (work.pseudosema)
+	{
+		work.pseudosema->fetch_sub(1, std::memory_order_relaxed);
+	}
+}
+
+static void pool_executor(
+	int thread_index,
+	std::shared_ptr<std::atomic<bool>> pool_alive,
+	std::shared_ptr<std::mutex> worker_ready_mutex,
+	std::shared_ptr<std::condition_variable> worker_ready_condvar,
+	std::shared_ptr<ThreadPool::Queue> my_wq,
+	std::vector<std::shared_ptr<ThreadPool::Queue>> other_wqs
+)
+{
+	{
+		std::string thread_name = fmt::format("Thread Pool Thread {}", thread_index);
+		tracy::SetThreadName(thread_name.c_str());
+	}
+
+	int spins = 0;
+	while (true)
+	{
+		std::optional<ThreadPool::Task> work = my_wq->steal();
+		bool did_work = false;
+		if (work)
+		{
+			do_work(*work);
+
+			did_work = true;
+			spins = 0;
+		}
+		else
+		{
+			for (auto& q : other_wqs)
+			{
+				work = q->steal();
+				if (work)
+				{
+					do_work(*work);
+
+					did_work = true;
+					spins = 0;
+
+					// We only want to steal one work item at a time, to prioritize our own queue
+					break;
+				}
+			}
+		}
+
+		if (!did_work)
+		{
+			// Spin a few loops to avoid yielding, then wait for the ready lock
+			spins += 1;
+			if (spins > 100)
+			{
+				std::unique_lock<std::mutex> ready_lock {*worker_ready_mutex};
+				while (my_wq->empty() && pool_alive->load())
+				{
+					worker_ready_condvar->wait(ready_lock);
+				}
+
+				if (!pool_alive->load())
+				{
+					break;
+				}
+			}
+		}
+	}
+}
+
+ThreadPool::ThreadPool()
+{
+	immediate_mode_ = true;
+}
+
+ThreadPool::ThreadPool(size_t threads)
+{
+	next_queue_index_ = 0;
+	pool_alive_ = std::make_shared<std::atomic<bool>>(true);
+
+	for (size_t i = 0; i < threads; i++)
+	{
+		std::shared_ptr<Queue> wsq = std::make_shared<Queue>(2048);
+		work_queues_.push_back(wsq);
+
+		std::shared_ptr<std::mutex> mutex = std::make_shared<std::mutex>();
+		worker_ready_mutexes_.push_back(std::move(mutex));
+		std::shared_ptr<std::condition_variable> condvar = std::make_shared<std::condition_variable>();
+		worker_ready_condvars_.push_back(std::move(condvar));
+	}
+
+	for (size_t i = 0; i < threads; i++)
+	{
+		std::shared_ptr<Queue> my_queue = work_queues_[i];
+		std::vector<std::shared_ptr<Queue>> other_queues;
+		for (size_t j = 0; j < threads; j++)
+		{
+			// Order the other queues starting from the next adjacent worker
+			// i.e. if this is worker 2 of 8, then the other queues are 3, 4, 5, 6, 7, 0, 1
+			// This tries to balance out work stealing behavior
+
+			size_t other_index = j + i;
+			if (other_index >= threads)
+			{
+				other_index -= threads;
+			}
+
+			if (other_index != i)
+			{
+				other_queues.push_back(work_queues_[other_index]);
+			}
+		}
+
+		std::thread thread;
+		try
+		{
+			thread = std::thread
+			{
+				pool_executor,
+				i,
+				pool_alive_,
+				worker_ready_mutexes_[i],
+				worker_ready_condvars_[i],
+				my_queue,
+				other_queues
+			};
+		}
+		catch (const std::system_error& error)
+		{
+			// Safe shutdown and rethrow
+			pool_alive_->store(false);
+			for (auto& t : threads_)
+			{
+				t.join();
+			}
+			throw error;
+		}
+
+		threads_.push_back(std::move(thread));
+	}
+}
+
+ThreadPool::ThreadPool(ThreadPool&&) = default;
+ThreadPool::~ThreadPool() = default;
+
+ThreadPool& ThreadPool::operator=(ThreadPool&&) = default;
+
+void ThreadPool::begin_sema()
+{
+	sema_begun_ = true;
+}
+
+ThreadPool::Sema ThreadPool::end_sema()
+{
+	Sema ret = Sema(std::move(cur_sema_));
+	cur_sema_ = nullptr;
+	sema_begun_ = false;
+	return ret;
+}
+
+void ThreadPool::notify()
+{
+	for (size_t i = 0; i < work_queues_.size(); i++)
+	{
+		auto& q = work_queues_[i];
+		size_t count = q->size();
+		if (count > 0)
+		{
+			worker_ready_condvars_[i]->notify_one();
+		}
+	}
+}
+
+void ThreadPool::notify_sema(const ThreadPool::Sema& sema)
+{
+	if (!sema.pseudosema_)
+	{
+		return;
+	}
+	notify();
+}
+
+void ThreadPool::wait_idle()
+{
+	if (immediate_mode_)
+	{
+		return;
+	}
+
+	ZoneScoped;
+
+	for (size_t i = 0; i < work_queues_.size(); i++)
+	{
+		auto& q = work_queues_[i];
+
+		std::optional<Task> work;
+		while ((work = q->pop()).has_value())
+		{
+			do_work(*work);
+		}
+	}
+}
+
+void ThreadPool::wait_sema(const Sema& sema)
+{
+	if (!sema.pseudosema_)
+	{
+		return;
+	}
+
+	ZoneScoped;
+
+	while (sema.pseudosema_->load(std::memory_order_seq_cst) > 0)
+	{
+		// spin to win
+		for (size_t i = 0; i < work_queues_.size(); i++)
+		{
+			auto& q = work_queues_[i];
+
+			std::optional<Task> work;
+			if ((work = q->pop()).has_value())
+			{
+				do_work(*work);
+				break;
+			}
+		}
+	}
+
+	if (sema.pseudosema_->load(std::memory_order_seq_cst) != 0)
+	{
+		throw std::exception();
+	}
+}
+
+void ThreadPool::shutdown()
+{
+	if (immediate_mode_)
+	{
+		return;
+	}
+
+	wait_idle();
+
+	pool_alive_->store(false);
+
+	for (auto& condvar : worker_ready_condvars_)
+	{
+		condvar->notify_all();
+	}
+	for (auto& t : threads_)
+	{
+		t.join();
+	}
+}
+
+std::unique_ptr<srb2::ThreadPool> srb2::g_main_threadpool;
+
+void I_ThreadPoolInit(void)
+{
+	SRB2_ASSERT(g_main_threadpool == nullptr);
+	size_t thread_count = std::min(static_cast<unsigned int>(9), std::thread::hardware_concurrency());
+	if (thread_count > 1)
+	{
+		// The main thread will act as a worker when waiting for pool idle
+		// Make one less worker thread to avoid unnecessary context switching
+		thread_count -= 1;
+	}
+
+	if (M_CheckParm("-singlethreaded"))
+	{
+		g_main_threadpool = std::make_unique<ThreadPool>();
+	}
+	else
+	{
+		g_main_threadpool = std::make_unique<ThreadPool>(thread_count);
+	}
+}
+
+void I_ThreadPoolShutdown(void)
+{
+	if (!g_main_threadpool)
+	{
+		return;
+	}
+
+	g_main_threadpool->shutdown();
+	g_main_threadpool = nullptr;
+}
+
+void I_ThreadPoolSubmit(srb2cthunk_t thunk, void* data)
+{
+	SRB2_ASSERT(g_main_threadpool != nullptr);
+
+	g_main_threadpool->schedule([=]() {
+		(thunk)(data);
+	});
+	g_main_threadpool->notify();
+}
+
+void I_ThreadPoolWaitIdle(void)
+{
+	SRB2_ASSERT(g_main_threadpool != nullptr);
+
+	g_main_threadpool->wait_idle();
+}
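A minimal sketch of how the C-facing entry points defined above are meant to be driven from game code (the work function, data array, and call site below are hypothetical and only illustrate the declared API).

    static int chunk_data[8];

    static void process_chunk(void* data)
    {
        // Isolated work on *data; jobs should not touch shared game state.
        (void)data;
    }

    static void example_frame(void)
    {
        // I_ThreadPoolInit() is assumed to have been called once at startup.
        for (int i = 0; i < 8; i++)
        {
            I_ThreadPoolSubmit(process_chunk, &chunk_data[i]); // enqueues the job and notifies a worker
        }
        I_ThreadPoolWaitIdle(); // the calling thread also drains the queues while it waits
    }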
diff --git a/src/core/thread_pool.h b/src/core/thread_pool.h
new file mode 100644
index 000000000..f6d8241ec
--- /dev/null
+++ b/src/core/thread_pool.h
@@ -0,0 +1,162 @@
+// SONIC ROBO BLAST 2
+//-----------------------------------------------------------------------------
+// Copyright (C) 2023 by Ronald "Eidolon" Kinard
+//
+// This program is free software distributed under the
+// terms of the GNU General Public License, version 2.
+// See the 'LICENSE' file for more details.
+//-----------------------------------------------------------------------------
+
+#ifndef __SRB2_CORE_THREAD_POOL_H__
+#define __SRB2_CORE_THREAD_POOL_H__
+
+#include <stddef.h>
+
+#ifdef __cplusplus
+
+#include <array>
+#include <atomic>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "spmc_queue.hpp"
+
+namespace srb2
+{
+
+class ThreadPool
+{
+public:
+	struct Task
+	{
+		void (*thunk)(void*);
+		void (*deleter)(void*);
+		std::shared_ptr<std::atomic<size_t>> pseudosema;
+		std::array<char, 64 - 2 * sizeof(void (*)(void*)) - sizeof(std::shared_ptr<std::atomic<size_t>>)> raw;
+	};
+
+	using Queue = SpMcQueue<Task>;
+
+	class Sema
+	{
+		std::shared_ptr<std::atomic<size_t>> pseudosema_;
+
+		explicit Sema(std::shared_ptr<std::atomic<size_t>> sema) : pseudosema_(sema) {}
+
+		friend class ThreadPool;
+	public:
+		Sema() = default;
+	};
+
+private:
+	std::shared_ptr<std::atomic<bool>> pool_alive_;
+	std::vector<std::shared_ptr<std::mutex>> worker_ready_mutexes_;
+	std::vector<std::shared_ptr<std::condition_variable>> worker_ready_condvars_;
+	std::vector<std::shared_ptr<Queue>> work_queues_;
+	std::vector<std::thread> threads_;
+	size_t next_queue_index_ = 0;
+	std::shared_ptr<std::atomic<size_t>> cur_sema_;
+
+	bool immediate_mode_ = false;
+	bool sema_begun_ = false;
+
+public:
+	ThreadPool();
+	explicit ThreadPool(size_t threads);
+	ThreadPool(const ThreadPool&) = delete;
+	ThreadPool(ThreadPool&&);
+	~ThreadPool();
+
+	ThreadPool& operator=(const ThreadPool&) = delete;
+	ThreadPool& operator=(ThreadPool&&);
+
+	void begin_sema();
+	ThreadPool::Sema end_sema();
+
+	/// Enqueue but don't notify
+	template <typename T> void schedule(T&& thunk);
+	/// Notify threads after several schedules
+	void notify();
+	void notify_sema(const Sema& sema);
+	void wait_idle();
+	void wait_sema(const Sema& sema);
+	void shutdown();
+};
+
+extern std::unique_ptr<ThreadPool> g_main_threadpool;
+
+template <typename F>
+void callable_caller(F* f)
+{
+	(*f)();
+}
+
+template <typename F>
+void callable_destroyer(F* f)
+{
+	f->~F();
+}
+
+template <typename T>
+void ThreadPool::schedule(T&& thunk)
+{
+	static_assert(sizeof(T) <= sizeof(std::declval<Task>().raw));
+
+	if (immediate_mode_)
+	{
+		(thunk)();
+		return;
+	}
+
+	if (sema_begun_)
+	{
+		if (cur_sema_ == nullptr)
+		{
+			cur_sema_ = std::make_shared<std::atomic<size_t>>(0);
+		}
+		cur_sema_->fetch_add(1, std::memory_order_relaxed);
+	}
+
+	size_t qi = next_queue_index_;
+
+	{
+		std::shared_ptr<Queue> q = work_queues_[qi];
+		Task task;
+		task.thunk = reinterpret_cast<void (*)(void*)>(callable_caller<T>);
+		task.deleter = reinterpret_cast<void (*)(void*)>(callable_destroyer<T>);
+		task.pseudosema = cur_sema_;
+		new (reinterpret_cast<void*>(task.raw.data())) T(std::move(thunk));
+
+		q->push(std::move(task));
+	}
+	// worker_ready_condvars_[qi]->notify_one();
+
+	next_queue_index_ += 1;
+	if (next_queue_index_ >= threads_.size())
+	{
+		next_queue_index_ = 0;
+	}
+}
+
+} // namespace srb2
+
+extern "C" {
+
+#endif // __cplusplus
+
+typedef void (*srb2cthunk_t)(void*);
+
+void I_ThreadPoolInit(void);
+void I_ThreadPoolShutdown(void);
+void I_ThreadPoolSubmit(srb2cthunk_t thunk, void* data);
+void I_ThreadPoolWaitIdle(void);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif // __SRB2_CORE_THREAD_POOL_H__
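A sketch of the C++-side batching path: begin_sema()/end_sema() group a burst of scheduled jobs behind one pseudo-semaphore, notify_sema() wakes the workers once the whole batch is queued, and wait_sema() blocks (while helping drain the queues) until that batch's counter reaches zero. The include path, the surrounding function, and the results array are assumptions; the pool is assumed to have been created by I_ThreadPoolInit().

    #include "core/thread_pool.h" // assumed include path

    static int results[64];

    static void run_batch()
    {
        using srb2::g_main_threadpool;

        g_main_threadpool->begin_sema();
        for (int i = 0; i < 64; ++i)
        {
            g_main_threadpool->schedule([i]() { results[i] = i * i; }); // per-item work
        }
        srb2::ThreadPool::Sema batch = g_main_threadpool->end_sema();

        g_main_threadpool->notify_sema(batch); // wake workers now that the whole batch is queued
        g_main_threadpool->wait_sema(batch);   // main thread helps drain queues until the batch counter hits zero
    }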