diff --git a/folsom-semantic-metrics/src/main/java/com/spotify/folsom/client/SemanticFolsomMetrics.java b/folsom-semantic-metrics/src/main/java/com/spotify/folsom/client/SemanticFolsomMetrics.java index f79472d4..32c504b9 100644 --- a/folsom-semantic-metrics/src/main/java/com/spotify/folsom/client/SemanticFolsomMetrics.java +++ b/folsom-semantic-metrics/src/main/java/com/spotify/folsom/client/SemanticFolsomMetrics.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; /** @@ -73,6 +75,8 @@ public class SemanticFolsomMetrics implements Metrics { private final MetricId id; private final Set gauges = new CopyOnWriteArraySet<>(); + private final ConcurrentMap instanceMetrics = new ConcurrentHashMap<>(); + private final MetricId outstandingRequestGaugeBaseId; public SemanticFolsomMetrics(final SemanticMetricRegistry registry, final MetricId baseMetricId) { @@ -142,12 +146,12 @@ protected Ratio getRatio() { this.touchSuccesses = registry.meter(touchMetersId.tagged("result", "success")); this.touchFailures = registry.meter(touchMetersId.tagged("result", "failure")); - final MetricId outstandingRequestGauge = + outstandingRequestGaugeBaseId = id.tagged( "what", "outstanding-requests", "unit", "requests"); registry.register( - outstandingRequestGauge, + outstandingRequestGaugeBaseId.tagged("host", "all"), (Gauge) () -> gauges.stream().mapToLong(OutstandingRequestsGauge::getOutstandingRequests).sum()); @@ -344,10 +348,47 @@ public RatioGauge getHitRatio() { @Override public void registerOutstandingRequestsGauge(final OutstandingRequestsGauge gauge) { gauges.add(gauge); + final String hostName = gauge.getHostName(); + instanceMetrics.computeIfAbsent(hostName, HostMetrics::new).add(gauge); } @Override - public void unregisterOutstandingRequestsGauge(OutstandingRequestsGauge gauge) { + public void unregisterOutstandingRequestsGauge(final OutstandingRequestsGauge gauge) { + final String hostName = gauge.getHostName(); gauges.remove(gauge); + instanceMetrics.computeIfAbsent(hostName, HostMetrics::new).remove(gauge); + + // TODO: possibly clean up old hostnames from the map. + } + + private class HostMetrics { + private final Set gauges = new CopyOnWriteArraySet<>(); + private final MetricId metricId; + private boolean active = false; + + private HostMetrics(final String hostname) { + metricId = outstandingRequestGaugeBaseId.tagged("host", hostname); + } + + private synchronized void remove(final OutstandingRequestsGauge gauge) { + gauges.remove(gauge); + + if (gauges.isEmpty() && active) { + active = false; + registry.remove(metricId); + } + } + + private synchronized void add(final OutstandingRequestsGauge gauge) { + gauges.add(gauge); + if (!active) { + active = true; + registry.register(metricId, (Gauge) this::gaugeFunction); + } + } + + private long gaugeFunction() { + return gauges.stream().mapToLong(OutstandingRequestsGauge::getOutstandingRequests).sum(); + } } } diff --git a/folsom/src/main/java/com/spotify/folsom/Metrics.java b/folsom/src/main/java/com/spotify/folsom/Metrics.java index 9647c051..93963be6 100644 --- a/folsom/src/main/java/com/spotify/folsom/Metrics.java +++ b/folsom/src/main/java/com/spotify/folsom/Metrics.java @@ -42,5 +42,9 @@ public interface Metrics { interface OutstandingRequestsGauge { int getOutstandingRequests(); + + default String getHostName() { + return ""; + } } } diff --git a/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java b/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java index 11b2770a..130e603e 100644 --- a/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java +++ b/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java @@ -97,6 +97,7 @@ public class DefaultRawMemcacheClient extends AbstractRawMemcacheClient { ? new EpollEventLoopGroup(0, THREAD_FACTORY) : new NioEventLoopGroup(0, THREAD_FACTORY)); private final int pendingCounterLimit; + private final OutstandingRequestGauge outstandingRequestGauge = new OutstandingRequestGauge(); public static CompletionStage connect( final HostAndPort address, @@ -210,7 +211,7 @@ private DefaultRawMemcacheClient( GLOBAL_CONNECTION_COUNT.incrementAndGet(); - metrics.registerOutstandingRequestsGauge(this::numPendingRequests); + metrics.registerOutstandingRequestsGauge(outstandingRequestGauge); channel.pipeline().addLast("handler", new ConnectionHandler()); } @@ -450,7 +451,7 @@ private void setDisconnected(String message) { pendingCounter.set(pendingCounterLimit); channel.close(); GLOBAL_CONNECTION_COUNT.decrementAndGet(); - metrics.unregisterOutstandingRequestsGauge(this::numPendingRequests); + metrics.unregisterOutstandingRequestsGauge(outstandingRequestGauge); notifyConnectionChange(); } } @@ -459,13 +460,21 @@ static int getGlobalConnectionCount() { return GLOBAL_CONNECTION_COUNT.get(); } - private int numPendingRequests() { - final int counter = pendingCounter.get(); - if (counter >= pendingCounterLimit) { - if (disconnectReason.get() != null) { - return 0; // Disconnected implies no pending requests + private class OutstandingRequestGauge implements Metrics.OutstandingRequestsGauge { + @Override + public int getOutstandingRequests() { + final int counter = pendingCounter.get(); + if (counter >= pendingCounterLimit) { + if (disconnectReason.get() != null) { + return 0; // Disconnected implies no pending requests + } } + return counter; + } + + @Override + public String getHostName() { + return address.getHostText(); } - return counter; } }