From 999a91ec738b73da89d348f5ce5d7b7fde14339e Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 12 May 2026 14:13:45 +0530 Subject: [PATCH] Decouple FilterTreeCallbacks from TaskResourceTrackingService Introduce DelegationThreadTracker interface in the SPI layer so that FilterTreeCallbacks no longer depends on TaskResourceTrackingService directly. The tracker is created as a closure in AnalyticsSearchService where both the service and taskId are naturally available, keeping the FFM callback layer free of core OpenSearch dependencies. Also fixes premature tracker cleanup in the streaming path by tying the tracker lifecycle to FragmentResources.close() instead of a finally block that fires before the stream is consumed. Signed-off-by: Bukhtawar Khan Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Bukhtawar Khan --- .../spi/AnalyticsSearchBackendPlugin.java | 12 ++---- .../spi/DelegationThreadTracker.java | 32 +++++++++++++++ .../DataFusionAnalyticsBackendPlugin.java | 9 +---- .../indexfilter/FilterTreeCallbacks.java | 23 +++++------ .../DelegationTaskTrackingTests.java | 40 ++++++++++++++----- .../exec/AnalyticsSearchService.java | 25 +++++++++--- .../analytics/exec/FragmentResources.java | 22 +++++++++- 7 files changed, 115 insertions(+), 48 deletions(-) create mode 100644 sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationThreadTracker.java diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java index fdc7827ac93af..59f9b1f899f92 100644 --- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AnalyticsSearchBackendPlugin.java @@ -115,14 +115,8 @@ default void configureFilterDelegation(FilterDelegationHandle handle, BackendExe } /** - * Configure task-level resource tracking for delegation callbacks executing on foreign threads. - * Called after {@link #configureFilterDelegation}. Backends should wrap their callback dispatch - * with start/finish tracking calls for the given task. + * Install a thread tracker for attribution of delegation callbacks executing on foreign threads. + * Called after {@link #configureFilterDelegation}. Pass {@code null} to clear. */ - default void configureTaskTracking(org.opensearch.tasks.TaskResourceTrackingService trackingService, long taskId) {} - - /** - * Clear task tracking state after fragment execution completes. - */ - default void clearTaskTracking() {} + default void setDelegationThreadTracker(DelegationThreadTracker tracker) {} } diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationThreadTracker.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationThreadTracker.java new file mode 100644 index 0000000000000..97d33003de745 --- /dev/null +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationThreadTracker.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.spi; + +/** + * Tracks thread-level resource attribution for delegation callbacks executing + * on foreign threads (e.g., DataFusion/Tokio workers invoking Lucene via FFM). + * + * @opensearch.internal + */ +public interface DelegationThreadTracker { + + /** + * Signal that delegation work has started on the current thread. + * + * @return thread id to pass to {@link #trackEnd}, or {@code -1} if tracking is inactive + */ + long trackStart(); + + /** + * Signal that delegation work has finished on the given thread. + * + * @param threadId the value returned by {@link #trackStart} + */ + void trackEnd(long threadId); +} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java index 31e911c6d193e..280e746660111 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java @@ -569,12 +569,7 @@ public void configureFilterDelegation(FilterDelegationHandle handle, BackendExec } @Override - public void configureTaskTracking(org.opensearch.tasks.TaskResourceTrackingService trackingService, long taskId) { - FilterTreeCallbacks.setTaskTracking(trackingService, taskId); - } - - @Override - public void clearTaskTracking() { - FilterTreeCallbacks.setTaskTracking(null, -1); + public void setDelegationThreadTracker(org.opensearch.analytics.spi.DelegationThreadTracker tracker) { + FilterTreeCallbacks.setThreadTracker(tracker); } } diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java index 7d316c3ef3873..90b0d00273529 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java @@ -11,8 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.analytics.spi.DelegationThreadTracker; import org.opensearch.analytics.spi.FilterDelegationHandle; -import org.opensearch.tasks.TaskResourceTrackingService; import java.lang.foreign.MemorySegment; import java.util.concurrent.atomic.AtomicReference; @@ -36,8 +36,7 @@ public final class FilterTreeCallbacks { private static final Logger LOGGER = LogManager.getLogger(FilterTreeCallbacks.class); private static final AtomicReference HANDLE = new AtomicReference<>(); - private static final AtomicReference TRACKING_SERVICE = new AtomicReference<>(); - private static long currentTaskId = -1; + private static final AtomicReference TRACKER = new AtomicReference<>(); private FilterTreeCallbacks() {} @@ -51,25 +50,21 @@ public static void setHandle(FilterDelegationHandle handle) { } /** - * Configure task resource tracking. All subsequent callbacks will attribute - * CPU/heap to the given task until cleared. + * Install or clear the thread tracker for resource attribution. */ - public static void setTaskTracking(TaskResourceTrackingService trackingService, long taskId) { - TRACKING_SERVICE.set(trackingService); - currentTaskId = taskId; + public static void setThreadTracker(DelegationThreadTracker tracker) { + TRACKER.set(tracker); } private static long trackStart() { - TaskResourceTrackingService tracker = TRACKING_SERVICE.get(); - if (tracker == null || currentTaskId < 0) return -1; - long threadId = Thread.currentThread().threadId(); - tracker.taskExecutionStartedOnThread(currentTaskId, threadId); - return threadId; + DelegationThreadTracker t = TRACKER.get(); + return (t != null) ? t.trackStart() : -1; } private static void trackEnd(long threadId) { if (threadId < 0) return; - TRACKING_SERVICE.get().taskExecutionFinishedOnThread(currentTaskId, threadId); + DelegationThreadTracker t = TRACKER.get(); + if (t != null) t.trackEnd(threadId); } // ── Provider lifecycle (cold path, once per query) ──────────────── diff --git a/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/DelegationTaskTrackingTests.java b/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/DelegationTaskTrackingTests.java index cef17e92553bb..debc4cdfef69b 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/DelegationTaskTrackingTests.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/DelegationTaskTrackingTests.java @@ -9,6 +9,7 @@ package org.opensearch.be.datafusion.indexfilter; import org.opensearch.analytics.exec.task.AnalyticsShardTask; +import org.opensearch.analytics.spi.DelegationThreadTracker; import org.opensearch.analytics.spi.FilterDelegationHandle; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -50,19 +51,19 @@ public void setUp() throws Exception { ); trackingService.setTaskResourceTrackingEnabled(true); FilterTreeCallbacks.setHandle(null); - FilterTreeCallbacks.setTaskTracking(null, -1); + FilterTreeCallbacks.setThreadTracker(null); } @Override public void tearDown() throws Exception { - FilterTreeCallbacks.setTaskTracking(null, -1); + FilterTreeCallbacks.setThreadTracker(null); FilterTreeCallbacks.setHandle(null); terminate(threadPool); super.tearDown(); } /** - * Tests the full production wiring: configureTaskTracking via SPI, then + * Tests the full production wiring: setDelegationThreadTracker via SPI, then * all three callback methods (createProvider, createCollector, collectDocs) * on a foreign thread. Verifies the thread is tracked against the task. */ @@ -70,7 +71,7 @@ public void testAllCallbackMethodsTrackedOnForeignThread() throws Exception { AnalyticsShardTask task = createAndTrackTask(1); var backendPlugin = new org.opensearch.be.datafusion.DataFusionAnalyticsBackendPlugin(null); - backendPlugin.configureTaskTracking(trackingService, task.getId()); + backendPlugin.setDelegationThreadTracker(createTracker(task.getId())); FilterTreeCallbacks.setHandle(new MockHandle(new long[] { 0xCAFEL })); CountDownLatch done = new CountDownLatch(1); @@ -88,7 +89,7 @@ public void testAllCallbackMethodsTrackedOnForeignThread() throws Exception { foreignThread.start(); assertTrue(done.await(5, TimeUnit.SECONDS)); - backendPlugin.clearTaskTracking(); + backendPlugin.setDelegationThreadTracker(null); trackingService.stopTracking(task); Map> stats = task.getResourceStats(); @@ -96,17 +97,17 @@ public void testAllCallbackMethodsTrackedOnForeignThread() throws Exception { } /** - * Tests that clearTaskTracking stops attribution. After clearing, + * Tests that clearing the thread tracker stops attribution. After clearing, * callbacks on a new thread should NOT be attributed to the old task. */ public void testClearTaskTrackingStopsAttribution() throws Exception { AnalyticsShardTask task = createAndTrackTask(2); - FilterTreeCallbacks.setTaskTracking(trackingService, task.getId()); + FilterTreeCallbacks.setThreadTracker(createTracker(task.getId())); FilterTreeCallbacks.setHandle(new MockHandle(new long[] { 1L })); // Clear tracking BEFORE running callbacks - FilterTreeCallbacks.setTaskTracking(null, -1); + FilterTreeCallbacks.setThreadTracker(null); CountDownLatch done = new CountDownLatch(1); Thread foreignThread = new Thread(() -> { @@ -126,7 +127,7 @@ public void testClearTaskTrackingStopsAttribution() throws Exception { trackingService.stopTracking(task); Map> stats = task.getResourceStats(); - assertFalse("Thread after clearTaskTracking should NOT be tracked", stats.containsKey(foreignThread.threadId())); + assertFalse("Thread after clearing tracker should NOT be tracked", stats.containsKey(foreignThread.threadId())); } /** @@ -135,7 +136,7 @@ public void testClearTaskTrackingStopsAttribution() throws Exception { public void testConcurrentThreadsAllTracked() throws Exception { AnalyticsShardTask task = createAndTrackTask(3); - FilterTreeCallbacks.setTaskTracking(trackingService, task.getId()); + FilterTreeCallbacks.setThreadTracker(createTracker(task.getId())); FilterTreeCallbacks.setHandle(new MockHandle(new long[] { 0xFFL })); int threadCount = 4; @@ -165,7 +166,7 @@ public void testConcurrentThreadsAllTracked() throws Exception { } assertTrue(done.await(10, TimeUnit.SECONDS)); - FilterTreeCallbacks.setTaskTracking(null, -1); + FilterTreeCallbacks.setThreadTracker(null); trackingService.stopTracking(task); Map> stats = task.getResourceStats(); @@ -174,6 +175,23 @@ public void testConcurrentThreadsAllTracked() throws Exception { } } + private DelegationThreadTracker createTracker(long taskId) { + TaskResourceTrackingService service = trackingService; + return new DelegationThreadTracker() { + @Override + public long trackStart() { + long threadId = Thread.currentThread().threadId(); + service.taskExecutionStartedOnThread(taskId, threadId); + return threadId; + } + + @Override + public void trackEnd(long threadId) { + service.taskExecutionFinishedOnThread(taskId, threadId); + } + }; + } + private AnalyticsShardTask createAndTrackTask(long id) { AnalyticsShardTask task = new AnalyticsShardTask( id, diff --git a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java index 7e0f54072b1b9..3c389add9d00c 100644 --- a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java +++ b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java @@ -18,6 +18,7 @@ import org.opensearch.analytics.spi.AnalyticsSearchBackendPlugin; import org.opensearch.analytics.spi.BackendExecutionContext; import org.opensearch.analytics.spi.DelegationDescriptor; +import org.opensearch.analytics.spi.DelegationThreadTracker; import org.opensearch.analytics.spi.FilterDelegationHandle; import org.opensearch.analytics.spi.FragmentInstructionHandler; import org.opensearch.analytics.spi.FragmentInstructionHandlerFactory; @@ -99,8 +100,6 @@ public FragmentResources executeFragmentStreaming(FragmentExecutionRequest reque } catch (Exception e) { listener.onFragmentFailure(resolved.queryId, resolved.stageId, resolved.shardIdStr, e); throw new RuntimeException("Failed to start streaming fragment on " + shard.shardId(), e); - } finally { - backends.get(resolved.plan.getBackendId()).clearTaskTracking(); } } @@ -110,6 +109,7 @@ private FragmentResources startFragment(FragmentExecutionRequest request, Resolv SearchExecEngine engine = null; EngineResultStream stream = null; BackendExecutionContext backendContext = null; + Runnable trackerCleanup = null; try { ShardScanExecutionContext ctx = buildContext(request, gatedReader.get(), resolved.plan, shard, task); AnalyticsSearchBackendPlugin backend = backends.get(resolved.plan.getBackendId()); @@ -135,16 +135,31 @@ private FragmentResources startFragment(FragmentExecutionRequest request, Resolv backend.configureFilterDelegation(handle, backendContext); if (task != null && taskResourceTrackingService != null) { - backend.configureTaskTracking(taskResourceTrackingService, task.getId()); + long taskId = task.getId(); + TaskResourceTrackingService service = taskResourceTrackingService; + backend.setDelegationThreadTracker(new DelegationThreadTracker() { + @Override + public long trackStart() { + long threadId = Thread.currentThread().threadId(); + service.taskExecutionStartedOnThread(taskId, threadId); + return threadId; + } + + @Override + public void trackEnd(long threadId) { + service.taskExecutionFinishedOnThread(taskId, threadId); + } + }); + trackerCleanup = () -> backend.setDelegationThreadTracker(null); } } engine = backend.getSearchExecEngineProvider().createSearchExecEngine(ctx, backendContext); stream = engine.execute(ctx); - return new FragmentResources(gatedReader, engine, stream); + return new FragmentResources(gatedReader, engine, stream, trackerCleanup); } catch (Exception e) { try { - new FragmentResources(gatedReader, engine, stream).close(); + new FragmentResources(gatedReader, engine, stream, trackerCleanup).close(); } catch (Exception suppressed) { e.addSuppressed(suppressed); } diff --git a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java index 0c61c3b85f79c..92b8e5e1041be 100644 --- a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java +++ b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java @@ -25,15 +25,26 @@ public final class FragmentResources implements AutoCloseable { private final GatedCloseable gatedReader; private final SearchExecEngine engine; private final EngineResultStream stream; + private final Runnable onClose; public FragmentResources( GatedCloseable gatedReader, SearchExecEngine engine, EngineResultStream stream + ) { + this(gatedReader, engine, stream, null); + } + + public FragmentResources( + GatedCloseable gatedReader, + SearchExecEngine engine, + EngineResultStream stream, + Runnable onClose ) { this.gatedReader = gatedReader; this.engine = engine; this.stream = stream; + this.onClose = onClose; } public EngineResultStream stream() { @@ -42,8 +53,15 @@ public EngineResultStream stream() { @Override public void close() throws Exception { - Exception first; - first = closeQuietly(stream, null); + Exception first = null; + if (onClose != null) { + try { + onClose.run(); + } catch (Exception e) { + first = e; + } + } + first = closeQuietly(stream, first); first = closeQuietly(engine, first); first = closeQuietly(gatedReader, first); if (first != null) throw first;