From 2b8fcbbc5abfdb14d636489bef4aaec8e50f12fb Mon Sep 17 00:00:00 2001 From: SkylerLin <44233950+linguoxuan@users.noreply.github.com> Date: Fri, 27 Feb 2026 23:16:33 +0800 Subject: [PATCH] feat: retry failed transaction commit --- src/iceberg/table.cc | 1 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/retry_util_test.cc | 303 ++++++++++++++++++ src/iceberg/transaction.cc | 81 ++++- src/iceberg/transaction.h | 8 + src/iceberg/update/pending_update.h | 3 + src/iceberg/update/snapshot_manager.cc | 3 + src/iceberg/update/snapshot_manager.h | 2 + src/iceberg/update/snapshot_update.cc | 15 +- .../update/update_snapshot_reference.cc | 16 +- .../update/update_snapshot_reference.h | 3 + src/iceberg/util/retry_util.h | 180 +++++++++++ 12 files changed, 596 insertions(+), 20 deletions(-) create mode 100644 src/iceberg/test/retry_util_test.cc create mode 100644 src/iceberg/util/retry_util.h diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 0550f61d5..df5333bf7 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -82,6 +82,7 @@ Status Table::Refresh() { ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_)); if (metadata_location_ != refreshed_table->metadata_file_location()) { metadata_ = std::move(refreshed_table->metadata_); + metadata_location_ = std::string(refreshed_table->metadata_file_location()); io_ = std::move(refreshed_table->io_); metadata_cache_ = std::make_unique(metadata_.get()); } diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fdd88888e..20436732f 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -115,6 +115,7 @@ add_iceberg_test(util_test endian_test.cc formatter_test.cc location_util_test.cc + retry_util_test.cc string_util_test.cc transform_util_test.cc truncate_util_test.cc diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc new file mode 100644 index 000000000..a5e2065d7 --- /dev/null +++ b/src/iceberg/test/retry_util_test.cc @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/retry_util.h" + +#include + +#include "iceberg/result.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// -------------------------------------------------------------------------- +// Test: Successful on first attempt — no retries +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, SuccessOnFirstAttempt) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(3) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .Run( + [&]() -> Result { + ++call_count; + return 42; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 42); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: Retry once then succeed +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, RetryOnceThenSucceed) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(3) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("transient failure"); + } + return 42; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 42); + EXPECT_EQ(call_count, 2); + EXPECT_EQ(attempts, 2); +} + +// -------------------------------------------------------------------------- +// Test: Max attempts exhausted +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, MaxAttemptsExhausted) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(2) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("always fails"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 3); // 1 initial + 2 retries + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: OnlyRetryOn filters correctly +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, OnlyRetryOnFilter) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(3) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + // Return a non-retryable error + return ValidationFailed("schema conflict"); + }, + &attempts); + + // Should NOT retry because ValidationFailed is not in the retry list + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: OnlyRetryOn retries matching error +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, OnlyRetryOnMatchingError) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(2) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .OnlyRetryOn(ErrorKind::kCommitFailed) + .Run( + [&]() -> Result { + ++call_count; + if (call_count <= 2) { + return CommitFailed("transient"); + } + return 100; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 100); + EXPECT_EQ(call_count, 3); // 2 failures + 1 success + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: StopRetryOn stops on matching error +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, StopRetryOnMatchingError) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(5) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .StopRetryOn({ErrorKind::kCommitStateUnknown}) + .Run( + [&]() -> Result { + ++call_count; + return CommitStateUnknown("datacenter on fire"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitStateUnknown)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: Zero retries means only one attempt +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, ZeroRetries) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(0) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .Run( + [&]() -> Result { + ++call_count; + return CommitFailed("fail"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: MakeCommitRetryRunner has correct configuration +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, MakeCommitRetryRunnerConfig) { + int call_count = 0; + int32_t attempts = 0; + + // MakeCommitRetryRunner should only retry on kCommitFailed + auto result = MakeCommitRetryRunner(2, 1, 10, 5000) + .Run( + [&]() -> Result { + ++call_count; + // ValidationFailed should not be retried + return ValidationFailed("not retryable"); + }, + &attempts); + + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_EQ(call_count, 1); + EXPECT_EQ(attempts, 1); +} + +// -------------------------------------------------------------------------- +// Test: MakeCommitRetryRunner retries CommitFailed +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) { + int call_count = 0; + int32_t attempts = 0; + + auto result = MakeCommitRetryRunner(3, 1, 10, 5000) + .Run( + [&]() -> Result { + ++call_count; + if (call_count <= 2) { + return CommitFailed("transient"); + } + return 99; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 99); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: OnlyRetryOn with multiple error kinds +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { + int call_count = 0; + int32_t attempts = 0; + + auto result = + RetryRunner() + .WithRetries(5) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .OnlyRetryOn({ErrorKind::kCommitFailed, ErrorKind::kServiceUnavailable}) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return CommitFailed("conflict"); + } + if (call_count == 2) { + return ServiceUnavailable("server busy"); + } + return 77; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 77); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +// -------------------------------------------------------------------------- +// Test: Default retry (no filter) retries all errors +// -------------------------------------------------------------------------- +TEST(RetryRunnerTest, DefaultRetryAllErrors) { + int call_count = 0; + int32_t attempts = 0; + + auto result = RetryRunner() + .WithRetries(3) + .WithExponentialBackoff(1, 10, 5000, 2.0) + .Run( + [&]() -> Result { + ++call_count; + if (call_count == 1) { + return IOError("disk full"); + } + if (call_count == 2) { + return ValidationFailed("bad schema"); + } + return 55; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 55); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 6df45b30c..57e7d9685 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -49,6 +48,7 @@ #include "iceberg/util/checked_cast.h" #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/retry_util.h" namespace iceberg { @@ -84,7 +84,7 @@ const TableMetadata& Transaction::current() const { return metadata_builder_->cu std::string Transaction::MetadataFileLocation(std::string_view filename) const { const auto metadata_location = current().properties.Get(TableProperties::kWriteMetadataLocation); - if (metadata_location.empty()) { + if (!metadata_location.empty()) { return std::format("{}/{}", LocationUtil::StripTrailingSlash(metadata_location), filename); } @@ -153,7 +153,7 @@ Status Transaction::Apply(PendingUpdate& update) { last_update_committed_ = true; - if (auto_commit_) { + if (auto_commit_ && !applying_updates_) { ICEBERG_RETURN_UNEXPECTED(Commit()); } @@ -310,22 +310,26 @@ Result> Transaction::Commit() { return table_; } - std::vector> requirements; - switch (kind_) { - case Kind::kCreate: { - ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates)); - } break; - case Kind::kUpdate: { - ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable( - *metadata_builder_->base(), updates)); - - } break; + Result> commit_result; + if (!CanRetry()) { + ICEBERG_ASSIGN_OR_RAISE( + auto requirements, + kind_ == Kind::kCreate + ? TableRequirements::ForCreateTable(updates) + : TableRequirements::ForUpdateTable(*metadata_builder_->base(), updates)); + commit_result = table_->catalog()->UpdateTable(table_->name(), requirements, updates); + } else { + const auto& props = table_->properties(); + int32_t num_retries = props.Get(TableProperties::kCommitNumRetries); + int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs); + int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs); + int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs); + + commit_result = + MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms) + .Run([this]() -> Result> { return CommitOnce(); }); } - // XXX: we should handle commit failure and retry here. - auto commit_result = - table_->catalog()->UpdateTable(table_->name(), requirements, updates); - for (const auto& update : pending_updates_) { if (auto update_ptr = update.lock()) { std::ignore = update_ptr->Finalize(commit_result.has_value() @@ -343,6 +347,49 @@ Result> Transaction::Commit() { return table_; } +Result> Transaction::CommitOnce() { + auto refresh_result = table_->Refresh(); + if (!refresh_result.has_value()) { + return std::unexpected(refresh_result.error()); + } + + if (metadata_builder_->base() != table_->metadata().get()) { + metadata_builder_ = TableMetadataBuilder::BuildFrom(table_->metadata().get()); + applying_updates_ = true; + for (const auto& weak_update : pending_updates_) { + if (auto update = weak_update.lock()) { + auto commit_status = update->Commit(); + if (!commit_status.has_value()) { + applying_updates_ = false; + return std::unexpected(commit_status.error()); + } + } + } + applying_updates_ = false; + } + + ICEBERG_ASSIGN_OR_RAISE( + auto requirements, TableRequirements::ForUpdateTable(*metadata_builder_->base(), + metadata_builder_->changes())); + + return table_->catalog()->UpdateTable(table_->name(), requirements, + metadata_builder_->changes()); +} + +bool Transaction::CanRetry() const { + if (kind_ == Kind::kCreate) { + return false; + } + for (const auto& weak_update : pending_updates_) { + if (auto update = weak_update.lock()) { + if (!update->IsRetryable()) { + return false; + } + } + } + return true; +} + Result> Transaction::NewUpdatePartitionSpec() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_spec, UpdatePartitionSpec::Make(shared_from_this())); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 438054b51..4c8ad22c6 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -130,6 +130,12 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> CommitOnce(); + + /// \brief Whether this transaction can retry after a commit conflict. + bool CanRetry() const; + private: friend class PendingUpdate; @@ -144,6 +150,8 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> pending_updates_; diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index f44812a85..68d93f843 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -58,6 +58,9 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { /// \brief Return the kind of this pending update. virtual Kind kind() const = 0; + /// \brief Whether this update can be retried after a commit conflict. + virtual bool IsRetryable() const { return true; } + /// \brief Apply the pending changes and commit. /// /// \return An OK status if the commit was successful, or an error: diff --git a/src/iceberg/update/snapshot_manager.cc b/src/iceberg/update/snapshot_manager.cc index d882dd320..cf3d31457 100644 --- a/src/iceberg/update/snapshot_manager.cc +++ b/src/iceberg/update/snapshot_manager.cc @@ -201,6 +201,9 @@ SnapshotManager::UpdateSnapshotReferencesOperation() { Status SnapshotManager::CommitIfRefUpdatesExist() { if (update_snap_refs_ != nullptr) { ICEBERG_RETURN_UNEXPECTED(update_snap_refs_->Commit()); + // Keep the shared_ptr alive so that pending_updates_ weak_ptr in Transaction + // remains valid until Transaction::Commit() checks CanRetry(). + committed_snap_refs_.push_back(std::move(update_snap_refs_)); update_snap_refs_ = nullptr; } return {}; diff --git a/src/iceberg/update/snapshot_manager.h b/src/iceberg/update/snapshot_manager.h index fd81f8339..ecc9805e8 100644 --- a/src/iceberg/update/snapshot_manager.h +++ b/src/iceberg/update/snapshot_manager.h @@ -21,6 +21,7 @@ #include #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/result.h" @@ -198,6 +199,7 @@ class ICEBERG_EXPORT SnapshotManager : public ErrorCollector { std::shared_ptr transaction_; const bool is_external_transaction_; std::shared_ptr update_snap_refs_; + std::vector> committed_snap_refs_; }; } // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index b44682561..f9636414b 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -220,7 +220,8 @@ Result> SnapshotUpdate::WriteDeleteManifests( } int64_t SnapshotUpdate::SnapshotId() { - if (!snapshot_id_.has_value()) { + while (!snapshot_id_.has_value() || + base().SnapshotById(snapshot_id_.value()).has_value()) { snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base()); } return snapshot_id_.value(); @@ -228,6 +229,18 @@ int64_t SnapshotUpdate::SnapshotId() { Result SnapshotUpdate::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + if (staged_snapshot_ != nullptr) { + for (const auto& manifest_list : manifest_lists_) { + std::ignore = DeleteFile(manifest_list); + } + manifest_lists_.clear(); + CleanUncommitted(std::unordered_set{}); + + staged_snapshot_ = nullptr; + summary_.Clear(); + } + ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot, SnapshotUtil::OptionalLatestSnapshot(base(), target_branch_)); diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc index 923f0c8df..a6e36f355 100644 --- a/src/iceberg/update/update_snapshot_reference.cc +++ b/src/iceberg/update/update_snapshot_reference.cc @@ -41,7 +41,9 @@ Result> UpdateSnapshotReference::Make( } UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr transaction) - : PendingUpdate(std::move(transaction)), updated_refs_(base().refs) {} + : PendingUpdate(std::move(transaction)), + updated_refs_(base().refs), + original_refs_(base().refs) {} UpdateSnapshotReference::~UpdateSnapshotReference() = default; @@ -220,8 +222,18 @@ UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::stri Result UpdateSnapshotReference::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - ApplyResult result; const auto& current_refs = base().refs; + for (const auto& [name, ref] : updated_refs_) { + if (!original_refs_.contains(name) && current_refs.contains(name)) { + if (ref->type() == SnapshotRefType::kTag) { + return CommitFailed("tag '{}' was created concurrently", name); + } else { + return CommitFailed("branch '{}' was created concurrently", name); + } + } + } + + ApplyResult result; // Identify references which have been removed for (const auto& [name, ref] : current_refs) { diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h index e13f5bfa9..43a399706 100644 --- a/src/iceberg/update/update_snapshot_reference.h +++ b/src/iceberg/update/update_snapshot_reference.h @@ -134,6 +134,8 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { Kind kind() const final { return Kind::kUpdateSnapshotReference; } + bool IsRetryable() const override { return false; } + struct ApplyResult { /// References to set or update (name, ref pairs) std::vector>> to_set; @@ -152,6 +154,7 @@ class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate { bool fast_forward); std::unordered_map> updated_refs_; + std::unordered_map> original_refs_; }; } // namespace iceberg diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h new file mode 100644 index 000000000..9da6b67e3 --- /dev/null +++ b/src/iceberg/util/retry_util.h @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief Configuration for retry behavior +struct RetryConfig { + /// Maximum number of retry attempts (not including the first attempt) + int32_t num_retries = 4; + /// Minimum wait time between retries in milliseconds + int32_t min_wait_ms = 100; + /// Maximum wait time between retries in milliseconds + int32_t max_wait_ms = 60 * 1000; // 1 minute + /// Total maximum time for all retries in milliseconds + int32_t total_timeout_ms = 30 * 60 * 1000; // 30 minutes + /// Exponential backoff scale factor + double scale_factor = 2.0; +}; + +/// \brief Utility class for running tasks with retry logic +class RetryRunner { + public: + RetryRunner() = default; + + RetryRunner& WithRetries(int32_t num_retries) { + config_.num_retries = num_retries; + return *this; + } + + RetryRunner& WithExponentialBackoff(int32_t min_wait_ms, int32_t max_wait_ms, + int32_t total_timeout_ms, double scale_factor) { + config_.min_wait_ms = min_wait_ms; + config_.max_wait_ms = max_wait_ms; + config_.total_timeout_ms = total_timeout_ms; + config_.scale_factor = scale_factor; + return *this; + } + + /// \brief Specify error types that should trigger a retry + RetryRunner& OnlyRetryOn(std::initializer_list error_kinds) { + only_retry_on_ = std::vector(error_kinds); + return *this; + } + + /// \brief Specify error types that should trigger a retry + RetryRunner& OnlyRetryOn(ErrorKind error_kind) { + only_retry_on_ = std::vector{error_kind}; + return *this; + } + + /// \brief Specify error types that should stop retries immediately + RetryRunner& StopRetryOn(std::initializer_list error_kinds) { + stop_retry_on_ = std::vector(error_kinds); + return *this; + } + + /// \brief Run a task that returns a Result + template ::value_type> + Result Run(F&& task, int32_t* attempt_counter = nullptr) { + auto start_time = std::chrono::steady_clock::now(); + int32_t attempt = 0; + int32_t max_attempts = config_.num_retries + 1; + + while (true) { + ++attempt; + if (attempt_counter != nullptr) { + *attempt_counter = attempt; + } + + auto result = task(); + if (result.has_value()) { + return result; + } + + const auto& error = result.error(); + + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time) + .count(); + + // total_timeout_ms <= 0 means no total timeout limit + bool timed_out = config_.total_timeout_ms > 0 && + elapsed > config_.total_timeout_ms && attempt > 1; + if (attempt >= max_attempts || timed_out) { + return result; + } + + if (!ShouldRetry(error.kind)) { + return result; + } + + int32_t delay_ms = CalculateDelay(attempt); + Sleep(delay_ms); + } + } + + private: + /// \brief Check if the given error kind should trigger a retry + bool ShouldRetry(ErrorKind kind) const { + if (only_retry_on_.has_value()) { + for (const auto& retry_kind : only_retry_on_.value()) { + if (kind == retry_kind) { + return true; + } + } + return false; + } + + if (stop_retry_on_.has_value()) { + for (const auto& stop_kind : stop_retry_on_.value()) { + if (kind == stop_kind) { + return false; + } + } + } + + return true; + } + + /// \brief Calculate delay with exponential backoff and jitter + int32_t CalculateDelay(int32_t attempt) const { + // Calculate base delay with exponential backoff + double base_delay = config_.min_wait_ms * std::pow(config_.scale_factor, attempt - 1); + int32_t delay_ms = static_cast( + std::min(base_delay, static_cast(config_.max_wait_ms))); + + static thread_local std::mt19937 gen(std::random_device{}()); + int32_t jitter_range = std::max(1, delay_ms / 10); + std::uniform_int_distribution<> dis(-jitter_range, jitter_range); + delay_ms += dis(gen); + return std::max(1, delay_ms); + } + + /// \brief Sleep for the specified duration + void Sleep(int32_t ms) const { + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + } + + RetryConfig config_; + std::optional> only_retry_on_; + std::optional> stop_retry_on_; +}; + +/// \brief Helper function to create a RetryRunner with table commit configuration +inline RetryRunner MakeCommitRetryRunner(int32_t num_retries, int32_t min_wait_ms, + int32_t max_wait_ms, int32_t total_timeout_ms) { + return RetryRunner() + .WithRetries(num_retries) + .WithExponentialBackoff(min_wait_ms, max_wait_ms, total_timeout_ms, 2.0) + .OnlyRetryOn(ErrorKind::kCommitFailed); +} + +} // namespace iceberg