Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
Expand Down Expand Up @@ -73,6 +75,8 @@ public class SemanticFolsomMetrics implements Metrics {
private final MetricId id;

private final Set<OutstandingRequestsGauge> gauges = new CopyOnWriteArraySet<>();
private final ConcurrentMap<String, HostMetrics> instanceMetrics = new ConcurrentHashMap<>();
private final MetricId outstandingRequestGaugeBaseId;

public SemanticFolsomMetrics(final SemanticMetricRegistry registry, final MetricId baseMetricId) {

Expand Down Expand Up @@ -142,12 +146,12 @@ protected Ratio getRatio() {
this.touchSuccesses = registry.meter(touchMetersId.tagged("result", "success"));
this.touchFailures = registry.meter(touchMetersId.tagged("result", "failure"));

final MetricId outstandingRequestGauge =
outstandingRequestGaugeBaseId =
id.tagged(
"what", "outstanding-requests",
"unit", "requests");
registry.register(
outstandingRequestGauge,
outstandingRequestGaugeBaseId.tagged("host", "all"),
(Gauge<Long>)
() ->
gauges.stream().mapToLong(OutstandingRequestsGauge::getOutstandingRequests).sum());
Expand Down Expand Up @@ -344,10 +348,47 @@ public RatioGauge getHitRatio() {
@Override
public void registerOutstandingRequestsGauge(final OutstandingRequestsGauge gauge) {
gauges.add(gauge);
final String hostName = gauge.getHostName();
instanceMetrics.computeIfAbsent(hostName, HostMetrics::new).add(gauge);
}

@Override
public void unregisterOutstandingRequestsGauge(OutstandingRequestsGauge gauge) {
public void unregisterOutstandingRequestsGauge(final OutstandingRequestsGauge gauge) {
final String hostName = gauge.getHostName();
gauges.remove(gauge);
instanceMetrics.computeIfAbsent(hostName, HostMetrics::new).remove(gauge);

// TODO: possibly clean up old hostnames from the map.
}

private class HostMetrics {
private final Set<OutstandingRequestsGauge> gauges = new CopyOnWriteArraySet<>();
private final MetricId metricId;
private boolean active = false;

private HostMetrics(final String hostname) {
metricId = outstandingRequestGaugeBaseId.tagged("host", hostname);
}

private synchronized void remove(final OutstandingRequestsGauge gauge) {
gauges.remove(gauge);

if (gauges.isEmpty() && active) {
active = false;
registry.remove(metricId);
}
}

private synchronized void add(final OutstandingRequestsGauge gauge) {
gauges.add(gauge);
if (!active) {
active = true;
registry.register(metricId, (Gauge<Long>) this::gaugeFunction);
}
}

private long gaugeFunction() {
return gauges.stream().mapToLong(OutstandingRequestsGauge::getOutstandingRequests).sum();
}
}
}
4 changes: 4 additions & 0 deletions folsom/src/main/java/com/spotify/folsom/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,9 @@ public interface Metrics {

interface OutstandingRequestsGauge {
int getOutstandingRequests();

default String getHostName() {
return "";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class DefaultRawMemcacheClient extends AbstractRawMemcacheClient {
? new EpollEventLoopGroup(0, THREAD_FACTORY)
: new NioEventLoopGroup(0, THREAD_FACTORY));
private final int pendingCounterLimit;
private final OutstandingRequestGauge outstandingRequestGauge = new OutstandingRequestGauge();

public static CompletionStage<RawMemcacheClient> connect(
final HostAndPort address,
Expand Down Expand Up @@ -210,7 +211,7 @@ private DefaultRawMemcacheClient(

GLOBAL_CONNECTION_COUNT.incrementAndGet();

metrics.registerOutstandingRequestsGauge(this::numPendingRequests);
metrics.registerOutstandingRequestsGauge(outstandingRequestGauge);

channel.pipeline().addLast("handler", new ConnectionHandler());
}
Expand Down Expand Up @@ -450,7 +451,7 @@ private void setDisconnected(String message) {
pendingCounter.set(pendingCounterLimit);
channel.close();
GLOBAL_CONNECTION_COUNT.decrementAndGet();
metrics.unregisterOutstandingRequestsGauge(this::numPendingRequests);
metrics.unregisterOutstandingRequestsGauge(outstandingRequestGauge);
notifyConnectionChange();
}
}
Expand All @@ -459,13 +460,21 @@ static int getGlobalConnectionCount() {
return GLOBAL_CONNECTION_COUNT.get();
}

private int numPendingRequests() {
final int counter = pendingCounter.get();
if (counter >= pendingCounterLimit) {
if (disconnectReason.get() != null) {
return 0; // Disconnected implies no pending requests
private class OutstandingRequestGauge implements Metrics.OutstandingRequestsGauge {
@Override
public int getOutstandingRequests() {
final int counter = pendingCounter.get();
if (counter >= pendingCounterLimit) {
if (disconnectReason.get() != null) {
return 0; // Disconnected implies no pending requests
}
}
return counter;
}

@Override
public String getHostName() {
return address.getHostText();
}
return counter;
}
}