Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[fix][transaction] Handle errors while writing transaction logs and m…
Browse files Browse the repository at this point in the history
…arkers (#1962)

### Modifications

Handle correctly NOT_LEADER_OR_FOLLOWER while writing TX markers.
Handle BROKER_NOT_AVAILABLE in
TransactionMarkerRequestCompletionHandler.

Co-authored-by: Enrico Olivelli <[email protected]>
  • Loading branch information
gaoran10 and eolivelli authored Jul 26, 2023
1 parent 1dda879 commit e931b6d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions(
case UNKNOWN_TOPIC_OR_PARTITION:
// this error was introduced in newer kafka client version,
// recover this condition after bump the kafka client version
//case NOT_LEADER_OR_FOLLOWER:
case NOT_ENOUGH_REPLICAS:
case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
case REQUEST_TIMED_OUT:
Expand All @@ -178,6 +177,7 @@ private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions(
abortSendingAndRetryPartitions.retryPartitions.add(topicPartition);
break;
case LEADER_NOT_AVAILABLE:
case BROKER_NOT_AVAILABLE:
case NOT_LEADER_OR_FOLLOWER:
log.info("Sending {}'s transaction marker for partition {} has failed with error {}, "
+ "retrying with current coordinator epoch {} and invalidating cache",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,13 @@ private Errors statusCheck(String transactionalId,
// note that for timed out request we return NOT_AVAILABLE error code to let client retry
return Errors.COORDINATOR_NOT_AVAILABLE;
case KAFKA_STORAGE_ERROR:
// case Errors.NOT_LEADER_OR_FOLLOWER:
case NOT_LEADER_OR_FOLLOWER:
return Errors.NOT_COORDINATOR;
case MESSAGE_TOO_LARGE:
case RECORD_LIST_TOO_LARGE:
default:
log.error("Unhandled error code {} for transactionalId {}, return UNKNOWN_SERVER_ERROR",
status.error, transactionalId);
return Errors.UNKNOWN_SERVER_ERROR;
}
}
Expand Down Expand Up @@ -464,7 +466,8 @@ transactionalId, coordinatorEpoch, newMetadata, partitionFor(transactionalId),
metadata.completeTransitionTo(newMetadata);
return errors;
} catch (IllegalStateException ex) {
log.error("Failed to complete transition.", ex);
log.error("Failed to complete transition for {}. Return UNKNOWN_SERVER_ERROR",
transactionalId, ex);
return Errors.UNKNOWN_SERVER_ERROR;
}
}
Expand Down

0 comments on commit e931b6d

Please sign in to comment.