
[fix][transaction] Use pulsar format when writing markers to the __consumer_offsets topic #1994

Merged
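
In short, the KoP metadata topics (including __consumer_offsets) are now created with the topic property kafkaEntryFormat=PULSAR, and PartitionLog prefers that per-topic property over the broker-wide entryFormat when it builds its entry formatter. Transaction markers written to __consumer_offsets are therefore always stored in Pulsar format, regardless of whether the broker is configured with the kafka or mixed_kafka entry format.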
@@ -24,7 +24,7 @@
  */
 public class EntryFormatterFactory {
 
-    enum EntryFormat {
+    public enum EntryFormat {
         PULSAR,
         KAFKA,
         MIXED_KAFKA
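
EntryFormat is widened from package-private to public so that code outside the format package, such as the metadata bootstrap changed later in this PR, can name the format directly. A minimal illustration (nothing beyond the enum itself is assumed):

    // Usable from any package now that the enum is public.
    String pulsarFormat = EntryFormatterFactory.EntryFormat.PULSAR.name();   // "PULSAR"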
@@ -112,6 +112,7 @@ class AnalyzeResult {
 public class PartitionLog {
 
     public static final String KAFKA_TOPIC_UUID_PROPERTY_NAME = "kafkaTopicUUID";
+    public static final String KAFKA_ENTRY_FORMATTER_PROPERTY_NAME = "kafkaEntryFormat";
     private static final String PID_PREFIX = "KOP-PID-PREFIX";
 
     private static final KopLogValidator.CompressionCodec DEFAULT_COMPRESSION =
@@ -252,7 +253,8 @@ private CompletableFuture<Map<String, String>> fetchTopicProperties(Optional<Per
     private EntryFormatter buildEntryFormatter(Map<String, String> topicProperties) {
         final String entryFormat;
         if (topicProperties != null) {
-            entryFormat = topicProperties.getOrDefault("kafkaEntryFormat", kafkaConfig.getEntryFormat());
+            entryFormat = topicProperties
+                    .getOrDefault(KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, kafkaConfig.getEntryFormat());
         } else {
             entryFormat = kafkaConfig.getEntryFormat();
         }
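
The resolution order introduced here, as a standalone sketch; the literal "kafkaEntryFormat" key mirrors KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, while the class and method names are illustrative only:

    import java.util.Map;

    final class EntryFormatResolution {
        // Per-topic property wins; otherwise fall back to the broker-level entryFormat.
        static String resolve(Map<String, String> topicProperties, String brokerDefault) {
            if (topicProperties == null) {
                return brokerDefault;
            }
            return topicProperties.getOrDefault("kafkaEntryFormat", brokerDefault);
        }
    }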
@@ -15,8 +15,11 @@
 
 import com.google.common.collect.Sets;
 import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
+import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
+import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.internals.Topic;
@@ -332,10 +335,12 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf,
                                               final String topic,
                                               final int numPartitions,
                                               final boolean partitioned) throws PulsarAdminException {
+        Map<String, String> properties = Map.of(
+                PartitionLog.KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, EntryFormatterFactory.EntryFormat.PULSAR.name());
         if (partitioned) {
             log.info("Creating partitioned topic {} (with {} partitions) if it does not exist", topic, numPartitions);
             try {
-                admin.topics().createPartitionedTopic(topic, numPartitions);
+                admin.topics().createPartitionedTopic(topic, numPartitions, properties);
             } catch (PulsarAdminException.ConflictException e) {
                 log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
             }
@@ -347,7 +352,7 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf,
         } else {
             log.info("Creating non-partitioned topic {}-{} if it does not exist", topic, numPartitions);
             try {
-                admin.topics().createNonPartitionedTopic(topic);
+                admin.topics().createNonPartitionedTopic(topic, properties);
             } catch (PulsarAdminException.ConflictException e) {
                 log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
             }
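
For context, a hedged usage sketch of the admin call the bootstrap now makes: passing a properties map to createPartitionedTopic pins the entry format at creation time. The service URL, topic name, and partition count below are illustrative and not taken from this PR:

    import java.util.Map;
    import org.apache.pulsar.client.admin.PulsarAdmin;

    public class CreateMetadataTopicSketch {
        public static void main(String[] args) throws Exception {
            try (PulsarAdmin admin = PulsarAdmin.builder()
                    .serviceHttpUrl("http://localhost:8080")              // assumed admin endpoint
                    .build()) {
                Map<String, String> properties = Map.of("kafkaEntryFormat", "PULSAR");
                admin.topics().createPartitionedTopic(
                        "persistent://public/__kafka/__consumer_offsets", // illustrative topic name
                        50,                                               // illustrative partition count
                        properties);
            }
        }
    }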
@@ -126,11 +126,11 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
         verify(mockNamespaces, times(1)).setNamespaceMessageTTL(eq(conf.getKafkaMetadataTenant() + "/"
                 + conf.getKafkaMetadataNamespace()), any(Integer.class));
         verify(mockTopics, times(1)).createPartitionedTopic(
-                eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions()));
+                eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions()), any());
         verify(mockTopics, times(1)).createPartitionedTopic(
-                eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions()));
+                eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions()), any());
         verify(mockTopics, times(1)).createPartitionedTopic(
-                eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions()));
+                eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions()), any());
         // check user topics namespace doesn't set the policy
         verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaTenant() + "/"
                 + conf.getKafkaNamespace()), any(Set.class));
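
The verifications gain a third argument matcher because the production code now calls the createPartitionedTopic overload that takes a properties map; Mockito's any() accepts whatever map the bootstrap passes, while eq(...) still pins the topic name and partition count.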