From 79e81ae42df3b7365254abf8891505b3529574dc Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Thu, 26 Feb 2026 20:00:07 +0800 Subject: [PATCH 1/2] feat: Implement OAuth2 authentication for REST catalog --- src/iceberg/catalog/rest/CMakeLists.txt | 1 + src/iceberg/catalog/rest/auth/auth_manager.cc | 137 ++++++++++++++++++ .../catalog/rest/auth/auth_manager_internal.h | 5 + .../catalog/rest/auth/auth_managers.cc | 1 + .../catalog/rest/auth/auth_properties.h | 2 + src/iceberg/catalog/rest/auth/auth_session.cc | 15 ++ src/iceberg/catalog/rest/auth/auth_session.h | 22 +++ src/iceberg/catalog/rest/auth/oauth2_util.cc | 115 +++++++++++++++ src/iceberg/catalog/rest/auth/oauth2_util.h | 109 ++++++++++++++ src/iceberg/catalog/rest/http_client.cc | 2 +- src/iceberg/catalog/rest/meson.build | 2 + src/iceberg/catalog/rest/rest_catalog.cc | 13 +- src/iceberg/catalog/rest/type_fwd.h | 1 + src/iceberg/test/auth_manager_test.cc | 106 ++++++++++++++ 14 files changed, 529 insertions(+), 2 deletions(-) create mode 100644 src/iceberg/catalog/rest/auth/oauth2_util.cc create mode 100644 src/iceberg/catalog/rest/auth/oauth2_util.h diff --git a/src/iceberg/catalog/rest/CMakeLists.txt b/src/iceberg/catalog/rest/CMakeLists.txt index 1da47d680..2fe8d21bf 100644 --- a/src/iceberg/catalog/rest/CMakeLists.txt +++ b/src/iceberg/catalog/rest/CMakeLists.txt @@ -21,6 +21,7 @@ set(ICEBERG_REST_SOURCES auth/auth_manager.cc auth/auth_managers.cc auth/auth_session.cc + auth/oauth2_util.cc catalog_properties.cc endpoint.cc error_handlers.cc diff --git a/src/iceberg/catalog/rest/auth/auth_manager.cc b/src/iceberg/catalog/rest/auth/auth_manager.cc index 14946aef6..8c9b25672 100644 --- a/src/iceberg/catalog/rest/auth/auth_manager.cc +++ b/src/iceberg/catalog/rest/auth/auth_manager.cc @@ -19,9 +19,14 @@ #include "iceberg/catalog/rest/auth/auth_manager.h" +#include + #include "iceberg/catalog/rest/auth/auth_manager_internal.h" #include "iceberg/catalog/rest/auth/auth_properties.h" #include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/auth/oauth2_util.h" +#include "iceberg/catalog/rest/catalog_properties.h" +#include "iceberg/catalog/rest/http_client.h" #include "iceberg/util/macros.h" #include "iceberg/util/transform_util.h" @@ -90,4 +95,136 @@ Result> MakeBasicAuthManager( return std::make_unique(); } +/// \brief OAuth2 authentication manager. +/// +/// Two-phase init: InitSession fetches and caches a token for the config request; +/// CatalogSession reuses the cached token and enables refresh. +class OAuth2AuthManager : public AuthManager { + public: + Result> InitSession( + HttpClient& init_client, + const std::unordered_map& properties) override { + // Credential takes priority: fetch a fresh token for the config request. + auto credential_it = properties.find(AuthProperties::kOAuth2Credential); + if (credential_it != properties.end() && !credential_it->second.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto ctx, ParseOAuth2Context(properties)); + auto noop_session = AuthSession::MakeDefault({}); + ICEBERG_ASSIGN_OR_RAISE(init_token_response_, + FetchToken(init_client, ctx.token_endpoint, ctx.client_id, + ctx.client_secret, ctx.scope, *noop_session)); + return AuthSession::MakeDefault( + {{"Authorization", "Bearer " + init_token_response_->access_token}}); + } + + auto token_it = properties.find(AuthProperties::kOAuth2Token); + if (token_it != properties.end() && !token_it->second.empty()) { + return AuthSession::MakeDefault({{"Authorization", "Bearer " + token_it->second}}); + } + + return AuthSession::MakeDefault({}); + } + + Result> CatalogSession( + HttpClient& client, + const std::unordered_map& properties) override { + // Reuse the token fetched during InitSession. + if (init_token_response_.has_value()) { + auto token_response = std::move(*init_token_response_); + init_token_response_.reset(); + ICEBERG_ASSIGN_OR_RAISE(auto ctx, ParseOAuth2Context(properties)); + return AuthSession::MakeOAuth2(token_response, ctx.token_endpoint, ctx.client_id, + ctx.client_secret, ctx.scope, client); + } + + // If both token and credential are provided, prefer the token. + auto token_it = properties.find(AuthProperties::kOAuth2Token); + if (token_it != properties.end() && !token_it->second.empty()) { + return AuthSession::MakeDefault({{"Authorization", "Bearer " + token_it->second}}); + } + + // Fetch a new token using client_credentials grant. + auto credential_it = properties.find(AuthProperties::kOAuth2Credential); + if (credential_it != properties.end() && !credential_it->second.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto ctx, ParseOAuth2Context(properties)); + auto noop_session = AuthSession::MakeDefault({}); + OAuthTokenResponse token_response; + ICEBERG_ASSIGN_OR_RAISE(token_response, + FetchToken(client, ctx.token_endpoint, ctx.client_id, + ctx.client_secret, ctx.scope, *noop_session)); + return AuthSession::MakeOAuth2(token_response, ctx.token_endpoint, ctx.client_id, + ctx.client_secret, ctx.scope, client); + } + + return AuthSession::MakeDefault({}); + } + + // TODO(lishuxu): Override TableSession() to support token exchange (RFC 8693). + // TODO(lishuxu): Override ContextualSession() to support per-context token exchange. + + private: + struct OAuth2Context { + std::string client_id; + std::string client_secret; + std::string token_endpoint; + std::string scope; + }; + + /// \brief Parse credential, token endpoint, and scope from properties. + static Result ParseOAuth2Context( + const std::unordered_map& properties) { + OAuth2Context ctx; + + auto credential_it = properties.find(AuthProperties::kOAuth2Credential); + if (credential_it == properties.end() || credential_it->second.empty()) { + return InvalidArgument("OAuth2 authentication requires '{}' property", + AuthProperties::kOAuth2Credential); + } + const auto& credential = credential_it->second; + auto colon_pos = credential.find(':'); + if (colon_pos == std::string::npos) { + return InvalidArgument( + "Invalid OAuth2 credential format: expected 'client_id:client_secret'"); + } + ctx.client_id = credential.substr(0, colon_pos); + ctx.client_secret = credential.substr(colon_pos + 1); + + auto uri_it = properties.find(AuthProperties::kOAuth2ServerUri); + if (uri_it != properties.end() && !uri_it->second.empty()) { + ctx.token_endpoint = uri_it->second; + } else { + // {uri}/v1/oauth/tokens. + auto catalog_uri_it = properties.find(RestCatalogProperties::kUri.key()); + if (catalog_uri_it == properties.end() || catalog_uri_it->second.empty()) { + return InvalidArgument( + "OAuth2 authentication requires '{}' or '{}' property to determine " + "token endpoint", + AuthProperties::kOAuth2ServerUri, RestCatalogProperties::kUri.key()); + } + std::string_view base = catalog_uri_it->second; + while (!base.empty() && base.back() == '/') { + base.remove_suffix(1); + } + ctx.token_endpoint = + std::string(base) + "/" + std::string(AuthProperties::kOAuth2DefaultTokenPath); + } + + ctx.scope = AuthProperties::kOAuth2DefaultScope; + auto scope_it = properties.find(AuthProperties::kOAuth2Scope); + if (scope_it != properties.end() && !scope_it->second.empty()) { + ctx.scope = scope_it->second; + } + + return ctx; + } + + /// Cached token from InitSession + std::optional init_token_response_; +}; + +Result> MakeOAuth2AuthManager( + [[maybe_unused]] std::string_view name, + [[maybe_unused]] const std::unordered_map& properties) { + return std::make_unique(); +} + } // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/auth_manager_internal.h b/src/iceberg/catalog/rest/auth/auth_manager_internal.h index 96e452390..e6f2b7bff 100644 --- a/src/iceberg/catalog/rest/auth/auth_manager_internal.h +++ b/src/iceberg/catalog/rest/auth/auth_manager_internal.h @@ -42,4 +42,9 @@ Result> MakeBasicAuthManager( std::string_view name, const std::unordered_map& properties); +/// \brief Create an OAuth2 authentication manager. +Result> MakeOAuth2AuthManager( + std::string_view name, + const std::unordered_map& properties); + } // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/auth_managers.cc b/src/iceberg/catalog/rest/auth/auth_managers.cc index d0bf24844..1410e1f87 100644 --- a/src/iceberg/catalog/rest/auth/auth_managers.cc +++ b/src/iceberg/catalog/rest/auth/auth_managers.cc @@ -65,6 +65,7 @@ AuthManagerRegistry CreateDefaultRegistry() { return { {AuthProperties::kAuthTypeNone, MakeNoopAuthManager}, {AuthProperties::kAuthTypeBasic, MakeBasicAuthManager}, + {AuthProperties::kAuthTypeOAuth2, MakeOAuth2AuthManager}, }; } diff --git a/src/iceberg/catalog/rest/auth/auth_properties.h b/src/iceberg/catalog/rest/auth/auth_properties.h index e14b7fcf7..55e081ae6 100644 --- a/src/iceberg/catalog/rest/auth/auth_properties.h +++ b/src/iceberg/catalog/rest/auth/auth_properties.h @@ -60,6 +60,8 @@ struct AuthProperties { inline static const std::string kOAuth2TokenRefreshEnabled = "token-refresh-enabled"; /// \brief Default OAuth2 scope for catalog operations. inline static const std::string kOAuth2DefaultScope = "catalog"; + /// \brief Default OAuth2 token endpoint path (relative to catalog URI). + inline static constexpr std::string_view kOAuth2DefaultTokenPath = "v1/oauth/tokens"; /// \brief Property key for SigV4 region. inline static const std::string kSigV4Region = "rest.auth.sigv4.region"; diff --git a/src/iceberg/catalog/rest/auth/auth_session.cc b/src/iceberg/catalog/rest/auth/auth_session.cc index 00ed946a9..2cfeb2c52 100644 --- a/src/iceberg/catalog/rest/auth/auth_session.cc +++ b/src/iceberg/catalog/rest/auth/auth_session.cc @@ -21,6 +21,8 @@ #include +#include "iceberg/catalog/rest/auth/oauth2_util.h" + namespace iceberg::rest::auth { namespace { @@ -49,4 +51,17 @@ std::shared_ptr AuthSession::MakeDefault( return std::make_shared(std::move(headers)); } +std::shared_ptr AuthSession::MakeOAuth2( + const OAuthTokenResponse& initial_token, const std::string& /*token_endpoint*/, + const std::string& /*client_id*/, const std::string& /*client_secret*/, + const std::string& /*scope*/, HttpClient& /*client*/) { + // TODO(lishuxu): Replace with OAuth2AuthSession that: + // - Stores access_token, refresh_token, expires_in, and token_endpoint + // - Checks token expiration in Authenticate() and transparently refreshes + // using RefreshToken() (or re-fetches via client_credentials if no + // refresh_token is available) + // For now, fall back to a static session with the initial token. + return MakeDefault({{"Authorization", "Bearer " + initial_token.access_token}}); +} + } // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/auth_session.h b/src/iceberg/catalog/rest/auth/auth_session.h index d81b3d939..26b93877b 100644 --- a/src/iceberg/catalog/rest/auth/auth_session.h +++ b/src/iceberg/catalog/rest/auth/auth_session.h @@ -24,6 +24,7 @@ #include #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" #include "iceberg/result.h" /// \file iceberg/catalog/rest/auth/auth_session.h @@ -70,6 +71,27 @@ class ICEBERG_REST_EXPORT AuthSession { /// \return A new session that adds the given headers to requests. static std::shared_ptr MakeDefault( std::unordered_map headers); + + /// \brief Create an OAuth2 session with automatic token refresh. + /// + /// This factory method creates a session that holds an access token and + /// optionally a refresh token. When Authenticate() is called and the token + /// is expired, it transparently refreshes the token before setting the + /// Authorization header. + /// + /// \param initial_token The initial token response from FetchToken(). + /// \param token_endpoint Full URL of the OAuth2 token endpoint for refresh. + /// \param client_id OAuth2 client ID for refresh requests. + /// \param client_secret OAuth2 client secret for re-fetch if refresh fails. + /// \param scope OAuth2 scope for refresh requests. + /// \param client HTTP client for making refresh requests. + /// \return A new session that manages token lifecycle automatically. + static std::shared_ptr MakeOAuth2(const OAuthTokenResponse& initial_token, + const std::string& token_endpoint, + const std::string& client_id, + const std::string& client_secret, + const std::string& scope, + HttpClient& client); }; } // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.cc b/src/iceberg/catalog/rest/auth/oauth2_util.cc new file mode 100644 index 000000000..7254a6c84 --- /dev/null +++ b/src/iceberg/catalog/rest/auth/oauth2_util.cc @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/auth/oauth2_util.h" + +#include + +#include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/error_handlers.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/json_serde_internal.h" +#include "iceberg/util/json_util_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg::rest::auth { + +namespace { + +constexpr std::string_view kAccessToken = "access_token"; +constexpr std::string_view kTokenType = "token_type"; +constexpr std::string_view kExpiresIn = "expires_in"; +constexpr std::string_view kRefreshToken = "refresh_token"; +constexpr std::string_view kScope = "scope"; + +constexpr std::string_view kGrantType = "grant_type"; +constexpr std::string_view kClientCredentials = "client_credentials"; +constexpr std::string_view kClientId = "client_id"; +constexpr std::string_view kClientSecret = "client_secret"; + +} // namespace + +Status OAuthTokenResponse::Validate() const { + if (access_token.empty()) { + return ValidationFailed("OAuth2 token response missing required 'access_token'"); + } + if (token_type.empty()) { + return ValidationFailed("OAuth2 token response missing required 'token_type'"); + } + return {}; +} + +Result OAuthTokenResponseFromJsonString(const std::string& json_str) { + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(json_str)); + + OAuthTokenResponse response; + ICEBERG_ASSIGN_OR_RAISE(response.access_token, + GetJsonValue(json, kAccessToken)); + ICEBERG_ASSIGN_OR_RAISE(response.token_type, + GetJsonValue(json, kTokenType)); + ICEBERG_ASSIGN_OR_RAISE(response.expires_in, + GetJsonValueOrDefault(json, kExpiresIn, 0)); + ICEBERG_ASSIGN_OR_RAISE(response.refresh_token, + GetJsonValueOrDefault(json, kRefreshToken)); + ICEBERG_ASSIGN_OR_RAISE(response.scope, + GetJsonValueOrDefault(json, kScope)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result FetchToken(HttpClient& client, + const std::string& token_endpoint, + const std::string& client_id, + const std::string& client_secret, + const std::string& scope, AuthSession& session) { + std::unordered_map form_data = { + {std::string(kGrantType), std::string(kClientCredentials)}, + {std::string(kClientId), client_id}, + {std::string(kClientSecret), client_secret}, + }; + if (!scope.empty()) { + form_data[std::string(kScope)] = scope; + } + + ICEBERG_ASSIGN_OR_RAISE(auto response, + client.PostForm(token_endpoint, form_data, /*headers=*/{}, + *DefaultErrorHandler::Instance(), session)); + + return OAuthTokenResponseFromJsonString(response.body()); +} + +Result RefreshToken(HttpClient& client, + const std::string& token_endpoint, + const std::string& client_id, + const std::string& refresh_token, + const std::string& scope, AuthSession& session) { + // TODO(lishuxu): Implement refresh_token grant type. + return NotImplemented("RefreshToken is not yet implemented"); +} + +Result ExchangeToken(HttpClient& client, + const std::string& token_endpoint, + const std::string& subject_token, + const std::string& subject_token_type, + const std::string& scope, AuthSession& session) { + // TODO(lishuxu): Implement token exchange (RFC 8693). + return NotImplemented("ExchangeToken is not yet implemented"); +} + +} // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.h b/src/iceberg/catalog/rest/auth/oauth2_util.h new file mode 100644 index 000000000..0d5b3621d --- /dev/null +++ b/src/iceberg/catalog/rest/auth/oauth2_util.h @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/result.h" + +/// \file iceberg/catalog/rest/auth/oauth2_util.h +/// \brief OAuth2 token utilities for REST catalog authentication. + +namespace iceberg::rest::auth { + +/// \brief Response from an OAuth2 token endpoint. +struct ICEBERG_REST_EXPORT OAuthTokenResponse { + std::string access_token; // required + std::string token_type; // required, typically "bearer" + int64_t expires_in = 0; // optional, seconds until expiration + std::string refresh_token; // optional + std::string scope; // optional + + /// \brief Validates the token response. + Status Validate() const; + + bool operator==(const OAuthTokenResponse&) const = default; +}; + +/// \brief Parse an OAuthTokenResponse from a JSON string. +/// +/// \param json_str The JSON string to parse. +/// \return The parsed token response or an error. +ICEBERG_REST_EXPORT Result OAuthTokenResponseFromJsonString( + const std::string& json_str); + +/// \brief Fetch an OAuth2 token using the client_credentials grant type. +/// +/// Sends a POST request with form-encoded body to the token endpoint: +/// grant_type=client_credentials&client_id=...&client_secret=...&scope=... +/// +/// \param client HTTP client to use for the request. +/// \param token_endpoint Full URL of the OAuth2 token endpoint. +/// \param client_id OAuth2 client ID. +/// \param client_secret OAuth2 client secret. +/// \param scope OAuth2 scope to request. +/// \param session Auth session for the request (typically a no-op session). +/// \return The token response or an error. +ICEBERG_REST_EXPORT Result FetchToken( + HttpClient& client, const std::string& token_endpoint, const std::string& client_id, + const std::string& client_secret, const std::string& scope, AuthSession& session); + +/// \brief Refresh an expired access token using a refresh_token grant. +/// +/// Sends a POST request with form-encoded body to the token endpoint: +/// grant_type=refresh_token&refresh_token=...&client_id=...&scope=... +/// +/// \param client HTTP client to use for the request. +/// \param token_endpoint Full URL of the OAuth2 token endpoint. +/// \param client_id OAuth2 client ID (may be empty if not required by server). +/// \param refresh_token The refresh token from a previous token response. +/// \param scope OAuth2 scope to request. +/// \param session Auth session for the request. +/// \return A new token response with a fresh access_token, or an error. +ICEBERG_REST_EXPORT Result RefreshToken( + HttpClient& client, const std::string& token_endpoint, const std::string& client_id, + const std::string& refresh_token, const std::string& scope, AuthSession& session); + +/// \brief Exchange a token for a scoped token using RFC 8693 Token Exchange. +/// +/// Sends a POST request with form-encoded body to the token endpoint: +/// grant_type=urn:ietf:params:oauth:grant-type:token-exchange +/// &subject_token=...&subject_token_type=...&scope=... +/// +/// Used by TableSession and ContextualSession to obtain table/context-specific +/// tokens from a parent session's access token. +/// +/// \param client HTTP client to use for the request. +/// \param token_endpoint Full URL of the OAuth2 token endpoint. +/// \param subject_token The access token to exchange. +/// \param subject_token_type Token type URI (typically +/// "urn:ietf:params:oauth:token-type:access_token"). +/// \param scope OAuth2 scope to request for the exchanged token. +/// \param session Auth session for the request. +/// \return A new token response with a scoped access_token, or an error. +ICEBERG_REST_EXPORT Result ExchangeToken( + HttpClient& client, const std::string& token_endpoint, + const std::string& subject_token, const std::string& subject_token_type, + const std::string& scope, AuthSession& session); + +} // namespace iceberg::rest::auth diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc index b0824621b..2e383b0ae 100644 --- a/src/iceberg/catalog/rest/http_client.cc +++ b/src/iceberg/catalog/rest/http_client.cc @@ -75,7 +75,7 @@ Result BuildHeaders( auth::AuthSession& session) { std::unordered_map headers(default_headers); for (const auto& [key, val] : request_headers) { - headers.emplace(key, val); + headers.insert_or_assign(key, val); } ICEBERG_RETURN_UNEXPECTED(session.Authenticate(headers)); return cpr::Header(headers.begin(), headers.end()); diff --git a/src/iceberg/catalog/rest/meson.build b/src/iceberg/catalog/rest/meson.build index 3a333963a..f1d4c33f5 100644 --- a/src/iceberg/catalog/rest/meson.build +++ b/src/iceberg/catalog/rest/meson.build @@ -19,6 +19,7 @@ iceberg_rest_sources = files( 'auth/auth_manager.cc', 'auth/auth_managers.cc', 'auth/auth_session.cc', + 'auth/oauth2_util.cc', 'catalog_properties.cc', 'endpoint.cc', 'error_handlers.cc', @@ -82,6 +83,7 @@ install_headers( 'auth/auth_managers.h', 'auth/auth_properties.h', 'auth/auth_session.h', + 'auth/oauth2_util.h', ], subdir: 'iceberg/catalog/rest/auth', ) diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 94c6b1e4e..b7430974f 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -71,8 +71,19 @@ Result FetchServerConfig(const ResourcePaths& paths, auth::AuthSession& session) { ICEBERG_ASSIGN_OR_RAISE(auto config_path, paths.Config()); HttpClient client(current_config.ExtractHeaders()); + + // Send the client's warehouse location to the service to keep in sync. + // This is needed for cases where the warehouse is configured client side, but may + // be used on the server side, like the Hive Metastore, where both client and service + // may have a warehouse location. + std::unordered_map params; + std::string warehouse = current_config.Get(RestCatalogProperties::kWarehouse); + if (!warehouse.empty()) { + params[RestCatalogProperties::kWarehouse.key()] = std::move(warehouse); + } + ICEBERG_ASSIGN_OR_RAISE(const auto response, - client.Get(config_path, /*params=*/{}, /*headers=*/{}, + client.Get(config_path, params, /*headers=*/{}, *DefaultErrorHandler::Instance(), session)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); return CatalogConfigFromJson(json); diff --git a/src/iceberg/catalog/rest/type_fwd.h b/src/iceberg/catalog/rest/type_fwd.h index e72861053..adb44b9a1 100644 --- a/src/iceberg/catalog/rest/type_fwd.h +++ b/src/iceberg/catalog/rest/type_fwd.h @@ -40,5 +40,6 @@ namespace iceberg::rest::auth { class AuthManager; class AuthSession; +struct OAuthTokenResponse; } // namespace iceberg::rest::auth diff --git a/src/iceberg/test/auth_manager_test.cc b/src/iceberg/test/auth_manager_test.cc index 82db393d0..6aad1d650 100644 --- a/src/iceberg/test/auth_manager_test.cc +++ b/src/iceberg/test/auth_manager_test.cc @@ -28,6 +28,7 @@ #include "iceberg/catalog/rest/auth/auth_managers.h" #include "iceberg/catalog/rest/auth/auth_properties.h" #include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/auth/oauth2_util.h" #include "iceberg/catalog/rest/http_client.h" #include "iceberg/test/matchers.h" @@ -195,4 +196,109 @@ TEST_F(AuthManagerTest, RegisterCustomAuthManager) { EXPECT_EQ(headers["X-Custom-Auth"], "custom-value"); } +// Verifies OAuth2 with static token +TEST_F(AuthManagerTest, OAuth2StaticToken) { + std::unordered_map properties = { + {AuthProperties::kAuthType, "oauth2"}, + {AuthProperties::kOAuth2Token, "my-static-token"}, + }; + + auto manager_result = AuthManagers::Load("test-catalog", properties); + ASSERT_THAT(manager_result, IsOk()); + + auto session_result = manager_result.value()->CatalogSession(client_, properties); + ASSERT_THAT(session_result, IsOk()); + + std::unordered_map headers; + EXPECT_THAT(session_result.value()->Authenticate(headers), IsOk()); + EXPECT_EQ(headers["Authorization"], "Bearer my-static-token"); +} + +// Verifies OAuth2 type is inferred from token property +TEST_F(AuthManagerTest, OAuth2InferredFromToken) { + std::unordered_map properties = { + {AuthProperties::kOAuth2Token, "inferred-token"}, + }; + + auto manager_result = AuthManagers::Load("test-catalog", properties); + ASSERT_THAT(manager_result, IsOk()); + + auto session_result = manager_result.value()->CatalogSession(client_, properties); + ASSERT_THAT(session_result, IsOk()); + + std::unordered_map headers; + EXPECT_THAT(session_result.value()->Authenticate(headers), IsOk()); + EXPECT_EQ(headers["Authorization"], "Bearer inferred-token"); +} + +// Verifies OAuth2 returns unauthenticated session when neither token nor credential is +// provided +TEST_F(AuthManagerTest, OAuth2MissingCredentials) { + std::unordered_map properties = { + {AuthProperties::kAuthType, "oauth2"}, + }; + + auto manager_result = AuthManagers::Load("test-catalog", properties); + ASSERT_THAT(manager_result, IsOk()); + + auto session_result = manager_result.value()->CatalogSession(client_, properties); + ASSERT_THAT(session_result, IsOk()); + + // Session should have no auth headers + std::unordered_map headers; + ASSERT_TRUE(session_result.value()->Authenticate(headers).has_value()); + EXPECT_EQ(headers.find("Authorization"), headers.end()); +} + +// Verifies OAuth2 fails with invalid credential format +TEST_F(AuthManagerTest, OAuth2InvalidCredentialFormat) { + std::unordered_map properties = { + {AuthProperties::kAuthType, "oauth2"}, + {AuthProperties::kOAuth2Credential, "no-colon-separator"}, + {"uri", "http://localhost:8181"}, + }; + + auto manager_result = AuthManagers::Load("test-catalog", properties); + ASSERT_THAT(manager_result, IsOk()); + + auto session_result = manager_result.value()->CatalogSession(client_, properties); + EXPECT_THAT(session_result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(session_result, HasErrorMessage("client_id:client_secret")); +} + +// Verifies OAuthTokenResponse JSON parsing +TEST_F(AuthManagerTest, OAuthTokenResponseParsing) { + std::string json = R"({ + "access_token": "test-access-token", + "token_type": "bearer", + "expires_in": 3600, + "refresh_token": "test-refresh-token", + "scope": "catalog" + })"; + + auto result = OAuthTokenResponseFromJsonString(json); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->access_token, "test-access-token"); + EXPECT_EQ(result->token_type, "bearer"); + EXPECT_EQ(result->expires_in, 3600); + EXPECT_EQ(result->refresh_token, "test-refresh-token"); + EXPECT_EQ(result->scope, "catalog"); +} + +// Verifies OAuthTokenResponse parsing with minimal fields +TEST_F(AuthManagerTest, OAuthTokenResponseMinimal) { + std::string json = R"({ + "access_token": "token123", + "token_type": "Bearer" + })"; + + auto result = OAuthTokenResponseFromJsonString(json); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->access_token, "token123"); + EXPECT_EQ(result->token_type, "Bearer"); + EXPECT_EQ(result->expires_in, 0); + EXPECT_TRUE(result->refresh_token.empty()); + EXPECT_TRUE(result->scope.empty()); +} + } // namespace iceberg::rest::auth From 0e741b91c70e6abbc22af7c9753946445fdbdfee Mon Sep 17 00:00:00 2001 From: "shuxu.li" Date: Thu, 26 Feb 2026 22:38:15 +0800 Subject: [PATCH 2/2] feat: Implement OAuth2 authentication for REST catalog --- src/iceberg/catalog/rest/auth/auth_manager.cc | 4 +-- src/iceberg/catalog/rest/auth/auth_session.cc | 7 +---- src/iceberg/catalog/rest/auth/oauth2_util.cc | 4 +-- src/iceberg/catalog/rest/auth/oauth2_util.h | 27 ------------------- 4 files changed, 4 insertions(+), 38 deletions(-) diff --git a/src/iceberg/catalog/rest/auth/auth_manager.cc b/src/iceberg/catalog/rest/auth/auth_manager.cc index 8c9b25672..c1d62e166 100644 --- a/src/iceberg/catalog/rest/auth/auth_manager.cc +++ b/src/iceberg/catalog/rest/auth/auth_manager.cc @@ -26,7 +26,6 @@ #include "iceberg/catalog/rest/auth/auth_session.h" #include "iceberg/catalog/rest/auth/oauth2_util.h" #include "iceberg/catalog/rest/catalog_properties.h" -#include "iceberg/catalog/rest/http_client.h" #include "iceberg/util/macros.h" #include "iceberg/util/transform_util.h" @@ -127,7 +126,6 @@ class OAuth2AuthManager : public AuthManager { Result> CatalogSession( HttpClient& client, const std::unordered_map& properties) override { - // Reuse the token fetched during InitSession. if (init_token_response_.has_value()) { auto token_response = std::move(*init_token_response_); init_token_response_.reset(); @@ -136,7 +134,7 @@ class OAuth2AuthManager : public AuthManager { ctx.client_secret, ctx.scope, client); } - // If both token and credential are provided, prefer the token. + // If token is provided, use it directly. auto token_it = properties.find(AuthProperties::kOAuth2Token); if (token_it != properties.end() && !token_it->second.empty()) { return AuthSession::MakeDefault({{"Authorization", "Bearer " + token_it->second}}); diff --git a/src/iceberg/catalog/rest/auth/auth_session.cc b/src/iceberg/catalog/rest/auth/auth_session.cc index 2cfeb2c52..3c06c384f 100644 --- a/src/iceberg/catalog/rest/auth/auth_session.cc +++ b/src/iceberg/catalog/rest/auth/auth_session.cc @@ -55,12 +55,7 @@ std::shared_ptr AuthSession::MakeOAuth2( const OAuthTokenResponse& initial_token, const std::string& /*token_endpoint*/, const std::string& /*client_id*/, const std::string& /*client_secret*/, const std::string& /*scope*/, HttpClient& /*client*/) { - // TODO(lishuxu): Replace with OAuth2AuthSession that: - // - Stores access_token, refresh_token, expires_in, and token_endpoint - // - Checks token expiration in Authenticate() and transparently refreshes - // using RefreshToken() (or re-fetches via client_credentials if no - // refresh_token is available) - // For now, fall back to a static session with the initial token. + // TODO(lishuxu): Create OAuth2AuthSession with auto-refresh support. return MakeDefault({{"Authorization", "Bearer " + initial_token.access_token}}); } diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.cc b/src/iceberg/catalog/rest/auth/oauth2_util.cc index 7254a6c84..8e632d531 100644 --- a/src/iceberg/catalog/rest/auth/oauth2_util.cc +++ b/src/iceberg/catalog/rest/auth/oauth2_util.cc @@ -78,13 +78,13 @@ Result FetchToken(HttpClient& client, const std::string& client_id, const std::string& client_secret, const std::string& scope, AuthSession& session) { - std::unordered_map form_data = { + std::unordered_map form_data{ {std::string(kGrantType), std::string(kClientCredentials)}, {std::string(kClientId), client_id}, {std::string(kClientSecret), client_secret}, }; if (!scope.empty()) { - form_data[std::string(kScope)] = scope; + form_data.emplace(std::string(kScope), scope); } ICEBERG_ASSIGN_OR_RAISE(auto response, diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.h b/src/iceberg/catalog/rest/auth/oauth2_util.h index 0d5b3621d..ba5015c47 100644 --- a/src/iceberg/catalog/rest/auth/oauth2_util.h +++ b/src/iceberg/catalog/rest/auth/oauth2_util.h @@ -69,38 +69,11 @@ ICEBERG_REST_EXPORT Result FetchToken( const std::string& client_secret, const std::string& scope, AuthSession& session); /// \brief Refresh an expired access token using a refresh_token grant. -/// -/// Sends a POST request with form-encoded body to the token endpoint: -/// grant_type=refresh_token&refresh_token=...&client_id=...&scope=... -/// -/// \param client HTTP client to use for the request. -/// \param token_endpoint Full URL of the OAuth2 token endpoint. -/// \param client_id OAuth2 client ID (may be empty if not required by server). -/// \param refresh_token The refresh token from a previous token response. -/// \param scope OAuth2 scope to request. -/// \param session Auth session for the request. -/// \return A new token response with a fresh access_token, or an error. ICEBERG_REST_EXPORT Result RefreshToken( HttpClient& client, const std::string& token_endpoint, const std::string& client_id, const std::string& refresh_token, const std::string& scope, AuthSession& session); /// \brief Exchange a token for a scoped token using RFC 8693 Token Exchange. -/// -/// Sends a POST request with form-encoded body to the token endpoint: -/// grant_type=urn:ietf:params:oauth:grant-type:token-exchange -/// &subject_token=...&subject_token_type=...&scope=... -/// -/// Used by TableSession and ContextualSession to obtain table/context-specific -/// tokens from a parent session's access token. -/// -/// \param client HTTP client to use for the request. -/// \param token_endpoint Full URL of the OAuth2 token endpoint. -/// \param subject_token The access token to exchange. -/// \param subject_token_type Token type URI (typically -/// "urn:ietf:params:oauth:token-type:access_token"). -/// \param scope OAuth2 scope to request for the exchanged token. -/// \param session Auth session for the request. -/// \return A new token response with a scoped access_token, or an error. ICEBERG_REST_EXPORT Result ExchangeToken( HttpClient& client, const std::string& token_endpoint, const std::string& subject_token, const std::string& subject_token_type,