From ded7db00814ac98a07fb7d5c661531d501031d3a Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 3 Jul 2024 03:32:42 -0400 Subject: [PATCH 1/4] * NEW [proto/quic] Batch sending is also supported on quic protocol layer. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_quic_client.c | 94 ++++++++++++++++------- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_quic_client.c b/src/mqtt/protocol/mqtt/mqtt_quic_client.c index 5e605c350..79fff4dc5 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic_client.c @@ -90,6 +90,10 @@ struct mqtt_sock_s { nni_duration retry; nni_duration keepalive; // mqtt keepalive nni_duration timeleft; // left time to send next ping + uint16_t batchsz; // resend qos in batchs + uint16_t batchcnt; + nni_duration batchtmo; // interval of batch sending + uint16_t lastpid; mqtt_quic_ctx master; // to which we delegate send/recv calls nni_list recv_queue; // ctx pending to receive @@ -1052,6 +1056,9 @@ mqtt_timer_cb(void *arg) { mqtt_sock_t *s = arg; mqtt_pipe_t *p; + nni_msg *msg = NULL; + uint16_t pid; + uint16_t ptype; if (nng_aio_result(&s->time_aio) != 0) { log_warn("sleep aio finish error!"); @@ -1067,8 +1074,39 @@ mqtt_timer_cb(void *arg) nni_mtx_unlock(&s->mtx); return; } - // Ping would be send at transport layer + // If batchcnt > 0. Batch sending was started. So handle batch first. + if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { + pid = s->lastpid + 1; + msg = nni_id_get_min(&p->sent_unack, &pid); + s->batchcnt ++; + } + if (msg != NULL) { + nni_msg_clone(msg); + s->lastpid = pid; + log_info("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); + ptype = nni_mqtt_msg_get_packet_type(msg); + if (ptype == NNG_MQTT_PUBLISH) + nni_mqtt_msg_set_publish_dup(msg, true); + if (!p->busy) { + p->busy = true; + nni_aio_set_msg(&p->send_aio, msg); + nni_pipe_send(p->qpipe, &p->send_aio); + } else { + nni_lmq_put(&p->send_messages, msg); + } + } + if (s->batchcnt > 0) { + s->batchcnt %= s->batchsz; + nni_mtx_unlock(&s->mtx); + if (s->batchcnt == 0) + nni_sleep_aio(s->retry, &s->time_aio); + else + nni_sleep_aio(s->batchtmo, &s->time_aio); + return; + } + + // Ping would be send at transport layer if (p->pingcnt > 1) { log_warn("MQTT Timeout and disconnect"); nni_mtx_unlock(&s->mtx); @@ -1096,31 +1134,32 @@ mqtt_timer_cb(void *arg) // start message resending // uint16_t pid = p->rid; - // msg = nni_id_get_min(&p->sent_unack, &pid); - // if (msg != NULL) { - // uint16_t ptype; - // ptype = nni_mqtt_msg_get_packet_type(msg); - // if (ptype == NNG_MQTT_PUBLISH) { - // nni_mqtt_msg_set_publish_dup(msg, true); - // } - // if (!p->busy) { - // p->busy = true; - // nni_msg_clone(msg); - // aio = nni_mqtt_msg_get_aio(msg); - // if (aio) { - // nni_aio_bump_count(aio, - // nni_msg_header_len(msg) + - // nni_msg_len(msg)); - // nni_aio_set_msg(aio, NULL); - // } - // nni_aio_set_msg(&p->send_aio, msg); - // quic_pipe_send(p->qpipe, &p->send_aio); - - // nni_mtx_unlock(&s->mtx); - // nni_sleep_aio(s->retry * NNI_SECOND, &s->time_aio); - // return; - // } - // } + msg = nni_id_get_min(&p->sent_unack, &pid); + if (msg != NULL) { + ptype = nni_mqtt_msg_get_packet_type(msg); + s->lastpid = pid; + s->batchcnt ++; + log_info("Batch sending started id%d msg%p", pid, msg); + if (ptype == NNG_MQTT_PUBLISH) + nni_mqtt_msg_set_publish_dup(msg, true); + if (!p->busy) { + p->busy = true; + nni_msg_clone(msg); + //aio = nni_mqtt_msg_get_aio(msg); + //if (aio) { + // nni_aio_bump_count(aio, + // nni_msg_header_len(msg) + + // nni_msg_len(msg)); + // nni_aio_set_msg(aio, NULL); + //} + nni_aio_set_msg(&p->send_aio, msg); + nni_pipe_send(p->qpipe, &p->send_aio); + + nni_mtx_unlock(&s->mtx); + nni_sleep_aio(s->batchtmo, &s->time_aio); + return; + } + } #if defined(NNG_SUPP_SQLITE) if (!p->busy) { nni_msg *msg = NULL; @@ -1184,6 +1223,9 @@ static void mqtt_quic_sock_init(void *arg, nni_sock *sock) s->retry = MQTT_QUIC_RETRTY * NNI_SECOND; s->keepalive = NNI_SECOND * 10; // default mqtt keepalive s->timeleft = NNI_SECOND * 10; + s->batchcnt = 0; + s->batchsz = 8; + s->batchtmo = 10; // Interval of batch sending (ms) nni_lmq_init(&s->send_messages, NNG_MAX_SEND_LMQ); nni_aio_list_init(&s->send_queue); From 1e856d7ea14e6a0f6d60e58597cbd8b0eebdefb7 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 3 Jul 2024 23:29:05 -0400 Subject: [PATCH 2/4] * NEW [proto/quic] lastpid will also increment when qosmsg not found in send_unack. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_quic_client.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_quic_client.c b/src/mqtt/protocol/mqtt/mqtt_quic_client.c index 79fff4dc5..719aa6f4b 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic_client.c @@ -1075,9 +1075,9 @@ mqtt_timer_cb(void *arg) return; } - // If batchcnt > 0. Batch sending was started. So handle batch first. + // If batchcnt > 0. Batch sending was started. So handle batch first. if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { - pid = s->lastpid + 1; + pid = ++ s->lastpid; msg = nni_id_get_min(&p->sent_unack, &pid); s->batchcnt ++; } @@ -1095,6 +1095,8 @@ mqtt_timer_cb(void *arg) } else { nni_lmq_put(&p->send_messages, msg); } + } else { + log_warn("Batch sending id%d missing", pid); } if (s->batchcnt > 0) { s->batchcnt %= s->batchsz; From 0bf8bc2f558367462f74e3ba866d8b921444392b Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 3 Jul 2024 23:40:57 -0400 Subject: [PATCH 3/4] * FIX [proto/quic] Replace s->mtx with p->lk in mqtt_quic_timer_cb. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_quic_client.c | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_quic_client.c b/src/mqtt/protocol/mqtt/mqtt_quic_client.c index 719aa6f4b..8a1f3b7bb 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic_client.c @@ -1074,7 +1074,9 @@ mqtt_timer_cb(void *arg) nni_mtx_unlock(&s->mtx); return; } + nni_mtx_unlock(&s->mtx); + nni_mtx_lock(&p->lk); // If batchcnt > 0. Batch sending was started. So handle batch first. if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { pid = ++ s->lastpid; @@ -1095,12 +1097,12 @@ mqtt_timer_cb(void *arg) } else { nni_lmq_put(&p->send_messages, msg); } - } else { - log_warn("Batch sending id%d missing", pid); + //} else { + // log_warn("Batch sending id%d missing", pid); } if (s->batchcnt > 0) { s->batchcnt %= s->batchsz; - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); if (s->batchcnt == 0) nni_sleep_aio(s->retry, &s->time_aio); else @@ -1111,7 +1113,7 @@ mqtt_timer_cb(void *arg) // Ping would be send at transport layer if (p->pingcnt > 1) { log_warn("MQTT Timeout and disconnect"); - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); nni_pipe_close(p->qpipe); return; } @@ -1128,7 +1130,7 @@ mqtt_timer_cb(void *arg) nni_aio_set_msg(&p->send_aio, p->pingmsg); nni_pipe_send(p->qpipe, &p->send_aio); p->pingcnt ++; - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); log_info("Send pingreq (sock%p)(%dms)", s, s->keepalive); nni_sleep_aio(s->retry, &s->time_aio); return; @@ -1157,7 +1159,7 @@ mqtt_timer_cb(void *arg) nni_aio_set_msg(&p->send_aio, msg); nni_pipe_send(p->qpipe, &p->send_aio); - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); nni_sleep_aio(s->batchtmo, &s->time_aio); return; } @@ -1176,14 +1178,14 @@ mqtt_timer_cb(void *arg) nni_aio_set_msg(&p->send_aio, msg); nni_pipe_send(p->qpipe, &p->send_aio); - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); nni_sleep_aio(s->retry, &s->time_aio); return; } } } #endif - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); nni_sleep_aio(s->retry, &s->time_aio); return; } From 0e7b0d0dadfe8abed9a29ace4da0b60ddd8cbd3e Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 4 Jul 2024 04:14:43 -0400 Subject: [PATCH 4/4] * FIX [mqtt/proto] Update the level of logs. Signed-off-by: wanghaemq --- src/mqtt/protocol/mqtt/mqtt_quic_client.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mqtt/protocol/mqtt/mqtt_quic_client.c b/src/mqtt/protocol/mqtt/mqtt_quic_client.c index 8a1f3b7bb..3c722bbff 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic_client.c @@ -1086,7 +1086,7 @@ mqtt_timer_cb(void *arg) if (msg != NULL) { nni_msg_clone(msg); s->lastpid = pid; - log_info("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); + log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); ptype = nni_mqtt_msg_get_packet_type(msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); @@ -1143,7 +1143,7 @@ mqtt_timer_cb(void *arg) ptype = nni_mqtt_msg_get_packet_type(msg); s->lastpid = pid; s->batchcnt ++; - log_info("Batch sending started id%d msg%p", pid, msg); + log_debug("Batch sending started id%d msg%p", pid, msg); if (ptype == NNG_MQTT_PUBLISH) nni_mqtt_msg_set_publish_dup(msg, true); if (!p->busy) {