ultramodern: add external-requeue counter + accessor

Counts requeues at the three sites in dequeue_external_messages,
wait_for_external_message, and wait_for_external_message_timed. A
sustained nonzero rate means a target OSMesgQueue is being overrun —
the existing requeue mechanism prevents a softlock, but the
underlying receiver-thread starvation is worth investigating, so
giving runners a way to observe it is useful.

Adds:
  - static std::atomic<uint64_t> g_external_requeues
  - extern "C" uint64_t ultramodern_external_requeues(void) accessor
  - fetch_add at each of the three requeue sites (one bulk-add for
    the dequeue path which collects then bulk-requeues, single-adds
    for the two wait paths which requeue inline)

Zero impact on the hot path. Atomics fire only when the requeue path
fires; in normal operation that's never. The dequeue site uses a
single bulk fetch_add for the whole batch instead of N individual
adds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matthew Stanley 2026-05-05 22:20:18 -07:00
parent 0bb76b0fc7
commit 039586e853

View file

@ -1,3 +1,4 @@
#include <atomic>
#include <bitset>
#include <thread>
@ -16,6 +17,18 @@ struct QueuedMessage {
static moodycamel::BlockingConcurrentQueue<QueuedMessage> external_messages {};
std::bitset<32> requeue_enabled;
// Counter for how many external messages have been re-queued because
// their target OSMesgQueue was full at dequeue time. Bumped at the
// three requeue sites in dequeue_external_messages,
// wait_for_external_message, and wait_for_external_message_timed. A
// sustained nonzero rate indicates a target queue is being overrun
// (receiver thread starved); useful for runner-side observability.
static std::atomic<uint64_t> g_external_requeues{0};
extern "C" uint64_t ultramodern_external_requeues(void) {
return g_external_requeues.load(std::memory_order_relaxed);
}
void ultramodern::set_message_queue_control(const ultramodern::MessageQueueControl& mqc) {
requeue_enabled.reset();
requeue_enabled.set(static_cast<int>(EventMessageSource::Timer), mqc.requeue_timer);
@ -45,8 +58,11 @@ void dequeue_external_messages(RDRAM_ARG1) {
requeued_messages.push_back(to_send);
}
}
for (QueuedMessage& cur_mesg : requeued_messages) {
external_messages.enqueue(cur_mesg);
if (!requeued_messages.empty()) {
g_external_requeues.fetch_add(requeued_messages.size(), std::memory_order_relaxed);
for (QueuedMessage& cur_mesg : requeued_messages) {
external_messages.enqueue(cur_mesg);
}
}
}
@ -54,6 +70,7 @@ void ultramodern::wait_for_external_message(RDRAM_ARG1) {
QueuedMessage to_send;
external_messages.wait_dequeue(to_send);
if (!do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false) && to_send.requeue_if_blocked) {
g_external_requeues.fetch_add(1, std::memory_order_relaxed);
external_messages.enqueue(to_send);
}
}
@ -62,6 +79,7 @@ void ultramodern::wait_for_external_message_timed(RDRAM_ARG u32 millis) {
QueuedMessage to_send;
if (external_messages.wait_dequeue_timed(to_send, std::chrono::milliseconds{millis})) {
if (!do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false) && to_send.requeue_if_blocked) {
g_external_requeues.fetch_add(1, std::memory_order_relaxed);
external_messages.enqueue(to_send);
}
}