Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions plugin/src/main/java/org/opennms/timeseries/cortex/CortexTSS.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,19 @@ public void store(final List<Sample> samples, String clientID) throws StorageExc
final Request request = builder.build();

LOG.trace("Writing: {}", writeRequest);
asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).whenComplete((r, ex) -> {
if (ex == null) {
samplesWritten.mark(samplesSorted.size());
} else {
// FIXME: Data loss
samplesLost.mark(samples.size());
LOG.error("Error occurred while storing samples, sample will be lost.", ex);
}
});
// Block until the HTTP write completes. This ensures the ring buffer worker
// thread does not process the next batch until this write has landed, preserving
// per-series timestamp ordering across consecutive WriteRequests as required by
// the Prometheus Remote Write spec.
try {
asyncHttpCallsBulkhead.executeCompletionStage(() -> executeAsync(request)).toCompletableFuture().get(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't completely fix out of order issues as there are 16 writer threads that are trying to write. https://github.com/OpenNMS/opennms/blob/caf278eeb34341791e5617c139d2aa49cb517f4d/features/timeseries/src/main/resources/OSGI-INF/blueprint/blueprint.xml#L16

This only helps when write is very slow.
I think we should probably use a different config timeout other than config.getWriteTimeoutInMs() as this is broader than just write, ( connect, write, wait for ack)

config.getWriteTimeoutInMs(), TimeUnit.MILLISECONDS);
samplesWritten.mark(samplesSorted.size());
} catch (Exception ex) {
samplesLost.mark(samplesSorted.size());
Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
throw new StorageException("Failed to write samples to Prometheus: " + cause.getMessage(), cause);
}
}

private void persistExternalTags(final Sample s) {
Expand Down