Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ private static final class ProbeBufferingCallback implements StreamCallback {
private final Object lock = new Object();
private final List<BufferedEvent> bufferedEvents = new ArrayList<>();
private volatile boolean committed;
private boolean draining;

private ProbeBufferingCallback(StreamCallback downstream, FirstPacketAwaiter awaiter) {
this.downstream = downstream;
Expand Down Expand Up @@ -259,34 +260,23 @@ public void onError(Throwable t) {
* 2. 按事件顺序回放缓存,保证时序一致
*/
private void commit() {
List<BufferedEvent> snapshot;
synchronized (lock) {
if (committed) {
return;
}
committed = true;
if (bufferedEvents.isEmpty()) {
return;
}
snapshot = new ArrayList<>(bufferedEvents);
bufferedEvents.clear();
}
for (BufferedEvent event : snapshot) {
dispatch(event);
}
drain();
}

private void bufferOrDispatch(BufferedEvent event) {
boolean dispatchNow;
synchronized (lock) {
dispatchNow = committed;
if (!dispatchNow) {
bufferedEvents.add(event);
bufferedEvents.add(event);
if (!committed) {
return;
}
}
if (dispatchNow) {
dispatch(event);
}
drain();
}

private void dispatch(BufferedEvent event) {
Expand All @@ -300,6 +290,26 @@ private void dispatch(BufferedEvent event) {
}
}

private void drain() {
while (true) {
BufferedEvent event;
synchronized (lock) {
if (draining || bufferedEvents.isEmpty()) {
return;
}
draining = true;
event = bufferedEvents.remove(0);
}
try {
dispatch(event);
} finally {
synchronized (lock) {
draining = false;
}
}
}
}

private record BufferedEvent(EventType type, String content, Throwable error) {

private static BufferedEvent content(String content) {
Expand Down