From ae826ccacd937a355288dcf8ba2d5cf2e5c1259a Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Thu, 23 Apr 2026 19:53:18 +0800 Subject: [PATCH 01/10] * FIX [trasnport/mqtt] free nano_qos_db in pipe_fini for safety Signed-off-by: JaylinYu --- src/sp/transport/mqtt/broker_tcp.c | 14 +++++++------- src/sp/transport/mqtts/broker_tls.c | 14 +++++++------- src/sp/transport/mqttws/nmq_websocket.c | 19 +++++++++++-------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/sp/transport/mqtt/broker_tcp.c b/src/sp/transport/mqtt/broker_tcp.c index 573bc65bd..4587bf2d1 100644 --- a/src/sp/transport/mqtt/broker_tcp.c +++ b/src/sp/transport/mqtt/broker_tcp.c @@ -150,13 +150,6 @@ tcptran_pipe_close(void *arg) nni_free(p->npipe->subinfol, sizeof(nni_list)); p->npipe->subinfol = NULL; } - void *nano_qos_db = p->npipe->nano_qos_db; - if (!p->conf->sqlite.enable && nano_qos_db != NULL) { - nni_qos_db_remove_all_msg( - false, nano_qos_db, tran_close_unack_msg_cb); - nni_qos_db_fini_id_hash(nano_qos_db); - p->npipe->nano_qos_db = NULL; - } nni_lmq_flush(&p->rslmq); nni_mtx_unlock(&p->mtx); @@ -228,6 +221,13 @@ tcptran_pipe_fini(void *arg) nni_mtx_unlock(&ep->mtx); } nni_mtx_lock(&p->mtx); + void *nano_qos_db = p->npipe->nano_qos_db; + if (!p->conf->sqlite.enable && nano_qos_db != NULL) { + nni_qos_db_remove_all_msg( + false, nano_qos_db, tran_close_unack_msg_cb); + nni_qos_db_fini_id_hash(nano_qos_db); + p->npipe->nano_qos_db = NULL; + } if (p->tcp_cparam) { conn_param_free(p->tcp_cparam); //reap thread working p->tcp_cparam = NULL; diff --git a/src/sp/transport/mqtts/broker_tls.c b/src/sp/transport/mqtts/broker_tls.c index 2c61b320b..eef06fef9 100644 --- a/src/sp/transport/mqtts/broker_tls.c +++ b/src/sp/transport/mqtts/broker_tls.c @@ -153,13 +153,6 @@ tlstran_pipe_close(void *arg) nni_free(p->npipe->subinfol, sizeof(nni_list)); p->npipe->subinfol = NULL; } - void *nano_qos_db = p->npipe->nano_qos_db; - if (!p->conf->sqlite.enable && nano_qos_db != NULL) { - nni_qos_db_remove_all_msg( - false, nano_qos_db, tran_close_unack_msg_cb); - nni_qos_db_fini_id_hash(nano_qos_db); - p->npipe->nano_qos_db = NULL; - } nni_lmq_flush(&p->rslmq); nni_mtx_unlock(&p->mtx); @@ -235,6 +228,13 @@ tlstran_pipe_fini(void *arg) nni_mtx_unlock(&ep->mtx); } nni_mtx_lock(&p->mtx); + void *nano_qos_db = p->npipe->nano_qos_db; + if (!p->conf->sqlite.enable && nano_qos_db != NULL) { + nni_qos_db_remove_all_msg( + false, nano_qos_db, tran_close_unack_msg_cb); + nni_qos_db_fini_id_hash(nano_qos_db); + p->npipe->nano_qos_db = NULL; + } if (p->tcp_cparam) { conn_param_free(p->tcp_cparam); p->tcp_cparam = NULL; diff --git a/src/sp/transport/mqttws/nmq_websocket.c b/src/sp/transport/mqttws/nmq_websocket.c index f1b12fb35..80796441d 100644 --- a/src/sp/transport/mqttws/nmq_websocket.c +++ b/src/sp/transport/mqttws/nmq_websocket.c @@ -616,7 +616,8 @@ wstran_pipe_recv_cb(void *arg) nni_aio_finish_error(p->ep_aio, rv); } else if (uaio != NULL) { nni_aio_set_msg(uaio, NULL); - nni_aio_finish_error(uaio, p->err_code == MQTT_SUCCESS ? rv : p->err_code); + nni_aio_finish_error(uaio, + p->err_code == MQTT_SUCCESS ? rv : (int) p->err_code); } else { // Let protocol layer do the close first. nng_stream_close(p->ws); @@ -1333,6 +1334,14 @@ wstran_pipe_fini(void *arg) nni_msg_free(p->tmp_msg); nni_lmq_flush(&p->recvlmq); nng_free(p->qos_buf, 16 + NNI_NANO_MAX_PACKET_SIZE); + // must free nano_qos_db in fini for safety + void *nano_qos_db = p->npipe->nano_qos_db; + if (!p->conf->sqlite.enable && nano_qos_db != NULL) { + nni_qos_db_remove_all_msg( + false, nano_qos_db, tran_close_unack_msg_cb); + nni_qos_db_fini_id_hash(nano_qos_db); + p->npipe->nano_qos_db = NULL; + } nni_mtx_unlock(&p->mtx); nng_stream_free(p->ws); @@ -1375,13 +1384,7 @@ wstran_pipe_close(void *arg) nni_free(p->npipe->subinfol, sizeof(nni_list)); p->npipe->subinfol = NULL; } - void *nano_qos_db = p->npipe->nano_qos_db; - if (!p->conf->sqlite.enable && nano_qos_db != NULL) { - nni_qos_db_remove_all_msg( - false, nano_qos_db, tran_close_unack_msg_cb); - nni_qos_db_fini_id_hash(nano_qos_db); - p->npipe->nano_qos_db = NULL; - } + nni_lmq_flush(&p->rslmq); nni_mtx_unlock(&p->mtx); nng_stream_close(p->ws); From 9cc630a3b1cfeb007bba9f598cef21b797070f1d Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Thu, 23 Apr 2026 19:53:40 +0800 Subject: [PATCH 02/10] * MDF [nmq_mqtt] protect closed pipe from qos_db msg resending Signed-off-by: JaylinYu --- src/sp/protocol/mqtt/nmq_mqtt.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/sp/protocol/mqtt/nmq_mqtt.c b/src/sp/protocol/mqtt/nmq_mqtt.c index 484cec3ab..a9db93893 100644 --- a/src/sp/protocol/mqtt/nmq_mqtt.c +++ b/src/sp/protocol/mqtt/nmq_mqtt.c @@ -229,6 +229,10 @@ nano_pipe_timer_cb(void *arg) nni_sleep_aio(qos_duration * 1000, &p->aio_timer); nni_mtx_unlock(&sock->lk); return; + } else if (nni_atomic_get_bool(&p->closed)) { + // stop sending msg to closed pipe + log_warn("pipe is already closed"); + return; } nni_mtx_lock(&p->lk); From 628c200144a26109691bd5abcd893db1fe1f2366 Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Fri, 24 Apr 2026 15:35:06 +0800 Subject: [PATCH 03/10] * MDF [transport/mqtt] add transport safety guard Signed-off-by: JaylinYu --- src/sp/protocol/mqtt/nmq_mqtt.c | 7 ++++--- src/sp/transport/mqtt/broker_tcp.c | 3 ++- src/sp/transport/mqtts/broker_tls.c | 3 ++- src/sp/transport/mqttws/nmq_websocket.c | 3 ++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/sp/protocol/mqtt/nmq_mqtt.c b/src/sp/protocol/mqtt/nmq_mqtt.c index a9db93893..7a73da573 100644 --- a/src/sp/protocol/mqtt/nmq_mqtt.c +++ b/src/sp/protocol/mqtt/nmq_mqtt.c @@ -229,13 +229,14 @@ nano_pipe_timer_cb(void *arg) nni_sleep_aio(qos_duration * 1000, &p->aio_timer); nni_mtx_unlock(&sock->lk); return; - } else if (nni_atomic_get_bool(&p->closed)) { + } + nni_mtx_lock(&p->lk); + if (nni_atomic_get_bool(&p->closed)) { // stop sending msg to closed pipe + nni_mtx_unlock(&p->lk); log_warn("pipe is already closed"); return; } - nni_mtx_lock(&p->lk); - // According to the MQTT protocol, when keepalive is 0, the server should not check the keepalive timeout. if (p->keepalive != 0) { qos_backoff = p->ka_refresh * (qos_duration) * 1000 - diff --git a/src/sp/transport/mqtt/broker_tcp.c b/src/sp/transport/mqtt/broker_tcp.c index 4587bf2d1..82e5c884d 100644 --- a/src/sp/transport/mqtt/broker_tcp.c +++ b/src/sp/transport/mqtt/broker_tcp.c @@ -222,7 +222,8 @@ tcptran_pipe_fini(void *arg) } nni_mtx_lock(&p->mtx); void *nano_qos_db = p->npipe->nano_qos_db; - if (!p->conf->sqlite.enable && nano_qos_db != NULL) { + if (p->npipe != NULL && p->conf != NULL && + !p->conf->sqlite.enable && nano_qos_db != NULL) { nni_qos_db_remove_all_msg( false, nano_qos_db, tran_close_unack_msg_cb); nni_qos_db_fini_id_hash(nano_qos_db); diff --git a/src/sp/transport/mqtts/broker_tls.c b/src/sp/transport/mqtts/broker_tls.c index eef06fef9..5d0de0f55 100644 --- a/src/sp/transport/mqtts/broker_tls.c +++ b/src/sp/transport/mqtts/broker_tls.c @@ -229,7 +229,8 @@ tlstran_pipe_fini(void *arg) } nni_mtx_lock(&p->mtx); void *nano_qos_db = p->npipe->nano_qos_db; - if (!p->conf->sqlite.enable && nano_qos_db != NULL) { + if (p->npipe != NULL && p->conf != NULL && + !p->conf->sqlite.enable && nano_qos_db != NULL) { nni_qos_db_remove_all_msg( false, nano_qos_db, tran_close_unack_msg_cb); nni_qos_db_fini_id_hash(nano_qos_db); diff --git a/src/sp/transport/mqttws/nmq_websocket.c b/src/sp/transport/mqttws/nmq_websocket.c index 80796441d..4c97c58fa 100644 --- a/src/sp/transport/mqttws/nmq_websocket.c +++ b/src/sp/transport/mqttws/nmq_websocket.c @@ -1336,7 +1336,8 @@ wstran_pipe_fini(void *arg) nng_free(p->qos_buf, 16 + NNI_NANO_MAX_PACKET_SIZE); // must free nano_qos_db in fini for safety void *nano_qos_db = p->npipe->nano_qos_db; - if (!p->conf->sqlite.enable && nano_qos_db != NULL) { + if (p->npipe != NULL && p->conf != NULL && + !p->conf->sqlite.enable && nano_qos_db != NULL) { nni_qos_db_remove_all_msg( false, nano_qos_db, tran_close_unack_msg_cb); nni_qos_db_fini_id_hash(nano_qos_db); From 854b9b4fdcac5418d01923f7272466f7a3d0be48 Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Wed, 29 Apr 2026 14:21:46 +0800 Subject: [PATCH 04/10] * RM [CMakeList] remove BLF code Signed-off-by: JaylinYu --- src/supplemental/nanolib/CMakeLists.txt | 1 - src/supplemental/nanolib/blf/CMakeLists.txt | 6 - src/supplemental/nanolib/blf/blf.cc | 453 -------------------- 3 files changed, 460 deletions(-) delete mode 100644 src/supplemental/nanolib/blf/CMakeLists.txt delete mode 100644 src/supplemental/nanolib/blf/blf.cc diff --git a/src/supplemental/nanolib/CMakeLists.txt b/src/supplemental/nanolib/CMakeLists.txt index 3274e966f..a8e6d7aaa 100644 --- a/src/supplemental/nanolib/CMakeLists.txt +++ b/src/supplemental/nanolib/CMakeLists.txt @@ -46,5 +46,4 @@ endif (SUPP_RULE_ENGINE) add_subdirectory(linkedlist) add_subdirectory(ringbuffer) add_subdirectory(parquet) -add_subdirectory(blf) diff --git a/src/supplemental/nanolib/blf/CMakeLists.txt b/src/supplemental/nanolib/blf/CMakeLists.txt deleted file mode 100644 index 769b4b37a..000000000 --- a/src/supplemental/nanolib/blf/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -if(NNG_ENABLE_BLF) - nng_sources(blf.cc) - find_package(Vector_BLF REQUIRED) - nng_include_directories(${Vector_BLF_INCLUDE_DIRS}) - nng_link_libraries(${Vector_BLF_LIBRARIES}) -endif() diff --git a/src/supplemental/nanolib/blf/blf.cc b/src/supplemental/nanolib/blf/blf.cc deleted file mode 100644 index 6229d57c0..000000000 --- a/src/supplemental/nanolib/blf/blf.cc +++ /dev/null @@ -1,453 +0,0 @@ -#include "nng/supplemental/nanolib/blf.h" -#include "nng/supplemental/nanolib/log.h" -#include "nng/supplemental/nanolib/queue.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -using namespace std; - -#define UINT64_MAX_DIGITS 20 -#define _Atomic(X) std::atomic -static atomic_bool is_available = { false }; -#define WAIT_FOR_AVAILABLE \ - while (!is_available) \ - nng_msleep(10); -static conf_blf *g_conf = NULL; - -#define DO_IT_IF_NOT_NULL(func, arg1, arg2) \ - if (arg1) { \ - func(arg1, arg2); \ - } - -#define FREE_IF_NOT_NULL(free, size) DO_IT_IF_NOT_NULL(nng_free, free, size) - -#define json_read_num(structure, field, key, jso) \ - do { \ - cJSON *jso_key = cJSON_GetObjectItem(jso, key); \ - if (NULL == jso_key) { \ - log_debug("Config %s is not set, use default!", key); \ - break; \ - } \ - if (cJSON_IsNumber(jso_key)) { \ - if (jso_key->valuedouble > 0) \ - (structure)->field = jso_key->valuedouble; \ - } \ - } while (0); - -CircularQueue blf_queue; -CircularQueue blf_file_queue; -pthread_mutex_t blf_queue_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t blf_queue_not_empty = PTHREAD_COND_INITIALIZER; - -static bool -directory_exists(const std::string &directory_path) -{ - struct stat buffer; - return (stat(directory_path.c_str(), &buffer) == 0 && - S_ISDIR(buffer.st_mode)); -} - -static bool -create_directory(const std::string &directory_path) -{ - int status = mkdir( - directory_path.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - return (status == 0); -} - -blf_file_range * -blf_file_range_alloc(uint32_t start_idx, uint32_t end_idx, char *filename) -{ - blf_file_range *range = new blf_file_range; - range->start_idx = start_idx; - range->end_idx = end_idx; - range->filename = nng_strdup(filename); - return range; -} - -void -blf_file_range_free(blf_file_range *range) -{ - if (range) { - FREE_IF_NOT_NULL(range->filename, strlen(range->filename)); - delete range; - } -} - -blf_object * -blf_object_alloc(uint64_t *keys, uint8_t **darray, uint32_t *dsize, - uint32_t size, nng_aio *aio, void *arg) -{ - blf_object *elem = new blf_object; - elem->keys = keys; - elem->darray = darray; - elem->dsize = dsize; - elem->size = size; - elem->aio = aio; - elem->arg = arg; - elem->ranges = new blf_file_ranges; - elem->ranges->range = NULL; - elem->ranges->start = 0; - elem->ranges->size = 0; - return elem; -} - -void -blf_object_free(blf_object *elem) -{ - if (elem) { - FREE_IF_NOT_NULL(elem->keys, elem->size); - FREE_IF_NOT_NULL(elem->dsize, elem->size); - nng_aio_set_prov_data(elem->aio, elem->arg); - nng_aio_set_output(elem->aio, 1, elem->ranges); - uint32_t *szp = (uint32_t *) malloc(sizeof(uint32_t)); - *szp = elem->size; - nng_aio_set_msg(elem->aio, (nng_msg *) szp); - DO_IT_IF_NOT_NULL(nng_aio_finish_sync, elem->aio, 0); - FREE_IF_NOT_NULL(elem->darray, elem->size); - for (int i = 0; i < elem->ranges->size; i++) { - blf_file_range_free(elem->ranges->range[i]); - } - free(elem->ranges->range); - delete elem->ranges; - delete elem; - } -} - -static char * -get_file_name(conf_blf *conf, uint64_t key_start, uint64_t key_end) -{ - char *file_name = NULL; - char *dir = conf->dir; - char *prefix = conf->file_name_prefix; - - file_name = (char *) malloc(strlen(prefix) + strlen(dir) + - UINT64_MAX_DIGITS + UINT64_MAX_DIGITS + 16); - if (file_name == NULL) { - log_error("Failed to allocate memory for file name."); - return NULL; - } - - sprintf(file_name, "%s/%s-%" PRIu64 "~%" PRIu64 ".blf", dir, prefix, - key_start, key_end); - ENQUEUE(blf_file_queue, file_name); - return file_name; -} - -static int -compute_new_index(blf_object *obj, uint32_t index, uint32_t file_size) -{ - uint64_t size = 0; - uint32_t new_index; - for (new_index = index; size < file_size && new_index < obj->size - 1; - new_index++) { - size += obj->dsize[new_index]; - } - return new_index; -} - -static int -remove_old_file(void) -{ - char *filename = (char *) DEQUEUE(blf_file_queue); - if (remove(filename) == 0) { - log_debug("File '%s' removed successfully.\n", filename); - } else { - log_error("Error removing the file %s", filename); - return -1; - } - - free(filename); - return 0; -} - -void -update_blf_file_ranges(conf_blf *conf, blf_object *elem, blf_file_range *range) -{ - if (elem->ranges->size != conf->file_count) { - elem->ranges->range = - (blf_file_range **) realloc(elem->ranges->range, - sizeof(blf_file_range *) * (++elem->ranges->size)); - elem->ranges->range[elem->ranges->size - 1] = range; - } else { - // Free old ranges and insert new ranges - // update start index - blf_file_range_free(elem->ranges->range[elem->ranges->start]); - elem->ranges->range[elem->ranges->start] = range; - elem->ranges->start++; - elem->ranges->start %= elem->ranges->size; - } -} - -void -read_binary_data(const std::string &inputString, unsigned int inputSize, - array &data) -{ - std::vector binaryData; - for (size_t i = 0; 2 * i < inputString.length(); i++) { - std::istringstream iss(inputString.substr(2 * i, 2)); - unsigned int value; - iss >> std::hex >> value; - data[i] = (static_cast(value)); - } -} - -void -blf_write_can_message(Vector::BLF::File &file, cJSON *jso) -{ - /* write a CanMessage */ - // puts(cJSON_Print(jso)); - auto *canMessage = new Vector::BLF::CanMessage; - json_read_num(canMessage, id, "id", jso); - json_read_num(canMessage, objectTimeStamp, "t", jso); - json_read_num(canMessage, channel, "bus", jso); - json_read_num(canMessage, flags, "d", jso); - json_read_num(canMessage, dlc, "l", jso); - cJSON *data = cJSON_GetObjectItem(jso, "data"); - // printf("ori: %s\n", data->valuestring); - read_binary_data(data->valuestring, canMessage->dlc, canMessage->data); - file.write(canMessage); -} - -int -blf_write_core( - char *name, blf_object *elem, uint32_t old_index, uint32_t new_index) -{ - /* open file for writing */ - Vector::BLF::File file; - file.open(name, std::ios_base::out); - if (!file.is_open()) { - std::cout << "Unable to open file" << std::endl; - return -1; - } - - for (uint32_t i = old_index; i <= new_index; i++) { - - cJSON *jso = cJSON_ParseWithLength( - (const char *) elem->darray[i], elem->dsize[i]); - cJSON *frames = cJSON_GetObjectItem(jso, "frames"); - cJSON *frame = NULL; - cJSON_ArrayForEach(frame, frames) - { - /* write a CanMessage */ - blf_write_can_message(file, frame); - } - - cJSON_Delete(jso); - } - - - /* close file */ - file.close(); - return 0; -} - -int -blf_write(conf_blf *conf, blf_object *elem) -{ - uint32_t old_index = 0; - uint32_t new_index = 0; -again: - - new_index = compute_new_index(elem, old_index, conf->file_size); - uint64_t key_start = elem->keys[old_index]; - uint64_t key_end = elem->keys[new_index]; - pthread_mutex_lock(&blf_queue_mutex); - char *filename = get_file_name(conf, key_start, key_end); - if (filename == NULL) { - pthread_mutex_unlock(&blf_queue_mutex); - log_error("Failed to get file name"); - return -1; - } - - if (QUEUE_SIZE(blf_file_queue) > conf->file_count) { - remove_old_file(); - } - pthread_mutex_unlock(&blf_queue_mutex); - - { - blf_file_range *range = - blf_file_range_alloc(old_index, new_index, filename); - update_blf_file_ranges(conf, elem, range); - // write value - blf_write_core(filename, elem, old_index, new_index); - old_index = new_index; - - if (new_index != elem->size - 1) - goto again; - } - - blf_object_free(elem); - return 0; -} - -void -blf_write_loop(void *config) -{ - if (config == NULL) { - log_error("blf conf is NULL"); - } - - conf_blf *conf = (conf_blf *) config; - if (!directory_exists(conf->dir)) { - if (!create_directory(conf->dir)) { - log_error("Failed to create directory %s", conf->dir); - return; - } - } - - while (true) { - // wait for mqtt messages to send method request - pthread_mutex_lock(&blf_queue_mutex); - - while (IS_EMPTY(blf_queue)) { - pthread_cond_wait( - &blf_queue_not_empty, &blf_queue_mutex); - } - - log_debug("fetch element from blf queue"); - blf_object *ele = (blf_object *) DEQUEUE(blf_queue); - - pthread_mutex_unlock(&blf_queue_mutex); - - blf_write(conf, ele); - } -} - -int -blf_write_batch_async(blf_object *elem) -{ - if (g_conf == NULL || g_conf->enable == false) { - log_error("BLF is not ready or not launch!"); - return -1; - } - WAIT_FOR_AVAILABLE - pthread_mutex_lock(&blf_queue_mutex); - if (IS_EMPTY(blf_queue)) { - pthread_cond_broadcast(&blf_queue_not_empty); - } - ENQUEUE(blf_queue, elem); - log_debug("enqueue element."); - - pthread_mutex_unlock(&blf_queue_mutex); - return 0; -} - -int -blf_write_launcher(conf_blf *conf) -{ - g_conf = conf; - INIT_QUEUE(blf_queue); - INIT_QUEUE(blf_file_queue); - is_available = true; - thread write_loop(blf_write_loop, conf); - write_loop.detach(); - return 0; -} - -static void -get_range(const char *name, uint64_t range[2]) -{ - const char *start = strrchr(name, '-'); - sscanf(start, "-%ld~%ld.parquet", &range[0], &range[1]); - return; -} - -static bool -compare_callback(void *name, uint64_t key) -{ - uint64_t range[2] = { 0 }; - get_range((const char *) name, range); - return (key >= range[0] && key <= range[1]); -} - -static bool -compare_callback_span(void *name, uint64_t low, uint64_t high) -{ - uint64_t range[2] = { 0 }; - get_range((const char *) name, range); - return !(low > range[1] || high < range[0]); -} - -const char * -blf_find(uint64_t key) -{ - if (g_conf == NULL || g_conf->enable == false) { - log_error("BLF is not ready or not launch!"); - return NULL; - } - WAIT_FOR_AVAILABLE - const char *value = NULL; - void *elem = NULL; - pthread_mutex_lock(&blf_queue_mutex); - FOREACH_QUEUE(blf_file_queue, elem) - { - if (elem && compare_callback(elem, key)) { - value = nng_strdup((char *) elem); - break; - } - } - pthread_mutex_unlock(&blf_queue_mutex); - return value; -} - -const char ** -blf_find_span(uint64_t start_key, uint64_t end_key, uint32_t *size) -{ - if (g_conf == NULL || g_conf->enable == false) { - log_error("BLF is not ready or not launch!"); - return NULL; - } - if (start_key > end_key) { - log_error("Start key can't be greater than end_key."); - *size = 0; - return NULL; - } - - WAIT_FOR_AVAILABLE - - uint64_t low = start_key; - uint64_t high = end_key; - uint32_t local_size = 0; - const char *value = NULL; - const char **array = NULL; - const char **ret = NULL; - void *elem = NULL; - - pthread_mutex_lock(&blf_queue_mutex); - if (blf_file_queue.size != 0) { - array = (const char **) nng_alloc( - sizeof(char *) * blf_file_queue.size); - - ret = array; - FOREACH_QUEUE(blf_file_queue, elem) - { - if (elem) { - if (compare_callback_span(elem, low, high)) { - ++local_size; - value = nng_strdup((char *) elem); - *array++ = value; - } - } - } - } - - pthread_mutex_unlock(&blf_queue_mutex); - (*size) = local_size; - return ret; -} From 92ff03dde189a74fe73d92f7ae0b6cc658df61eb Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Wed, 29 Apr 2026 14:22:20 +0800 Subject: [PATCH 05/10] * NEW [conf] add new nng_proxy conf obj Signed-off-by: JaylinYu --- include/nng/supplemental/nanolib/conf.h | 15 ++++++++++++-- src/supplemental/nanolib/conf.c | 26 +++++++++++-------------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/include/nng/supplemental/nanolib/conf.h b/include/nng/supplemental/nanolib/conf.h index 1a8062539..d3110fca7 100644 --- a/include/nng/supplemental/nanolib/conf.h +++ b/include/nng/supplemental/nanolib/conf.h @@ -597,6 +597,17 @@ struct conf_web_hook { }; typedef struct conf_web_hook conf_web_hook; +struct conf_nng_bridge { + bool enable; + char *sub_url; + char *pub_url; + char protocol; + char transport; + nng_socket pub_sock; + nng_socket sub_sock; +}; + +typedef struct conf_nng_bridge conf_nng_bridge; typedef enum { memory, @@ -616,7 +627,7 @@ struct conf { int num_taskq_thread; int max_taskq_thread; uint32_t parallel; // broker ctx - uint64_t total_ctx; // Total ctx of work (bridge + AWS + broker + HTTP) + uint64_t total_ctx; // aio number of any ctx will send msg to bridge. (bridge + AWS + broker + HTTP + nng_sub) uint64_t max_packet_size; // byte uint32_t client_max_packet_size; // byte uint32_t max_inflight_window; @@ -641,7 +652,7 @@ struct conf { conf_bridge aws_bridge; // AWS IoT Core conf_exchange exchange; conf_parquet parquet; - conf_blf blf; + conf_nng_bridge nng_proxy; #if defined(SUPP_PLUGIN) conf_plugin plugin; #endif diff --git a/src/supplemental/nanolib/conf.c b/src/supplemental/nanolib/conf.c index 8bca7a055..d9e14654a 100644 --- a/src/supplemental/nanolib/conf.c +++ b/src/supplemental/nanolib/conf.c @@ -50,6 +50,7 @@ static void conf_sqlite_destroy(conf_sqlite *sqlite); static void conf_web_hook_parse(conf_web_hook *webhook, const char *path); static void conf_web_hook_destroy(conf_web_hook *web_hook); static void conf_preset_sessions_init(conf_preset_session *session); +static void conf_nng_proxy_init(conf_nng_bridge *proxy); #if defined(ENABLE_LOG) static void conf_log_init(conf_log *log); @@ -969,7 +970,9 @@ conf_init(conf *nanomq_conf) nng_mtx_alloc(&nanomq_conf->auth_http.acl_cache_mtx); nanomq_conf->auth_http.acl_cache_reset_aio = NULL; nanomq_conf->ext_qos_db = NULL; + conf_preset_sessions_init(&nanomq_conf->pre_sessions); + conf_nng_proxy_init(&nanomq_conf->nng_proxy); memset(nanomq_conf->exec_path, 0, 512); nng_atomic_alloc(&nanomq_conf->lc); // Marks current total connections } @@ -1222,19 +1225,6 @@ print_parquet_conf(conf_parquet *parquet) log_info("parquet limit_frequency: %d", parquet->limit_frequency); } -static void -print_blf_conf(conf_blf *blf) -{ - if (!blf->enable) - return; - log_info("blf dir: %s", blf->dir); - const char *encode_type = get_compress_type(blf->comp_type); - log_info("blf compress: %s", encode_type); - log_info("blf file_name_prefix: %s", blf->file_name_prefix); - log_info("blf file_count: %d", blf->file_count); - log_info("blf file_size: %d", blf->file_size); -} - #if defined(SUPP_RULE_ENGINE) static void print_rule_engine_conf(conf_rule *rule_eng) @@ -1428,14 +1418,12 @@ print_conf(conf *nanomq_conf) conf_auth_http *auth_http = &(nanomq_conf->auth_http); conf_web_hook *webhook = &(nanomq_conf->web_hook); conf_parquet *parquet = &(nanomq_conf->parquet); - conf_blf *blf = &(nanomq_conf->blf); conf_exchange *exchange = &(nanomq_conf->exchange); print_auth_conf(auth); print_auth_http_conf(auth_http); print_webhook_conf(webhook); print_exchange_conf(exchange); print_parquet_conf(parquet); - print_blf_conf(blf); print_bridge_conf(&nanomq_conf->bridge, ""); #if defined(SUPP_AWS_BRIDGE) print_bridge_conf(&nanomq_conf->aws_bridge, "aws."); @@ -3103,6 +3091,14 @@ conf_bridge_init(conf_bridge *bridge) conf_sqlite_init(&bridge->sqlite); } +static void +conf_nng_proxy_init(conf_nng_bridge *proxy) +{ + proxy->enable = true; + proxy->sub_url = nng_strdup("tcp://localhost:9900"); + proxy->pub_url = nng_strdup("tcp://localhost:9901"); +} + void conf_bridge_node_init(conf_bridge_node *node) { From 1933f32808f562d75575ba7c2c26c83246510859 Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Wed, 29 Apr 2026 16:04:17 +0800 Subject: [PATCH 06/10] * NEW [mqtt_parser] add nng_sub0_msg_adapter API set nano_qos_db as NULL while create new conn_param Signed-off-by: JaylinYu --- include/nng/protocol/mqtt/mqtt_parser.h | 2 ++ include/nng/supplemental/nanolib/conf.h | 1 + src/sp/protocol/mqtt/mqtt_parser.c | 25 +++++++++++++++++++++++++ 3 files changed, 28 insertions(+) diff --git a/include/nng/protocol/mqtt/mqtt_parser.h b/include/nng/protocol/mqtt/mqtt_parser.h index 986677540..36c86fb6e 100644 --- a/include/nng/protocol/mqtt/mqtt_parser.h +++ b/include/nng/protocol/mqtt/mqtt_parser.h @@ -147,4 +147,6 @@ NNG_DECL int nmq_auth_http_sub_pub( conn_param *cparam, bool is_sub, topic_queue *topics, conf_auth_http *conf); NNG_DECL int mqtt_get_remaining_length(uint8_t *, uint32_t, uint32_t *, uint8_t *); +NNG_DECL nng_msg *nng_sub0_msg_adapter(nng_msg *, char *); + #endif // NNG_MQTT_H diff --git a/include/nng/supplemental/nanolib/conf.h b/include/nng/supplemental/nanolib/conf.h index d3110fca7..5fb7d1f3d 100644 --- a/include/nng/supplemental/nanolib/conf.h +++ b/include/nng/supplemental/nanolib/conf.h @@ -605,6 +605,7 @@ struct conf_nng_bridge { char transport; nng_socket pub_sock; nng_socket sub_sock; + conn_param *vclient; }; typedef struct conf_nng_bridge conf_nng_bridge; diff --git a/src/sp/protocol/mqtt/mqtt_parser.c b/src/sp/protocol/mqtt/mqtt_parser.c index 4976f86d1..826b9b73e 100644 --- a/src/sp/protocol/mqtt/mqtt_parser.c +++ b/src/sp/protocol/mqtt/mqtt_parser.c @@ -877,6 +877,7 @@ conn_param_init(conn_param *cparam) cparam->properties = NULL; cparam->will_prop_len = 0; cparam->will_properties = NULL; + cparam->nano_qos_db = NULL; } int @@ -2138,3 +2139,27 @@ mqtt_get_remaining_length(uint8_t *packet, uint32_t len, return MQTT_ERR_INVAL; } + +/** + * @brief convert NNG sub0 msg to standard MQTT V4 msg. + * + * @param origin topic with wildcard + * @param input topic in pub packet + * @return true + * @return false + */ +nng_msg * +nng_sub0_msg_adapter(nng_msg *origin, char *topic) +{ + nng_msg *mqtt_msg = NULL; + const uint8_t *payload = nng_msg_body(origin); + size_t payload_len = nng_msg_len(origin); + + mqtt_msg = nano_encode_publish_msg(MQTT_PROTOCOL_VERSION_v311, 0, + false, false, payload, payload_len, NULL, topic, NULL); + if (mqtt_msg == NULL) { + log_error("Build MQTT msg from NNG sub0 msg failed"); + } + + return mqtt_msg; +} \ No newline at end of file From 62c5d5b4c57b9dafac89ccfa45c2561c6d35990a Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Wed, 29 Apr 2026 19:49:13 +0800 Subject: [PATCH 07/10] * MDF [mqtt_parser] enhance nng_sub0_msg_adapter API reuse old API Signed-off-by: JaylinYu --- src/sp/protocol/mqtt/mqtt_parser.c | 52 +++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/src/sp/protocol/mqtt/mqtt_parser.c b/src/sp/protocol/mqtt/mqtt_parser.c index 826b9b73e..b0dbd34a9 100644 --- a/src/sp/protocol/mqtt/mqtt_parser.c +++ b/src/sp/protocol/mqtt/mqtt_parser.c @@ -2143,23 +2143,59 @@ mqtt_get_remaining_length(uint8_t *packet, uint32_t len, /** * @brief convert NNG sub0 msg to standard MQTT V4 msg. * - * @param origin topic with wildcard - * @param input topic in pub packet - * @return true - * @return false + * @param origin original NNG Sub msg + * @param topic overwrite original topic of NNG + * @return nng_msg */ nng_msg * nng_sub0_msg_adapter(nng_msg *origin, char *topic) { nng_msg *mqtt_msg = NULL; - const uint8_t *payload = nng_msg_body(origin); - size_t payload_len = nng_msg_len(origin); + const uint8_t *body = nng_msg_body(origin); + size_t body_len = nng_msg_len(origin); + + if (body == NULL || body_len == 0) { + log_error("Empty origin message"); + return NULL; + } + + // Find the last '/' to separate topic and payload. + // NNG SUB msg format: "topic/path/payload" -> Topic: "topic/path", Payload: "payload" + const char *ptr = (const char *)body; + char *sep = NULL; + + for (size_t i = 0; i < body_len; i++) { + if (ptr[i] == '/') { + sep = &ptr[i]; + } + } + + // If no separator found, or starts with '/', fallback to using the whole message as payload. + if (sep == NULL || sep == ptr) { + log_warn("No valid topic/payload separator found in NNG sub0 msg."); + mqtt_msg = nano_encode_publish_msg(MQTT_PROTOCOL_VERSION_v311, + 0, false, false, body, body_len, NULL, topic, NULL); + return mqtt_msg; + } + + size_t topic_len = sep - ptr; + char *topic_buf = nng_zalloc(topic_len + 1); + uint8_t *payload_data = (uint8_t *) sep + 1; + size_t payload_len = body_len - (topic_len + 1); + + if (!topic_buf) { + log_error("Failed to allocate memory for topic"); + return NULL; + } + + memcpy(topic_buf, ptr, topic_len); mqtt_msg = nano_encode_publish_msg(MQTT_PROTOCOL_VERSION_v311, 0, - false, false, payload, payload_len, NULL, topic, NULL); + false, false, payload_data, payload_len, NULL, topic_buf, NULL); + + nng_free(topic_buf, topic_len + 1); if (mqtt_msg == NULL) { log_error("Build MQTT msg from NNG sub0 msg failed"); } - return mqtt_msg; } \ No newline at end of file From fca8eff592a50fccb52e074728f68dd214d70b56 Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Thu, 30 Apr 2026 14:18:34 +0800 Subject: [PATCH 08/10] * NEW [conf] add new nng_proxy conf obj Signed-off-by: JaylinYu --- include/nng/supplemental/nanolib/conf.h | 38 ++++++-- include/nng/supplemental/nanolib/nanolib.h | 22 +++-- src/supplemental/nanolib/conf.c | 21 ++++- src/supplemental/nanolib/conf_ver2.c | 102 +++++++++++++++++++++ 4 files changed, 162 insertions(+), 21 deletions(-) diff --git a/include/nng/supplemental/nanolib/conf.h b/include/nng/supplemental/nanolib/conf.h index 5fb7d1f3d..8eeb492b0 100644 --- a/include/nng/supplemental/nanolib/conf.h +++ b/include/nng/supplemental/nanolib/conf.h @@ -597,15 +597,37 @@ struct conf_web_hook { }; typedef struct conf_web_hook conf_web_hook; + +struct conf_nng_pub_node { + char *name; + nng_socket pub_sock; + char *pub_url; + char *clientid; + conn_param *client; // For ACL usage + topics **pub_list; + size_t forwards_count; +}; +typedef struct conf_nng_pub_node conf_nng_pub_node; + +struct conf_nng_sub_node { + char *name; + nng_socket sub_sock; + char *sub_url; + char *clientid; + conn_param *client; // For ACL usage + // NNG topics must be topic/payload, no multiple "/" + topics **sub_list; + size_t inwards_count; +}; +typedef struct conf_nng_sub_node conf_nng_sub_node; + struct conf_nng_bridge { - bool enable; - char *sub_url; - char *pub_url; - char protocol; - char transport; - nng_socket pub_sock; - nng_socket sub_sock; - conn_param *vclient; + bool pub_enable; + bool sub_enable; + conf_nng_pub_node **pnodes; + conf_nng_sub_node **snodes; + size_t pub_count; + size_t sub_count; }; typedef struct conf_nng_bridge conf_nng_bridge; diff --git a/include/nng/supplemental/nanolib/nanolib.h b/include/nng/supplemental/nanolib/nanolib.h index 6664f3695..f411e1bff 100644 --- a/include/nng/supplemental/nanolib/nanolib.h +++ b/include/nng/supplemental/nanolib/nanolib.h @@ -8,20 +8,22 @@ #include #include "nng/supplemental/nanolib/conf.h" -extern void conf_tls_init(conf_tls *tls); -extern void conf_tls_destroy(conf_tls *tls); -extern void conf_tls_parse( +NNG_DECL void conf_tls_init(conf_tls *tls); +NNG_DECL void conf_tls_destroy(conf_tls *tls); +NNG_DECL void conf_tls_parse( conf_tls *tls, const char *path, const char *prefix1, const char *prefix2); -extern void conf_http_server_init(conf_http_server *http, uint16_t port); -extern void conf_http_server_destroy(conf_http_server *http); +NNG_DECL void conf_http_server_init(conf_http_server *http, uint16_t port); +NNG_DECL void conf_http_server_destroy(conf_http_server *http); -extern void conf_session_node_init(conf_session_node *node); -extern void conf_bridge_node_init(conf_bridge_node *node); -extern void conf_bridge_sub_properties_init(conf_bridge_sub_properties *prop); -extern void conf_bridge_conn_properties_init( +NNG_DECL void conf_session_node_init(conf_session_node *node); +NNG_DECL void conf_bridge_node_init(conf_bridge_node *node); +NNG_DECL void conf_bridge_snode_init(conf_nng_sub_node *node); +NNG_DECL void conf_bridge_pnode_init(conf_nng_pub_node *node); +NNG_DECL void conf_bridge_sub_properties_init(conf_bridge_sub_properties *prop); +NNG_DECL void conf_bridge_conn_properties_init( conf_bridge_conn_properties *prop); -extern void conf_bridge_conn_will_properties_init( +NNG_DECL void conf_bridge_conn_will_properties_init( conf_bridge_conn_will_properties *prop); #endif diff --git a/src/supplemental/nanolib/conf.c b/src/supplemental/nanolib/conf.c index d9e14654a..4729e0341 100644 --- a/src/supplemental/nanolib/conf.c +++ b/src/supplemental/nanolib/conf.c @@ -3094,9 +3094,24 @@ conf_bridge_init(conf_bridge *bridge) static void conf_nng_proxy_init(conf_nng_bridge *proxy) { - proxy->enable = true; - proxy->sub_url = nng_strdup("tcp://localhost:9900"); - proxy->pub_url = nng_strdup("tcp://localhost:9901"); + proxy->pub_enable = false; + proxy->sub_enable = false; + proxy->pnodes = NULL; + proxy->snodes = NULL; +} + +void +conf_bridge_pnode_init(conf_nng_pub_node *node) +{ + node->pub_url = NULL; + node->pub_list = NULL; +} + +void +conf_bridge_snode_init(conf_nng_sub_node *node) +{ + node->sub_url = NULL; + node->sub_list = NULL; } void diff --git a/src/supplemental/nanolib/conf_ver2.c b/src/supplemental/nanolib/conf_ver2.c index b26099bce..f08de4afd 100644 --- a/src/supplemental/nanolib/conf_ver2.c +++ b/src/supplemental/nanolib/conf_ver2.c @@ -1189,6 +1189,64 @@ conf_session_node_parse(conf_session_node *node, cJSON *obj) node->sub_count = cvector_size(node->sub_list); } +void +conf_nng_pnode_parse( + conf_nng_pub_node *node, cJSON *obj) +{ + node->name = nng_strdup(obj->string); + hocon_read_str(node, clientid, obj); + hocon_read_str(node, pub_url, obj); + cJSON *forward = NULL; + cJSON *forwards = hocon_get_obj("forwards", obj); + cJSON_ArrayForEach(forward, forwards) + { + topics *s = NNI_ALLOC_STRUCT(s); + s->retain = NO_RETAIN; + s->qos = NO_QOS; + hocon_read_str(s, remote_topic, forward); + hocon_read_str(s, local_topic, forward); + hocon_read_num(s, qos, forward); + cJSON *jso_key2 = cJSON_GetObjectItem(forward, "qos"); + if (cJSON_IsNumber(jso_key2) && + (jso_key2->valuedouble == 0 || jso_key2->valuedouble == 1 || + jso_key2->valuedouble == 2)) { + s->qos = jso_key2->valuedouble; + } else { + if (jso_key2 != NULL) + log_warn("invalid qos level detected in pub list"); + } + } +} + +void +conf_nng_snode_parse( + conf_nng_sub_node *node, cJSON *obj) +{ + node->name = nng_strdup(obj->string); + hocon_read_str(node, clientid, obj); + hocon_read_str(node, sub_url, obj); + cJSON *subscriptions = hocon_get_obj("subscription", obj); + cJSON *subscription = NULL; + cJSON_ArrayForEach(subscription, subscriptions) + { + topics *s = NNI_ALLOC_STRUCT(s); + s->retain = NO_RETAIN; + s->qos = NO_QOS; + hocon_read_str(s, remote_topic, subscription); + hocon_read_str(s, local_topic, subscription); + hocon_read_num(s, qos, subscription); + cJSON *jso_key2 = cJSON_GetObjectItem(subscription, "qos"); + if (cJSON_IsNumber(jso_key2) && + (jso_key2->valuedouble == 0 || jso_key2->valuedouble == 1 || + jso_key2->valuedouble == 2)) { + s->qos = jso_key2->valuedouble; + } else { + if (jso_key2 != NULL) + log_warn("invalid qos level detected in sub list"); + } + } +} + void conf_bridge_node_parse( conf_bridge_node *node, conf_sqlite *bridge_sqlite, cJSON *obj) @@ -1387,6 +1445,48 @@ conf_bridge_parse_ver2(conf *config, cJSON *jso) return; } +static void +conf_nng_proxy_pub_parse_ver2(conf *config, cJSON *jso) +{ + cJSON *node_array = hocon_get_obj("bridges.nng.pub", jso); + cJSON *node_item = NULL; + + config->nng_proxy.pnodes = NULL; + cJSON_ArrayForEach(node_item, node_array) + { + conf_nng_pub_node *node = NNI_ALLOC_STRUCT(node); + conf_bridge_pnode_init(node); + conf_nng_pnode_parse(node, jso); + cvector_push_back(config->nng_proxy.pnodes, node); + config->nng_proxy.pub_enable = true; + } + + config->nng_proxy.pub_count = cvector_size(config->nng_proxy.pnodes); + + return; +} + +static void +conf_nng_proxy_sub_parse_ver2(conf *config, cJSON *jso) +{ + cJSON *node_array = hocon_get_obj("bridges.nng.sub", jso); + cJSON *node_item = NULL; + + config->nng_proxy.snodes = NULL; + cJSON_ArrayForEach(node_item, node_array) + { + conf_nng_sub_node *node = NNI_ALLOC_STRUCT(node); + conf_bridge_snode_init(node); + conf_nng_snode_parse(node, jso); + cvector_push_back(config->nng_proxy.snodes, node); + config->nng_proxy.sub_enable = true; + } + + config->nng_proxy.sub_count = cvector_size(config->nng_proxy.snodes); + + return; +} + #if defined(SUPP_PLUGIN) static void conf_plugin_parse_ver2(conf *config, cJSON *jso) @@ -1962,6 +2062,8 @@ conf_parse_ver2(conf *config) conf_pre_session_parse_ver2(config, jso); conf_authorization_prase_ver2(config, jso); conf_bridge_parse_ver2(config, jso); + conf_nng_proxy_sub_parse_ver2(config, jso); + conf_nng_proxy_pub_parse_ver2(config, jso); #if defined(SUPP_PLUGIN) conf_plugin_parse_ver2(config, jso); #endif From 877632b87fd10e91cc23602c46442a6852e04a61 Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Thu, 30 Apr 2026 14:19:01 +0800 Subject: [PATCH 09/10] * MDF [mqtt_parser] set new rules on nng sub proxy Must seperate Topic & Payload with "/" Signed-off-by: JaylinYu --- src/sp/protocol/mqtt/mqtt_parser.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/sp/protocol/mqtt/mqtt_parser.c b/src/sp/protocol/mqtt/mqtt_parser.c index b0dbd34a9..f143d399a 100644 --- a/src/sp/protocol/mqtt/mqtt_parser.c +++ b/src/sp/protocol/mqtt/mqtt_parser.c @@ -2143,8 +2143,8 @@ mqtt_get_remaining_length(uint8_t *packet, uint32_t len, /** * @brief convert NNG sub0 msg to standard MQTT V4 msg. * - * @param origin original NNG Sub msg - * @param topic overwrite original topic of NNG + * @param origin original NNG sub msg + * @param topic Default topic of NNG sub msg * @return nng_msg */ nng_msg * @@ -2159,14 +2159,15 @@ nng_sub0_msg_adapter(nng_msg *origin, char *topic) return NULL; } - // Find the last '/' to separate topic and payload. - // NNG SUB msg format: "topic/path/payload" -> Topic: "topic/path", Payload: "payload" + // Define '/' to separate topic and payload. + // NNG SUB msg format: "topic/path/payload" -> Topic: "topic", Payload: "path/payload" const char *ptr = (const char *)body; char *sep = NULL; for (size_t i = 0; i < body_len; i++) { if (ptr[i] == '/') { sep = &ptr[i]; + break; } } From bd5a0759f67118c1de000dfdda92292023f2a57e Mon Sep 17 00:00:00 2001 From: JaylinYu Date: Thu, 30 Apr 2026 16:14:36 +0800 Subject: [PATCH 10/10] * MDF [conf_ver2] safety enhancement of config parsing Signed-off-by: JaylinYu --- src/supplemental/nanolib/conf_ver2.c | 40 +++++++++++++++++++++------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/src/supplemental/nanolib/conf_ver2.c b/src/supplemental/nanolib/conf_ver2.c index f08de4afd..9d8c33c09 100644 --- a/src/supplemental/nanolib/conf_ver2.c +++ b/src/supplemental/nanolib/conf_ver2.c @@ -1215,7 +1215,13 @@ conf_nng_pnode_parse( if (jso_key2 != NULL) log_warn("invalid qos level detected in pub list"); } + if (s->remote_topic) + s->remote_topic_len = strlen(s->remote_topic); + if (s->local_topic) + s->local_topic_len = strlen(s->local_topic); + cvector_push_back(node->pub_list, s); } + node->forwards_count = cvector_size(node->pub_list); } void @@ -1244,7 +1250,13 @@ conf_nng_snode_parse( if (jso_key2 != NULL) log_warn("invalid qos level detected in sub list"); } + if (s->remote_topic) + s->remote_topic_len = strlen(s->remote_topic); + if (s->local_topic) + s->local_topic_len = strlen(s->local_topic); + cvector_push_back(node->sub_list, s); } + node->inwards_count = cvector_size(node->sub_list); } void @@ -1317,8 +1329,10 @@ conf_bridge_node_parse( NNI_FREE_STRUCT(s); continue; } - s->remote_topic_len = strlen(s->remote_topic); - s->local_topic_len = strlen(s->local_topic); + if (s->remote_topic) + s->remote_topic_len = strlen(s->remote_topic); + if (s->local_topic) + s->local_topic_len = strlen(s->local_topic); preprocess_topics(s, false); cvector_push_back(node->forwards_list, s); } @@ -1374,8 +1388,10 @@ conf_bridge_node_parse( NNI_FREE_STRUCT(s); continue; } - s->remote_topic_len = strlen(s->remote_topic); - s->local_topic_len = strlen(s->local_topic); + if (s->remote_topic) + s->remote_topic_len = strlen(s->remote_topic); + if (s->local_topic) + s->local_topic_len = strlen(s->local_topic); s->stream_id = 0; hocon_read_num(s, stream_id, subscription); @@ -1456,7 +1472,7 @@ conf_nng_proxy_pub_parse_ver2(conf *config, cJSON *jso) { conf_nng_pub_node *node = NNI_ALLOC_STRUCT(node); conf_bridge_pnode_init(node); - conf_nng_pnode_parse(node, jso); + conf_nng_pnode_parse(node, node_item); cvector_push_back(config->nng_proxy.pnodes, node); config->nng_proxy.pub_enable = true; } @@ -1477,7 +1493,7 @@ conf_nng_proxy_sub_parse_ver2(conf *config, cJSON *jso) { conf_nng_sub_node *node = NNI_ALLOC_STRUCT(node); conf_bridge_snode_init(node); - conf_nng_snode_parse(node, jso); + conf_nng_snode_parse(node, node_item); cvector_push_back(config->nng_proxy.snodes, node); config->nng_proxy.sub_enable = true; } @@ -1629,8 +1645,10 @@ conf_aws_bridge_parse_ver2(conf *config, cJSON *jso) NNI_FREE_STRUCT(s); continue; } - s->remote_topic_len = strlen(s->remote_topic); - s->local_topic_len = strlen(s->local_topic); + if (s->remote_topic) + s->remote_topic_len = strlen(s->remote_topic); + if (s->local_topic) + s->local_topic_len = strlen(s->local_topic); for (int i = 0; i < (int) s->remote_topic_len; ++i) if (s->remote_topic[i] == '+' || @@ -1664,8 +1682,10 @@ conf_aws_bridge_parse_ver2(conf *config, cJSON *jso) NNI_FREE_STRUCT(s); continue; } - s->remote_topic_len = strlen(s->remote_topic); - s->local_topic_len = strlen(s->local_topic); + if (s->remote_topic) + s->remote_topic_len = strlen(s->remote_topic); + if (s->local_topic) + s->local_topic_len = strlen(s->local_topic); s->stream_id = 0; hocon_read_num(s, stream_id, subscription);