Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 76 additions & 30 deletions src/mqtt/protocol/mqtt/mqtt_quic_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ struct mqtt_sock_s {
nni_duration retry;
nni_duration keepalive; // mqtt keepalive
nni_duration timeleft; // left time to send next ping
uint16_t batchsz; // resend qos in batchs
uint16_t batchcnt;
nni_duration batchtmo; // interval of batch sending
uint16_t lastpid;

mqtt_quic_ctx master; // to which we delegate send/recv calls
nni_list recv_queue; // ctx pending to receive
Expand Down Expand Up @@ -1052,6 +1056,9 @@ mqtt_timer_cb(void *arg)
{
mqtt_sock_t *s = arg;
mqtt_pipe_t *p;
nni_msg *msg = NULL;
uint16_t pid;
uint16_t ptype;

if (nng_aio_result(&s->time_aio) != 0) {
log_warn("sleep aio finish error!");
Expand All @@ -1067,11 +1074,46 @@ mqtt_timer_cb(void *arg)
nni_mtx_unlock(&s->mtx);
return;
}
// Ping would be send at transport layer
nni_mtx_unlock(&s->mtx);

nni_mtx_lock(&p->lk);
// If batchcnt > 0. Batch sending was started. So handle batch first.
if (s->batchcnt > 0 && s->batchcnt < s->batchsz) {
pid = ++ s->lastpid;
msg = nni_id_get_min(&p->sent_unack, &pid);
s->batchcnt ++;
}
if (msg != NULL) {
nni_msg_clone(msg);
s->lastpid = pid;
log_debug("NO.%d Batch sending id%d msg%p", s->batchcnt-1, pid, msg);
ptype = nni_mqtt_msg_get_packet_type(msg);
if (ptype == NNG_MQTT_PUBLISH)
nni_mqtt_msg_set_publish_dup(msg, true);
if (!p->busy) {
p->busy = true;
nni_aio_set_msg(&p->send_aio, msg);
nni_pipe_send(p->qpipe, &p->send_aio);
} else {
nni_lmq_put(&p->send_messages, msg);
}
//} else {
// log_warn("Batch sending id%d missing", pid);
}
if (s->batchcnt > 0) {
s->batchcnt %= s->batchsz;
nni_mtx_unlock(&p->lk);
if (s->batchcnt == 0)
nni_sleep_aio(s->retry, &s->time_aio);
else
nni_sleep_aio(s->batchtmo, &s->time_aio);
return;
}

// Ping would be send at transport layer
if (p->pingcnt > 1) {
log_warn("MQTT Timeout and disconnect");
nni_mtx_unlock(&s->mtx);
nni_mtx_unlock(&p->lk);
nni_pipe_close(p->qpipe);
return;
}
Expand All @@ -1088,39 +1130,40 @@ mqtt_timer_cb(void *arg)
nni_aio_set_msg(&p->send_aio, p->pingmsg);
nni_pipe_send(p->qpipe, &p->send_aio);
p->pingcnt ++;
nni_mtx_unlock(&s->mtx);
nni_mtx_unlock(&p->lk);
log_info("Send pingreq (sock%p)(%dms)", s, s->keepalive);
nni_sleep_aio(s->retry, &s->time_aio);
return;
}

// start message resending
// uint16_t pid = p->rid;
// msg = nni_id_get_min(&p->sent_unack, &pid);
// if (msg != NULL) {
// uint16_t ptype;
// ptype = nni_mqtt_msg_get_packet_type(msg);
// if (ptype == NNG_MQTT_PUBLISH) {
// nni_mqtt_msg_set_publish_dup(msg, true);
// }
// if (!p->busy) {
// p->busy = true;
// nni_msg_clone(msg);
// aio = nni_mqtt_msg_get_aio(msg);
// if (aio) {
// nni_aio_bump_count(aio,
// nni_msg_header_len(msg) +
// nni_msg_len(msg));
// nni_aio_set_msg(aio, NULL);
// }
// nni_aio_set_msg(&p->send_aio, msg);
// quic_pipe_send(p->qpipe, &p->send_aio);

// nni_mtx_unlock(&s->mtx);
// nni_sleep_aio(s->retry * NNI_SECOND, &s->time_aio);
// return;
// }
// }
msg = nni_id_get_min(&p->sent_unack, &pid);
if (msg != NULL) {
ptype = nni_mqtt_msg_get_packet_type(msg);
s->lastpid = pid;
s->batchcnt ++;
log_debug("Batch sending started id%d msg%p", pid, msg);
if (ptype == NNG_MQTT_PUBLISH)
nni_mqtt_msg_set_publish_dup(msg, true);
if (!p->busy) {
p->busy = true;
nni_msg_clone(msg);
//aio = nni_mqtt_msg_get_aio(msg);
//if (aio) {
// nni_aio_bump_count(aio,
// nni_msg_header_len(msg) +
// nni_msg_len(msg));
// nni_aio_set_msg(aio, NULL);
//}
nni_aio_set_msg(&p->send_aio, msg);
nni_pipe_send(p->qpipe, &p->send_aio);

nni_mtx_unlock(&p->lk);
nni_sleep_aio(s->batchtmo, &s->time_aio);
return;
}
}
#if defined(NNG_SUPP_SQLITE)
if (!p->busy) {
nni_msg *msg = NULL;
Expand All @@ -1135,14 +1178,14 @@ mqtt_timer_cb(void *arg)
nni_aio_set_msg(&p->send_aio, msg);
nni_pipe_send(p->qpipe, &p->send_aio);

nni_mtx_unlock(&s->mtx);
nni_mtx_unlock(&p->lk);
nni_sleep_aio(s->retry, &s->time_aio);
return;
}
}
}
#endif
nni_mtx_unlock(&s->mtx);
nni_mtx_unlock(&p->lk);
nni_sleep_aio(s->retry, &s->time_aio);
return;
}
Expand Down Expand Up @@ -1184,6 +1227,9 @@ static void mqtt_quic_sock_init(void *arg, nni_sock *sock)
s->retry = MQTT_QUIC_RETRTY * NNI_SECOND;
s->keepalive = NNI_SECOND * 10; // default mqtt keepalive
s->timeleft = NNI_SECOND * 10;
s->batchcnt = 0;
s->batchsz = 8;
s->batchtmo = 10; // Interval of batch sending (ms)

nni_lmq_init(&s->send_messages, NNG_MAX_SEND_LMQ);
nni_aio_list_init(&s->send_queue);
Expand Down