diff --git a/src/mqtt/protocol/mqtt/mqtt_quic_client.c b/src/mqtt/protocol/mqtt/mqtt_quic_client.c index 5e605c350..3c722bbff 100644 --- a/src/mqtt/protocol/mqtt/mqtt_quic_client.c +++ b/src/mqtt/protocol/mqtt/mqtt_quic_client.c @@ -90,6 +90,10 @@ struct mqtt_sock_s { nni_duration retry; nni_duration keepalive; // mqtt keepalive nni_duration timeleft; // left time to send next ping + uint16_t batchsz; // resend qos in batchs + uint16_t batchcnt; + nni_duration batchtmo; // interval of batch sending + uint16_t lastpid; mqtt_quic_ctx master; // to which we delegate send/recv calls nni_list recv_queue; // ctx pending to receive @@ -1052,6 +1056,9 @@ mqtt_timer_cb(void *arg) { mqtt_sock_t *s = arg; mqtt_pipe_t *p; + nni_msg *msg = NULL; + uint16_t pid; + uint16_t ptype; if (nng_aio_result(&s->time_aio) != 0) { log_warn("sleep aio finish error!"); @@ -1067,11 +1074,46 @@ mqtt_timer_cb(void *arg) nni_mtx_unlock(&s->mtx); return; } - // Ping would be send at transport layer + nni_mtx_unlock(&s->mtx); + nni_mtx_lock(&p->lk); + // If batchcnt > 0. Batch sending was started. So handle batch first. + if (s->batchcnt > 0 && s->batchcnt < s->batchsz) { + pid = ++ s->lastpid; + msg = nni_id_get_min(&p->sent_unack, &pid); + s->batchcnt ++; + } + if (msg != NULL) { + nni_msg_clone(msg); + s->lastpid = pid; + log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg); + ptype = nni_mqtt_msg_get_packet_type(msg); + if (ptype == NNG_MQTT_PUBLISH) + nni_mqtt_msg_set_publish_dup(msg, true); + if (!p->busy) { + p->busy = true; + nni_aio_set_msg(&p->send_aio, msg); + nni_pipe_send(p->qpipe, &p->send_aio); + } else { + nni_lmq_put(&p->send_messages, msg); + } + //} else { + // log_warn("Batch sending id%d missing", pid); + } + if (s->batchcnt > 0) { + s->batchcnt %= s->batchsz; + nni_mtx_unlock(&p->lk); + if (s->batchcnt == 0) + nni_sleep_aio(s->retry, &s->time_aio); + else + nni_sleep_aio(s->batchtmo, &s->time_aio); + return; + } + + // Ping would be send at transport layer if (p->pingcnt > 1) { log_warn("MQTT Timeout and disconnect"); - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); nni_pipe_close(p->qpipe); return; } @@ -1088,7 +1130,7 @@ mqtt_timer_cb(void *arg) nni_aio_set_msg(&p->send_aio, p->pingmsg); nni_pipe_send(p->qpipe, &p->send_aio); p->pingcnt ++; - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); log_info("Send pingreq (sock%p)(%dms)", s, s->keepalive); nni_sleep_aio(s->retry, &s->time_aio); return; @@ -1096,31 +1138,32 @@ mqtt_timer_cb(void *arg) // start message resending // uint16_t pid = p->rid; - // msg = nni_id_get_min(&p->sent_unack, &pid); - // if (msg != NULL) { - // uint16_t ptype; - // ptype = nni_mqtt_msg_get_packet_type(msg); - // if (ptype == NNG_MQTT_PUBLISH) { - // nni_mqtt_msg_set_publish_dup(msg, true); - // } - // if (!p->busy) { - // p->busy = true; - // nni_msg_clone(msg); - // aio = nni_mqtt_msg_get_aio(msg); - // if (aio) { - // nni_aio_bump_count(aio, - // nni_msg_header_len(msg) + - // nni_msg_len(msg)); - // nni_aio_set_msg(aio, NULL); - // } - // nni_aio_set_msg(&p->send_aio, msg); - // quic_pipe_send(p->qpipe, &p->send_aio); - - // nni_mtx_unlock(&s->mtx); - // nni_sleep_aio(s->retry * NNI_SECOND, &s->time_aio); - // return; - // } - // } + msg = nni_id_get_min(&p->sent_unack, &pid); + if (msg != NULL) { + ptype = nni_mqtt_msg_get_packet_type(msg); + s->lastpid = pid; + s->batchcnt ++; + log_debug("Batch sending started id%d msg%p", pid, msg); + if (ptype == NNG_MQTT_PUBLISH) + nni_mqtt_msg_set_publish_dup(msg, true); + if (!p->busy) { + p->busy = true; + nni_msg_clone(msg); + //aio = nni_mqtt_msg_get_aio(msg); + //if (aio) { + // nni_aio_bump_count(aio, + // nni_msg_header_len(msg) + + // nni_msg_len(msg)); + // nni_aio_set_msg(aio, NULL); + //} + nni_aio_set_msg(&p->send_aio, msg); + nni_pipe_send(p->qpipe, &p->send_aio); + + nni_mtx_unlock(&p->lk); + nni_sleep_aio(s->batchtmo, &s->time_aio); + return; + } + } #if defined(NNG_SUPP_SQLITE) if (!p->busy) { nni_msg *msg = NULL; @@ -1135,14 +1178,14 @@ mqtt_timer_cb(void *arg) nni_aio_set_msg(&p->send_aio, msg); nni_pipe_send(p->qpipe, &p->send_aio); - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); nni_sleep_aio(s->retry, &s->time_aio); return; } } } #endif - nni_mtx_unlock(&s->mtx); + nni_mtx_unlock(&p->lk); nni_sleep_aio(s->retry, &s->time_aio); return; } @@ -1184,6 +1227,9 @@ static void mqtt_quic_sock_init(void *arg, nni_sock *sock) s->retry = MQTT_QUIC_RETRTY * NNI_SECOND; s->keepalive = NNI_SECOND * 10; // default mqtt keepalive s->timeleft = NNI_SECOND * 10; + s->batchcnt = 0; + s->batchsz = 8; + s->batchtmo = 10; // Interval of batch sending (ms) nni_lmq_init(&s->send_messages, NNG_MAX_SEND_LMQ); nni_aio_list_init(&s->send_queue);