diff --git a/src/supplemental/quic/CMakeLists.txt b/src/supplemental/quic/CMakeLists.txt index ec6ce3251..360956d7e 100644 --- a/src/supplemental/quic/CMakeLists.txt +++ b/src/supplemental/quic/CMakeLists.txt @@ -13,9 +13,14 @@ if (NNG_ENABLE_QUIC) add_subdirectory(msquic) add_dependencies(nng msquic) + nng_sources(msquic_common.c) + nng_sources(quic_private.h) + nng_sources(quic_api.c) nng_sources(quic_api.h) + nng_sources(msquic_dial.c) + nng_test(quic_api_test) find_path(INTERNAL_MSQUIC_INCLUDE_DIR diff --git a/src/supplemental/quic/msquic_common.c b/src/supplemental/quic/msquic_common.c new file mode 100644 index 000000000..1d763f4ae --- /dev/null +++ b/src/supplemental/quic/msquic_common.c @@ -0,0 +1,46 @@ +#include "quic_private.h" +#include "core/nng_impl.h" + +static const QUIC_API_TABLE *MsQuic = NULL; + +/***************************** MsQuic Bindings *****************************/ + +void +msquic_set_api_table(const QUIC_API_TABLE *table) +{ + MsQuic = table; +} + +void +msquic_conn_close(HQUIC qconn, int rv) +{ + MsQuic->ConnectionShutdown(qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, (QUIC_UINT62)rv); +} + +void +msquic_conn_fini(HQUIC qconn) +{ + MsQuic->ConnectionClose(qconn); +} + +void +msquic_strm_close(HQUIC qstrm) +{ + log_info("stream %p shutdown", qstrm); + MsQuic->StreamShutdown( + qstrm, QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, NNG_ECONNSHUT); +} + +void +msquic_strm_fini(HQUIC qstrm) +{ + log_info("stream %p fini", qstrm); + MsQuic->StreamClose(qstrm); +} + +void +msquic_strm_recv_start(HQUIC qstrm) +{ + MsQuic->StreamReceiveSetEnabled(qstrm, TRUE); +} + diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 1e548e7e7..aad1f6b19 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -19,6 +19,7 @@ // closed. #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" #include "msquic.h" @@ -39,65 +40,17 @@ #include #include -#include "nng/mqtt/mqtt_client.h" -#include "nng/supplemental/nanolib/conf.h" -#include "nng/protocol/mqtt/mqtt_parser.h" -#include "supplemental/mqtt/mqtt_msg.h" - -#include "openssl/pem.h" -#include "openssl/x509.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -struct nni_quic_conn { - nng_stream stream; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; - nni_aio * dial_aio; - // nni_aio * qstrmaio; // Link to msquic_strm_cb - nni_quic_dialer *dialer; - - // MsQuic - HQUIC qstrm; // quic stream - uint8_t reason_code; - - nni_reap_node reap; -}; - static const QUIC_API_TABLE *MsQuic = NULL; -// Config for msquic -static const QUIC_REGISTRATION_CONFIG quic_reg_config = { - "mqtt", - QUIC_EXECUTION_PROFILE_LOW_LATENCY -}; - -static const QUIC_BUFFER quic_alpn = { - sizeof("mqtt") - 1, - (uint8_t *) "mqtt" -}; - -HQUIC registration; -HQUIC configuration; +// The registration and configuration for dialer +static HQUIC registration; +static HQUIC configuration; static int msquic_open(); static void msquic_close(); + static int msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d); -static void msquic_conn_close(HQUIC qconn, int rv); -static void msquic_conn_fini(HQUIC qconn); static int msquic_strm_open(HQUIC qconn, nni_quic_dialer *d); -static void msquic_strm_close(HQUIC qstrm); -static void msquic_strm_fini(HQUIC qstrm); -static void msquic_strm_recv_start(HQUIC qstrm); static void quic_dialer_cb(void *arg); static void quic_stream_error(void *arg, int err); @@ -270,7 +223,7 @@ nni_quic_dial(void *arg, const char *host, const char *port, nni_aio *aio) nni_atomic_inc64(&d->ref); // Create a connection whenever dial. So it's okey. right? - if ((rv = nni_msquic_quic_alloc(&c, d)) != 0) { + if ((rv = nni_msquic_quic_dialer_conn_alloc(&c, d)) != 0) { nni_aio_finish_error(aio, rv); nni_msquic_quic_dialer_rele(d); return; @@ -711,15 +664,16 @@ quic_stream_set(void *arg, const char *name, const void *buf, size_t sz, nni_typ } int -nni_msquic_quic_alloc(nni_quic_conn **cp, nni_quic_dialer *d) +nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **cp, nni_quic_dialer *d) { nni_quic_conn *c; if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { return (NNG_ENOMEM); } - c->closed = false; - c->dialer = d; + c->closed = false; + c->dialer = d; + c->listener = NULL; nni_mtx_init(&c->mtx); nni_aio_list_init(&c->readq); @@ -860,8 +814,6 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, return QUIC_STATUS_SUCCESS; } -// The clients's callback for stream events from MsQuic. -// New recv cb of quic transport _IRQL_requires_max_(DISPATCH_LEVEL) _Function_class_(QUIC_STREAM_CALLBACK) QUIC_STATUS QUIC_API msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, @@ -963,8 +915,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, return QUIC_STATUS_PENDING; case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: - // The peer gracefully shut down its send direction of the - // stream. + // The peer aborted its send direction of the stream. log_warn("[strm][%p] PEER_SEND_ABORTED errorcode %llu\n", stream, (unsigned long long) Event->PEER_SEND_ABORTED.ErrorCode); if (c->reason_code == 0) @@ -973,7 +924,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_ABORTED, c); break; case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: - // The peer aborted its send direction of the stream. + // The peer gracefully shut down its send direction of the stream. log_warn("[strm][%p] Peer send shut down\n", stream); MsQuic->StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0); quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN, c); @@ -1027,55 +978,6 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, return QUIC_STATUS_SUCCESS; } -static int is_msquic_inited = 0; - -static void -msquic_close() -{ - if (MsQuic != NULL) { - if (configuration != NULL) { - MsQuic->ConfigurationClose(configuration); - } - if (registration != NULL) { - // This will block until all outstanding child objects - // have been closed. - MsQuic->RegistrationClose(registration); - } - MsQuicClose(MsQuic); - is_msquic_inited = 0; - } -} - -static int -msquic_open() -{ - if (is_msquic_inited == 1) - return 0; - - QUIC_STATUS rv = QUIC_STATUS_SUCCESS; - // only Open MsQUIC lib once, otherwise cause memleak - if (MsQuic == NULL) - if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { - log_error("MsQuicOpen2 failed, 0x%x!\n", rv); - goto error; - } - - // Create a registration for the app's connections. - rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); - if (QUIC_FAILED(rv)) { - log_error("RegistrationOpen failed, 0x%x!\n", rv); - goto error; - } - - is_msquic_inited = 1; - log_info("Msquic is enabled"); - return 0; - -error: - msquic_close(); - return -1; -} - // Helper function to load a client configuration. static BOOLEAN msquic_load_config(QUIC_SETTINGS *settings, nni_quic_dialer *d) @@ -1195,18 +1097,6 @@ msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d) return (NNG_ECONNREFUSED); } -static void -msquic_conn_close(HQUIC qconn, int rv) -{ - MsQuic->ConnectionShutdown(qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, (QUIC_UINT62)rv); -} - -static void -msquic_conn_fini(HQUIC qconn) -{ - MsQuic->ConnectionClose(qconn); -} - static int msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) { @@ -1243,23 +1133,53 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) return (NNG_ECLOSED); } -static void -msquic_strm_close(HQUIC qstrm) -{ - log_info("stream %p shutdown", qstrm); - MsQuic->StreamShutdown( - qstrm, QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, NNG_ECONNSHUT); -} +static int is_msquic_inited = 0; static void -msquic_strm_fini(HQUIC qstrm) +msquic_close() { - log_info("stream %p fini", qstrm); - MsQuic->StreamClose(qstrm); + if (MsQuic != NULL) { + if (configuration != NULL) { + MsQuic->ConfigurationClose(configuration); + } + if (registration != NULL) { + // This will block until all outstanding child objects + // have been closed. + MsQuic->RegistrationClose(registration); + } + MsQuicClose(MsQuic); + is_msquic_inited = 0; + } } -static void -msquic_strm_recv_start(HQUIC qstrm) +static int +msquic_open() { - MsQuic->StreamReceiveSetEnabled(qstrm, TRUE); + if (is_msquic_inited == 1) + return 0; + + QUIC_STATUS rv = QUIC_STATUS_SUCCESS; + // only Open MsQUIC lib once, otherwise cause memleak + if (MsQuic == NULL) + if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { + log_error("MsQuicOpen2 failed, 0x%x!\n", rv); + goto error; + } + msquic_set_api_table(MsQuic); + + // Create a registration for the app's connections. + rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); + if (QUIC_FAILED(rv)) { + log_error("RegistrationOpen failed, 0x%x!\n", rv); + goto error; + } + + is_msquic_inited = 1; + log_info("Msquic is enabled"); + return 0; + +error: + msquic_close(registration, NULL); + return -1; } + diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index caba102be..c4e9fef6f 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -18,8 +18,8 @@ // The quic connection would be free if all nng streams // closed. -#include "msquic_posix.h" #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" #include "msquic.h" @@ -40,60 +40,23 @@ #include #include -#include "nng/mqtt/mqtt_client.h" -#include "nng/supplemental/nanolib/conf.h" -#include "nng/protocol/mqtt/mqtt_parser.h" -#include "supplemental/mqtt/mqtt_msg.h" - -#include "openssl/pem.h" -#include "openssl/x509.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -struct nni_quic_conn { - nng_stream stream; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; - nni_aio * dial_aio; - // nni_aio * qstrmaio; // Link to msquic_strm_cb - nni_quic_dialer *dialer; - - // MsQuic - HQUIC qstrm; // quic stream - uint8_t reason_code; - - nni_reap_node reap; -}; - static const QUIC_API_TABLE *MsQuic = NULL; -// Config for msquic -static const QUIC_REGISTRATION_CONFIG quic_reg_config = { - "mqtt_listener", - QUIC_EXECUTION_PROFILE_LOW_LATENCY -}; - -static const QUIC_BUFFER quic_alpn = { - sizeof("mqtt") - 1, - (uint8_t *) "mqtt" -}; +// The registration and configuration for listener +static HQUIC registration; +static HQUIC configuration; -HQUIC registration; -HQUIC configuration +static int msquic_open(); +static void msquic_close(); static void msquic_listener_fini(HQUIC ql); static void msquic_listener_stop(HQUIC ql); static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l); +static void quic_stream_error(void *arg, int err); +static void quic_stream_close(void *arg); +static void quic_stream_dowrite(nni_quic_conn *c); + /***************************** MsQuic Listener ******************************/ int @@ -120,7 +83,7 @@ nni_quic_listener_init(void **argp) nni_atomic_inc64(&l->ref); // 0RTT is disabled by default - l->enable_0rtt = false; + l->enable_0rtt = true; // multi_stream is disabled by default l->enable_mltstrm = false; @@ -144,7 +107,7 @@ quic_listener_doclose(nni_quic_listener *l) nni_aio_finish_error(aio, NNG_ECLOSED); } while ((aio = nni_list_first(&l->incomings)) != NULL) { - qconn = nni_aio_get_prov_data(aio); + HQUIC qconn = nni_aio_get_prov_data(aio); nni_aio_list_remove(aio); nni_aio_free(aio); msquic_conn_fini(qconn); @@ -157,9 +120,6 @@ quic_listener_doclose(nni_quic_listener *l) void nni_quic_listener_close(nni_quic_listener *l) { - nni_aio *aio; - HQUIC qconn; - nni_mtx_lock(&l->mtx); quic_listener_doclose(l); nni_mtx_unlock(&l->mtx); @@ -169,10 +129,7 @@ nni_quic_listener_close(nni_quic_listener *l) int nni_quic_listener_listen(nni_quic_listener *l, const char *h, const char *p) { - socklen_t len; - int rv; - int fd; - nni_posix_pfd * pfd; + int rv; nni_mtx_lock(&l->mtx); if (l->started) { @@ -184,7 +141,11 @@ nni_quic_listener_listen(nni_quic_listener *l, const char *h, const char *p) return (NNG_ECLOSED); } - msquic_listen(l->ql, h, p, l); + rv = msquic_listen(l->ql, h, p, l); + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + return rv; + } l->started = true; nni_mtx_unlock(&l->mtx); @@ -214,32 +175,18 @@ quic_listener_doaccept(nni_quic_listener *l) nni_aio *aio; while ((aio = nni_list_first(&l->acceptq)) != NULL) { - int newfd; - int fd; - int rv; - int nd; - int ka; - HQUIC qconn; nni_aio * aioc; nni_quic_conn * c; - // Get the connection - if ((aioc == nni_list_first(&l->incomings)) == NULL) { + // Get the connection + if ((aioc = nni_list_first(&l->incomings)) == NULL) { // No wait and return immediately return; } - qconn = nni_aio_get_prov_data(aioc); // Must exists + c = nni_aio_get_prov_data(aioc); // Must exists nni_aio_list_remove(aioc); nni_aio_free(aioc); - // Create a nni quic connection - if ((rv = nni_msquic_quic_alloc(&c, NULL)) != 0) { - msquic_conn_fini(qconn); - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - continue; - } - nni_aio_list_remove(aio); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); @@ -299,24 +246,677 @@ nni_quic_listener_fini(nni_quic_listener *l) NNI_FREE_STRUCT(l); } +/**************************** MsQuic Connection ****************************/ -/***************************** MsQuic Bindings *****************************/ +static void +quic_stream_cb(int events, void *arg) +{ + log_debug("[quic cb] start %d\n", events); + nni_quic_conn *c = arg; + nni_quic_listener *l; + nni_quic_session *ss; + nni_aio *aio; + + if (!c) + return; + + ss = c->session; + l = c->listener; + + switch (events) { + case QUIC_STREAM_EVENT_SEND_COMPLETE: + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->writeq)) == NULL) { + log_error("Aio lost after sending: conn %p", c); + nni_mtx_unlock(&c->mtx); + break; + } + nni_aio_list_remove(aio); + QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); + free(buf); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Start next send only after finished the last send + quic_stream_dowrite(c); + + nni_mtx_unlock(&c->mtx); + break; + case QUIC_STREAM_EVENT_START_COMPLETE: + nni_mtx_lock(&l->mtx); + + if (c->started == true) { + nni_mtx_unlock(&l->mtx); + break; + } + + // Push connection to incomings + nni_aio_alloc(&aio, NULL, NULL); + nni_aio_set_prov_data(aio, (void *)c); + nni_aio_list_append(&l->incomings, aio); + + quic_listener_doaccept(l); + + // This stream is started from now. + c->started = true; + + nni_mtx_unlock(&l->mtx); + break; + // case QUIC_STREAM_EVENT_RECEIVE: // get a fin from stream + // TODO Need more talk about those cases + // case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: + // case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: + // case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: + case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: + // case QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED: + // Marked it as closed, prevent explicit shutdown + c->closed = true; + // It's the only place to free msquic stream + msquic_strm_fini(c->qstrm); + quic_stream_error(arg, NNG_ECONNSHUT); + break; + default: + break; + } + log_debug("[quic cb] end\n"); +} static void -msquic_load_listener_config() +quic_stream_fini(void *arg) +{ + nni_quic_conn *c = arg; + quic_stream_close(c); + + if (c->dialer) { + nni_msquic_quic_dialer_rele(c->dialer); + } + NNI_FREE_STRUCT(c); +} + +//static nni_reap_list quic_reap_list = { +// .rl_offset = offsetof(nni_quic_conn, reap), +// .rl_func = quic_stream_fini, +//}; +static void +quic_stream_free(void *arg) +{ + nni_quic_conn *c = arg; + quic_stream_fini(c); +} + +// Notify upper layer that something happened. +// Includes closed by peer or transport layer. +// Or get a FIN from quic stream. +static void +quic_stream_error(void *arg, int err) +{ + nni_quic_conn *c = arg; + nni_aio * aio; + + nni_mtx_lock(&c->mtx); + // only close aio of this stream + while ((aio = nni_list_first(&c->writeq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, err); + } + while ((aio = nni_list_first(&c->readq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, err); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_close(void *arg) +{ + nni_quic_conn *c = arg; + nni_mtx_lock(&c->mtx); + if (c->closed != true) { + c->closed = true; + msquic_strm_close(c->qstrm); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_cancel(nni_aio *aio, void *arg, int rv) +{ + nni_quic_conn *c = arg; + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_recv(void *arg, nni_aio *aio) +{ + nni_quic_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, quic_stream_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // Receive start if there are only one aio in readq. + if (nni_list_first(&c->readq) == aio) { + // In msquic. To avoid repeated memory copies. We just enable + // the receive. And doread in msquic_strm_cb. So there is + // only one copy from msquic to nanonng. + msquic_strm_recv_start(c->qstrm); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_dowrite_prior(nni_quic_conn *c, nni_aio *aio) +{ + log_debug("[quic dowrite adv] start\n"); + int rv; + unsigned naiov; + nni_iov * aiov; + size_t n = 0; + + if (c->closed) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + + QUIC_BUFFER *buf=(QUIC_BUFFER*)malloc(sizeof(QUIC_BUFFER)*naiov); + for (uint8_t i = 0; i < naiov; ++i) { + log_debug("buf%d sz %d", i, aiov[i].iov_len); + buf[i].Buffer = aiov[i].iov_buf; + buf[i].Length = aiov[i].iov_len; + n += aiov[i].iov_len; + } + nni_aio_set_input(aio, 0, buf); + + if (QUIC_FAILED(rv = MsQuic->StreamSend(c->qstrm, buf, + naiov, QUIC_SEND_FLAG_NONE, aio))) { + log_error("Failed in StreamSend, 0x%x!", rv); + free(buf); + return; + } + + nni_aio_bump_count(aio, n); + log_debug("[quic dowrite adv] end"); +} + +static void +quic_stream_dowrite(nni_quic_conn *c) +{ + log_debug("[quic dowrite] start %p", c->qstrm); + nni_aio *aio; + int rv; + + if (c->closed) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned naiov; + nni_iov * aiov; + size_t n = 0; + + nni_aio_get_iov(aio, &naiov, &aiov); + if (naiov == 0) + log_warn("A msg without content?"); + + QUIC_BUFFER *buf=(QUIC_BUFFER*)malloc(sizeof(QUIC_BUFFER)*naiov); + for (uint8_t i = 0; i < naiov; ++i) { + log_debug("buf%d sz %d", i, aiov[i].iov_len); + buf[i].Buffer = aiov[i].iov_buf; + buf[i].Length = aiov[i].iov_len; + n += aiov[i].iov_len; + } + nni_aio_set_input(aio, 0, buf); + + if (QUIC_FAILED(rv = MsQuic->StreamSend(c->qstrm, buf, + naiov, QUIC_SEND_FLAG_NONE, NULL))) { + log_error("Failed in StreamSend, 0x%x!", rv); + free(buf); + // nni_aio_list_remove(aio); + // nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + nni_aio_bump_count(aio, n); + + break; + // Different from tcp. + // Here we just send one msg at once. + } +} + +static void +quic_stream_send(void *arg, nni_aio *aio) +{ + nni_quic_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&c->mtx); + if ((rv = nni_aio_schedule(aio, quic_stream_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // QUIC_HIGH_PRIOR_MSG Feature! + int *flags = nni_aio_get_prov_data(aio); + nni_aio_set_prov_data(aio, NULL); + + if (flags) { + if (*flags & QUIC_HIGH_PRIOR_MSG) { + quic_stream_dowrite_prior(c, aio); + nni_mtx_unlock(&c->mtx); + return; + } + } + + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + quic_stream_dowrite(c); + // In msquic. Write can be done at any time. + } + nni_mtx_unlock(&c->mtx); +} + +static int +quic_stream_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(name); + NNI_ARG_UNUSED(buf); + NNI_ARG_UNUSED(szp); + NNI_ARG_UNUSED(t); + return 0; + + // nni_quic_conn *c = arg; + // return (nni_getopt(tcp_options, name, c, buf, szp, t)); +} + +static int +quic_stream_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(name); + NNI_ARG_UNUSED(buf); + NNI_ARG_UNUSED(sz); + NNI_ARG_UNUSED(t); + return 0; + + // nni_quic_conn *c = arg; + // return (nni_setopt(tcp_options, name, c, buf, sz, t)); +} + +int +nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_session *ss) +{ + nni_quic_conn *c; + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->dialer = NULL; + c->listener = ss->listener; + c->session = ss; + c->started = false; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + c->stream.s_free = quic_stream_free; + c->stream.s_close = quic_stream_close; + c->stream.s_recv = quic_stream_recv; + c->stream.s_send = quic_stream_send; + c->stream.s_get = quic_stream_get; + c->stream.s_set = quic_stream_set; + + *cp = c; + return (0); +} + +static int +quic_listener_session_alloc(nni_quic_session **ss, nni_quic_listener *l, HQUIC qconn) +{ + nni_quic_session *s; + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + + s->closed = false; + s->qconn = qconn; + s->listener = l; + + nni_aio_list_init(&s->conns); + nni_mtx_init(&s->mtx); + + *ss = s; + return (0); +} + + +/***************************** MsQuic Bindings *****************************/ + +typedef struct QUIC_CREDENTIAL_CONFIG_HELPER { + QUIC_CREDENTIAL_CONFIG CredConfig; + union { + QUIC_CERTIFICATE_HASH CertHash; + QUIC_CERTIFICATE_HASH_STORE CertHashStore; + QUIC_CERTIFICATE_FILE CertFile; + QUIC_CERTIFICATE_FILE_PROTECTED CertFileProtected; + }; +} QUIC_CREDENTIAL_CONFIG_HELPER; + +uint8_t +DecodeHexChar( + _In_ char c + ) +{ + if (c >= '0' && c <= '9') return c - '0'; + if (c >= 'A' && c <= 'F') return 10 + c - 'A'; + if (c >= 'a' && c <= 'f') return 10 + c - 'a'; + return 0; +} + +uint32_t +DecodeHexBuffer( + _In_z_ const char* HexBuffer, + _In_ uint32_t OutBufferLen, + _Out_writes_to_(OutBufferLen, return) + uint8_t* OutBuffer + ) +{ + uint32_t HexBufferLen = (uint32_t)strlen(HexBuffer) / 2; + if (HexBufferLen > OutBufferLen) { + return 0; + } + + for (uint32_t i = 0; i < HexBufferLen; i++) { + OutBuffer[i] = + (DecodeHexChar(HexBuffer[i * 2]) << 4) | + DecodeHexChar(HexBuffer[i * 2 + 1]); + } + + return HexBufferLen; +} + +static BOOLEAN +msquic_load_listener_config(QUIC_SETTINGS *s, nni_quic_listener *l) { - return; + QUIC_SETTINGS Settings = *s; + + // Configures the server's idle timeout. + Settings.IdleTimeoutMs = QUIC_IDLE_TIMEOUT_DEFAULT * 1000; + Settings.IsSet.IdleTimeoutMs = TRUE; + + // Configures the server's resumption level to allow for resumption and 0-RTT. + Settings.ServerResumptionLevel = QUIC_SERVER_RESUME_AND_ZERORTT; + Settings.IsSet.ServerResumptionLevel = TRUE; + + // Configures the server's settings to allow for the peer to open a single + // bidirectional stream. By default connections are not configured to allow + // any streams from the peer. + Settings.PeerBidiStreamCount = 1; + Settings.IsSet.PeerBidiStreamCount = TRUE; + + QUIC_CREDENTIAL_CONFIG_HELPER Config; + memset(&Config, 0, sizeof(Config)); + Config.CredConfig.Flags = QUIC_CREDENTIAL_FLAG_NONE; + + const char* Cert; + const char* KeyFile; + if ((Cert = l->cert_hash) != NULL) { + // Load the server's certificate from the default certificate store, + // using the provided certificate hash. + uint32_t CertHashLen = + DecodeHexBuffer( + Cert, + sizeof(Config.CertHash.ShaHash), + Config.CertHash.ShaHash); + if (CertHashLen != sizeof(Config.CertHash.ShaHash)) { + return FALSE; + } + Config.CredConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_HASH; + Config.CredConfig.CertificateHash = &Config.CertHash; + + } else if ((Cert = l->cert_fpath) != NULL && + (KeyFile = l->key_fpath) != NULL) { + // Loads the server's certificate from the file. + const char* Password = l->pwd; + if (Password != NULL) { + Config.CertFileProtected.CertificateFile = (char*)Cert; + Config.CertFileProtected.PrivateKeyFile = (char*)KeyFile; + Config.CertFileProtected.PrivateKeyPassword = (char*)Password; + Config.CredConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_FILE_PROTECTED; + Config.CredConfig.CertificateFileProtected = &Config.CertFileProtected; + } else { + Config.CertFile.CertificateFile = (char*)Cert; + Config.CertFile.PrivateKeyFile = (char*)KeyFile; + Config.CredConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_FILE; + Config.CredConfig.CertificateFile = &Config.CertFile; + } + + } else { + log_error("Must specify ['-cert_hash'] or ['cert_file' and 'key_file' (and optionally 'password')]!\n"); + return FALSE; + } + + // Allocate/initialize the configuration object, with the configured ALPN + // and settings. + QUIC_STATUS Status = QUIC_STATUS_SUCCESS; + if (QUIC_FAILED(Status = MsQuic->ConfigurationOpen(registration, + &quic_alpn, 1, &Settings, sizeof(Settings), NULL, &configuration))) { + log_error("ConfigurationOpen failed, 0x%x!\n", Status); + return FALSE; + } + + // Loads the TLS credential part of the configuration. + if (QUIC_FAILED(Status = MsQuic->ConfigurationLoadCredential(configuration, &Config.CredConfig))) { + log_error("ConfigurationLoadCredential failed, 0x%x!\n", Status); + return FALSE; + } + + return TRUE; } +_IRQL_requires_max_(DISPATCH_LEVEL) +_Function_class_(QUIC_STREAM_CALLBACK) QUIC_STATUS QUIC_API +msquic_strm_listener_cb(_In_ HQUIC stream, _In_opt_ void *Context, + _Inout_ QUIC_STREAM_EVENT *Event) +{ + nni_quic_conn *c = Context; + nni_aio *aio; + nni_iov * aiov; + unsigned naiov; + uint32_t rlen, rlen2, rpos; + uint8_t *rbuf; + uint32_t count; + + log_debug("quic_strm_cb triggered! %d conn %p strm %p", Event->Type, c, stream); + switch (Event->Type) { + case QUIC_STREAM_EVENT_SEND_COMPLETE: + log_debug("QUIC_STREAM_EVENT_SEND_COMPLETE!"); + if (Event->SEND_COMPLETE.Canceled) { + log_warn("[strm][%p] Data sent Canceled: %d", + stream, Event->SEND_COMPLETE.Canceled); + } + // Priority msg send + if ((aio = Event->SEND_COMPLETE.ClientContext) != NULL) { + QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); + free(buf); + Event->SEND_COMPLETE.ClientContext = NULL; + // TODO free by user cb or msquic layer??? + // nni_msg *msg = nni_aio_get_msg(aio); + // nni_msg_free(msg); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + break; + } + // Ordinary send + quic_stream_cb(QUIC_STREAM_EVENT_SEND_COMPLETE, c); + break; + case QUIC_STREAM_EVENT_RECEIVE: + // Data was received from the peer on the stream. + count = Event->RECEIVE.BufferCount; + + log_debug("[strm][%p] Data received Flag: %d", stream, Event->RECEIVE.Flags); + + if (Event->RECEIVE.Flags & QUIC_RECEIVE_FLAG_FIN) { + if (c->reason_code == 0) + c->reason_code = CLIENT_IDENTIFIER_NOT_VALID; + log_warn("FIN received in QUIC stream"); + break; + } + + nni_mtx_lock(&c->mtx); + if (c->closed) { + // Actively closed the quic stream by upper layer. So ignore. + nni_mtx_unlock(&c->mtx); + return QUIC_STATUS_PENDING; + } + // Get all the buffers in quic stream + if (count == 0) { + nni_mtx_unlock(&c->mtx); + return QUIC_STATUS_PENDING; + } + + rbuf = Event->RECEIVE.Buffers[0].Buffer; + rlen = Event->RECEIVE.Buffers[0].Length; + + rpos = 0; + while ((aio = nni_list_first(&c->readq)) != NULL) { + nni_aio_get_iov(aio, &naiov, &aiov); + int n = 0; + for (uint8_t i=0; i= aiov[i].iov_len) { + memcpy(aiov[i].iov_buf, rbuf+rpos, aiov[i].iov_len); + rpos += aiov[i].iov_len; + n += aiov[i].iov_len; + } else { + memcpy(aiov[i].iov_buf, rbuf+rpos, rlen2); + rpos += rlen2; + n += rlen2; + } + } + if (n == 0) { // rbuf run out + break; + } + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } + + MsQuic->StreamReceiveComplete(c->qstrm, rpos); + nni_mtx_unlock(&c->mtx); + + return QUIC_STATUS_PENDING; + case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: + // The peer abort its send direction of the stream. + log_warn("[strm][%p] PEER_SEND_ABORTED errorcode %llu\n", stream, + (unsigned long long) Event->PEER_SEND_ABORTED.ErrorCode); + if (c->reason_code == 0) + c->reason_code = SERVER_SHUTTING_DOWN; + + msquic_strm_close(c->qstrm); + + quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_ABORTED, c); + break; + case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: + // The peer aborted its send direction of the stream. + log_warn("[strm][%p] Peer send shut down\n", stream); + MsQuic->StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0); + quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN, c); + break; + case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: + // The peer gracefully shut down its send direction of the stream. + log_warn("[strm][%p] QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE.", stream); + // TODO The next msg would better to be sent with a FIN flag. + break; + + case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: + // Both directions of the stream have been shut down and MsQuic + // is done with the stream. It can now be safely cleaned up. + log_warn("[strm][%p] QUIC_STREAM_EVENT shutdown: All done.", + stream); + log_info("close stream with Error Code: %llu", + (unsigned long long) + Event->SHUTDOWN_COMPLETE.ConnectionErrorCode); + quic_stream_cb(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE, c); + break; + case QUIC_STREAM_EVENT_START_COMPLETE: + log_info( + "QUIC_STREAM_EVENT_START_COMPLETE [%p] ID: %ld Status: %d", + stream, Event->START_COMPLETE.ID, + Event->START_COMPLETE.Status); + if (!Event->START_COMPLETE.PeerAccepted) { + log_warn("Peer refused"); + quic_stream_cb(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE, c); + break; + } + + quic_stream_cb(QUIC_STREAM_EVENT_START_COMPLETE, c); + + break; + case QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE: + log_info("QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE"); + break; + case QUIC_STREAM_EVENT_PEER_ACCEPTED: + log_info("QUIC_STREAM_EVENT_PEER_ACCEPTED"); + break; + case QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED: + // The peer has requested that we stop sending. Close abortively. + log_warn("[strm][%p] Peer RECEIVE aborted\n", stream); + log_warn("QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED Error Code: %llu", + (unsigned long long) Event->PEER_RECEIVE_ABORTED.ErrorCode); + + quic_stream_cb(QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED, c); + break; + + default: + log_warn("Unknown Event Type %d", Event->Type); + break; + } + return QUIC_STATUS_SUCCESS; +} + + _IRQL_requires_max_(DISPATCH_LEVEL) _Function_class_(QUIC_CONNECTION_CALLBACK) QUIC_STATUS QUIC_API -msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, +msquic_connection_listener_cb(_In_ HQUIC Connection, _In_opt_ void *Context, _Inout_ QUIC_CONNECTION_EVENT *ev) { - nni_quic_listener *l = Context; - HQUIC qconn = Connection; + nni_quic_session *ss = Context; + HQUIC qconn = Connection; - log_debug("msquic_connection_cb triggered! %d", ev->Type); + log_debug("msquic_connection_listener_cb triggered! %d", ev->Type); switch (ev->Type) { case QUIC_CONNECTION_EVENT_CONNECTED: // The handshake has completed for the connection. @@ -324,11 +924,33 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, log_info("[conn][%p] is Connected. Resumed Session %d", qconn, ev->CONNECTED.SessionResumed); - if (l->enable_0rtt) { + if (ss->listener->enable_0rtt) { MsQuic->ConnectionSendResumptionTicket(qconn, QUIC_SEND_RESUMPTION_FLAG_NONE, 0, NULL); } + break; + case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: + HQUIC qstrm = ev->PEER_STREAM_STARTED.Stream; + QUIC_STREAM_OPEN_FLAGS flags = ev->PEER_STREAM_STARTED.Flags; + + int rv; + nni_quic_conn *c; + + // Create a nni quic connection + if ((rv = nni_msquic_quic_listener_conn_alloc(&c, ss)) != 0) { + log_warn("Error in alloc new quic stream."); + // msquic_conn_fini(qconn); + break; + } + + log_info("[conn][%p] Peer stream %p started. flags %d.", qconn, qstrm, flags); + MsQuic->SetCallbackHandler(qstrm, (void *)msquic_strm_listener_cb, c); - nni_aio_finish(d->qconaio, 0, 0); + quic_stream_cb(QUIC_STREAM_EVENT_START_COMPLETE, c); + + break; + case QUIC_CONNECTION_EVENT_RESUMED: + // TODO + log_warn("[conn][%p] This connection is resumed.", qconn); break; case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: log_warn("[conn][%p] Shutdown by transport, 0x%x, Error Code %llu\n", @@ -375,29 +997,28 @@ _IRQL_requires_max_(PASSIVE_LEVEL) _Function_class_(QUIC_LISTENER_CALLBACK) QUIC_STATUS QUIC_API msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVENT *ev) { - HQUIC *qconn; - QUIC_NEW_CONNECTION_INFO *qinfo; + HQUIC qconn; + const QUIC_NEW_CONNECTION_INFO *qinfo; QUIC_STATUS rv = QUIC_STATUS_NOT_SUPPORTED; nni_quic_listener *l = arg; - nni_aio *aio; + nni_quic_session *ss; + + NNI_ARG_UNUSED(ql); switch (ev->Type) { case QUIC_LISTENER_EVENT_NEW_CONNECTION: qconn = ev->NEW_CONNECTION.Connection; qinfo = ev->NEW_CONNECTION.Info; - MsQuic->SetCallbackHandler(qconn, msquic_connection_cb, ql); - rv = MsQuic->ConnectionSetConfiguration(qconn, configuration); - - nni_mtx_lock(&l->mtx); - - // Push connection to incomings - nni_aio_alloc(&aio, NULL, NULL); - nni_aio_set_prov_data(aio, (void *)qconn); - nni_aio_list_append(&l->incomings, aio); + int rc = quic_listener_session_alloc(&ss, l, qconn); + if (rc != 0) { + log_error("error in alloc session"); + break; + } + log_info("new connection incoming %p %*.s", qconn, qinfo->ClientAlpnListLength, qinfo->ClientAlpnList); - quic_listener_doaccept(l); - nni_mtx_unlock(&l->mtx); + MsQuic->SetCallbackHandler(qconn, msquic_connection_listener_cb, ss); + rv = MsQuic->ConnectionSetConfiguration(qconn, configuration); break; case QUIC_LISTENER_EVENT_STOP_COMPLETE: break; @@ -411,20 +1032,31 @@ msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVEN static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l) { - HQUIC addr; + QUIC_ADDR addr; QUIC_STATUS rv = 0; - QuicAddrSetFamily(&addr, QUIC_ADDRESS_FAMILY_UNSPEC); - QuicAddrSetPort(&addr, atoi(p)); + addr.Ip.sa_family = QUIC_ADDRESS_FAMILY_UNSPEC; + addr.Ipv4.sin_port = htons(atol(p)); + // QuicAddrSetFamily(&addr, QUIC_ADDRESS_FAMILY_UNSPEC); + // QuicAddrSetPort(&addr, atoi(p)); + + NNI_ARG_UNUSED(h); // Listen all interfaces - msquic_load_listener_config(); + if (0 != msquic_open()) { + // so... close the quic connection + return (NNG_ESYSERR); + } + + if (FALSE == msquic_load_listener_config(&l->settings, l)) { + return (NNG_EINVAL); + } if (QUIC_FAILED(rv = MsQuic->ListenerOpen(registration, msquic_listener_cb, (void *)l, &ql))) { log_error("error in listen open %ld", rv); goto error; } - if (QUIC_FAILED(rv = MsQuic->ListenerStart(ql, alpn, 1, &addr))) { + if (QUIC_FAILED(rv = MsQuic->ListenerStart(ql, &quic_alpn, 1, &addr))) { log_error("error in listen start %ld", rv); goto error; } @@ -450,3 +1082,54 @@ msquic_listener_fini(HQUIC ql) MsQuic->ListenerClose(ql); } +static int is_msquic_inited = 0; + +static void +msquic_close() +{ + if (MsQuic != NULL) { + if (configuration != NULL) { + MsQuic->ConfigurationClose(configuration); + } + if (registration != NULL) { + // This will block until all outstanding child objects + // have been closed. + MsQuic->RegistrationClose(registration); + } + MsQuicClose(MsQuic); + is_msquic_inited = 0; + } +} + +static int +msquic_open() +{ + if (is_msquic_inited == 1) + return 0; + + QUIC_STATUS rv = QUIC_STATUS_SUCCESS; + // only Open MsQUIC lib once, otherwise cause memleak + if (MsQuic == NULL) + if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { + log_error("MsQuicOpen2 failed, 0x%x!\n", rv); + goto error; + } + msquic_set_api_table(MsQuic); + + // Create a registration for the app's connections. + rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); + if (QUIC_FAILED(rv)) { + log_error("RegistrationOpen failed, 0x%x!\n", rv); + goto error; + } + + is_msquic_inited = 1; + log_info("Msquic is enabled"); + return 0; + +error: + msquic_close(registration, NULL); + return -1; +} + + diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index 4ac573a9a..de767a710 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -7,8 +7,8 @@ // found online at https://opensource.org/licenses/MIT. // #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" -#include "msquic.h" #include "nng/mqtt/mqtt_client.h" #include "nng/nng.h" @@ -16,9 +16,6 @@ #include "nng/protocol/mqtt/mqtt_parser.h" #include "supplemental/mqtt/mqtt_msg.h" -#include "openssl/pem.h" -#include "openssl/x509.h" - #include #include #include @@ -40,6 +37,7 @@ struct quic_dialer { void * d; // platform dialer }; +/* int nni_quic_listener_alloc(nng_stream_listener **lp, const nni_url *url) { @@ -48,6 +46,7 @@ nni_quic_listener_alloc(nng_stream_listener **lp, const nni_url *url) return 0; } +*/ static void quic_dial_cancel(nni_aio *aio, void *arg, int rv) diff --git a/src/supplemental/quic/quic_api.h b/src/supplemental/quic/quic_api.h index f1f12b3a0..d971e18b1 100644 --- a/src/supplemental/quic/quic_api.h +++ b/src/supplemental/quic/quic_api.h @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #include "nng/nng.h" -#include "msquic.h" typedef struct quic_dialer quic_dialer; typedef struct quic_listener quic_listener; @@ -19,83 +18,4 @@ typedef struct quic_listener quic_listener; extern int nni_quic_dialer_alloc(nng_stream_dialer **, const nni_url *); extern int nni_quic_listener_alloc(nng_stream_listener **, const nni_url *); -typedef struct nni_quic_dialer nni_quic_dialer; - -extern int nni_quic_dialer_init(void **); -extern void nni_quic_dialer_fini(nni_quic_dialer *d); -extern void nni_quic_dial(void *, const char *, const char *, nni_aio *); -extern void nni_quic_dialer_close(void *); - -typedef struct nni_quic_listener nni_quic_listener; - -extern int nni_quic_listener_init(void **); -extern void nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); -extern void nni_quic_listener_accept(nni_quic_listener *, nng_aio *aio); - -typedef struct nni_quic_conn nni_quic_conn; - -extern int nni_msquic_quic_alloc(nni_quic_conn **, nni_quic_dialer *); -extern void nni_msquic_quic_init(nni_quic_conn *); -extern void nni_msquic_quic_start(nni_quic_conn *, int, int); -extern void nni_msquic_quic_dialer_rele(nni_quic_dialer *); - -struct nni_quic_dialer { - nni_aio *qconaio; // for quic connection - nni_quic_conn *currcon; - nni_list connq; // pending connections/quic streams - bool closed; - bool nodelay; - bool keepalive; - struct sockaddr_storage src; - size_t srclen; - nni_mtx mtx; - nni_atomic_u64 ref; - nni_atomic_bool fini; - - // MsQuic - HQUIC qconn; // quic connection - bool enable_0rtt; - bool enable_mltstrm; - uint8_t reason_code; - // ResumptionTicket - char rticket[4096]; // Ususally it would be within 4096. - // But in msquic. The maximum size is 65535. - uint16_t rticket_sz; - // CertificateFile - char * cacert; - char * key; - char * password; - bool verify_peer; - char * ca; - - // Quic settings - uint64_t qidle_timeout; - uint32_t qkeepalive; - uint64_t qconnect_timeout; - uint32_t qdiscon_timeout; - uint32_t qsend_idle_timeout; - uint32_t qinitial_rtt_ms; - uint32_t qmax_ack_delay_ms; - - QUIC_SETTINGS settings; -}; - -struct nni_quic_listener { - nni_mtx mtx; - nni_atomic_u64 ref; - nni_atomic_bool fini; - bool closed; - bool started; - nni_list acceptq; - nni_list incomings; - - // MsQuic - HQUIC ql; // Quic Listener - - // Quic Settings - bool enable_0rtt; - bool enable_mltstrm; - - QUIC_SETTINGS settings; -}; #endif diff --git a/src/supplemental/quic/quic_listener.c b/src/supplemental/quic/quic_listener.c index 44e15263d..baf5138c0 100644 --- a/src/supplemental/quic/quic_listener.c +++ b/src/supplemental/quic/quic_listener.c @@ -7,6 +7,7 @@ // found online at https://opensource.org/licenses/MIT. // #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" #include "msquic.h" @@ -54,7 +55,7 @@ static int quic_listener_listen(void *arg) { quic_listener *l = arg; - return (nni_quic_listener_listen(l->l, &l->sa)); + return (nni_quic_listener_listen(l->l, l->host, l->port)); } static void @@ -64,26 +65,28 @@ quic_listener_accept(void *arg, nng_aio *aio) nni_quic_listener_accept(l->l, aio); } -/* static int quic_listener_get( void *arg, const char *name, void *buf, size_t *szp, nni_type t) { + /* quic_listener *l = arg; if (strcmp(name, NNG_OPT_TCP_BOUND_PORT) == 0) { return (quic_listener_get_port(l, buf, szp, t)); } return (nni_quic_listener_get(l->l, name, buf, szp, t)); + */ } static int quic_listener_set( void *arg, const char *name, const void *buf, size_t sz, nni_type t) { + /* quic_listener *l = arg; return (nni_quic_listener_set(l->l, name, buf, sz, t)); + */ } -*/ static int quic_listener_alloc_addr(nng_stream_listener **lp, const char *h, const char *p) @@ -98,8 +101,8 @@ quic_listener_alloc_addr(nng_stream_listener **lp, const char *h, const char *p) NNI_FREE_STRUCT(l); return (rv); } - l->host = h; - l->port = p; + l->host = strdup(h); + l->port = strdup(p); l->ops.sl_free = quic_listener_free; l->ops.sl_close = quic_listener_close; diff --git a/src/supplemental/quic/quic_listener_test.c b/src/supplemental/quic/quic_listener_test.c new file mode 100644 index 000000000..22b0b9dec --- /dev/null +++ b/src/supplemental/quic/quic_listener_test.c @@ -0,0 +1,75 @@ +// +// Copyright 2023 NanoMQ Team, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include + +void +test_quic_echo(void) +{ + nng_stream_listener *l; + nng_stream_dialer * d; + nng_aio * aiod; + nng_aio * aiol; + nng_stream * sd; + nng_stream * sl; + void * td; + void * tl; + uint8_t * bufr; + uint8_t * bufs; + size_t size = 450001; + + // allocate messages + NUTS_ASSERT((bufs = nng_alloc(size)) != NULL); + NUTS_ASSERT((bufr = nng_alloc(size)) != NULL); + for (size_t i = 0; i < size; i++) { + bufs[i] = rand() & 0xff; + } + + NUTS_PASS(nng_aio_alloc(&aiod, NULL, NULL)); + NUTS_PASS(nng_aio_alloc(&aiol, NULL, NULL)); + nng_aio_set_timeout(aiod, 5000); // 5 sec + nng_aio_set_timeout(aiol, 5000); // 5 sec + + NUTS_PASS(nng_stream_listener_alloc(&l, "quic://127.0.0.1:14567")); + NUTS_PASS(nng_stream_listener_listen(l)); + + NUTS_PASS(nng_stream_dialer_alloc(&d, "quic://127.0.0.1:14567")); + + nng_stream_listener_accept(l, aiol); + nng_stream_dialer_dial(d, aiod); + + nng_aio_wait(aiol); + nng_aio_wait(aiod); + NUTS_PASS(nng_aio_result(aiol)); + NUTS_PASS(nng_aio_result(aiod)); + + NUTS_TRUE((sl = nng_aio_get_output(aiol, 0)) != NULL); + NUTS_TRUE((sd = nng_aio_get_output(aiod, 0)) != NULL); + + td = nuts_stream_send_start(sd, bufs, size); + tl = nuts_stream_recv_start(sl, bufr, size); + + NUTS_PASS(nuts_stream_wait(td)); + NUTS_PASS(nuts_stream_wait(tl)); + NUTS_TRUE(memcmp(bufs, bufr, size) == 0); + + nng_free(bufr, size); + nng_free(bufs, size); + nng_stream_free(sd); + nng_stream_free(sl); + nng_stream_dialer_free(d); + nng_stream_listener_free(l); + nng_aio_free(aiod); + nng_aio_free(aiol); +} + +TEST_LIST = { + { "quic dialer listener echo test", test_quic_echo }, + { NULL, NULL }, +}; diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h new file mode 100644 index 000000000..508478bcc --- /dev/null +++ b/src/supplemental/quic/quic_private.h @@ -0,0 +1,154 @@ +#ifndef NNG_SUPP_QUIC_PRIVATE_H +#define NNG_SUPP_QUIC_PRIVATE_H + +#include "msquic.h" +#include "core/nng_impl.h" +#include "nng/nng.h" + +// Config for msquic +static const QUIC_REGISTRATION_CONFIG quic_reg_config = { + "mqtt", + QUIC_EXECUTION_PROFILE_LOW_LATENCY +}; + +static const QUIC_BUFFER quic_alpn = { + sizeof("mqtt") - 1, + (uint8_t *) "mqtt" +}; + +typedef struct nni_quic_dialer nni_quic_dialer; + +int nni_quic_dialer_init(void **); +void nni_quic_dialer_fini(nni_quic_dialer *d); +void nni_quic_dial(void *, const char *, const char *, nni_aio *); +void nni_quic_dialer_close(void *); + +typedef struct nni_quic_listener nni_quic_listener; +typedef struct nni_quic_session nni_quic_session; + +int nni_quic_listener_init(void **); +int nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); +void nni_quic_listener_accept(nni_quic_listener *, nng_aio *aio); +void nni_quic_listener_close(nni_quic_listener *l); +void nni_quic_listener_fini(nni_quic_listener *l); + +typedef struct nni_quic_conn nni_quic_conn; + +// Might no different TODO +int nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **, nni_quic_dialer *); +int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **, nni_quic_session *); +void nni_msquic_quic_dialer_rele(nni_quic_dialer *); + +// MsQuic binding +void msquic_set_api_table(const QUIC_API_TABLE *table); + +void msquic_conn_close(HQUIC qconn, int rv); +void msquic_conn_fini(HQUIC qconn); + +void msquic_strm_close(HQUIC qstrm); +void msquic_strm_fini(HQUIC qstrm); +void msquic_strm_recv_start(HQUIC qstrm); + +struct nni_quic_dialer { + nni_aio *qconaio; // for quic connection + nni_quic_conn *currcon; + nni_list connq; // pending connections/quic streams + bool closed; + bool nodelay; + bool keepalive; + struct sockaddr_storage src; + size_t srclen; + nni_mtx mtx; + nni_atomic_u64 ref; + nni_atomic_bool fini; + + // MsQuic + HQUIC qconn; // quic connection + bool enable_0rtt; + bool enable_mltstrm; + uint8_t reason_code; + // ResumptionTicket + char rticket[4096]; // Ususally it would be within 4096. + // But in msquic. The maximum size is 65535. + uint16_t rticket_sz; + // CertificateFile + char * cacert; + char * key; + char * password; + bool verify_peer; + char * ca; + + // Quic settings + uint64_t qidle_timeout; + uint32_t qkeepalive; + uint64_t qconnect_timeout; + uint32_t qdiscon_timeout; + uint32_t qsend_idle_timeout; + uint32_t qinitial_rtt_ms; + uint32_t qmax_ack_delay_ms; + + QUIC_SETTINGS settings; +}; + +struct nni_quic_listener { + nni_mtx mtx; + nni_atomic_u64 ref; + nni_atomic_bool fini; + bool closed; + bool started; + nni_list acceptq; + nni_list incomings; + + // MsQuic + HQUIC ql; // Quic Listener + + // Quic Settings + bool enable_0rtt; + bool enable_mltstrm; + + // option 1 + char * cert_hash; + + // option 2 + char * cert_fpath; + char * key_fpath; + char * pwd; + + QUIC_SETTINGS settings; +}; + +struct nni_quic_session { + // MsQuic + HQUIC qconn; // quic connection + + nni_list conns; // The quic streams in this quic connection + + nni_quic_listener *listener; + + bool closed; + nni_mtx mtx; +}; + + +struct nni_quic_conn { + nng_stream stream; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + bool started; + // nni_aio * qstrmaio; // Link to msquic_strm_cb + + nni_quic_dialer *dialer; + nni_quic_listener *listener; + nni_quic_session *session; + + // MsQuic + HQUIC qstrm; // quic stream + uint8_t reason_code; + + nni_reap_node reap; +}; + +#endif