Skip to content

Commit

Permalink
[fix][transaction] Use pulsar format when write marker to __consumer_…
Browse files Browse the repository at this point in the history
…offsets topic (streamnative#1994)

### Motivation

We should use pulsar format to encode the marker when writing into the
`__consumer_offsets` topic, otherwise it will have the following errors:
```
2023-08-09T09:17:18,398+0000 [pulsar-client-io-69-3] ERROR org.apache.pulsar.client.impl.ConsumerImpl - [public/__kafka/__consumer_offsets-partition-42][reader-ca0ccccd79] Discarding corrupted message at 6:5
2023-08-09T09:17:18,402+0000 [pulsar-client-io-69-3] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [reader-ca0ccccd79] [98942] unable to obtain message in batch
java.lang.IllegalStateException: java.lang.IllegalStateException: Some required fields are missing
```

### Modifications

Use pulsar format when write marker to __consumer_offsets topic

(cherry picked from commit 5193592)
  • Loading branch information
Demogorgon314 committed Aug 15, 2023
1 parent 34599a9 commit 0f4a99e
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*/
public class EntryFormatterFactory {

enum EntryFormat {
public enum EntryFormat {
PULSAR,
KAFKA,
MIXED_KAFKA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class AnalyzeResult {
public class PartitionLog {

public static final String KAFKA_TOPIC_UUID_PROPERTY_NAME = "kafkaTopicUUID";
public static final String KAFKA_ENTRY_FORMATTER_PROPERTY_NAME = "kafkaEntryFormat";
private static final String PID_PREFIX = "KOP-PID-PREFIX";

private static final KopLogValidator.CompressionCodec DEFAULT_COMPRESSION =
Expand Down Expand Up @@ -257,7 +258,8 @@ private CompletableFuture<Map<String, String>> fetchTopicProperties(Optional<Per
private EntryFormatter buildEntryFormatter(Map<String, String> topicProperties) {
final String entryFormat;
if (topicProperties != null) {
entryFormat = topicProperties.getOrDefault("kafkaEntryFormat", kafkaConfig.getEntryFormat());
entryFormat = topicProperties
.getOrDefault(KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, kafkaConfig.getEntryFormat());
} else {
entryFormat = kafkaConfig.getEntryFormat();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.internals.Topic;
Expand Down Expand Up @@ -332,10 +335,12 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf,
final String topic,
final int numPartitions,
final boolean partitioned) throws PulsarAdminException {
Map<String, String> properties = Map.of(
PartitionLog.KAFKA_ENTRY_FORMATTER_PROPERTY_NAME, EntryFormatterFactory.EntryFormat.PULSAR.name());
if (partitioned) {
log.info("Creating partitioned topic {} (with {} partitions) if it does not exist", topic, numPartitions);
try {
admin.topics().createPartitionedTopic(topic, numPartitions);
admin.topics().createPartitionedTopic(topic, numPartitions, properties);
} catch (PulsarAdminException.ConflictException e) {
log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
}
Expand All @@ -347,7 +352,7 @@ private static void createTopicIfNotExist(final KafkaServiceConfiguration conf,
} else {
log.info("Creating non-partitioned topic {}-{} if it does not exist", topic, numPartitions);
try {
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(topic, properties);
} catch (PulsarAdminException.ConflictException e) {
log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ public void testCreateKafkaMetadataIfMissing() throws Exception {
verify(mockNamespaces, times(1)).setNamespaceMessageTTL(eq(conf.getKafkaMetadataTenant() + "/"
+ conf.getKafkaMetadataNamespace()), any(Integer.class));
verify(mockTopics, times(1)).createPartitionedTopic(
eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions()));
eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions()), any());
verify(mockTopics, times(1)).createPartitionedTopic(
eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions()));
eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions()), any());
verify(mockTopics, times(1)).createPartitionedTopic(
eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions()));
eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions()), any());
// check user topics namespace doesn't set the policy
verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaTenant() + "/"
+ conf.getKafkaNamespace()), any(Set.class));
Expand Down

0 comments on commit 0f4a99e

Please sign in to comment.