Skip to content

Commit 1425ba7

Browse files
authored
fix: keep LoggableResponseBody's single-close guarantee honest when close() throws (#137)
PR: #137
1 parent 135e1c0 commit 1425ba7

2 files changed

Lines changed: 129 additions & 8 deletions

File tree

sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBody.kt

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public class LoggableResponseBody
9797
private var closed = false
9898

9999
// True once the whole body fit within the cap and was drained into [captured]; the drain
100-
// path closes the source via `.use {}` in that case, so [source] can hand out repeatable
100+
// path closes the source (best-effort) in that case, so [source] can hand out repeatable
101101
// peek() views and close() must not double-close the delegate.
102102
@Volatile
103103
private var fullyCaptured = false
@@ -206,11 +206,18 @@ public class LoggableResponseBody
206206
* both through this single guard prevents a double-close on a delegate whose [source]
207207
* returns the same instance and whose [close] closes it (some sockets / streams throw on
208208
* double-close). Callers must hold [lock].
209+
*
210+
* The guard flips in a `finally`, so a delegate whose `close()` throws is still marked
211+
* closed: the failing close propagates to the caller, but the delegate is never closed a
212+
* second time. This matches the drain path, which likewise marks the delegate closed after
213+
* a best-effort source close so a later [close] is a no-op.
209214
*/
210215
@Throws(IOException::class)
211216
private fun closeDelegateOnce() {
212-
if (!delegateClosed) {
217+
if (delegateClosed) return
218+
try {
213219
delegate.close()
220+
} finally {
214221
delegateClosed = true
215222
}
216223
}
@@ -235,10 +242,11 @@ public class LoggableResponseBody
235242
* the source was never obtained, so the delegate remains open and a subsequent [close]
236243
* will still close it.
237244
*
238-
* If EOF is reached within the cap the body is fully captured: the source is closed and
239-
* [fullyCaptured] is set, preserving the fully-repeatable behavior. If the cap is hit with
240-
* bytes still pending the delegate is **left open** and retained as [liveTail] so the
241-
* consumer still receives the rest of the body via [source].
245+
* If EOF is reached within the cap the body is fully captured: the source is closed
246+
* (best-effort — a close failure does not become a [drainError], since the capture already
247+
* succeeded) and [fullyCaptured] is set, preserving the fully-repeatable behavior. If the
248+
* cap is hit with bytes still pending the delegate is **left open** and retained as
249+
* [liveTail] so the consumer still receives the rest of the body via [source].
242250
*
243251
* If `provider.buffer()` throws (rare; would indicate a misconfigured provider), the error
244252
* is cached in [drainError] and an empty buffer is used as the fallback capture so
@@ -271,8 +279,19 @@ public class LoggableResponseBody
271279
remaining -= n
272280
}
273281
if (fullyCaptured) {
274-
// Whole body captured: close the source (and via ownership the delegate).
275-
capturedSource.close()
282+
// Whole body captured: close the source (and via ownership the delegate). The
283+
// capture already succeeded, so a close failure here is best-effort cleanup, not
284+
// a drain failure — swallow it (as the read-failure handler below does) so the
285+
// complete captured body stays readable and is never reported as a [drainError].
286+
// Mark the delegate closed whether or not close() throws, so a later close() does
287+
// not close the same source a second time (some sockets / streams throw on
288+
// double-close).
289+
try {
290+
capturedSource.close()
291+
} catch (_: Throwable) {
292+
// Best-effort: the body is already fully captured; a close failure must not
293+
// poison it or leave the single-close guard unset.
294+
}
276295
delegateClosed = true
277296
} else {
278297
// The cap was hit (or maxCaptureBytes was <= 0) with bytes still pending: retain

sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBodyTest.kt

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,34 @@ class LoggableResponseBodyTest {
5252
}
5353
}
5454

55+
/**
56+
* A body that fits the (unbounded) cap and whose source's [close] throws every time it is
57+
* called, counting invocations. Models a delegate whose handle is not safe to close twice and
58+
* whose close can fail — exercises the full-capture path's best-effort, single-close behavior.
59+
*/
60+
private fun fullyCapturedBodyWithThrowingClose(
61+
text: String,
62+
sourceCloseCount: AtomicInteger,
63+
): ResponseBody {
64+
val backing = Io.provider.buffer().also { it.writeUtf8(text) }
65+
val throwingOnClose =
66+
object : BufferedSource by backing {
67+
override fun close() {
68+
sourceCloseCount.incrementAndGet()
69+
throw IOException("source close failed")
70+
}
71+
}
72+
return object : ResponseBody() {
73+
override fun mediaType(): MediaType? = MediaType.parse("text/plain")
74+
75+
override fun contentLength(): Long = text.toByteArray(Charsets.UTF_8).size.toLong()
76+
77+
override fun source(): BufferedSource = throwingOnClose
78+
79+
override fun close() {}
80+
}
81+
}
82+
5583
// ----- source() that throws before entering .use {} -----
5684

