Skip to content

Commit bb509a7

Browse files
simple_status example
1 parent 347c9c3 commit bb509a7

4 files changed

Lines changed: 304 additions & 2 deletions

File tree

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,6 @@ brew install clang-format
451451
#### Memory Checks
452452
Run valgrind on various examples or tests to check for memory leaks and other issues.
453453
```bash
454-
valgrind --leak-check=full ./build-debug/bin/BridgeRobot
455-
valgrind --leak-check=full ./build-debug/bin/BridgeHuman
456454
valgrind --leak-check=full ./build-debug/bin/livekit_integration_tests
457455
valgrind --leak-check=full ./build-debug/bin/livekit_stress_tests
458456
```

examples/CMakeLists.txt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ set(EXAMPLES_ALL
4343
SimpleJoystickSender
4444
SimpleJoystickReceiver
4545
SimpleDataStream
46+
SimpleStatusProducer
47+
SimpleStatusConsumer
4648
LoggingLevelsBasicUsage
4749
LoggingLevelsCustomSinks
4850
BridgeRobot
@@ -242,6 +244,32 @@ add_custom_command(
242244
$<TARGET_FILE_DIR:SimpleDataStream>/data
243245
)
244246

247+
# --- simple_status (producer + consumer text stream on producer-status) ---
248+
249+
add_executable(SimpleStatusProducer
250+
simple_status/producer.cpp
251+
)
252+
253+
target_include_directories(SimpleStatusProducer PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS})
254+
255+
target_link_libraries(SimpleStatusProducer
256+
PRIVATE
257+
livekit
258+
spdlog::spdlog
259+
)
260+
261+
add_executable(SimpleStatusConsumer
262+
simple_status/consumer.cpp
263+
)
264+
265+
target_include_directories(SimpleStatusConsumer PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS})
266+
267+
target_link_libraries(SimpleStatusConsumer
268+
PRIVATE
269+
livekit
270+
spdlog::spdlog
271+
)
272+
245273
# --- bridge_human_robot examples (robot + human; use livekit_bridge and SDL3) ---
246274

