From af60b53ae3af1ebb5f4fde893b9ce4e8567a1eed Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Sun, 28 Jun 2026 19:31:35 -0700 Subject: [PATCH 1/5] core_btree_iterator_init ready race hygiene Signed-off-by: Rob Johnson --- src/core.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/core.c b/src/core.c index b30c01cb..f4e01aa2 100644 --- a/src/core.c +++ b/src/core.c @@ -854,7 +854,7 @@ static void core_btree_iterator_init_async_callback(void *arg) { core_btree_iterator_init_async_context *ctxt = arg; - ctxt->ready = TRUE; + __atomic_store_n(&ctxt->ready, TRUE, __ATOMIC_RELEASE); } static platform_status @@ -890,8 +890,8 @@ core_start_btree_iterator_init_async( prefetch_lookahead, core_btree_iterator_init_async_callback, ctxt); - ctxt->ready = FALSE; - ctxt->done = FALSE; + __atomic_store_n(&ctxt->ready, FALSE, __ATOMIC_RELAXED); + ctxt->done = FALSE; if (btree_iterator_init_async(&ctxt->state) == ASYNC_STATUS_DONE) { ctxt->done = TRUE; @@ -922,11 +922,13 @@ core_drain_btree_iterator_init_async( while (done_count < num_inits) { bool32 made_progress = FALSE; for (uint64 i = 0; i < num_inits; i++) { - if (ctxt[i].done || !ctxt[i].ready) { + if (ctxt[i].done + || !__atomic_exchange_n( + &ctxt[i].ready, FALSE, __ATOMIC_ACQUIRE)) + { continue; } - ctxt[i].ready = FALSE; made_progress = TRUE; if (btree_iterator_init_async(&ctxt[i].state) == ASYNC_STATUS_DONE) { ctxt[i].done = TRUE; From 8d7c5218fb152d854844f72f6d11ec63ec976935 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Sun, 28 Jun 2026 23:43:16 -0700 Subject: [PATCH 2/5] move core btree async init contexts to shared memory Signed-off-by: Rob Johnson --- src/btree.c | 28 ++++++--------- src/clockcache.c | 4 +-- src/core.c | 12 +++++-- src/platform_linux/async.h | 71 +++++++++++++++++++++++++++++++++++--- 4 files changed, 87 insertions(+), 28 deletions(-) diff --git a/src/btree.c b/src/btree.c index 6299f92b..e93b0c50 100644 --- a/src/btree.c +++ b/src/btree.c @@ -3489,24 +3489,16 @@ find_btree_node_and_get_idx_bounds_async(btree_iterator_async_state *state, { btree_node_unget( state->itor->cc, state->itor->cfg, &state->itor->curr); - debug_assert(state->callback != NULL); - async_yield_after(state, state->callback(state->callback_arg)); - btree_lookup_async_state_init(&state->lookup_state, - state->itor->cc, - state->itor->cfg, - state->itor->root_addr, - state->itor->page_type, - state->target, - NULL, - state->callback, - state->callback_arg); - state->lookup_state.stop_at_height = state->itor->height; - state->lookup_state.stats = NULL; - state->lookup_state.get_node = TRUE; - async_await(state, - btree_lookup_node_async(&state->lookup_state, 0) - == ASYNC_STATUS_DONE); - state->itor->curr = state->lookup_state.node; + platform_status rc = btree_lookup_node(state->itor->cc, + state->itor->cfg, + state->itor->root_addr, + state->target, + state->itor->height, + state->itor->page_type, + TRUE, + &state->itor->curr, + NULL); + platform_assert_status_ok(rc); } } diff --git a/src/clockcache.c b/src/clockcache.c index 2fe92b3e..e30c65b0 100644 --- a/src/clockcache.c +++ b/src/clockcache.c @@ -1986,9 +1986,7 @@ clockcache_get_from_disk_async(clockcache_get_async_state *state, uint64 depth) state->io_start_time = platform_get_timestamp(); } - while (io_async_run(state->iostate) != ASYNC_STATUS_DONE) { - async_yield(state); - } + async_await(state, io_async_run(state->iostate) == ASYNC_STATUS_DONE); state->rc = io_async_state_get_result(state->iostate); if (!SUCCESS(state->rc)) { platform_error_log("clockcache_get_from_disk_async: async read failed " diff --git a/src/core.c b/src/core.c index f4e01aa2..5f6bd3dc 100644 --- a/src/core.c +++ b/src/core.c @@ -1171,8 +1171,14 @@ core_range_iterator_init(core_handle *spl, core_btree_iterator_init_async_context *init_ctxt = NULL; if (range_itor->num_branches != 0) { - init_ctxt = TYPED_ARRAY_ZALLOC( - PROCESS_PRIVATE_HEAP_ID, init_ctxt, range_itor->num_branches); + /* + * Async cache-load waiters embedded in these contexts can be released by + * another process when the clockcache is shared. The callback only marks + * ctxt->ready, so keep the context itself in the Splinter heap; the owning + * process remains responsible for resuming the iterator state. + */ + init_ctxt = + TYPED_ARRAY_ZALLOC(spl->heap_id, init_ctxt, range_itor->num_branches); } if (range_itor->num_branches != 0 && init_ctxt == NULL) { core_range_iterator_deinit(range_itor); @@ -1236,7 +1242,7 @@ core_range_iterator_init(core_handle *spl, } } if (init_ctxt != NULL) { - platform_free(PROCESS_PRIVATE_HEAP_ID, init_ctxt); + platform_free(spl->heap_id, init_ctxt); } if (!SUCCESS(rc)) { core_range_iterator_deinit(range_itor); diff --git a/src/platform_linux/async.h b/src/platform_linux/async.h index 92ffee94..6b408cc4 100644 --- a/src/platform_linux/async.h +++ b/src/platform_linux/async.h @@ -239,17 +239,24 @@ typedef void (*async_callback_fn)(void *); /* * Wait queues for exections awaiting some condition. */ +typedef struct async_wait_queue async_wait_queue; + +#define ASYNC_WAITER_MAGIC_QUEUED 0x4153594e43575144ULL +#define ASYNC_WAITER_MAGIC_IDLE 0x4153594e43574944ULL + typedef struct async_waiter { struct async_waiter *next; async_callback_fn callback; void *callback_arg; + async_wait_queue *queue; + uint64 magic; } async_waiter; -typedef struct async_wait_queue { +struct async_wait_queue { volatile uint64 lock; volatile async_waiter *head; async_waiter *tail; -} async_wait_queue; +}; static inline void async_wait_queue_init(async_wait_queue *queue) @@ -293,9 +300,21 @@ async_wait_queue_append(async_wait_queue *q, async_callback_fn callback, void *callback_arg) { + platform_assert(q != NULL); + platform_assert(waiter != NULL); + platform_assert(callback != NULL); + platform_assert(callback_arg != NULL); + platform_assert(waiter->magic != ASYNC_WAITER_MAGIC_QUEUED, + "waiter %p is already queued on %p; appending to %p\n", + waiter, + waiter->queue, + q); + waiter->callback = callback; waiter->callback_arg = callback_arg; waiter->next = NULL; + waiter->queue = q; + waiter->magic = ASYNC_WAITER_MAGIC_QUEUED; async_waiter *result; if (q->head == NULL) { @@ -309,12 +328,47 @@ async_wait_queue_append(async_wait_queue *q, return result; } +static inline void +async_waiter_assert_queued(async_wait_queue *queue, async_waiter *waiter) +{ + platform_assert(waiter != NULL); + platform_assert(waiter->magic == ASYNC_WAITER_MAGIC_QUEUED, + "waiter %p has bad magic 0x%lx, queue %p, expected queue " + "%p, next %p, callback %p, callback_arg %p\n", + waiter, + waiter->magic, + waiter->queue, + queue, + waiter->next, + waiter->callback, + waiter->callback_arg); + platform_assert(waiter->queue == queue, + "waiter %p belongs to queue %p, expected queue %p\n", + waiter, + waiter->queue, + queue); + platform_assert(waiter->callback != NULL); + platform_assert(waiter->callback_arg != NULL); +} + +static inline void +async_waiter_mark_idle(async_waiter *waiter) +{ + waiter->next = NULL; + waiter->callback = NULL; + waiter->callback_arg = NULL; + waiter->queue = NULL; + waiter->magic = ASYNC_WAITER_MAGIC_IDLE; +} + static inline void async_wait_queue_remove(async_wait_queue *queue, async_waiter *pred, async_waiter *waiter) { + async_waiter_assert_queued(queue, waiter); if (pred != NULL) { + async_waiter_assert_queued(queue, pred); platform_assert(pred->next == waiter); pred->next = waiter->next; if (queue->tail == waiter) { @@ -327,6 +381,7 @@ async_wait_queue_remove(async_wait_queue *queue, queue->tail = NULL; } } + async_waiter_mark_idle(waiter); } /* Public: notify one waiter that the condition has become true. */ @@ -343,6 +398,7 @@ async_wait_queue_release_one(async_wait_queue *q) waiter = q->head; if (waiter) { + async_waiter_assert_queued(q, (async_waiter *)waiter); q->head = waiter->next; if (q->head == NULL) { q->tail = NULL; @@ -351,7 +407,10 @@ async_wait_queue_release_one(async_wait_queue *q) async_wait_queue_unlock(q); if (waiter) { - waiter->callback(waiter->callback_arg); + async_callback_fn callback = waiter->callback; + void *callback_arg = waiter->callback_arg; + async_waiter_mark_idle((async_waiter *)waiter); + callback(callback_arg); } } @@ -372,8 +431,12 @@ async_wait_queue_release_all(async_wait_queue *q) async_wait_queue_unlock(q); while (waiter != NULL) { + async_waiter_assert_queued(q, (async_waiter *)waiter); async_waiter *next = waiter->next; - waiter->callback(waiter->callback_arg); + async_callback_fn callback = waiter->callback; + void *callback_arg = waiter->callback_arg; + async_waiter_mark_idle((async_waiter *)waiter); + callback(callback_arg); waiter = next; } } From 207ae4a87bca90baf1dacf1735a70e286548106a Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Sun, 28 Jun 2026 23:48:08 -0700 Subject: [PATCH 3/5] formatting Signed-off-by: Rob Johnson --- src/core.c | 7 +++---- src/platform_linux/async.h | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/core.c b/src/core.c index 5f6bd3dc..d499f24a 100644 --- a/src/core.c +++ b/src/core.c @@ -923,8 +923,7 @@ core_drain_btree_iterator_init_async( bool32 made_progress = FALSE; for (uint64 i = 0; i < num_inits; i++) { if (ctxt[i].done - || !__atomic_exchange_n( - &ctxt[i].ready, FALSE, __ATOMIC_ACQUIRE)) + || !__atomic_exchange_n(&ctxt[i].ready, FALSE, __ATOMIC_ACQUIRE)) { continue; } @@ -1174,8 +1173,8 @@ core_range_iterator_init(core_handle *spl, /* * Async cache-load waiters embedded in these contexts can be released by * another process when the clockcache is shared. The callback only marks - * ctxt->ready, so keep the context itself in the Splinter heap; the owning - * process remains responsible for resuming the iterator state. + * ctxt->ready, so keep the context itself in the Splinter heap; the + * owning process remains responsible for resuming the iterator state. */ init_ctxt = TYPED_ARRAY_ZALLOC(spl->heap_id, init_ctxt, range_itor->num_branches); diff --git a/src/platform_linux/async.h b/src/platform_linux/async.h index 6b408cc4..4937fca6 100644 --- a/src/platform_linux/async.h +++ b/src/platform_linux/async.h @@ -432,7 +432,7 @@ async_wait_queue_release_all(async_wait_queue *q) while (waiter != NULL) { async_waiter_assert_queued(q, (async_waiter *)waiter); - async_waiter *next = waiter->next; + async_waiter *next = waiter->next; async_callback_fn callback = waiter->callback; void *callback_arg = waiter->callback_arg; async_waiter_mark_idle((async_waiter *)waiter); From eb6de8fc684655a383b89e48d0d631c66977a53c Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Mon, 29 Jun 2026 10:15:43 -0700 Subject: [PATCH 4/5] fix wait queue instrumentation bug Signed-off-by: Rob Johnson --- src/clockcache.c | 14 ++++++++------ src/platform_linux/async.h | 16 +++++++++++----- src/platform_linux/laio.c | 1 + 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/clockcache.c b/src/clockcache.c index e30c65b0..b8197236 100644 --- a/src/clockcache.c +++ b/src/clockcache.c @@ -3124,12 +3124,14 @@ clockcache_get_async_state_init_virtual(void *payload, async_callback_fn callback, void *callback_arg) { - clockcache_get_async_state_init((clockcache_get_async_state *)payload, - (clockcache *)cc, - addr, - type, - callback, - callback_arg); + clockcache_get_async_state *state = (clockcache_get_async_state *)payload; + clockcache_get_async_state_init( + state, (clockcache *)cc, addr, type, callback, callback_arg); + // We initialize wait_node here, rather than right before the call to enqueue + // the waiter on the wait queue, because the async wait system has internal + // instrumentation that can catch double enqueues, which could break if we + // re-init before submission. + async_waiter_init(&state->wait_node); } static async_status diff --git a/src/platform_linux/async.h b/src/platform_linux/async.h index 4937fca6..bbd6dd31 100644 --- a/src/platform_linux/async.h +++ b/src/platform_linux/async.h @@ -258,6 +258,16 @@ struct async_wait_queue { async_waiter *tail; }; +static inline void +async_waiter_init(async_waiter *waiter) +{ + waiter->next = NULL; + waiter->callback = NULL; + waiter->callback_arg = NULL; + waiter->queue = NULL; + waiter->magic = ASYNC_WAITER_MAGIC_IDLE; +} + static inline void async_wait_queue_init(async_wait_queue *queue) { @@ -354,11 +364,7 @@ async_waiter_assert_queued(async_wait_queue *queue, async_waiter *waiter) static inline void async_waiter_mark_idle(async_waiter *waiter) { - waiter->next = NULL; - waiter->callback = NULL; - waiter->callback_arg = NULL; - waiter->queue = NULL; - waiter->magic = ASYNC_WAITER_MAGIC_IDLE; + async_waiter_init(waiter); } static inline void diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index cf1cfe7d..fe3062c9 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -542,6 +542,7 @@ laio_async_state_init(io_async_state *state, ios->addr = addr; ios->callback = callback; ios->callback_arg = callback_arg; + async_waiter_init(&ios->waiter_node); ios->reqs[0] = &ios->req; ios->iovlen = 0; ios->status = 0; From 2164b9f3a6e9f8e2aeea02d9402fdb50b357faa8 Mon Sep 17 00:00:00 2001 From: Rob Johnson Date: Mon, 29 Jun 2026 10:18:02 -0700 Subject: [PATCH 5/5] formatting Signed-off-by: Rob Johnson --- src/platform_linux/laio.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index fe3062c9..87e6c3f5 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -543,9 +543,9 @@ laio_async_state_init(io_async_state *state, ios->callback = callback; ios->callback_arg = callback_arg; async_waiter_init(&ios->waiter_node); - ios->reqs[0] = &ios->req; - ios->iovlen = 0; - ios->status = 0; + ios->reqs[0] = &ios->req; + ios->iovlen = 0; + ios->status = 0; return STATUS_OK; }