Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 132 additions & 7 deletions messaged/messaged.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,53 @@
#include <csignal>
#include <atomic>
#include <thread>
#include <unordered_map>
#include <string>
#include <utility>
#include <chrono>
#include <cstdint>

constexpr char DAEMON_NAME[] = "messaged";

int main( int argc, char** argv );
void signal_handler( int signo );
void runbroker();
void dothread_runproxy( void* pxsub, void* pxpub );
void dothread_runlvc( void* frontend, void* backend );

std::atomic<bool> is_broker_running(false);

/***** last-value-cache helpers ***********************************************/
namespace {
// last published message per topic: topic -> { topic_frame, payload_frame }.
// Touched only by the single LVC thread, so no locking is required.
std::unordered_map<std::string, std::pair<std::string,std::string>> g_lvc;

// receive one frame from sock as a std::string (empty on error)
std::string recv_frame( void* sock ) {
zmq_msg_t msg;
zmq_msg_init( &msg );
int n = zmq_msg_recv( &msg, sock, 0 );
std::string out;
if ( n >= 0 ) out.assign( static_cast<char*>( zmq_msg_data(&msg) ), zmq_msg_size(&msg) );
zmq_msg_close( &msg );
return out;
}

// true if another frame of the current multipart message is pending
bool has_more( void* sock ) {
int more = 0;
size_t sz = sizeof(more);
zmq_getsockopt( sock, ZMQ_RCVMORE, &more, &sz );
return more != 0;
}

// send one frame to sock
void send_frame( void* sock, const std::string &s, bool more ) {
zmq_send( sock, s.data(), s.size(), more ? ZMQ_SNDMORE : 0 );
}
}
/***** last-value-cache helpers ***********************************************/

/***** main *******************************************************************/
/**
* @brief the main function
Expand Down Expand Up @@ -97,15 +134,23 @@ void runbroker() {
zmq_setsockopt( xpub_socket, ZMQ_RCVHWM, &zero, sizeof(zero) );
zmq_setsockopt( xpub_socket, ZMQ_LINGER, &zero, sizeof(zero) );

// XPUB_VERBOSE: deliver EVERY subscribe message to the broker (not just the
// first per topic). Required so the LVC can replay the last value to each new
// subscriber, including a restarted daemon re-subscribing to a topic that
// another daemon is already subscribed to.
//
int one = 1;
zmq_setsockopt( xpub_socket, ZMQ_XPUB_VERBOSE, &one, sizeof(one) );

// bind the sockets
//
zmq_bind(xsub_socket, "tcp://127.0.0.1:5555");
zmq_bind(xpub_socket, "tcp://127.0.0.1:5556");

// start the proxy in a separate thread because zmq_proxy blocks
// start the last-value-cache broker in a separate thread (it blocks)
//
logwrite( "messaged::runbroker", "starting message broker" );
std::thread proxy_thread(dothread_runproxy, xsub_socket, xpub_socket);
logwrite( "messaged::runbroker", "starting last-value-cache message broker" );
std::thread proxy_thread(dothread_runlvc, xsub_socket, xpub_socket);

{
BoolState broker_running( is_broker_running );
Expand All @@ -127,8 +172,88 @@ void runbroker() {
zmq_ctx_destroy(context);
}

void dothread_runproxy( void* pxsub, void* pxpub ) {
if (zmq_proxy(pxsub, pxpub, nullptr) == -1) {
logwrite( "messaged::dothread_runproxy", "ERROR proxy failed: "+std::string(zmq_strerror(zmq_errno())) );
/***** dothread_runlvc ********************************************************/
/**
* @brief last-value-cache broker loop (replaces zmq_proxy)
* @details Forwards publisher traffic (frontend XSUB) to subscribers
* (backend XPUB) exactly like a plain proxy, but additionally:
* (1) caches the last message seen on each topic, and
* (2) replays that cached message to any newly subscribing
* socket, so a restarted daemon immediately receives the
* current state of every provider without a forced snapshot.
* A subscribe-all is re-asserted on the frontend on a fixed tick so
* every (re)connected publisher keeps forwarding to the broker,
* without relying on XSUB replay-on-attach semantics.
* @param[in] frontend XSUB socket (publishers connect here, port 5555)
* @param[in] backend XPUB socket (subscribers connect here, port 5556)
*
*/
void dothread_runlvc( void* frontend, void* backend ) {
zmq_pollitem_t items[2] = {
{ frontend, 0, ZMQ_POLLIN, 0 }, // publisher traffic
{ backend, 0, ZMQ_POLLIN, 0 }, // subscription traffic
};

const uint8_t subscribe_all = 0x01; // 0x01 = subscribe, empty prefix = all topics
const auto resub_period = std::chrono::seconds(2);
auto last_resub = std::chrono::steady_clock::now() - resub_period; // force immediate first send

logwrite( "messaged::dothread_runlvc", "last-value-cache broker running" );

while ( is_broker_running.load() ) {
if ( zmq_poll( items, 2, 200 ) < 0 ) { // 200ms tick so we can observe the stop flag
if ( zmq_errno() == EINTR ) continue;
logwrite( "messaged::dothread_runlvc", "ERROR poll failed: "+std::string(zmq_strerror(zmq_errno())) );
break;
}

// ---- publisher -> cache last value per topic, then fan out to subscribers
//
if ( items[0].revents & ZMQ_POLLIN ) {
std::string topic = recv_frame( frontend );
std::string payload = has_more( frontend ) ? recv_frame( frontend ) : std::string();
while ( has_more( frontend ) ) recv_frame( frontend ); // drain any unexpected extra frames

// Cache normal telemetry topics; skip control topics (e.g. "_snapshot")
// so a new subscriber is never handed a replayed stale request.
//
if ( !topic.empty() && topic.front() != '_' ) {
g_lvc[topic] = { topic, payload };
}

send_frame( backend, topic, true );
send_frame( backend, payload, false );
}

// ---- new subscription -> replay that topic's last value to the subscriber
//
if ( items[1].revents & ZMQ_POLLIN ) {
std::string sub = recv_frame( backend ); // [0x01|0x00][topic...]
while ( has_more( backend ) ) recv_frame( backend ); // subscriptions are single-frame; drain defensively

if ( !sub.empty() && static_cast<uint8_t>( sub.front() ) == 0x01 ) { // 0x01 == subscribe
auto it = g_lvc.find( sub.substr(1) );
if ( it != g_lvc.end() ) {
logwrite( "messaged::dothread_runlvc",
"[DEBUG] replaying cached topic "+it->second.first
+" ("+std::to_string( it->second.second.size() )+" bytes) to new subscriber" );
send_frame( backend, it->second.first, true );
send_frame( backend, it->second.second, false );
}
}
// unsubscribe (0x00) is ignored: the cache persists so future subscribers
// still receive the last value.
}

// ---- keep every (re)connected publisher forwarding to the broker
//
auto now = std::chrono::steady_clock::now();
if ( now - last_resub >= resub_period ) {
zmq_send( frontend, &subscribe_all, 1, 0 );
last_resub = now;
}
}

logwrite( "messaged::dothread_runlvc", "last-value-cache broker stopped" );
}
/***** dothread_runlvc ********************************************************/
Loading