diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/hrtr/HttpRemoteTaskRunnerScaleTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/hrtr/HttpRemoteTaskRunnerScaleTest.java
new file mode 100644
index 000000000000..0680827ef06c
--- /dev/null
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/hrtr/HttpRemoteTaskRunnerScaleTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing.hrtr;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Scale test for the HTTP remote task runner ({@code druid.indexer.runner.type=httpRemote}). Submits a large
+ * number of {@link NoopTask}s with randomized priorities and a fixed run time to a multi-indexer cluster, then
+ * logs throughput and {@code task/waiting/time} statistics.
+ */
+public class HttpRemoteTaskRunnerScaleTest extends EmbeddedClusterTestBase
+{
+  private static final Logger log = new Logger(HttpRemoteTaskRunnerScaleTest.class);
+  private static final int NUM_TASKS = 5_000;
+  private static final int NUM_WORKERS = 10;
+  private static final int WORKER_CAPACITY = 50;
+  private static final int TASK_RUN_TIME_MS = 100;
+
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord()
+      .addProperty("druid.indexer.runner.type", "httpRemote")
+      .addProperty("druid.indexer.runner.pendingTasksRunnerNumThreads", String.valueOf(Runtime.getRuntime().availableProcessors()));
+
+  private final EmbeddedIndexer[] indexers = new EmbeddedIndexer[NUM_WORKERS];
+
+  /**
+   * Builds a cluster with one Overlord using the HTTP remote task runner and {@link #NUM_WORKERS} Indexers,
+   * each with {@link #WORKER_CAPACITY} task slots and its own plaintext port.
+   */
+  @Override
+  public EmbeddedDruidCluster createCluster()
+  {
+    for (int i = 0; i < NUM_WORKERS; i++) {
+      indexers[i] = new EmbeddedIndexer()
+          .addProperty("druid.worker.capacity", String.valueOf(WORKER_CAPACITY))
+          .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+          .addProperty("druid.plaintextPort", String.valueOf(8100 + i));
+    }
+
+    EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                                                       .useLatchableEmitter()
+                                                       .useDefaultTimeoutForLatchableEmitter(240)
+                                                       .addServer(new EmbeddedCoordinator())
+                                                       .addServer(overlord);
+
+    for (EmbeddedIndexer indexer : indexers) {
+      cluster.addServer(indexer);
+    }
+
+    return cluster
+        .addServer(new EmbeddedHistorical())
+        .addServer(new EmbeddedBroker())
+        .addServer(new EmbeddedRouter());
+  }
+
+  /**
+   * Submits {@link #NUM_TASKS} tasks, waits for each of them to finish, logs submission and completion
+   * timings, and asserts that every task succeeded.
+   */
+  @Test
+  @Timeout(120)
+  public void test_scaleWithManyTasks()
+  {
+    log.info("Creating [%d] tasks...", NUM_TASKS);
+    List<NoopTask> tasks = createTasks(NUM_TASKS);
+
+    long startTime = System.currentTimeMillis();
+    List<String> taskIds = new ArrayList<>();
+    double totalTime = 0;
+    for (NoopTask task : tasks) {
+      String taskId = task.getId();
+      taskIds.add(taskId);
+      long t0 = System.nanoTime();
+      cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+      long t1 = System.nanoTime();
+      totalTime += t1 - t0;
+    }
+    // Capture the submission time now; measuring it after the wait loop below would report total run time.
+    final long submitDurationMs = System.currentTimeMillis() - startTime;
+
+    int successCount = 0;
+    int failureCount = 0;
+    int reportInterval = NUM_TASKS / 10; // Report every 10%
+
+    log.info("Waiting for tasks to complete...");
+    for (int i = 0; i < taskIds.size(); i++) {
+      String taskId = taskIds.get(i);
+      try {
+        cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+        ++successCount;
+        if ((i + 1) % reportInterval == 0) {
+          log.info("Progress: %d/%d tasks completed (%.1f%%)", i + 1, NUM_TASKS, ((i + 1) * 100.0) / NUM_TASKS);
+        }
+      }
+      catch (AssertionError e) {
+        ++failureCount;
+        log.error("Task %s failed: %s", taskId, e.getMessage());
+      }
+    }
+
+    long endTime = System.currentTimeMillis();
+    long durationMs = endTime - startTime;
+
+    double tasksPerSecond = (NUM_TASKS * 1000.0) / durationMs;
+    log.info("All tasks submitted in [%d] ms", submitDurationMs);
+    log.info("Avg task submission time: [%f] ns", totalTime / NUM_TASKS);
+    log.info(
+        "Task runner completed %d/%d tasks in %dms (%.2f tasks/sec)",
+        successCount,
+        NUM_TASKS,
+        durationMs,
+        tasksPerSecond
+    );
+
+    printWaitTimeStats();
+
+    Assertions.assertEquals(NUM_TASKS, successCount, "All tasks should succeed");
+    Assertions.assertEquals(0, failureCount, "No tasks should fail");
+  }
+
+  /**
+   * Logs summary statistics of the {@code task/waiting/time} metric events captured by the Overlord's emitter.
+   */
+  private void printWaitTimeStats()
+  {
+    List<ServiceMetricEvent> waitEvents = overlord.latchableEmitter().getMetricEvents("task/waiting/time");
+    if (waitEvents.isEmpty()) {
+      log.info("No task/waiting/time events found");
+      return;
+    }
+
+    List<Long> waitTimesMs = new ArrayList<>();
+    for (ServiceMetricEvent event : waitEvents) {
+      waitTimesMs.add(event.getValue().longValue());
+    }
+    Collections.sort(waitTimesMs);
+
+    long min = waitTimesMs.get(0);
+    long max = waitTimesMs.get(waitTimesMs.size() - 1);
+    long sum = waitTimesMs.stream().mapToLong(Long::longValue).sum();
+    double avg = (double) sum / waitTimesMs.size();
+    long p50 = percentile(waitTimesMs, 0.50);
+    long p90 = percentile(waitTimesMs, 0.90);
+    long p99 = percentile(waitTimesMs, 0.99);
+
+    log.info("=== task/waiting/time stats (n=%d) ===", waitTimesMs.size());
+    log.info(" min=%dms avg=%.1fms p50=%dms p90=%dms p99=%dms max=%dms",
+             min, avg, p50, p90, p99, max);
+  }
+
+  /**
+   * Returns the element at the given fraction of a non-empty, ascending-sorted list, clamping the index to the
+   * last element.
+   */
+  private static long percentile(List<Long> sortedValues, double fraction)
+  {
+    final int index = Math.min((int) (sortedValues.size() * fraction), sortedValues.size() - 1);
+    return sortedValues.get(index);
+  }
+
+  /**
+   * Creates {@code numTasks} {@link NoopTask}s for {@code dataSource}, each running for
+   * {@link #TASK_RUN_TIME_MS} ms with a priority drawn from [0, 100] by a fixed-seed {@link Random}.
+   */
+  private List<NoopTask> createTasks(int numTasks)
+  {
+    List<NoopTask> tasks = new ArrayList<>();
+    Random random = new Random(12345);
+
+    for (int i = 0; i < numTasks; i++) {
+      int priority = random.nextInt(101);
+      String taskId = IdUtils.getRandomIdWithPrefix(dataSource);
+
+      Map<String, Object> context = new HashMap<>();
+      context.put("priority", priority);
+      tasks.add(new NoopTask(taskId, null, dataSource, TASK_RUN_TIME_MS, 0, context));
+    }
+
+    return tasks;
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 62f6b499cb30..07a6baad0fad 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -22,17 +22,14 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -40,8 +37,7 @@
 import
com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import org.apache.curator.framework.CuratorFramework; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; @@ -74,6 +70,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; @@ -86,10 +83,9 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.query.DruidMetrics; -import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.tasklogs.TaskLogStreamer; -import org.apache.zookeeper.KeeperException; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -101,7 +97,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -112,31 +107,29 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * A Remote TaskRunner to manage tasks on Middle Manager nodes using internal-discovery({@link DruidNodeDiscoveryProvider}) - * to discover them and Http. - * Middle Managers manages list of assigned/completed tasks on disk and expose 3 HTTP endpoints - * 1. POST request for assigning a task - * 2. POST request for shutting down a task - * 3. GET request for getting list of assigned, running, completed tasks on Middle Manager and its enable/disable status. - * This endpoint is implemented to support long poll and holds the request till there is a change. This class - * sends the next request immediately as the previous finishes to keep the state up-to-date. - *

- * ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths from ZK which are created by the - * workers to support deprecated RemoteTaskRunner. So a method "scheduleCompletedTaskStatusCleanupFromZk()" is added' - * which should be removed in the release that removes RemoteTaskRunner legacy ZK updation WorkerTaskMonitor class. + * HTTP-based distributed task scheduler that manages assignment of tasks to slots on workers (MiddleManagers or Indexers). + * + * @see HttpRemoteTaskRunnerWorkItem.State for the task state machine. + * @see WorkerHolder.State for the worker state machine. */ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer, WorkerHolder.Listener { - public static final String TASK_DISCOVERED_COUNT = "task/discovered/count"; - private static final EmittingLogger log = new EmittingLogger(HttpRemoteTaskRunner.class); + public static final String TASK_DISCOVERED_COUNT = "task/discovered/count"; + private final LifecycleLock lifecycleLock = new LifecycleLock(); // Executor for assigning pending tasks to workers. @@ -144,39 +137,28 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer, // All known tasks, TaskID -> HttpRemoteTaskRunnerWorkItem // This is a ConcurrentMap as some of the reads are done without holding the lock. - @GuardedBy("statusLock") - private final ConcurrentMap tasks = new ConcurrentHashMap<>(); + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); - // This is the list of pending tasks in the order they arrived, exclusively manipulated/used by thread that - // gives a new task to this class and threads in pendingTasksExec that are responsible for assigning tasks to - // workers. 
- @GuardedBy("statusLock") - private final List pendingTaskIds = new ArrayList<>(); + private final LinkedBlockingQueue pendingTasks = new LinkedBlockingQueue<>(); - // All discovered workers, "host:port" -> WorkerHolder - private final ConcurrentMap workers = new ConcurrentHashMap<>(); + // All discovered workers, "host:port" -> WorkerHolder. + // Locking policy: structural writes (add/remove/update entries) and READ ALL operations (iteration, size()) + // require workerStateLock. Point reads (.get(), .containsKey() on a known key) are safe without it, + // as ConcurrentHashMap guarantees atomic reads without external synchronization. + private final ConcurrentHashMap workers = new ConcurrentHashMap<>(); // Executor for syncing state of each worker. private final ScheduledExecutorService workersSyncExec; - // Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy. - private final ConcurrentMap lazyWorkers = new ConcurrentHashMap<>(); - - // Workers that have been blacklisted. - private final ConcurrentHashMap blackListedWorkers = new ConcurrentHashMap<>(); - - // workers which were assigned a task and are yet to acknowledge same. - // Map: workerId -> taskId - // all writes are guarded - @GuardedBy("statusLock") - private final ConcurrentMap workersWithUnacknowledgedTask = new ConcurrentHashMap<>(); + // Internal worker state counters + private final AtomicLong blackListedWorkersCount = new AtomicLong(0); // Executor to complete cleanup of workers which have disappeared. 
private final ListeningScheduledExecutorService cleanupExec; private final ConcurrentMap removedWorkerCleanups = new ConcurrentHashMap<>(); - - private final Object statusLock = new Object(); + // Lock for synchronizing worker state transitions to minimize races between scheduling/accounting/adhoc worker routines + private final Object workerStateLock = new Object(); // task runner listeners private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); @@ -194,15 +176,6 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer, private final TaskStorage taskStorage; private final ServiceEmitter emitter; - // ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed. - private static final Joiner JOINER = Joiner.on("/"); - - @Nullable // Null, if zk is disabled - private final CuratorFramework cf; - - @Nullable // Null, if zk is disabled - private final ScheduledExecutorService zkCleanupExec; - private final IndexerZkConfig indexerZkConfig; private volatile DruidNodeDiscovery.Listener nodeDiscoveryListener; public HttpRemoteTaskRunner( @@ -213,8 +186,6 @@ public HttpRemoteTaskRunner( ProvisioningStrategy provisioningStrategy, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, TaskStorage taskStorage, - @Nullable CuratorFramework cf, - IndexerZkConfig indexerZkConfig, ServiceEmitter emitter ) { @@ -240,19 +211,6 @@ public HttpRemoteTaskRunner( ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-Worker-Cleanup-%d") ); - if (cf != null) { - this.cf = cf; - this.zkCleanupExec = ScheduledExecutors.fixed( - 1, - "HttpRemoteTaskRunner-zk-cleanup-%d" - ); - } else { - this.cf = null; - this.zkCleanupExec = null; - } - - this.indexerZkConfig = indexerZkConfig; - this.provisioningStrategy = provisioningStrategy; } @@ -265,9 +223,7 @@ public void start() } try { - log.info("Starting..."); - - scheduleCompletedTaskStatusCleanupFromZk(); + log.info("Starting"); startWorkersHandling(); @@ -286,7 +242,7 @@ 
public void start() lifecycleLock.started(); - log.info("Started."); + log.info("Started"); } catch (Exception e) { throw new RuntimeException(e); @@ -296,91 +252,30 @@ public void start() } } - private void scheduleCompletedTaskStatusCleanupFromZk() + private ImmutableMap getImmutableWorkersCopy() { - if (cf == null) { - return; + ImmutableMap workersCopy; + synchronized (workerStateLock) { + workersCopy = ImmutableMap.copyOf(workers); } - - zkCleanupExec.scheduleAtFixedRate( - () -> { - try { - List workers; - try { - workers = cf.getChildren().forPath(indexerZkConfig.getStatusPath()); - } - catch (KeeperException.NoNodeException e) { - // statusPath doesn't exist yet; can occur if no middleManagers have started. - workers = ImmutableList.of(); - } - - Set knownActiveTaskIds = new HashSet<>(); - if (!workers.isEmpty()) { - for (Task task : taskStorage.getActiveTasks()) { - knownActiveTaskIds.add(task.getId()); - } - } - - for (String workerId : workers) { - String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), workerId); - - List taskIds; - try { - taskIds = cf.getChildren().forPath(workerStatusPath); - } - catch (KeeperException.NoNodeException e) { - taskIds = ImmutableList.of(); - } - - for (String taskId : taskIds) { - if (!knownActiveTaskIds.contains(taskId)) { - String taskStatusPath = JOINER.join(workerStatusPath, taskId); - try { - cf.delete().guaranteed().forPath(taskStatusPath); - } - catch (KeeperException.NoNodeException e) { - log.info("Failed to delete taskStatusPath[%s].", taskStatusPath); - } - } - } - } - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - catch (Exception ex) { - log.error(ex, "Unknown error while doing task status cleanup in ZK."); - } - }, - 1, - 5, - TimeUnit.MINUTES - ); + return workersCopy; } - /** - * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} - */ - @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe - Map 
getWorkersEligibleToRunTasks() + ImmutableMap getWorkersEligibleToRunTasks() { - // In this class, this method is called with statusLock held. - // writes to workersWithUnacknowledgedTask are always guarded by statusLock. - // however writes to lazyWorker/blacklistedWorkers aren't necessarily guarded by same lock, so technically there - // could be races in that a task could get assigned to a worker which in another thread is concurrently being - // marked lazy/blacklisted , but that is ok because that is equivalent to this worker being picked for task and - // being assigned lazy/blacklisted right after even when the two threads hold a mutually exclusive lock. - return Maps.transformEntries( - Maps.filterEntries( - workers, - input -> !lazyWorkers.containsKey(input.getKey()) && - !workersWithUnacknowledgedTask.containsKey(input.getKey()) && - !blackListedWorkers.containsKey(input.getKey()) && - input.getValue().isInitialized() && - input.getValue().isEnabled() - ), - (String key, WorkerHolder value) -> value.toImmutable() - ); + synchronized (workerStateLock) { + return ImmutableMap.copyOf( + Maps.transformEntries( + Maps.filterEntries( + workers, + input -> input.getValue().getState() == WorkerHolder.State.READY && + input.getValue().isInitialized() && + input.getValue().isEnabled() + ), + (String key, WorkerHolder value) -> value.toImmutable() + ) + ); + } } private ImmutableWorkerInfo findWorkerToRunTask(Task task) @@ -389,72 +284,77 @@ private ImmutableWorkerInfo findWorkerToRunTask(Task task) WorkerSelectStrategy strategy; if (workerConfig == null || workerConfig.getSelectStrategy() == null) { strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY; - log.debug("No worker selection strategy set. 
Using default of [%s]", strategy.getClass().getSimpleName()); } else { strategy = workerConfig.getSelectStrategy(); } return strategy.findWorkerForTask( config, - ImmutableMap.copyOf(getWorkersEligibleToRunTasks()), + getWorkersEligibleToRunTasks(), task ); } + /** + * Attempts to assign a task to the given worker and blocks until the worker acknowledges it. + * + * @return true if the task was successfully assigned and started on the worker; + * false if the worker was not found (caller should requeue the task); + * throws {@link ISE} on assignment timeout (caller should fail the task). + */ private boolean runTaskOnWorker( - final HttpRemoteTaskRunnerWorkItem workItem, + final String taskId, final String workerHost ) throws InterruptedException { - String taskId = workItem.getTaskId(); - WorkerHolder workerHolder = workers.get(workerHost); + log.info("Assigning task[%s] to worker[%s]", taskId, workerHost); - if (workerHolder == null || lazyWorkers.containsKey(workerHost) || blackListedWorkers.containsKey(workerHost)) { - log.info("Not assigning task[%s] to removed or marked lazy/blacklisted worker[%s]", taskId, workerHost); + final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); + // Point read on ConcurrentHashMap: safe without workerStateLock + final WorkerHolder workerHolder = workers.get(workerHost); + + Preconditions.checkState(workItem != null, "No task item found for task[%s]", taskId); + + // Worker was removed, gracefully bail + if (workerHolder == null) { + log.warn("No worker found for host[%s]", workerHost); return false; } - log.info("Assigning task [%s] to worker [%s]", taskId, workerHost); + Preconditions.checkState( + workerHolder.getState() == WorkerHolder.State.PENDING_ASSIGN, + "Found invalid state[%s] for worker[%s], expected state[%s]", + workerHolder.getState(), + workerHost, + WorkerHolder.State.PENDING_ASSIGN + ); if (workerHolder.assignTask(workItem.getTask())) { // Don't assign new tasks until the task we just assigned is 
actually running // on a worker - this avoids overflowing a worker with tasks long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis(); - long waitStart = System.currentTimeMillis(); - boolean isTaskAssignmentTimedOut = false; - synchronized (statusLock) { - while (tasks.containsKey(taskId) && tasks.get(taskId).getState().isPending()) { - long remaining = waitMs - (System.currentTimeMillis() - waitStart); - if (remaining > 0) { - statusLock.wait(remaining); - } else { - isTaskAssignmentTimedOut = true; - break; - } - } - } - if (isTaskAssignmentTimedOut) { + try { + workItem.getTaskStartedFuture().get(waitMs, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { log.makeAlert( - "Task assignment timed out on worker [%s], never ran task [%s] in timeout[%s]!", + "Task assignment timed out on worker[%s], never ran task[%s] in timeout[%s]!", workerHost, taskId, config.getTaskAssignmentTimeout() ).emit(); - // taskComplete(..) must be called outside of statusLock, see comments on method. - taskComplete( - workItem, - workerHolder, - TaskStatus.failure( - taskId, - StringUtils.format( - "The worker that this task is assigned did not start it in timeout[%s]. " - + "See overlord and middleManager/indexer logs for more details.", - config.getTaskAssignmentTimeout() - ) - ) + + throw new ISE( + "Task assignment timed out on worker[%s], never ran task[%s] in timeout[%s]! See overlord and middleManager/indexer logs for more details", + workerHost, + taskId, + config.getTaskAssignmentTimeout() ); } + catch (ExecutionException e) { + throw new RuntimeException(e); + } return true; } else { @@ -463,63 +363,77 @@ private boolean runTaskOnWorker( } // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which results in TaskQueue.notifyStatus() being called - // because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock" + // because that is attached by TaskQueue to task result future. 
So, this method must not be called with "workerStateLock" // held. See https://github.com/apache/druid/issues/6201 private void taskComplete( - HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem, - WorkerHolder workerHolder, + String taskId, + String workerHost, TaskStatus taskStatus ) { - Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock."); - Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem"); - Preconditions.checkNotNull(taskStatus, "taskStatus"); - if (workerHolder != null) { - log.info( - "Worker[%s] completed task[%s] with status[%s]", - workerHolder.getWorker().getHost(), - taskStatus.getId(), - taskStatus.getStatusCode() - ); - // Worker is done with this task - workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc()); - } + Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current thread must not hold workerStateLock"); - if (taskRunnerWorkItem.getResult().isDone()) { - // This is not the first complete event. - try { - TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode(); - if (taskStatus.getStatusCode() != lastKnownState) { - log.warn( - "The state of the new task complete event is different from its last known state. " - + "New state[%s], last known state[%s]", - taskStatus.getStatusCode(), - lastKnownState - ); + AtomicBoolean taskFound = new AtomicBoolean(false); + AtomicBoolean taskCompleted = new AtomicBoolean(false); + + tasks.compute( + taskId, + (key, taskEntry) -> { + if (taskEntry != null) { + taskFound.set(true); + if (taskEntry.getResultStatus() != null) { + // This is not the first complete event. 
+ TaskState lastKnownState = taskEntry.getResultStatus().getStatusCode(); + if (taskStatus.getStatusCode() != lastKnownState) { + log.warn( + "Ignoring update to status[%s] as task[%s] has already completed with status[%s]", + taskStatus, + taskId, + lastKnownState + ); + } + } else { + taskEntry.setResult(taskStatus); + taskCompleted.set(true); + } + } + return taskEntry; } - } - catch (InterruptedException e) { - log.warn(e, "Interrupted while getting the last known task status."); - Thread.currentThread().interrupt(); - } - catch (ExecutionException e) { - // This case should not really happen. - log.warn(e, "Failed to get the last known task status. Ignoring this failure."); - } - } else { - // Notify interested parties - taskRunnerWorkItem.setResult(taskStatus); - TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus); + ); + + if (!taskFound.get()) { + throw new RE("Expected task[%s] to have been found", taskId); + } - // Update success/failure counters, Blacklist node if there are too many failures. 
- if (workerHolder != null) { - blacklistWorkerIfNeeded(taskStatus, workerHolder); + if (workerHost != null) { + final AtomicBoolean workerFound = new AtomicBoolean(false); + synchronized (workerStateLock) { + workers.compute( + workerHost, + (key, workerHolder) -> { + if (workerHolder != null) { + workerFound.set(true); + workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc()); + blacklistWorkerIfNeeded(taskStatus, workerHolder); + } + return workerHolder; + } + ); + } + if (workerFound.get()) { + log.info("Worker[%s] completed task[%s] with status[%s]", workerHost, taskId, taskStatus); + } else { + log.info("Could not find worker[%s] while marking task[%s] as complete", workerHost, taskId); } } - synchronized (statusLock) { - statusLock.notifyAll(); + // Notify listeners outside both tasks/workers critical sections to avoid deadlock + if (taskCompleted.get()) { + TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus); } + + // Notify interested parties that a worker is potentially free and/or a task status updated + notifyWatchers(); } private void startWorkersHandling() throws InterruptedException @@ -554,30 +468,46 @@ public void nodeViewInitializedTimedOut() nodeViewInitialized(); } }; + druidNodeDiscovery.registerListener(nodeDiscoveryListener); long workerDiscoveryStartTime = System.currentTimeMillis(); while (!workerViewInitialized.await(30, TimeUnit.SECONDS)) { if (System.currentTimeMillis() - workerDiscoveryStartTime > TimeUnit.MINUTES.toMillis(5)) { - throw new ISE("Couldn't discover workers."); + throw new ISE("Couldn't discover workers"); } else { - log.info("Waiting for worker discovery..."); + log.info("Waiting for worker discovery"); } } - log.info("[%s] Workers are discovered.", workers.size()); - - // Wait till all worker state is sync'd so that we know which worker is running/completed what tasks or else + // Wait till all worker state is synced so that we know which worker is running/completed what tasks or else // We 
would start assigning tasks which are pretty soon going to be reported by discovered workers. - for (WorkerHolder worker : workers.values()) { - log.info("Waiting for worker[%s] to sync state...", worker.getWorker().getHost()); - worker.waitForInitialization(); + // Snapshot workers under workerStateLock, then wait outside it. waitForInitialization() blocks until the + // sync callback completes, and that callback calls notifyWatchers() which needs workerStateLock — holding + // it here would deadlock. + final List discoveredWorkers; + synchronized (workerStateLock) { + log.info("Discovered [%d] workers", workers.size()); + discoveredWorkers = new ArrayList<>(workers.values()); } - log.info("Workers have sync'd state successfully."); + + for (WorkerHolder workerHolder : discoveredWorkers) { + try { + workerHolder.waitForInitialization(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + log.info("Workers have synced state successfully"); } private Worker toWorker(DiscoveryDruidNode node) { - final WorkerNodeService workerNodeService = node.getService(WorkerNodeService.DISCOVERY_SERVICE_KEY, WorkerNodeService.class); + final WorkerNodeService workerNodeService = node.getService( + WorkerNodeService.DISCOVERY_SERVICE_KEY, + WorkerNodeService.class + ); if (workerNodeService == null) { // this shouldn't typically happen, but just in case it does, make a dummy worker to allow the callbacks to // continue since addWorker/removeWorker only need worker.getHost() @@ -603,58 +533,63 @@ private Worker toWorker(DiscoveryDruidNode node) @VisibleForTesting void addWorker(final Worker worker) { - synchronized (workers) { - log.info("Worker[%s] reportin' for duty!", worker.getHost()); - cancelWorkerCleanup(worker.getHost()); - - WorkerHolder holder = workers.get(worker.getHost()); - if (holder == null) { - List expectedAnnouncements = new ArrayList<>(); - synchronized (statusLock) { - // It might be a worker that existed before, 
temporarily went away and came back. We might have a set of - // tasks that we think are running on this worker. Provide that information to WorkerHolder that - // manages the task syncing with that worker. - for (Map.Entry e : tasks.entrySet()) { - HttpRemoteTaskRunnerWorkItem workItem = e.getValue(); - if (workItem.isRunningOnWorker(worker)) { - // This announcement is only used to notify when a task has disappeared on the worker - // So it is okay to set the dataSource and taskResource to null as they will not be used - expectedAnnouncements.add( - TaskAnnouncement.create( - workItem.getTaskId(), - workItem.getTaskType(), - null, - TaskStatus.running(workItem.getTaskId()), - workItem.getLocation(), - null - ) + log.info("Adding worker[%s]", worker.getHost()); + synchronized (workerStateLock) { + workers.compute( + worker.getHost(), + (key, workerEntry) -> { + cancelWorkerCleanup(worker.getHost()); + + // There cannot be any new tasks assigned to this worker as the entry has not been published yet. + // However, there can be callbacks in taskAddedOrUpdated() where some task suddenly begins running + // on this worker. That method still blocks on this key lock, so it will occur strictly before/after this insertion. + if (workerEntry == null) { + log.info("Unrecognized worker[%s], rebuilding task mapping", worker.getHost()); + final List expectedAnnouncements = new ArrayList<>(); + // It might be a worker that existed before, temporarily went away and came back. We might have a set of + // tasks that we think are running on this worker. Provide that information to WorkerHolder that + // manages the task syncing with that worker. 
+ for (HttpRemoteTaskRunnerWorkItem taskEntry : tasks.values()) { + if (taskEntry.isRunningOnWorker(worker)) { + // This announcement is only used to notify when a task has disappeared on the worker + // So it is okay to set the dataSource and taskResource to null as they will not be used + expectedAnnouncements.add( + TaskAnnouncement.create( + taskEntry.getTaskId(), + taskEntry.getTaskType(), + null, + TaskStatus.running(taskEntry.getTaskId()), + taskEntry.getLocation(), + null + ) + ); + } + } + + workerEntry = createWorkerHolder( + smileMapper, + httpClient, + config, + workersSyncExec, + this, + worker, + expectedAnnouncements ); + workerEntry.start(); + } else { + log.info("Worker[%s] already exists", worker.getHost()); } + return workerEntry; } - } - holder = createWorkerHolder( - smileMapper, - httpClient, - config, - workersSyncExec, - this, - worker, - expectedAnnouncements - ); - holder.start(); - workers.put(worker.getHost(), holder); - } else { - log.info("Worker[%s] already exists.", worker.getHost()); - } - } + ); - synchronized (statusLock) { - statusLock.notifyAll(); + // Notify any waiters that there is a new worker available + workerStateLock.notifyAll(); } } protected WorkerHolder createWorkerHolder( - ObjectMapper smileMapper, + ObjectMapper objectMapper, HttpClient httpClient, HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, @@ -663,40 +598,49 @@ protected WorkerHolder createWorkerHolder( List knownAnnouncements ) { - return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements); + return new WorkerHolder(objectMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements); } - private void removeWorker(final Worker worker) + @VisibleForTesting + void removeWorker(final Worker worker) { - synchronized (workers) { - log.info("Kaboom! 
Worker[%s] removed!", worker.getHost()); - - WorkerHolder workerHolder = workers.remove(worker.getHost()); + // Acquire workerStateLock to ensure atomicity between worker removal and competing scheduling routines + final WorkerHolder workerEntry; + final boolean wasBlacklisted; + synchronized (workerStateLock) { + workerEntry = workers.remove(worker.getHost()); + wasBlacklisted = workerEntry != null && workerEntry.getState() == WorkerHolder.State.BLACKLISTED; + if (wasBlacklisted) { + blackListedWorkersCount.decrementAndGet(); + } + } - if (workerHolder != null) { - try { - workerHolder.stop(); - scheduleTasksCleanupForWorker(worker.getHost()); - } - catch (Exception e) { - throw new RuntimeException(e); - } - finally { - checkAndRemoveWorkersFromBlackList(); - } + // Perform the cleanup operations outside the lock to avoid excessive locking/deadlock + if (workerEntry != null) { + log.info("Removing worker[%s]", worker.getHost()); + try { + workerEntry.stop(); + scheduleTasksCleanupForWorker(worker.getHost()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + // Removing a blacklisted worker may allow more workers to stay on the blacklist; re-evaluate thresholds. 
+ if (wasBlacklisted) { + checkAndRemoveWorkersFromBlackList(); } - lazyWorkers.remove(worker.getHost()); + } else { + log.warn("Asked to remove a non-existent worker[%s]", worker.getHost()); } } - private boolean cancelWorkerCleanup(String workerHost) + private void cancelWorkerCleanup(String workerHost) { ScheduledFuture previousCleanup = removedWorkerCleanups.remove(workerHost); if (previousCleanup != null) { - log.info("Cancelling Worker[%s] scheduled task cleanup", workerHost); + log.info("Cancelling worker[%s] scheduled task cleanup", workerHost); previousCleanup.cancel(false); } - return previousCleanup != null; } private void scheduleTasksCleanupForWorker(final String workerHostAndPort) @@ -705,38 +649,35 @@ private void scheduleTasksCleanupForWorker(final String workerHostAndPort) final ListenableScheduledFuture cleanupTask = cleanupExec.schedule( () -> { - log.info("Running scheduled cleanup for Worker[%s]", workerHostAndPort); + log.info("Running scheduled cleanup for worker[%s]", workerHostAndPort); try { - Set tasksToFail = new HashSet<>(); - synchronized (statusLock) { - for (Map.Entry e : tasks.entrySet()) { - if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { - Worker w = e.getValue().getWorker(); - if (w != null && w.getHost().equals(workerHostAndPort)) { - tasksToFail.add(e.getValue()); - } - } + final Set tasksToFail = new HashSet<>(); + for (HttpRemoteTaskRunnerWorkItem taskEntry : tasks.values()) { + if (taskEntry.getState().inProgress() + && taskEntry.getWorker() != null + && taskEntry.getWorker().getHost().equals(workerHostAndPort)) { + tasksToFail.add(taskEntry); } } for (HttpRemoteTaskRunnerWorkItem taskItem : tasksToFail) { - if (!taskItem.getResult().isDone()) { + if (taskItem.getResultStatus() == null) { log.warn( - "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s].", + "Failing task[%s] because worker[%s] disappeared and did not report within cleanup timeout[%s]", 
taskItem.getTaskId(), workerHostAndPort, config.getTaskCleanupTimeout() ); - // taskComplete(..) must be called outside of statusLock, see comments on method. + // taskComplete(..) must be called outside workerStateLock, see comments on method. taskComplete( - taskItem, + taskItem.getTaskId(), null, TaskStatus.failure( taskItem.getTaskId(), StringUtils.format( "The worker that this task was assigned disappeared and " + "did not report cleanup within timeout[%s]. " - + "See overlord and middleManager/indexer logs for more details.", + + "See overlord and middleManager/indexer logs for more details", config.getTaskCleanupTimeout() ) ) @@ -745,7 +686,7 @@ private void scheduleTasksCleanupForWorker(final String workerHostAndPort) } } catch (Exception e) { - log.makeAlert("Exception while cleaning up worker[%s]", workerHostAndPort).emit(); + log.makeAlert(e, "Exception while cleaning up worker[%s]", workerHostAndPort).emit(); throw new RuntimeException(e); } }, @@ -780,17 +721,11 @@ private void scheduleSyncMonitoring() { workersSyncExec.scheduleAtFixedRate( () -> { - log.debug("Running the Sync Monitoring."); - try { syncMonitoring(); } catch (Exception ex) { - if (ex instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } else { - log.makeAlert(ex, "Exception in sync monitoring.").emit(); - } + log.makeAlert(ex, "Exception in worker sync monitoring").emit(); } }, 1, @@ -802,16 +737,14 @@ private void scheduleSyncMonitoring() @VisibleForTesting void syncMonitoring() { - // Ensure that the collection is not being modified during iteration. 
Iterate over a copy - final Set> workerEntrySet = ImmutableSet.copyOf(workers.entrySet()); - for (Map.Entry e : workerEntrySet) { + final ImmutableMap workersCopy = getImmutableWorkersCopy(); + for (Map.Entry e : workersCopy.entrySet()) { WorkerHolder workerHolder = e.getValue(); if (workerHolder.getUnderlyingSyncer().needsReset()) { - synchronized (workers) { - // check again that server is still there and only then reset. + synchronized (workerStateLock) { if (workers.containsKey(e.getKey())) { log.makeAlert( - "Worker[%s] is not syncing properly. Current state is [%s]. Resetting it.", + "Worker[%s] is not syncing properly. Current state is [%s]. Resetting it", workerHolder.getWorker().getHost(), workerHolder.getUnderlyingSyncer().getDebugInfo() ).emit(); @@ -831,12 +764,12 @@ Map getWorkerSyncerDebugInfo() { Preconditions.checkArgument(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); - Map result = Maps.newHashMapWithExpectedSize(workers.size()); - for (Map.Entry e : workers.entrySet()) { - WorkerHolder serverHolder = e.getValue(); + final ImmutableMap workersCopy = getImmutableWorkersCopy(); + Map result = Maps.newHashMapWithExpectedSize(workersCopy.size()); + for (Map.Entry e : workersCopy.entrySet()) { result.put( e.getKey(), - serverHolder.getUnderlyingSyncer().getDebugInfo() + e.getValue().getUnderlyingSyncer().getDebugInfo() ); } return result; @@ -844,35 +777,38 @@ Map getWorkerSyncerDebugInfo() private void checkAndRemoveWorkersFromBlackList() { - boolean shouldRunPendingTasks = false; - - Iterator> iterator = blackListedWorkers.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry e = iterator.next(); - if (shouldRemoveNodeFromBlackList(e.getValue())) { - iterator.remove(); - e.getValue().resetContinuouslyFailedTasksCount(); - e.getValue().setBlacklistedUntil(null); - shouldRunPendingTasks = true; + final AtomicBoolean shouldRunPendingTasks = new AtomicBoolean(false); + + synchronized (workerStateLock) { + for (final String workerHost 
: workers.keySet()) { + workers.computeIfPresent( + workerHost, + (workerHostKey, workerEntry) -> { + if (workerEntry.getState() == WorkerHolder.State.BLACKLISTED) { + if (shouldRemoveNodeFromBlackList(workerEntry)) { + workerEntry.resetContinuouslyFailedTasksCount(); + workerEntry.setBlacklistedUntil(null); + workerEntry.setState(WorkerHolder.State.READY); + blackListedWorkersCount.decrementAndGet(); + shouldRunPendingTasks.set(true); + } + } + return workerEntry; + } + ); } - } - if (shouldRunPendingTasks) { - synchronized (statusLock) { - statusLock.notifyAll(); + if (shouldRunPendingTasks.get()) { + workerStateLock.notifyAll(); } } } private boolean shouldRemoveNodeFromBlackList(WorkerHolder workerHolder) { - if (!workers.containsKey(workerHolder.getWorker().getHost())) { - return true; - } - - if (blackListedWorkers.size() > workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0)) { + if (blackListedWorkersCount.get() > workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0)) { log.info( - "Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]", + "Removing worker[%s] from blacklist because percentage of blacklisted workers exceeds [%d]", workerHolder.getWorker(), config.getMaxPercentageBlacklistWorkers() ); @@ -880,40 +816,56 @@ private boolean shouldRemoveNodeFromBlackList(WorkerHolder workerHolder) return true; } - long remainingMillis = workerHolder.getBlacklistedUntil().getMillis() - System.currentTimeMillis(); + DateTime blacklistedUntil = workerHolder.getBlacklistedUntil(); + if (blacklistedUntil == null) { + // Blacklisted without an expiry time - should not happen, but remove from blacklist + log.warn("Worker[%s] is blacklisted but has no blacklistedUntil time set. 
Removing from blacklist", + workerHolder.getWorker()); + return true; + } + + long remainingMillis = blacklistedUntil.getMillis() - System.currentTimeMillis(); if (remainingMillis <= 0) { - log.info("Removing [%s] from blacklist because backoff time elapsed", workerHolder.getWorker()); + log.info("Removing worker[%s] from blacklist because backoff time elapsed", workerHolder.getWorker()); return true; } - log.info("[%s] still blacklisted for [%,ds]", workerHolder.getWorker(), remainingMillis / 1000); + log.info("Worker[%s] still blacklisted for [%,ds]", workerHolder.getWorker(), remainingMillis / 1000); return false; } private void blacklistWorkerIfNeeded(TaskStatus taskStatus, WorkerHolder workerHolder) { - synchronized (blackListedWorkers) { - if (taskStatus.isSuccess()) { - workerHolder.resetContinuouslyFailedTasksCount(); - if (blackListedWorkers.remove(workerHolder.getWorker().getHost()) != null) { - workerHolder.setBlacklistedUntil(null); - log.info("[%s] removed from blacklist because a task finished with SUCCESS", workerHolder.getWorker()); - } - } else if (taskStatus.isFailure()) { - workerHolder.incrementContinuouslyFailedTasksCount(); + if (taskStatus.isSuccess()) { + workerHolder.resetContinuouslyFailedTasksCount(); + if (workerHolder.getState() == WorkerHolder.State.BLACKLISTED) { + workerHolder.setBlacklistedUntil(null); + workerHolder.setState(WorkerHolder.State.READY); + blackListedWorkersCount.decrementAndGet(); + log.info( + "Worker[%s] removed from blacklist because task[%s] finished with SUCCESS", + workerHolder.getWorker(), + taskStatus.getId() + ); } + } else if (taskStatus.isFailure()) { + workerHolder.incrementContinuouslyFailedTasksCount(); + } - if (workerHolder.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() && - blackListedWorkers.size() <= workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) { + if (workerHolder.getContinuouslyFailedTasksCount() > config.getMaxRetriesBeforeBlacklist() && + 
blackListedWorkersCount.get() <= workers.size() * (config.getMaxPercentageBlacklistWorkers() / 100.0) - 1) { + // If worker is active, blacklist it + if (workerHolder.getState() == WorkerHolder.State.READY + || workerHolder.getState() == WorkerHolder.State.PENDING_ASSIGN) { workerHolder.setBlacklistedUntil(DateTimes.nowUtc().plus(config.getWorkerBlackListBackoffTime())); - if (blackListedWorkers.put(workerHolder.getWorker().getHost(), workerHolder) == null) { - log.info( - "Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.", - workerHolder.getWorker(), - workerHolder.getBlacklistedUntil(), - workerHolder.getContinuouslyFailedTasksCount() - ); - } + workerHolder.setState(WorkerHolder.State.BLACKLISTED); + blackListedWorkersCount.incrementAndGet(); + log.info( + "Blacklisting worker[%s] until [%s] after [%,d] failed tasks in a row", + workerHolder.getWorker(), + workerHolder.getBlacklistedUntil(), + workerHolder.getContinuouslyFailedTasksCount() + ); } } } @@ -921,72 +873,84 @@ private void blacklistWorkerIfNeeded(TaskStatus taskStatus, WorkerHolder workerH @Override public Collection getWorkers() { - return workers.values().stream().map(worker -> worker.toImmutable()).collect(Collectors.toList()); - } - - @VisibleForTesting - ConcurrentMap getWorkersForTestingReadOnly() - { - return workers; + return getImmutableWorkersCopy().values().stream() + .map(WorkerHolder::toImmutable) + .collect(Collectors.toList()); } @Override public Collection getLazyWorkers() { - return lazyWorkers.values().stream().map(holder -> holder.getWorker()).collect(Collectors.toList()); + return getImmutableWorkersCopy().values() + .stream() + .filter(w -> w.getState() == WorkerHolder.State.LAZY) + .map(WorkerHolder::getWorker) + .collect(Collectors.toList()); } @Override public Collection markWorkersLazy(Predicate isLazyWorker, int maxLazyWorkers) { - // skip the lock and bail early if we should not mark any workers lazy (e.g. 
number - // of current workers is at or below the minNumWorkers of autoscaler config) - if (lazyWorkers.size() >= maxLazyWorkers) { - return getLazyWorkers(); - } - // Search for new workers to mark lazy. // Status lock is used to prevent any tasks being assigned to workers while we mark them lazy - synchronized (statusLock) { - for (Map.Entry worker : workers.entrySet()) { - if (lazyWorkers.size() >= maxLazyWorkers) { + synchronized (workerStateLock) { + long lazyCount = workers.values().stream() + .filter(w -> w.getState() == WorkerHolder.State.LAZY) + .count(); + AtomicInteger numMarkedLazy = new AtomicInteger((int) lazyCount); + AtomicBoolean reachedMax = new AtomicBoolean(false); + + for (String workerHostKey : workers.keySet()) { + if (reachedMax.get()) { break; } - final WorkerHolder workerHolder = worker.getValue(); - try { - if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && isLazyWorker.apply(workerHolder.toImmutable())) { - log.info("Adding Worker[%s] to lazySet!", workerHolder.getWorker().getHost()); - lazyWorkers.put(worker.getKey(), workerHolder); + workers.compute(workerHostKey, (key, workerHolder) -> { + if (workerHolder != null && numMarkedLazy.get() < maxLazyWorkers) { + try { + if (isWorkerOkForMarkingLazy(workerHolder) && isLazyWorker.apply(workerHolder.toImmutable())) { + log.info("Marking worker[%s] as lazy", workerHolder.getWorker().getHost()); + workerHolder.setState(WorkerHolder.State.LAZY); + numMarkedLazy.incrementAndGet(); + } + } + catch (Exception e) { + log.error(e, "Exception while marking worker[%s] as lazy. 
Skipping", key); + } } - } - catch (Exception e) { - throw new RuntimeException(e); - } + if (numMarkedLazy.get() >= maxLazyWorkers) { + reachedMax.set(true); + } + return workerHolder; + }); } - } - return getLazyWorkers(); + log.info("Marked [%d] workers as lazy", numMarkedLazy.get()); + return workers.values() + .stream() + .filter(w -> w.getState() == WorkerHolder.State.LAZY) + .map(WorkerHolder::getWorker) + .collect(Collectors.toList()); + } } - private boolean isWorkerOkForMarkingLazy(Worker worker) + private boolean isWorkerOkForMarkingLazy(WorkerHolder workerHolder) { - // Check that worker is not running any tasks and no task is being assigned to it. - synchronized (statusLock) { - if (workersWithUnacknowledgedTask.containsKey(worker.getHost())) { - return false; - } + // Check that worker is not already lazy, and does not have any in-flight tasks being assigned to it. + if (workerHolder.getState() == WorkerHolder.State.LAZY + || workerHolder.getState() == WorkerHolder.State.PENDING_ASSIGN) { + return false; + } - for (Map.Entry e : tasks.entrySet()) { - if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { - Worker w = e.getValue().getWorker(); - if (w != null && w.getHost().equals(worker.getHost())) { - return false; - } - } + // Check that worker has no in-flight/running tasks associated with it + final String workerHost = workerHolder.getWorker().getHost(); + for (HttpRemoteTaskRunnerWorkItem taskEntry : tasks.values()) { + if (taskEntry.getState().inProgress() + && taskEntry.getWorker() != null + && taskEntry.getWorker().getHost().equals(workerHost)) { + return false; } - - return true; } + return true; } @Override @@ -998,26 +962,25 @@ public WorkerTaskRunnerConfig getConfig() @Override public Collection getPendingTaskPayloads() { - synchronized (statusLock) { - return tasks.values() - .stream() - .filter(item -> item.getState().isPending()) - .map(HttpRemoteTaskRunnerWorkItem::getTask) - .collect(Collectors.toList()); - } + 
return tasks.values() + .stream() + .filter(item -> item.getState().isPending()) + .map(HttpRemoteTaskRunnerWorkItem::getTask) + .collect(Collectors.toList()); } @Override public Optional streamTaskLog(String taskId, long offset) throws IOException { - @SuppressWarnings("GuardedBy") // Read on tasks is safe HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId); Worker worker = null; if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { worker = taskRunnerWorkItem.getWorker(); } - if (worker == null || !workers.containsKey(worker.getHost())) { + // Point read on ConcurrentHashMap: safe without workerStateLock + boolean workerOnline = worker != null && workers.containsKey(worker.getHost()); + if (!workerOnline) { // Worker is not running this task, it might be available in deep storage return Optional.absent(); } else { @@ -1049,14 +1012,15 @@ public Optional streamTaskLog(String taskId, long offset) throws IO @Override public Optional streamTaskReports(String taskId) throws IOException { - @SuppressWarnings("GuardedBy") // Read on tasks is safe HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId); Worker worker = null; if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { worker = taskRunnerWorkItem.getWorker(); } - if (worker == null || !workers.containsKey(worker.getHost())) { + // Point read on ConcurrentHashMap: safe without workerStateLock + boolean workerOnline = worker != null && workers.containsKey(worker.getHost()); + if (!workerOnline) { // Worker is not running this task, it might be available in deep storage return Optional.absent(); } else { @@ -1102,26 +1066,24 @@ public void registerListener(TaskRunnerListener listener, Executor executor) { for (Pair pair : listeners) { if (pair.lhs.getListenerId().equals(listener.getListenerId())) { - throw new ISE("Listener [%s] already registered", 
listener.getListenerId()); + throw new ISE("Listener[%s] already registered", listener.getListenerId()); } } final Pair listenerPair = Pair.of(listener, executor); - synchronized (statusLock) { - for (Map.Entry entry : tasks.entrySet()) { - if (entry.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { - TaskRunnerUtils.notifyLocationChanged( - ImmutableList.of(listenerPair), - entry.getKey(), - entry.getValue().getLocation() - ); - } + for (Map.Entry entry : tasks.entrySet()) { + if (entry.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { + TaskRunnerUtils.notifyLocationChanged( + ImmutableList.of(listenerPair), + entry.getKey(), + entry.getValue().getLocation() + ); } - - log.info("Registered listener [%s]", listener.getListenerId()); - listeners.add(listenerPair); } + + log.info("Registered listener[%s]", listener.getListenerId()); + listeners.add(listenerPair); } @Override @@ -1130,7 +1092,7 @@ public void unregisterListener(String listenerId) for (Pair pair : listeners) { if (pair.lhs.getListenerId().equals(listenerId)) { listeners.remove(pair); - log.info("Unregistered listener [%s]", listenerId); + log.info("Unregistered listener[%s]", listenerId); return; } } @@ -1139,37 +1101,38 @@ public void unregisterListener(String listenerId) @Override public ListenableFuture run(Task task) { - Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS), "not started"); + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS), "TaskRunner not started"); - synchronized (statusLock) { - HttpRemoteTaskRunnerWorkItem existing = tasks.get(task.getId()); + AtomicReference> taskFuture = new AtomicReference<>(); - if (existing != null) { - log.info("Assigned a task[%s] that is known already. Ignored.", task.getId()); - if (existing.getTask() == null) { - // in case it was discovered from a worker on start() and TaskAnnouncement does not have Task instance - // in it. 
- existing.setTask(task); - } - return existing.getResult(); - } else { - log.info("Adding pending task[%s].", task.getId()); - HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = new HttpRemoteTaskRunnerWorkItem( - task.getId(), - null, - null, - task, - task.getType(), - HttpRemoteTaskRunnerWorkItem.State.PENDING - ); - tasks.put(task.getId(), taskRunnerWorkItem); - pendingTaskIds.add(task.getId()); + log.info("Adding task[%s]", task.getId()); - statusLock.notifyAll(); + tasks.compute( + task.getId(), (id, entry) -> { + // Task already exists, but in case it was discovered from a worker on start() + // and TaskAnnouncement does not have Task instance, add it. + if (entry != null) { + if (entry.getTask() == null) { + entry.setTask(task); + } + } else { + entry = new HttpRemoteTaskRunnerWorkItem( + task.getId(), + null, + null, + task, + task.getType(), + HttpRemoteTaskRunnerWorkItem.State.PENDING + ); + pendingTasks.offer(new PendingTaskQueueItem(task)); + } - return taskRunnerWorkItem.getResult(); - } - } + taskFuture.set(entry.getResult()); + return entry; + } + ); + + return taskFuture.get(); } private void startPendingTaskHandling() @@ -1179,7 +1142,7 @@ private void startPendingTaskHandling() () -> { try { if (!lifecycleLock.awaitStarted()) { - log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); + log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run").emit(); return; } @@ -1190,120 +1153,100 @@ private void startPendingTaskHandling() .emit(); } finally { - log.info("PendingTaskExecution loop exited."); + log.info("PendingTaskExecution loop exited"); } } ); } } - private void pendingTasksExecutionLoop() + @VisibleForTesting + void pendingTasksExecutionLoop() { while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { try { - // Find one pending task to run and a worker to run on - HttpRemoteTaskRunnerWorkItem taskItem = null; - ImmutableWorkerInfo immutableWorker = null; - 
- synchronized (statusLock) { - Iterator iter = pendingTaskIds.iterator(); - while (iter.hasNext()) { - String taskId = iter.next(); - HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); - - if (ti == null || !ti.getState().isPending()) { - // happens if the task was shutdown, failed or observed running by a worker - iter.remove(); - continue; - } - - if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { - // picked up by another pending task executor thread which is in the process of trying to - // run it on a worker, skip to next. - continue; - } - - if (ti.getTask() == null) { - // this is not supposed to happen except for a bug, we want to mark this task failed but - // taskComplete(..) can not be called while holding statusLock. See the javadoc on that - // method. - // so this will get marked failed afterwards outside of current synchronized block. - taskItem = ti; - break; - } - - immutableWorker = findWorkerToRunTask(ti.getTask()); - if (immutableWorker == null) { - continue; - } - - String prevUnackedTaskId = workersWithUnacknowledgedTask.putIfAbsent( - immutableWorker.getWorker().getHost(), - taskId - ); - if (prevUnackedTaskId != null) { - log.makeAlert( - "Found worker[%s] with unacked task[%s] but still was identified to run task[%s].", - immutableWorker.getWorker().getHost(), - prevUnackedTaskId, - taskId - ).emit(); - } - - // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked - // up by another task execution thread. - // note that we can't simply delete this task item from pendingTaskIds or else we would have to add it - // back if this thread couldn't run this task for any reason, which we will know at some later time - // and also we will need to add it back to its old position in the list. that becomes complex quickly. - // Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up. 
- // And, it is automatically removed by any of the task execution threads when they notice that - // ti.getState().isPending() is false (at the beginning of this loop) - ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN); - taskItem = ti; - break; - } - - if (taskItem == null) { - // Either no pending task is found or no suitable worker is found for any of the pending tasks. - // statusLock.notifyAll() is called whenever a new task shows up or if there is a possibility for a task - // to successfully get worker to run, for example when a new worker shows up, a task slot opens up - // because some task completed etc. - statusLock.wait(TimeUnit.MINUTES.toMillis(1)); + final PendingTaskQueueItem taskItem = pendingTasks.poll(1, TimeUnit.MINUTES); + if (taskItem == null) { + log.info("Found no available tasks. Waiting for tasks to assign"); + continue; + } + final String taskId = taskItem.getTask().getId(); + + ImmutableWorkerInfo workerToAssign; + // Set to false inside tasks.compute() if the task is no longer PENDING when we attempt + // to transition it to PENDING_WORKER_ASSIGN. A task can complete concurrently (e.g. via + // cancellation) without holding workerStateLock, so we cannot rely on a pre-check. + final AtomicBoolean taskWasPending = new AtomicBoolean(true); + + synchronized (workerStateLock) { + workerToAssign = findWorkerToRunTask(taskItem.getTask()); + + if (workerToAssign == null) { + // Park until a worker signals capacity change (task complete, new worker, state → READY). + // Bounded wait avoids missed wakeups from spurious interrupts. 
+ log.warn("No workers available to run task[%s], waiting for capacity", taskId); + workerStateLock.wait(TimeUnit.MINUTES.toMillis(1)); + pendingTasks.put(taskItem.requeue()); continue; } - } - String taskId = taskItem.getTaskId(); + // Mark this worker as unassignable while task is being assigned + final ImmutableWorkerInfo finalWorkerToAssign = workerToAssign; + workers.compute( + workerToAssign.getWorker().getHost(), (key, entry) -> { + Preconditions.checkState( + entry != null, + "Expected selected worker[%s] to be available", + finalWorkerToAssign.getWorker().getHost() + ); + Preconditions.checkState( + entry.getState() == WorkerHolder.State.READY, + "Expected worker[%s] state to be READY, got [%s]", + entry.getWorker().getHost(), + entry.getState() + ); - if (taskItem.getTask() == null) { - log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit(); - // taskComplete(..) must be called outside of statusLock, see comments on method. - taskComplete( - taskItem, - null, - TaskStatus.failure( - taskId, - "No payload found for this task. " - + "See overlord logs and middleManager/indexer logs for more details." - ) + entry.setState(WorkerHolder.State.PENDING_ASSIGN); + return entry; + } ); - continue; - } - if (immutableWorker == null) { - throw new ISE("Unexpected state: null immutableWorker"); + // Mark this task as pending worker assign. A task can complete concurrently (e.g. via + // cancellation) without holding workerStateLock, so we handle the non-PENDING case + // gracefully instead of asserting. The worker will be reset to READY in the finally block. 
+ tasks.compute( + taskId, + (key, entry) -> { + Preconditions.checkState(entry != null, "Expected task[%s] to be in tasks set", taskId); + if (entry.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) { + entry.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN); + } else { + taskWasPending.set(false); + } + return entry; + } + ); } + final String workerHost = workerToAssign.getWorker().getHost(); try { - // this will send HTTP request to worker for assigning task - if (!runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { - if (taskItem.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { - taskItem.revertStateFromPendingWorkerAssignToPending(); - } + if (!taskWasPending.get()) { + // Task completed (e.g. cancelled) while we were setting up assignment. + // Worker state will be reset to READY in the finally block. + log.info( + "Task[%s] completed concurrently before assignment to worker[%s], skipping", + taskId, + workerHost + ); + } else if (!runTaskOnWorker(taskId, workerHost)) { + log.warn("Failed to assign task[%s] to worker[%s]. Sending to back of queue", taskId, workerHost); + pendingTasks.put(taskItem.requeue()); + } else { + log.info("Assigned task[%s] to worker[%s]", taskId, workerHost); } } catch (InterruptedException ex) { - log.info("Got InterruptedException while assigning task[%s].", taskId); + log.info("Got InterruptedException while assigning task[%s]", taskId); throw ex; } catch (Throwable th) { @@ -1311,76 +1254,96 @@ private void pendingTasksExecutionLoop() .addData("taskId", taskId) .emit(); - // taskComplete(..) must be called outside of statusLock, see comments on method. + // taskComplete(..) must be called outside workerStateLock, see comments on method. taskComplete( - taskItem, + taskId, null, - TaskStatus.failure(taskId, "Failed to assign this task. 
See overlord logs for more details.") + TaskStatus.failure( + taskId, + StringUtils.format( + "Failed to assign this task to worker[%s]. See overlord logs for more details", + workerHost + ) + ) ); } finally { - synchronized (statusLock) { - workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); - statusLock.notifyAll(); + // Allow the worker to accept tasks again + synchronized (workerStateLock) { + workers.compute( + workerHost, + (key, entry) -> { + if (entry == null) { + log.warn("Could not find worker[%s]", workerHost); + } else { + // Only reset the worker status if PENDING_ASSIGN + // If LAZY/BLACKLISTED, either the worker is getting trashed imminently or will be auto-reset. + entry.compareAndExchangeState(WorkerHolder.State.PENDING_ASSIGN, WorkerHolder.State.READY); + } + return entry; + } + ); } - } + // Reset task state from PENDING_WORKER_ASSIGN -> PENDING if assignment did not complete. + // This allows the task to be picked up again from the queue without hitting a precondition failure. 
+ tasks.compute(taskId, (key, entry) -> { + if (entry != null) { + entry.revertToPending(); + } + return entry; + }); + + notifyWatchers(); + } } - catch (InterruptedException ex) { - log.info("Interrupted, will Exit."); + catch (InterruptedException e) { + log.warn("Interrupted, stopping pending task execution loop"); Thread.currentThread().interrupt(); } catch (Throwable th) { - log.makeAlert(th, "Unknown Exception while trying to assign tasks.").emit(); + log.makeAlert(th, "Unknown Exception while trying to assign tasks").emit(); } } - } - /** - * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} - */ - List getPendingTasksList() - { - synchronized (statusLock) { - return ImmutableList.copyOf(pendingTaskIds); - } + log.warn("Pending tasks execution loop exited"); } @Override public void shutdown(String taskId, String reason) { - if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) { - log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", taskId); - return; - } + log.info("Shutdown task[%s] because [%s]", taskId, reason); - WorkerHolder workerHolderRunningTask = null; - synchronized (statusLock) { - log.info("Shutdown [%s] because: [%s]", taskId, reason); - HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId); - if (taskRunnerWorkItem != null) { - if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { - workerHolderRunningTask = workers.get(taskRunnerWorkItem.getWorker().getHost()); - if (workerHolderRunningTask == null) { - log.info("Can't shutdown! 
No worker running task[%s]", taskId); + AtomicReference workerHolderRunningTaskRef = new AtomicReference<>(); + final AtomicBoolean wasComplete = new AtomicBoolean(false); + tasks.compute( + taskId, + (key, entry) -> { + if (entry != null) { + if (entry.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { + workerHolderRunningTaskRef.set(workers.get(entry.getWorker().getHost())); + } else if (entry.getState() == HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { + wasComplete.set(true); + entry = null; // delete the entry + } + } else { + log.debug("Asked to shutdown task[%s], but task not found. Already cleaned up", taskId); } - } else if (taskRunnerWorkItem.getState() == HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { - tasks.remove(taskId); + return entry; } - } else { - log.info("Received shutdown task[%s], but can't find it. Ignored.", taskId); - } - } + ); - //shutdown is called outside of lock as we don't want to hold the lock while sending http request - //to worker. - if (workerHolderRunningTask != null) { + if (workerHolderRunningTaskRef.get() != null) { log.debug( - "Got shutdown request for task[%s]. Asking worker[%s] to kill it.", + "Got shutdown request for task[%s]. 
Asking worker[%s] to kill it", taskId, - workerHolderRunningTask.getWorker().getHost() + workerHolderRunningTaskRef.get().getWorker().getHost() ); - workerHolderRunningTask.shutdownTask(taskId); + workerHolderRunningTaskRef.get().shutdownTask(taskId); + } else if (wasComplete.get()) { + log.debug("Task[%s] already completed, no shutdown needed", taskId); + } else { + log.debug("Task[%s] not found or not running, no shutdown needed", taskId); } } @@ -1389,11 +1352,11 @@ public void shutdown(String taskId, String reason) public void stop() { if (!lifecycleLock.canStop()) { - throw new ISE("can't stop."); + throw new ISE("can't stop"); } try { - log.info("Stopping..."); + log.info("Stopping"); if (provisioningService != null) { provisioningService.close(); @@ -1408,26 +1371,30 @@ public void stop() druidNodeDiscovery.removeListener(nodeDiscoveryListener); log.info("Stopping worker holders"); - synchronized (workers) { - workers.values().forEach(w -> { - try { - w.stop(); - } - catch (Exception e) { - log.error(e, e.getMessage()); - } - }); + synchronized (workerStateLock) { + for (String workerHost : workers.keySet()) { + workers.compute(workerHost, (key, workerHolder) -> { + if (workerHolder != null) { + try { + workerHolder.stop(); + } + catch (Exception e) { + log.error(e, e.getMessage()); + } + } + return workerHolder; + }); + } } } finally { lifecycleLock.exitStop(); } - log.info("Stopped."); + log.info("Stopped"); } @Override - @SuppressWarnings("GuardedBy") // Read on tasks is safe public Collection getRunningTasks() { return tasks.values() @@ -1437,7 +1404,6 @@ public Collection getRunningTasks() } @Override - @SuppressWarnings("GuardedBy") // Read on tasks is safe public Collection getPendingTasks() { return tasks.values() @@ -1447,13 +1413,11 @@ public Collection getPendingTasks() } @Override - @SuppressWarnings("GuardedBy") // Read on tasks is safe public Collection getKnownTasks() { return ImmutableList.copyOf(tasks.values()); } - 
@SuppressWarnings("GuardedBy") // Read on tasks is safe public Collection getCompletedTasks() { return tasks.values() @@ -1464,7 +1428,6 @@ public Collection getCompletedTasks() @Nullable @Override - @SuppressWarnings("GuardedBy") // Read on tasks is safe public RunnerTaskState getRunnerTaskState(String taskId) { final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); @@ -1476,7 +1439,6 @@ public RunnerTaskState getRunnerTaskState(String taskId) } @Override - @SuppressWarnings("GuardedBy") // Read on tasks is safe public TaskLocation getTaskLocation(String taskId) { final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); @@ -1487,25 +1449,22 @@ public TaskLocation getTaskLocation(String taskId) } } - public List getBlacklistedWorkers() - { - return blackListedWorkers.values().stream().map( - (holder) -> holder.getWorker().getHost() - ).collect(Collectors.toList()); - } - public Collection getBlackListedWorkers() { - return ImmutableList.copyOf(Collections2.transform(blackListedWorkers.values(), WorkerHolder::toImmutable)); + return getImmutableWorkersCopy().values() + .stream() + .filter(w -> w.getState() == WorkerHolder.State.BLACKLISTED) + .map(WorkerHolder::toImmutable) + .collect(Collectors.toList()); } - /** - * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} , used for read only. 
- */ - @SuppressWarnings("GuardedBy") - Map getWorkersWithUnacknowledgedTasks() + public Collection getPendingAssignWorkers() { - return workersWithUnacknowledgedTask; + return getImmutableWorkersCopy().values() + .stream() + .filter(w -> w.getState() == WorkerHolder.State.PENDING_ASSIGN) + .map(WorkerHolder::toImmutable) + .collect(Collectors.toList()); } @Override @@ -1521,206 +1480,212 @@ public void taskAddedOrUpdated(final TaskAnnouncement announcement, final Worker final Worker worker = workerHolder.getWorker(); log.debug( - "Worker[%s] wrote [%s] status for task [%s] on [%s]", + "Worker[%s] wrote status[%s] for task[%s] on location[%s]", worker.getHost(), announcement.getTaskStatus().getStatusCode(), taskId, announcement.getTaskLocation() ); - HttpRemoteTaskRunnerWorkItem taskItem; - boolean shouldShutdownTask = false; - boolean isTaskCompleted = false; - - synchronized (statusLock) { - taskItem = tasks.get(taskId); - if (taskItem == null) { - // Try to find information about it in the TaskStorage - Optional knownStatusInStorage = taskStorage.getStatus(taskId); - - if (knownStatusInStorage.isPresent()) { - switch (knownStatusInStorage.get().getStatusCode()) { - case RUNNING: - taskItem = new HttpRemoteTaskRunnerWorkItem( - taskId, - worker, - TaskLocation.unknown(), - null, - announcement.getTaskType(), - HttpRemoteTaskRunnerWorkItem.State.RUNNING - ); - tasks.put(taskId, taskItem); - final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); - metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId); - emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 1L)); - break; - case SUCCESS: - case FAILED: - if (!announcement.getTaskStatus().isComplete()) { - log.info( - "Worker[%s] reported status for completed, known from taskStorage, task[%s]. Ignored.", - worker.getHost(), - taskId - ); - } - break; - default: - log.makeAlert( - "Found unrecognized state[%s] of task[%s] in taskStorage. 
Notification[%s] from worker[%s] is ignored.", - knownStatusInStorage.get().getStatusCode(), - taskId, - announcement, - worker.getHost() - ).emit(); - } - } else { - log.warn( - "Worker[%s] reported status[%s] for unknown task[%s]. Ignored.", - worker.getHost(), - announcement.getStatus(), - taskId - ); - } - } - - if (taskItem == null) { - if (!announcement.getTaskStatus().isComplete()) { - shouldShutdownTask = true; - } - } else { - switch (announcement.getTaskStatus().getStatusCode()) { - case RUNNING: - switch (taskItem.getState()) { - case PENDING: - case PENDING_WORKER_ASSIGN: - taskItem.setWorker(worker); - taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); - log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost()); - - final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); - IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask()); - emitter.emit(metricBuilder.setMetric( - "task/pending/time", - new Duration(taskItem.getCreatedTime(), DateTimes.nowUtc()).getMillis()) - ); + final AtomicBoolean shouldShutdownTask = new AtomicBoolean(false); + final AtomicBoolean isTaskCompleted = new AtomicBoolean(false); - // fall through - case RUNNING: - if (worker.getHost().equals(taskItem.getWorker().getHost())) { - if (!announcement.getTaskLocation().equals(taskItem.getLocation())) { + tasks.compute( + taskId, + (key, taskEntry) -> { + if (taskEntry == null) { + // Try to find information about it in the TaskStorage + Optional knownStatusInStorage = taskStorage.getStatus(taskId); + + if (knownStatusInStorage.isPresent()) { + switch (knownStatusInStorage.get().getStatusCode()) { + case RUNNING: + taskEntry = new HttpRemoteTaskRunnerWorkItem( + taskId, + worker, + TaskLocation.unknown(), + null, + announcement.getTaskType(), + HttpRemoteTaskRunnerWorkItem.State.RUNNING + ); + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + 
metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId); + emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 1L)); + break; + case SUCCESS: + case FAILED: + if (!announcement.getTaskStatus().isComplete()) { log.info( - "Task[%s] location changed on worker[%s]. new location[%s].", - taskId, + "Worker[%s] reported status for completed, known from taskStorage, task[%s]. Ignored", worker.getHost(), - announcement.getTaskLocation() + taskId ); - taskItem.setLocation(announcement.getTaskLocation()); - TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); } - } else { - log.warn( - "Found worker[%s] running task[%s] which is being run by another worker[%s]. Notification ignored.", - worker.getHost(), + break; + default: + log.makeAlert( + "Found unrecognized state[%s] of task[%s] in taskStorage. Notification[%s] from worker[%s] is ignored", + knownStatusInStorage.get().getStatusCode(), taskId, - taskItem.getWorker().getHost() - ); - shouldShutdownTask = true; - } - break; - case COMPLETE: - log.warn( - "Worker[%s] reported status for completed task[%s]. Ignored.", - worker.getHost(), - taskId - ); - shouldShutdownTask = true; - break; - default: - log.makeAlert( - "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.", - taskItem.getState(), - taskId, - announcement, - worker.getHost() - ).emit(); + announcement, + worker.getHost() + ).emit(); + } + } else { + log.warn( + "Worker[%s] reported status[%s] for unknown task[%s]. 
Ignored", + worker.getHost(), + announcement.getStatus(), + taskId + ); } - break; - case FAILED: - case SUCCESS: - switch (taskItem.getState()) { - case PENDING: - case PENDING_WORKER_ASSIGN: - taskItem.setWorker(worker); - taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); - log.info("Task[%s] finished on worker[%s].", taskId, worker.getHost()); - // fall through + } + + if (taskEntry == null) { + if (!announcement.getTaskStatus().isComplete()) { + shouldShutdownTask.set(true); + } + } else { + switch (announcement.getTaskStatus().getStatusCode()) { case RUNNING: - if (worker.getHost().equals(taskItem.getWorker().getHost())) { - if (!announcement.getTaskLocation().equals(taskItem.getLocation())) { - log.info( - "Task[%s] location changed on worker[%s]. new location[%s].", - taskId, - worker.getHost(), - announcement.getTaskLocation() + switch (taskEntry.getState()) { + case PENDING: + case PENDING_WORKER_ASSIGN: + taskEntry.setWorker(worker); + taskEntry.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); + log.info("Task[%s] started RUNNING on worker[%s]", taskId, worker.getHost()); + + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, taskEntry.getTask()); + emitter.emit(metricBuilder.setMetric( + "task/pending/time", + new Duration(taskEntry.getCreatedTime(), DateTimes.nowUtc()).getMillis() + ) ); - taskItem.setLocation(announcement.getTaskLocation()); - TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); - } - isTaskCompleted = true; - } else { - log.warn( - "Worker[%s] reported completed task[%s] which is being run by another worker[%s]. 
Notification ignored.", - worker.getHost(), - taskId, - taskItem.getWorker().getHost() - ); + // fall through + case RUNNING: + if (worker.getHost().equals(taskEntry.getWorker().getHost())) { + if (!announcement.getTaskLocation().equals(taskEntry.getLocation())) { + log.info( + "Task[%s] location changed on worker[%s]. New location[%s]", + taskId, + worker.getHost(), + announcement.getTaskLocation() + ); + taskEntry.setLocation(announcement.getTaskLocation()); + TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); + } + } else { + log.warn( + "Found worker[%s] running task[%s] which is being run by another worker[%s]. Notification ignored", + worker.getHost(), + taskId, + taskEntry.getWorker().getHost() + ); + shouldShutdownTask.set(true); + } + break; + case COMPLETE: + log.warn( + "Worker[%s] reported status for completed task[%s]. Ignored", + worker.getHost(), + taskId + ); + shouldShutdownTask.set(true); + break; + default: + log.makeAlert( + "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored", + taskEntry.getState(), + taskId, + announcement, + worker.getHost() + ).emit(); } break; - case COMPLETE: - // this can happen when a worker is restarted and reports its list of completed tasks again. + case FAILED: + case SUCCESS: + switch (taskEntry.getState()) { + case PENDING: + case PENDING_WORKER_ASSIGN: + taskEntry.setWorker(worker); + taskEntry.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); + log.info("Task[%s] finished on worker[%s]", taskId, worker.getHost()); + // fall through + case RUNNING: + if (worker.getHost().equals(taskEntry.getWorker().getHost())) { + if (!announcement.getTaskLocation().equals(taskEntry.getLocation())) { + log.info( + "Task[%s] location changed on worker[%s]. 
new location[%s]", + taskId, + worker.getHost(), + announcement.getTaskLocation() + ); + taskEntry.setLocation(announcement.getTaskLocation()); + TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation()); + } + + isTaskCompleted.set(true); + } else { + log.warn( + "Worker[%s] reported completed task[%s] which is being run by another worker[%s]. Notification ignored", + worker.getHost(), + taskId, + taskEntry.getWorker().getHost() + ); + } + break; + case COMPLETE: + // this can happen when a worker is restarted and reports its list of completed tasks again. + break; + default: + log.makeAlert( + "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored", + taskEntry.getState(), + taskId, + announcement, + worker.getHost() + ).emit(); + } break; default: log.makeAlert( - "Found unrecognized state[%s] of task[%s]. Notification[%s] from worker[%s] is ignored.", - taskItem.getState(), - taskId, - announcement, - worker.getHost() + "Worker[%s] reported unrecognized state[%s] for task[%s]", + worker.getHost(), + announcement.getTaskStatus().getStatusCode(), + taskId ).emit(); } - break; - default: - log.makeAlert( - "Worker[%s] reported unrecognized state[%s] for task[%s].", - worker.getHost(), - announcement.getTaskStatus().getStatusCode(), - taskId - ).emit(); + } + return taskEntry; } - } - } + ); - if (isTaskCompleted) { - // taskComplete(..) must be called outside of statusLock, see comments on method. - taskComplete(taskItem, workerHolder, announcement.getTaskStatus()); + if (isTaskCompleted.get()) { + // taskComplete(..) must be called outside statusLock, see comments on method. 
+ taskComplete(taskId, worker.getHost(), announcement.getTaskStatus()); } - if (shouldShutdownTask) { - log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost()); + if (shouldShutdownTask.get()) { + log.warn("Killing task[%s] on worker[%s]", taskId, worker.getHost()); workerHolder.shutdownTask(taskId); } - synchronized (statusLock) { - statusLock.notifyAll(); - } + // Notify interested parties + notifyWatchers(); } @Override public void stateChanged(boolean enabled, WorkerHolder workerHolder) { - synchronized (statusLock) { - statusLock.notifyAll(); + notifyWatchers(); + } + + private void notifyWatchers() + { + synchronized (workerStateLock) { + workerStateLock.notifyAll(); } } @@ -1749,7 +1714,9 @@ public Map getIdleTaskSlotCount() int workerAvailableCapacity = worker.getAvailableCapacity(); totalIdlePeons.compute( workerCategory, - (category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity + (category, availableCapacity) -> availableCapacity == null + ? workerAvailableCapacity + : availableCapacity + workerAvailableCapacity ); } @@ -1797,7 +1764,9 @@ public Map getBlacklistedTaskSlotCount() int workerBlacklistedPeons = worker.getWorker().getCapacity(); totalBlacklistedPeons.compute( workerCategory, - (category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons + (category, blacklistedPeons) -> blacklistedPeons == null + ? workerBlacklistedPeons + : blacklistedPeons + workerBlacklistedPeons ); } @@ -1813,8 +1782,9 @@ public int getTotalCapacity() /** * Retrieves the maximum capacity of the task runner when autoscaling is enabled.* + * * @return The maximum capacity as an integer value. Returns -1 if the maximum - * capacity cannot be determined or if autoscaling is not enabled. + * capacity cannot be determined or if autoscaling is not enabled. 
*/ @Override public int getMaximumCapacityWithAutoscale() @@ -1824,13 +1794,11 @@ public int getMaximumCapacityWithAutoscale() if (workerBehaviorConfig == null) { // Auto scale not setup log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); - maximumCapacity = -1; } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { // Auto scale not setup log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); - maximumCapacity = -1; } else { int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers()); @@ -1850,14 +1818,16 @@ private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkIt { enum State { - // Task has been given to HRTR, but a worker to run this task hasn't been identified yet. + /** Task has been submitted to the scheduler, but a worker to run it hasn't been identified yet. */ PENDING(0, true, RunnerTaskState.PENDING), - // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet - // or worker hasn't acknowledged the task yet. + /** A worker has been selected for this task, but the task hasn't been acknowledged by the worker yet. */ PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING), + /** Task is running on a worker. */ RUNNING(2, false, RunnerTaskState.RUNNING), + + /** Task has completed (success or failure). 
*/ COMPLETE(3, false, RunnerTaskState.NONE); private final int index; @@ -1876,14 +1846,21 @@ boolean isPending() return isPending; } + boolean inProgress() + { + return isPending || runnerTaskState == RunnerTaskState.RUNNING; + } + RunnerTaskState toRunnerTaskState() { return runnerTaskState; } } - private Task task; - private State state; + private volatile Task task; + private volatile State state; + private volatile TaskStatus resultStatus; + private final SettableFuture taskStartedFuture = SettableFuture.create(); HttpRemoteTaskRunnerWorkItem( String taskId, @@ -1905,9 +1882,9 @@ RunnerTaskState toRunnerTaskState() public boolean isRunningOnWorker(Worker candidateWorker) { - return getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING && - getWorker() != null && - Objects.equal(getWorker().getHost(), candidateWorker.getHost()); + return getState() == State.RUNNING && + getWorker() != null && + Objects.equal(getWorker().getHost(), candidateWorker.getHost()); } public Task getTask() @@ -1935,9 +1912,19 @@ public State getState() public void setResult(TaskStatus status) { setState(State.COMPLETE); + this.resultStatus = status; super.setResult(status); } + /** + * Returns the terminal {@link TaskStatus} if this task has completed, or {@code null} if it is still in progress. + */ + @Nullable + public TaskStatus getResultStatus() + { + return resultStatus; + } + public void setState(State state) { Preconditions.checkArgument( @@ -1950,31 +1937,49 @@ public void setState(State state) setStateUnconditionally(state); } - public void revertStateFromPendingWorkerAssignToPending() + /** + * Reverts state from PENDING_WORKER_ASSIGN back to PENDING when a worker assignment attempt fails + * and the task must be requeued. This is the only allowed backward transition. 
+ */ + void revertToPending() { - Preconditions.checkState( - this.state == State.PENDING_WORKER_ASSIGN, - "Can't move state from [%s] to [%s]", - this.state, - State.PENDING - ); - - setStateUnconditionally(State.PENDING); + if (this.state == State.PENDING_WORKER_ASSIGN) { + setStateUnconditionally(State.PENDING); + } } private void setStateUnconditionally(State state) { - if (log.isDebugEnabled()) { - // Exception is logged to know what led to this call. - log.debug( - new RuntimeException("Stacktrace..."), - "Setting task[%s] work item state from [%s] to [%s].", - getTaskId(), - this.state, - state - ); - } this.state = state; + if (state == State.RUNNING) { + taskStartedFuture.set(null); + } + } + + public SettableFuture getTaskStartedFuture() + { + return taskStartedFuture; + } + } + + private static class PendingTaskQueueItem + { + private final Task task; + + PendingTaskQueueItem(Task task) + { + this.task = task; + } + + public Task getTask() + { + return task; + } + + /** Returns a new item for the same task, placing it at the back of the queue. 
*/ + public PendingTaskQueueItem requeue() + { + return new PendingTaskQueueItem(task); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java index 3e0fddc1c0e8..fd1b53ffa3c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java @@ -22,9 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.inject.Inject; -import com.google.inject.Provider; -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Smile; @@ -37,9 +34,6 @@ import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.server.initialization.IndexerZkConfig; - -import javax.annotation.Nullable; /** */ @@ -58,11 +52,6 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory cfProvider, - final IndexerZkConfig indexerZkConfig, - final ZkEnablementConfig zkEnablementConfig, final ServiceEmitter emitter ) { @@ -87,14 +73,7 @@ public HttpRemoteTaskRunnerFactory( this.provisioningStrategy = provisioningStrategy; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.taskStorage = taskStorage; - this.indexerZkConfig = indexerZkConfig; this.emitter = emitter; - - if (zkEnablementConfig.isEnabled()) { - this.cf = cfProvider.get(); - } else { - this.cf = null; - } } @Override @@ -108,8 +87,6 @@ public 
HttpRemoteTaskRunner build() provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, taskStorage, - cf, - indexerZkConfig, emitter ); return runner; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java index fc2c5ced2542..fc19dd378124 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java @@ -72,7 +72,7 @@ public Response getPendingTasksQueue() return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); } - return Response.ok().entity(httpRemoteTaskRunner.getPendingTasksList()).build(); + return Response.ok().entity(httpRemoteTaskRunner.getPendingTasks()).build(); } @GET @@ -98,7 +98,7 @@ public Response getBlacklistedWorkers() return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); } - return Response.ok().entity(httpRemoteTaskRunner.getBlacklistedWorkers()).build(); + return Response.ok().entity(httpRemoteTaskRunner.getBlackListedWorkers()).build(); } @GET @@ -124,7 +124,7 @@ public Response getWorkersWithUnacknowledgedTasks() return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); } - return Response.ok().entity(httpRemoteTaskRunner.getWorkersWithUnacknowledgedTasks()).build(); + return Response.ok().entity(httpRemoteTaskRunner.getPendingAssignWorkers()).build(); } @GET diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index dc219d4f8b7f..90039e50fb79 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -64,9 +64,8 @@ public class WorkerHolder public static final TypeReference> WORKER_SYNC_RESP_TYPE_REF = new TypeReference<>() {}; - private final Worker worker; - private Worker disabledWorker; + private volatile Worker disabledWorker; protected final AtomicBoolean disabled; private final AtomicBoolean syncedAtleastOnce = new AtomicBoolean(false); @@ -86,6 +85,23 @@ public class WorkerHolder private final Listener listener; + private final AtomicReference state; + + public enum State + { + /** Worker is online and ready to accept new tasks. */ + READY, + + /** A task has been submitted to this worker, but the task hasn't been acknowledged by the worker yet. */ + PENDING_ASSIGN, + + /** Worker has exceeded the failure threshold and will not receive new tasks until the backoff period elapses. */ + BLACKLISTED, + + /** Worker has no running tasks and has been marked as reapable by the auto-scaler. 
*/ + LAZY + } + public WorkerHolder( ObjectMapper smileMapper, HttpClient httpClient, @@ -121,6 +137,8 @@ public WorkerHolder( knownAnnouncements.forEach(e -> announcements.put(e.getTaskId(), e)); } tasksSnapshotRef = new AtomicReference<>(announcements); + + this.state = new AtomicReference<>(State.READY); } public Worker getWorker() @@ -465,4 +483,19 @@ public interface Listener void stateChanged(boolean enabled, WorkerHolder workerHolder); } + + public State getState() + { + return state.get(); + } + + public State compareAndExchangeState(State expectedState, State newState) + { + return state.compareAndExchange(expectedState, newState); + } + + public void setState(State state) + { + this.state.set(state); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 388af3bfe910..ba4a737f9203 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.curator.framework.CuratorFramework; import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.HttpInputSource; @@ -87,8 +86,6 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; -import org.apache.druid.server.initialization.IndexerZkConfig; -import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Interval; @@ -193,7 +190,7 @@ public void 
testManageQueuedTasksReleaseLockWhenTaskIsNotReady() throws Exceptio } @Test - public void testManageQueuedTasksDoesNothingWhenInactive() throws Exception + public void testManageQueuedTasksDoesNothingWhenInactive() { // Add a task to the queue while active final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M")); @@ -767,8 +764,6 @@ private HttpRemoteTaskRunner createHttpRemoteTaskRunner() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), serviceEmitter ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 3a3dcc3a7a12..3d3bc6e4ff6c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -26,7 +26,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import org.apache.curator.framework.CuratorFramework; +import com.google.common.util.concurrent.Futures; import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DiscoveryDruidNode; @@ -39,6 +39,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; @@ -52,19 +53,20 @@ import 
org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.ChangeRequestHttpSyncer; -import org.apache.druid.server.initialization.IndexerZkConfig; -import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; @@ -113,7 +115,8 @@ public void testFreshStart() throws Exception HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance( druidNodeDiscoveryProvider, - new NoopProvisioningStrategy<>()); + new NoopProvisioningStrategy<>() + ); taskRunner.start(); @@ -163,7 +166,8 @@ public void testFreshStart_nodeDiscoveryTimedOut() throws Exception HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance( druidNodeDiscoveryProvider, - new NoopProvisioningStrategy<>()); + new NoopProvisioningStrategy<>() + ); taskRunner.start(); @@ -237,18 +241,18 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", Wo HttpRemoteTaskRunner taskRunner = newHttpTaskRunnerInstance( druidNodeDiscoveryProvider, - provisioningStrategy); + provisioningStrategy + ); taskRunner.start(); druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1, 
druidNode2)); - ConcurrentMap workers = taskRunner.getWorkersForTestingReadOnly(); - Assert.assertEquals(2, workers.size()); - Assert.assertTrue(workers.values().stream().noneMatch(w -> w.getUnderlyingSyncer().isExecutorShutdown())); - workers.values().iterator().next().stop(); + + // Verify workers were added + Assert.assertEquals(2, taskRunner.getWorkers().size()); + taskRunner.stop(); Assert.assertTrue(druidNodeDiscovery.getListeners().isEmpty()); - Assert.assertEquals(2, workers.size()); - Assert.assertTrue(workers.values().stream().allMatch(w -> w.getUnderlyingSyncer().isExecutorShutdown())); + EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService); } @@ -285,8 +289,6 @@ public int getPendingTasksRunnerNumThreads() provisioningStrategy, druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -354,8 +356,6 @@ public int getPendingTasksRunnerNumThreads() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -459,8 +459,6 @@ public int getPendingTasksRunnerNumThreads() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, taskStorageMock, - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -602,8 +600,6 @@ public int getPendingTasksRunnerNumThreads() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -778,8 +774,6 @@ public Period getTaskCleanupTimeout() new 
NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -976,8 +970,6 @@ public int getPendingTasksRunnerNumThreads() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -1188,48 +1180,76 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", Wo /* * Task goes PENDING -> RUNNING -> SUCCESS and few more useless notifications in between. */ - @Test + @Test(timeout = 60_000L) public void testTaskAddedOrUpdated1() throws Exception { Task task = NoopTask.create(); List listenerNotificationsAccumulator = new ArrayList<>(); - HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated( - EasyMock.createStrictMock(TaskStorage.class), - listenerNotificationsAccumulator - ); + Worker worker = new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); - EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes(); + EasyMock.expect(workerHolder.getWorker()) + .andReturn(worker) + .anyTimes(); + EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes(); + EasyMock.expect(workerHolder.isInitialized()).andReturn(true).anyTimes(); + EasyMock.expect(workerHolder.isEnabled()).andReturn(true).anyTimes(); + EasyMock.expect(workerHolder.toImmutable()) + .andReturn(new ImmutableWorkerInfo(worker, 0, ImmutableSet.of(), ImmutableSet.of(), DateTimes.nowUtc())) + .anyTimes(); workerHolder.setLastCompletedTaskTime(EasyMock.anyObject()); + 
workerHolder.setState(EasyMock.anyObject()); + // Scheduling thread resets worker from PENDING_ASSIGN → READY in the finally block after assignment. + EasyMock.expect(workerHolder.compareAndExchangeState(WorkerHolder.State.PENDING_ASSIGN, WorkerHolder.State.READY)) + .andReturn(WorkerHolder.State.PENDING_ASSIGN) + .anyTimes(); workerHolder.resetContinuouslyFailedTasksCount(); EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0); + workerHolder.start(); + EasyMock.expectLastCall(); EasyMock.replay(workerHolder); + HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated( + EasyMock.createStrictMock(TaskStorage.class), + listenerNotificationsAccumulator, + worker, + workerHolder + ); + + // Register the mock worker using the proper API + taskRunner.addWorker(worker); + Future future = taskRunner.run(task); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); // RUNNING notification from worker - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()), - TaskLocation.create("worker", 1000, 1001) - ), workerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.create("worker", 1000, 1001) + ), workerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); // Another RUNNING notification from worker, notifying change in location - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.create("worker", 1, 2) + ), workerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); // Redundant RUNNING notification from worker, ignored - 
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.create("worker", 1, 2) + ), workerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); // Another "rogue-worker" reports running it, and gets asked to shutdown the task @@ -1239,11 +1259,13 @@ public void testTaskAddedOrUpdated1() throws Exception .anyTimes(); rogueWorkerHolder.shutdownTask(task.getId()); EasyMock.replay(rogueWorkerHolder); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()), - TaskLocation.create("rogue-worker", 1, 2) - ), rogueWorkerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.create("rogue-worker", 1, 2) + ), rogueWorkerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); EasyMock.verify(rogueWorkerHolder); @@ -1253,20 +1275,24 @@ public void testTaskAddedOrUpdated1() throws Exception .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY)) .anyTimes(); EasyMock.replay(rogueWorkerHolder); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.failure(task.getId(), "Dummy task status failure err message"), - TaskLocation.create("rogue-worker", 1, 2) - ), rogueWorkerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.failure(task.getId(), "Dummy task status failure err message"), + TaskLocation.create("rogue-worker", 1, 2) + ), rogueWorkerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); EasyMock.verify(rogueWorkerHolder); // workers sends SUCCESS notification, 
task is marked SUCCESS now. - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.success(task.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.success(task.getId()), + TaskLocation.create("worker", 1, 2) + ), workerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId()); Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -1277,11 +1303,13 @@ public void testTaskAddedOrUpdated1() throws Exception .anyTimes(); rogueWorkerHolder.shutdownTask(task.getId()); EasyMock.replay(rogueWorkerHolder); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()), - TaskLocation.create("rogue-worker", 1, 2) - ), rogueWorkerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.create("rogue-worker", 1, 2) + ), rogueWorkerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId()); EasyMock.verify(rogueWorkerHolder); @@ -1291,11 +1319,13 @@ public void testTaskAddedOrUpdated1() throws Exception .andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1", WorkerConfig.DEFAULT_CATEGORY)) .anyTimes(); EasyMock.replay(rogueWorkerHolder); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.failure(task.getId(), "Dummy task status failure for testing"), - TaskLocation.create("rogue-worker", 1, 2) - ), rogueWorkerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.failure(task.getId(), "Dummy task status failure for testing"), + TaskLocation.create("rogue-worker", 1, 2) + ), rogueWorkerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId()); EasyMock.verify(rogueWorkerHolder); @@ -1304,12 
+1334,12 @@ public void testTaskAddedOrUpdated1() throws Exception EasyMock.verify(workerHolder); Assert.assertEquals( - listenerNotificationsAccumulator, ImmutableList.of( ImmutableList.of(task.getId(), TaskLocation.create("worker", 1000, 1001)), ImmutableList.of(task.getId(), TaskLocation.create("worker", 1, 2)), ImmutableList.of(task.getId(), TaskStatus.success(task.getId())) - ) + ), + listenerNotificationsAccumulator ); } @@ -1317,33 +1347,57 @@ public void testTaskAddedOrUpdated1() throws Exception * Task goes from PENDING -> SUCCESS . Happens when TaskRunner is given task but a worker reported it being already * completed with SUCCESS. */ - @Test + @Test(timeout = 60_000L) public void testTaskAddedOrUpdated2() throws Exception { Task task = NoopTask.create(); List listenerNotificationsAccumulator = new ArrayList<>(); - HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated( - EasyMock.createStrictMock(TaskStorage.class), - listenerNotificationsAccumulator - ); Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes(); + EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes(); + EasyMock.expect(workerHolder.isInitialized()).andReturn(true).anyTimes(); + EasyMock.expect(workerHolder.isEnabled()).andReturn(true).anyTimes(); + EasyMock.expect(workerHolder.toImmutable()) + .andReturn(new ImmutableWorkerInfo(worker, 0, ImmutableSet.of(), ImmutableSet.of(), DateTimes.nowUtc())) + .anyTimes(); + workerHolder.setState(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + // Scheduling thread resets worker from PENDING_ASSIGN → READY in the finally block after assignment. 
+ EasyMock.expect(workerHolder.compareAndExchangeState(WorkerHolder.State.PENDING_ASSIGN, WorkerHolder.State.READY)) + .andReturn(WorkerHolder.State.PENDING_ASSIGN) + .anyTimes(); workerHolder.setLastCompletedTaskTime(EasyMock.anyObject()); workerHolder.resetContinuouslyFailedTasksCount(); EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0); + EasyMock.expect(workerHolder.getBlacklistedUntil()).andReturn(null); + EasyMock.expectLastCall().anyTimes(); + workerHolder.start(); + EasyMock.expectLastCall(); EasyMock.replay(workerHolder); + HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated( + EasyMock.createStrictMock(TaskStorage.class), + listenerNotificationsAccumulator, + worker, + workerHolder + ); + + // Register the mock worker using the proper API + taskRunner.addWorker(worker); + Future future = taskRunner.run(task); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.success(task.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.success(task.getId()), + TaskLocation.create("worker", 1, 2) + ), workerHolder + ); Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId()); Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -1351,11 +1405,11 @@ public void testTaskAddedOrUpdated2() throws Exception EasyMock.verify(workerHolder); Assert.assertEquals( - listenerNotificationsAccumulator, ImmutableList.of( ImmutableList.of(task.getId(), TaskLocation.create("worker", 1, 2)), ImmutableList.of(task.getId(), TaskStatus.success(task.getId())) - ) + ), + listenerNotificationsAccumulator ); } @@ -1363,7 +1417,7 @@ public void testTaskAddedOrUpdated2() throws Exception * Notifications received for tasks not known to TaskRunner maybe known 
to TaskStorage. * This could happen when TaskRunner starts and workers reports running/completed tasks on them. */ - @Test + @Test(timeout = 60_000L) public void testTaskAddedOrUpdated3() { Task task1 = NoopTask.create(); @@ -1383,71 +1437,101 @@ public void testTaskAddedOrUpdated3() EasyMock.replay(taskStorage); List listenerNotificationsAccumulator = new ArrayList<>(); - HttpRemoteTaskRunner taskRunner = - createTaskRunnerForTestTaskAddedOrUpdated(taskStorage, listenerNotificationsAccumulator); Worker worker = new Worker("http", "localhost", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes(); + EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes(); + EasyMock.expect(workerHolder.isInitialized()).andReturn(true).anyTimes(); + EasyMock.expect(workerHolder.isEnabled()).andReturn(true).anyTimes(); + EasyMock.expect(workerHolder.toImmutable()) + .andReturn(new ImmutableWorkerInfo(worker, 0, ImmutableSet.of(), ImmutableSet.of(), DateTimes.nowUtc())) + .anyTimes(); + workerHolder.setState(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); workerHolder.setLastCompletedTaskTime(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); workerHolder.resetContinuouslyFailedTasksCount(); + EasyMock.expectLastCall().anyTimes(); EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0); + EasyMock.expect(workerHolder.getBlacklistedUntil()).andReturn(null); + EasyMock.expectLastCall().anyTimes(); workerHolder.shutdownTask(task3.getId()); workerHolder.shutdownTask(task5.getId()); + workerHolder.start(); + EasyMock.expectLastCall(); EasyMock.replay(workerHolder); + HttpRemoteTaskRunner taskRunner = + createTaskRunnerForTestTaskAddedOrUpdated(taskStorage, listenerNotificationsAccumulator, worker, workerHolder); + + // Register the mock worker using the proper API + 
taskRunner.addWorker(worker); + Assert.assertEquals(0, taskRunner.getKnownTasks().size()); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task1, - TaskStatus.running(task1.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); - - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task2, - TaskStatus.success(task2.getId()), - TaskLocation.create("worker", 3, 4) - ), workerHolder); - - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task3, - TaskStatus.running(task3.getId()), - TaskLocation.create("worker", 5, 6) - ), workerHolder); - - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task4, - TaskStatus.success(task4.getId()), - TaskLocation.create("worker", 7, 8) - ), workerHolder); - - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task5, - TaskStatus.running(task5.getId()), - TaskLocation.create("worker", 9, 10) - ), workerHolder); - - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task6, - TaskStatus.success(task6.getId()), - TaskLocation.create("worker", 11, 12) - ), workerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task1, + TaskStatus.running(task1.getId()), + TaskLocation.create("worker", 1, 2) + ), workerHolder + ); + + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task2, + TaskStatus.success(task2.getId()), + TaskLocation.create("worker", 3, 4) + ), workerHolder + ); + + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task3, + TaskStatus.running(task3.getId()), + TaskLocation.create("worker", 5, 6) + ), workerHolder + ); + + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task4, + TaskStatus.success(task4.getId()), + TaskLocation.create("worker", 7, 8) + ), workerHolder + ); + + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task5, + TaskStatus.running(task5.getId()), + TaskLocation.create("worker", 9, 10) + ), workerHolder + ); + + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task6, + 
TaskStatus.success(task6.getId()), + TaskLocation.create("worker", 11, 12) + ), workerHolder + ); EasyMock.verify(workerHolder, taskStorage); Assert.assertEquals( - listenerNotificationsAccumulator, ImmutableList.of( ImmutableList.of(task1.getId(), TaskLocation.create("worker", 1, 2)), ImmutableList.of(task2.getId(), TaskLocation.create("worker", 3, 4)), ImmutableList.of(task2.getId(), TaskStatus.success(task2.getId())) - ) + ), + listenerNotificationsAccumulator ); } - @Test + @Test(timeout = 60_000L) public void testTimeoutInAssigningTasks() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); @@ -1477,8 +1561,6 @@ public Period getTaskAssignmentTimeout() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -1556,11 +1638,11 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", Wo Assert.assertTrue(future.get().isFailure()); Assert.assertNotNull(future.get().getErrorMsg()); Assert.assertTrue( - future.get().getErrorMsg().startsWith("The worker that this task is assigned did not start it in timeout") + future.get().getErrorMsg().startsWith("Failed to assign this task") ); } - @Test + @Test(timeout = 60_000L) public void testExceptionThrownInAssigningTasks() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); @@ -1590,8 +1672,6 @@ public Period getTaskAssignmentTimeout() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -1677,7 +1757,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", Wo * Validate the internal state of tasks 
within the task runner * when shutdown is called on pending / running tasks and completed tasks */ - @Test + @Test(timeout = 60_000L) public void testShutdown() { List listenerNotificationsAccumulator = new ArrayList<>(); @@ -1690,9 +1770,15 @@ public void testShutdown() WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes(); + EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes(); workerHolder.setLastCompletedTaskTime(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); workerHolder.resetContinuouslyFailedTasksCount(); - EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(0); + EasyMock.expectLastCall().anyTimes(); + workerHolder.incrementContinuouslyFailedTasksCount(); + EasyMock.expectLastCall().anyTimes(); + workerHolder.setBlacklistedUntil(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); EasyMock.replay(workerHolder); taskRunner.start(); @@ -1710,11 +1796,13 @@ public void testShutdown() Task completedTask = NoopTask.create(); taskRunner.run(completedTask); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - completedTask, - TaskStatus.success(completedTask.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + completedTask, + TaskStatus.success(completedTask.getId()), + TaskLocation.create("worker", 1, 2) + ), workerHolder + ); Assert.assertEquals(completedTask.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId()); TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); @@ -1726,10 +1814,10 @@ public void testShutdown() // Completed tasks are cleaned up when shutdown is invokded on them (by TaskQueue) taskRunner.shutdown(completedTask.getId(), "Cleanup"); 
Assert.assertFalse(taskRunner.getKnownTasks() - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toSet()) - .contains(completedTask.getId()) + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toSet()) + .contains(completedTask.getId()) ); } @@ -1751,8 +1839,6 @@ public void testSyncMonitoring_finiteIteration() new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) { @@ -1780,13 +1866,13 @@ protected WorkerHolder createWorkerHolder( Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size()); } - @Test + @Test(timeout = 60_000L) public void testGetMaximumCapacity_noWorkerConfig() { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); + .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( @@ -1797,44 +1883,45 @@ public void testGetMaximumCapacity_noWorkerConfig() new TestProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ); Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); } - @Test + @Test(timeout = 60_000L) public void testGetMaximumCapacity_noAutoScaler() { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); 
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); + .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( TestHelper.makeJsonMapper(), new HttpRemoteTaskRunnerConfig(), EasyMock.createNiceMock(HttpClient.class), - DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null, null), null))), + DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig( + new EqualDistributionWorkerSelectStrategy( + null, + null + ), null + ))), new TestProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ); Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); } - @Test + @Test(timeout = 60_000L) public void testGetMaximumCapacity_withAutoScaler() { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); + .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( @@ -1845,8 +1932,6 @@ public void testGetMaximumCapacity_withAutoScaler() new TestProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ); // Default autoscaler has max workers of 0 @@ -1857,6 +1942,16 @@ public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( 
TaskStorage taskStorage, List listenerNotificationsAccumulator ) + { + return createTaskRunnerForTestTaskAddedOrUpdated(taskStorage, listenerNotificationsAccumulator, null, null); + } + + public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( + TaskStorage taskStorage, + List listenerNotificationsAccumulator, + Worker mockWorker, + WorkerHolder mockWorkerHolder + ) { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); @@ -1864,6 +1959,17 @@ public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); + // Create HttpClient mock that returns a successful status response for worker syncing + HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class); + EasyMock.expect(httpClient.go( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + )).andReturn(Futures.immediateFuture( + new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder()) + )).anyTimes(); + EasyMock.replay(httpClient); + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( TestHelper.makeJsonMapper(), new HttpRemoteTaskRunnerConfig() @@ -1874,15 +1980,39 @@ public int getPendingTasksRunnerNumThreads() return 3; } }, - EasyMock.createNiceMock(HttpClient.class), + httpClient, DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, taskStorage, - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() - ); + ) + { + @Override + protected WorkerHolder createWorkerHolder( + ObjectMapper smileMapper, + HttpClient httpClient, + HttpRemoteTaskRunnerConfig config, + ScheduledExecutorService workersSyncExec, + WorkerHolder.Listener listener, + Worker 
worker, + List knownAnnouncements + ) + { + if (mockWorker != null && mockWorkerHolder != null && worker.getHost().equals(mockWorker.getHost())) { + return mockWorkerHolder; + } + return super.createWorkerHolder( + smileMapper, + httpClient, + config, + workersSyncExec, + listener, + worker, + knownAnnouncements + ); + } + }; taskRunner.start(); @@ -1931,6 +2061,7 @@ private WorkerHolder createNonSyncingWorkerHolder(Worker worker) WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes(); EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes(); + EasyMock.expect(workerHolder.getState()).andReturn(WorkerHolder.State.READY).anyTimes(); workerHolder.start(); EasyMock.expectLastCall(); workerHolder.stop(); @@ -2170,7 +2301,8 @@ WorkerHolder apply( private static HttpRemoteTaskRunner newHttpTaskRunnerInstance( DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - ProvisioningStrategy provisioningStrategy) + ProvisioningStrategy provisioningStrategy + ) { return new HttpRemoteTaskRunner( TestHelper.makeJsonMapper(), @@ -2187,8 +2319,6 @@ public int getPendingTasksRunnerNumThreads() provisioningStrategy, druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ) {