mirror of
https://github.com/N64Recomp/N64ModernRuntime.git
synced 2026-02-02 03:36:17 +00:00
222 lines
8.1 KiB
C++
222 lines
8.1 KiB
C++
#include <bitset>
|
|
#include <thread>
|
|
|
|
#include "blockingconcurrentqueue.h"
|
|
|
|
#include "ultramodern/ultra64.h"
|
|
#include "ultramodern/ultramodern.hpp"
|
|
|
|
struct QueuedMessage {
|
|
PTR(OSMesgQueue) mq;
|
|
OSMesg mesg;
|
|
bool jam;
|
|
bool requeue_if_blocked;
|
|
};
|
|
|
|
static moodycamel::BlockingConcurrentQueue<QueuedMessage> external_messages {};
|
|
std::bitset<32> requeue_enabled;
|
|
|
|
void ultramodern::set_message_queue_control(const ultramodern::MessageQueueControl& mqc) {
|
|
requeue_enabled.reset();
|
|
requeue_enabled.set(static_cast<int>(EventMessageSource::Timer), mqc.requeue_timer);
|
|
requeue_enabled.set(static_cast<int>(EventMessageSource::Sp), mqc.requeue_sp);
|
|
requeue_enabled.set(static_cast<int>(EventMessageSource::Si), mqc.requeue_si);
|
|
requeue_enabled.set(static_cast<int>(EventMessageSource::Ai), mqc.requeue_ai);
|
|
requeue_enabled.set(static_cast<int>(EventMessageSource::Vi), mqc.requeue_vi);
|
|
requeue_enabled.set(static_cast<int>(EventMessageSource::Pi), mqc.requeue_pi);
|
|
requeue_enabled.set(static_cast<int>(EventMessageSource::Dp), mqc.requeue_dp);
|
|
}
|
|
|
|
void ultramodern::enqueue_external_message_type(PTR(OSMesgQueue) mq, OSMesg msg, bool jam, EventMessageSource src) {
|
|
external_messages.enqueue({mq, msg, jam, requeue_enabled[static_cast<int>(src)]});
|
|
}
|
|
|
|
void ultramodern::enqueue_external_message(PTR(OSMesgQueue) mq, OSMesg msg, bool jam, bool requeue_if_blocked) {
|
|
external_messages.enqueue({mq, msg, jam, requeue_if_blocked});
|
|
}
|
|
|
|
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block);
|
|
|
|
void dequeue_external_messages(RDRAM_ARG1) {
|
|
QueuedMessage to_send;
|
|
std::vector<QueuedMessage> requeued_messages{};
|
|
while (external_messages.try_dequeue(to_send)) {
|
|
if (!do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false) && to_send.requeue_if_blocked) {
|
|
requeued_messages.push_back(to_send);
|
|
}
|
|
}
|
|
for (QueuedMessage& cur_mesg : requeued_messages) {
|
|
external_messages.enqueue(cur_mesg);
|
|
}
|
|
}
|
|
|
|
void ultramodern::wait_for_external_message(RDRAM_ARG1) {
|
|
QueuedMessage to_send;
|
|
external_messages.wait_dequeue(to_send);
|
|
if (!do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false) && to_send.requeue_if_blocked) {
|
|
external_messages.enqueue(to_send);
|
|
}
|
|
}
|
|
|
|
void ultramodern::wait_for_external_message_timed(RDRAM_ARG u32 millis) {
|
|
QueuedMessage to_send;
|
|
if (external_messages.wait_dequeue_timed(to_send, std::chrono::milliseconds{millis})) {
|
|
if (!do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false) && to_send.requeue_if_blocked) {
|
|
external_messages.enqueue(to_send);
|
|
}
|
|
}
|
|
}
|
|
|
|
extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
mq->blocked_on_recv = NULLPTR;
|
|
mq->blocked_on_send = NULLPTR;
|
|
mq->msgCount = count;
|
|
mq->msg = msg;
|
|
mq->validCount = 0;
|
|
mq->first = 0;
|
|
}
|
|
|
|
s32 MQ_GET_COUNT(OSMesgQueue *mq) {
|
|
return mq->validCount;
|
|
}
|
|
|
|
s32 MQ_IS_EMPTY(OSMesgQueue *mq) {
|
|
return mq->validCount == 0;
|
|
}
|
|
|
|
s32 MQ_IS_FULL(OSMesgQueue* mq) {
|
|
return MQ_GET_COUNT(mq) >= mq->msgCount;
|
|
}
|
|
|
|
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block) {
|
|
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
|
|
if (!block) {
|
|
// If non-blocking, fail if the queue is full.
|
|
if (MQ_IS_FULL(mq)) {
|
|
return false;
|
|
}
|
|
}
|
|
else {
|
|
// Otherwise, yield this thread until the queue has room.
|
|
while (MQ_IS_FULL(mq)) {
|
|
debug_printf("[Message Queue] Thread %d is blocked on send\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
|
|
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_send), ultramodern::this_thread());
|
|
ultramodern::run_next_thread_and_wait(PASS_RDRAM1);
|
|
}
|
|
}
|
|
|
|
if (jam) {
|
|
// Jams insert at the head of the message queue's buffer.
|
|
mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount;
|
|
TO_PTR(OSMesg, mq->msg)[mq->first] = msg;
|
|
mq->validCount++;
|
|
}
|
|
else {
|
|
// Sends insert at the tail of the message queue's buffer.
|
|
s32 last = (mq->first + mq->validCount) % mq->msgCount;
|
|
TO_PTR(OSMesg, mq->msg)[last] = msg;
|
|
mq->validCount++;
|
|
}
|
|
|
|
// If any threads were blocked on receiving from this message queue, pop the first one and schedule it.
|
|
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv);
|
|
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
|
|
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool do_recv(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, bool block) {
|
|
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
|
|
if (!block) {
|
|
// If non-blocking, fail if the queue is empty
|
|
if (MQ_IS_EMPTY(mq)) {
|
|
return false;
|
|
}
|
|
} else {
|
|
// Otherwise, yield this thread in a loop until the queue is no longer full
|
|
while (MQ_IS_EMPTY(mq)) {
|
|
debug_printf("[Message Queue] Thread %d is blocked on receive\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
|
|
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv), ultramodern::this_thread());
|
|
ultramodern::run_next_thread_and_wait(PASS_RDRAM1);
|
|
}
|
|
}
|
|
|
|
if (msg_ != NULLPTR) {
|
|
*TO_PTR(OSMesg, msg_) = TO_PTR(OSMesg, mq->msg)[mq->first];
|
|
}
|
|
|
|
mq->first = (mq->first + 1) % mq->msgCount;
|
|
mq->validCount--;
|
|
|
|
// If any threads were blocked on sending to this message queue, pop the first one and schedule it.
|
|
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_send);
|
|
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
|
|
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
bool jam = false;
|
|
|
|
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
|
|
if (!ultramodern::is_game_thread()) {
|
|
ultramodern::enqueue_external_message(mq_, msg, jam, false);
|
|
return 0;
|
|
}
|
|
|
|
// Handle any messages that have been received from an external thread.
|
|
dequeue_external_messages(PASS_RDRAM1);
|
|
|
|
// Try to send the message.
|
|
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
|
|
|
|
// Check the queue to see if this thread should swap execution to another.
|
|
ultramodern::check_running_queue(PASS_RDRAM1);
|
|
|
|
return sent ? 0 : -1;
|
|
}
|
|
|
|
extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
bool jam = true;
|
|
|
|
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
|
|
if (!ultramodern::is_game_thread()) {
|
|
ultramodern::enqueue_external_message(mq_, msg, jam, false);
|
|
return 0;
|
|
}
|
|
|
|
// Handle any messages that have been received from an external thread.
|
|
dequeue_external_messages(PASS_RDRAM1);
|
|
|
|
// Try to send the message.
|
|
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
|
|
|
|
// Check the queue to see if this thread should swap execution to another.
|
|
ultramodern::check_running_queue(PASS_RDRAM1);
|
|
|
|
return sent ? 0 : -1;
|
|
}
|
|
|
|
extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
|
|
assert(ultramodern::is_game_thread() && "RecvMesg not allowed outside of game threads.");
|
|
|
|
// Handle any messages that have been received from an external thread.
|
|
dequeue_external_messages(PASS_RDRAM1);
|
|
|
|
// Try to receive a message.
|
|
bool received = do_recv(PASS_RDRAM mq_, msg_, flags == OS_MESG_BLOCK);
|
|
|
|
// Check the queue to see if this thread should swap execution to another.
|
|
ultramodern::check_running_queue(PASS_RDRAM1);
|
|
|
|
return received ? 0 : -1;
|
|
}
|