KAFKA-18399 Remove ZooKeeper from KafkaApis (8/N): ELECT_LEADERS, ALTER_PARTITION, UPDATE_FEATURES (#18453)

Reviewers: Chia-Ping Tsai <[email protected]>
TaiJuWu authored and chia7712 committed Jan 14, 2025
1 parent 4be1376 commit c7654f7
Showing 2 changed files with 0 additions and 145 deletions.
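All three APIs remain usable after this change; only the ZooKeeper-era code paths in the broker's KafkaApis go away. As a hedged sketch of the resulting routing, inferred from this diff's own context lines (UPDATE_FEATURES already dispatches to forwardToController, and the deleted ALTER_PARTITION handler was guarded by KafkaApis.shouldNeverReceive) rather than from the post-change source — the Scala below is self-contained and illustrative, not Kafka's actual types or dispatch code:

    object DispatchSketch {
      sealed trait ApiKey
      case object ElectLeaders extends ApiKey
      case object AlterPartition extends ApiKey
      case object UpdateFeatures extends ApiKey

      // Where each API terminates once the ZooKeeper handlers are gone
      // (assumption based on the context lines visible in this diff).
      def route(key: ApiKey): String = key match {
        case ElectLeaders | UpdateFeatures => "forwardToController"
        case AlterPartition => "served by the controller quorum, never by broker KafkaApis"
      }

      def main(args: Array[String]): Unit =
        List(ElectLeaders, AlterPartition, UpdateFeatures)
          .foreach(k => println(s"$k -> ${route(k)}"))
    }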
118 changes: 0 additions & 118 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.internals.{FatalExitError, Topic}
 import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
 import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
 import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
-import org.apache.kafka.common.message.ElectLeadersResponseData.{PartitionResult, ReplicaElectionResult}
 import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
 import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
@@ -217,7 +216,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ALTER_CLIENT_QUOTAS => forwardToController(request)
         case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
         case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => forwardToController(request)
-        case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
         case ApiKeys.UPDATE_FEATURES => forwardToController(request)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
@@ -2399,77 +2397,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     true
   }

-  def handleElectLeaders(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val electionRequest = request.body[ElectLeadersRequest]
-
-    def sendResponseCallback(
-      error: ApiError
-    )(
-      results: Map[TopicPartition, ApiError]
-    ): Unit = {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-        val adjustedResults = if (electionRequest.data.topicPartitions == null) {
-          /* When performing elections across all of the partitions we should only return
-           * partitions for which there was an election or that resulted in an error. In other
-           * words, partitions that didn't need election because they already have the correct
-           * leader are not returned to the client.
-           */
-          results.filter { case (_, error) =>
-            error.error != Errors.ELECTION_NOT_NEEDED
-          }
-        } else results
-
-        val electionResults = new util.ArrayList[ReplicaElectionResult]()
-        adjustedResults
-          .groupBy { case (tp, _) => tp.topic }
-          .foreachEntry { (topic, ps) =>
-            val electionResult = new ReplicaElectionResult()
-
-            electionResult.setTopic(topic)
-            ps.foreachEntry { (topicPartition, error) =>
-              val partitionResult = new PartitionResult()
-              partitionResult.setPartitionId(topicPartition.partition)
-              partitionResult.setErrorCode(error.error.code)
-              partitionResult.setErrorMessage(error.message)
-              electionResult.partitionResult.add(partitionResult)
-            }
-
-            electionResults.add(electionResult)
-          }
-
-        new ElectLeadersResponse(
-          requestThrottleMs,
-          error.error.code,
-          electionResults,
-          electionRequest.version
-        )
-      })
-    }
-
-    if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
-      val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
-      val partitionErrors: Map[TopicPartition, ApiError] =
-        electionRequest.topicPartitions.asScala.iterator.map(partition => partition -> error).toMap
-
-      sendResponseCallback(error)(partitionErrors)
-    } else {
-      val partitions = if (electionRequest.data.topicPartitions == null) {
-        metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions)
-      } else {
-        electionRequest.topicPartitions.asScala
-      }
-
-      replicaManager.electLeaders(
-        zkSupport.controller,
-        partitions,
-        electionRequest.electionType,
-        sendResponseCallback(ApiError.NONE),
-        electionRequest.data.timeoutMs
-      )
-    }
-  }
-
   def handleOffsetDeleteRequest(
     request: RequestChannel.Request,
     requestLocal: RequestLocal
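The handler deleted in the hunk above shaped its response by grouping per-partition election errors by topic, after filtering out ELECTION_NOT_NEEDED results when the election spanned all partitions. A standalone sketch of that grouping, with simplified stand-in types (TopicPartition and the error strings here are placeholders, not Kafka's classes):

    object ElectionResultGrouping {
      final case class TopicPartition(topic: String, partition: Int)

      // Group per-partition election outcomes by topic, mirroring the
      // groupBy/foreachEntry structure in the deleted handler above.
      def groupByTopic(results: Map[TopicPartition, String]): Map[String, Map[Int, String]] =
        results
          .groupBy { case (tp, _) => tp.topic }
          .map { case (topic, ps) =>
            topic -> ps.map { case (tp, err) => tp.partition -> err }
          }

      def main(args: Array[String]): Unit = {
        val results = Map(
          TopicPartition("payments", 0) -> "NONE",
          TopicPartition("payments", 1) -> "ELECTION_NOT_NEEDED",
          TopicPartition("orders", 0) -> "NONE"
        )
        // Filtering ELECTION_NOT_NEEDED first reproduces the "all partitions" case.
        println(groupByTopic(results.filter { case (_, e) => e != "ELECTION_NOT_NEEDED" }))
      }
    }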
@@ -2628,51 +2555,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }

-  def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    val alterPartitionRequest = request.body[AlterPartitionRequest]
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    if (!zkSupport.controller.isActive)
-      requestHelper.sendResponseExemptThrottle(request, alterPartitionRequest.getErrorResponse(
-        AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception))
-    else
-      zkSupport.controller.alterPartitions(alterPartitionRequest.data, request.context.apiVersion, alterPartitionResp =>
-        requestHelper.sendResponseExemptThrottle(request, new AlterPartitionResponse(alterPartitionResp)))
-  }
-
-  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
-
-    def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = {
-      def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
-        errors match {
-          case Left(topLevelError) =>
-            UpdateFeaturesResponse.createWithErrors(
-              topLevelError,
-              Collections.emptySet(),
-              throttleTimeMs)
-          case Right(featureUpdateErrors) =>
-            // This response is not correct, but since this is ZK specific code it will be removed in 4.0
-            UpdateFeaturesResponse.createWithErrors(
-              ApiError.NONE,
-              featureUpdateErrors.asJava.keySet(),
-              throttleTimeMs)
-        }
-      }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createResponse(requestThrottleMs))
-    }
-
-    if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
-      sendResponseCallback(Left(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED)))
-    } else if (!zkSupport.controller.isActive) {
-      sendResponseCallback(Left(new ApiError(Errors.NOT_CONTROLLER)))
-    } else {
-      zkSupport.controller.updateFeatures(updateFeaturesRequest, sendResponseCallback)
-    }
-  }
-
   def handleDescribeCluster(request: RequestChannel.Request): Unit = {
     val response = authHelper.computeDescribeClusterResponse(
       request,
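The deleted handleAlterPartitionRequest followed a simple guard pattern: authorize as CLUSTER_ACTION, then answer NOT_CONTROLLER unless the local ZK controller was active. A self-contained sketch of that guard, with hypothetical stand-in types (nothing below is Kafka's real API):

    object ControllerGuardSketch {
      sealed trait Outcome
      case object NotController extends Outcome
      final case class Handled(result: String) extends Outcome

      // Reject with NOT_CONTROLLER unless the local controller is active,
      // echoing the isActive check in the removed handler.
      def alterPartition(isControllerActive: Boolean): Outcome =
        if (!isControllerActive) NotController
        else Handled("partition state updated")

      def main(args: Array[String]): Unit = {
        println(alterPartition(isControllerActive = false)) // NotController
        println(alterPartition(isControllerActive = true))  // Handled(...)
      }
    }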
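Likewise, the deleted handleUpdateFeatures threaded its result through an Either[ApiError, Map[String, ApiError]] callback: Left for a single top-level failure (authorization, NOT_CONTROLLER), Right for per-feature errors. A minimal runnable sketch of that convention, using a simplified ApiError stand-in rather than Kafka's class:

    object UpdateFeaturesCallbackSketch {
      final case class ApiError(error: String, message: String = "")

      // Left = one top-level error for the whole request;
      // Right = per-feature results, as in the removed sendResponseCallback.
      def sendResponse(errors: Either[ApiError, Map[String, ApiError]]): Unit =
        errors match {
          case Left(topLevel) =>
            println(s"request failed: ${topLevel.error}")
          case Right(perFeature) =>
            perFeature.foreach { case (feature, err) => println(s"$feature -> ${err.error}") }
        }

      def main(args: Array[String]): Unit = {
        sendResponse(Left(ApiError("CLUSTER_AUTHORIZATION_FAILED")))
        sendResponse(Right(Map("metadata.version" -> ApiError("NONE"))))
      }
    }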
27 changes: 0 additions & 27 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9932,25 +9932,12 @@ class KafkaApisTest extends Logging {
     request
   }

-  private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
-    val request = createMockRequest()
-    val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
-    assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
-  }
-
   private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
     val request = createMockRequest()
     val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
     assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
   }

-  @Test
-  def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
-  }
-
   @Test
   def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
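Both removed test helpers shared one shape: build a mock request, invoke the handler, and assert it throws UnsupportedVersionException carrying the exact message of the corresponding KafkaApis error factory. A dependency-free sketch of that shape, using plain Scala assertions instead of JUnit/Mockito and invented stand-ins for the Kafka types (including the error message text):

    object ForwardingGuardTestSketch {
      final class UnsupportedVersionException(msg: String) extends RuntimeException(msg)

      // Stand-in for KafkaApis.shouldAlwaysForward(request); the message is illustrative.
      def shouldAlwaysForward(api: String) =
        new UnsupportedVersionException(s"Should always be forwarded to the Active Controller: $api")

      // A handler that refuses to serve the API locally, as on a KRaft broker.
      def handler(api: String): Unit = throw shouldAlwaysForward(api)

      def verifyShouldAlwaysForwardErrorMessage(api: String): Unit = {
        val thrown =
          try { handler(api); None }
          catch { case e: UnsupportedVersionException => Some(e) }
        assert(thrown.exists(_.getMessage == shouldAlwaysForward(api).getMessage))
      }

      def main(args: Array[String]): Unit = {
        verifyShouldAlwaysForwardErrorMessage("UPDATE_FEATURES")
        println("forwarding guard verified")
      }
    }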
@@ -10048,20 +10035,6 @@ class KafkaApisTest extends Logging {
     verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
   }

-  @Test
-  def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures)
-  }
-
-  @Test
-  def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
-  }
-
   @Test
   def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
     val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
