diff --git a/ultramodern/src/mesgqueue.cpp b/ultramodern/src/mesgqueue.cpp index 9944a02..7714b8f 100644 --- a/ultramodern/src/mesgqueue.cpp +++ b/ultramodern/src/mesgqueue.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -16,6 +17,18 @@ struct QueuedMessage { static moodycamel::BlockingConcurrentQueue external_messages {}; std::bitset<32> requeue_enabled; +// Counter for how many external messages have been re-queued because +// their target OSMesgQueue was full at dequeue time. Bumped at the +// three requeue sites in dequeue_external_messages, +// wait_for_external_message, and wait_for_external_message_timed. A +// sustained nonzero rate indicates a target queue is being overrun +// (receiver thread starved); useful for runner-side observability. +static std::atomic g_external_requeues{0}; + +extern "C" uint64_t ultramodern_external_requeues(void) { + return g_external_requeues.load(std::memory_order_relaxed); +} + void ultramodern::set_message_queue_control(const ultramodern::MessageQueueControl& mqc) { requeue_enabled.reset(); requeue_enabled.set(static_cast(EventMessageSource::Timer), mqc.requeue_timer); @@ -45,8 +58,11 @@ void dequeue_external_messages(RDRAM_ARG1) { requeued_messages.push_back(to_send); } } - for (QueuedMessage& cur_mesg : requeued_messages) { - external_messages.enqueue(cur_mesg); + if (!requeued_messages.empty()) { + g_external_requeues.fetch_add(requeued_messages.size(), std::memory_order_relaxed); + for (QueuedMessage& cur_mesg : requeued_messages) { + external_messages.enqueue(cur_mesg); + } } } @@ -54,6 +70,7 @@ void ultramodern::wait_for_external_message(RDRAM_ARG1) { QueuedMessage to_send; external_messages.wait_dequeue(to_send); if (!do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false) && to_send.requeue_if_blocked) { + g_external_requeues.fetch_add(1, std::memory_order_relaxed); external_messages.enqueue(to_send); } } @@ -62,6 +79,7 @@ void ultramodern::wait_for_external_message_timed(RDRAM_ARG u32 millis) { QueuedMessage to_send; if (external_messages.wait_dequeue_timed(to_send, std::chrono::milliseconds{millis})) { if (!do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false) && to_send.requeue_if_blocked) { + g_external_requeues.fetch_add(1, std::memory_order_relaxed); external_messages.enqueue(to_send); } }