diff --git a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java index 3bdde2d..ee7aa7d 100644 --- a/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java +++ b/plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java @@ -233,15 +233,19 @@ public void store(final List samples, String clientID) throws StorageExc final Request request = builder.build(); LOG.trace("Writing: {}", writeRequest); - asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).whenComplete((r, ex) -> { - if (ex == null) { - samplesWritten.mark(samplesSorted.size()); - } else { - // FIXME: Data loss - samplesLost.mark(samples.size()); - LOG.error("Error occurred while storing samples, sample will be lost.", ex); - } - }); + // Block until the HTTP write completes. This ensures the ring buffer worker + // thread does not process the next batch until this write has landed, preserving + // per-series timestamp ordering across consecutive WriteRequests as required by + // the Prometheus Remote Write spec. + try { + asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).toCompletableFuture().get( + config.getWriteTimeoutInMs(), TimeUnit.MILLISECONDS); + samplesWritten.mark(samplesSorted.size()); + } catch (Exception ex) { + samplesLost.mark(samplesSorted.size()); + Throwable cause = ex.getCause() != null ? ex.getCause() : ex; + throw new StorageException("Failed to write samples to Prometheus: " + cause.getMessage(), cause); + } } private void persistExternalTags(final Sample s) {