From f182f7fbca90f60e6c333098cb2ee0b789d713c0 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Mon, 29 Jun 2026 23:34:51 +0530 Subject: [PATCH 1/2] KAFKA-20739: Accept keepPreparedTxn and return ongoing 2PC txn --- .../transaction/TransactionCoordinator.scala | 25 ++++++++++++++++--- .../main/scala/kafka/server/KafkaApis.scala | 2 ++ .../transaction/InitProducerIdResult.java | 9 ++++++- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 36a2fa034cfa9..9c67096f682fa 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -139,10 +139,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig, // 2PC functionality is disabled, clients that attempt to use this functionality // would receive an authorization failed error. responseCallback(initTransactionError(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) - } else if (keepPreparedTxn) { - // if the request is to keep the prepared transaction, then return an - // unsupported version error since the feature hasn't been implemented yet. - responseCallback(initTransactionError(Errors.UNSUPPORTED_VERSION)) + } else if (keepPreparedTxn && !enableTwoPCFlag) { + // keepPreparedTxn is only meaningful for 2PC recovery + responseCallback(initTransactionError(Errors.INVALID_REQUEST)) } else if (!txnManager.validateTransactionTimeoutMs(enableTwoPCFlag, transactionTimeoutMs)) { // check transactionTimeoutMs is not larger than the broker configured maximum allowed value responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT)) @@ -171,6 +170,23 @@ class TransactionCoordinator(txnConfig: TransactionConfig, case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata) } + // 2PC recovery: if the client wants to keep a prepared transaction and an ongoing 2PC + // transaction exists, preserve it and return its producer id/epoch so the client can + // finalize it. There is no state change, so respond directly without a log append. + val preservedResult = coordinatorEpochAndMetadata.toOption.flatMap { existingEpochAndMetadata => + val txnMetadata = existingEpochAndMetadata.transactionMetadata + txnMetadata.inLock(() => + if (keepPreparedTxn && txnMetadata.state == TransactionState.ONGOING && txnMetadata.isDistributedTwoPhaseCommitTxn) + Some(new InitProducerIdResult(txnMetadata.producerId, txnMetadata.producerEpoch, + txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)) + else + None + ) + } + + if (preservedResult.isDefined) { + responseCallback(preservedResult.get) + } else { val result: ApiResult[(Int, TxnTransitMetadata)] = coordinatorEpochAndMetadata.flatMap { existingEpochAndMetadata => val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch @@ -222,6 +238,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig, sendPidResponseCallback, requestLocal = requestLocal) } } + } } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 36948c34fb879..680f0c544038f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1668,6 +1668,8 @@ class KafkaApis(val requestChannel: RequestChannel, val responseData = new InitProducerIdResponseData() .setProducerId(result.producerId) .setProducerEpoch(result.producerEpoch) + .setOngoingTxnProducerId(result.ongoingTxnProducerId) + .setOngoingTxnProducerEpoch(result.ongoingTxnProducerEpoch) .setThrottleTimeMs(requestThrottleMs) .setErrorCode(finalError.code) val responseBody = new InitProducerIdResponse(responseData) diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/InitProducerIdResult.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/InitProducerIdResult.java index 2d31c000d0d4e..2d17e7ffd28d8 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/InitProducerIdResult.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/InitProducerIdResult.java @@ -17,6 +17,13 @@ package org.apache.kafka.coordinator.transaction; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.internal.RecordBatch; -public record InitProducerIdResult(long producerId, short producerEpoch, Errors error) { +public record InitProducerIdResult(long producerId, short producerEpoch, + long ongoingTxnProducerId, short ongoingTxnProducerEpoch, + Errors error) { + + public InitProducerIdResult(long producerId, short producerEpoch, Errors error) { + this(producerId, producerEpoch, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error); + } } From eb0f6cd28ee05744b584fdc383d0f73d2abd0352 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Mon, 29 Jun 2026 23:35:32 +0530 Subject: [PATCH 2/2] KAFKA-20739: Add coordinator keepPreparedTxn recovery tests --- .../TransactionCoordinatorTest.scala | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index 91e5a4f7ce596..3dd5cbc30cd47 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -109,6 +109,59 @@ class TransactionCoordinatorTest { assertEquals(new InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result) } + @Test + def shouldReturnInvalidRequestWhenKeepPreparedTxnTrueButTwoPCDisabled(): Unit = { + mockPidGenerator() + + coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, enableTwoPCFlag = false, + keepPreparedTxn = true, None, initProducerIdMockCallback) + assertEquals(new InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result) + } + + @Test + def shouldAcceptKeepPreparedTxnWhen2PCEnabledAndNoOngoingTxn(): Unit = { + initPidGenericMocks(transactionalId) + when(transactionManager.isTransaction2pcEnabled()).thenReturn(true) + + when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) + .thenReturn(Right(None)) + + when(transactionManager.putTransactionStateIfNotExists(capturedTxn.capture())) + .thenAnswer(_ => Right(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch, capturedTxn.getValue))) + + when(transactionManager.appendTransactionToLog( + ArgumentMatchers.eq(transactionalId), + ArgumentMatchers.eq(coordinatorEpoch), + any[TxnTransitMetadata], + capturedErrorsCallback.capture(), + any(), + any()) + ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE)) + + coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, enableTwoPCFlag = true, + keepPreparedTxn = true, None, initProducerIdMockCallback) + assertEquals(new InitProducerIdResult(0L, 0, Errors.NONE), result) + } + + @Test + def shouldPreserveOngoing2PCTxnWhenKeepPreparedTxnTrue(): Unit = { + mockPidGenerator() + when(transactionManager.validateTransactionTimeoutMs(anyBoolean(), anyInt())).thenReturn(true) + when(transactionManager.isTransaction2pcEnabled()).thenReturn(true) + + val ongoing2PCMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID, + producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, Int.MaxValue, TransactionState.ONGOING, util.Set.of, time.milliseconds(), time.milliseconds(), TV_2) + + when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId))) + .thenReturn(Right(Some(new CoordinatorEpochAndTxnMetadata(coordinatorEpoch, ongoing2PCMetadata)))) + + coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, enableTwoPCFlag = true, + keepPreparedTxn = true, None, initProducerIdMockCallback) + + assertEquals(new InitProducerIdResult(producerId, producerEpoch, producerId, producerEpoch, Errors.NONE), result) + verify(transactionManager, never()).appendTransactionToLog(any(), anyInt(), any(), any(), any(), any()) + } + @Test def shouldReturnInvalidRequestWhen2PCEnabledButBroker2PCConfigFalse(): Unit = { mockPidGenerator()