From 2096c92f65a719063168ce2a287aa2dcbe0353b5 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 17 Jun 2026 06:04:13 +0300 Subject: [PATCH 1/4] feat: add AsyncPaginator for non-blocking strategy-driven pagination Pagination was sync-only: Paginator walks pages over an HttpClient and a consumer must block a thread per page. This adds AsyncPaginator, the non-blocking counterpart that drives the same page-to-page walk over an AsyncHttpClient and returns a CompletableFuture, so no thread parks waiting on a page. AsyncPaginator reuses the existing PaginationStrategy/Page wire-convention seam unchanged, so any strategy written for Paginator works here. It exposes forEachAsync(Consumer) and collectAllAsync(), preserves the maxPages safety cap, and closes each Response after the strategy parses it (including on the exceptional path). The driver is trampolined so a long run of synchronously-completed page futures stays stack-safe. The surface stays transport-agnostic and in sdk-core with no new dependencies (java.util.concurrent only); Reactor/coroutines bridges belong in the adapter modules. --- sdk-core/api/sdk-core.api | 8 + .../sdk/core/pagination/AsyncPaginator.kt | 289 +++++++++++++++ .../sdk/core/pagination/AsyncPaginatorTest.kt | 338 ++++++++++++++++++ .../core/pagination/StubAsyncHttpClient.kt | 71 ++++ 4 files changed, 706 insertions(+) create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt create mode 100644 sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt create mode 100644 sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/StubAsyncHttpClient.kt diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 7ebcc312..4fab19ad 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1977,6 +1977,14 @@ public abstract interface class org/dexpace/sdk/core/io/Source : java/io/Closeab public abstract fun read (Lorg/dexpace/sdk/core/io/Buffer;J)J } +public final class org/dexpace/sdk/core/pagination/AsyncPaginator { + public fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;)V + public fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;J)V + public synthetic fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;JILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun collectAllAsync ()Ljava/util/concurrent/CompletableFuture; + public final fun forEachAsync (Ljava/util/function/Consumer;)Ljava/util/concurrent/CompletableFuture; +} + public final class org/dexpace/sdk/core/pagination/CursorPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy { public fun (Lkotlin/jvm/functions/Function1;)V public fun (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt new file mode 100644 index 00000000..ef80d580 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.util.Futures +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Consumer + +/** + * Asynchronous, strategy-driven paginator over an [AsyncHttpClient] — the non-blocking + * counterpart of [Paginator]. + * + * `AsyncPaginator` executes [initialRequest] against [asyncHttpClient], delegates each + * response to [strategy], and drives the page-to-page walk through a chain of + * [CompletableFuture]s. No thread blocks waiting on a page: each page is fetched, parsed, + * drained to the caller's consumer, and the next page is re-armed inside the future's + * completion graph. The same wire conventions ([PaginationStrategy], [Page]) back both the + * sync and async paginators, so a strategy written for [Paginator] works here unchanged. + * + * ## Laziness and the fetch budget + * + * Iteration is page-lazy in the same sense as [Paginator]: exactly one HTTP exchange happens + * per page consumed. A new page is fetched only after the previous page has been drained to + * the consumer and reports `hasNext` with a non-null next request. Empty pages still count + * toward the [maxPages] budget. + * + * ## Termination + * + * The walk completes when any of: + * + * - the current page reports `hasNext == false`, or + * - the current page's `nextPageRequest()` returns `null`, or + * - [maxPages] pages have been fetched (the safety cap), or + * - the consumer throws, or a transport/parse failure occurs (the result future completes + * exceptionally). + * + * ## Safety cap + * + * [maxPages] defaults to `Long.MAX_VALUE` (effectively unbounded). A misbehaving server that + * never advances its paging cursor would otherwise drive an unbounded fetch loop. **Production + * callers should set a finite cap.** The cap counts pages fetched (HTTP exchanges), not items. + * + * ## Response lifecycle + * + * Each [Response] is closed by the paginator after the strategy has parsed it — including on + * the exceptional path, where the response is closed before the result future is completed + * exceptionally. Strategies MUST read everything they need synchronously inside `parse(...)` + * and MUST NOT retain the response or its body past the call. Items in the returned [Page] + * outlive the response. + * + * ## Consumer threading + * + * The [Consumer] passed to [forEachAsync] is invoked on whichever thread completes the page + * future — typically a transport callback thread. It runs inline in the completion graph, so + * a slow consumer holds up the walk. The consumer is never invoked concurrently for a single + * [forEachAsync] call, but it MUST NOT assume any particular thread. A consumer that throws + * aborts the walk and surfaces through the result future. + * + * ## Stack safety + * + * The driver is trampolined: synchronously completed page futures (e.g. from an in-memory + * fake transport, or a cache hit) are processed in a loop rather than via recursive + * `thenCompose`, so a long run of already-complete pages does not overflow the stack. + * + * ## Thread-safety + * + * `AsyncPaginator` itself holds only immutable fields and is safe to share. Each call to + * [forEachAsync] / [collectAllAsync] starts an independent walk with its own state. + * + * ## Java interop + * + * ``` + * AsyncPaginator paginator = new AsyncPaginator<>(client, request, strategy); + * paginator.forEachAsync(item -> handle(item)) + * .thenRun(() -> done()); + * paginator.collectAllAsync().thenAccept(items -> ...); + * ``` + * + * @param T Element type yielded by the paginator. + * @property asyncHttpClient Async transport used to execute each page request. + * @property initialRequest Request used to fetch the first page; also passed to the strategy + * as a template for building subsequent page requests. + * @property strategy Strategy that parses each response into a [Page]. + * @property maxPages Safety cap on the total number of pages (HTTP exchanges) the walk will + * fetch. Defaults to `Long.MAX_VALUE` (unbounded). Must be positive. + */ +public class AsyncPaginator + @JvmOverloads + constructor( + private val asyncHttpClient: AsyncHttpClient, + private val initialRequest: Request, + private val strategy: PaginationStrategy, + private val maxPages: Long = Long.MAX_VALUE, + ) { + init { + require(maxPages > 0L) { "maxPages must be positive, was $maxPages" } + } + + /** + * Walks every item across every page, invoking [consumer] for each item in + * server-defined order. The returned future completes (with `null`) once the walk has + * terminated normally, or completes exceptionally if the transport fails, a strategy + * `parse` throws, or [consumer] throws. + * + * Each call starts a fresh walk from [initialRequest]. + * + * @param consumer Invoked once per item. See the class-level "Consumer threading" KDoc. + * @return A future that completes when the walk finishes. + */ + public fun forEachAsync(consumer: Consumer): CompletableFuture { + val result = CompletableFuture() + Walk(consumer, result).start() + return result + } + + /** + * Collects every item across every page into a single list, in server-defined order. + * The returned future completes with the accumulated list, or completes exceptionally + * on transport/parse/consumer failure. + * + * Buffers all items in memory — prefer [forEachAsync] for large or unbounded result + * sets. + * + * @return A future that completes with all items. + */ + public fun collectAllAsync(): CompletableFuture> { + val items = ArrayList() + return forEachAsync { items.add(it) }.thenApply { items } + } + + /** + * Drives one independent walk. Single-call state; never reused across [forEachAsync] + * invocations. + */ + private inner class Walk( + private val consumer: Consumer, + private val result: CompletableFuture, + ) { + private var nextRequest: Request? = initialRequest + private var pagesFetched: Long = 0L + + // Guards against re-entrant driving: a page future that completes synchronously + // would otherwise recurse into the loop that scheduled it. We trampoline instead — + // whichever stack frame owns the loop picks up the staged work. + private val driving = AtomicBoolean(false) + private var pendingPage: Page? = null + + fun start() { + drive() + } + + /** + * Trampoline loop. Each [step] advances the walk by one unit of synchronously + * available work and reports back via [Step]: keep looping ([Step.CONTINUE]), + * terminate ([Step.DONE]), or yield the loop to an async completion callback + * ([Step.SUSPEND]). On `SUSPEND` the `driving` flag is left set — the callback owns + * it and clears it before re-entering [drive] — so this frame must not clear it. + */ + private fun drive() { + if (!driving.compareAndSet(false, true)) { + // Another stack frame owns the loop; it will observe the work we queued. + return + } + var suspended = false + try { + var outcome = step() + while (outcome == Step.CONTINUE) { + outcome = step() + } + suspended = outcome == Step.SUSPEND + } finally { + if (!suspended) { + driving.set(false) + } + } + } + + /** + * Performs one unit of synchronously available work: drain a staged page, finish the + * walk, or fetch the next page. A synchronously completed fetch yields [Step.CONTINUE]; + * a genuinely pending fetch arms a completion callback and yields [Step.SUSPEND]. + */ + private fun step(): Step { + val staged = pendingPage + if (staged != null) { + pendingPage = null + return if (drainPage(staged)) Step.CONTINUE else Step.DONE + } + val request = nextRequest + if (request == null || pagesFetched >= maxPages) { + // No next request, or the cap is reached before fetching a page we would + // otherwise yield: the walk is complete. + result.complete(null) + return Step.DONE + } + nextRequest = null + val pageFuture = fetchPage(request) + if (pageFuture.isDone) { + // Synchronous completion (fake transport, cache hit): handle inline to keep + // the stack flat instead of recursing through a callback. + return if (stagePage(pageFuture)) Step.CONTINUE else Step.DONE + } + // Genuinely async: hand the `driving` flag to the callback and suspend the loop. + pageFuture.whenComplete { _, _ -> + if (stagePage(pageFuture)) { + driving.set(false) + drive() + } + } + return Step.SUSPEND + } + + /** + * Stages a completed page future for draining. Returns `true` to continue driving, + * `false` if the future failed (the walk is then terminated exceptionally). + */ + private fun stagePage(future: CompletableFuture>): Boolean { + val page: Page = + try { + future.join() + } catch (t: Throwable) { + result.completeExceptionally(Futures.unwrap(t)) + return false + } + pendingPage = page + return true + } + + /** + * Emits a page's items to the consumer, then schedules the next request. Returns + * `true` to continue driving, `false` if the consumer threw (walk aborted). + */ + private fun drainPage(page: Page): Boolean { + try { + val items = page.items + var i = 0 + val size = items.size + while (i < size) { + consumer.accept(items[i]) + i++ + } + } catch (t: Throwable) { + result.completeExceptionally(t) + return false + } + nextRequest = if (page.hasNext) page.nextPageRequest() else null + return true + } + + /** + * Executes [request], parses the response into a [Page], and closes the response — + * mirroring [Paginator]'s per-page lifecycle. The returned future completes with the + * parsed page or exceptionally if the transport or strategy fails. + */ + private fun fetchPage(request: Request): CompletableFuture> { + pagesFetched++ + val transportFuture: CompletableFuture = + try { + asyncHttpClient.executeAsync(request) + } catch (t: Throwable) { + // An eager throw from executeAsync (a contract violation, but be + // defensive) becomes an exceptional future so the driver stays uniform. + return Futures.failed(t) + } + return transportFuture.handle { response, error -> + if (error != null) { + throw Futures.unwrap(error) + } + try { + strategy.parse(response, initialRequest) + } finally { + response.close() + } + } + } + } + } + +/** Outcome of a single step of [AsyncPaginator]'s trampoline loop. */ +private enum class Step { CONTINUE, DONE, SUSPEND } diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt new file mode 100644 index 00000000..025f2dff --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt @@ -0,0 +1,338 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import java.io.IOException +import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutionException +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class AsyncPaginatorTest { + private val itemsExtractor: (Response) -> List = { resp -> + val body = resp.body!!.source().use { it.readUtf8() } + if (body.isEmpty()) emptyList() else body.split(",") + } + + @BeforeTest + fun setup() { + installIoProvider() + } + + private fun initialRequest(): Request = + Request.builder() + .url("https://api.example.com/items") + .method(Method.GET) + .build() + + private fun strategy(): LinkHeaderPaginationStrategy = LinkHeaderPaginationStrategy(itemsExtractor) + + /** A three-page Link-header stub: items -> ?page=2 -> ?page=3 (terminal). */ + private fun threePageClient(executor: java.util.concurrent.Executor? = null): StubAsyncHttpClient { + val client = StubAsyncHttpClient(executor) + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "a,b", + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=2") { req -> + textResponse( + req, + "c,d", + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=3") { req -> + textResponse(req, "e") + } + return client + } + + @Test + fun `collectAllAsync walks every page synchronously-completed`() { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val items = paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), items) + assertEquals(3, client.callCount) + assertEquals( + listOf( + "https://api.example.com/items", + "https://api.example.com/items?page=2", + "https://api.example.com/items?page=3", + ), + client.receivedUrls, + ) + } + + @Test + fun `forEachAsync visits each item in order`() { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val seen = ArrayList() + paginator.forEachAsync { seen.add(it) }.get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), seen) + } + + @Test + fun `walk completes when pages complete on a background executor`() { + val executor = Executors.newFixedThreadPool(2) + try { + val client = threePageClient(executor) + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val items = paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), items) + assertEquals(3, client.callCount) + } finally { + executor.shutdownNow() + } + } + + @Test + fun `maxPages caps a server that returns the same cursor forever`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "x", + extraHeaders = mapOf("Link" to "; rel=\"next\""), + ) + } + val paginator = AsyncPaginator(client, initialRequest(), strategy(), maxPages = 3L) + + val items = paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + + assertEquals(listOf("x", "x", "x"), items) + assertEquals(3, client.callCount) + } + + @Test + fun `maxPages of one fetches only the first page`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "only", + extraHeaders = mapOf("Link" to "; rel=\"next\""), + ) + } + val paginator = AsyncPaginator(client, initialRequest(), strategy(), maxPages = 1L) + + assertEquals(listOf("only"), paginator.collectAllAsync().get(5, TimeUnit.SECONDS)) + assertEquals(1, client.callCount) + } + + @Test + fun `non-positive maxPages is rejected`() { + val client = StubAsyncHttpClient() + assertFailsWith { + AsyncPaginator(client, initialRequest(), strategy(), maxPages = 0L) + } + } + + @Test + fun `transport failure completes the result future exceptionally with the original cause`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { error("boom from transport") } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val ex = + assertFailsWith { + paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + } + // Futures.unwrap unwraps the CompletionException wrapper so callers see the cause. + assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") + assertEquals("boom from transport", ex.cause?.message) + } + + @Test + fun `strategy parse failure surfaces and still closes the response`() { + val closed = AtomicInteger(0) + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + countingResponse(req, "a,b", closed) + } + // Strategy that always throws while parsing. + val failing = + PaginationStrategy { _, _ -> throw IOException("parse exploded") } + val paginator = AsyncPaginator(client, initialRequest(), failing) + + val ex = + assertFailsWith { + paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is IOException, "cause was ${ex.cause}") + assertEquals(1, closed.get(), "response must be closed even when parse throws") + } + + @Test + fun `consumer throwing aborts the walk and surfaces the exception`() { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val ex = + assertFailsWith { + paginator + .forEachAsync { item -> + if (item == "c") error("stop at c") + // else: keep going + }.get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") + assertEquals("stop at c", ex.cause?.message) + // The walk fetched page 1 (a,b) and page 2 (c,d), then aborted on 'c'. Page 3 untouched. + assertEquals(2, client.callCount) + } + + @Test + fun `each response is closed exactly once after parsing`() { + val closes = AtomicInteger(0) + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + countingResponse( + req, + "a", + closes, + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=2") { req -> + countingResponse(req, "b", closes) + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + assertEquals(listOf("a", "b"), paginator.collectAllAsync().get(5, TimeUnit.SECONDS)) + assertEquals(2, closes.get(), "both responses must be closed") + } + + @Test + fun `empty terminal page yields no extra items and stops`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "a,b", + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=2") { req -> + // Empty body, no Link header → end of stream. + textResponse(req, "") + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + assertEquals(listOf("a", "b"), paginator.collectAllAsync().get(5, TimeUnit.SECONDS)) + assertEquals(2, client.callCount) + } + + @Test + fun `deep run of synchronously-completed pages stays stack-safe`() { + // A server that advances its cursor every page for many pages. With synchronous + // completion, a naive recursive driver would overflow the stack; the trampoline must + // process these in a loop. + val pageCount = 5_000 + val client = StubAsyncHttpClient() + for (page in 0 until pageCount) { + val url = + if (page == 0) { + "https://api.example.com/items" + } else { + "https://api.example.com/items?page=$page" + } + val isLast = page == pageCount - 1 + client.on(url) { req -> + if (isLast) { + textResponse(req, "p$page") + } else { + textResponse( + req, + "p$page", + extraHeaders = + mapOf( + "Link" to + "; rel=\"next\"", + ), + ) + } + } + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val items = paginator.collectAllAsync().get(30, TimeUnit.SECONDS) + assertEquals(pageCount, items.size) + assertEquals("p0", items.first()) + assertEquals("p${pageCount - 1}", items.last()) + } + + @Test + fun `collectAllAsync wrapper rethrows as CompletionException on join`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { error("kaboom") } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + // join() (vs get()) wraps in CompletionException — confirms the failure path is wired + // through the standard CompletableFuture contract. + assertFailsWith { + paginator.collectAllAsync().join() + } + } + + @AfterTest + fun teardown() { + // no-op + } +} + +/** + * A [textResponse] whose [Response.close] increments [closeCounter], letting tests assert the + * paginator closes each response exactly once. + */ +private fun countingResponse( + request: Request, + body: String, + closeCounter: AtomicInteger, + extraHeaders: Map = emptyMap(), +): Response { + val delegate = textResponse(request, body, extraHeaders) + val countingBody = + object : org.dexpace.sdk.core.http.response.ResponseBody() { + override fun mediaType(): org.dexpace.sdk.core.http.common.MediaType? = delegate.body?.mediaType() + + override fun contentLength(): Long = delegate.body?.contentLength() ?: -1L + + override fun source(): org.dexpace.sdk.core.io.BufferedSource = delegate.body!!.source() + + override fun close() { + closeCounter.incrementAndGet() + delegate.close() + } + } + return delegate.newBuilder().body(countingBody).build() +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/StubAsyncHttpClient.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/StubAsyncHttpClient.kt new file mode 100644 index 00000000..977d59ae --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/StubAsyncHttpClient.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executor + +/** + * Test support: an async stub driven by canned responses keyed by request URL — the async + * mirror of [StubHttpClient]. + * + * Each registered responder produces a [Response] (or throws). The future returned by + * [executeAsync] is completed either inline (synchronous completion, exercising the + * trampoline's already-done fast path) or via [completionExecutor] when one is supplied + * (genuinely deferred completion, exercising the callback re-arm path). + */ +internal class StubAsyncHttpClient( + private val completionExecutor: Executor? = null, +) : AsyncHttpClient { + private val responders: MutableMap Response> = LinkedHashMap() + private val urls: MutableList = ArrayList() + + fun on( + url: String, + responseBuilder: (Request) -> Response, + ): StubAsyncHttpClient { + responders[url] = responseBuilder + return this + } + + /** All URLs received, in call order. */ + val receivedUrls: List get() = urls.toList() + + /** Number of HTTP calls executed. */ + val callCount: Int get() = urls.size + + override fun executeAsync(request: Request): CompletableFuture { + val url = request.url.toString() + urls.add(url) + val responder = + responders[url] + ?: error("StubAsyncHttpClient: no canned response for URL: $url\nKnown: ${responders.keys}") + val executor = completionExecutor + if (executor == null) { + return try { + CompletableFuture.completedFuture(responder(request)) + } catch (t: Throwable) { + val f = CompletableFuture() + f.completeExceptionally(t) + f + } + } + val future = CompletableFuture() + executor.execute { + try { + future.complete(responder(request)) + } catch (t: Throwable) { + future.completeExceptionally(t) + } + } + return future + } +} From ac20b487856657d82fc2c4c935dab9806405bb27 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 24 Jun 2026 03:18:57 +0300 Subject: [PATCH 2/4] feat: make AsyncPaginator walks cancellable and harden failure paths Wire the future returned by forEachAsync/collectAllAsync back into the walk so completing it from the outside (cancel, complete, completeExceptionally, orTimeout) halts pagination: the driver stops before the next fetch and best-effort aborts the in-flight page exchange by cancelling its transport future. The cancellation contract is documented on the class. Harden the driver's failure handling: - surface a throwing Page.hasNext/nextPageRequest() through the result future instead of letting it escape the driver and strand the walk - fail cleanly if a transport violates the SPI by completing with a null Response, rather than NPE-ing inside parse/close Narrow collectAllAsync's return type to CompletableFuture> so the accumulator is not handed back as mutable. Add tests for cancellation propagation, external completion stopping the walk before the next fetch, the null-Response guard, and the throwing-nextPageRequest path. --- .../sdk/core/pagination/AsyncPaginator.kt | 59 +++++++- .../sdk/core/pagination/AsyncPaginatorTest.kt | 129 ++++++++++++++++-- 2 files changed, 173 insertions(+), 15 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt index ef80d580..98f4af36 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt @@ -65,6 +65,15 @@ import java.util.function.Consumer * [forEachAsync] call, but it MUST NOT assume any particular thread. A consumer that throws * aborts the walk and surfaces through the result future. * + * ## Cancellation + * + * Completing the future returned by [forEachAsync] / [collectAllAsync] from the outside — + * `cancel(true)`, `complete(...)`, `completeExceptionally(...)`, `orTimeout(...)` — halts the + * walk: the driver fetches no further pages, and the page exchange currently in flight is + * best-effort aborted by cancelling its transport future (which propagates into the underlying + * client per the [AsyncHttpClient] cancellation contract). A page request already dispatched + * may still complete before the abort takes effect; its response is closed and discarded. + * * ## Stack safety * * The driver is trampolined: synchronously completed page futures (e.g. from an in-memory @@ -111,7 +120,9 @@ public class AsyncPaginator * terminated normally, or completes exceptionally if the transport fails, a strategy * `parse` throws, or [consumer] throws. * - * Each call starts a fresh walk from [initialRequest]. + * Each call starts a fresh walk from [initialRequest]. Cancelling (or otherwise + * completing) the returned future halts the walk and best-effort aborts the in-flight + * page exchange — see the class-level "Cancellation" KDoc. * * @param consumer Invoked once per item. See the class-level "Consumer threading" KDoc. * @return A future that completes when the walk finishes. @@ -132,7 +143,7 @@ public class AsyncPaginator * * @return A future that completes with all items. */ - public fun collectAllAsync(): CompletableFuture> { + public fun collectAllAsync(): CompletableFuture> { val items = ArrayList() return forEachAsync { items.add(it) }.thenApply { items } } @@ -154,7 +165,18 @@ public class AsyncPaginator private val driving = AtomicBoolean(false) private var pendingPage: Page? = null + // The transport future for the page currently being fetched, or null between + // fetches. Written by the loop owner (in [fetchPage]); read by the cancellation + // hook on a possibly different thread, hence @Volatile. + @Volatile + private var inFlight: CompletableFuture? = null + fun start() { + // If the caller cancels/completes the result future, abort the in-flight page + // exchange (best-effort) so a long walk does not keep an exchange open. The + // driver also re-checks result.isDone before each fetch, so no further pages + // are requested once the result is settled. + result.whenComplete { _, _ -> inFlight?.cancel(true) } drive() } @@ -193,12 +215,16 @@ public class AsyncPaginator val staged = pendingPage if (staged != null) { pendingPage = null - return if (drainPage(staged)) Step.CONTINUE else Step.DONE + // If the result was settled from the outside (cancelled, timed out, or + // completed by the caller), drop the staged page undrained instead of + // emitting it to the consumer. + return if (!result.isDone && drainPage(staged)) Step.CONTINUE else Step.DONE } val request = nextRequest - if (request == null || pagesFetched >= maxPages) { - // No next request, or the cap is reached before fetching a page we would - // otherwise yield: the walk is complete. + if (request == null || result.isDone || pagesFetched >= maxPages) { + // No next request, the result was settled externally, or the cap is reached + // before fetching a page we would otherwise yield: the walk is complete. + // complete(null) is a no-op when the result is already settled. result.complete(null) return Step.DONE } @@ -210,6 +236,9 @@ public class AsyncPaginator return if (stagePage(pageFuture)) Step.CONTINUE else Step.DONE } // Genuinely async: hand the `driving` flag to the callback and suspend the loop. + // If pageFuture completes during registration, this callback runs inline on the + // current thread and re-enters drive() while this frame still returns SUSPEND — + // safe, because the SUSPEND frame never clears `driving`. pageFuture.whenComplete { _, _ -> if (stagePage(pageFuture)) { driving.set(false) @@ -248,11 +277,14 @@ public class AsyncPaginator consumer.accept(items[i]) i++ } + // Compute the next request inside the guard: a Page whose hasNext / + // nextPageRequest() throws (e.g. a malformed cursor) must surface through + // the result future, not escape the driver and strand the walk. + nextRequest = if (page.hasNext) page.nextPageRequest() else null } catch (t: Throwable) { result.completeExceptionally(t) return false } - nextRequest = if (page.hasNext) page.nextPageRequest() else null return true } @@ -271,10 +303,23 @@ public class AsyncPaginator // defensive) becomes an exceptional future so the driver stays uniform. return Futures.failed(t) } + // Publish the in-flight exchange so external cancellation can abort it, then + // re-check: if the walk was settled between step()'s fetch guard and here, the + // one-shot cancel hook may have already fired against the previous (now-stale) + // inFlight, so abort the exchange we just dispatched ourselves. + inFlight = transportFuture + if (result.isDone) { + transportFuture.cancel(true) + } return transportFuture.handle { response, error -> if (error != null) { throw Futures.unwrap(error) } + if (response == null) { + // AsyncHttpClient forbids a null success completion; fail cleanly + // rather than NPE on the parse/close below. + error("AsyncHttpClient.executeAsync completed with a null Response") + } try { strategy.parse(response, initialRequest) } finally { diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt index 025f2dff..d7412a15 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt @@ -7,20 +7,27 @@ package org.dexpace.sdk.core.pagination +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.http.common.MediaType import org.dexpace.sdk.core.http.request.Method import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.http.response.ResponseBody +import org.dexpace.sdk.core.io.BufferedSource import java.io.IOException +import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionException import java.util.concurrent.ExecutionException +import java.util.concurrent.Executor import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -import kotlin.test.AfterTest +import java.util.concurrent.atomic.AtomicReference import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertFalse import kotlin.test.assertTrue class AsyncPaginatorTest { @@ -43,7 +50,7 @@ class AsyncPaginatorTest { private fun strategy(): LinkHeaderPaginationStrategy = LinkHeaderPaginationStrategy(itemsExtractor) /** A three-page Link-header stub: items -> ?page=2 -> ?page=3 (terminal). */ - private fun threePageClient(executor: java.util.concurrent.Executor? = null): StubAsyncHttpClient { + private fun threePageClient(executor: Executor? = null): StubAsyncHttpClient { val client = StubAsyncHttpClient(executor) client.on("https://api.example.com/items") { req -> textResponse( @@ -304,9 +311,115 @@ class AsyncPaginatorTest { } } - @AfterTest - fun teardown() { - // no-op + @Test + fun `cancelling the result future aborts the in-flight exchange and stops the walk`() { + // A transport that hands back a future which never completes on its own, so the walk + // suspends on the first page until the test cancels it. + val transportFuture = CompletableFuture() + val callCount = AtomicInteger(0) + val client = + AsyncHttpClient { + callCount.incrementAndGet() + transportFuture + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val result = paginator.forEachAsync { } + assertFalse(result.isDone, "walk should be suspended on the in-flight page") + + result.cancel(true) + + assertTrue(transportFuture.isCancelled, "in-flight transport future must be cancelled") + assertTrue(result.isCancelled) + assertEquals(1, callCount.get(), "no further pages fetched after cancellation") + } + + @Test + fun `completing the result during the walk stops it before the next fetch`() { + // Page 1 is held by a future the test completes by hand; page 2 records whether it is + // ever fetched. The consumer completes the result future from inside the walk (on page 1's + // last item), which must make the driver terminate at the next fetch decision — exercising + // the result.isDone short-circuit in step(), not just the cancel hook. + val page1 = CompletableFuture() + val page2Calls = AtomicInteger(0) + val client = + AsyncHttpClient { req -> + if (req.url.toString() == "https://api.example.com/items") { + page1 + } else { + page2Calls.incrementAndGet() + CompletableFuture.completedFuture(textResponse(req, "c,d")) + } + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val resultRef = AtomicReference>() + val seen = ArrayList() + val result = + paginator.forEachAsync { item -> + seen.add(item) + if (item == "b") resultRef.get().complete(null) + } + resultRef.set(result) + + // Walk is suspended on page 1; nothing consumed yet. + assertTrue(seen.isEmpty(), "consumer should not run before page 1 completes") + + // Completing page 1 drains "a","b" inline; the consumer settles the result on "b". + page1.complete( + textResponse( + initialRequest(), + "a,b", + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ), + ) + + result.get(5, TimeUnit.SECONDS) + assertEquals(listOf("a", "b"), seen) + assertEquals(0, page2Calls.get(), "page 2 must not be fetched after the result is settled") + } + + @Test + fun `a null Response completion fails the walk cleanly`() { + // A misbehaving transport that violates the AsyncHttpClient contract by completing with a + // null Response. The paginator must fail the walk rather than NPE inside parse/close. + @Suppress("UNCHECKED_CAST") + val client = + AsyncHttpClient { + CompletableFuture.completedFuture(null) as CompletableFuture + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val ex = + assertFailsWith { + paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") + } + + @Test + fun `a throwing nextPageRequest surfaces through the result future`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> textResponse(req, "a,b") } + // A page that claims a next page exists but throws while building its request. + val strategy = + PaginationStrategy { _, _ -> + object : Page { + override val items: List = listOf("a", "b") + override val hasNext: Boolean = true + + override fun nextPageRequest(): Request? = error("cannot build next page request") + } + } + val paginator = AsyncPaginator(client, initialRequest(), strategy) + + val ex = + assertFailsWith { + paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") + assertEquals("cannot build next page request", ex.cause?.message) } } @@ -322,12 +435,12 @@ private fun countingResponse( ): Response { val delegate = textResponse(request, body, extraHeaders) val countingBody = - object : org.dexpace.sdk.core.http.response.ResponseBody() { - override fun mediaType(): org.dexpace.sdk.core.http.common.MediaType? = delegate.body?.mediaType() + object : ResponseBody() { + override fun mediaType(): MediaType? = delegate.body?.mediaType() override fun contentLength(): Long = delegate.body?.contentLength() ?: -1L - override fun source(): org.dexpace.sdk.core.io.BufferedSource = delegate.body!!.source() + override fun source(): BufferedSource = delegate.body!!.source() override fun close() { closeCounter.incrementAndGet() From 9cc04b16acb889528186892c184bbd513b3942a4 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 24 Jun 2026 03:51:56 +0300 Subject: [PATCH 3/4] feat: let AsyncPaginator drain pages on a caller-supplied executor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The page consumer runs inline on the thread that completes each page — a transport callback thread — so a consumer that blocks ties up the transport's callback pool. Add forEachAsync(Consumer, Executor) and collectAllAsync(Executor) overloads that run the page-draining driver, and therefore every consumer invocation, on the supplied executor; parse and close stay bound to each transport future's own completion. Ordering, single-page back-pressure, the single-entry trampoline, and cancellation semantics are unchanged, and a rejected executor dispatch fails the walk rather than stranding it. Also covers the eager-throw path of executeAsync and makes the consumer-threading and page-granularity cancellation contracts explicit in the KDoc. --- sdk-core/api/sdk-core.api | 2 + .../sdk/core/pagination/AsyncPaginator.kt | 109 +++++++++++--- .../sdk/core/pagination/AsyncPaginatorTest.kt | 138 ++++++++++++++++++ 3 files changed, 231 insertions(+), 18 deletions(-) diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 4fab19ad..d5cf6cd5 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1982,7 +1982,9 @@ public final class org/dexpace/sdk/core/pagination/AsyncPaginator { public fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;J)V public synthetic fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;JILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun collectAllAsync ()Ljava/util/concurrent/CompletableFuture; + public final fun collectAllAsync (Ljava/util/concurrent/Executor;)Ljava/util/concurrent/CompletableFuture; public final fun forEachAsync (Ljava/util/function/Consumer;)Ljava/util/concurrent/CompletableFuture; + public final fun forEachAsync (Ljava/util/function/Consumer;Ljava/util/concurrent/Executor;)Ljava/util/concurrent/CompletableFuture; } public final class org/dexpace/sdk/core/pagination/CursorPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt index 98f4af36..b2f63f46 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt @@ -12,6 +12,7 @@ import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.util.Futures import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executor import java.util.concurrent.atomic.AtomicBoolean import java.util.function.Consumer @@ -59,11 +60,18 @@ import java.util.function.Consumer * * ## Consumer threading * - * The [Consumer] passed to [forEachAsync] is invoked on whichever thread completes the page - * future — typically a transport callback thread. It runs inline in the completion graph, so - * a slow consumer holds up the walk. The consumer is never invoked concurrently for a single - * [forEachAsync] call, but it MUST NOT assume any particular thread. A consumer that throws - * aborts the walk and surfaces through the result future. + * By default the [Consumer] passed to [forEachAsync] is invoked on whichever thread completes + * the page future — typically a transport callback thread. It runs inline in the completion + * graph, so a slow or blocking consumer holds up the walk and ties up that transport thread. + * For a consumer that does blocking or expensive work, prefer the [forEachAsync] / + * [collectAllAsync] overloads that take an [Executor]: the page-draining driver — and therefore + * every consumer invocation — then runs on that executor, leaving the transport's callback + * threads free. (The strategy `parse` and response `close()` are not dispatched to the executor; + * they run inline in each transport future's completion — usually on a transport callback thread, + * but on the driver thread when the transport completes synchronously.) Either way the consumer + * is never invoked concurrently for a single walk, items are delivered one at a time in server + * order, and the consumer MUST NOT assume any particular thread. A consumer that throws aborts + * the walk and surfaces through the result future. * * ## Cancellation * @@ -74,6 +82,11 @@ import java.util.function.Consumer * client per the [AsyncHttpClient] cancellation contract). A page request already dispatched * may still complete before the abort takes effect; its response is closed and discarded. * + * Cancellation takes effect at page granularity. If the result is settled while a page is + * mid-drain, the items already being delivered from that page still reach the consumer — the + * driver stops at the next page boundary rather than interrupting an in-progress drain. A page + * that has been fetched but not yet drained when the result settles is dropped undrained. + * * ## Stack safety * * The driver is trampolined: synchronously completed page futures (e.g. from an in-memory @@ -116,7 +129,8 @@ public class AsyncPaginator /** * Walks every item across every page, invoking [consumer] for each item in - * server-defined order. The returned future completes (with `null`) once the walk has + * server-defined order, on the thread that completes each page (typically a transport + * callback thread). The returned future completes (with `null`) once the walk has * terminated normally, or completes exceptionally if the transport fails, a strategy * `parse` throws, or [consumer] throws. * @@ -127,11 +141,26 @@ public class AsyncPaginator * @param consumer Invoked once per item. See the class-level "Consumer threading" KDoc. * @return A future that completes when the walk finishes. */ - public fun forEachAsync(consumer: Consumer): CompletableFuture { - val result = CompletableFuture() - Walk(consumer, result).start() - return result - } + public fun forEachAsync(consumer: Consumer): CompletableFuture = startWalk(consumer, null) + + /** + * Like [forEachAsync], but runs the page-draining driver — and therefore every + * [consumer] invocation — on [executor] instead of inline on the page-completion thread. + * Use this when the consumer does blocking or expensive work, so it does not tie up the + * transport's callback threads. Items are still delivered one at a time in server order, + * and the consumer is never invoked concurrently for a single walk. + * + * If [executor] rejects work (it is shut down, or a bounded queue is saturated) the walk + * terminates and the returned future completes exceptionally with the rejection. + * + * @param consumer Invoked once per item, on [executor]. + * @param executor Executor on which the driver and consumer run. + * @return A future that completes when the walk finishes. + */ + public fun forEachAsync( + consumer: Consumer, + executor: Executor, + ): CompletableFuture = startWalk(consumer, executor) /** * Collects every item across every page into a single list, in server-defined order. @@ -143,9 +172,29 @@ public class AsyncPaginator * * @return A future that completes with all items. */ - public fun collectAllAsync(): CompletableFuture> { + public fun collectAllAsync(): CompletableFuture> = collectInto(null) + + /** + * Like [collectAllAsync], but runs the driver on [executor] — see the threading contract + * on [forEachAsync]`(consumer, executor)`. + * + * @param executor Executor on which the driver runs. + * @return A future that completes with all items. + */ + public fun collectAllAsync(executor: Executor): CompletableFuture> = collectInto(executor) + + private fun startWalk( + consumer: Consumer, + executor: Executor?, + ): CompletableFuture { + val result = CompletableFuture() + Walk(consumer, executor, result).start() + return result + } + + private fun collectInto(executor: Executor?): CompletableFuture> { val items = ArrayList() - return forEachAsync { items.add(it) }.thenApply { items } + return startWalk({ items.add(it) }, executor).thenApply { items } } /** @@ -154,6 +203,7 @@ public class AsyncPaginator */ private inner class Walk( private val consumer: Consumer, + private val executor: Executor?, private val result: CompletableFuture, ) { private var nextRequest: Request? = initialRequest @@ -177,7 +227,29 @@ public class AsyncPaginator // driver also re-checks result.isDone before each fetch, so no further pages // are requested once the result is settled. result.whenComplete { _, _ -> inFlight?.cancel(true) } - drive() + // Enter the driver — on [executor] when one was supplied, so even a synchronously + // completed first page drains to the consumer off the caller's thread. + resume() + } + + /** + * Enters the trampoline loop, dispatching to [executor] when one was supplied. Both + * the initial entry and every async re-entry route through here, so the consumer runs + * on the executor rather than on a transport callback thread. A rejected dispatch + * (executor shut down or saturated) fails the walk instead of leaking the rejection + * into the completion machinery or hanging the result future. + */ + private fun resume() { + val ex = executor + if (ex == null) { + drive() + } else { + try { + ex.execute { drive() } + } catch (t: Throwable) { + result.completeExceptionally(t) + } + } } /** @@ -236,13 +308,14 @@ public class AsyncPaginator return if (stagePage(pageFuture)) Step.CONTINUE else Step.DONE } // Genuinely async: hand the `driving` flag to the callback and suspend the loop. - // If pageFuture completes during registration, this callback runs inline on the - // current thread and re-enters drive() while this frame still returns SUSPEND — - // safe, because the SUSPEND frame never clears `driving`. + // The callback stages the page, releases the flag, and re-enters the driver via + // resume() — on the executor when one was supplied. If pageFuture completes during + // registration the callback runs inline on the current thread; the SUSPEND frame + // still never clears `driving`, so the re-entry stays safe. pageFuture.whenComplete { _, _ -> if (stagePage(pageFuture)) { driving.set(false) - drive() + resume() } } return Step.SUSPEND diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt index d7412a15..a02bb38c 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt @@ -17,12 +17,15 @@ import org.dexpace.sdk.core.io.BufferedSource import java.io.IOException import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionException +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutionException import java.util.concurrent.Executor import java.util.concurrent.Executors +import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference +import java.util.function.Consumer import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals @@ -421,6 +424,141 @@ class AsyncPaginatorTest { assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") assertEquals("cannot build next page request", ex.cause?.message) } + + @Test + fun `an eager throw from executeAsync fails the walk instead of escaping`() { + // Unlike StubAsyncHttpClient (which turns a responder throw into a failed future), this + // transport violates the contract by throwing synchronously out of executeAsync. The + // paginator must catch it and fail the walk rather than let it escape forEachAsync. + val client = AsyncHttpClient { throw IllegalStateException("eager executeAsync failure") } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val ex = + assertFailsWith { + paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") + assertEquals("eager executeAsync failure", ex.cause?.message) + } + + @Test + fun `forEachAsync with an executor runs the consumer on that executor`() { + // Synchronous-completion transport: without the executor the consumer would run on the + // caller's thread. With it, even synchronously completed pages must drain on the executor. + val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "consumer") } + try { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val threads = ConcurrentHashMap.newKeySet() + val seen = ArrayList() + val recorder = + Consumer { item -> + threads.add(Thread.currentThread().name) + seen.add(item) + } + paginator.forEachAsync(recorder, executor).get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), seen) + assertEquals(setOf("consumer"), threads, "consumer must run only on the supplied executor") + } finally { + executor.shutdownNow() + } + } + + @Test + fun `executor overload isolates the consumer from the transport callback thread`() { + // Pages complete on a background "transport" pool; the consumer is pinned to its own + // "consumer" executor. The consumer must never observe a transport thread. + val transport = Executors.newFixedThreadPool(2) { r -> Thread(r, "transport") } + val consumerExec = Executors.newSingleThreadExecutor { r -> Thread(r, "consumer") } + try { + val client = threePageClient(transport) + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val threads = ConcurrentHashMap.newKeySet() + val recorder = Consumer { threads.add(Thread.currentThread().name) } + paginator.forEachAsync(recorder, consumerExec).get(5, TimeUnit.SECONDS) + + assertEquals(setOf("consumer"), threads, "consumer ran on a transport thread") + } finally { + transport.shutdownNow() + consumerExec.shutdownNow() + } + } + + @Test + fun `collectAllAsync with an executor collects every item in order`() { + val executor = Executors.newSingleThreadExecutor() + try { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val items = paginator.collectAllAsync(executor).get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), items) + } finally { + executor.shutdownNow() + } + } + + @Test + fun `deep run on an executor stays stack-safe`() { + // The trampoline must stay flat even when entered through an executor: a long run of + // synchronously completed pages is processed in a single drive loop on one executor task. + val pageCount = 5_000 + val client = StubAsyncHttpClient() + for (page in 0 until pageCount) { + val url = + if (page == 0) { + "https://api.example.com/items" + } else { + "https://api.example.com/items?page=$page" + } + val isLast = page == pageCount - 1 + client.on(url) { req -> + if (isLast) { + textResponse(req, "p$page") + } else { + textResponse( + req, + "p$page", + extraHeaders = + mapOf( + "Link" to + "; rel=\"next\"", + ), + ) + } + } + } + val executor = Executors.newSingleThreadExecutor() + try { + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + val items = paginator.collectAllAsync(executor).get(30, TimeUnit.SECONDS) + assertEquals(pageCount, items.size) + assertEquals("p${pageCount - 1}", items.last()) + } finally { + executor.shutdownNow() + } + } + + @Test + fun `a rejected executor dispatch fails the walk`() { + // A shut-down executor rejects the very first dispatch; the walk must complete + // exceptionally with the rejection rather than hang or leak it. + val executor = Executors.newSingleThreadExecutor() + executor.shutdownNow() + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val noop = Consumer { } + val ex = + assertFailsWith { + paginator.forEachAsync(noop, executor).get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is RejectedExecutionException, "cause was ${ex.cause}") + } } /** From 4fd2d409c9c060e45ea9c023437957ec9ce12499 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 24 Jun 2026 05:13:24 +0300 Subject: [PATCH 4/4] docs: clarify AsyncPaginator cancellation race and inFlight lifecycle The Cancellation KDoc claimed an in-flight page's response is always closed and discarded on abort. That holds only when the response is delivered before the cancel settles the transport future; if the cancel wins that race, the transport may have built a Response that never reaches the paginator's close path. Document this as an SPI-level limitation rather than promising more than the AsyncHttpClient contract can deliver. Also fix the inFlight field comment, which described the field as null between fetches. It is null only before the first fetch; afterward it retains the last (now-completed) transport future, and cancelling that completed future is a harmless no-op. --- .../dexpace/sdk/core/pagination/AsyncPaginator.kt | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt index b2f63f46..56124d5a 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt @@ -80,7 +80,12 @@ import java.util.function.Consumer * walk: the driver fetches no further pages, and the page exchange currently in flight is * best-effort aborted by cancelling its transport future (which propagates into the underlying * client per the [AsyncHttpClient] cancellation contract). A page request already dispatched - * may still complete before the abort takes effect; its response is closed and discarded. + * may still complete before the abort takes effect; when it completes successfully, the + * paginator closes and discards that response. One narrow race is inherent to the + * [AsyncHttpClient] SPI and the paginator cannot close around it: if the cancel settles the + * transport future before the transport delivers its [Response], that response never reaches the + * paginator's close path — releasing it is the transport's responsibility, since cancelling a + * `CompletableFuture` cannot reach back into an already-built response. * * Cancellation takes effect at page granularity. If the result is settled while a page is * mid-drain, the items already being delivered from that page still reach the consumer — the @@ -215,9 +220,11 @@ public class AsyncPaginator private val driving = AtomicBoolean(false) private var pendingPage: Page? = null - // The transport future for the page currently being fetched, or null between - // fetches. Written by the loop owner (in [fetchPage]); read by the cancellation - // hook on a possibly different thread, hence @Volatile. + // The transport future for the page currently being fetched: null until the first + // fetch, thereafter the most recently dispatched exchange (it is never reset to null, + // so after a fetch settles it retains that now-completed future — cancelling it is a + // harmless no-op). Written by the loop owner (in [fetchPage]); read by the + // cancellation hook on a possibly different thread, hence @Volatile. @Volatile private var inFlight: CompletableFuture? = null