From 642c8a34d5a0c3b838dc65813f34c360bc1f50d5 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Tue, 19 Mar 2024 17:37:30 +0800 Subject: [PATCH 1/6] support file based transaction Signed-off-by: Ping Yu --- proto/kvrpcpb.proto | 15 +++ src/common/errors.rs | 2 + src/generated/kvrpcpb.rs | 15 +++ src/raw/requests.rs | 5 +- src/request/mod.rs | 11 +- src/request/plan.rs | 12 +- src/request/plan_builder.rs | 3 + src/timestamp.rs | 4 + src/transaction/lock.rs | 193 +++++++++++++++++++++++++++------ src/transaction/requests.rs | 7 +- src/transaction/transaction.rs | 55 +++++++--- 11 files changed, 266 insertions(+), 56 deletions(-) diff --git a/proto/kvrpcpb.proto b/proto/kvrpcpb.proto index 69f2d4a4..a785549e 100644 --- a/proto/kvrpcpb.proto +++ b/proto/kvrpcpb.proto @@ -132,6 +132,9 @@ message PrewriteRequest { // for_update_ts constriants that should be checked when prewriting a pessimistic transaction. // See https://github.com/tikv/tikv/issues/14311 repeated ForUpdateTSConstraint for_update_ts_constraints = 16; + + // Reserved for file based transaction. + repeated uint64 txn_file_chunks = 100; } message PrewriteResponse { @@ -315,6 +318,9 @@ message CheckTxnStatusRequest { // because the old versions of clients cannot handle some results returned from TiKV correctly. // For new versions, this field should always be set to true. bool verify_is_primary = 9; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message CheckTxnStatusResponse { @@ -480,6 +486,9 @@ message ResolveLockRequest { repeated TxnInfo txn_infos = 4; // Only resolve specified keys. repeated bytes keys = 5; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message ResolveLockResponse { @@ -971,6 +980,9 @@ message LockInfo { // It's used in timeout errors. 0 means unknown or not applicable. // It can be used to help the client decide whether to try resolving the lock. uint64 duration_to_last_update_ms = 11; + + // Reserved for file based transaction. 
+ bool is_txn_file = 100; } message KeyError { @@ -1320,6 +1332,9 @@ message MvccInfo { message TxnInfo { uint64 txn = 1; uint64 status = 2; + + // Reserved for file based transaction. + bool is_txn_file = 100; } enum Action { diff --git a/src/common/errors.rs b/src/common/errors.rs index c5cd9b44..41e081a2 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -112,6 +112,8 @@ pub enum Error { }, #[error("Keyspace not found: {0}")] KeyspaceNotFound(String), + #[error("Transaction not found error: {:?}", _0)] + TxnNotFound(kvrpcpb::TxnNotFound), } impl From for Error { diff --git a/src/generated/kvrpcpb.rs b/src/generated/kvrpcpb.rs index ec039123..25685c8b 100644 --- a/src/generated/kvrpcpb.rs +++ b/src/generated/kvrpcpb.rs @@ -134,6 +134,9 @@ pub struct PrewriteRequest { pub for_update_ts_constraints: ::prost::alloc::vec::Vec< prewrite_request::ForUpdateTsConstraint, >, + /// Reserved for file based transaction. + #[prost(uint64, repeated, tag = "100")] + pub txn_file_chunks: ::prost::alloc::vec::Vec, } /// Nested message and enum types in `PrewriteRequest`. pub mod prewrite_request { @@ -422,6 +425,9 @@ pub struct CheckTxnStatusRequest { /// For new versions, this field should always be set to true. #[prost(bool, tag = "9")] pub verify_is_primary: bool, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -661,6 +667,9 @@ pub struct ResolveLockRequest { /// Only resolve specified keys. #[prost(bytes = "vec", repeated, tag = "5")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1308,6 +1317,9 @@ pub struct LockInfo { /// It can be used to help the client decide whether to try resolving the lock. 
#[prost(uint64, tag = "11")] pub duration_to_last_update_ms: u64, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1781,6 +1793,9 @@ pub struct TxnInfo { pub txn: u64, #[prost(uint64, tag = "2")] pub status: u64, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/raw/requests.rs b/src/raw/requests.rs index e7825b36..df9c028d 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -663,6 +663,7 @@ mod test { use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; + use crate::proto::pdpb::Timestamp; use crate::request::Keyspace; use crate::request::Plan; @@ -700,7 +701,7 @@ mod test { ..Default::default() }; let plan = crate::request::PlanBuilder::new(client, keyspace, scan) - .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) + .resolve_lock(Timestamp::default(), OPTIMISTIC_BACKOFF, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); @@ -756,7 +757,7 @@ mod test { new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false); let keyspace = Keyspace::Enable { keyspace_id: 0 }; let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request) - .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) + .resolve_lock(Timestamp::default(), OPTIMISTIC_BACKOFF, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .plan(); let _ = plan.execute().await; diff --git a/src/request/mod.rs b/src/request/mod.rs index a9913e74..cdd12f4f 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -105,6 +105,7 @@ mod test { use crate::region::RegionWithLeader; use crate::store::region_stream_for_keys; use crate::store::HasRegionError; + use crate::timestamp::TimestampExt as _; use 
crate::transaction::lowering::new_commit_request; use crate::Error; use crate::Key; @@ -197,7 +198,11 @@ mod test { ))); let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, request) - .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3), Keyspace::Disable) + .resolve_lock( + Timestamp::max(), + Backoff::no_jitter_backoff(1, 1, 3), + Keyspace::Disable, + ) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() .plan(); @@ -224,14 +229,14 @@ mod test { // does not extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req.clone()) - .resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable) + .resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF, Keyspace::Disable) .retry_multi_region(OPTIMISTIC_BACKOFF) .plan(); assert!(plan.execute().await.is_ok()); // extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req) - .resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable) + .resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF, Keyspace::Disable) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() .plan(); diff --git a/src/request/plan.rs b/src/request/plan.rs index c4a1a368..b87a2f47 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -17,6 +17,7 @@ use crate::pd::PdClient; use crate::proto::errorpb; use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; +use crate::proto::pdpb::Timestamp; use crate::region::StoreId; use crate::region::{RegionVerId, RegionWithLeader}; use crate::request::shard::HasNextBatch; @@ -597,6 +598,7 @@ pub struct DefaultProcessor; pub struct ResolveLock { pub inner: P, + pub timestamp: Timestamp, pub pd_client: Arc, pub backoff: Backoff, pub keyspace: Keyspace, @@ -606,6 +608,7 @@ impl Clone for ResolveLock { fn clone(&self) -> Self { ResolveLock { inner: self.inner.clone(), + timestamp: self.timestamp.clone(), pd_client: self.pd_client.clone(), backoff: self.backoff.clone(), keyspace: self.keyspace, @@ 
-634,7 +637,13 @@ where } let pd_client = self.pd_client.clone(); - let live_locks = resolve_locks(locks, pd_client.clone(), self.keyspace).await?; + let live_locks = resolve_locks( + locks, + self.timestamp.clone(), + pd_client.clone(), + self.keyspace, + ) + .await?; if live_locks.is_empty() { result = self.inner.execute().await?; } else { @@ -953,6 +962,7 @@ mod test { let plan = RetryableMultiRegion { inner: ResolveLock { inner: ErrPlan, + timestamp: Timestamp::default(), backoff: Backoff::no_backoff(), pd_client: Arc::new(MockPdClient::default()), keyspace: Keyspace::Disable, diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index c117d14b..9c4fcdbe 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -30,6 +30,7 @@ use crate::transaction::HasLocks; use crate::transaction::ResolveLocksContext; use crate::transaction::ResolveLocksOptions; use crate::Result; +use crate::Timestamp; /// Builder type for plans (see that module for more). pub struct PlanBuilder { @@ -72,6 +73,7 @@ impl PlanBuilder { /// If there is a lock error, then resolve the lock and retry the request. pub fn resolve_lock( self, + timestamp: Timestamp, backoff: Backoff, keyspace: Keyspace, ) -> PlanBuilder, Ph> @@ -82,6 +84,7 @@ impl PlanBuilder { pd_client: self.pd_client.clone(), plan: ResolveLock { inner: self.plan, + timestamp, backoff, pd_client: self.pd_client, keyspace, diff --git a/src/timestamp.rs b/src/timestamp.rs index 5c610e77..f257a44d 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -22,6 +22,10 @@ pub trait TimestampExt: Sized { fn from_version(version: u64) -> Self; /// Convert u64 to an optional timestamp, where `0` represents no timestamp. fn try_from_version(version: u64) -> Option; + /// Return the maximum timestamp. 
+ fn max() -> Self { + Self::from_version(u64::MAX) + } } impl TimestampExt for Timestamp { diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index efa835d6..be4fb8aa 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -7,7 +7,9 @@ use std::sync::Arc; use fail::fail_point; use log::debug; use log::error; +use log::warn; use tokio::sync::RwLock; +use tokio::time::sleep; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; @@ -44,11 +46,14 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; /// the status of the other keys in the same transaction. pub async fn resolve_locks( locks: Vec, + timestamp: Timestamp, pd_client: Arc, keyspace: Keyspace, ) -> Result /* live_locks */> { debug!("resolving locks"); let ts = pd_client.clone().get_timestamp().await?; + let caller_start_ts = timestamp.version(); + let current_ts = ts.version(); let (expired_locks, live_locks) = locks .into_iter() @@ -57,6 +62,9 @@ pub async fn resolve_locks( >= lock.lock_ttl as i64 }); + let mut live_locks = live_locks; + let mut lock_resolver = LockResolver::new(ResolveLocksContext::default()); + // records the commit version of each primary lock (representing the status of the transaction) let mut commit_versions: HashMap = HashMap::new(); let mut clean_regions: HashMap> = HashMap::new(); @@ -75,33 +83,52 @@ pub async fn resolve_locks( } let commit_version = match commit_versions.get(&lock.lock_version) { - Some(&commit_version) => commit_version, + Some(&commit_version) => Some(commit_version), None => { - let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version); - let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) - .resolve_lock(OPTIMISTIC_BACKOFF, keyspace) - .retry_multi_region(DEFAULT_REGION_BACKOFF) - .merge(CollectSingle) - .post_process_default() - .plan(); - let commit_version = plan.execute().await?; - commit_versions.insert(lock.lock_version, commit_version); - commit_version + // 
TODO: handle primary mismatch error. + let status = lock_resolver + .get_txn_status_from_lock( + OPTIMISTIC_BACKOFF, + &lock, + caller_start_ts, + current_ts, + false, + pd_client.clone(), + keyspace, + ) + .await?; + match &status.kind { + TransactionStatusKind::Committed(ts) => { + let commit_version = ts.version(); + commit_versions.insert(lock.lock_version, commit_version); + Some(commit_version) + } + TransactionStatusKind::RolledBack => { + commit_versions.insert(lock.lock_version, 0); + Some(0) + } + TransactionStatusKind::Locked(..) => None, + } } }; - let cleaned_region = resolve_lock_with_retry( - &lock.key, - lock.lock_version, - commit_version, - pd_client.clone(), - keyspace, - ) - .await?; - clean_regions - .entry(lock.lock_version) - .or_default() - .insert(cleaned_region); + if let Some(commit_version) = commit_version { + let cleaned_region = resolve_lock_with_retry( + &lock.key, + lock.lock_version, + commit_version, + lock.is_txn_file, + pd_client.clone(), + keyspace, + ) + .await?; + clean_regions + .entry(lock.lock_version) + .or_default() + .insert(cleaned_region); + } else { + live_locks.push(lock); + } } Ok(live_locks) } @@ -110,21 +137,24 @@ async fn resolve_lock_with_retry( #[allow(clippy::ptr_arg)] key: &Vec, start_version: u64, commit_version: u64, + is_txn_file: bool, pd_client: Arc, keyspace: Keyspace, ) -> Result { debug!("resolving locks with retry"); // FIXME: Add backoff + let timestamp = Timestamp::from_version(start_version); let mut error = None; for i in 0..RESOLVE_LOCK_RETRY_LIMIT { debug!("resolving locks: attempt {}", (i + 1)); let store = pd_client.clone().store_for_key(key.into()).await?; let ver_id = store.region_with_leader.ver_id(); - let request = requests::new_resolve_lock_request(start_version, commit_version); + let request = + requests::new_resolve_lock_request(start_version, commit_version, is_txn_file); let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) 
.single_region_with_store(store) .await? - .resolve_lock(Backoff::no_backoff(), keyspace) + .resolve_lock(timestamp.clone(), Backoff::no_backoff(), keyspace) .extract_error() .plan(); match plan.execute().await { @@ -246,6 +276,7 @@ impl LockResolver { true, false, l.lock_type == kvrpcpb::Op::PessimisticLock as i32, + l.is_txn_file, ) .await?; @@ -284,6 +315,7 @@ impl LockResolver { true, true, l.lock_type == kvrpcpb::Op::PessimisticLock as i32, + l.is_txn_file, ) .await?; } else { @@ -292,7 +324,7 @@ impl LockResolver { } else { secondary_status.min_commit_ts }; - txn_infos.insert(txn_id, commit_ts); + txn_infos.insert(txn_id, (commit_ts, l.is_txn_file)); continue; } } @@ -305,8 +337,10 @@ impl LockResolver { ); return Err(Error::ResolveLockError(vec![lock_info.clone()])); } - TransactionStatusKind::Committed(ts) => txn_infos.insert(txn_id, ts.version()), - TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, 0), + TransactionStatusKind::Committed(ts) => { + txn_infos.insert(txn_id, (ts.version(), l.is_txn_file)) + } + TransactionStatusKind::RolledBack => txn_infos.insert(txn_id, (0, l.is_txn_file)), }; } @@ -317,11 +351,12 @@ impl LockResolver { ); let mut txn_ids = Vec::with_capacity(txn_infos.len()); let mut txn_info_vec = Vec::with_capacity(txn_infos.len()); - for (txn_id, commit_ts) in txn_infos.into_iter() { + for (txn_id, (commit_ts, is_txn_file)) in txn_infos.into_iter() { txn_ids.push(txn_id); let mut txn_info = TxnInfo::default(); txn_info.txn = txn_id; txn_info.status = commit_ts; + txn_info.is_txn_file = is_txn_file; txn_info_vec.push(txn_info); } let cleaned_region = self @@ -348,6 +383,7 @@ impl LockResolver { rollback_if_not_exist: bool, force_sync_commit: bool, resolving_pessimistic_lock: bool, + is_txn_file: bool, ) -> Result> { if let Some(txn_status) = self.ctx.get_resolved(txn_id).await { return Ok(txn_status); @@ -369,6 +405,7 @@ impl LockResolver { rollback_if_not_exist, force_sync_commit, resolving_pessimistic_lock, + is_txn_file, 
); let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, req) .retry_multi_region(DEFAULT_REGION_BACKOFF) @@ -376,11 +413,25 @@ impl LockResolver { .extract_error() .post_process_default() .plan(); - let mut res: TransactionStatus = plan.execute().await?; + let mut status: TransactionStatus = match plan.execute().await { + Ok(status) => status, + Err(Error::ExtractedErrors(mut errors)) => match errors.pop() { + Some(Error::KeyError(key_err)) => { + if let Some(txn_not_found) = key_err.txn_not_found { + return Err(Error::TxnNotFound(txn_not_found)); + } + // TODO: handle primary mismatch error. + return Err(Error::KeyError(key_err)); + } + Some(err) => return Err(err), + None => unreachable!(), + }, + Err(err) => return Err(err), + }; let current = pd_client.clone().get_timestamp().await?; - res.check_ttl(current); - let res = Arc::new(res); + status.check_ttl(current); + let res = Arc::new(status); if res.is_cacheable() { self.ctx.save_resolved(txn_id, res.clone()).await; } @@ -420,6 +471,74 @@ impl LockResolver { let _ = plan.execute().await?; Ok(ver_id) } + + #[allow(clippy::too_many_arguments)] + async fn get_txn_status_from_lock( + &mut self, + mut backoff: Backoff, + lock: &kvrpcpb::LockInfo, + caller_start_ts: u64, + current_ts: u64, + force_sync_commit: bool, + pd_client: Arc, + keyspace: Keyspace, + ) -> Result> { + let current_ts = if lock.lock_ttl == 0 { + // NOTE: lock_ttl = 0 is a special protocol!!! + // When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**. + // In this case, TiKV use lock TTL = 0 to notify client, and client should resolve the lock! + // Set current_ts to max uint64 to make the lock expired. 
+ u64::MAX + } else { + current_ts + }; + + let mut rollback_if_not_exist = false; + loop { + match self + .check_txn_status( + pd_client.clone(), + keyspace, + lock.lock_version, + lock.primary_lock.clone(), + caller_start_ts, + current_ts, + rollback_if_not_exist, + force_sync_commit, + lock.lock_type == kvrpcpb::Op::PessimisticLock as i32, + lock.is_txn_file, + ) + .await + { + Ok(status) => return Ok(status), + Err(Error::TxnNotFound(txn_not_found)) => { + let current = pd_client.clone().get_timestamp().await?; + if lock_until_expired_ms(lock.lock_version, lock.lock_ttl, current) <= 0 { + warn!( + "lock txn not found, lock has expired, lock {:?}, caller_start_ts {}, current_ts {}", + lock, caller_start_ts, current_ts + ); + rollback_if_not_exist = true; + continue; + } else if lock.lock_type == kvrpcpb::Op::PessimisticLock as i32 { + let status = TransactionStatus { + kind: TransactionStatusKind::Locked(lock.lock_ttl, lock.clone()), + action: kvrpcpb::Action::NoAction, + is_expired: false, + }; + return Ok(Arc::new(status)); + } + + if let Some(duration) = backoff.next_delay_duration() { + sleep(duration).await; + continue; + } + return Err(Error::TxnNotFound(txn_not_found)); + } + Err(err) => return Err(err), + } + } + } } pub trait HasLocks { @@ -428,6 +547,12 @@ pub trait HasLocks { } } +// Return duration in milliseconds until lock expired. +// If the lock has expired, return a negative value. 
+pub fn lock_until_expired_ms(lock_version: u64, ttl: u64, current: Timestamp) -> i64 { + Timestamp::from_version(lock_version).physical + ttl as i64 - current.physical +} + #[cfg(test)] mod tests { use std::any::Any; @@ -463,7 +588,7 @@ mod tests { let key = vec![1]; let region1 = MockPdClient::region1(); - let resolved_region = resolve_lock_with_retry(&key, 1, 2, client.clone(), keyspace) + let resolved_region = resolve_lock_with_retry(&key, 1, 2, false, client.clone(), keyspace) .await .unwrap(); assert_eq!(region1.ver_id(), resolved_region); @@ -471,7 +596,7 @@ mod tests { // Test resolve lock over retry limit fail::cfg("region-error", "10*return").unwrap(); let key = vec![100]; - resolve_lock_with_retry(&key, 3, 4, client, keyspace) + resolve_lock_with_retry(&key, 3, 4, false, client, keyspace) .await .expect_err("should return error"); } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index ebbb52a8..2e7363d7 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -191,11 +191,12 @@ impl Merge for Collect { pub fn new_resolve_lock_request( start_version: u64, commit_version: u64, + is_txn_file: bool, ) -> kvrpcpb::ResolveLockRequest { let mut req = kvrpcpb::ResolveLockRequest::default(); req.start_version = start_version; req.commit_version = commit_version; - + req.is_txn_file = is_txn_file; req } @@ -213,6 +214,7 @@ impl KvRequest for kvrpcpb::ResolveLockRequest { type Response = kvrpcpb::ResolveLockResponse; } +#[allow(dead_code)] pub fn new_cleanup_request(key: Vec, start_version: u64) -> kvrpcpb::CleanupRequest { let mut req = kvrpcpb::CleanupRequest::default(); req.key = key; @@ -654,6 +656,7 @@ impl Process for DefaultProcessor { } } +#[allow(clippy::too_many_arguments)] pub fn new_check_txn_status_request( primary_key: Vec, lock_ts: u64, @@ -662,6 +665,7 @@ pub fn new_check_txn_status_request( rollback_if_not_exist: bool, force_sync_commit: bool, resolving_pessimistic_lock: bool, + is_txn_file: bool, ) -> 
kvrpcpb::CheckTxnStatusRequest { let mut req = kvrpcpb::CheckTxnStatusRequest::default(); req.primary_key = primary_key; @@ -671,6 +675,7 @@ pub fn new_check_txn_status_request( req.rollback_if_not_exist = rollback_if_not_exist; req.force_sync_commit = force_sync_commit; req.resolving_pessimistic_lock = resolving_pessimistic_lock; + req.is_txn_file = is_txn_file; req } diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index b30ef6e4..5dd53e21 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -142,9 +142,9 @@ impl Transaction { self.buffer .get_or_else(key, |key| async move { - let request = new_get_request(key, timestamp); + let request = new_get_request(key, timestamp.clone()); let plan = PlanBuilder::new(rpc, keyspace, request) - .resolve_lock(retry_options.lock_backoff, keyspace) + .resolve_lock(timestamp, retry_options.lock_backoff, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) .post_process_default() @@ -277,9 +277,9 @@ impl Transaction { self.buffer .batch_get_or_else(keys, move |keys| async move { - let request = new_batch_get_request(keys, timestamp); + let request = new_batch_get_request(keys, timestamp.clone()); let plan = PlanBuilder::new(rpc, keyspace, request) - .resolve_lock(retry_options.lock_backoff, keyspace) + .resolve_lock(timestamp, retry_options.lock_backoff, keyspace) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -761,6 +761,7 @@ impl Transaction { ); let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .resolve_lock( + self.timestamp.clone(), self.options.retry_options.lock_backoff.clone(), self.keyspace, ) @@ -793,10 +794,15 @@ impl Transaction { !key_only, reverse, move |new_range, new_limit| async move { - let request = - new_scan_request(new_range, timestamp, new_limit, key_only, reverse); + let request = new_scan_request( + new_range, + timestamp.clone(), + new_limit, + key_only, + reverse, + ); let 
plan = PlanBuilder::new(rpc, keyspace, request) - .resolve_lock(retry_options.lock_backoff, keyspace) + .resolve_lock(timestamp, retry_options.lock_backoff, keyspace) .retry_multi_region(retry_options.region_backoff) .merge(Collect) .plan(); @@ -853,6 +859,7 @@ impl Transaction { ); let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .resolve_lock( + self.timestamp.clone(), self.options.retry_options.lock_backoff.clone(), self.keyspace, ) @@ -905,11 +912,12 @@ impl Transaction { let req = new_pessimistic_rollback_request( keys.clone().into_iter(), - start_version, + start_version.clone(), for_update_ts, ); let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .resolve_lock( + start_version, self.options.retry_options.lock_backoff.clone(), self.keyspace, ) @@ -1337,6 +1345,7 @@ impl Committer { let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .resolve_lock( + self.start_version.clone(), self.options.retry_options.lock_backoff.clone(), self.keyspace, ) @@ -1380,6 +1389,7 @@ impl Committer { ); let plan = PlanBuilder::new(self.rpc.clone(), self.keyspace, req) .resolve_lock( + self.start_version.clone(), self.options.retry_options.lock_backoff.clone(), self.keyspace, ) @@ -1402,6 +1412,7 @@ impl Committer { async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> { debug!("committing secondary"); + let start_version = self.start_version.clone(); let mutations_len = self.mutations.len(); let primary_only = mutations_len == 1; #[cfg(not(feature = "integration-tests"))] @@ -1435,7 +1446,7 @@ impl Committer { let req = if self.options.async_commit { let keys = mutations.map(|m| m.key.into()); - new_commit_request(keys, self.start_version, commit_version) + new_commit_request(keys, start_version.clone(), commit_version) } else if primary_only { return Ok(()); } else { @@ -1443,10 +1454,14 @@ impl Committer { let keys = mutations .map(|m| m.key.into()) .filter(|key| &primary_key != key); - 
new_commit_request(keys, self.start_version, commit_version) + new_commit_request(keys, start_version.clone(), commit_version) }; let plan = PlanBuilder::new(self.rpc, self.keyspace, req) - .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) + .resolve_lock( + start_version, + self.options.retry_options.lock_backoff, + self.keyspace, + ) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); @@ -1463,20 +1478,30 @@ impl Committer { .mutations .into_iter() .map(|mutation| mutation.key.into()); + let start_version = self.start_version.clone(); match self.options.kind { TransactionKind::Optimistic => { - let req = new_batch_rollback_request(keys, self.start_version); + let req = new_batch_rollback_request(keys, start_version.clone()); let plan = PlanBuilder::new(self.rpc, self.keyspace, req) - .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) + .resolve_lock( + start_version.clone(), + self.options.retry_options.lock_backoff, + self.keyspace, + ) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); plan.execute().await?; } TransactionKind::Pessimistic(for_update_ts) => { - let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts); + let req = + new_pessimistic_rollback_request(keys, start_version.clone(), for_update_ts); let plan = PlanBuilder::new(self.rpc, self.keyspace, req) - .resolve_lock(self.options.retry_options.lock_backoff, self.keyspace) + .resolve_lock( + start_version.clone(), + self.options.retry_options.lock_backoff, + self.keyspace, + ) .retry_multi_region(self.options.retry_options.region_backoff) .extract_error() .plan(); From adea0f7857885b750657c6ab7b6a5159d4af9509 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 19 Jan 2026 16:41:06 +0800 Subject: [PATCH 2/6] fix Signed-off-by: Ping Yu --- src/transaction/lock.rs | 21 ++++++++++++++++++--- src/transaction/requests.rs | 1 + 2 files changed, 19 insertions(+), 3 
deletions(-) diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index be4fb8aa..ab8f6541 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -37,6 +37,11 @@ use crate::Result; const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; +fn format_key_for_log(key: &[u8]) -> String { + let prefix_len = key.len().min(16); + format!("len={}, prefix={:?}", key.len(), &key[..prefix_len]) +} + /// _Resolves_ the given locks. Returns locks still live. When there is no live locks, all the given locks are resolved. /// /// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock, @@ -70,7 +75,7 @@ pub async fn resolve_locks( let mut clean_regions: HashMap> = HashMap::new(); for lock in expired_locks { let region_ver_id = pd_client - .region_for_key(&lock.primary_lock.clone().into()) + .region_for_key(&lock.key.clone().into()) .await? .ver_id(); // skip if the region is cleaned @@ -143,7 +148,6 @@ async fn resolve_lock_with_retry( ) -> Result { debug!("resolving locks with retry"); // FIXME: Add backoff - let timestamp = Timestamp::from_version(start_version); let mut error = None; for i in 0..RESOLVE_LOCK_RETRY_LIMIT { debug!("resolving locks: attempt {}", (i + 1)); @@ -154,7 +158,6 @@ async fn resolve_lock_with_retry( let plan = crate::request::PlanBuilder::new(pd_client.clone(), keyspace, request) .single_region_with_store(store) .await? - .resolve_lock(timestamp.clone(), Backoff::no_backoff(), keyspace) .extract_error() .plan(); match plan.execute().await { @@ -169,6 +172,18 @@ async fn resolve_lock_with_retry( error = e; continue; } + Some(Error::KeyError(key_err)) => { + // Keyspace is not truncated here because we need full key info for logging. 
+ error!( + "resolve_lock error, unexpected resolve err: {:?}, lock: {{key: {}, start_version: {}, commit_version: {}, is_txn_file: {}}}", + key_err, + format_key_for_log(key), + start_version, + commit_version, + is_txn_file, + ); + return Err(Error::KeyError(key_err)); + } Some(e) => return Err(e), None => unreachable!(), } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 2e7363d7..ab0e6a43 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -675,6 +675,7 @@ pub fn new_check_txn_status_request( req.rollback_if_not_exist = rollback_if_not_exist; req.force_sync_commit = force_sync_commit; req.resolving_pessimistic_lock = resolving_pessimistic_lock; + req.verify_is_primary = true; req.is_txn_file = is_txn_file; req } From 49d6cb4bcb5c47559968bf93a5291f0c9c21be29 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 19 Jan 2026 17:19:30 +0800 Subject: [PATCH 3/6] polish Signed-off-by: Ping Yu --- src/raw/requests.rs | 4 ---- src/request/mod.rs | 8 -------- src/transaction/lock.rs | 5 ++--- src/transaction/requests.rs | 31 ------------------------------- 4 files changed, 2 insertions(+), 46 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index df9c028d..1f018184 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -659,11 +659,9 @@ mod test { use super::*; use crate::backoff::DEFAULT_REGION_BACKOFF; - use crate::backoff::OPTIMISTIC_BACKOFF; use crate::mock::MockKvClient; use crate::mock::MockPdClient; use crate::proto::kvrpcpb; - use crate::proto::pdpb::Timestamp; use crate::request::Keyspace; use crate::request::Plan; @@ -701,7 +699,6 @@ mod test { ..Default::default() }; let plan = crate::request::PlanBuilder::new(client, keyspace, scan) - .resolve_lock(Timestamp::default(), OPTIMISTIC_BACKOFF, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(Collect) .plan(); @@ -757,7 +754,6 @@ mod test { new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false); let keyspace = 
Keyspace::Enable { keyspace_id: 0 }; let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request) - .resolve_lock(Timestamp::default(), OPTIMISTIC_BACKOFF, keyspace) .retry_multi_region(DEFAULT_REGION_BACKOFF) .plan(); let _ = plan.execute().await; diff --git a/src/request/mod.rs b/src/request/mod.rs index cdd12f4f..c8fd07be 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -105,7 +105,6 @@ mod test { use crate::region::RegionWithLeader; use crate::store::region_stream_for_keys; use crate::store::HasRegionError; - use crate::timestamp::TimestampExt as _; use crate::transaction::lowering::new_commit_request; use crate::Error; use crate::Key; @@ -198,11 +197,6 @@ mod test { ))); let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, request) - .resolve_lock( - Timestamp::max(), - Backoff::no_jitter_backoff(1, 1, 3), - Keyspace::Disable, - ) .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3)) .extract_error() .plan(); @@ -229,14 +223,12 @@ mod test { // does not extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req.clone()) - .resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF, Keyspace::Disable) .retry_multi_region(OPTIMISTIC_BACKOFF) .plan(); assert!(plan.execute().await.is_ok()); // extract error let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req) - .resolve_lock(Timestamp::max(), OPTIMISTIC_BACKOFF, Keyspace::Disable) .retry_multi_region(OPTIMISTIC_BACKOFF) .extract_error() .plan(); diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index ab8f6541..26d540ee 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -46,9 +46,8 @@ fn format_key_for_log(key: &[u8]) -> String { /// /// If a key has a lock, the latest status of the key is unknown. We need to "resolve" the lock, /// which means the key is finally either committed or rolled back, before we read the value of -/// the key. 
We first use `CleanupRequest` to let the status of the primary lock converge and get -/// its status (committed or rolled back). Then, we use the status of its primary lock to determine -/// the status of the other keys in the same transaction. +/// the key. We first use `CheckTxnStatus` to get the transaction's final status (committed or +/// rolled back), then use `ResolveLock` to resolve the remaining locks in the transaction. pub async fn resolve_locks( locks: Vec, timestamp: Timestamp, diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index ab0e6a43..f5859ba0 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -214,35 +214,6 @@ impl KvRequest for kvrpcpb::ResolveLockRequest { type Response = kvrpcpb::ResolveLockResponse; } -#[allow(dead_code)] -pub fn new_cleanup_request(key: Vec, start_version: u64) -> kvrpcpb::CleanupRequest { - let mut req = kvrpcpb::CleanupRequest::default(); - req.key = key; - req.start_version = start_version; - - req -} - -impl KvRequest for kvrpcpb::CleanupRequest { - type Response = kvrpcpb::CleanupResponse; -} - -shardable_key!(kvrpcpb::CleanupRequest); -collect_single!(kvrpcpb::CleanupResponse); -impl SingleKey for kvrpcpb::CleanupRequest { - fn key(&self) -> &Vec { - &self.key - } -} - -impl Process for DefaultProcessor { - type Out = u64; - - fn process(&self, input: Result) -> Result { - Ok(input?.commit_version) - } -} - pub fn new_prewrite_request( mutations: Vec, primary_lock: Vec, @@ -856,8 +827,6 @@ error_locks!(kvrpcpb::TxnHeartBeatResponse); error_locks!(kvrpcpb::CheckTxnStatusResponse); error_locks!(kvrpcpb::CheckSecondaryLocksResponse); -impl HasLocks for kvrpcpb::CleanupResponse {} - impl HasLocks for kvrpcpb::ScanLockResponse { fn take_locks(&mut self) -> Vec { std::mem::take(&mut self.locks) From 3b33c219fd66bef56624686ab31e926aab62ccb3 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 19 Jan 2026 17:33:28 +0800 Subject: [PATCH 4/6] pick proto Signed-off-by: Ping Yu --- 
proto/kvrpcpb.proto | 9 +++++++++ src/generated/kvrpcpb.rs | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/proto/kvrpcpb.proto b/proto/kvrpcpb.proto index a785549e..288ff39e 100644 --- a/proto/kvrpcpb.proto +++ b/proto/kvrpcpb.proto @@ -274,6 +274,9 @@ message TxnHeartBeatRequest { uint64 start_version = 3; // The new TTL the sender would like. uint64 advise_lock_ttl = 4; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message TxnHeartBeatResponse { @@ -375,6 +378,9 @@ message CommitRequest { repeated bytes keys = 3; // Timestamp for the end of the transaction. Must be greater than `start_version`. uint64 commit_version = 4; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message CommitResponse { @@ -443,6 +449,9 @@ message BatchRollbackRequest { uint64 start_version = 2; // The keys to rollback. repeated bytes keys = 3; + + // Reserved for file based transaction. + bool is_txn_file = 100; } message BatchRollbackResponse { diff --git a/src/generated/kvrpcpb.rs b/src/generated/kvrpcpb.rs index 25685c8b..4bf66314 100644 --- a/src/generated/kvrpcpb.rs +++ b/src/generated/kvrpcpb.rs @@ -366,6 +366,9 @@ pub struct TxnHeartBeatRequest { /// The new TTL the sender would like. #[prost(uint64, tag = "4")] pub advise_lock_ttl: u64, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -503,6 +506,9 @@ pub struct CommitRequest { /// Timestamp for the end of the transaction. Must be greater than `start_version`. #[prost(uint64, tag = "4")] pub commit_version: u64, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -603,6 +609,9 @@ pub struct BatchRollbackRequest { /// The keys to rollback. 
#[prost(bytes = "vec", repeated, tag = "3")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Reserved for file based transaction. + #[prost(bool, tag = "100")] + pub is_txn_file: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] From 74ca4d04f74d63349c9a151cd215744363931d11 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 19 Jan 2026 18:06:10 +0800 Subject: [PATCH 5/6] resolve all locks Signed-off-by: Ping Yu --- src/store/errors.rs | 2 -- src/store/request.rs | 1 - src/transaction/lock.rs | 74 +++++++++++++++++++++++++++++++++-------- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/src/store/errors.rs b/src/store/errors.rs index 47b6bdfa..e8fa1009 100644 --- a/src/store/errors.rs +++ b/src/store/errors.rs @@ -43,7 +43,6 @@ has_region_error!(kvrpcpb::PessimisticLockResponse); has_region_error!(kvrpcpb::ImportResponse); has_region_error!(kvrpcpb::BatchRollbackResponse); has_region_error!(kvrpcpb::PessimisticRollbackResponse); -has_region_error!(kvrpcpb::CleanupResponse); has_region_error!(kvrpcpb::BatchGetResponse); has_region_error!(kvrpcpb::ScanLockResponse); has_region_error!(kvrpcpb::ResolveLockResponse); @@ -79,7 +78,6 @@ macro_rules! 
has_key_error {
 has_key_error!(kvrpcpb::GetResponse);
 has_key_error!(kvrpcpb::CommitResponse);
 has_key_error!(kvrpcpb::BatchRollbackResponse);
-has_key_error!(kvrpcpb::CleanupResponse);
 has_key_error!(kvrpcpb::ScanLockResponse);
 has_key_error!(kvrpcpb::ResolveLockResponse);
 has_key_error!(kvrpcpb::GcResponse);
diff --git a/src/store/request.rs b/src/store/request.rs
index df66b73a..65911dcc 100644
--- a/src/store/request.rs
+++ b/src/store/request.rs
@@ -89,7 +89,6 @@ impl_request!(GetRequest, kv_get, "kv_get");
 impl_request!(ScanRequest, kv_scan, "kv_scan");
 impl_request!(PrewriteRequest, kv_prewrite, "kv_prewrite");
 impl_request!(CommitRequest, kv_commit, "kv_commit");
-impl_request!(CleanupRequest, kv_cleanup, "kv_cleanup");
 impl_request!(BatchGetRequest, kv_batch_get, "kv_batch_get");
 impl_request!(BatchRollbackRequest, kv_batch_rollback, "kv_batch_rollback");
 impl_request!(
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index 26d540ee..f899e8c9 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -58,21 +58,24 @@ pub async fn resolve_locks(
     let ts = pd_client.clone().get_timestamp().await?;
     let caller_start_ts = timestamp.version();
     let current_ts = ts.version();
-    let (expired_locks, live_locks) =
-        locks
-            .into_iter()
-            .partition::<Vec<kvrpcpb::LockInfo>, _>(|lock| {
-                ts.physical - Timestamp::from_version(lock.lock_version).physical
-                    >= lock.lock_ttl as i64
-            });
-
-    let mut live_locks = live_locks;
+
+    let mut live_locks = Vec::new();
     let mut lock_resolver = LockResolver::new(ResolveLocksContext::default());
     // records the commit version of each primary lock (representing the status of the transaction)
     let mut commit_versions: HashMap<u64, u64> = HashMap::new();
     let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new();
-    for lock in expired_locks {
+    // We must check txn status for *all* locks, not only TTL-expired ones.
+    //
+    // TTL only indicates whether a lock is *possibly* orphaned; it does not mean the transaction
+    // is still running.
A transaction may already be committed/rolled back while its locks are
+    // still visible (e.g. cleanup/resolve hasn't finished, retries after region errors, etc.).
+    // If we only resolve TTL-expired locks, we can unnecessarily sleep/backoff until TTL even
+    // though `CheckTxnStatus` would already report `Committed`/`RolledBack`.
+    //
+    // This matches the client-go `LockResolver.ResolveLocksWithOpts` flow: query txn status for
+    // each encountered lock, then resolve immediately when the status is final.
+    for lock in locks {
         let region_ver_id = pd_client
             .region_for_key(&lock.key.clone().into())
             .await?
@@ -111,7 +114,10 @@ pub async fn resolve_locks(
                         commit_versions.insert(lock.lock_version, 0);
                         Some(0)
                     }
-                    TransactionStatusKind::Locked(..) => None,
+                    TransactionStatusKind::Locked(_, lock_info) => {
+                        live_locks.push(lock_info.clone());
+                        None
+                    }
                 }
             }
         };
@@ -130,8 +136,6 @@
             .entry(lock.lock_version)
             .or_default()
             .insert(cleaned_region);
-        } else {
-            live_locks.push(lock);
         }
     }
     Ok(live_locks)
@@ -570,6 +574,8 @@ pub fn lock_until_expired_ms(lock_version: u64, ttl: u64, current: Timestamp) ->
 #[cfg(test)]
 mod tests {
     use std::any::Any;
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering;

     use serial_test::serial;

@@ -614,4 +620,46 @@
             .await
             .expect_err("should return error");
     }
+
+    #[tokio::test]
+    #[serial]
+    async fn test_resolve_locks_resolves_committed_even_if_ttl_not_expired() {
+        let check_txn_status_count = Arc::new(AtomicUsize::new(0));
+        let resolve_lock_count = Arc::new(AtomicUsize::new(0));
+
+        let check_txn_status_count_captured = check_txn_status_count.clone();
+        let resolve_lock_count_captured = resolve_lock_count.clone();
+        let client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
+            move |req: &dyn Any| {
+                if req.is::<kvrpcpb::CheckTxnStatusRequest>() {
+                    check_txn_status_count_captured.fetch_add(1, Ordering::SeqCst);
+                    let resp = kvrpcpb::CheckTxnStatusResponse {
+                        commit_version: 2,
+                        action:
kvrpcpb::Action::NoAction as i32,
+                        ..Default::default()
+                    };
+                    return Ok(Box::new(resp) as Box<dyn Any>);
+                }
+                if req.is::<kvrpcpb::ResolveLockRequest>() {
+                    resolve_lock_count_captured.fetch_add(1, Ordering::SeqCst);
+                    return Ok(Box::<kvrpcpb::ResolveLockResponse>::default() as Box<dyn Any>);
+                }
+                panic!("unexpected request type: {:?}", req.type_id());
+            },
+        )));
+
+        let mut lock = kvrpcpb::LockInfo::default();
+        lock.key = vec![1];
+        lock.primary_lock = vec![1];
+        lock.lock_version = 1;
+        lock.lock_ttl = 100; // not expired under MockPdClient's Timestamp::default()
+
+        let live_locks = resolve_locks(vec![lock], Timestamp::default(), client, Keyspace::Disable)
+            .await
+            .unwrap();
+
+        assert!(live_locks.is_empty());
+        assert_eq!(check_txn_status_count.load(Ordering::SeqCst), 1);
+        assert_eq!(resolve_lock_count.load(Ordering::SeqCst), 1);
+    }
 }

From bde963cedaefd2981899e7c6c32e51163d076e1c Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Mon, 19 Jan 2026 18:42:13 +0800
Subject: [PATCH 6/6] remove not used Timestamp::max

Signed-off-by: Ping Yu
---
 src/timestamp.rs | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/src/timestamp.rs b/src/timestamp.rs
index f257a44d..5c610e77 100644
--- a/src/timestamp.rs
+++ b/src/timestamp.rs
@@ -22,10 +22,6 @@ pub trait TimestampExt: Sized {
     fn from_version(version: u64) -> Self;
     /// Convert u64 to an optional timestamp, where `0` represents no timestamp.
     fn try_from_version(version: u64) -> Option<Self>;
-    /// Return the maximum timestamp.
-    fn max() -> Self {
-        Self::from_version(u64::MAX)
-    }
 }

 impl TimestampExt for Timestamp {