From 1ea2298b3dd49ea7d8efdb9b42b063f48b93f897 Mon Sep 17 00:00:00 2001 From: Jason Hernandez <7144515+jasonhernandez@users.noreply.github.com> Date: Fri, 12 Jun 2026 16:46:10 -0700 Subject: [PATCH] adapter: migrate LaunchDarkly SDK off fork to upstream 3.1.1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move launchdarkly-server-sdk from the MaterializeInc/rust-server-sdk fork back to upstream crates.io 3.1.1, restoring the launchdarkly-sdk- transport + MetricsTransport setup and dropping the [patch.crates-io] override. The fork existed for launchdarkly/rust-server-sdk#116: a StreamingData Source/eventsource StreamClosed bug where a non-Eof stream error left the data source stuck with no reconnect, silently breaking LD sync. A prior upgrade to upstream 3.0.1 had to be reverted (incident-984) because that bug was still unfixed upstream. The fixes have since landed — rust-server-sdk#168 and rust-eventsource-client#134/#135 — and 3.1.1 resolves eventsource-client to 0.17.5, which carries them. Use the rustls + aws-lc-rs features (hyper-rustls-native-roots, crypto-aws-lc-rs), now the upstream defaults, instead of the prior attempt's native-tls/crypto-openssl, avoiding the OpenSSL path. The transport build_https() call is identical either way. deny.toml gains skips for the duplicate versions the transport stack pulls (older tower/rustls-native-certs; newer rand/rand_core/getrandom/ cpufeatures) and re-adds the launchdarkly-sdk-transport wrapper. Adds a test, test_metric_frozen_on_midstream_error, modeling the exact incident-984 failure mode (200 OK then a mid-stream timeout): it asserts the last_sse_time_seconds gauge freezes so the staleness alert can detect a stuck data source. Co-Authored-By: Claude Fable 5 --- Cargo.lock | 381 +++++++++++++++++++++++++---- Cargo.toml | 6 +- deny.toml | 10 + src/adapter/Cargo.toml | 2 +- src/adapter/src/config/frontend.rs | 319 +++++++++++++++++++++--- src/dyncfg-launchdarkly/Cargo.toml | 2 +- src/dyncfg-launchdarkly/src/lib.rs | 21 +- 7 files changed, 642 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81963db55ccae..f7a57320caa97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,7 +31,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", "cipher", - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -1548,7 +1548,7 @@ dependencies = [ "bitflags 2.11.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1908,6 +1908,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.41" @@ -1975,6 +1986,12 @@ dependencies = [ "half 1.6.0", ] +[[package]] +name = "cidr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579504560394e388085d0c080ea587dfa5c15f7e251b4d5247d1e1a61d1d6928" + [[package]] name = "cipher" version = "0.4.4" @@ -2318,6 +2335,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.3.0" @@ -2998,7 +3024,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -3380,7 +3406,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3412,17 +3438,18 @@ dependencies = [ [[package]] name = "eventsource-client" -version = "0.16.0" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c26451361cde19fe2322835b6e684f4825902edf3139ce9d6a70e6542566a0b2" +checksum = "96df8cfa11d3c8e4e1a48b81c9c600ecbe8c11d1326f41e1524cc4d42f7bf6d9" dependencies = [ "base64 0.22.1", + "bytes", "futures", - "hyper 0.14.32", - "hyper-timeout 0.4.1", + "http 1.4.2", + "launchdarkly-sdk-transport", "log", "pin-project", - "rand 0.8.5", + "rand 0.10.1", "tokio", ] @@ -3918,11 +3945,25 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.3.0", "wasi 0.14.2+wasi-0.2.4", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "rand_core 0.10.1", + "wasip2", + "wasip3", +] + [[package]] name = "gimli" version = "0.32.3" @@ -4345,6 +4386,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-http-proxy" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ad4b0a1e37510028bc4ba81d0e38d239c39671b0f0ce9e02dfa93a8133f7c08" +dependencies = [ + "bytes", + "futures-util", + "headers", + "http 1.4.2", + "hyper 1.9.0", + "hyper-rustls", + "hyper-util", + "pin-project-lite", + "rustls-native-certs 0.7.3", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-openssl" version = "0.10.2" @@ -4375,25 +4436,13 @@ dependencies = [ "hyper 1.9.0", "hyper-util", "rustls", - "rustls-native-certs", + "rustls-native-certs 0.8.3", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", ] -[[package]] -name = "hyper-timeout" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" -dependencies = [ - "hyper 0.14.32", - "pin-project-lite", - "tokio", - "tokio-io-timeout", -] - [[package]] name = "hyper-timeout" version = "0.5.1" @@ -4453,7 +4502,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.4", "system-configuration", "tokio", "tower-service", @@ -4853,7 +4902,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -5048,7 +5097,7 @@ dependencies = [ "http-body-util", "hyper 1.9.0", "hyper-openssl", - "hyper-timeout 0.5.1", + "hyper-timeout", "hyper-util", "jiff", "jsonpath-rust", @@ -5129,20 +5178,43 @@ dependencies = [ "tracing", ] +[[package]] +name = "launchdarkly-sdk-transport" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a048341aa360a768a7d91ebf6232a574f355ada0724acff2df7c5d8dc1e04c98" +dependencies = [ + "bytes", + "futures", + "http 1.4.2", + "http-body-util", + "hyper 1.9.0", + "hyper-http-proxy", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "log", + "no-proxy", + "tower 0.4.13", +] + [[package]] name = "launchdarkly-server-sdk" -version = "2.6.2" -source = "git+https://github.com/MaterializeInc/rust-server-sdk?rev=3e0a0b98b09a2970f292577a07e1c9382b65b5da#3e0a0b98b09a2970f292577a07e1c9382b65b5da" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0440c21aaa0670948071d9cbd8142e9d057a4b40feb3e6293e0efc6de9879b69" dependencies = [ "aws-lc-rs", + "bitflags 2.11.0", + "bytes", "chrono", "crossbeam-channel", "data-encoding", "eventsource-client", "futures", - "hyper 0.14.32", + "http 1.4.2", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk-evaluation", - "lazy_static", "log", "lru", "moka", @@ -5158,9 +5230,9 @@ dependencies = [ [[package]] name = "launchdarkly-server-sdk-evaluation" -version = "2.0.1" +version = "2.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63706c23ee67f699563e5c52c7542361eccc966cba0430b6a7862c0ecaee9432" +checksum = "4bc97a52681b9860197ad81a2c1a34d96bb647459bcdf067ac1fae42c9fe8c30" dependencies = [ "base16ct", "chrono", @@ -5185,6 +5257,12 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "lexical-core" version = "1.0.5" @@ -5763,10 +5841,10 @@ dependencies = [ "hex", "http 1.4.2", "humantime", - "hyper-tls 0.5.0", "imbl", "ipnet", "itertools 0.14.0", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk", "maplit", "mz-adapter-types", @@ -6636,7 +6714,7 @@ version = "0.0.0" dependencies = [ "anyhow", "humantime", - "hyper-tls 0.5.0", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk", "mz-build-info", "mz-dyncfg", @@ -8885,6 +8963,15 @@ dependencies = [ "libc", ] +[[package]] +name = "no-proxy" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f79c902b31ceac6856e262af5dbaffef75390cf4647c9fef7b55da69a4b912e" +dependencies = [ + "cidr", +] + [[package]] name = "nom" version = "7.1.3" @@ -10261,7 +10348,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", - "itertools 0.10.5", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -10282,7 +10369,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -10507,6 +10594,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "radium" version = "0.7.0" @@ -10548,6 +10641,17 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -10606,6 +10710,12 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_distr" version = "0.5.1" @@ -11198,7 +11308,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -11211,7 +11321,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -11228,6 +11338,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe 0.1.6", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.3" @@ -11240,6 +11363,15 @@ dependencies = [ "security-framework 3.7.0", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -11787,7 +11919,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -11804,7 +11936,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", "sha2-asm", ] @@ -12331,7 +12463,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.1.4", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -12350,7 +12482,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -12628,16 +12760,6 @@ dependencies = [ "windows-sys 0.61.1", ] -[[package]] -name = "tokio-io-timeout" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90c49f106be240de154571dd31fbe48acb10ba6c6dd6f6517ad603abffa42de9" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-io-utility" version = "0.7.6" @@ -12845,7 +12967,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.9.0", - "hyper-timeout 0.5.1", + "hyper-timeout", "hyper-util", "percent-encoding", "pin-project", @@ -12910,6 +13032,7 @@ dependencies = [ "pin-project-lite", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -13425,6 +13548,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unit-prefix" version = "0.5.2" @@ -13657,6 +13786,24 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip2" +version = "1.0.4+wasi-0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67efb37e106e55ce722a510d6b5f9c17f083e5fc79afc2badeb12cc313d9487" +dependencies = [ + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", +] + [[package]] name = "wasite" version = "0.1.0" @@ -13718,6 +13865,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap 2.11.4", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.1" @@ -13731,6 +13900,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.11.0", + "hashbrown 0.15.3", + "indexmap 2.11.4", + "semver", +] + [[package]] name = "wasmtimer" version = "0.4.3" @@ -14060,6 +14241,32 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" @@ -14069,6 +14276,74 @@ dependencies = [ "bitflags 2.11.0", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap 2.11.4", + "prettyplease", + "syn 2.0.117", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.117", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags 2.11.0", + "indexmap 2.11.4", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.11.4", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 6a2ee08bf8921..2b08d7719d319 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -392,7 +392,8 @@ junit-report = "0.8.3" k8s-controller = "0.10.0" k8s-openapi = { version = "0.27.0", features = ["schemars", "v1_32"] } kube = { version = "3.1.0", default-features = false, features = ["client", "derive", "openssl-tls", "runtime", "ws"] } -launchdarkly-server-sdk = { version = "2.6.2", default-features = false } +launchdarkly-server-sdk = { version = "3.1.1", default-features = false, features = ["hyper-rustls-native-roots", "crypto-aws-lc-rs"] } +launchdarkly-sdk-transport = "0.1" lgalloc = "0.6.0" libc = "0.2.186" lru = "0.16.3" @@ -637,9 +638,6 @@ postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" # Waiting on https://github.com/MaterializeInc/serde-value/pull/35. serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" } -# Waiting for resolution of https://github.com/launchdarkly/rust-server-sdk/issues/116 -launchdarkly-server-sdk = { git = "https://github.com/MaterializeInc/rust-server-sdk", rev = "3e0a0b98b09a2970f292577a07e1c9382b65b5da" } - # Waiting on https://github.com/edenhill/librdkafka/pull/4051. rdkafka = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" } rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" } diff --git a/deny.toml b/deny.toml index edcf630ef7f19..ea83ee6ca8e4c 100644 --- a/deny.toml +++ b/deny.toml @@ -150,6 +150,15 @@ skip = [ { name = "hashlink", version = "0.9.1" }, # held back by owo-colors 4.3 (mz-deploy terminal styling) { name = "supports-color", version = "2.1.0" }, + + # Pulled by launchdarkly-server-sdk 3.x via launchdarkly-sdk-transport / + # eventsource-client (proxy/timeout/rustls stack and the SDK's RNG path). + # NB: tower 0.4.13 is already skipped above (mz-deploy). + { name = "rustls-native-certs", version = "0.7.3" }, + { name = "rand", version = "0.10.1" }, + { name = "rand_core", version = "0.10.1" }, + { name = "getrandom", version = "0.4.2" }, + { name = "cpufeatures", version = "0.3.0" }, ] [[bans.deny]] @@ -206,6 +215,7 @@ wrappers = [ "globset", "launchdarkly-server-sdk", "launchdarkly-server-sdk-evaluation", + "launchdarkly-sdk-transport", "native-tls", "opendal", "os_info", diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index 74da334662554..32c6eeed02f92 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -29,10 +29,10 @@ hex.workspace = true humantime.workspace = true imbl.workspace = true http.workspace = true -hyper-tls = "0.5.0" ipnet.workspace = true itertools.workspace = true launchdarkly-server-sdk.workspace = true +launchdarkly-sdk-transport.workspace = true maplit.workspace = true mz-adapter-types = { path = "../adapter-types" } mz-audit-log = { path = "../audit-log" } diff --git a/src/adapter/src/config/frontend.rs b/src/adapter/src/config/frontend.rs index da19751f24cdc..67e841bc04f2f 100644 --- a/src/adapter/src/config/frontend.rs +++ b/src/adapter/src/config/frontend.rs @@ -10,14 +10,16 @@ use std::collections::BTreeMap; use std::fs; use std::path::PathBuf; -use std::sync::Arc; use std::time::Duration; +use bytes::Bytes; use derivative::Derivative; -use hyper_tls::HttpsConnector; +use futures::TryStreamExt; +use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture}; use launchdarkly_server_sdk as ld; use mz_build_info::BuildInfo; use mz_cloud_provider::CloudProvider; +use mz_ore::metrics::UIntGauge; use mz_ore::now::NowFn; use mz_sql::catalog::EnvironmentId; use serde_json::Value as JsonValue; @@ -64,7 +66,7 @@ impl SystemParameterFrontend { /// Create a new [SystemParameterFrontend] initialize. /// /// This will create and initialize an [ld::Client] instance. The - /// [ld::Client::initialized_async] call will be attempted in a loop with an + /// [ld::Client::wait_for_initialization] call will be attempted in a loop with an /// exponential backoff with power `2s` and max duration `60s`. pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result { match &sync_config.backend_config { @@ -146,25 +148,68 @@ impl SystemParameterFrontend { } } -fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config { +/// An [`HttpTransport`] wrapper that records timestamps on successful HTTP +/// responses. Used to populate Prometheus metrics that track LaunchDarkly +/// connectivity health. +/// +/// Two instances are created — one for the event processor (CSE metric, tracks +/// outbound event sends) and one for the streaming data source (SSE metric, +/// tracks inbound SSE events). +#[derive(Clone)] +struct MetricsTransport { + inner: T, + last_success_gauge: UIntGauge, + now_fn: NowFn, +} + +impl HttpTransport for MetricsTransport { + fn request(&self, request: http::Request>) -> ResponseFuture { + let inner_fut = self.inner.request(request); + let gauge = self.last_success_gauge.clone(); + let now_fn = self.now_fn.clone(); + Box::pin(async move { + let resp = inner_fut.await?; + if resp.status().is_success() { + gauge.set(now_fn() / 1000); + let (parts, body) = resp.into_parts(); + let wrapped: ByteStream = Box::pin(body.inspect_ok(move |_| { + gauge.set(now_fn() / 1000); + })); + Ok(http::Response::from_parts(parts, wrapped)) + } else { + Ok(resp) + } + }) + } +} + +fn ld_config(api_key: &str, metrics: &Metrics, now_fn: &NowFn) -> ld::Config { + let transport = launchdarkly_sdk_transport::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(300)) + .build_https() + .expect("failed to create HTTPS transport"); + + let cse_transport = MetricsTransport { + inner: transport.clone(), + last_success_gauge: metrics.last_cse_time_seconds.clone(), + now_fn: now_fn.clone(), + }; + let data_source_transport = MetricsTransport { + inner: transport, + last_success_gauge: metrics.last_sse_time_seconds.clone(), + now_fn: now_fn.clone(), + }; + + let mut event_processor = ld::EventProcessorBuilder::new(); + event_processor.transport(cse_transport); + + let mut data_source = ld::StreamingDataSourceBuilder::new(); + data_source.transport(data_source_transport); + ld::ConfigBuilder::new(api_key) - .event_processor( - ld::EventProcessorBuilder::new() - .https_connector(HttpsConnector::new()) - .on_success({ - let last_cse_time_seconds = metrics.last_cse_time_seconds.clone(); - Arc::new(move |result| { - if let Ok(ts) = u64::try_from(result.time_from_server / 1000) { - last_cse_time_seconds.set(ts); - } else { - tracing::warn!( - "Cannot convert time_from_server / 1000 from u128 to u64" - ); - } - }) - }), - ) - .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new())) + .event_processor(&event_processor) + .data_source(&data_source) .build() .expect("valid config") } @@ -174,19 +219,9 @@ async fn ld_client( metrics: &Metrics, now_fn: &NowFn, ) -> Result { - let ld_client = ld::Client::build(ld_config(api_key, metrics))?; + let ld_client = ld::Client::build(ld_config(api_key, metrics, now_fn))?; tracing::info!("waiting for SystemParameterFrontend to initialize"); - // Start and initialize LD client for the frontend. The callback passed - // will export the last time when an SSE event from the LD server was - // received in a Prometheus metric. - ld_client.start_with_default_executor_and_callback({ - let last_sse_time_seconds = metrics.last_sse_time_seconds.clone(); - let now_fn = now_fn.clone(); - Arc::new(move |_ev| { - let ts = now_fn() / 1000; - last_sse_time_seconds.set(ts); - }) - }); + ld_client.start_with_default_executor(); let max_backoff = Duration::from_secs(60); let mut backoff = Duration::from_secs(5); @@ -273,3 +308,221 @@ fn ld_ctx( ctx_builder.build().map_err(|e| anyhow::anyhow!(e)) } + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use launchdarkly_sdk_transport::{ByteStream, TransportError}; + use mz_ore::metrics::MetricsRegistry; + use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; + + /// A fake transport that simulates a long-lived SSE streaming connection: + /// returns 200 OK immediately, then delivers multiple SSE events as body + /// chunks (exactly how LaunchDarkly's streaming data source works). + #[derive(Clone)] + struct FakeSseTransport; + + impl HttpTransport for FakeSseTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + let body: ByteStream = Box::pin(futures::stream::iter(vec![ + Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")), + Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag1\"}\n\n")), + Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag2\"}\n\n")), + ])); + Box::pin(async move { + http::Response::builder() + .status(200) + .body(body) + .map_err(|e| TransportError::new(std::io::Error::other(e))) + }) + } + } + + /// A fake transport that returns an error, simulating a failed connection. + #[derive(Clone)] + struct FailingTransport; + + impl HttpTransport for FailingTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + Box::pin(async move { + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "connection refused", + ))) + }) + } + } + + /// A fake transport that returns 200 OK, delivers one event, then errors + /// mid-stream with a timeout: the non-Eof stream error a dropped long-lived + /// SSE connection surfaces. + #[derive(Clone)] + struct MidStreamFailureTransport; + + impl HttpTransport for MidStreamFailureTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + let body: ByteStream = Box::pin(futures::stream::iter(vec![ + Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")), + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "body timed out", + ))), + ])); + Box::pin(async move { + http::Response::builder() + .status(200) + .body(body) + .map_err(|e| TransportError::new(std::io::Error::other(e))) + }) + } + } + + fn test_gauge(registry: &MetricsRegistry, name: &str) -> UIntGauge { + registry.register(mz_ore::metric!( + name: name, + help: "test gauge", + )) + } + + /// Verifies that MetricsTransport updates the gauge on each body chunk, + /// not just on the initial HTTP 200 response head. This matters for + /// long-lived streaming connections where SSE events arrive as body chunks. + #[mz_ore::test(tokio::test)] + async fn test_metric_updated_on_body_chunks() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_sse_gauge"); + + let transport = MetricsTransport { + inner: FakeSseTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + assert_eq!(gauge.get(), 0); + + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let response = transport.request(request).await?; + + assert_eq!(gauge.get(), 1000); + + time.store(2_800_000, Ordering::SeqCst); + + let mut body = response.into_body(); + let mut event_count = 0; + while let Some(Ok(_chunk)) = body.next().await { + event_count += 1; + } + assert_eq!(event_count, 3); + + assert_eq!(gauge.get(), 2800); + Ok(()) + } + + #[mz_ore::test(tokio::test)] + async fn test_cse_metric_updates_correctly_per_request() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_cse_gauge"); + + let transport = MetricsTransport { + inner: FakeSseTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + let req = || -> Result>, http::Error> { + http::Request::builder() + .uri("https://events.launchdarkly.com/bulk") + .body(None) + }; + + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 1000); + + time.store(2_000_000, Ordering::SeqCst); + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 2000); + + time.store(3_000_000, Ordering::SeqCst); + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 3000); + Ok(()) + } + + #[mz_ore::test(tokio::test)] + async fn test_metric_not_updated_on_failed_request() -> Result<(), anyhow::Error> { + let now_fn = NowFn::from(|| 5_000_000u64); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_fail_gauge"); + + let transport = MetricsTransport { + inner: FailingTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let result = transport.request(request).await; + assert!(result.is_err()); + assert_eq!(gauge.get(), 0, "gauge must not update on transport error"); + Ok(()) + } + + /// Verifies that when an SSE connection returns 200 OK and then dies + /// mid-stream, `last_sse_time_seconds` advances only for the events that + /// arrived and then freezes — the frozen timestamp is what lets the + /// staleness alert detect a stuck data source. + #[mz_ore::test(tokio::test)] + async fn test_metric_frozen_on_midstream_error() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_midstream_gauge"); + + let transport = MetricsTransport { + inner: MidStreamFailureTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + // The 200 OK response head updates the gauge. + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let response = transport.request(request).await?; + assert_eq!(gauge.get(), 1000); + + // The first event arrives and advances the gauge. + time.store(2_000_000, Ordering::SeqCst); + let mut body = response.into_body(); + assert!(matches!(body.next().await, Some(Ok(_)))); + assert_eq!(gauge.get(), 2000); + + // The stream then errors mid-flight. Time has moved forward, but the + // gauge must stay frozen at the last successful event. + time.store(9_000_000, Ordering::SeqCst); + assert!(matches!(body.next().await, Some(Err(_)))); + assert_eq!( + gauge.get(), + 2000, + "gauge must freeze on mid-stream error so the staleness alert can fire" + ); + Ok(()) + } +} diff --git a/src/dyncfg-launchdarkly/Cargo.toml b/src/dyncfg-launchdarkly/Cargo.toml index 42ff83496db0e..b1efcbece2523 100644 --- a/src/dyncfg-launchdarkly/Cargo.toml +++ b/src/dyncfg-launchdarkly/Cargo.toml @@ -13,8 +13,8 @@ workspace = true [dependencies] anyhow.workspace = true humantime.workspace = true -hyper-tls = "0.5.0" launchdarkly-server-sdk.workspace = true +launchdarkly-sdk-transport.workspace = true mz-build-info = { path = "../build-info" } mz-dyncfg = { path = "../dyncfg" } mz-ore = { path = "../ore", default-features = false, features = ["async"] } diff --git a/src/dyncfg-launchdarkly/src/lib.rs b/src/dyncfg-launchdarkly/src/lib.rs index 3e25eb5b41bb4..50f412f777763 100644 --- a/src/dyncfg-launchdarkly/src/lib.rs +++ b/src/dyncfg-launchdarkly/src/lib.rs @@ -11,7 +11,6 @@ use std::time::Duration; -use hyper_tls::HttpsConnector; use launchdarkly_server_sdk as ld; use mz_build_info::BuildInfo; use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal}; @@ -50,13 +49,21 @@ where let _ = dyn_into_flag(entry.val())?; } let ld_client = if let Some(key) = launchdarkly_sdk_key { + let transport = launchdarkly_sdk_transport::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(300)) + .build_https() + .expect("failed to create HTTPS transport"); + + let mut data_source = ld::StreamingDataSourceBuilder::new(); + data_source.transport(transport.clone()); + + let mut event_processor = ld::EventProcessorBuilder::new(); + event_processor.transport(transport); + let config = ld::ConfigBuilder::new(key) - .event_processor( - ld::EventProcessorBuilder::new().https_connector(HttpsConnector::new()), - ) - .data_source( - ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()), - ) + .data_source(&data_source) + .event_processor(&event_processor) .build() .expect("valid config"); let client = ld::Client::build(config)?;