Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// 2PC functionality is disabled, clients that attempt to use this functionality
// would receive an authorization failed error.
responseCallback(initTransactionError(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED))
} else if (keepPreparedTxn) {
// if the request is to keep the prepared transaction, then return an
// unsupported version error since the feature hasn't been implemented yet.
responseCallback(initTransactionError(Errors.UNSUPPORTED_VERSION))
} else if (keepPreparedTxn && !enableTwoPCFlag) {
// keepPreparedTxn is only meaningful for 2PC recovery
responseCallback(initTransactionError(Errors.INVALID_REQUEST))
} else if (!txnManager.validateTransactionTimeoutMs(enableTwoPCFlag, transactionTimeoutMs)) {
// check transactionTimeoutMs is not larger than the broker configured maximum allowed value
responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
Expand Down Expand Up @@ -171,6 +170,23 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
}

// 2PC recovery: if the client wants to keep a prepared transaction and an ongoing 2PC
// transaction exists, preserve it and return its producer id/epoch so the client can
// finalize it. There is no state change, so respond directly without a log append.
val preservedResult = coordinatorEpochAndMetadata.toOption.flatMap { existingEpochAndMetadata =>
val txnMetadata = existingEpochAndMetadata.transactionMetadata

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The epoch fence is skipped on the preserve path —
the prepared txn's epoch must stay valid for completeTransaction.

txnMetadata.inLock(() =>
if (keepPreparedTxn && txnMetadata.state == TransactionState.ONGOING && txnMetadata.isDistributedTwoPhaseCommitTxn)
Some(new InitProducerIdResult(txnMetadata.producerId, txnMetadata.producerEpoch,
txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE))
else
None
)
}

if (preservedResult.isDefined) {
responseCallback(preservedResult.get)
} else {
val result: ApiResult[(Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata.flatMap {
existingEpochAndMetadata =>
val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
Expand Down Expand Up @@ -222,6 +238,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
sendPidResponseCallback, requestLocal = requestLocal)
}
}
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseData = new InitProducerIdResponseData()
.setProducerId(result.producerId)
.setProducerEpoch(result.producerEpoch)
.setOngoingTxnProducerId(result.ongoingTxnProducerId)
.setOngoingTxnProducerEpoch(result.ongoingTxnProducerEpoch)
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(finalError.code)
val responseBody = new InitProducerIdResponse(responseData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,59 @@ class TransactionCoordinatorTest {
assertEquals(new InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result)
}

@Test
def shouldReturnInvalidRequestWhenKeepPreparedTxnTrueButTwoPCDisabled(): Unit = {
mockPidGenerator()

coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, enableTwoPCFlag = false,
keepPreparedTxn = true, None, initProducerIdMockCallback)
assertEquals(new InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result)
}

@Test
def shouldAcceptKeepPreparedTxnWhen2PCEnabledAndNoOngoingTxn(): Unit = {
initPidGenericMocks(transactionalId)
when(transactionManager.isTransaction2pcEnabled()).thenReturn(true)

when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Right(None))

when(transactionManager.putTransactionStateIfNotExists(capturedTxn.capture()))
.thenAnswer(_ => Right(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch, capturedTxn.getValue)))

when(transactionManager.appendTransactionToLog(
ArgumentMatchers.eq(transactionalId),
ArgumentMatchers.eq(coordinatorEpoch),
any[TxnTransitMetadata],
capturedErrorsCallback.capture(),
any(),
any())
).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))

coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, enableTwoPCFlag = true,
keepPreparedTxn = true, None, initProducerIdMockCallback)
assertEquals(new InitProducerIdResult(0L, 0, Errors.NONE), result)
}

@Test
def shouldPreserveOngoing2PCTxnWhenKeepPreparedTxnTrue(): Unit = {
mockPidGenerator()
when(transactionManager.validateTransactionTimeoutMs(anyBoolean(), anyInt())).thenReturn(true)
when(transactionManager.isTransaction2pcEnabled()).thenReturn(true)

val ongoing2PCMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID,
producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, Int.MaxValue, TransactionState.ONGOING, util.Set.of, time.milliseconds(), time.milliseconds(), TV_2)

when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
.thenReturn(Right(Some(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch, ongoing2PCMetadata))))

coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, enableTwoPCFlag = true,
keepPreparedTxn = true, None, initProducerIdMockCallback)

assertEquals(new InitProducerIdResult(producerId, producerEpoch, producerId, producerEpoch, Errors.NONE), result)
verify(transactionManager, never()).appendTransactionToLog(any(), anyInt(), any(), any(), any(), any())
}

@Test
def shouldReturnInvalidRequestWhen2PCEnabledButBroker2PCConfigFalse(): Unit = {
mockPidGenerator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
package org.apache.kafka.coordinator.transaction;

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.RecordBatch;

public record InitProducerIdResult(long producerId, short producerEpoch, Errors error) {
public record InitProducerIdResult(long producerId, short producerEpoch,
long ongoingTxnProducerId, short ongoingTxnProducerEpoch,
Errors error) {

public InitProducerIdResult(long producerId, short producerEpoch, Errors error) {
this(producerId, producerEpoch, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error);
}
}