Skip to content

Commit

Permalink
KAFKA-18295: Remove deprecated function Partitioner#onNewBatch
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Dec 22, 2024
1 parent b4be178 commit 40aa21c
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -946,15 +946,6 @@ private void throwIfProducerClosed() {
throw new IllegalStateException("Cannot perform operation after producer has been closed");
}

/**
* Call deprecated {@link Partitioner#onNewBatch}
*/
@SuppressWarnings("deprecation")
private void onNewBatch(String topic, Cluster cluster, int prevPartition) {
assert partitioner != null;
partitioner.onNewBatch(topic, cluster, prevPartition);
}

/**
* Implementation of asynchronously send a record to a topic.
*/
Expand Down Expand Up @@ -1020,9 +1011,6 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call

if (result.abortForNewBatch) {
int prevPartition = partition;
// IMPORTANT NOTE: the following onNewBatch and partition calls should not interrupted to allow
// the custom partitioner to correctly track its state
onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,4 @@ public interface Partitioner extends Configurable, Closeable {
* This is called when partitioner is closed.
*/
void close();

/**
* Note this method is only implemented in DefaultPartitioner and UniformStickyPartitioner which
* are now deprecated. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner">KIP-794</a> for more info.
* <p>
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
* this method can change the chosen sticky partition for the new batch.
* <p>
* After onNewBatch, the {@link #partition(String, Object, byte[], Object, byte[], Cluster)} method is called again
* which allows the implementation to "redirect" the message on new batch creation.
* @param topic The topic name
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record that triggered a new batch
* @deprecated Since 3.3.0
*/
@Deprecated
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,5 @@ private int nextValue(String topic) {
return counter.getAndIncrement();
}

@SuppressWarnings("deprecation")
@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
previousPartition.set(new TopicPartition(topic, prevPartition));
}

public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2343,7 +2343,6 @@ public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exceptio
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
assertEquals(future, producer.send(record));
assertFalse(future.isDone());
verify(ctx.partitioner).onNewBatch(topic, cluster, 0);
verify(ctx.transactionManager, never()).maybeAddPartition(topicPartition0);
verify(ctx.transactionManager).maybeAddPartition(topicPartition1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,77 +96,4 @@ public void testRoundRobinWithKeyBytes() {
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}

@SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithNullKeyBytes() {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

Partitioner partitioner = new RoundRobinPartitioner();
for (int i = 0; i < 30; ++i) {
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
// Simulate single-message batches
partitioner.onNewBatch(topicA, testCluster, partition);
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
Integer count = partitionCount.get(partition);
if (null == count)
count = 0;
partitionCount.put(partition, count + 1);

if (i % 5 == 0) {
partitioner.partition(topicB, null, null, null, null, testCluster);
}
}

assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}

@SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithNullKeyBytesAndEvenPartitionCount() {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES), new PartitionInfo(topicA, 3, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

Partitioner partitioner = new RoundRobinPartitioner();
for (int i = 0; i < 40; ++i) {
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
// Simulate single-message batches
partitioner.onNewBatch(topicA, testCluster, partition);
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
Integer count = partitionCount.get(partition);
if (null == count)
count = 0;
partitionCount.put(partition, count + 1);

if (i % 5 == 0) {
partitioner.partition(topicB, null, null, null, null, testCluster);
}
}

assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
assertEquals(10, partitionCount.get(3).intValue());
}
}
2 changes: 2 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
</li>
<li>The <code>log.message.format.version</code> and <code>message.format.version</code> configs were removed.
</li>
<li>The function <code>onNewBatch</code> in <code>org.apache.kafka.clients.producer.Partitioner</code> class was removed.
</li>
</ul>
</li>
<li><b>Broker</b>
Expand Down

0 comments on commit 40aa21c

Please sign in to comment.