diff --git a/include/nng/nng.h b/include/nng/nng.h index a25ce2398..4307d2183 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -1668,6 +1668,7 @@ NNG_DECL uint8_t conn_param_get_will_qos(conn_param *cparam); NNG_DECL uint8_t conn_param_get_will_retain(conn_param *cparam); NNG_DECL uint8_t conn_param_get_protover(conn_param *cparam); NNG_DECL uint16_t conn_param_get_keepalive(conn_param *cparam); +NNG_DECL uint16_t conn_param_get_topic_alias_max_out(conn_param *cparam); NNG_DECL uint32_t conn_param_get_clientid_len(conn_param *cparam); NNG_DECL void *conn_param_get_qos_db(conn_param *cparam); NNG_DECL char *conn_param_get_ip_addr_v4(conn_param *cparam); diff --git a/src/core/message.h b/src/core/message.h index c64490161..a1c8d835c 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -116,6 +116,7 @@ struct conn_param { uint16_t rx_max; uint16_t keepalive_mqtt; uint16_t topic_alias_max; + uint16_t topic_alias_max_out; uint8_t pro_ver; uint8_t con_flag; uint8_t clean_start; diff --git a/src/nng.c b/src/nng.c index 82f1a33cd..23cf5e499 100644 --- a/src/nng.c +++ b/src/nng.c @@ -2518,6 +2518,12 @@ conn_param_get_keepalive(conn_param *cparam) return cparam->keepalive_mqtt; } +uint16_t +conn_param_get_topic_alias_max_out(conn_param *cparam) +{ + return cparam->topic_alias_max_out; +} + uint8_t conn_param_get_protover(conn_param *cparam) { diff --git a/src/sp/protocol/mqtt/mqtt_parser.c b/src/sp/protocol/mqtt/mqtt_parser.c index 4976f86d1..d3318c3ba 100644 --- a/src/sp/protocol/mqtt/mqtt_parser.c +++ b/src/sp/protocol/mqtt/mqtt_parser.c @@ -858,6 +858,7 @@ conn_param_init(conn_param *cparam) cparam->rx_max = 65535; cparam->max_packet_size = 0; cparam->topic_alias_max = 0; + cparam->topic_alias_max_out = 0; cparam->req_resp_info = 0; cparam->req_problem_info = 1; cparam->auth_method = NULL; diff --git a/src/sp/transport/mqtt/broker_tcp.c b/src/sp/transport/mqtt/broker_tcp.c index 82e5c884d..3254e0004 100644 --- a/src/sp/transport/mqtt/broker_tcp.c +++ b/src/sp/transport/mqtt/broker_tcp.c @@ -424,23 +424,47 @@ tcptran_pipe_nego_cb(void *arg) nni_list_append(&ep->waitpipes, p); // Match happens before accept_cb. Make pipe id ready tcptran_ep_match(ep); - if (p->tcp_cparam->max_packet_size == 0) { + conn_param *cparam = p->tcp_cparam; + property *props = cparam->properties; + if (props != NULL) { + // CONNECT's Topic Alias Maximum is client + // capability and must not be reused as + // broker's CONNACK capability. + property_remove(props, TOPIC_ALIAS_MAXIMUM); + } + if (cparam->max_packet_size == 0) { // set default max packet size for client - p->tcp_cparam->max_packet_size = - p->conf == NULL + cparam->max_packet_size = p->conf == NULL ? NANO_MAX_PACKET_SIZE : p->conf->client_max_packet_size; - if (p->tcp_cparam->properties != NULL) { - property_remove(p->tcp_cparam->properties, - MAXIMUM_PACKET_SIZE); - property_append(p->tcp_cparam->properties, - property_set_value_u32(MAXIMUM_PACKET_SIZE, - p->tcp_cparam->max_packet_size)); + if (props != NULL) { + property_remove( + props, MAXIMUM_PACKET_SIZE); + property_append(props, + property_set_value_u32( + MAXIMUM_PACKET_SIZE, + cparam->max_packet_size)); + } + } + // Advertise broker's Topic Alias Maximum in CONNACK + // for MQTT v5. Default to 65535 to allow clients to + // use Topic Aliases. + if (p->pro_ver == MQTT_PROTOCOL_VERSION_v5) { + if (props == NULL) { + props = property_alloc(); + cparam->properties = props; + } + if (props != NULL) { + cparam->topic_alias_max_out = 65535; + property_append(props, + property_set_value_u16( + TOPIC_ALIAS_MAXIMUM, + cparam->topic_alias_max_out)); } } log_debug("max_packet_size of %.*s is %d", - p->tcp_cparam->clientid.len, p->tcp_cparam->clientid.body, - p->tcp_cparam->max_packet_size); + cparam->clientid.len, cparam->clientid.body, + cparam->max_packet_size); nni_mtx_unlock(&ep->mtx); return; } else { diff --git a/src/supplemental/mqtt/mqtt_codec.c b/src/supplemental/mqtt/mqtt_codec.c index f8779073d..1939ddc47 100644 --- a/src/supplemental/mqtt/mqtt_codec.c +++ b/src/supplemental/mqtt/mqtt_codec.c @@ -4326,10 +4326,7 @@ encode_properties(nni_msg *msg, property *prop, uint8_t cmd) nni_mqtt_msg_append_u8(msg, p->data.p_value.u8); break; case U16: - if (p->id == TOPIC_ALIAS_MAXIMUM) - nni_mqtt_msg_append_u16(msg, 0xFFFF); - else - nni_mqtt_msg_append_u16(msg, p->data.p_value.u16); + nni_mqtt_msg_append_u16(msg, p->data.p_value.u16); break; case U32: nni_mqtt_msg_append_u32(msg, p->data.p_value.u32); diff --git a/src/supplemental/mqtt/mqtt_msg.c b/src/supplemental/mqtt/mqtt_msg.c index 8bb1d88c3..2afba9c8d 100644 --- a/src/supplemental/mqtt/mqtt_msg.c +++ b/src/supplemental/mqtt/mqtt_msg.c @@ -1119,6 +1119,7 @@ nni_get_conn_param_from_msg(nni_msg *msg) conn_ctx->rx_max = 65535; conn_ctx->max_packet_size = 0; conn_ctx->topic_alias_max = 0; + conn_ctx->topic_alias_max_out = 0; conn_ctx->req_resp_info = 0; conn_ctx->req_problem_info = 1; conn_ctx->auth_method = NULL;