diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java index 9bf88d49c1..a5a1c96ffb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerRequestCompletionHandler.java @@ -165,7 +165,6 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions( case UNKNOWN_TOPIC_OR_PARTITION: // this error was introduced in newer kafka client version, // recover this condition after bump the kafka client version - //case NOT_LEADER_OR_FOLLOWER: case NOT_ENOUGH_REPLICAS: case NOT_ENOUGH_REPLICAS_AFTER_APPEND: case REQUEST_TIMED_OUT: @@ -178,6 +177,7 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions( abortSendingAndRetryPartitions.retryPartitions.add(topicPartition); break; case LEADER_NOT_AVAILABLE: + case BROKER_NOT_AVAILABLE: case NOT_LEADER_OR_FOLLOWER: log.info("Sending {}'s transaction marker for partition {} has failed with error {}, " + "retrying with current coordinator epoch {} and invalidating cache", diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index 881d40c4a2..2855f88429 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -406,11 +406,13 @@ private Errors statusCheck(String transactionalId, // note that for timed out request we return NOT_AVAILABLE error code to let client retry return Errors.COORDINATOR_NOT_AVAILABLE; case KAFKA_STORAGE_ERROR: -// case Errors.NOT_LEADER_OR_FOLLOWER: + case NOT_LEADER_OR_FOLLOWER: return Errors.NOT_COORDINATOR; case MESSAGE_TOO_LARGE: case RECORD_LIST_TOO_LARGE: default: + log.error("Unhandled error code {} for transactionalId {}, return UNKNOWN_SERVER_ERROR", + status.error, transactionalId); return Errors.UNKNOWN_SERVER_ERROR; } } @@ -464,7 +466,8 @@ transactionalId, coordinatorEpoch, newMetadata, partitionFor(transactionalId), metadata.completeTransitionTo(newMetadata); return errors; } catch (IllegalStateException ex) { - log.error("Failed to complete transition.", ex); + log.error("Failed to complete transition for {}. Return UNKNOWN_SERVER_ERROR", + transactionalId, ex); return Errors.UNKNOWN_SERVER_ERROR; } }