5785
@Test
@@ -166,6 +194,44 @@ class LoggableResponseBodyTest {
166194
)
167195
}
168196

197+
@Test
198+
fun `fully captured body whose source close throws stays readable and is not poisoned`() {
199+
// Full-capture path: the body fits the cap, so the drain reads it all and then closes the
200+
// source. If that close throws, the capture itself already succeeded — the complete body is
201+
// in the buffer — so source() must still return it and captureException must stay null. A
202+
// close failure after a complete capture is best-effort cleanup, not a drain failure.
203+
val sourceCloseCount = AtomicInteger(0)
204+
val wrapper = LoggableResponseBody(fullyCapturedBodyWithThrowingClose("hello", sourceCloseCount))
205+
206+
assertEquals(
207+
"hello",
208+
wrapper.source().readUtf8(),
209+
"a complete capture must stay readable even when the source close fails",
210+
)
211+
assertNull(
212+
wrapper.captureException,
213+
"a successful capture must not surface a drain error for a best-effort close failure",
214+
)
215+
}
216+
217+
@Test
218+
fun `fully captured body whose source close throws is closed exactly once`() {
219+
// The best-effort close must still happen exactly once: the drain closes the source and,
220+
// because that close threw, the guard must be marked closed so a later wrapper.close() does
221+
// not close the same source a second time (some sockets / streams throw on double-close).
222+
val sourceCloseCount = AtomicInteger(0)
223+
val wrapper = LoggableResponseBody(fullyCapturedBodyWithThrowingClose("hello", sourceCloseCount))
224+
225+
wrapper.source().readUtf8()
226+
wrapper.close()
227+
228+
assertEquals(
229+
1,
230+
sourceCloseCount.get(),
231+
"a fully-captured source whose close() throws must be closed exactly once across drain and wrapper close",
232+
)
233+
}
234+
169235
// ----- additional failure-semantics tests -----
170236

171237
@Test
@@ -474,6 +540,42 @@ class LoggableResponseBodyTest {
474540
)
475541
}
476542

543+
@Test
544+
fun `delegate whose close throws is invoked only once across two close calls`() {
545+
// Over-cap path: the one-shot source close and the wrapper close both funnel through the
546+
// single-close guard. If the first close() throws, the guard must still flip so the second
547+
// close() is a no-op — a delegate whose handle is not safe to close twice must see exactly
548+
// one close even when that close fails.
549+
val delegateCloseCount = AtomicInteger(0)
550+
val payload = "abcdefghijklmnopqrstuvwxyz" // 26 bytes > cap
551+
val delegate =
552+
object : ResponseBody() {
553+
override fun mediaType(): MediaType? = MediaType.parse("text/plain")
554+
555+
override fun contentLength(): Long = payload.toByteArray(Charsets.UTF_8).size.toLong()
556+
557+
override fun source(): BufferedSource = Io.provider.buffer().also { it.writeUtf8(payload) }
558+
559+
override fun close() {
560+
delegateCloseCount.incrementAndGet()
561+
throw IOException("delegate close failed")
562+
}
563+
}
564+
val wrapper = LoggableResponseBody.bounded(delegate, Io.provider, 5L)
565+
566+
val tail = wrapper.source()
567+
// First close reaches the delegate and throws; the guard must still flip.
568+
assertFailsWith<IOException> { tail.close() }
569+
// Second close must be a no-op — the delegate is not closed again.
570+
wrapper.close()
571+
572+
assertEquals(
573+
1,
574+
delegateCloseCount.get(),
575+
"a delegate whose close() throws must still be closed exactly once across two close() calls",
576+
)
577+
}
578+
477579
@Test
478580
fun `over-cap close then source close closes the underlying source exactly once`() {
479581
// The reverse order: wrapper.close() first, then closing the already-handed-out one-shot

0 commit comments

Comments
 (0)