From 5804d82d1b9578bf28385f2ef7c14432d51e6ded Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=9Ean=20G=C3=BCne=C5=9F?= <180301198+sgunes-wirepas@users.noreply.github.com> Date: Wed, 29 Apr 2026 17:23:04 +0300 Subject: [PATCH] Send dbus signals from the main thread Callbacks set via WPC_register_for_data and WPC_register_for_stack_status are called from c-mesh-api's dispatch thread. Earlier, these callbacks were sending the dbus signals as well. This presented potential problems because sd_bus is not thread safe, and in this case the main thread and the dispatch thread could access the same dbus connection (m_bus). The callbacks now only pass the data to a ring buffer (event_queue.h) which the main thread reads from. The event queue uses an eventfd to signal when a new event was added to it. A main loop is now added as described in sd_bus_get_fd(3) manual. The loop polls the dbus and event queue file descriptors, and sends the dbus signals if necessary. Additionally, WPC_close() is moved to be before other close functions in order to ensure dispatch thread exits before we close the event queue. --- sink_service/CMakeLists.txt | 3 +- sink_service/source/config.c | 15 ++ sink_service/source/config.h | 11 + sink_service/source/data.c | 98 ++++++--- sink_service/source/data.h | 9 + sink_service/source/event_queue.c | 323 ++++++++++++++++++++++++++++++ sink_service/source/event_queue.h | 92 +++++++++ sink_service/source/main.c | 163 ++++++++++++--- 8 files changed, 655 insertions(+), 59 deletions(-) create mode 100644 sink_service/source/event_queue.c create mode 100644 sink_service/source/event_queue.h diff --git a/sink_service/CMakeLists.txt b/sink_service/CMakeLists.txt index 8148dbe3..dbb986a3 100644 --- a/sink_service/CMakeLists.txt +++ b/sink_service/CMakeLists.txt @@ -30,6 +30,7 @@ add_executable(${CMAKE_PROJECT_NAME} source/main.c source/config.c source/data.c + source/event_queue.c source/otap.c ) @@ -38,5 +39,5 @@ target_compile_definitions(${CMAKE_PROJECT_NAME} PRIVATE C_MESH_API_COMMIT_HASH="${C_MESH_API_COMMIT_HASH}" ) -target_link_libraries(${CMAKE_PROJECT_NAME} wpc PkgConfig::systemd) +target_link_libraries(${CMAKE_PROJECT_NAME} wpc PkgConfig::systemd pthread) diff --git a/sink_service/source/config.c b/sink_service/source/config.c index fc4560a5..d17eab42 100644 --- a/sink_service/source/config.c +++ b/sink_service/source/config.c @@ -11,6 +11,7 @@ #include "config.h" #include "config_macros.h" +#include "event_queue.h" #include "wpc.h" #define LOG_MODULE_NAME "Config" @@ -1043,6 +1044,19 @@ static bool initialize_unmodifiable_variables() } static void on_stack_boot_status(uint8_t status) +{ + const event_t event = { + .type = EVENT_TYPE_STACK_STATUS, + .event.stack_status.status = status + }; + + if (!EventQueue_Push(&event)) + { + LOGE("Failed to enqueue stack status event\n"); + } +} + +void Config_HandleStackStatusChange(const uint8_t status) { /* After a reboot, read again the variable as it can be because * of an otap and variables may change @@ -1108,3 +1122,4 @@ void Config_Close() sd_bus_slot_unref(m_slot); } } + diff --git a/sink_service/source/config.h b/sink_service/source/config.h index 45129723..860ecdfb 100644 --- a/sink_service/source/config.h +++ b/sink_service/source/config.h @@ -8,6 +8,7 @@ #define SINK_MANAGER_SOURCE_CONFIG_H_ #include +#include /** * \brief Initialize the config module @@ -22,4 +23,14 @@ int Config_Init(sd_bus * bus, char * object, char * interface); void Config_Close(); +/** + * \brief Handle a stack status change + * Re-reads some node attributes and sends StackStarted or + * StackStopped dbus signal. + * \param status + * 0 = stack started, otherwise stack stopped + */ +void Config_HandleStackStatusChange(const uint8_t status); + #endif /* SINK_MANAGER_SOURCE_CONFIG_H_ */ + diff --git a/sink_service/source/data.c b/sink_service/source/data.c index e94233ee..e217c493 100644 --- a/sink_service/source/data.c +++ b/sink_service/source/data.c @@ -7,11 +7,12 @@ #include #include #include -#include +#include #include #include "data.h" #include "wpc.h" +#include "event_queue.h" #define LOG_MODULE_NAME "Data" #define MAX_LOG_LEVEL INFO_LOG_LEVEL @@ -157,9 +158,6 @@ static bool onDataReceived(const uint8_t * bytes, uint8_t hop_count, unsigned long long timestamp_ms) { - __attribute__((cleanup(sd_bus_message_unrefp))) sd_bus_message *m = NULL; - int r; - LOGD("%llu -> Data received on EP %d of len %d from 0x%x to 0x%x\n", timestamp_ms, dst_ep, @@ -167,44 +165,35 @@ static bool onDataReceived(const uint8_t * bytes, src_addr, dst_addr); - /* Create a new signal to be generated on Dbus */ - r = sd_bus_message_new_signal(m_bus, &m, m_object, m_interface, "MessageReceived"); - if (r < 0) + if (num_bytes > EVENT_DATA_MAX_PAYLOAD) { - LOGE("Cannot create signal error=%s\n", strerror(-r)); + LOGE("Payload too large: %zu > %d\n", num_bytes, EVENT_DATA_MAX_PAYLOAD); return false; } - /* Load all parameters */ - // clang-format off - r = sd_bus_message_append(m, - "tuuyyuyy", - timestamp_ms, - src_addr, - dst_addr, - src_ep, - dst_ep, - travel_time, - qos, - hop_count); - // clang-format on - if (r < 0) + event_t event = { + .type = EVENT_TYPE_DATA_RECEIVED, + .event.data_received = { + .timestamp_ms = timestamp_ms, + .num_bytes = num_bytes, + .src_addr = src_addr, + .dst_addr = dst_addr, + .travel_time = travel_time, + .qos = (uint8_t) qos, + .src_ep = src_ep, + .dst_ep = dst_ep, + .hop_count = hop_count, + }, + }; + memcpy(event.event.data_received.payload, bytes, num_bytes); + + if (!EventQueue_Push(&event)) { - LOGE("Cannot append info error=%s\n", strerror(-r)); + LOGE("Failed to enqueue data received event\n"); return false; } - r = sd_bus_message_append_array(m, 'y', bytes, num_bytes); - if (r < 0) - { - LOGE("Cannot append array error=%s\n", strerror(-r)); - return false; - } - - /* Send the signal on bus */ - sd_bus_send(m_bus, m, NULL); - return true; } @@ -279,3 +268,46 @@ void Data_Close() sd_bus_slot_unref(m_slot); } } + +void Data_SendDataReceivedSignal(const event_data_received_t *const data) +{ + __attribute__((cleanup(sd_bus_message_unrefp))) sd_bus_message *msg = NULL; + + int r = sd_bus_message_new_signal(m_bus, &msg, m_object, m_interface, "MessageReceived"); + if (r < 0) + { + LOGE("Cannot create dbus signal:%s\n", strerror(-r)); + return; + } + + + r = sd_bus_message_append(msg, + "tuuyyuyy", + data->timestamp_ms, + data->src_addr, + data->dst_addr, + data->src_ep, + data->dst_ep, + data->travel_time, + data->qos, + data->hop_count); + if (r < 0) + { + LOGE("Cannot append to signal: %s\n", strerror(-r)); + return; + } + + r = sd_bus_message_append_array(msg, 'y', data->payload, data->num_bytes); + if (r < 0) + { + LOGE("Cannot append array to signal: %s\n", strerror(-r)); + return; + } + + r = sd_bus_send(m_bus, msg, NULL); + if (r < 0) + { + LOGE("Cannot send signal: %s\n", strerror(-r)); + } +} + diff --git a/sink_service/source/data.h b/sink_service/source/data.h index 73bc4ffd..9a1158a7 100644 --- a/sink_service/source/data.h +++ b/sink_service/source/data.h @@ -8,6 +8,7 @@ #define SINK_MANAGER_SOURCE_DATA_H_ #include +#include "event_queue.h" /** * \brief Initialize the data module @@ -24,4 +25,12 @@ int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_lim void Data_Close(); +/** + * \brief Send a MessageReceived dbus signal + * \param data + * The received data event from the event queue + */ +void Data_SendDataReceivedSignal(const event_data_received_t *const data); + #endif /* SINK_MANAGER_SOURCE_DATA_H_ */ + diff --git a/sink_service/source/event_queue.c b/sink_service/source/event_queue.c new file mode 100644 index 00000000..383305d8 --- /dev/null +++ b/sink_service/source/event_queue.c @@ -0,0 +1,323 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "event_queue.h" + +#define LOG_MODULE_NAME "EventQueue" +#define MAX_LOG_LEVEL INFO_LOG_LEVEL +#include "logger.h" + +// Number selected to be larger than the indication queue size in c-mesh-api +#define EVENT_QUEUE_CAPACITY 64 + +// Number of times to retry when the queue is full +static const int PUSH_MAX_RETRIES = 3; + +// How long to wait for space on each retry (nanoseconds) +static const unsigned int PUSH_RETRY_WAIT_NS = 10000000; + +static event_t m_buffer[EVENT_QUEUE_CAPACITY]; +static size_t m_write_idx; +static size_t m_read_idx; + +static pthread_mutex_t m_lock; +static pthread_cond_t m_not_full_cond; + +static int m_event_fd = -1; + +static size_t get_next_write_idx_locked() +{ + return (m_write_idx + 1) % EVENT_QUEUE_CAPACITY; +} + +static bool is_buffer_full_locked() +{ + return m_read_idx == get_next_write_idx_locked(); +} + +/** + * \brief Wait until queue is not full + * \return true on successful wait (including timeout), + * false on error. + */ +static bool wait_for_not_full_cond_locked() +{ + struct timespec ts; + if (0 != clock_gettime(CLOCK_MONOTONIC, &ts)) + { + LOGE("Could not get current time when waiting: %s\n", strerror(errno)); + return false; + } + + static const unsigned long NS_PER_SEC = 1000000000L; + _Static_assert(ULONG_MAX >= (NS_PER_SEC + PUSH_RETRY_WAIT_NS), ""); + + unsigned long nsec = ts.tv_nsec + PUSH_RETRY_WAIT_NS; + ts.tv_sec += nsec / NS_PER_SEC; + ts.tv_nsec = nsec % NS_PER_SEC; + + const int ret = pthread_cond_timedwait(&m_not_full_cond, &m_lock, &ts); + if (ret != 0 && ret != ETIMEDOUT) + { + LOGE("Error when waiting for not full condition: %s\n", strerror(ret)); + return false; + } + return true; +} + +static void add_to_buffer_locked(const event_t *const event) +{ + m_buffer[m_write_idx] = *event; + m_write_idx = get_next_write_idx_locked(); +} + +static void wake_eventfd() +{ + if (0 != eventfd_write(m_event_fd, 1)) + { + LOGE("Failed to write to eventfd: %s\n", strerror(errno)); + } +} + +static bool init_condition_variable() +{ + bool success = false; + int ret; + pthread_condattr_t attr; + + ret = pthread_condattr_init(&attr); + if (ret != 0) + { + LOGE("Failed to init condattr: %s\n", strerror(ret)); + return false; + } + + ret = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + if (ret != 0) + { + LOGE("Could not set clock type for condattr: %s\n", strerror(ret)); + success = false; + goto destroy_condattr_and_return; + } + + ret = pthread_cond_init(&m_not_full_cond, &attr); + if (ret != 0) + { + LOGE("pthread_cond_init failed: %s\n", strerror(ret)); + success = false; + goto destroy_condattr_and_return; + } + + success = true; + +destroy_condattr_and_return: + ret = pthread_condattr_destroy(&attr); + if (ret != 0) + { + LOGE("Could not destroy condattr: %s\n", strerror(ret)); + } + + return success; +} + +static bool init_mutex() +{ + bool success = false; + int ret; + pthread_mutexattr_t attr; + + ret = pthread_mutexattr_init(&attr); + if (ret != 0) + { + LOGE("Failed to init mutexattr: %s\n", strerror(ret)); + return false; + } + + ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); + if (ret != 0) + { + LOGE("Failed to set mutexattr: %s\n", strerror(ret)); + success = false; + goto destroy_mutexattr_and_return; + } + + ret = pthread_mutex_init(&m_lock, &attr); + if (ret != 0) + { + LOGE("pthread_mutex_init failed: %s\n", strerror(ret)); + success = false; + goto destroy_mutexattr_and_return; + } + + success = true; + +destroy_mutexattr_and_return: + ret = pthread_mutexattr_destroy(&attr); + if (ret != 0) + { + LOGE("Could not destroy mutexattr: %s\n", strerror(ret)); + } + + return success; +} + +bool EventQueue_Init(void) +{ + if (!init_mutex()) + { + LOGE("Failed to init mutex\n"); + return false; + } + + if (!init_condition_variable()) + { + LOGE("Failed to init condition variable\n"); + goto error1; + } + + m_event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (m_event_fd < 0) + { + LOGE("Failed to create eventfd: %s\n", strerror(errno)); + goto error2; + } + + m_write_idx = 0; + m_read_idx = 0; + + return true; + +error2: + pthread_cond_destroy(&m_not_full_cond); +error1: + pthread_mutex_destroy(&m_lock); + return false; +} + +void EventQueue_Close(void) +{ + if (m_read_idx != m_write_idx) + { + LOGW("Queue is not empty when closing\n"); + } + + if (m_event_fd >= 0) + { + if (0 != close(m_event_fd)) + { + LOGE("Failed to close eventfd: %s\n", strerror(errno)); + } + m_event_fd = -1; + } + + int ret = pthread_cond_destroy(&m_not_full_cond); + if (ret != 0) + { + LOGE("Failed to destroy condvar: %s\n", strerror(ret)); + } + ret = pthread_mutex_destroy(&m_lock); + if (ret != 0) + { + LOGE("Failed to destroy mutex: %s\n", strerror(ret)); + } +} + +int EventQueue_get_fd(void) +{ + return m_event_fd; +} + +bool EventQueue_Push(const event_t *const event) +{ + if (!event) + { + LOGE("Cannot add null event to the queue\n"); + return false; + } + + int ret = pthread_mutex_lock(&m_lock); + if (ret != 0) + { + LOGE("Could not lock mutex when pushing: %s\n", strerror(ret)); + return false; + } + + for (int attempt = 0; attempt < PUSH_MAX_RETRIES; attempt++) + { + if (is_buffer_full_locked()) + { + if (!wait_for_not_full_cond_locked()) + { + // Error when waiting; unlock mutex and return. + break; + } + } + else + { + add_to_buffer_locked(event); + ret = pthread_mutex_unlock(&m_lock); + if (ret != 0) + { + LOGE("Could not unlock mutex after adding to buffer: %s\n", strerror(ret)); + } + LOGD("Added event with type: %d\n", event->type); + + // Return true even if writing to eventfd fails because event was + // added to the queue. + wake_eventfd(); + return true; + } + } + + ret = pthread_mutex_unlock(&m_lock); + if (ret != 0) + { + LOGE("Could not unlock mutex after pushing failed: %s\n", strerror(ret)); + } + LOGE("Event queue full after %d retries, dropping event (type=%d)\n", + PUSH_MAX_RETRIES, event->type); + return false; +} + +bool EventQueue_Pop(event_t *const event) +{ + if (!event) + { + return false; + } + + int ret = pthread_mutex_lock(&m_lock); + if (ret != 0) + { + LOGE("Could not lock mutex when popping: %s\n", strerror(ret)); + return false; + } + + const bool is_buffer_empty = (m_read_idx == m_write_idx); + if (!is_buffer_empty) + { + *event = m_buffer[m_read_idx]; + m_read_idx = (m_read_idx + 1) % EVENT_QUEUE_CAPACITY; + LOGD("Popped event with type: %d\n", event->type); + + ret = pthread_cond_signal(&m_not_full_cond); + if (ret != 0) + { + LOGE("Could not send condition signal after popping: %s\n", strerror(ret)); + } + } + + ret = pthread_mutex_unlock(&m_lock); + if (ret != 0) + { + LOGE("Could not unlock mutex after popping: %s\n", strerror(ret)); + } + + return !is_buffer_empty; +} + diff --git a/sink_service/source/event_queue.h b/sink_service/source/event_queue.h new file mode 100644 index 00000000..3e36a9eb --- /dev/null +++ b/sink_service/source/event_queue.h @@ -0,0 +1,92 @@ +#ifndef EVENT_QUEUE_H_ +#define EVENT_QUEUE_H_ + +#include +#include +#include +#include + +// Max payload size for a data packet (MAX_FULL_PACKET_SIZE from c-mesh-api) +#define EVENT_DATA_MAX_PAYLOAD 1500 + +typedef enum +{ + EVENT_TYPE_DATA_RECEIVED, + EVENT_TYPE_STACK_STATUS, +} event_type_e; + +/* + * Note: Data types here match the ones used by sd-bus for the MessageReceived + * signal. num_bytes is uint16_t to save space. + */ +_Static_assert(UINT16_MAX >= EVENT_DATA_MAX_PAYLOAD, ""); +typedef struct +{ + uint64_t timestamp_ms; + uint32_t src_addr; + uint32_t dst_addr; + uint32_t travel_time; + uint16_t num_bytes; + uint8_t qos; + uint8_t src_ep; + uint8_t dst_ep; + uint8_t hop_count; + uint8_t payload[EVENT_DATA_MAX_PAYLOAD]; +} event_data_received_t; + +typedef struct +{ + uint8_t status; +} event_stack_status_t; + +typedef struct +{ + event_type_e type; + union + { + event_data_received_t data_received; + event_stack_status_t stack_status; + } event; +} event_t; + +/** + * \brief Initialize the event queue + * \return true on success, false on error + */ +bool EventQueue_Init(void); + +/** + * \brief Close and clean up the event queue + */ +void EventQueue_Close(void); + +/** + * \brief Get the eventfd which is written when events queued. + * + * The main loop should poll this file descriptor and drain the queue + * when readable. A dummy value of 1 is written to it. + * \return The eventfd file descriptor, or -1 if not initialized. + */ +int EventQueue_get_fd(void); + +/** + * \brief Add an event to the queue + * + * If the queue is full, retries a few times with short waits. + * \param event + * The event to enqueue + * \return true if the event was enqueued, false if dropped. + */ +bool EventQueue_Push(const event_t *const event); + +/** + * \brief Pop an event from the queue. + * \param[out] event + * The dequeued event + * \return true if an event was dequeued, + * false otherwise (error or queue was empty) + */ +bool EventQueue_Pop(event_t *event); + +#endif // EVENT_QUEUE_H_ + diff --git a/sink_service/source/main.c b/sink_service/source/main.c index d3222efa..e4031423 100644 --- a/sink_service/source/main.c +++ b/sink_service/source/main.c @@ -3,6 +3,9 @@ * See file LICENSE for full license details. * */ + +#define _GNU_SOURCE // For ppoll() + #include #include #include @@ -11,6 +14,7 @@ #include #include #include +#include #include @@ -18,6 +22,7 @@ #include "config.h" #include "data.h" #include "otap.h" +#include "event_queue.h" #define LOG_MODULE_NAME "Main" #define MAX_LOG_LEVEL INFO_LOG_LEVEL @@ -300,6 +305,129 @@ static bool setup_signal_handlers_for_stopping() return true; } +static void process_pending_events(const int event_fd) +{ + uint64_t val; + int r = read(event_fd, &val, sizeof(val)); + if (r < 0) + { + LOGE("Could not read from event_fd: %s\n", strerror(errno)); + } + + event_t event; + while (EventQueue_Pop(&event)) + { + switch (event.type) + { + case EVENT_TYPE_DATA_RECEIVED: + Data_SendDataReceivedSignal(&event.event.data_received); + break; + case EVENT_TYPE_STACK_STATUS: + Config_HandleStackStatusChange(event.event.stack_status.status); + break; + default: + LOGE("Unknown event type: %d\n", event.type); + break; + } + } +} + +static int ppoll_with_sd_bus_timeout(struct pollfd *const fds, const nfds_t nfds) +{ + uint64_t deadline_usec; + int r = sd_bus_get_timeout(m_bus, &deadline_usec); + if (r < 0) + { + LOGE("Failed to get dbus timeout: %s\n", strerror(-r)); + errno = -r; + return -1; + } + + if (deadline_usec == UINT64_MAX) + { + return ppoll(fds, nfds, NULL, NULL); + } + + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) + { + LOGE("Could not get current time: %s\n", strerror(errno)); + return -1; + } + + static const uint64_t US_PER_SEC = 1000000; + static const uint64_t NS_PER_US = 1000; + + const uint64_t now_usec = ((uint64_t) now.tv_sec * US_PER_SEC) + (now.tv_nsec / NS_PER_US); + const uint64_t delta_usec = (deadline_usec > now_usec) ? (deadline_usec - now_usec) : 0; + + struct timespec timeout = { + .tv_sec = delta_usec / US_PER_SEC, + .tv_nsec = (delta_usec % US_PER_SEC) * NS_PER_US, + }; + + return ppoll(fds, nfds, &timeout, NULL); +} + +static int do_main_loop() +{ + int r = 0; + int event_fd = EventQueue_get_fd(); + if (event_fd < 0) + { + LOGE("Event queue fd not available\n"); + return -1; + } + + while (!m_stop_requested) + { + int dbus_fd = sd_bus_get_fd(m_bus); + if (dbus_fd < 0) + { + LOGE("Failed to get dbus fd: %s\n", strerror(-dbus_fd)); + return dbus_fd; + } + + int dbus_events = sd_bus_get_events(m_bus); + if (dbus_events < 0) + { + LOGE("Failed to get dbus events: %s\n", strerror(-dbus_events)); + return dbus_events; + } + + struct pollfd fds[2] = { + { .fd = dbus_fd, .events = dbus_events }, + { .fd = event_fd, .events = POLLIN }, + }; + + r = ppoll_with_sd_bus_timeout(fds, (sizeof(fds) / sizeof(fds[0]))); + + if (r < 0) + { + if (errno == EINTR) + { + continue; + } + LOGE("poll failed: %s\n", strerror(errno)); + return -errno; + } + + r = sd_bus_process(m_bus, NULL); + if (r < 0) + { + LOGE("Failed to process dbus: %s\n", strerror(-r)); + return r; + } + + if (fds[1].revents & POLLIN) + { + process_pending_events(event_fd); + } + } + + return 0; +} + static void print_version_to_stdout() { printf("Sink service version: %s\n", SINK_SERVICE_VERSION); @@ -462,6 +590,12 @@ int main(int argc, char * argv[]) return EXIT_FAILURE; } + if (!EventQueue_Init()) + { + LOGE("Cannot initialize event queue\n"); + return EXIT_FAILURE; + } + /* Connect to the user bus */ r = sd_bus_open_system(&m_bus); if (r < 0) @@ -499,38 +633,17 @@ int main(int argc, char * argv[]) goto finish; } - while (!m_stop_requested) - { - /* Process requests */ - r = sd_bus_process(m_bus, NULL); - if (r < 0) - { - LOGE("Failed to process bus: %s\n", strerror(-r)); - goto finish; - } - - /* we processed a request, try to process another one, right-away */ - if (r > 0) - continue; - - /* Wait for the next request to process */ - /* sd_bus_wait uses ppoll() internally, and also returns if a signal is */ - /* caught. */ - r = sd_bus_wait(m_bus, (uint64_t) -1); - if (r < 0) - { - LOGE("Failed to wait on bus: %s\n", strerror(-r)); - goto finish; - } - } + r = do_main_loop(); finish: LOGI("Exiting\n"); + WPC_close(); Otap_Close(); Data_Close(); Config_Close(); + EventQueue_Close(); sd_bus_unref(m_bus); - WPC_close(); return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; } +