-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18401: Transaction version 2 does not support commit transaction without records #18448
Conversation
…n without records
@@ -854,7 +856,7 @@ synchronized TxnRequestHandler nextRequest(boolean hasIncompleteBatches) { | |||
return null; | |||
} | |||
|
|||
if (nextRequestHandler.isEndTxn() && (!isTransactionV2Enabled() && !transactionStarted)) { | |||
if (nextRequestHandler.isEndTxn() && !transactionStarted) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor nit here: I wonder if we can change the log here for tv2 so that we say we didn't attempt to add any partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pardon me, could you give me some hint? Did you mean we need to modify the log in line 862
log.debug("Not sending EndTxn for completed transaction since no partitions " +
"or offsets were successfully added");
to something else when using TV2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right -- the implication of successfully added is a little confusing in the TV2 case because it is not a matter of "successfully" adding but a matter of not attempting to add any at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, I modified the log to Not sending EndTxn for completed transaction since no send or sendOffsetsToTransaction were triggered
under TV2. Thank you.
core/src/test/java/kafka/clients/producer/ProducerIntegrationTest.java
Outdated
Show resolved
Hide resolved
new ClusterTest(features = Array( | ||
new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))), | ||
)) | ||
def testTransactionWithSendOffset(cluster: ClusterInstance): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test new behavior/one related to the change? The one other thing I was curious about was a test where we try to send a record, it fails to be added and we should be able to abort, but a commit (in the case where a partition was not added) will fail. That one might be tricky to code -- especially the part where we fail to add the partition on the server side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test new behavior/one related to the change?
Yes, I want to ensure that if a transaction with sendOffsetToTransaction triggered and no sendRecords are fired, then executing commitTransaction should not raise any errors. And on the server side, the transaction state should be COMPLETE_COMMIT, which means the END_TXN request has been sent to the broker.
The one other thing I was curious about was a test where we try to send a record, it fails to be added and we should be able to abort, but a commit (in the case where a partition was not added) will fail. That one might be tricky to code -- especially the part where we fail to add the partition on the server side.
Hmm... I need some time to figure out how to make adding the partition fail on server side. Once that's done, I think I can come up with the test you mentioned.
Thanks for the suggestion!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is one test, testFencingOnAddPartitions
in TransactionTest.scala [0], which seems to do something similar—creating a scenario where TransactionCoordinator#handleAddPartitionsToTransaction
raises an error, Errors.PRODUCER_FENCED
due to a mismatch in the producer epoch [1] and cause AddPartitionToTxn request failed.
However, in this test, we cannot commit or abort the transaction successfully. Committing the transaction directly fails from the client side since the last error caused by the failed send is not empty[2]. In the case of aborting the transaction, since there is a newer producer (producer2), the broker raises the error Errors.PRODUCER_FENCED, causing the abort to fail.
Perhaps the producer fenced test case is not what we want? I'm just wondering under what circumstances the AddPartitionToTxn
would fail and in producer we can abort transaction successfully but commit transaction would fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit tricky. Depending on the error, a producer will enter a fatal error state or an abortable error state. In the abortable error state, we can always abort, but not commit. In fatal state, we can't do anything. The fenced error I believe puts the producer in fatal state. There are a few other errors that are only "abortable" but not sure if they can easily come up in test scenarios. Some there are some auth errors like this for example, but that may be hard to replicate on a test. KIP-1050 does a pretty good job explaining the current state of transactional errors. You can see in the "current handling" columns of the tables in the KIP. https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050:+Consistent+error+handling+for+Transactions
I'm thinking though that this might be a bit complicated for testing, and it is ok to not include if it is too much trouble. I think the main thing I wanted to confirm is that if we attempted to send a record/offset, we can no longer do an empty commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the main thing I wanted to confirm is that if we attempted to send a record/offset, we can no longer do an empty commit.
Sorry for the delay, I was a little bit confused about this comment, what does an empty commit means? Does testTransactionWithAndWithoutSend
in ProducerIntegrationTest.scala cover that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or did you mean something like
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get()
producer.beginTransaction() // <-- fail TransactionalId foobar: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
producer.commitTransaction()
in this case, it would fail in client side actually
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for not being clear. I meant something similar to the above, but we wouldn't call begin transaction again. Something like
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get()
// we would create a case where this fails and we wouldn't actually write a record. I think it would take us to an abortable state
producer.commitTransaction() // fails
Basically, just wanted to confirm that once we hit "true" for transaction started, we don't skip the EndTxn in the new code that has been added. I guess maybe this is a very specific scenario with the send failing and it is sufficient to just have the test where once we send/add offsets, we confirm the EndTxn sent. (No need for the error.) If we do that, I think your current test may be sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation, I think I need to figure out a scenario that producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes)).get()
failed to send record. Will update the test ASAP!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's looking pretty good @brandboat. I just have one more question about the testing we are doing. |
@brandboat Any updates here? Did we want to add/change any tests? This was the only thing I was thinking might be remaining
|
val commitError = assertThrows(classOf[KafkaException], () => producer.commitTransaction()) // fail due to last send failed | ||
assertInstanceOf(classOf[RecordTooLargeException], commitError.getCause) | ||
|
||
if (txnVersion == 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! happy to see this worked!
small nit -- is there a reason to run this test on the other versions? I think we should either have cases to test for versions 0 and 1 or not run those for this one. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. I see your comment now. I will look at the revised version when it is ready, but I think the nit still stands.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nit -- is there a reason to run this test on the other versions? I think we should either have cases to test for versions 0 and 1 or not run those for this one. WDYT?
Yeah.. though I think the test case perhaps is wrong. But I observed that the abortTransaction in this test case only works under TV_2 instead of TV_0 and TV_1. I haven't look into it yet, not sure why...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, in the beginning I thought RecordTooLargeException
will put transaction in FATAL_ERROR
state in TransactionManager, but the state is ABORTABLE_ERROR
actually, so the test seems in the right direction.
I'll deep dive and check why abortTransaction will hang under TV_0 & TV_1 in the test case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @jolshan , I just update the test, now the test seems ok.
small nit -- is there a reason to run this test on the other versions? I think we should either have cases to test for versions 0 and 1 or not run those for this one. WDYT?
After I increment max.message.bytes to 100, abortTransaction not hanging anymore with TV_0 and TV_1, but I didn't find out the root cause of why doing this could solve the hanging issue...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what the ticket would be. I will continue to think about this, but I don't think it needs to block this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recreated the failure locally and it seems like for TV0 and TV1 we are unable to join the ioThread when trying to close the producer. With TV2, the thread seems to join fine. Were you observing this as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late reply, the comment #18448 (comment) was incorrect—apologies for the misleading input. The behavior I was referring to, where the response (EndTxnResponse) is called first, followed by WriteTxnMarkerRequest, is actually the same for TV0 and TV1.
I recreated the failure locally and it seems like for TV0 and TV1 we are unable to join the ioThread when trying to close the producer. With TV2, the thread seems to join fine. Were you observing this as well?
What I observed in TV0 and TV1 is that the producer continuously sends InitProducerId requests[0][1] because it encounters Errors.CONCURRENT_TRANSACTIONS. Since this is a retriable error, the producer enters a retry loop, repeatedly sending InitProducerId requests to the broker. But the last transaction remains incomplete, the entire process gets stuck in an endless retry loop, and causing the test time out.
The CONCURRENT_TRANSACTIONS error occurs because WriteTxnMarker fails due to a RecordTooLargeException, preventing the transaction state from correctly transitioning to the CompleteAbort state.
Although TV2 passes in this scenario, we can still reproduce the CONCURRENT_TRANSACTIONS error if we add the following lines after abortTransaction(). This happens because WriteTxnMarker failed in the previous abort attempt:
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "key".getBytes, "value".getBytes))
producer.commitTransaction()
I'm not sure if this is something we need to aware of...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me ponder this a bit. Thanks @brandboat for the deeper investigation. Is this because the TV0, TV1, send intProducerId when it needs to bump the epoch? This is a difference between TV0/1 and TV2.
If this is correct, the issue is that any request after EndTxn will see concurrent transactions, which makes sense and is expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is correct, the issue is that any request after EndTxn will see concurrent transactions, which makes sense and is expected.
Yep, that's the root cause, glad to hear that this is not an issue. Thanks for the explanation. 😃
this check makes test flaky, and abortTransaction will send EndTxnRequest anyway, this check seems redundant here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your diligence with the tests @brandboat
Thanks for your patience, I learned a lot! |
…n without records (#18448) Fix the issue where producer.commitTransaction under transaction version 2 throws error if no partition or offset is added to transaction. The solution is to avoid sending the endTxnRequest unless producer.send or producer.sendOffsetsToTransaction is triggered. Reviewers: Justine Olshan <[email protected]>
…n without records (apache#18448) Fix the issue where producer.commitTransaction under transaction version 2 throws error if no partition or offset is added to transaction. The solution is to avoid sending the endTxnRequest unless producer.send or producer.sendOffsetsToTransaction is triggered. Reviewers: Justine Olshan <[email protected]>
…n without records (apache#18448) Fix the issue where producer.commitTransaction under transaction version 2 throws error if no partition or offset is added to transaction. The solution is to avoid sending the endTxnRequest unless producer.send or producer.sendOffsetsToTransaction is triggered. Reviewers: Justine Olshan <[email protected]>
related to KAFKA-18401,
Fix the issue where producer.commitTransaction under transaction version 2 throws error if no partition or offset is added to transaction. The solution is to avoid sending the endTxnRequest unless producer.send or producer.sendOffsetsToTransaction is triggered.
Committer Checklist (excluded from commit message)