Skip to content
Open
2 changes: 2 additions & 0 deletions include/nng/protocol/mqtt/mqtt_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,6 @@ NNG_DECL int nmq_auth_http_sub_pub(
conn_param *cparam, bool is_sub, topic_queue *topics, conf_auth_http *conf);

NNG_DECL int mqtt_get_remaining_length(uint8_t *, uint32_t, uint32_t *, uint8_t *);
NNG_DECL nng_msg *nng_sub0_msg_adapter(nng_msg *, char *);

#endif // NNG_MQTT_H
38 changes: 36 additions & 2 deletions include/nng/supplemental/nanolib/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,40 @@ struct conf_web_hook {

typedef struct conf_web_hook conf_web_hook;

struct conf_nng_pub_node {
char *name;
nng_socket pub_sock;
char *pub_url;
char *clientid;
conn_param *client; // For ACL usage
topics **pub_list;
size_t forwards_count;
};
typedef struct conf_nng_pub_node conf_nng_pub_node;

struct conf_nng_sub_node {
char *name;
nng_socket sub_sock;
char *sub_url;
char *clientid;
conn_param *client; // For ACL usage
// NNG topics must be topic/payload, no multiple "/"
topics **sub_list;
size_t inwards_count;
};
typedef struct conf_nng_sub_node conf_nng_sub_node;

struct conf_nng_bridge {
bool pub_enable;
bool sub_enable;
conf_nng_pub_node **pnodes;
conf_nng_sub_node **snodes;
size_t pub_count;
size_t sub_count;
};

typedef struct conf_nng_bridge conf_nng_bridge;

typedef enum {
memory,
sqlite,
Expand All @@ -616,7 +650,7 @@ struct conf {
int num_taskq_thread;
int max_taskq_thread;
uint32_t parallel; // broker ctx
uint64_t total_ctx; // Total ctx of work (bridge + AWS + broker + HTTP)
uint64_t total_ctx; // aio number of any ctx will send msg to bridge. (bridge + AWS + broker + HTTP + nng_sub)
uint64_t max_packet_size; // byte
uint32_t client_max_packet_size; // byte
uint32_t max_inflight_window;
Expand All @@ -641,7 +675,7 @@ struct conf {
conf_bridge aws_bridge; // AWS IoT Core
conf_exchange exchange;
conf_parquet parquet;
conf_blf blf;
conf_nng_bridge nng_proxy;
#if defined(SUPP_PLUGIN)
conf_plugin plugin;
#endif
Expand Down
22 changes: 12 additions & 10 deletions include/nng/supplemental/nanolib/nanolib.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@
#include <stdlib.h>
#include "nng/supplemental/nanolib/conf.h"

extern void conf_tls_init(conf_tls *tls);
extern void conf_tls_destroy(conf_tls *tls);
extern void conf_tls_parse(
NNG_DECL void conf_tls_init(conf_tls *tls);
NNG_DECL void conf_tls_destroy(conf_tls *tls);
NNG_DECL void conf_tls_parse(
conf_tls *tls, const char *path, const char *prefix1, const char *prefix2);

extern void conf_http_server_init(conf_http_server *http, uint16_t port);
extern void conf_http_server_destroy(conf_http_server *http);
NNG_DECL void conf_http_server_init(conf_http_server *http, uint16_t port);
NNG_DECL void conf_http_server_destroy(conf_http_server *http);

extern void conf_session_node_init(conf_session_node *node);
extern void conf_bridge_node_init(conf_bridge_node *node);
extern void conf_bridge_sub_properties_init(conf_bridge_sub_properties *prop);
extern void conf_bridge_conn_properties_init(
NNG_DECL void conf_session_node_init(conf_session_node *node);
NNG_DECL void conf_bridge_node_init(conf_bridge_node *node);
NNG_DECL void conf_bridge_snode_init(conf_nng_sub_node *node);
NNG_DECL void conf_bridge_pnode_init(conf_nng_pub_node *node);
NNG_DECL void conf_bridge_sub_properties_init(conf_bridge_sub_properties *prop);
NNG_DECL void conf_bridge_conn_properties_init(
conf_bridge_conn_properties *prop);
extern void conf_bridge_conn_will_properties_init(
NNG_DECL void conf_bridge_conn_will_properties_init(
conf_bridge_conn_will_properties *prop);

#endif
62 changes: 62 additions & 0 deletions src/sp/protocol/mqtt/mqtt_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,7 @@ conn_param_init(conn_param *cparam)
cparam->properties = NULL;
cparam->will_prop_len = 0;
cparam->will_properties = NULL;
cparam->nano_qos_db = NULL;
}

int
Expand Down Expand Up @@ -2138,3 +2139,64 @@ mqtt_get_remaining_length(uint8_t *packet, uint32_t len,

return MQTT_ERR_INVAL;
}

/**
* @brief convert NNG sub0 msg to standard MQTT V4 msg.
*
* @param origin original NNG sub msg
* @param topic Default topic of NNG sub msg
* @return nng_msg
*/
nng_msg *
nng_sub0_msg_adapter(nng_msg *origin, char *topic)
{
nng_msg *mqtt_msg = NULL;
const uint8_t *body = nng_msg_body(origin);
size_t body_len = nng_msg_len(origin);

if (body == NULL || body_len == 0) {
log_error("Empty origin message");
return NULL;
}

// Define '/' to separate topic and payload.
// NNG SUB msg format: "topic/path/payload" -> Topic: "topic", Payload: "path/payload"
const char *ptr = (const char *)body;
char *sep = NULL;

for (size_t i = 0; i < body_len; i++) {
if (ptr[i] == '/') {
sep = &ptr[i];
break;
}
}

// If no separator found, or starts with '/', fallback to using the whole message as payload.
if (sep == NULL || sep == ptr) {
log_warn("No valid topic/payload separator found in NNG sub0 msg.");
mqtt_msg = nano_encode_publish_msg(MQTT_PROTOCOL_VERSION_v311,
0, false, false, body, body_len, NULL, topic, NULL);
return mqtt_msg;
Comment on lines +2154 to +2179
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate adapter inputs before dereference/fallback publish.

origin is dereferenced before any null check, and fallback publish can use topic without validating it. Both can trigger crashes on bad inputs.

Suggested fix
 nng_msg *
 nng_sub0_msg_adapter(nng_msg *origin, char *topic)
 {
 	nng_msg *mqtt_msg = NULL;
+	if (origin == NULL) {
+		log_error("origin message is NULL");
+		return NULL;
+	}
 	const uint8_t *body     = nng_msg_body(origin);
 	size_t         body_len = nng_msg_len(origin);
@@
 	if (sep == NULL || sep == ptr) {
+		if (topic == NULL || topic[0] == '\0') {
+			log_error("Default topic is required when separator is missing");
+			return NULL;
+		}
 		log_warn("No valid topic/payload separator found in NNG sub0 msg.");
 		mqtt_msg = nano_encode_publish_msg(MQTT_PROTOCOL_VERSION_v311,
 		    0, false, false, body, body_len, NULL, topic, NULL);
 		return mqtt_msg;
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sp/protocol/mqtt/mqtt_parser.c` around lines 2154 - 2179, Check and
validate inputs before dereferencing: ensure origin is non-NULL before calling
nng_msg_body/nng_msg_len in the mqtt_parser.c function, and ensure topic is
non-NULL/valid before passing it into nano_encode_publish_msg for the fallback
branch; if origin is NULL return NULL (or an appropriate error) early, and if
topic is NULL use a safe default (or pass NULL explicitly only if
nano_encode_publish_msg supports it) or abort the fallback publish, updating the
early-return path that logs "No valid topic/payload separator..." to perform
these checks and avoid dereferencing invalid pointers.

}

size_t topic_len = sep - ptr;
char *topic_buf = nng_zalloc(topic_len + 1);
uint8_t *payload_data = (uint8_t *) sep + 1;
size_t payload_len = body_len - (topic_len + 1);

if (!topic_buf) {
log_error("Failed to allocate memory for topic");
return NULL;
}

memcpy(topic_buf, ptr, topic_len);

mqtt_msg = nano_encode_publish_msg(MQTT_PROTOCOL_VERSION_v311, 0,
false, false, payload_data, payload_len, NULL, topic_buf, NULL);

nng_free(topic_buf, topic_len + 1);
if (mqtt_msg == NULL) {
log_error("Build MQTT msg from NNG sub0 msg failed");
}
return mqtt_msg;
}
7 changes: 6 additions & 1 deletion src/sp/protocol/mqtt/nmq_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,12 @@ nano_pipe_timer_cb(void *arg)
return;
}
nni_mtx_lock(&p->lk);

if (nni_atomic_get_bool(&p->closed)) {
// stop sending msg to closed pipe
nni_mtx_unlock(&p->lk);
log_warn("pipe is already closed");
return;
}
// According to the MQTT protocol, when keepalive is 0, the server should not check the keepalive timeout.
if (p->keepalive != 0) {
qos_backoff = p->ka_refresh * (qos_duration) * 1000 -
Expand Down
15 changes: 8 additions & 7 deletions src/sp/transport/mqtt/broker_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,6 @@ tcptran_pipe_close(void *arg)
nni_free(p->npipe->subinfol, sizeof(nni_list));
p->npipe->subinfol = NULL;
}
void *nano_qos_db = p->npipe->nano_qos_db;
if (!p->conf->sqlite.enable && nano_qos_db != NULL) {
nni_qos_db_remove_all_msg(
false, nano_qos_db, tran_close_unack_msg_cb);
nni_qos_db_fini_id_hash(nano_qos_db);
p->npipe->nano_qos_db = NULL;
}
nni_lmq_flush(&p->rslmq);
nni_mtx_unlock(&p->mtx);

Expand Down Expand Up @@ -228,6 +221,14 @@ tcptran_pipe_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
}
nni_mtx_lock(&p->mtx);
void *nano_qos_db = p->npipe->nano_qos_db;
if (p->npipe != NULL && p->conf != NULL &&
!p->conf->sqlite.enable && nano_qos_db != NULL) {
Comment on lines +224 to +226
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Guard p->npipe before accessing nano_qos_db.

Line 224 dereferences p->npipe before the null check. In teardown/error paths this can crash.

Suggested fix
-	void *nano_qos_db = p->npipe->nano_qos_db;
-	if (p->npipe != NULL && p->conf != NULL &&
+	void *nano_qos_db = NULL;
+	if (p->npipe != NULL) {
+		nano_qos_db = p->npipe->nano_qos_db;
+	}
+	if (p->npipe != NULL && p->conf != NULL &&
 	    !p->conf->sqlite.enable && nano_qos_db != NULL) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void *nano_qos_db = p->npipe->nano_qos_db;
if (p->npipe != NULL && p->conf != NULL &&
!p->conf->sqlite.enable && nano_qos_db != NULL) {
void *nano_qos_db = NULL;
if (p->npipe != NULL) {
nano_qos_db = p->npipe->nano_qos_db;
}
if (p->npipe != NULL && p->conf != NULL &&
!p->conf->sqlite.enable && nano_qos_db != NULL) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sp/transport/mqtt/broker_tcp.c` around lines 224 - 226, The code reads
p->npipe->nano_qos_db into nano_qos_db before checking p->npipe for NULL, which
can crash; change the logic to first ensure p->npipe is not NULL (and p->conf is
not NULL) then read p->npipe->nano_qos_db (or simply move the nano_qos_db
assignment below the if that checks p->npipe and p->conf), and ensure the
conditional uses the existing symbols p->npipe, p->conf, and nano_qos_db so that
nano_qos_db is only dereferenced when p->npipe is valid.

nni_qos_db_remove_all_msg(
false, nano_qos_db, tran_close_unack_msg_cb);
nni_qos_db_fini_id_hash(nano_qos_db);
p->npipe->nano_qos_db = NULL;
}
if (p->tcp_cparam) {
conn_param_free(p->tcp_cparam); //reap thread working
p->tcp_cparam = NULL;
Expand Down
15 changes: 8 additions & 7 deletions src/sp/transport/mqtts/broker_tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,6 @@ tlstran_pipe_close(void *arg)
nni_free(p->npipe->subinfol, sizeof(nni_list));
p->npipe->subinfol = NULL;
}
void *nano_qos_db = p->npipe->nano_qos_db;
if (!p->conf->sqlite.enable && nano_qos_db != NULL) {
nni_qos_db_remove_all_msg(
false, nano_qos_db, tran_close_unack_msg_cb);
nni_qos_db_fini_id_hash(nano_qos_db);
p->npipe->nano_qos_db = NULL;
}
nni_lmq_flush(&p->rslmq);
nni_mtx_unlock(&p->mtx);

Expand Down Expand Up @@ -235,6 +228,14 @@ tlstran_pipe_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
}
nni_mtx_lock(&p->mtx);
void *nano_qos_db = p->npipe->nano_qos_db;
if (p->npipe != NULL && p->conf != NULL &&
!p->conf->sqlite.enable && nano_qos_db != NULL) {
Comment on lines +231 to +233
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Fix null-deref risk in QoS DB teardown.

Line 231 reads p->npipe->nano_qos_db before confirming p->npipe is non-null.

Suggested fix
-	void *nano_qos_db = p->npipe->nano_qos_db;
-	if (p->npipe != NULL && p->conf != NULL &&
+	void *nano_qos_db = NULL;
+	if (p->npipe != NULL) {
+		nano_qos_db = p->npipe->nano_qos_db;
+	}
+	if (p->npipe != NULL && p->conf != NULL &&
 		!p->conf->sqlite.enable && nano_qos_db != NULL) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void *nano_qos_db = p->npipe->nano_qos_db;
if (p->npipe != NULL && p->conf != NULL &&
!p->conf->sqlite.enable && nano_qos_db != NULL) {
void *nano_qos_db = NULL;
if (p->npipe != NULL) {
nano_qos_db = p->npipe->nano_qos_db;
}
if (p->npipe != NULL && p->conf != NULL &&
!p->conf->sqlite.enable && nano_qos_db != NULL) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sp/transport/mqtts/broker_tls.c` around lines 231 - 233, The code reads
p->npipe->nano_qos_db into nano_qos_db before checking p->npipe for NULL,
risking a null-deref; modify the teardown in broker_tls.c so you only access
p->npipe->nano_qos_db after confirming p and p->npipe are non-NULL (e.g., move
the assignment of nano_qos_db inside the if that checks p->npipe != NULL and
p->conf != NULL), and ensure subsequent logic that uses nano_qos_db still
handles a NULL nano_qos_db safely.

nni_qos_db_remove_all_msg(
false, nano_qos_db, tran_close_unack_msg_cb);
nni_qos_db_fini_id_hash(nano_qos_db);
p->npipe->nano_qos_db = NULL;
}
if (p->tcp_cparam) {
conn_param_free(p->tcp_cparam);
p->tcp_cparam = NULL;
Expand Down
20 changes: 12 additions & 8 deletions src/sp/transport/mqttws/nmq_websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,8 @@ wstran_pipe_recv_cb(void *arg)
nni_aio_finish_error(p->ep_aio, rv);
} else if (uaio != NULL) {
nni_aio_set_msg(uaio, NULL);
nni_aio_finish_error(uaio, p->err_code == MQTT_SUCCESS ? rv : p->err_code);
nni_aio_finish_error(uaio,
p->err_code == MQTT_SUCCESS ? rv : (int) p->err_code);
} else {
// Let protocol layer do the close first.
nng_stream_close(p->ws);
Expand Down Expand Up @@ -1333,6 +1334,15 @@ wstran_pipe_fini(void *arg)
nni_msg_free(p->tmp_msg);
nni_lmq_flush(&p->recvlmq);
nng_free(p->qos_buf, 16 + NNI_NANO_MAX_PACKET_SIZE);
// must free nano_qos_db in fini for safety
void *nano_qos_db = p->npipe->nano_qos_db;
if (p->npipe != NULL && p->conf != NULL &&
!p->conf->sqlite.enable && nano_qos_db != NULL) {
Comment on lines +1338 to +1340
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Guard p->npipe before reading nano_qos_db in fini.

Line 1338 dereferences p->npipe before the null check, which can crash in partial-init teardown paths.

Suggested fix
-	void *nano_qos_db = p->npipe->nano_qos_db;
-	if (p->npipe != NULL && p->conf != NULL &&
+	void *nano_qos_db = NULL;
+	if (p->npipe != NULL) {
+		nano_qos_db = p->npipe->nano_qos_db;
+	}
+	if (p->npipe != NULL && p->conf != NULL &&
 		!p->conf->sqlite.enable && nano_qos_db != NULL) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/sp/transport/mqttws/nmq_websocket.c` around lines 1338 - 1340, The code
reads p->npipe->nano_qos_db into nano_qos_db before checking p->npipe for NULL,
which can dereference a NULL pointer during teardown; fix by guarding p->npipe
first (e.g., check p != NULL && p->npipe != NULL) before accessing
p->npipe->nano_qos_db in the fini/cleanup path, or move the nano_qos_db
assignment to after the existing p->npipe and p->conf null checks; update the
relevant block that references p, p->npipe, p->conf and nano_qos_db so that
nano_qos_db is only read when p->npipe is confirmed non-NULL.

nni_qos_db_remove_all_msg(
false, nano_qos_db, tran_close_unack_msg_cb);
nni_qos_db_fini_id_hash(nano_qos_db);
p->npipe->nano_qos_db = NULL;
}
nni_mtx_unlock(&p->mtx);

nng_stream_free(p->ws);
Expand Down Expand Up @@ -1375,13 +1385,7 @@ wstran_pipe_close(void *arg)
nni_free(p->npipe->subinfol, sizeof(nni_list));
p->npipe->subinfol = NULL;
}
void *nano_qos_db = p->npipe->nano_qos_db;
if (!p->conf->sqlite.enable && nano_qos_db != NULL) {
nni_qos_db_remove_all_msg(
false, nano_qos_db, tran_close_unack_msg_cb);
nni_qos_db_fini_id_hash(nano_qos_db);
p->npipe->nano_qos_db = NULL;
}

nni_lmq_flush(&p->rslmq);
nni_mtx_unlock(&p->mtx);
nng_stream_close(p->ws);
Expand Down
1 change: 0 additions & 1 deletion src/supplemental/nanolib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,4 @@ endif (SUPP_RULE_ENGINE)
add_subdirectory(linkedlist)
add_subdirectory(ringbuffer)
add_subdirectory(parquet)
add_subdirectory(blf)

6 changes: 0 additions & 6 deletions src/supplemental/nanolib/blf/CMakeLists.txt

This file was deleted.

Loading
Loading