Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces ReflectionServerV2, which enables the Genkit runtime to act as a WebSocket client for dev tools integration using JSON-RPC 2.0. The Genkit class is updated to conditionally start this server based on the GENKIT_REFLECTION_V2_SERVER environment variable. Feedback identifies a critical bug in handleCancelAction where all active actions are interrupted instead of the specific one requested, and points out an unused stream variable in handleRunAction.
| for (Map.Entry<String, Thread> entry : activeActions.entrySet()) { | ||
| // Interrupt the thread to cancel the action | ||
| entry.getValue().interrupt(); | ||
| activeActions.remove(entry.getKey()); | ||
| found = true; | ||
| } |
There was a problem hiding this comment.
The current implementation of handleCancelAction incorrectly interrupts all active actions, rather than the specific action targeted by the traceId. The loop iterates over the entire activeActions map and calls interrupt() on every thread. This can lead to unexpected termination of unrelated, concurrently running actions.
The comment on lines 487-488 acknowledges the difficulty in mapping traceId to a thread, but the implemented logic is critically flawed as it doesn't attempt to match, and the comment on line 488 is misleading as it interrupts all threads, not "all matching threads".
To fix this, you could introduce a mapping from traceId to requestId. This map would be populated in handleRunAction once the traceId is known, and then used in handleCancelAction to find the correct action to interrupt.
Here's a conceptual example of how you could implement this:
- Add a new map to the class:
private final ConcurrentHashMap<String, String> traceIdToRequestId = new ConcurrentHashMap<>(); - In
handleRunAction, after getting thetraceId, populate the map:traceId = result.getTraceId(); if (traceId != null) { traceIdToRequestId.put(traceId, requestId); }
- In the
finallyblock ofhandleRunAction, clean up the map:if (traceId != null) { traceIdToRequestId.remove(traceId); } activeActions.remove(requestId);
- Update
handleCancelActionto use this map to find and interrupt the correct thread.
| try { | ||
| String key = params.has("key") ? params.get("key").asText() : null; | ||
| JsonNode input = params.has("input") ? params.get("input") : null; | ||
| boolean stream = params.has("stream") && params.get("stream").asBoolean(); |
Description
Support for Reflection V2 in Genkit Java
Type of Change
Documentation
Reviewer Notes