Add SPMC queue + thread pool job executor

Eidolon 2023-10-14 15:25:47 -05:00
parent 010d917f16
commit 61a021ce4a
4 changed files with 702 additions and 0 deletions


@@ -1,5 +1,8 @@
target_sources(SRB2SDL2 PRIVATE
	memory.cpp
	memory.h
	spmc_queue.hpp
	static_vec.hpp
	thread_pool.cpp
	thread_pool.h
)

192 src/core/spmc_queue.hpp Normal file

@@ -0,0 +1,192 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
// This class is derived from Conor Williams' implementation of a concurrent deque
// https://github.com/ConorWilliams/ConcurrentDeque
// Copyright (C) 2021 Conor Williams
// The original version was for C++23. This one has been shrunk slightly and adapted to our conventions.
#ifndef __SRB2_CORE_SPMC_QUEUE_HPP__
#define __SRB2_CORE_SPMC_QUEUE_HPP__
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <vector>
#include <tracy/tracy/Tracy.hpp>
#include "../cxxutil.hpp"
namespace srb2
{
template <typename T>
class SpMcQueue
{
struct RingBuff {
public:
explicit RingBuff(int64_t cap) : _cap{cap}, _mask{cap - 1}
{
SRB2_ASSERT(cap && (!(cap & (cap - 1))) && "Capacity must be a power of 2!");
_buff = std::unique_ptr<T[]>(new T[_cap]);
}
std::int64_t capacity() const noexcept { return _cap; }
// Store (move) at modulo index
void store(int64_t i, T&& x) noexcept
{
_buff[i & _mask] = std::move(x);
}
// Load (copy) at modulo index
T load(int64_t i) const noexcept
{
return _buff[i & _mask];
}
// Allocates and returns a new ring buffer with twice the capacity, copying the elements in range [t, b) into it.
RingBuff* resize(std::int64_t b, std::int64_t t) const
{
ZoneScoped;
RingBuff* ptr = new RingBuff{2 * _cap};
for (std::int64_t i = t; i != b; ++i) {
ptr->store(i, load(i));
}
return ptr;
}
private:
int64_t _cap; // Capacity of the buffer
int64_t _mask; // Bit mask to perform modulo capacity operations
std::unique_ptr<T[]> _buff;
};
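// bottom_, top_ and the buffer pointer are each aligned to 64 bytes so they land on separate
// cache lines, limiting false sharing between the owning thread and stealing threads.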
alignas(64) std::atomic<int64_t> bottom_;
alignas(64) std::atomic<int64_t> top_;
alignas(64) std::atomic<RingBuff*> buffer_;
std::vector<std::unique_ptr<RingBuff>> garbage_;
public:
SpMcQueue(size_t capacity) : bottom_(0), top_(0), buffer_(new RingBuff(capacity))
{
garbage_.reserve(32);
}
~SpMcQueue() noexcept
{
delete buffer_.load(std::memory_order_relaxed);
};
size_t size() const noexcept
{
int64_t bottom = bottom_.load(std::memory_order_relaxed);
int64_t top = top_.load(std::memory_order_relaxed);
return static_cast<size_t>(bottom >= top ? bottom - top : 0);
}
bool empty() const noexcept
{
return size() == 0;
}
void push(T&& v) noexcept
{
int64_t bottom = bottom_.load(std::memory_order_relaxed);
int64_t top = top_.load(std::memory_order_acquire);
RingBuff* buf = buffer_.load(std::memory_order_relaxed);
if (buf->capacity() < (bottom - top) + 1)
{
// Queue is full, build a new one
RingBuff* newbuf = buf->resize(bottom, top);
garbage_.emplace_back(buf);
buffer_.store(newbuf, std::memory_order_relaxed);
buf = newbuf;
}
// Construct the new object. This store does not have to be atomic: no thread can steal this item
// until after we publish the new value of bottom, and ordering is maintained by the surrounding atomics.
buf->store(bottom, std::move(v));
std::atomic_thread_fence(std::memory_order_release);
bottom_.store(bottom + 1, std::memory_order_relaxed);
}
std::optional<T> pop() noexcept
{
int64_t bottom = bottom_.load(std::memory_order_relaxed) - 1;
RingBuff* buf = buffer_.load(std::memory_order_relaxed);
bottom_.store(bottom, std::memory_order_relaxed); // Stealers can no longer steal
std::atomic_thread_fence(std::memory_order_seq_cst);
int64_t top = top_.load(std::memory_order_relaxed);
if (top <= bottom)
{
// Non-empty deque
if (top == bottom)
{
// The last item could get stolen by a thief that loaded bottom before our store above
if (!top_.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
{
// Failed race, thief got the last item.
bottom_.store(bottom + 1, std::memory_order_relaxed);
return std::nullopt;
}
bottom_.store(bottom + 1, std::memory_order_relaxed);
}
// We can delay the load until after acquiring the slot, because only this thread can push().
// The load is not required to be atomic since we are the exclusive writer.
return buf->load(bottom);
}
else
{
bottom_.store(bottom + 1, std::memory_order_relaxed);
return std::nullopt;
}
}
std::optional<T> steal() noexcept
{
int64_t top = top_.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
int64_t bottom = bottom_.load(std::memory_order_acquire);
if (top < bottom)
{
// Must load *before* acquiring the slot, as the slot may be overwritten immediately after acquiring.
// This load is NOT required to be atomic even though it may race with an overwrite: we only
// return the value if we win the race below, guaranteeing we had no race during our read. If we
// lose the race then 'x' could be corrupt due to a read-during-write race, but as T is trivially
// destructible this does not matter.
T x = buffer_.load(std::memory_order_consume)->load(top);
if (!top_.compare_exchange_strong(top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
{
return std::nullopt;
}
return x;
}
else
{
return std::nullopt;
}
}
};
} // namespace srb2
#endif // __SRB2_CORE_SPMC_QUEUE_HPP__
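
As a usage note (not part of this commit): the queue is single-producer, multi-consumer. The owning thread is the only one allowed to call push() and pop(); any other thread may call steal(). A minimal sketch under those assumptions, with an illustrative include path and a trivially copyable element type:

#include <atomic>
#include <optional>
#include <thread>
#include "spmc_queue.hpp" // illustrative include; adjust to the build's include paths

static void spmc_example()
{
	srb2::SpMcQueue<int> queue {64}; // capacity must be a power of 2
	std::atomic<bool> done {false};

	// Any thread other than the owner may steal from the top end.
	std::thread thief {[&queue, &done]() {
		while (!done.load(std::memory_order_relaxed))
		{
			if (std::optional<int> item = queue.steal())
			{
				// consume *item on the stealing thread
			}
		}
	}};

	// Only the owning thread pushes, and it pops from the bottom end.
	for (int i = 0; i < 100; i++)
	{
		queue.push(int {i});
	}
	while (std::optional<int> item = queue.pop())
	{
		// consume *item on the owner thread
	}

	done.store(true, std::memory_order_relaxed);
	thief.join();
}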

345 src/core/thread_pool.cpp Normal file

@@ -0,0 +1,345 @@
// SONIC ROBO BLAST 2
//-----------------------------------------------------------------------------
// Copyright (C) 2023 by Ronald "Eidolon" Kinard
//
// This program is free software distributed under the
// terms of the GNU General Public License, version 2.
// See the 'LICENSE' file for more details.
//-----------------------------------------------------------------------------
#include "thread_pool.h"
#include <algorithm>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <string>
#include <system_error>
#include <fmt/format.h>
#include <tracy/tracy/Tracy.hpp>
#include "../cxxutil.hpp"
#include "../m_argv.h"
using namespace srb2;
static void do_work(ThreadPool::Task& work)
{
try
{
ZoneScoped;
(work.thunk)(work.raw.data());
}
catch (...)
{
// Swallow the exception; there is nothing useful a worker can do with it here
}
(work.deleter)(work.raw.data());
if (work.pseudosema)
{
work.pseudosema->fetch_sub(1, std::memory_order_relaxed);
}
}
static void pool_executor(
int thread_index,
std::shared_ptr<std::atomic<bool>> pool_alive,
std::shared_ptr<std::mutex> worker_ready_mutex,
std::shared_ptr<std::condition_variable> worker_ready_condvar,
std::shared_ptr<ThreadPool::Queue> my_wq,
std::vector<std::shared_ptr<ThreadPool::Queue>> other_wqs
)
{
{
std::string thread_name = fmt::format("Thread Pool Thread {}", thread_index);
tracy::SetThreadName(thread_name.c_str());
}
int spins = 0;
while (true)
{
std::optional<ThreadPool::Task> work = my_wq->steal();
bool did_work = false;
if (work)
{
do_work(*work);
did_work = true;
spins = 0;
}
else
{
for (auto& q : other_wqs)
{
work = q->steal();
if (work)
{
do_work(*work);
did_work = true;
spins = 0;
// We only want to steal one work item at a time, to prioritize our own queue
break;
}
}
}
if (!did_work)
{
// Spin for a few iterations to avoid yielding, then block on the worker-ready condition variable
spins += 1;
if (spins > 100)
{
std::unique_lock<std::mutex> ready_lock {*worker_ready_mutex};
while (my_wq->empty() && pool_alive->load())
{
worker_ready_condvar->wait(ready_lock);
}
if (!pool_alive->load())
{
break;
}
}
}
}
}
ThreadPool::ThreadPool()
{
immediate_mode_ = true;
}
ThreadPool::ThreadPool(size_t threads)
{
next_queue_index_ = 0;
pool_alive_ = std::make_shared<std::atomic<bool>>(true);
for (size_t i = 0; i < threads; i++)
{
std::shared_ptr<Queue> wsq = std::make_shared<Queue>(2048);
work_queues_.push_back(wsq);
std::shared_ptr<std::mutex> mutex = std::make_shared<std::mutex>();
worker_ready_mutexes_.push_back(std::move(mutex));
std::shared_ptr<std::condition_variable> condvar = std::make_shared<std::condition_variable>();
worker_ready_condvars_.push_back(std::move(condvar));
}
for (size_t i = 0; i < threads; i++)
{
std::shared_ptr<Queue> my_queue = work_queues_[i];
std::vector<std::shared_ptr<Queue>> other_queues;
for (size_t j = 0; j < threads; j++)
{
// Order the other queues starting from the next adjacent worker
// i.e. if this is worker 2 of 8, the other queues are 3, 4, 5, 6, 7, 0, 1
// This tries to balance out work stealing behavior
size_t other_index = j + i;
if (other_index >= threads)
{
other_index -= threads;
}
if (other_index != i)
{
other_queues.push_back(work_queues_[other_index]);
}
}
std::thread thread;
try
{
thread = std::thread
{
pool_executor,
i,
pool_alive_,
worker_ready_mutexes_[i],
worker_ready_condvars_[i],
my_queue,
other_queues
};
}
catch (const std::system_error& error)
{
// Safe shutdown and rethrow
pool_alive_->store(false);
for (auto& t : threads_)
{
t.join();
}
throw error;
}
threads_.push_back(std::move(thread));
}
}
ThreadPool::ThreadPool(ThreadPool&&) = default;
ThreadPool::~ThreadPool() = default;
ThreadPool& ThreadPool::operator=(ThreadPool&&) = default;
void ThreadPool::begin_sema()
{
sema_begun_ = true;
}
ThreadPool::Sema ThreadPool::end_sema()
{
Sema ret = Sema(std::move(cur_sema_));
cur_sema_ = nullptr;
sema_begun_ = false;
return ret;
}
void ThreadPool::notify()
{
for (size_t i = 0; i < work_queues_.size(); i++)
{
auto& q = work_queues_[i];
size_t count = q->size();
if (count > 0)
{
worker_ready_condvars_[i]->notify_one();
}
}
}
void ThreadPool::notify_sema(const ThreadPool::Sema& sema)
{
if (!sema.pseudosema_)
{
return;
}
notify();
}
void ThreadPool::wait_idle()
{
if (immediate_mode_)
{
return;
}
ZoneScoped;
for (size_t i = 0; i < work_queues_.size(); i++)
{
auto& q = work_queues_[i];
std::optional<Task> work;
while ((work = q->pop()).has_value())
{
do_work(*work);
}
}
}
void ThreadPool::wait_sema(const Sema& sema)
{
if (!sema.pseudosema_)
{
return;
}
ZoneScoped;
while (sema.pseudosema_->load(std::memory_order_seq_cst) > 0)
{
// Help drain the queues while the batch's pseudo-semaphore counts down
for (size_t i = 0; i < work_queues_.size(); i++)
{
auto& q = work_queues_[i];
std::optional<Task> work;
if ((work = q->pop()).has_value())
{
do_work(*work);
break;
}
}
}
if (sema.pseudosema_->load(std::memory_order_seq_cst) != 0)
{
throw std::exception();
}
}
void ThreadPool::shutdown()
{
if (immediate_mode_)
{
return;
}
wait_idle();
pool_alive_->store(false);
for (auto& condvar : worker_ready_condvars_)
{
condvar->notify_all();
}
for (auto& t : threads_)
{
t.join();
}
}
std::unique_ptr<ThreadPool> srb2::g_main_threadpool;
void I_ThreadPoolInit(void)
{
SRB2_ASSERT(g_main_threadpool == nullptr);
// hardware_concurrency() may report 0 when it cannot be determined, so clamp to at least 1
size_t thread_count = std::max(1u, std::min(9u, std::thread::hardware_concurrency()));
if (thread_count > 1)
{
// The main thread will act as a worker when waiting for pool idle
// Make one less worker thread to avoid unnecessary context switching
thread_count -= 1;
}
if (M_CheckParm("-singlethreaded"))
{
g_main_threadpool = std::make_unique<ThreadPool>();
}
else
{
g_main_threadpool = std::make_unique<ThreadPool>(thread_count);
}
}
void I_ThreadPoolShutdown(void)
{
if (!g_main_threadpool)
{
return;
}
g_main_threadpool->shutdown();
g_main_threadpool = nullptr;
}
void I_ThreadPoolSubmit(srb2cthunk_t thunk, void* data)
{
SRB2_ASSERT(g_main_threadpool != nullptr);
g_main_threadpool->schedule([=]() {
(thunk)(data);
});
g_main_threadpool->notify();
}
void I_ThreadPoolWaitIdle(void)
{
SRB2_ASSERT(g_main_threadpool != nullptr);
g_main_threadpool->wait_idle();
}
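
A hedged sketch of the intended calling pattern from the main thread, assuming I_ThreadPoolInit() has already created the global pool. begin_sema()/end_sema() bracket a batch so the caller can later wait on exactly that batch, and notify_sema() wakes the workers once after everything is queued. render_job() and do_other_main_thread_work() are placeholders, not functions in this commit:

#include "thread_pool.h" // as included from src/core in this commit

// Placeholders for illustration only.
static void render_job(int index) { (void)index; }
static void do_other_main_thread_work() {}

static void submit_render_batch()
{
	using srb2::g_main_threadpool;

	g_main_threadpool->begin_sema();
	for (int i = 0; i < 8; i++)
	{
		g_main_threadpool->schedule([i]() {
			render_job(i);
		});
	}
	srb2::ThreadPool::Sema batch = g_main_threadpool->end_sema();

	// Wake workers once for the whole batch instead of once per task.
	g_main_threadpool->notify_sema(batch);

	do_other_main_thread_work();

	// Blocks until the batch's pseudo-semaphore reaches zero; the waiting
	// thread pops and runs queued tasks itself while it waits.
	g_main_threadpool->wait_sema(batch);
}

In single-threaded mode (the default-constructed pool used with -singlethreaded), schedule() runs each thunk immediately and wait_sema() returns right away, so the same call sequence works unchanged.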

162 src/core/thread_pool.h Normal file

@@ -0,0 +1,162 @@
// SONIC ROBO BLAST 2
//-----------------------------------------------------------------------------
// Copyright (C) 2023 by Ronald "Eidolon" Kinard
//
// This program is free software distributed under the
// terms of the GNU General Public License, version 2.
// See the 'LICENSE' file for more details.
//-----------------------------------------------------------------------------
#ifndef __SRB2_CORE_THREAD_POOL_H__
#define __SRB2_CORE_THREAD_POOL_H__
#include <stddef.h>
#ifdef __cplusplus
#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "spmc_queue.hpp"
namespace srb2
{
class ThreadPool
{
public:
struct Task
{
void (*thunk)(void*);
void (*deleter)(void*);
std::shared_ptr<std::atomic<uint32_t>> pseudosema;
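// Inline storage for the scheduled callable; sized so a Task totals 512 bytes on typical 64-bit targets.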
std::array<std::byte, 512 - sizeof(void(*)(void*)) * 2 - sizeof(std::shared_ptr<std::atomic<uint32_t>>)> raw;
};
using Queue = SpMcQueue<Task>;
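// A Sema is a handle to a pseudo-semaphore: a shared counter of the tasks scheduled between
// begin_sema() and end_sema(). wait_sema() runs queued work until that counter reaches zero.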
class Sema
{
std::shared_ptr<std::atomic<uint32_t>> pseudosema_;
explicit Sema(std::shared_ptr<std::atomic<uint32_t>> sema) : pseudosema_(sema) {}
friend class ThreadPool;
public:
Sema() = default;
};
private:
std::shared_ptr<std::atomic<bool>> pool_alive_;
std::vector<std::shared_ptr<std::mutex>> worker_ready_mutexes_;
std::vector<std::shared_ptr<std::condition_variable>> worker_ready_condvars_;
std::vector<std::shared_ptr<Queue>> work_queues_;
std::vector<std::thread> threads_;
size_t next_queue_index_ = 0;
std::shared_ptr<std::atomic<uint32_t>> cur_sema_;
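// Immediate mode (the default-constructed pool) runs each scheduled thunk synchronously on the
// calling thread; no worker threads are created.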
bool immediate_mode_ = false;
bool sema_begun_ = false;
public:
ThreadPool();
explicit ThreadPool(size_t threads);
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&);
~ThreadPool();
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&);
void begin_sema();
ThreadPool::Sema end_sema();
/// Enqueue but don't notify
template <typename T> void schedule(T&& thunk);
/// Wake worker threads after one or more schedule() calls
void notify();
void notify_sema(const Sema& sema);
void wait_idle();
void wait_sema(const Sema& sema);
void shutdown();
};
extern std::unique_ptr<ThreadPool> g_main_threadpool;
template <typename F>
void callable_caller(F* f)
{
(*f)();
}
template <typename F>
void callable_destroyer(F* f)
{
f->~F();
}
template <typename T>
void ThreadPool::schedule(T&& thunk)
{
static_assert(sizeof(T) <= sizeof(std::declval<Task>().raw));
if (immediate_mode_)
{
(thunk)();
return;
}
if (sema_begun_)
{
if (cur_sema_ == nullptr)
{
cur_sema_ = std::make_shared<std::atomic<uint32_t>>(0);
}
cur_sema_->fetch_add(1, std::memory_order_relaxed);
}
size_t qi = next_queue_index_;
{
std::shared_ptr<Queue> q = work_queues_[qi];
Task task;
task.thunk = reinterpret_cast<void(*)(void*)>(callable_caller<T>);
task.deleter = reinterpret_cast<void(*)(void*)>(callable_destroyer<T>);
task.pseudosema = cur_sema_;
new (reinterpret_cast<T*>(task.raw.data())) T(std::move(thunk));
q->push(std::move(task));
}
// worker_ready_condvars_[qi]->notify_one();
next_queue_index_ += 1;
if (next_queue_index_ >= threads_.size())
{
next_queue_index_ = 0;
}
}
} // namespace srb2
extern "C" {
#endif // __cplusplus
typedef void (*srb2cthunk_t)(void*);
void I_ThreadPoolInit(void);
void I_ThreadPoolShutdown(void);
void I_ThreadPoolSubmit(srb2cthunk_t thunk, void* data);
void I_ThreadPoolWaitIdle(void);
#ifdef __cplusplus
} // extern "C"
#endif
#endif // __SRB2_CORE_THREAD_POOL_H__
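
Finally, a hedged sketch of the C-facing interface; line_job_t, draw_lines_thunk(), g_line_jobs and submit_line_jobs() are hypothetical names used only to illustrate the calling convention, and the include path is illustrative:

#include "core/thread_pool.h" // illustrative include path

// Hypothetical payload and worker, for illustration only.
typedef struct line_job_s
{
	int start_line;
	int end_line;
} line_job_t;

static line_job_t g_line_jobs[4];

static void draw_lines_thunk(void* data)
{
	line_job_t* job = (line_job_t*)data;
	// ... render job->start_line through job->end_line ...
	(void)job;
}

static void submit_line_jobs(void)
{
	int i;
	for (i = 0; i < 4; i++)
	{
		g_line_jobs[i].start_line = i * 50;
		g_line_jobs[i].end_line = (i + 1) * 50;
		// The pool stores only the pointer; the caller keeps ownership of the
		// pointed-to data. I_ThreadPoolSubmit also notifies the workers itself.
		I_ThreadPoolSubmit(draw_lines_thunk, &g_line_jobs[i]);
	}
	// Drain any work still queued, running it on the calling thread.
	I_ThreadPoolWaitIdle();
}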