diff --git a/infra-ai/src/main/java/com/nageoffer/ai/ragent/infra/chat/RoutingLLMService.java b/infra-ai/src/main/java/com/nageoffer/ai/ragent/infra/chat/RoutingLLMService.java index 4be0801a..a2b7f24e 100644 --- a/infra-ai/src/main/java/com/nageoffer/ai/ragent/infra/chat/RoutingLLMService.java +++ b/infra-ai/src/main/java/com/nageoffer/ai/ragent/infra/chat/RoutingLLMService.java @@ -222,6 +222,7 @@ private static final class ProbeBufferingCallback implements StreamCallback { private final Object lock = new Object(); private final List bufferedEvents = new ArrayList<>(); private volatile boolean committed; + private boolean draining; private ProbeBufferingCallback(StreamCallback downstream, FirstPacketAwaiter awaiter) { this.downstream = downstream; @@ -259,34 +260,23 @@ public void onError(Throwable t) { * 2. 按事件顺序回放缓存,保证时序一致 */ private void commit() { - List snapshot; synchronized (lock) { if (committed) { return; } committed = true; - if (bufferedEvents.isEmpty()) { - return; - } - snapshot = new ArrayList<>(bufferedEvents); - bufferedEvents.clear(); - } - for (BufferedEvent event : snapshot) { - dispatch(event); } + drain(); } private void bufferOrDispatch(BufferedEvent event) { - boolean dispatchNow; synchronized (lock) { - dispatchNow = committed; - if (!dispatchNow) { - bufferedEvents.add(event); + bufferedEvents.add(event); + if (!committed) { + return; } } - if (dispatchNow) { - dispatch(event); - } + drain(); } private void dispatch(BufferedEvent event) { @@ -300,6 +290,26 @@ private void dispatch(BufferedEvent event) { } } + private void drain() { + while (true) { + BufferedEvent event; + synchronized (lock) { + if (draining || bufferedEvents.isEmpty()) { + return; + } + draining = true; + event = bufferedEvents.remove(0); + } + try { + dispatch(event); + } finally { + synchronized (lock) { + draining = false; + } + } + } + } + private record BufferedEvent(EventType type, String content, Throwable error) { private static BufferedEvent content(String content) {