Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions proto/kvrpcpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ message PrewriteRequest {
// for_update_ts constriants that should be checked when prewriting a pessimistic transaction.
// See https://github.com/tikv/tikv/issues/14311
repeated ForUpdateTSConstraint for_update_ts_constraints = 16;

// Reserved for file based transaction.
repeated uint64 txn_file_chunks = 100;
}

message PrewriteResponse {
Expand Down Expand Up @@ -271,6 +274,9 @@ message TxnHeartBeatRequest {
uint64 start_version = 3;
// The new TTL the sender would like.
uint64 advise_lock_ttl = 4;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message TxnHeartBeatResponse {
Expand Down Expand Up @@ -315,6 +321,9 @@ message CheckTxnStatusRequest {
// because the old versions of clients cannot handle some results returned from TiKV correctly.
// For new versions, this field should always be set to true.
bool verify_is_primary = 9;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message CheckTxnStatusResponse {
Expand Down Expand Up @@ -369,6 +378,9 @@ message CommitRequest {
repeated bytes keys = 3;
// Timestamp for the end of the transaction. Must be greater than `start_version`.
uint64 commit_version = 4;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message CommitResponse {
Expand Down Expand Up @@ -437,6 +449,9 @@ message BatchRollbackRequest {
uint64 start_version = 2;
// The keys to rollback.
repeated bytes keys = 3;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message BatchRollbackResponse {
Expand Down Expand Up @@ -480,6 +495,9 @@ message ResolveLockRequest {
repeated TxnInfo txn_infos = 4;
// Only resolve specified keys.
repeated bytes keys = 5;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message ResolveLockResponse {
Expand Down Expand Up @@ -971,6 +989,9 @@ message LockInfo {
// It's used in timeout errors. 0 means unknown or not applicable.
// It can be used to help the client decide whether to try resolving the lock.
uint64 duration_to_last_update_ms = 11;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

message KeyError {
Expand Down Expand Up @@ -1320,6 +1341,9 @@ message MvccInfo {
message TxnInfo {
uint64 txn = 1;
uint64 status = 2;

// Reserved for file based transaction.
bool is_txn_file = 100;
}

enum Action {
Expand Down
2 changes: 2 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ pub enum Error {
},
#[error("Keyspace not found: {0}")]
KeyspaceNotFound(String),
#[error("Transaction not found error: {:?}", _0)]
TxnNotFound(kvrpcpb::TxnNotFound),
}

impl From<crate::proto::errorpb::Error> for Error {
Expand Down
24 changes: 24 additions & 0 deletions src/generated/kvrpcpb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ pub struct PrewriteRequest {
pub for_update_ts_constraints: ::prost::alloc::vec::Vec<
prewrite_request::ForUpdateTsConstraint,
>,
/// Reserved for file based transaction.
#[prost(uint64, repeated, tag = "100")]
pub txn_file_chunks: ::prost::alloc::vec::Vec<u64>,
}
/// Nested message and enum types in `PrewriteRequest`.
pub mod prewrite_request {
Expand Down Expand Up @@ -363,6 +366,9 @@ pub struct TxnHeartBeatRequest {
/// The new TTL the sender would like.
#[prost(uint64, tag = "4")]
pub advise_lock_ttl: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -422,6 +428,9 @@ pub struct CheckTxnStatusRequest {
/// For new versions, this field should always be set to true.
#[prost(bool, tag = "9")]
pub verify_is_primary: bool,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -497,6 +506,9 @@ pub struct CommitRequest {
/// Timestamp for the end of the transaction. Must be greater than `start_version`.
#[prost(uint64, tag = "4")]
pub commit_version: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -597,6 +609,9 @@ pub struct BatchRollbackRequest {
/// The keys to rollback.
#[prost(bytes = "vec", repeated, tag = "3")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -661,6 +676,9 @@ pub struct ResolveLockRequest {
/// Only resolve specified keys.
#[prost(bytes = "vec", repeated, tag = "5")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1308,6 +1326,9 @@ pub struct LockInfo {
/// It can be used to help the client decide whether to try resolving the lock.
#[prost(uint64, tag = "11")]
pub duration_to_last_update_ms: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1781,6 +1802,9 @@ pub struct TxnInfo {
pub txn: u64,
#[prost(uint64, tag = "2")]
pub status: u64,
/// Reserved for file based transaction.
#[prost(bool, tag = "100")]
pub is_txn_file: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
3 changes: 0 additions & 3 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ mod test {

use super::*;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::OPTIMISTIC_BACKOFF;
use crate::mock::MockKvClient;
use crate::mock::MockPdClient;
use crate::proto::kvrpcpb;
Expand Down Expand Up @@ -700,7 +699,6 @@ mod test {
..Default::default()
};
let plan = crate::request::PlanBuilder::new(client, keyspace, scan)
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(Collect)
.plan();
Expand Down Expand Up @@ -756,7 +754,6 @@ mod test {
new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false);
let keyspace = Keyspace::Enable { keyspace_id: 0 };
let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request)
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.plan();
let _ = plan.execute().await;
Expand Down
3 changes: 0 additions & 3 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ mod test {
)));

let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, request)
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3), Keyspace::Disable)
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
.extract_error()
.plan();
Expand All @@ -224,14 +223,12 @@ mod test {
// does not extract error
let plan =
crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req.clone())
.resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.plan();
assert!(plan.execute().await.is_ok());

// extract error
let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req)
.resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable)
.retry_multi_region(OPTIMISTIC_BACKOFF)
.extract_error()
.plan();
Expand Down
12 changes: 11 additions & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::pd::PdClient;
use crate::proto::errorpb;
use crate::proto::errorpb::EpochNotMatch;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::region::StoreId;
use crate::region::{RegionVerId, RegionWithLeader};
use crate::request::shard::HasNextBatch;
Expand Down Expand Up @@ -597,6 +598,7 @@ pub struct DefaultProcessor;

pub struct ResolveLock<P: Plan, PdC: PdClient> {
pub inner: P,
pub timestamp: Timestamp,
pub pd_client: Arc<PdC>,
pub backoff: Backoff,
pub keyspace: Keyspace,
Expand All @@ -606,6 +608,7 @@ impl<P: Plan, PdC: PdClient> Clone for ResolveLock<P, PdC> {
fn clone(&self) -> Self {
ResolveLock {
inner: self.inner.clone(),
timestamp: self.timestamp.clone(),
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),
keyspace: self.keyspace,
Expand Down Expand Up @@ -634,7 +637,13 @@ where
}

let pd_client = self.pd_client.clone();
let live_locks = resolve_locks(locks, pd_client.clone(), self.keyspace).await?;
let live_locks = resolve_locks(
locks,
self.timestamp.clone(),
pd_client.clone(),
self.keyspace,
)
.await?;
if live_locks.is_empty() {
result = self.inner.execute().await?;
} else {
Expand Down Expand Up @@ -953,6 +962,7 @@ mod test {
let plan = RetryableMultiRegion {
inner: ResolveLock {
inner: ErrPlan,
timestamp: Timestamp::default(),
backoff: Backoff::no_backoff(),
pd_client: Arc::new(MockPdClient::default()),
keyspace: Keyspace::Disable,
Expand Down
3 changes: 3 additions & 0 deletions src/request/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::transaction::HasLocks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::ResolveLocksOptions;
use crate::Result;
use crate::Timestamp;

/// Builder type for plans (see that module for more).
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
Expand Down Expand Up @@ -72,6 +73,7 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
/// If there is a lock error, then resolve the lock and retry the request.
pub fn resolve_lock(
self,
timestamp: Timestamp,
backoff: Backoff,
keyspace: Keyspace,
) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
Expand All @@ -82,6 +84,7 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
pd_client: self.pd_client.clone(),
plan: ResolveLock {
inner: self.plan,
timestamp,
backoff,
pd_client: self.pd_client,
keyspace,
Expand Down
2 changes: 0 additions & 2 deletions src/store/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ has_region_error!(kvrpcpb::PessimisticLockResponse);
has_region_error!(kvrpcpb::ImportResponse);
has_region_error!(kvrpcpb::BatchRollbackResponse);
has_region_error!(kvrpcpb::PessimisticRollbackResponse);
has_region_error!(kvrpcpb::CleanupResponse);
has_region_error!(kvrpcpb::BatchGetResponse);
has_region_error!(kvrpcpb::ScanLockResponse);
has_region_error!(kvrpcpb::ResolveLockResponse);
Expand Down Expand Up @@ -79,7 +78,6 @@ macro_rules! has_key_error {
has_key_error!(kvrpcpb::GetResponse);
has_key_error!(kvrpcpb::CommitResponse);
has_key_error!(kvrpcpb::BatchRollbackResponse);
has_key_error!(kvrpcpb::CleanupResponse);
has_key_error!(kvrpcpb::ScanLockResponse);
has_key_error!(kvrpcpb::ResolveLockResponse);
has_key_error!(kvrpcpb::GcResponse);
Expand Down
1 change: 0 additions & 1 deletion src/store/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ impl_request!(GetRequest, kv_get, "kv_get");
impl_request!(ScanRequest, kv_scan, "kv_scan");
impl_request!(PrewriteRequest, kv_prewrite, "kv_prewrite");
impl_request!(CommitRequest, kv_commit, "kv_commit");
impl_request!(CleanupRequest, kv_cleanup, "kv_cleanup");
impl_request!(BatchGetRequest, kv_batch_get, "kv_batch_get");
impl_request!(BatchRollbackRequest, kv_batch_rollback, "kv_batch_rollback");
impl_request!(
Expand Down
Loading