diff --git a/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java b/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java index 48fb26a1..71a45278 100644 --- a/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java +++ b/folsom/src/main/java/com/spotify/folsom/client/DefaultRawMemcacheClient.java @@ -83,6 +83,7 @@ public class DefaultRawMemcacheClient extends AbstractRawMemcacheClient { private final BatchFlusher flusher; private final HostAndPort address; private final Executor executor; + private final boolean isDirectExecutor; private final long timeoutMillis; private final Metrics metrics; private final int maxSetLength; @@ -194,6 +195,7 @@ private DefaultRawMemcacheClient( int maxSetLength) { this.address = address; this.executor = executor; + this.isDirectExecutor = executor == null || Utils.isDirectExecutor(executor); this.timeoutMillis = timeoutMillis; this.metrics = metrics; this.maxSetLength = maxSetLength; @@ -247,7 +249,11 @@ public CompletionStage send(final Request request) { } private CompletionStage onExecutor(CompletionStage future) { - return Utils.onExecutor(future, executor); + if (isDirectExecutor) { + return future; + } else { + return Utils.onExecutor(future, executor); + } } /** diff --git a/folsom/src/main/java/com/spotify/folsom/client/Utils.java b/folsom/src/main/java/com/spotify/folsom/client/Utils.java index 11e5ed6f..4f9c6550 100644 --- a/folsom/src/main/java/com/spotify/folsom/client/Utils.java +++ b/folsom/src/main/java/com/spotify/folsom/client/Utils.java @@ -16,6 +16,8 @@ package com.spotify.folsom.client; +import static java.util.Objects.requireNonNull; + import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.util.List; @@ -72,4 +74,43 @@ static CompletionStage onExecutor(CompletionStage future, Executor exe } return future.whenCompleteAsync((t, throwable) -> {}, executor); } + + /** + * Checks if an executor is direct or not. Direct means that the executor executes work on the + * same thread as the work was submitted on. + * + * @param executor may not be null + * @return true if the executor is a direct executor, otherwise false + */ + static boolean isDirectExecutor(Executor executor) { + requireNonNull(executor); + RunnableWithThread runnableWithThread = new RunnableWithThread(); + try { + executor.execute(runnableWithThread); + + // We have the following cases + // 1) It is a direct executor, so runnableWithThread.thread will be set to the current thread. + // We will correctly return true + // 2) It is not a direct executor and runnableWithThread.thread is still null when we check + // it. + // We correctly return false + // 3) It is not a direct executor but the runnableWithThread.thread somehow managed to get set + // before we check it. In any case, it can't be referencing the same thread we are in + // We correctly return false + + return runnableWithThread.thread == Thread.currentThread(); + } catch (Exception e) { + // If the executor throws any exception, it can not be a direct executor + return false; + } + } + + private static class RunnableWithThread implements Runnable { + private Thread thread; + + @Override + public void run() { + thread = Thread.currentThread(); + } + } } diff --git a/folsom/src/test/java/com/spotify/folsom/client/UtilsTest.java b/folsom/src/test/java/com/spotify/folsom/client/UtilsTest.java index fe500663..5fd7f084 100644 --- a/folsom/src/test/java/com/spotify/folsom/client/UtilsTest.java +++ b/folsom/src/test/java/com/spotify/folsom/client/UtilsTest.java @@ -2,11 +2,14 @@ import static org.junit.Assert.assertEquals; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import org.junit.Test; public class UtilsTest { @@ -70,4 +73,24 @@ public void testOnExecutorException() throws ExecutionException, InterruptedExce }); assertEquals("thread-B-0", future2.get()); } + + @Test + public void testIsDirectExecutor() { + assertDirect(MoreExecutors.directExecutor(), true); + assertDirect(MoreExecutors.newDirectExecutorService(), true); + + assertDirect(ForkJoinPool.commonPool(), false); + assertDirect(Executors.newFixedThreadPool(1), false); + assertDirect(Executors.newFixedThreadPool(10), false); + assertDirect(Executors.newSingleThreadExecutor(), false); + assertDirect(Executors.newCachedThreadPool(), false); + assertDirect(Executors.newWorkStealingPool(), false); + assertDirect(Executors.newWorkStealingPool(10), false); + } + + private void assertDirect(Executor executor, boolean expected) { + for (int i = 0; i < 1000000; i++) { + assertEquals(expected, Utils.isDirectExecutor(executor)); + } + } }