From 126b7b834158e655298101981c3cb52d1ea0f87b Mon Sep 17 00:00:00 2001 From: Jean-Marc Le Roux Date: Wed, 21 Jan 2026 21:00:09 +0100 Subject: [PATCH 1/5] feat(core): add http-client-reqwest feature, mark reqwest/tokio optional --- core/core/Cargo.toml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/core/Cargo.toml b/core/core/Cargo.toml index a9298eeb1c00..e5f27170a0e5 100644 --- a/core/core/Cargo.toml +++ b/core/core/Cargo.toml @@ -39,8 +39,11 @@ all-features = true [features] default = ["reqwest-rustls-tls", "executors-tokio"] +# Enable default http client using reqwest. +http-client-reqwest = ["dep:reqwest"] + # Enable reqwest rustls tls support. -reqwest-rustls-tls = ["reqwest/rustls-tls"] +reqwest-rustls-tls = ["http-client-reqwest", "reqwest/rustls-tls"] # Enable opendal's blocking support. blocking = ["internal-tokio-rt"] @@ -77,10 +80,10 @@ percent-encoding = "2" quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] } reqwest = { version = "0.12.24", features = [ "stream", -], default-features = false } +], default-features = false, optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -tokio = { workspace = true, features = ["macros", "io-util"] } +tokio = { workspace = true, features = ["macros", "io-util"], optional = true } url = { workspace = true } uuid = { workspace = true, features = ["serde", "v4"] } From b06b57ec5d827eadbe536193a81252f772bbd066 Mon Sep 17 00:00:00 2001 From: Jean-Marc Le Roux Date: Wed, 21 Jan 2026 21:08:50 +0100 Subject: [PATCH 2/5] refactor(core): extract reqwest code to feature-gated module Move reqwest-specific HTTP client implementation to a separate module that is conditionally compiled only when http-client-reqwest feature is enabled. This makes reqwest/tokio truly optional dependencies. - Create reqwest_impl.rs with GLOBAL_REQWEST_CLIENT, HttpFetch impl, is_temporary_error helper, and HttpBufferBody - Add StubHttpClient for builds without reqwest that returns an error when used, allowing Default to work in all configurations - Gate HttpClient::new() with #[cfg(feature = "http-client-reqwest")] - Update mod.rs to conditionally include reqwest_impl module --- core/core/src/raw/http_util/client.rs | 188 +++++--------------- core/core/src/raw/http_util/mod.rs | 8 +- core/core/src/raw/http_util/reqwest_impl.rs | 172 ++++++++++++++++++ 3 files changed, 219 insertions(+), 149 deletions(-) create mode 100644 core/core/src/raw/http_util/reqwest_impl.rs diff --git a/core/core/src/raw/http_util/client.rs b/core/core/src/raw/http_util/client.rs index 4c69cb8907e3..849e9ec75fe1 100644 --- a/core/core/src/raw/http_util/client.rs +++ b/core/core/src/raw/http_util/client.rs @@ -15,40 +15,20 @@ // specific language governing permissions and limitations // under the License. -use std::convert::Infallible; use std::fmt::Debug; use std::fmt::Formatter; -use std::future; -use std::mem; use std::ops::Deref; -use std::pin::Pin; -use std::str::FromStr; use std::sync::Arc; -use std::sync::LazyLock; -use std::task::Context; -use std::task::Poll; -use bytes::Bytes; use futures::Future; -use futures::TryStreamExt; use http::Request; use http::Response; -use http_body::Frame; -use http_body::SizeHint; -use raw::oio::Read; use super::HttpBody; -use super::parse_content_encoding; -use super::parse_content_length; +use crate::raw::oio::Read; use crate::raw::*; use crate::*; -/// Http client used across opendal for loading credentials. -/// This is merely a temporary solution because reqsign requires a reqwest client to be passed. -/// We will remove it after the next major version of reqsign, which will enable users to provide their own client. -#[allow(dead_code)] -pub static GLOBAL_REQWEST_CLIENT: LazyLock = LazyLock::new(reqwest::Client::new); - /// HttpFetcher is a type erased [`HttpFetch`]. pub type HttpFetcher = Arc; @@ -69,21 +49,60 @@ impl Debug for HttpClient { } } +#[cfg(feature = "http-client-reqwest")] +impl Default for HttpClient { + fn default() -> Self { + Self { + fetcher: Arc::new(super::reqwest_impl::GLOBAL_REQWEST_CLIENT.clone()), + } + } +} + +#[cfg(not(feature = "http-client-reqwest"))] impl Default for HttpClient { fn default() -> Self { Self { - fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()), + fetcher: Arc::new(StubHttpClient), } } } +/// A stub HTTP client that returns an error when used. +/// This is used when no HTTP client implementation is available. +#[cfg(not(feature = "http-client-reqwest"))] +struct StubHttpClient; + +#[cfg(not(feature = "http-client-reqwest"))] +impl HttpFetch for StubHttpClient { + async fn fetch(&self, _req: Request) -> Result> { + Err(Error::new( + ErrorKind::ConfigInvalid, + "No HTTP client available. Enable the 'http-client-reqwest' feature or provide a custom HTTP client via HttpClient::with().", + )) + } +} + impl HttpClient { /// Create a new http client in async context. + #[cfg(feature = "http-client-reqwest")] pub fn new() -> Result { Ok(Self::default()) } - /// Construct `Self` with given [`reqwest::Client`] + /// Create a new http client in async context. + /// + /// Returns an error when no HTTP client implementation is available. + /// Either enable the `http-client-reqwest` feature or use [`HttpClient::with`] + /// to provide a custom client. + #[cfg(not(feature = "http-client-reqwest"))] + pub fn new() -> Result { + Err(Error::new( + ErrorKind::ConfigInvalid, + "No HTTP client available. Enable the 'http-client-reqwest' feature or provide a custom HTTP client via HttpClient::with().", + )) + } + + /// Construct `Self` with given client that implements [`HttpFetch`] pub fn with(client: impl HttpFetch) -> Self { let fetcher = Arc::new(client); Self { fetcher } @@ -140,126 +159,3 @@ impl HttpFetch for Arc { self.deref().fetch_dyn(req).await } } - -impl HttpFetch for reqwest::Client { - async fn fetch(&self, req: Request) -> Result> { - // Uri stores all string alike data in `Bytes` which means - // the clone here is cheap. - let uri = req.uri().clone(); - let is_head = req.method() == http::Method::HEAD; - - let (parts, body) = req.into_parts(); - - let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| { - Error::new(ErrorKind::Unexpected, "request url is invalid") - .with_operation("http_util::Client::send::fetch") - .with_context("url", uri.to_string()) - .set_source(err) - })?; - - let mut req_builder = self.request(parts.method, url).headers(parts.headers); - - // Client under wasm doesn't support set version. - #[cfg(not(target_arch = "wasm32"))] - { - req_builder = req_builder.version(parts.version); - } - - // Don't set body if body is empty. - if !body.is_empty() { - #[cfg(not(target_arch = "wasm32"))] - { - req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body))) - } - #[cfg(target_arch = "wasm32")] - { - req_builder = req_builder.body(reqwest::Body::from(body.to_bytes())) - } - } - - let mut resp = req_builder.send().await.map_err(|err| { - Error::new(ErrorKind::Unexpected, "send http request") - .with_operation("http_util::Client::send") - .with_context("url", uri.to_string()) - .with_temporary(is_temporary_error(&err)) - .set_source(err) - })?; - - // Get content length from header so that we can check it. - // - // - If the request method is HEAD, we will ignore content length. - // - If response contains content_encoding, we should omit its content length. - let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() { - None - } else { - parse_content_length(resp.headers())? - }; - - let mut hr = Response::builder() - .status(resp.status()) - // Insert uri into response extension so that we can fetch - // it later. - .extension(uri.clone()); - - // Response builder under wasm doesn't support set version. - #[cfg(not(target_arch = "wasm32"))] - { - hr = hr.version(resp.version()); - } - - // Swap headers directly instead of copy the entire map. - mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); - - let bs = HttpBody::new( - resp.bytes_stream() - .try_filter(|v| future::ready(!v.is_empty())) - .map_ok(Buffer::from) - .map_err(move |err| { - Error::new(ErrorKind::Unexpected, "read data from http response") - .with_operation("http_util::Client::send") - .with_context("url", uri.to_string()) - .with_temporary(is_temporary_error(&err)) - .set_source(err) - }), - content_length, - ); - - let resp = hr.body(bs).expect("response must build succeed"); - Ok(resp) - } -} - -#[inline] -fn is_temporary_error(err: &reqwest::Error) -> bool { - // error sending request - err.is_request()|| - // request or response body error - err.is_body() || - // error decoding response body, for example, connection reset. - err.is_decode() -} - -struct HttpBufferBody(Buffer); - -impl http_body::Body for HttpBufferBody { - type Data = Bytes; - type Error = Infallible; - - fn poll_frame( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - match self.0.next() { - Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))), - None => Poll::Ready(None), - } - } - - fn is_end_stream(&self) -> bool { - self.0.is_empty() - } - - fn size_hint(&self) -> SizeHint { - SizeHint::with_exact(self.0.len() as u64) - } -} diff --git a/core/core/src/raw/http_util/mod.rs b/core/core/src/raw/http_util/mod.rs index ae6b1e585999..6642e6b36864 100644 --- a/core/core/src/raw/http_util/mod.rs +++ b/core/core/src/raw/http_util/mod.rs @@ -23,13 +23,15 @@ //! it easier to develop services and layers outside opendal. mod client; -/// temporary client used by several features -#[allow(unused_imports)] -pub use client::GLOBAL_REQWEST_CLIENT; pub use client::HttpClient; pub use client::HttpFetch; pub use client::HttpFetcher; +#[cfg(feature = "http-client-reqwest")] +pub(crate) mod reqwest_impl; +#[cfg(feature = "http-client-reqwest")] +pub use self::reqwest_impl::GLOBAL_REQWEST_CLIENT; + mod body; pub use body::HttpBody; diff --git a/core/core/src/raw/http_util/reqwest_impl.rs b/core/core/src/raw/http_util/reqwest_impl.rs new file mode 100644 index 000000000000..02e6778ecf44 --- /dev/null +++ b/core/core/src/raw/http_util/reqwest_impl.rs @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reqwest-specific HTTP client implementation. + +use std::convert::Infallible; +use std::future; +use std::mem; +use std::pin::Pin; +use std::str::FromStr; +use std::sync::LazyLock; +use std::task::Context; +use std::task::Poll; + +use bytes::Bytes; +use futures::TryStreamExt; +use http::Request; +use http::Response; +use http_body::Frame; +use http_body::SizeHint; + +use super::HttpBody; +use super::HttpFetch; +use super::parse_content_encoding; +use super::parse_content_length; +use crate::Buffer; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +/// Http client used across opendal for loading credentials. +/// This is merely a temporary solution because reqsign requires a reqwest client to be passed. +/// We will remove it after the next major version of reqsign, which will enable users to provide their own client. +#[allow(dead_code)] +pub static GLOBAL_REQWEST_CLIENT: LazyLock = LazyLock::new(reqwest::Client::new); + +impl HttpFetch for reqwest::Client { + async fn fetch(&self, req: Request) -> Result> { + // Uri stores all string alike data in `Bytes` which means + // the clone here is cheap. + let uri = req.uri().clone(); + let is_head = req.method() == http::Method::HEAD; + + let (parts, body) = req.into_parts(); + + let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| { + Error::new(ErrorKind::Unexpected, "request url is invalid") + .with_operation("http_util::Client::send::fetch") + .with_context("url", uri.to_string()) + .set_source(err) + })?; + + let mut req_builder = self.request(parts.method, url).headers(parts.headers); + + // Client under wasm doesn't support set version. + #[cfg(not(target_arch = "wasm32"))] + { + req_builder = req_builder.version(parts.version); + } + + // Don't set body if body is empty. + if !body.is_empty() { + #[cfg(not(target_arch = "wasm32"))] + { + req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body))) + } + #[cfg(target_arch = "wasm32")] + { + req_builder = req_builder.body(reqwest::Body::from(body.to_bytes())) + } + } + + let mut resp = req_builder.send().await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "send http request") + .with_operation("http_util::Client::send") + .with_context("url", uri.to_string()) + .with_temporary(is_temporary_error(&err)) + .set_source(err) + })?; + + // Get content length from header so that we can check it. + // + // - If the request method is HEAD, we will ignore content length. + // - If response contains content_encoding, we should omit its content length. + let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() { + None + } else { + parse_content_length(resp.headers())? + }; + + let mut hr = Response::builder() + .status(resp.status()) + // Insert uri into response extension so that we can fetch + // it later. + .extension(uri.clone()); + + // Response builder under wasm doesn't support set version. + #[cfg(not(target_arch = "wasm32"))] + { + hr = hr.version(resp.version()); + } + + // Swap headers directly instead of copy the entire map. + mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); + + let bs = HttpBody::new( + resp.bytes_stream() + .try_filter(|v| future::ready(!v.is_empty())) + .map_ok(Buffer::from) + .map_err(move |err| { + Error::new(ErrorKind::Unexpected, "read data from http response") + .with_operation("http_util::Client::send") + .with_context("url", uri.to_string()) + .with_temporary(is_temporary_error(&err)) + .set_source(err) + }), + content_length, + ); + + let resp = hr.body(bs).expect("response must build succeed"); + Ok(resp) + } +} + +#[inline] +fn is_temporary_error(err: &reqwest::Error) -> bool { + // error sending request + err.is_request()|| + // request or response body error + err.is_body() || + // error decoding response body, for example, connection reset. + err.is_decode() +} + +struct HttpBufferBody(Buffer); + +impl http_body::Body for HttpBufferBody { + type Data = Bytes; + type Error = Infallible; + + fn poll_frame( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.0.next() { + Some(bs) => Poll::Ready(Some(Ok(Frame::data(bs)))), + None => Poll::Ready(None), + } + } + + fn is_end_stream(&self) -> bool { + self.0.is_empty() + } + + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(self.0.len() as u64) + } +} From dec37c314e3201ecd66735e3244c81c74c56524b Mon Sep 17 00:00:00 2001 From: Jean-Marc Le Roux Date: Wed, 21 Jan 2026 21:13:38 +0100 Subject: [PATCH 3/5] feat(opendal): forward http-client-reqwest feature to core --- core/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/core/Cargo.toml b/core/Cargo.toml index 24a41d14b54e..9e2af6701e5f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -91,6 +91,7 @@ default = [ "layers-timeout", ] executors-tokio = ["opendal-core/executors-tokio"] +http-client-reqwest = ["opendal-core/http-client-reqwest"] internal-path-cache = ["opendal-core/internal-path-cache"] internal-tokio-rt = ["opendal-core/internal-tokio-rt"] layers-async-backtrace = ["dep:opendal-layer-async-backtrace"] From 977b931caf3105e0cd8bd6dc4658150d98997074 Mon Sep 17 00:00:00 2001 From: Jean-Marc Le Roux Date: Wed, 21 Jan 2026 21:19:53 +0100 Subject: [PATCH 4/5] fix(core): use std::time for WASI targets instead of web_time The web_time::web module only exists for browser WASM (target_os = "unknown"), not for WASI targets. This caused compilation failures for wasm32-wasip2. Changed cfg conditions to only use web_time for browser WASM, allowing WASI to use the standard std::time::SystemTime which it supports. --- core/core/src/raw/time.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/core/src/raw/time.rs b/core/core/src/raw/time.rs index b43f61a82f74..f1a3e3c4dc6d 100644 --- a/core/core/src/raw/time.rs +++ b/core/core/src/raw/time.rs @@ -24,9 +24,9 @@ use std::ops::{Add, AddAssign, Sub, SubAssign}; use std::str::FromStr; pub use std::time::{Duration, UNIX_EPOCH}; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] pub use std::time::{Instant, SystemTime}; -#[cfg(target_arch = "wasm32")] +#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] pub use web_time::{Instant, SystemTime}; /// An instant in time represented as the number of nanoseconds since the Unix epoch. @@ -179,12 +179,12 @@ impl From for Timestamp { impl From for SystemTime { fn from(ts: Timestamp) -> Self { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] { SystemTime::from(ts.0) } - #[cfg(target_arch = "wasm32")] + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] { use std::time::SystemTime as StdSystemTime; @@ -199,12 +199,12 @@ impl TryFrom for Timestamp { fn try_from(t: SystemTime) -> Result { let t = { - #[cfg(not(target_arch = "wasm32"))] + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] { t } - #[cfg(target_arch = "wasm32")] + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] { ::to_std(t) } From 14055e12a8b7cca11493e2e95f48d19dda644475 Mon Sep 17 00:00:00 2001 From: Jean-Marc Le Roux Date: Wed, 21 Jan 2026 23:08:50 +0100 Subject: [PATCH 5/5] ci(core): add wasm32-wasip2 build job Adds a CI job to verify WASI P2 compatibility. This catches issues like the web_time cfg condition fix where web_time::web only exists for browser WASM (target_os = unknown), not WASI. Includes compatible layers and services that don't require reqwest/tokio: - layers: capability-check, chaos, concurrent-limit, immutable-index, logging, metrics, mime-guess, retry, throttle, timeout - services: dashmap, mini-moka --- .github/workflows/ci_core.yml | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/.github/workflows/ci_core.yml b/.github/workflows/ci_core.yml index 78a8a43f0339..d0da959483f4 100644 --- a/.github/workflows/ci_core.yml +++ b/.github/workflows/ci_core.yml @@ -241,6 +241,36 @@ jobs: rustup target add wasm32-unknown-unknown cargo build --target wasm32-unknown-unknown --no-default-features --features="${FEATURES[*]}" --locked + # Build under WASI to ensure WASI compatibility (uses std::time instead of web_time) + build_under_wasi: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + - name: Build + working-directory: core + run: | + FEATURES=( + layers-capability-check + layers-chaos + layers-concurrent-limit + layers-immutable-index + layers-logging + layers-metrics + layers-mime-guess + layers-retry + layers-throttle + layers-timeout + services-dashmap + services-mini-moka + ) + rustup toolchain install nightly + rustup target add wasm32-wasip2 --toolchain nightly + cargo +nightly build --lib --target wasm32-wasip2 --no-default-features --features="${FEATURES[*]}" --locked + unit: runs-on: ubuntu-latest steps: