@@ -44,6 +44,93 @@ inline void logAndThrow(const std::string &error_msg) {
4444 throw std::runtime_error (error_msg);
4545}
4646
47+ std::optional<FfiClient::AsyncId> ExtractAsyncId (const proto::FfiEvent &event) {
48+ using E = proto::FfiEvent;
49+ switch (event.message_case ()) {
50+ case E::kConnect :
51+ return event.connect ().async_id ();
52+ case E::kDisconnect :
53+ return event.disconnect ().async_id ();
54+ case E::kDispose :
55+ return event.dispose ().async_id ();
56+ case E::kPublishTrack :
57+ return event.publish_track ().async_id ();
58+ case E::kUnpublishTrack :
59+ return event.unpublish_track ().async_id ();
60+ case E::kPublishData :
61+ return event.publish_data ().async_id ();
62+ case E::kPublishTranscription :
63+ return event.publish_transcription ().async_id ();
64+ case E::kCaptureAudioFrame :
65+ return event.capture_audio_frame ().async_id ();
66+ case E::kSetLocalMetadata :
67+ return event.set_local_metadata ().async_id ();
68+ case E::kSetLocalName :
69+ return event.set_local_name ().async_id ();
70+ case E::kSetLocalAttributes :
71+ return event.set_local_attributes ().async_id ();
72+ case E::kGetStats :
73+ return event.get_stats ().async_id ();
74+ case E::kGetSessionStats :
75+ return event.get_session_stats ().async_id ();
76+ case E::kPublishSipDtmf :
77+ return event.publish_sip_dtmf ().async_id ();
78+ case E::kChatMessage :
79+ return event.chat_message ().async_id ();
80+ case E::kPerformRpc :
81+ return event.perform_rpc ().async_id ();
82+
83+ // low-level data stream callbacks
84+ case E::kSendStreamHeader :
85+ return event.send_stream_header ().async_id ();
86+ case E::kSendStreamChunk :
87+ return event.send_stream_chunk ().async_id ();
88+ case E::kSendStreamTrailer :
89+ return event.send_stream_trailer ().async_id ();
90+
91+ // high-level
92+ case E::kByteStreamReaderReadAll :
93+ return event.byte_stream_reader_read_all ().async_id ();
94+ case E::kByteStreamReaderWriteToFile :
95+ return event.byte_stream_reader_write_to_file ().async_id ();
96+ case E::kByteStreamOpen :
97+ return event.byte_stream_open ().async_id ();
98+ case E::kByteStreamWriterWrite :
99+ return event.byte_stream_writer_write ().async_id ();
100+ case E::kByteStreamWriterClose :
101+ return event.byte_stream_writer_close ().async_id ();
102+ case E::kSendFile :
103+ return event.send_file ().async_id ();
104+
105+ case E::kTextStreamReaderReadAll :
106+ return event.text_stream_reader_read_all ().async_id ();
107+ case E::kTextStreamOpen :
108+ return event.text_stream_open ().async_id ();
109+ case E::kTextStreamWriterWrite :
110+ return event.text_stream_writer_write ().async_id ();
111+ case E::kTextStreamWriterClose :
112+ return event.text_stream_writer_close ().async_id ();
113+ case E::kSendText :
114+ return event.send_text ().async_id ();
115+ case E::kSendBytes :
116+ return event.send_bytes ().async_id ();
117+
118+ // NOT async completion:
119+ case E::kRoomEvent :
120+ case E::kTrackEvent :
121+ case E::kVideoStreamEvent :
122+ case E::kAudioStreamEvent :
123+ case E::kByteStreamReaderEvent :
124+ case E::kTextStreamReaderEvent :
125+ case E::kRpcMethodInvocation :
126+ case E::kLogs :
127+ case E::kPanic :
128+ case E::MESSAGE_NOT_SET:
129+ default :
130+ return std::nullopt ;
131+ }
132+ }
133+
47134} // namespace
48135
49136FfiClient::~FfiClient () {
@@ -77,7 +164,7 @@ bool FfiClient::isInitialized() const noexcept {
77164FfiClient::ListenerId
78165FfiClient::AddListener (const FfiClient::Listener &listener) {
79166 std::lock_guard<std::mutex> guard (lock_);
80- FfiClient::ListenerId id = nextListenerId ++;
167+ FfiClient::ListenerId id = next_listener_id ++;
81168 listeners_[id] = listener;
82169 return id;
83170}
@@ -117,34 +204,33 @@ FfiClient::sendRequest(const proto::FfiRequest &request) const {
117204}
118205
119206void FfiClient::PushEvent (const proto::FfiEvent &event) const {
120- std::vector<std::unique_ptr<PendingBase>> to_complete;
207+ std::unique_ptr<PendingBase> to_complete;
208+ std::vector<Listener> listeners_copy;
121209 {
122210 std::lock_guard<std::mutex> guard (lock_);
123- for (auto it = pending_.begin (); it != pending_.end ();) {
124- if ((*it)->matches (event)) {
125- to_complete.push_back (std::move (*it));
126- it = pending_.erase (it);
127- } else {
128- ++it;
211+
212+ // Complete pending future if this event is a callback with async_id
213+ if (auto async_id = ExtractAsyncId (event)) {
214+ auto it = pending_by_id_.find (*async_id);
215+ if (it != pending_by_id_.end () && it->second &&
216+ it->second ->matches (event)) {
217+ to_complete = std::move (it->second );
218+ pending_by_id_.erase (it);
129219 }
130220 }
131- }
132-
133- // Run handlers outside lock
134- for (auto &p : to_complete) {
135- p->complete (event);
136- }
137221
138- // Notify listeners. Note, we copy the listeners here to avoid calling into
139- // the listeners under the lock, which could potentially cause deadlock.
140- std::vector<Listener> listeners_copy;
141- {
142- std::lock_guard<std::mutex> guard (lock_);
222+ // Snapshot listeners
143223 listeners_copy.reserve (listeners_.size ());
144- for (auto &[_, listener] : listeners_) {
145- listeners_copy.push_back (listener );
224+ for (const auto &kv : listeners_) {
225+ listeners_copy.push_back (kv. second );
146226 }
147227 }
228+ // Run handler outside lock
229+ if (to_complete) {
230+ to_complete->complete (event);
231+ }
232+
233+ // Notify listeners outside lock
148234 for (auto &listener : listeners_copy) {
149235 listener (event);
150236 }
@@ -158,22 +244,19 @@ void LivekitFfiCallback(const uint8_t *buf, size_t len) {
158244}
159245
160246FfiClient::AsyncId FfiClient::generateAsyncId () {
161- return nextAsyncId_ .fetch_add (1 , std::memory_order_relaxed);
247+ return next_async_id_ .fetch_add (1 , std::memory_order_relaxed);
162248}
163249
164250bool FfiClient::cancelPendingByAsyncId (AsyncId async_id) {
165251 std::unique_ptr<PendingBase> to_cancel;
166252 {
167253 std::lock_guard<std::mutex> guard (lock_);
168- for (auto it = pending_.begin (); it != pending_.end (); ++it) {
169- if ((*it)->async_id == async_id) {
170- to_cancel = std::move (*it);
171- pending_.erase (it);
172- break ;
173- }
254+ auto it = pending_by_id_.find (async_id);
255+ if (it != pending_by_id_.end ()) {
256+ to_cancel = std::move (it->second );
257+ pending_by_id_.erase (it);
174258 }
175259 }
176-
177260 if (to_cancel) {
178261 to_cancel->cancel ();
179262 return true ;
@@ -192,7 +275,7 @@ std::future<T> FfiClient::registerAsync(
192275 pending->handler = std::move (handler);
193276 {
194277 std::lock_guard<std::mutex> guard (lock_);
195- pending_. push_back ( std::move (pending));
278+ pending_by_id_. emplace (async_id, std::move (pending));
196279 }
197280 return fut;
198281}
0 commit comments