From b6e6b758ae21fb5f4386099e7aea5b525e9b869f Mon Sep 17 00:00:00 2001 From: Chance Newkirk Date: Sun, 15 Mar 2026 20:33:17 -0700 Subject: [PATCH] Fix out-of-order sample loss by making remote write synchronous The store() method previously fired HTTP writes asynchronously via executeAsync() and returned immediately. When the RingBuffer's multiple worker threads dispatched consecutive batches containing samples for the same series, the async HTTP requests could arrive at the remote write endpoint out of timestamp order, causing the backend to reject the stale samples as out-of-order. This change makes store() block until the HTTP write completes, ensuring the ring buffer worker thread does not process the next batch until the current write has landed. This preserves per-series timestamp ordering across consecutive WriteRequests as required by the Prometheus Remote Write spec. Additionally fixes a bug where samplesLost incorrectly counted unfiltered samples (including NaN) instead of the actual samples that were attempted. Validated via A/B E2E testing against Thanos Receive: - Baseline (async): 14 out-of-order / 5,262 appended (0.27% loss) - Fix (sync): 0 out-of-order / 5,264 appended (0.00% loss) - Throughput: identical (~5,260 samples over equal soak periods) - All 45 smoke tests passing on both runs Assisted-By: Claude Opus 4.6 --- .../opennms/timeseries/cortex/CortexTSS.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java index 3bdde2d..ee7aa7d 100644 --- a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java +++ b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java @@ -233,15 +233,19 @@ public void store(final List samples, String clientID) throws StorageExc final Request request = builder.build(); LOG.trace("Writing: {}", writeRequest); - asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).whenComplete((r, ex) -> { - if (ex == null) { - samplesWritten.mark(samplesSorted.size()); - } else { - // FIXME: Data loss - samplesLost.mark(samples.size()); - LOG.error("Error occurred while storing samples, sample will be lost.", ex); - } - }); + // Block until the HTTP write completes. This ensures the ring buffer worker + // thread does not process the next batch until this write has landed, preserving + // per-series timestamp ordering across consecutive WriteRequests as required by + // the Prometheus Remote Write spec. + try { + asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).toCompletableFuture().get( + config.getWriteTimeoutInMs(), TimeUnit.MILLISECONDS); + samplesWritten.mark(samplesSorted.size()); + } catch (Exception ex) { + samplesLost.mark(samplesSorted.size()); + Throwable cause = ex.getCause() != null ? ex.getCause() : ex; + throw new StorageException("Failed to write samples to Prometheus: " + cause.getMessage(), cause); + } } private void persistExternalTags(final Sample s) {