diff --git a/src/btree.c b/src/btree.c index 6299f92b..e93b0c50 100644 --- a/src/btree.c +++ b/src/btree.c @@ -3489,24 +3489,16 @@ find_btree_node_and_get_idx_bounds_async(btree_iterator_async_state *state, { btree_node_unget( state->itor->cc, state->itor->cfg, &state->itor->curr); - debug_assert(state->callback != NULL); - async_yield_after(state, state->callback(state->callback_arg)); - btree_lookup_async_state_init(&state->lookup_state, - state->itor->cc, - state->itor->cfg, - state->itor->root_addr, - state->itor->page_type, - state->target, - NULL, - state->callback, - state->callback_arg); - state->lookup_state.stop_at_height = state->itor->height; - state->lookup_state.stats = NULL; - state->lookup_state.get_node = TRUE; - async_await(state, - btree_lookup_node_async(&state->lookup_state, 0) - == ASYNC_STATUS_DONE); - state->itor->curr = state->lookup_state.node; + platform_status rc = btree_lookup_node(state->itor->cc, + state->itor->cfg, + state->itor->root_addr, + state->target, + state->itor->height, + state->itor->page_type, + TRUE, + &state->itor->curr, + NULL); + platform_assert_status_ok(rc); } } diff --git a/src/clockcache.c b/src/clockcache.c index 2fe92b3e..b8197236 100644 --- a/src/clockcache.c +++ b/src/clockcache.c @@ -1986,9 +1986,7 @@ clockcache_get_from_disk_async(clockcache_get_async_state *state, uint64 depth) state->io_start_time = platform_get_timestamp(); } - while (io_async_run(state->iostate) != ASYNC_STATUS_DONE) { - async_yield(state); - } + async_await(state, io_async_run(state->iostate) == ASYNC_STATUS_DONE); state->rc = io_async_state_get_result(state->iostate); if (!SUCCESS(state->rc)) { platform_error_log("clockcache_get_from_disk_async: async read failed " @@ -3126,12 +3124,14 @@ clockcache_get_async_state_init_virtual(void *payload, async_callback_fn callback, void *callback_arg) { - clockcache_get_async_state_init((clockcache_get_async_state *)payload, - (clockcache *)cc, - addr, - type, - callback, - callback_arg); + clockcache_get_async_state *state = (clockcache_get_async_state *)payload; + clockcache_get_async_state_init( + state, (clockcache *)cc, addr, type, callback, callback_arg); + // We initialize wait_node here, rather than right before the call to enqueue + // the waiter on the wait queue, because the async wait system has internal + // instrumentation that can catch double enqueues, which could break if we + // re-init before submission. + async_waiter_init(&state->wait_node); } static async_status diff --git a/src/core.c b/src/core.c index b30c01cb..d499f24a 100644 --- a/src/core.c +++ b/src/core.c @@ -854,7 +854,7 @@ static void core_btree_iterator_init_async_callback(void *arg) { core_btree_iterator_init_async_context *ctxt = arg; - ctxt->ready = TRUE; + __atomic_store_n(&ctxt->ready, TRUE, __ATOMIC_RELEASE); } static platform_status @@ -890,8 +890,8 @@ core_start_btree_iterator_init_async( prefetch_lookahead, core_btree_iterator_init_async_callback, ctxt); - ctxt->ready = FALSE; - ctxt->done = FALSE; + __atomic_store_n(&ctxt->ready, FALSE, __ATOMIC_RELAXED); + ctxt->done = FALSE; if (btree_iterator_init_async(&ctxt->state) == ASYNC_STATUS_DONE) { ctxt->done = TRUE; @@ -922,11 +922,12 @@ core_drain_btree_iterator_init_async( while (done_count < num_inits) { bool32 made_progress = FALSE; for (uint64 i = 0; i < num_inits; i++) { - if (ctxt[i].done || !ctxt[i].ready) { + if (ctxt[i].done + || !__atomic_exchange_n(&ctxt[i].ready, FALSE, __ATOMIC_ACQUIRE)) + { continue; } - ctxt[i].ready = FALSE; made_progress = TRUE; if (btree_iterator_init_async(&ctxt[i].state) == ASYNC_STATUS_DONE) { ctxt[i].done = TRUE; @@ -1169,8 +1170,14 @@ core_range_iterator_init(core_handle *spl, core_btree_iterator_init_async_context *init_ctxt = NULL; if (range_itor->num_branches != 0) { - init_ctxt = TYPED_ARRAY_ZALLOC( - PROCESS_PRIVATE_HEAP_ID, init_ctxt, range_itor->num_branches); + /* + * Async cache-load waiters embedded in these contexts can be released by + * another process when the clockcache is shared. The callback only marks + * ctxt->ready, so keep the context itself in the Splinter heap; the + * owning process remains responsible for resuming the iterator state. + */ + init_ctxt = + TYPED_ARRAY_ZALLOC(spl->heap_id, init_ctxt, range_itor->num_branches); } if (range_itor->num_branches != 0 && init_ctxt == NULL) { core_range_iterator_deinit(range_itor); @@ -1234,7 +1241,7 @@ core_range_iterator_init(core_handle *spl, } } if (init_ctxt != NULL) { - platform_free(PROCESS_PRIVATE_HEAP_ID, init_ctxt); + platform_free(spl->heap_id, init_ctxt); } if (!SUCCESS(rc)) { core_range_iterator_deinit(range_itor); diff --git a/src/platform_linux/async.h b/src/platform_linux/async.h index 92ffee94..bbd6dd31 100644 --- a/src/platform_linux/async.h +++ b/src/platform_linux/async.h @@ -239,17 +239,34 @@ typedef void (*async_callback_fn)(void *); /* * Wait queues for exections awaiting some condition. */ +typedef struct async_wait_queue async_wait_queue; + +#define ASYNC_WAITER_MAGIC_QUEUED 0x4153594e43575144ULL +#define ASYNC_WAITER_MAGIC_IDLE 0x4153594e43574944ULL + typedef struct async_waiter { struct async_waiter *next; async_callback_fn callback; void *callback_arg; + async_wait_queue *queue; + uint64 magic; } async_waiter; -typedef struct async_wait_queue { +struct async_wait_queue { volatile uint64 lock; volatile async_waiter *head; async_waiter *tail; -} async_wait_queue; +}; + +static inline void +async_waiter_init(async_waiter *waiter) +{ + waiter->next = NULL; + waiter->callback = NULL; + waiter->callback_arg = NULL; + waiter->queue = NULL; + waiter->magic = ASYNC_WAITER_MAGIC_IDLE; +} static inline void async_wait_queue_init(async_wait_queue *queue) @@ -293,9 +310,21 @@ async_wait_queue_append(async_wait_queue *q, async_callback_fn callback, void *callback_arg) { + platform_assert(q != NULL); + platform_assert(waiter != NULL); + platform_assert(callback != NULL); + platform_assert(callback_arg != NULL); + platform_assert(waiter->magic != ASYNC_WAITER_MAGIC_QUEUED, + "waiter %p is already queued on %p; appending to %p\n", + waiter, + waiter->queue, + q); + waiter->callback = callback; waiter->callback_arg = callback_arg; waiter->next = NULL; + waiter->queue = q; + waiter->magic = ASYNC_WAITER_MAGIC_QUEUED; async_waiter *result; if (q->head == NULL) { @@ -309,12 +338,43 @@ async_wait_queue_append(async_wait_queue *q, return result; } +static inline void +async_waiter_assert_queued(async_wait_queue *queue, async_waiter *waiter) +{ + platform_assert(waiter != NULL); + platform_assert(waiter->magic == ASYNC_WAITER_MAGIC_QUEUED, + "waiter %p has bad magic 0x%lx, queue %p, expected queue " + "%p, next %p, callback %p, callback_arg %p\n", + waiter, + waiter->magic, + waiter->queue, + queue, + waiter->next, + waiter->callback, + waiter->callback_arg); + platform_assert(waiter->queue == queue, + "waiter %p belongs to queue %p, expected queue %p\n", + waiter, + waiter->queue, + queue); + platform_assert(waiter->callback != NULL); + platform_assert(waiter->callback_arg != NULL); +} + +static inline void +async_waiter_mark_idle(async_waiter *waiter) +{ + async_waiter_init(waiter); +} + static inline void async_wait_queue_remove(async_wait_queue *queue, async_waiter *pred, async_waiter *waiter) { + async_waiter_assert_queued(queue, waiter); if (pred != NULL) { + async_waiter_assert_queued(queue, pred); platform_assert(pred->next == waiter); pred->next = waiter->next; if (queue->tail == waiter) { @@ -327,6 +387,7 @@ async_wait_queue_remove(async_wait_queue *queue, queue->tail = NULL; } } + async_waiter_mark_idle(waiter); } /* Public: notify one waiter that the condition has become true. */ @@ -343,6 +404,7 @@ async_wait_queue_release_one(async_wait_queue *q) waiter = q->head; if (waiter) { + async_waiter_assert_queued(q, (async_waiter *)waiter); q->head = waiter->next; if (q->head == NULL) { q->tail = NULL; @@ -351,7 +413,10 @@ async_wait_queue_release_one(async_wait_queue *q) async_wait_queue_unlock(q); if (waiter) { - waiter->callback(waiter->callback_arg); + async_callback_fn callback = waiter->callback; + void *callback_arg = waiter->callback_arg; + async_waiter_mark_idle((async_waiter *)waiter); + callback(callback_arg); } } @@ -372,8 +437,12 @@ async_wait_queue_release_all(async_wait_queue *q) async_wait_queue_unlock(q); while (waiter != NULL) { - async_waiter *next = waiter->next; - waiter->callback(waiter->callback_arg); + async_waiter_assert_queued(q, (async_waiter *)waiter); + async_waiter *next = waiter->next; + async_callback_fn callback = waiter->callback; + void *callback_arg = waiter->callback_arg; + async_waiter_mark_idle((async_waiter *)waiter); + callback(callback_arg); waiter = next; } } diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index cf1cfe7d..87e6c3f5 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -542,9 +542,10 @@ laio_async_state_init(io_async_state *state, ios->addr = addr; ios->callback = callback; ios->callback_arg = callback_arg; - ios->reqs[0] = &ios->req; - ios->iovlen = 0; - ios->status = 0; + async_waiter_init(&ios->waiter_node); + ios->reqs[0] = &ios->req; + ios->iovlen = 0; + ios->status = 0; return STATUS_OK; }