Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sink_service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_executable(${CMAKE_PROJECT_NAME}
source/main.c
source/config.c
source/data.c
source/event_queue.c
source/otap.c
)

Expand All @@ -38,5 +39,5 @@ target_compile_definitions(${CMAKE_PROJECT_NAME} PRIVATE
C_MESH_API_COMMIT_HASH="${C_MESH_API_COMMIT_HASH}"
)

target_link_libraries(${CMAKE_PROJECT_NAME} wpc PkgConfig::systemd)
target_link_libraries(${CMAKE_PROJECT_NAME} wpc PkgConfig::systemd pthread)

15 changes: 15 additions & 0 deletions sink_service/source/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "config.h"
#include "config_macros.h"
#include "event_queue.h"
#include "wpc.h"

#define LOG_MODULE_NAME "Config"
Expand Down Expand Up @@ -1043,6 +1044,19 @@ static bool initialize_unmodifiable_variables()
}

static void on_stack_boot_status(uint8_t status)
{
const event_t event = {
.type = EVENT_TYPE_STACK_STATUS,
.event.stack_status.status = status
};

if (!EventQueue_Push(&event))
{
LOGE("Failed to enqueue stack status event\n");
}
}

void Config_HandleStackStatusChange(const uint8_t status)
{
/* After a reboot, read again the variable as it can be because
* of an otap and variables may change
Expand Down Expand Up @@ -1108,3 +1122,4 @@ void Config_Close()
sd_bus_slot_unref(m_slot);
}
}

11 changes: 11 additions & 0 deletions sink_service/source/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define SINK_MANAGER_SOURCE_CONFIG_H_

#include <systemd/sd-bus.h>
#include <stdint.h>

/**
* \brief Initialize the config module
Expand All @@ -22,4 +23,14 @@ int Config_Init(sd_bus * bus, char * object, char * interface);

void Config_Close();

/**
* \brief Handle a stack status change
* Re-reads some node attributes and sends StackStarted or
* StackStopped dbus signal.
* \param status
* 0 = stack started, otherwise stack stopped
*/
void Config_HandleStackStatusChange(const uint8_t status);

#endif /* SINK_MANAGER_SOURCE_CONFIG_H_ */

98 changes: 65 additions & 33 deletions sink_service/source/data.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
#include <stddef.h>
#include <stdlib.h>
#include <stdbool.h>
#include <errno.h>
#include <string.h>
#include <time.h>

#include "data.h"
#include "wpc.h"
#include "event_queue.h"

#define LOG_MODULE_NAME "Data"
#define MAX_LOG_LEVEL INFO_LOG_LEVEL
Expand Down Expand Up @@ -157,54 +158,42 @@ static bool onDataReceived(const uint8_t * bytes,
uint8_t hop_count,
unsigned long long timestamp_ms)
{
__attribute__((cleanup(sd_bus_message_unrefp))) sd_bus_message *m = NULL;
int r;

LOGD("%llu -> Data received on EP %d of len %d from 0x%x to 0x%x\n",
timestamp_ms,
dst_ep,
num_bytes,
src_addr,
dst_addr);

/* Create a new signal to be generated on Dbus */
r = sd_bus_message_new_signal(m_bus, &m, m_object, m_interface, "MessageReceived");

if (r < 0)
if (num_bytes > EVENT_DATA_MAX_PAYLOAD)
{
LOGE("Cannot create signal error=%s\n", strerror(-r));
LOGE("Payload too large: %zu > %d\n", num_bytes, EVENT_DATA_MAX_PAYLOAD);
return false;
}

/* Load all parameters */
// clang-format off
r = sd_bus_message_append(m,
"tuuyyuyy",
timestamp_ms,
src_addr,
dst_addr,
src_ep,
dst_ep,
travel_time,
qos,
hop_count);
// clang-format on
if (r < 0)
event_t event = {
.type = EVENT_TYPE_DATA_RECEIVED,
.event.data_received = {
.timestamp_ms = timestamp_ms,
.num_bytes = num_bytes,
.src_addr = src_addr,
.dst_addr = dst_addr,
.travel_time = travel_time,
.qos = (uint8_t) qos,
.src_ep = src_ep,
.dst_ep = dst_ep,
.hop_count = hop_count,
},
};
memcpy(event.event.data_received.payload, bytes, num_bytes);

if (!EventQueue_Push(&event))
{
LOGE("Cannot append info error=%s\n", strerror(-r));
LOGE("Failed to enqueue data received event\n");
return false;
}

r = sd_bus_message_append_array(m, 'y', bytes, num_bytes);
if (r < 0)
{
LOGE("Cannot append array error=%s\n", strerror(-r));
return false;
}

/* Send the signal on bus */
sd_bus_send(m_bus, m, NULL);

return true;
}

Expand Down Expand Up @@ -279,3 +268,46 @@ void Data_Close()
sd_bus_slot_unref(m_slot);
}
}

void Data_SendDataReceivedSignal(const event_data_received_t *const data)
{
__attribute__((cleanup(sd_bus_message_unrefp))) sd_bus_message *msg = NULL;

int r = sd_bus_message_new_signal(m_bus, &msg, m_object, m_interface, "MessageReceived");
if (r < 0)
{
LOGE("Cannot create dbus signal:%s\n", strerror(-r));
return;
}


r = sd_bus_message_append(msg,
"tuuyyuyy",
data->timestamp_ms,
data->src_addr,
data->dst_addr,
data->src_ep,
data->dst_ep,
data->travel_time,
data->qos,
data->hop_count);
if (r < 0)
{
LOGE("Cannot append to signal: %s\n", strerror(-r));
return;
}

r = sd_bus_message_append_array(msg, 'y', data->payload, data->num_bytes);
if (r < 0)
{
LOGE("Cannot append array to signal: %s\n", strerror(-r));
return;
}

r = sd_bus_send(m_bus, msg, NULL);
if (r < 0)
{
LOGE("Cannot send signal: %s\n", strerror(-r));
}
}

9 changes: 9 additions & 0 deletions sink_service/source/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define SINK_MANAGER_SOURCE_DATA_H_

#include <systemd/sd-bus.h>
#include "event_queue.h"

/**
* \brief Initialize the data module
Expand All @@ -24,4 +25,12 @@ int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_lim

void Data_Close();

/**
* \brief Send a MessageReceived dbus signal
* \param data
* The received data event from the event queue
*/
void Data_SendDataReceivedSignal(const event_data_received_t *const data);

#endif /* SINK_MANAGER_SOURCE_DATA_H_ */

Loading