Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions src/btree.c
Original file line number Diff line number Diff line change
Expand Up @@ -3489,24 +3489,16 @@ find_btree_node_and_get_idx_bounds_async(btree_iterator_async_state *state,
{
btree_node_unget(
state->itor->cc, state->itor->cfg, &state->itor->curr);
debug_assert(state->callback != NULL);
async_yield_after(state, state->callback(state->callback_arg));
btree_lookup_async_state_init(&state->lookup_state,
state->itor->cc,
state->itor->cfg,
state->itor->root_addr,
state->itor->page_type,
state->target,
NULL,
state->callback,
state->callback_arg);
state->lookup_state.stop_at_height = state->itor->height;
state->lookup_state.stats = NULL;
state->lookup_state.get_node = TRUE;
async_await(state,
btree_lookup_node_async(&state->lookup_state, 0)
== ASYNC_STATUS_DONE);
state->itor->curr = state->lookup_state.node;
platform_status rc = btree_lookup_node(state->itor->cc,
state->itor->cfg,
state->itor->root_addr,
state->target,
state->itor->height,
state->itor->page_type,
TRUE,
&state->itor->curr,
NULL);
platform_assert_status_ok(rc);
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/clockcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1986,9 +1986,7 @@ clockcache_get_from_disk_async(clockcache_get_async_state *state, uint64 depth)
state->io_start_time = platform_get_timestamp();
}

while (io_async_run(state->iostate) != ASYNC_STATUS_DONE) {
async_yield(state);
}
async_await(state, io_async_run(state->iostate) == ASYNC_STATUS_DONE);
state->rc = io_async_state_get_result(state->iostate);
if (!SUCCESS(state->rc)) {
platform_error_log("clockcache_get_from_disk_async: async read failed "
Expand Down Expand Up @@ -3126,12 +3124,14 @@ clockcache_get_async_state_init_virtual(void *payload,
async_callback_fn callback,
void *callback_arg)
{
clockcache_get_async_state_init((clockcache_get_async_state *)payload,
(clockcache *)cc,
addr,
type,
callback,
callback_arg);
clockcache_get_async_state *state = (clockcache_get_async_state *)payload;
clockcache_get_async_state_init(
state, (clockcache *)cc, addr, type, callback, callback_arg);
// We initialize wait_node here, rather than right before the call to enqueue
// the waiter on the wait queue, because the async wait system has internal
// instrumentation that can catch double enqueues, which could break if we
// re-init before submission.
async_waiter_init(&state->wait_node);
}

static async_status
Expand Down
23 changes: 15 additions & 8 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ static void
core_btree_iterator_init_async_callback(void *arg)
{
core_btree_iterator_init_async_context *ctxt = arg;
ctxt->ready = TRUE;
__atomic_store_n(&ctxt->ready, TRUE, __ATOMIC_RELEASE);
}

static platform_status
Expand Down Expand Up @@ -890,8 +890,8 @@ core_start_btree_iterator_init_async(
prefetch_lookahead,
core_btree_iterator_init_async_callback,
ctxt);
ctxt->ready = FALSE;
ctxt->done = FALSE;
__atomic_store_n(&ctxt->ready, FALSE, __ATOMIC_RELAXED);
ctxt->done = FALSE;

if (btree_iterator_init_async(&ctxt->state) == ASYNC_STATUS_DONE) {
ctxt->done = TRUE;
Expand Down Expand Up @@ -922,11 +922,12 @@ core_drain_btree_iterator_init_async(
while (done_count < num_inits) {
bool32 made_progress = FALSE;
for (uint64 i = 0; i < num_inits; i++) {
if (ctxt[i].done || !ctxt[i].ready) {
if (ctxt[i].done
|| !__atomic_exchange_n(&ctxt[i].ready, FALSE, __ATOMIC_ACQUIRE))
{
continue;
}

ctxt[i].ready = FALSE;
made_progress = TRUE;
if (btree_iterator_init_async(&ctxt[i].state) == ASYNC_STATUS_DONE) {
ctxt[i].done = TRUE;
Expand Down Expand Up @@ -1169,8 +1170,14 @@ core_range_iterator_init(core_handle *spl,

core_btree_iterator_init_async_context *init_ctxt = NULL;
if (range_itor->num_branches != 0) {
init_ctxt = TYPED_ARRAY_ZALLOC(
PROCESS_PRIVATE_HEAP_ID, init_ctxt, range_itor->num_branches);
/*
* Async cache-load waiters embedded in these contexts can be released by
* another process when the clockcache is shared. The callback only marks
* ctxt->ready, so keep the context itself in the Splinter heap; the
* owning process remains responsible for resuming the iterator state.
*/
init_ctxt =
TYPED_ARRAY_ZALLOC(spl->heap_id, init_ctxt, range_itor->num_branches);
}
if (range_itor->num_branches != 0 && init_ctxt == NULL) {
core_range_iterator_deinit(range_itor);
Expand Down Expand Up @@ -1234,7 +1241,7 @@ core_range_iterator_init(core_handle *spl,
}
}
if (init_ctxt != NULL) {
platform_free(PROCESS_PRIVATE_HEAP_ID, init_ctxt);
platform_free(spl->heap_id, init_ctxt);
}
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
Expand Down
79 changes: 74 additions & 5 deletions src/platform_linux/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,34 @@ typedef void (*async_callback_fn)(void *);
/*
* Wait queues for exections awaiting some condition.
*/
typedef struct async_wait_queue async_wait_queue;

#define ASYNC_WAITER_MAGIC_QUEUED 0x4153594e43575144ULL
#define ASYNC_WAITER_MAGIC_IDLE 0x4153594e43574944ULL

typedef struct async_waiter {
struct async_waiter *next;
async_callback_fn callback;
void *callback_arg;
async_wait_queue *queue;
uint64 magic;
} async_waiter;

typedef struct async_wait_queue {
struct async_wait_queue {
volatile uint64 lock;
volatile async_waiter *head;
async_waiter *tail;
} async_wait_queue;
};

static inline void
async_waiter_init(async_waiter *waiter)
{
waiter->next = NULL;
waiter->callback = NULL;
waiter->callback_arg = NULL;
waiter->queue = NULL;
waiter->magic = ASYNC_WAITER_MAGIC_IDLE;
}

static inline void
async_wait_queue_init(async_wait_queue *queue)
Expand Down Expand Up @@ -293,9 +310,21 @@ async_wait_queue_append(async_wait_queue *q,
async_callback_fn callback,
void *callback_arg)
{
platform_assert(q != NULL);
platform_assert(waiter != NULL);
platform_assert(callback != NULL);
platform_assert(callback_arg != NULL);
platform_assert(waiter->magic != ASYNC_WAITER_MAGIC_QUEUED,
"waiter %p is already queued on %p; appending to %p\n",
waiter,
waiter->queue,
q);

waiter->callback = callback;
waiter->callback_arg = callback_arg;
waiter->next = NULL;
waiter->queue = q;
waiter->magic = ASYNC_WAITER_MAGIC_QUEUED;

async_waiter *result;
if (q->head == NULL) {
Expand All @@ -309,12 +338,43 @@ async_wait_queue_append(async_wait_queue *q,
return result;
}

static inline void
async_waiter_assert_queued(async_wait_queue *queue, async_waiter *waiter)
{
platform_assert(waiter != NULL);
platform_assert(waiter->magic == ASYNC_WAITER_MAGIC_QUEUED,
"waiter %p has bad magic 0x%lx, queue %p, expected queue "
"%p, next %p, callback %p, callback_arg %p\n",
waiter,
waiter->magic,
waiter->queue,
queue,
waiter->next,
waiter->callback,
waiter->callback_arg);
platform_assert(waiter->queue == queue,
"waiter %p belongs to queue %p, expected queue %p\n",
waiter,
waiter->queue,
queue);
platform_assert(waiter->callback != NULL);
platform_assert(waiter->callback_arg != NULL);
}

static inline void
async_waiter_mark_idle(async_waiter *waiter)
{
async_waiter_init(waiter);
}

static inline void
async_wait_queue_remove(async_wait_queue *queue,
async_waiter *pred,
async_waiter *waiter)
{
async_waiter_assert_queued(queue, waiter);
if (pred != NULL) {
async_waiter_assert_queued(queue, pred);
platform_assert(pred->next == waiter);
pred->next = waiter->next;
if (queue->tail == waiter) {
Expand All @@ -327,6 +387,7 @@ async_wait_queue_remove(async_wait_queue *queue,
queue->tail = NULL;
}
}
async_waiter_mark_idle(waiter);
}

/* Public: notify one waiter that the condition has become true. */
Expand All @@ -343,6 +404,7 @@ async_wait_queue_release_one(async_wait_queue *q)

waiter = q->head;
if (waiter) {
async_waiter_assert_queued(q, (async_waiter *)waiter);
q->head = waiter->next;
if (q->head == NULL) {
q->tail = NULL;
Expand All @@ -351,7 +413,10 @@ async_wait_queue_release_one(async_wait_queue *q)
async_wait_queue_unlock(q);

if (waiter) {
waiter->callback(waiter->callback_arg);
async_callback_fn callback = waiter->callback;
void *callback_arg = waiter->callback_arg;
async_waiter_mark_idle((async_waiter *)waiter);
callback(callback_arg);
}
}

Expand All @@ -372,8 +437,12 @@ async_wait_queue_release_all(async_wait_queue *q)
async_wait_queue_unlock(q);

while (waiter != NULL) {
async_waiter *next = waiter->next;
waiter->callback(waiter->callback_arg);
async_waiter_assert_queued(q, (async_waiter *)waiter);
async_waiter *next = waiter->next;
async_callback_fn callback = waiter->callback;
void *callback_arg = waiter->callback_arg;
async_waiter_mark_idle((async_waiter *)waiter);
callback(callback_arg);
waiter = next;
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/platform_linux/laio.c
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,10 @@ laio_async_state_init(io_async_state *state,
ios->addr = addr;
ios->callback = callback;
ios->callback_arg = callback_arg;
ios->reqs[0] = &ios->req;
ios->iovlen = 0;
ios->status = 0;
async_waiter_init(&ios->waiter_node);
ios->reqs[0] = &ios->req;
ios->iovlen = 0;
ios->status = 0;
return STATUS_OK;
}

Expand Down
Loading