|
25 | 25 | namespace livekit |
26 | 26 | { |
27 | 27 |
|
28 | | -void Room::Connect(const std::string& url, const std::string& token) |
29 | | -{ |
30 | | - std::lock_guard<std::mutex> guard(lock_); |
31 | | - if (connected_) { |
32 | | - throw std::runtime_error("already connected"); |
33 | | - } |
| 28 | +using proto::FfiRequest; |
| 29 | +using proto::FfiResponse; |
| 30 | +using proto::ConnectRequest; |
| 31 | +using proto::RoomOptions; |
| 32 | +using proto::ConnectCallback; |
| 33 | +using proto::FfiEvent; |
34 | 34 |
|
35 | | - connected_ = true; |
| 35 | +void Room::Connect(const std::string& url, const std::string& token) { |
| 36 | + // Register listener first (outside Room lock to avoid lock inversion) |
| 37 | + auto listenerId = FfiClient::getInstance().AddListener( |
| 38 | + std::bind(&Room::OnEvent, this, std::placeholders::_1)); |
36 | 39 |
|
37 | | - RoomOptions *options = new RoomOptions; |
38 | | - options->set_auto_subscribe(true); |
39 | | - |
40 | | - ConnectRequest *connectRequest = new ConnectRequest; |
41 | | - connectRequest->set_url(url); |
42 | | - connectRequest->set_token(token); |
43 | | - connectRequest->set_allocated_options(options); |
| 40 | + // Build request without heap allocs |
| 41 | + livekit::proto::FfiRequest req; |
| 42 | + auto* connect = req.mutable_connect(); |
| 43 | + connect->set_url(url); |
| 44 | + connect->set_token(token); |
| 45 | + connect->mutable_options()->set_auto_subscribe(true); |
44 | 46 |
|
45 | | - FFIRequest request; |
46 | | - request.set_allocated_connect(connectRequest); |
47 | | - |
48 | | - // TODO Free: |
49 | | - FfiClient::getInstance().AddListener(std::bind(&Room::OnEvent, this, std::placeholders::_1)); |
50 | | - |
51 | | - FFIResponse response = FfiClient::getInstance().SendRequest(request); |
52 | | - FFIAsyncId asyncId = response.connect().async_id(); |
| 47 | + // Mark “connecting” under lock, but DO NOT keep the lock across SendRequest |
| 48 | + { |
| 49 | + std::lock_guard<std::mutex> g(lock_); |
| 50 | + if (connected_) { |
| 51 | + FfiClient::getInstance().RemoveListener(listenerId); |
| 52 | + throw std::runtime_error("already connected"); |
| 53 | + } |
| 54 | + connectAsyncId_ = listenerId; |
| 55 | + } |
53 | 56 |
|
54 | | - connectAsyncId_ = asyncId.id(); |
| 57 | + // Call into FFI with no Room lock held (avoid re-entrancy deadlock) |
| 58 | + livekit::proto::FfiResponse resp = FfiClient::getInstance().SendRequest(req); |
| 59 | + // Store async id under lock |
| 60 | + { |
| 61 | + std::lock_guard<std::mutex> g(lock_); |
| 62 | + connectAsyncId_ = resp.connect().async_id(); |
| 63 | + } |
55 | 64 | } |
56 | 65 |
|
57 | | -void Room::OnEvent(const FFIEvent& event) |
58 | | -{ |
| 66 | +void Room::OnEvent(const FfiEvent& event) { |
| 67 | + // TODO, it is not a good idea to lock all the callbacks, improve it. |
59 | 68 | std::lock_guard<std::mutex> guard(lock_); |
60 | | - if (!connected_) { |
| 69 | + switch (event.message_case()) { |
| 70 | + case FfiEvent::kConnect: |
| 71 | + OnConnect(event.connect()); |
| 72 | + break; |
| 73 | + |
| 74 | + // TODO: Handle other FfiEvent types here (e.g. room_event, track_event, etc.) |
| 75 | + default: |
| 76 | + break; |
| 77 | + } |
| 78 | +} |
| 79 | + |
| 80 | +void Room::OnConnect(const ConnectCallback& cb) { |
| 81 | + // Match the async_id with the pending connectAsyncId_ |
| 82 | + if (cb.async_id() != connectAsyncId_) { |
61 | 83 | return; |
62 | 84 | } |
63 | | - |
64 | | - if (event.has_connect()) { |
65 | | - ConnectCallback connectCallback = event.connect(); |
66 | | - if (connectCallback.async_id().id() != connectAsyncId_) { |
67 | | - return; |
68 | | - } |
69 | 85 |
|
70 | | - std::cout << "Received ConnectCallback" << std::endl; |
| 86 | + std::cout << "Received ConnectCallback" << std::endl; |
71 | 87 |
|
72 | | - if (!connectCallback.has_error()) { |
73 | | - handle_ = FfiHandle(connectCallback.room().handle().id()); |
| 88 | + if (cb.message_case() == ConnectCallback::kError) { |
| 89 | + std::cerr << "Failed to connect to room: " << cb.error() << std::endl; |
| 90 | + connected_ = false; |
| 91 | + return; |
| 92 | + } |
74 | 93 |
|
75 | | - std::cout << "Connected to room" << std::endl; |
76 | | - std::cout << "Room SID: " << connectCallback.room().sid() << std::endl; |
77 | | - } else { |
78 | | - std::cerr << "Failed to connect to room: " << connectCallback.error() << std::endl; |
79 | | - } |
| 94 | + // Success path |
| 95 | + const auto& result = cb.result(); |
| 96 | + const auto& owned_room = result.room(); |
| 97 | + // OwnedRoom { FfiOwnedHandle handle = 1; RoomInfo info = 2; } |
| 98 | + handle_ = FfiHandle(static_cast<uintptr_t>(owned_room.handle().id())); |
| 99 | + if (owned_room.info().has_sid()) { |
| 100 | + std::cout << "Room SID: " << owned_room.info().sid() << std::endl; |
80 | 101 | } |
| 102 | + |
| 103 | + connected_ = true; |
| 104 | + std::cout << "Connected to room" << std::endl; |
81 | 105 | } |
82 | 106 |
|
83 | 107 | } |
0 commit comments