247275
add_executable(BridgeRobot
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/// Consumer participant: prints each incoming message on the `producer-status`
18+
/// text stream topic. Use a token whose identity is `consumer`.
19+
20+
#include "livekit/livekit.h"
21+
#include "livekit/lk_log.h"
22+
23+
#include <atomic>
24+
#include <chrono>
25+
#include <csignal>
26+
#include <cstdlib>
27+
#include <memory>
28+
#include <string>
29+
#include <thread>
30+
31+
using namespace livekit;
32+
33+
namespace {
34+
35+
constexpr const char *kTopic = "producer-status";
36+
37+
std::atomic<bool> g_running{true};
38+
39+
void handleSignal(int) { g_running.store(false); }
40+
41+
std::string getenvOrEmpty(const char *name) {
42+
const char *v = std::getenv(name);
43+
return v ? std::string(v) : std::string{};
44+
}
45+
46+
void handleStatusMessage(std::shared_ptr<TextStreamReader> reader,
47+
const std::string &participant_identity) {
48+
try {
49+
const std::string text = reader->readAll();
50+
LK_LOG_INFO("[from {}] {}", participant_identity, text);
51+
} catch (const std::exception &e) {
52+
LK_LOG_ERROR("Error reading text stream from {}: {}", participant_identity,
53+
e.what());
54+
}
55+
}
56+
57+
} // namespace
58+
59+
int main(int argc, char *argv[]) {
60+
std::string url = getenvOrEmpty("LIVEKIT_URL");
61+
std::string token = getenvOrEmpty("LIVEKIT_TOKEN");
62+
63+
if (argc >= 3) {
64+
url = argv[1];
65+
token = argv[2];
66+
}
67+
68+
if (url.empty() || token.empty()) {
69+
LK_LOG_ERROR("LIVEKIT_URL and LIVEKIT_TOKEN (or <ws-url> <token>) are "
70+
"required");
71+
return 1;
72+
}
73+
74+
std::signal(SIGINT, handleSignal);
75+
#ifdef SIGTERM
76+
std::signal(SIGTERM, handleSignal);
77+
#endif
78+
79+
livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole);
80+
81+
auto room = std::make_unique<Room>();
82+
RoomOptions options;
83+
options.auto_subscribe = true;
84+
options.dynacast = false;
85+
86+
if (!room->Connect(url, token, options)) {
87+
LK_LOG_ERROR("Failed to connect to room");
88+
livekit::shutdown();
89+
return 1;
90+
}
91+
92+
LocalParticipant *lp = room->localParticipant();
93+
if (!lp) {
94+
LK_LOG_ERROR("No local participant after connect");
95+
room->setDelegate(nullptr);
96+
room.reset();
97+
livekit::shutdown();
98+
return 1;
99+
}
100+
101+
LK_LOG_INFO("consumer connected as identity='{}' room='{}'",
102+
lp->identity(), room->room_info().name);
103+
104+
room->registerTextStreamHandler(
105+
kTopic, [](std::shared_ptr<TextStreamReader> reader,
106+
const std::string &participant_identity) {
107+
std::thread t(handleStatusMessage, std::move(reader),
108+
participant_identity);
109+
t.detach();
110+
});
111+
112+
LK_LOG_INFO("listening on topic '{}'; Ctrl-C to exit", kTopic);
113+
114+
while (g_running.load()) {
115+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
116+
}
117+
118+
LK_LOG_INFO("shutting down");
119+
room->unregisterTextStreamHandler(kTopic);
120+
room->setDelegate(nullptr);
121+
room.reset();
122+
livekit::shutdown();
123+
return 0;
124+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/// Producer participant: publishes periodic status on the `producer-status`
18+
/// text stream topic (4 Hz). Use a token whose identity is `producer`.
19+
20+
#include "livekit/livekit.h"
21+
#include "livekit/lk_log.h"
22+
23+
#include <atomic>
24+
#include <chrono>
25+
#include <csignal>
26+
#include <cstdlib>
27+
#include <iomanip>
28+
#include <map>
29+
#include <random>
30+
#include <sstream>
31+
#include <string>
32+
#include <thread>
33+
#include <vector>
34+
35+
using namespace livekit;
36+
37+
namespace {
38+
39+
constexpr const char *kTopic = "producer-status";
40+
41+
std::atomic<bool> g_running{true};
42+
43+
void handleSignal(int) { g_running.store(false); }
44+
45+
std::string getenvOrEmpty(const char *name) {
46+
const char *v = std::getenv(name);
47+
return v ? std::string(v) : std::string{};
48+
}
49+
50+
std::string randomHexId(std::size_t nbytes = 16) {
51+
static thread_local std::mt19937_64 rng{std::random_device{}()};
52+
std::ostringstream oss;
53+
for (std::size_t i = 0; i < nbytes; ++i) {
54+
std::uint8_t b = static_cast<std::uint8_t>(rng() & 0xFF);
55+
const char *hex = "0123456789abcdef";
56+
oss << hex[(b >> 4) & 0xF] << hex[b & 0xF];
57+
}
58+
return oss.str();
59+
}
60+
61+
} // namespace
62+
63+
int main(int argc, char *argv[]) {
64+
std::string url = getenvOrEmpty("LIVEKIT_URL");
65+
std::string token = getenvOrEmpty("LIVEKIT_TOKEN");
66+
67+
if (argc >= 3) {
68+
url = argv[1];
69+
token = argv[2];
70+
}
71+
72+
if (url.empty() || token.empty()) {
73+
LK_LOG_ERROR("LIVEKIT_URL and LIVEKIT_TOKEN (or <ws-url> <token>) are "
74+
"required");
75+
return 1;
76+
}
77+
78+
std::signal(SIGINT, handleSignal);
79+
#ifdef SIGTERM
80+
std::signal(SIGTERM, handleSignal);
81+
#endif
82+
83+
livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole);
84+
85+
auto room = std::make_unique<Room>();
86+
RoomOptions options;
87+
options.auto_subscribe = true;
88+
options.dynacast = false;
89+
90+
if (!room->Connect(url, token, options)) {
91+
LK_LOG_ERROR("Failed to connect to room");
92+
livekit::shutdown();
93+
return 1;
94+
}
95+
96+
LocalParticipant *lp = room->localParticipant();
97+
if (!lp) {
98+
LK_LOG_ERROR("No local participant after connect");
99+
room->setDelegate(nullptr);
100+
room.reset();
101+
livekit::shutdown();
102+
return 1;
103+
}
104+
105+
LK_LOG_INFO("producer connected as identity='{}' room='{}'",
106+
lp->identity(), room->room_info().name);
107+
108+
const std::string sender_id =
109+
!lp->identity().empty() ? lp->identity() : std::string("producer");
110+
111+
using clock = std::chrono::steady_clock;
112+
const auto start = clock::now();
113+
const auto period = std::chrono::milliseconds(250);
114+
auto next_deadline = clock::now();
115+
std::uint64_t count = 0;
116+
117+
while (g_running.load()) {
118+
const auto now = clock::now();
119+
const double elapsed_sec =
120+
std::chrono::duration<double>(now - start).count();
121+
122+
std::ostringstream body;
123+
body << std::fixed << std::setprecision(2) << elapsed_sec;
124+
const std::string text = std::string("[time-since-start]: ") + body.str() +
125+
" count: " + std::to_string(count);
126+
127+
try {
128+
const std::string stream_id = randomHexId();
129+
std::map<std::string, std::string> attrs;
130+
const std::vector<std::string> dest;
131+
const std::string reply_to_id;
132+
TextStreamWriter writer(*lp, kTopic, attrs, stream_id, text.size(),
133+
reply_to_id, dest, sender_id);
134+
writer.write(text);
135+
writer.close();
136+
} catch (const std::exception &e) {
137+
LK_LOG_ERROR("Failed to send status: {}", e.what());
138+
}
139+
140+
LK_LOG_DEBUG("sent: {}", text);
141+
++count;
142+
143+
next_deadline += period;
144+
std::this_thread::sleep_until(next_deadline);
145+
}
146+
147+
LK_LOG_INFO("shutting down");
148+
room->setDelegate(nullptr);
149+
room.reset();
150+
livekit::shutdown();
151+
return 0;
152+
}

0 commit comments

Comments
 (0)