diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java
index 4386f5c537..3a2ba21244 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java
@@ -107,10 +107,12 @@ public boolean tryComplete() {
             return true;
         }
         for (Map.Entry<TopicPartition, PartitionLog.ReadRecordsResult> entry : readRecordsResult.entrySet()) {
-            TopicPartition tp = entry.getKey();
             PartitionLog.ReadRecordsResult result = entry.getValue();
-            PartitionLog partitionLog = replicaManager.getPartitionLog(tp, context.getNamespacePrefix());
-            PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition(context.getTopicManager());
+            PartitionLog partitionLog = result.partitionLog();
+            if (partitionLog == null) {
+                return true;
+            }
+            PositionImpl currLastPosition = (PositionImpl) partitionLog.getLastPosition();
             if (currLastPosition.compareTo(PositionImpl.EARLIEST) == 0) {
                 HAS_ERROR_UPDATER.set(this, true);
                 return forceComplete();
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
index 65d3876317..638406d536 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java
@@ -30,6 +30,8 @@
 import io.streamnative.pulsar.handlers.kop.schemaregistry.SchemaRegistryChannelInitializer;
 import io.streamnative.pulsar.handlers.kop.stats.PrometheusMetricsProvider;
 import io.streamnative.pulsar.handlers.kop.stats.StatsLogger;
+import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer;
+import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer;
 import io.streamnative.pulsar.handlers.kop.storage.ReplicaManager;
 import io.streamnative.pulsar.handlers.kop.utils.ConfigurationUtils;
 import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
@@ -47,8 +49,10 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -87,6 +91,8 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
     private DelayedOperationPurgatory producePurgatory;
     private DelayedOperationPurgatory fetchPurgatory;
     private LookupClient lookupClient;
+
+    private KafkaTopicLookupService kafkaTopicLookupService;
     @VisibleForTesting
     @Getter
     private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
@@ -113,6 +119,9 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
     private final Map<String, GroupCoordinator> groupCoordinatorsByTenant = new ConcurrentHashMap<>();
     private final Map<String, TransactionCoordinator> transactionCoordinatorByTenant = new ConcurrentHashMap<>();
+
+    private OrderedExecutor recoveryExecutor;
+
     @Override
     public GroupCoordinator getGroupCoordinator(String tenant) {
         return groupCoordinatorsByTenant.computeIfAbsent(tenant, this::createAndBootGroupCoordinator);
@@ -256,6 +265,12 @@ private void invalidatePartitionLog(TopicName topicName) {
         });
namespaceService.addNamespaceBundleOwnershipListener(bundleListener); + recoveryExecutor = OrderedExecutor + .newBuilder() + .name("kafka-tx-recovery") + .numThreads(kafkaConfig.getKafkaTransactionRecoveryNumThreads()) + .build(); + if (kafkaConfig.isKafkaManageSystemNamespaces()) { // initialize default Group Coordinator getGroupCoordinator(kafkaConfig.getKafkaMetadataTenant()); @@ -411,6 +426,20 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi lookupClient); } + class ProducerStateManagerSnapshotProvider implements Function { + @Override + public ProducerStateManagerSnapshotBuffer apply(String tenant) { + if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { + return new MemoryProducerStateManagerSnapshotBuffer(); + } + return getTransactionCoordinator(tenant) + .getProducerStateManagerSnapshotBuffer(); + } + } + + private Function getProducerStateManagerSnapshotBufferByTenant = + new ProducerStateManagerSnapshotProvider(); + // this is called after initialize, and with kafkaConfig, brokerService all set. @Override public Map> newChannelInitializers() { @@ -426,13 +455,19 @@ public Map> newChannelIniti .timeoutTimer(SystemTimer.builder().executorName("fetch").build()) .build(); + kafkaTopicLookupService = new KafkaTopicLookupService(brokerService); + replicaManager = new ReplicaManager( kafkaConfig, requestStats, Time.SYSTEM, brokerService.getEntryFilters(), producePurgatory, - fetchPurgatory); + fetchPurgatory, + kafkaTopicLookupService, + getProducerStateManagerSnapshotBufferByTenant, + recoveryExecutor + ); try { ImmutableMap.Builder> builder = @@ -480,6 +515,17 @@ public void close() { statsProvider.stop(); sendResponseScheduler.shutdown(); + if (offsetTopicClient != null) { + offsetTopicClient.close(); + } + if (txnTopicClient != null) { + txnTopicClient.close(); + } + if (adminManager != null) { + adminManager.shutdown(); + } + recoveryExecutor.shutdown(); + List> closeHandles = new ArrayList<>(); if (offsetTopicClient != null) { closeHandles.add(offsetTopicClient.closeAsync()); @@ -571,6 +617,8 @@ public TransactionCoordinator initTransactionCoordinator(String tenant, PulsarAd .transactionLogNumPartitions(kafkaConfig.getKafkaTxnLogTopicNumPartitions()) .transactionMetadataTopicName(MetadataUtils.constructTxnLogTopicBaseName(tenant, kafkaConfig)) .transactionProducerIdTopicName(MetadataUtils.constructTxnProducerIdTopicBaseName(tenant, kafkaConfig)) + .transactionProducerStateSnapshotTopicName(MetadataUtils.constructTxProducerStateTopicBaseName(tenant, + kafkaConfig)) .abortTimedOutTransactionsIntervalMs(kafkaConfig.getKafkaTxnAbortTimedOutTransactionCleanupIntervalMs()) .transactionalIdExpirationMs(kafkaConfig.getKafkaTransactionalIdExpirationMs()) .removeExpiredTransactionalIdsIntervalMs( diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index f4de60ce0a..c167c21cae 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -1754,7 +1754,6 @@ protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup, joinGroupResult.getLeaderId(), members ); - if (log.isTraceEnabled()) { log.trace("Sending join group response {} for correlation id {} to client {}.", response, joinGroup.getHeader().correlationId(), joinGroup.getHeader().clientId()); @@ -2151,6 +2150,10 @@ 
protected void handleInitProducerId(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                     .setErrorCode(resp.getError().code())
                     .setProducerId(resp.getProducerId())
                     .setProducerEpoch(resp.getProducerEpoch());
+            if (resp.getError() == Errors.COORDINATOR_LOAD_IN_PROGRESS
+                    || resp.getError() == Errors.CONCURRENT_TRANSACTIONS) {
+                responseData.setThrottleTimeMs(1000);
+            }
             response.complete(new InitProducerIdResponse(responseData));
         });
     }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
index 5e9dbfee21..2ff8e06b93 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
@@ -55,6 +55,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
     private static final int OffsetsMessageTTL = 3 * 24 * 3600;
     // txn configuration
     public static final int DefaultTxnLogTopicNumPartitions = 50;
+    public static final int DefaultTxnProducerStateLogTopicNumPartitions = 8;
     public static final int DefaultTxnCoordinatorSchedulerNum = 1;
     public static final int DefaultTxnStateManagerSchedulerNum = 1;
     public static final long DefaultAbortTimedOutTransactionsIntervalMs = TimeUnit.SECONDS.toMillis(10);
@@ -425,6 +426,24 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
     )
     private int kafkaTxnLogTopicNumPartitions = DefaultTxnLogTopicNumPartitions;
 
+    @FieldContext(
+        category = CATEGORY_KOP_TRANSACTION,
+        doc = "Number of partitions for the transaction producer state topic"
+    )
+    private int kafkaTxnProducerStateTopicNumPartitions = DefaultTxnProducerStateLogTopicNumPartitions;
+
+    @FieldContext(
+        category = CATEGORY_KOP_TRANSACTION,
+        doc = "Interval in seconds for taking snapshots of the status of pending transactions"
+    )
+    private int kafkaTxnProducerStateTopicSnapshotIntervalSeconds = 300;
+
+    @FieldContext(
+        category = CATEGORY_KOP_TRANSACTION,
+        doc = "Number of threads dedicated to transaction recovery"
+    )
+    private int kafkaTransactionRecoveryNumThreads = 8;
+
     @FieldContext(
         category = CATEGORY_KOP_TRANSACTION,
         doc = "The interval in milliseconds at which to rollback transactions that have timed out."
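Note: the three settings above can be overridden like any other KoP broker setting. A minimal sketch of tuning them from code, assuming the usual Lombok-generated setters behind these @FieldContext fields (the values shown are illustrative, not recommendations):

    KafkaServiceConfiguration conf = new KafkaServiceConfiguration();
    // producer-state snapshots: fewer partitions and a tighter snapshot cadence than the defaults (8 partitions / 300s)
    conf.setKafkaTxnProducerStateTopicNumPartitions(4);
    conf.setKafkaTxnProducerStateTopicSnapshotIntervalSeconds(60);
    // bound the parallelism of the kafka-tx-recovery OrderedExecutor (default 8 threads)
    conf.setKafkaTransactionRecoveryNumThreads(2);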
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java index 843ed4915d..5a03921672 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManager.java @@ -48,7 +48,6 @@ public class KafkaTopicConsumerManager implements Closeable { private final PersistentTopic topic; - private final KafkaRequestHandler requestHandler; private final AtomicBoolean closed = new AtomicBoolean(false); @@ -67,13 +66,21 @@ public class KafkaTopicConsumerManager implements Closeable { private final boolean skipMessagesWithoutIndex; + private final String description; + KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) { + this(requestHandler.ctx.channel() + "", + requestHandler.isSkipMessagesWithoutIndex(), + topic); + } + + public KafkaTopicConsumerManager(String description, boolean skipMessagesWithoutIndex, PersistentTopic topic) { this.topic = topic; this.cursors = new ConcurrentHashMap<>(); this.createdCursors = new ConcurrentHashMap<>(); this.lastAccessTimes = new ConcurrentHashMap<>(); - this.requestHandler = requestHandler; - this.skipMessagesWithoutIndex = requestHandler.isSkipMessagesWithoutIndex(); + this.description = description; + this.skipMessagesWithoutIndex = skipMessagesWithoutIndex; } // delete expired cursors, so backlog can be cleared. @@ -96,7 +103,7 @@ void deleteOneExpiredCursor(long offset) { if (cursorFuture != null) { if (log.isDebugEnabled()) { log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}", - requestHandler.ctx.channel(), offset, cursors.size()); + description, offset, cursors.size()); } // TODO: Should we just cancel this future? @@ -118,14 +125,19 @@ public void deleteOneCursorAsync(ManagedCursor cursor, String reason) { public void deleteCursorComplete(Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] Cursor {} for topic {} deleted successfully for reason: {}.", - requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason); + description, cursor.getName(), topic.getName(), reason); } } @Override public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.", - requestHandler.ctx.channel(), cursor.getName(), topic.getName(), reason, exception); + if (exception instanceof ManagedLedgerException.CursorNotFoundException) { + log.debug("[{}] Cursor already deleted {} for topic {} for reason: {} - {}.", + description, cursor.getName(), topic.getName(), reason, exception.toString()); + } else { + log.warn("[{}] Error deleting cursor {} for topic {} for reason: {}.", + description, cursor.getName(), topic.getName(), reason, exception); + } } }, null); createdCursors.remove(cursor.getName()); @@ -148,7 +160,7 @@ public CompletableFuture> removeCursorFuture(long offs if (log.isDebugEnabled()) { log.debug("[{}] Get cursor for offset: {} in cache. 
cache size: {}", - requestHandler.ctx.channel(), offset, cursors.size()); + description, offset, cursors.size()); } return cursorFuture; } @@ -182,7 +194,7 @@ public void add(long offset, Pair pair) { if (log.isDebugEnabled()) { log.debug("[{}] Add cursor back {} for offset: {}", - requestHandler.ctx.channel(), pair.getLeft().getName(), offset); + description, pair.getLeft().getName(), offset); } } @@ -194,7 +206,7 @@ public void close() { } if (log.isDebugEnabled()) { log.debug("[{}] Close TCM for topic {}.", - requestHandler.ctx.channel(), topic.getName()); + description, topic.getName()); } final List>> cursorFuturesToClose = new ArrayList<>(); cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture)); @@ -224,7 +236,7 @@ private CompletableFuture> asyncGetCursorByOffset(long if (((ManagedLedgerImpl) ledger).getState() == ManagedLedgerImpl.State.Closed) { log.error("[{}] Async get cursor for offset {} for topic {} failed, " + "because current managedLedger has been closed", - requestHandler.ctx.channel(), offset, topic.getName()); + description, offset, topic.getName()); CompletableFuture> future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Current managedLedger for " + topic.getName() + " has been closed.")); @@ -243,7 +255,7 @@ private CompletableFuture> asyncGetCursorByOffset(long final PositionImpl previous = ((ManagedLedgerImpl) ledger).getPreviousPosition((PositionImpl) position); if (log.isDebugEnabled()) { log.debug("[{}] Create cursor {} for offset: {}. position: {}, previousPosition: {}", - requestHandler.ctx.channel(), cursorName, offset, position, previous); + description, cursorName, offset, position, previous); } try { final ManagedCursor newCursor = ledger.newNonDurableCursor(previous, cursorName); @@ -252,7 +264,7 @@ private CompletableFuture> asyncGetCursorByOffset(long return Pair.of(newCursor, offset); } catch (ManagedLedgerException e) { log.error("[{}] Error new cursor for topic {} at offset {} - {}. 
will cause fetch data error.", - requestHandler.ctx.channel(), topic.getName(), offset, previous, e); + description, topic.getName(), offset, previous, e); return null; } }); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java index e155986654..afa0ac831a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicLookupService.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.kop; -import io.netty.channel.Channel; import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.NonNull; @@ -37,7 +36,7 @@ public class KafkaTopicLookupService { } // A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance - public CompletableFuture> getTopic(String topicName, Channel channel) { + public CompletableFuture> getTopic(String topicName, Object requestor) { CompletableFuture> topicCompletableFuture = new CompletableFuture<>(); brokerService.getTopicIfExists(topicName).whenComplete((t2, throwable) -> { TopicName topicNameObject = TopicName.get(topicName); @@ -47,7 +46,7 @@ public CompletableFuture> getTopic(String topicName, C if (topicNameObject.getPartitionIndex() == 0) { log.warn("Get partition-0 error [{}].", throwable.getMessage()); } else { - handleGetTopicException(topicName, topicCompletableFuture, throwable, channel); + handleGetTopicException(topicName, topicCompletableFuture, throwable, requestor); return; } } @@ -60,11 +59,11 @@ public CompletableFuture> getTopic(String topicName, C String nonPartitionedTopicName = topicNameObject.getPartitionedTopicName(); if (log.isDebugEnabled()) { log.debug("[{}]Try to get non-partitioned topic for name {}", - channel, nonPartitionedTopicName); + requestor, nonPartitionedTopicName); } brokerService.getTopicIfExists(nonPartitionedTopicName).whenComplete((nonPartitionedTopic, ex) -> { if (ex != null) { - handleGetTopicException(nonPartitionedTopicName, topicCompletableFuture, ex, channel); + handleGetTopicException(nonPartitionedTopicName, topicCompletableFuture, ex, requestor); // Failed to getTopic from current broker, remove non-partitioned topic cache, // which added in getTopicBroker. 
KopBrokerLookupManager.removeTopicManagerCache(nonPartitionedTopicName); @@ -75,14 +74,14 @@ public CompletableFuture> getTopic(String topicName, C topicCompletableFuture.complete(Optional.of(persistentTopic)); } else { log.error("[{}]Get empty non-partitioned topic for name {}", - channel, nonPartitionedTopicName); + requestor, nonPartitionedTopicName); KopBrokerLookupManager.removeTopicManagerCache(nonPartitionedTopicName); topicCompletableFuture.complete(Optional.empty()); } }); return; } - log.error("[{}]Get empty topic for name {}", channel, topicName); + log.error("[{}]Get empty topic for name {}", requestor, topicName); KopBrokerLookupManager.removeTopicManagerCache(topicName); topicCompletableFuture.complete(Optional.empty()); }); @@ -93,15 +92,15 @@ private void handleGetTopicException(@NonNull final String topicName, @NonNull final CompletableFuture> topicCompletableFuture, @NonNull final Throwable ex, - @NonNull final Channel channel) { + @NonNull final Object requestor) { // The ServiceUnitNotReadyException is retryable, so we should print a warning log instead of error log if (ex instanceof BrokerServiceException.ServiceUnitNotReadyException) { log.warn("[{}] Failed to getTopic {}: {}", - channel, topicName, ex.getMessage()); + requestor, topicName, ex.getMessage()); topicCompletableFuture.complete(Optional.empty()); } else { log.error("[{}] Failed to getTopic {}. exception:", - channel, topicName, ex); + requestor, topicName, ex); topicCompletableFuture.completeExceptionally(ex); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java index 5a213b825d..84cecac37e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingTopicFutures.java @@ -14,7 +14,7 @@ package io.streamnative.pulsar.handlers.kop; import com.google.common.annotations.VisibleForTesting; -import java.util.Optional; +import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -22,7 +22,6 @@ import lombok.Getter; import lombok.NonNull; import org.apache.bookkeeper.common.util.MathUtils; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; /** * Pending futures of PersistentTopic. 
@@ -56,8 +55,8 @@ private synchronized void decrementCount() { count--; } - public synchronized void addListener(CompletableFuture> topicFuture, - @NonNull Consumer> persistentTopicConsumer, + public synchronized void addListener(CompletableFuture topicFuture, + @NonNull Consumer persistentTopicConsumer, @NonNull Consumer exceptionConsumer) { if (count == 0) { count = 1; @@ -109,19 +108,19 @@ public synchronized int size() { class TopicThrowablePair { @Getter - private final Optional persistentTopicOpt; + private final PartitionLog persistentTopicOpt; @Getter private final Throwable throwable; - public static TopicThrowablePair withTopic(final Optional persistentTopicOpt) { + public static TopicThrowablePair withTopic(final PartitionLog persistentTopicOpt) { return new TopicThrowablePair(persistentTopicOpt, null); } public static TopicThrowablePair withThrowable(final Throwable throwable) { - return new TopicThrowablePair(Optional.empty(), throwable); + return new TopicThrowablePair(null, throwable); } - private TopicThrowablePair(final Optional persistentTopicOpt, final Throwable throwable) { + private TopicThrowablePair(final PartitionLog persistentTopicOpt, final Throwable throwable) { this.persistentTopicOpt = persistentTopicOpt; this.throwable = throwable; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index e5981de3ff..f1f5d90ac1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -78,7 +78,7 @@ public static GroupCoordinator of( Time time ) { ScheduledExecutorService coordinatorExecutor = OrderedScheduler.newSchedulerBuilder() - .name("group-coordinator-executor") + .name("group-coordinator-executor-" + tenant) .numThreads(1) .build(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java index 90e745539a..8ca883f8d5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionConfig.java @@ -26,6 +26,7 @@ public class TransactionConfig { public static final String DefaultTransactionMetadataTopicName = "public/default/__transaction_state"; + public static final String DefaultProducerStateSnapshotTopicName = "public/default/__transaction_producer_state"; public static final String DefaultProducerIdTopicName = "public/default/__transaction_producerid_generator"; public static final long DefaultTransactionsMaxTimeoutMs = TimeUnit.MINUTES.toMillis(15); public static final long DefaultTransactionalIdExpirationMs = TimeUnit.DAYS.toMillis(7); @@ -42,6 +43,8 @@ public class TransactionConfig { @Default private String transactionMetadataTopicName = DefaultTransactionMetadataTopicName; @Default + private String transactionProducerStateSnapshotTopicName = DefaultProducerStateSnapshotTopicName; + @Default private long transactionMaxTimeoutMs = DefaultTransactionsMaxTimeoutMs; @Default private long transactionalIdExpirationMs = DefaultTransactionalIdExpirationMs; diff --git 
a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index d81d135265..1f67797511 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -28,6 +28,8 @@ import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata.TxnTransitMetadata; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager.CoordinatorEpochAndTxnMetadata; import io.streamnative.pulsar.handlers.kop.scala.Either; +import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer; +import io.streamnative.pulsar.handlers.kop.storage.PulsarTopicProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; import java.util.Optional; @@ -38,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -69,6 +72,9 @@ public class TransactionCoordinator { private final TransactionStateManager txnManager; private final TransactionMarkerChannelManager transactionMarkerChannelManager; + @Getter + private ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer; + private final ScheduledExecutorService scheduler; private final Time time; @@ -106,7 +112,9 @@ protected TransactionCoordinator(TransactionConfig transactionConfig, TransactionStateManager txnManager, Time time, String namespacePrefixForMetadata, - String namespacePrefixForUserTopics) { + String namespacePrefixForUserTopics, + Function + producerStateManagerSnapshotBufferFactory) { this.namespacePrefixForMetadata = namespacePrefixForMetadata; this.namespacePrefixForUserTopics = namespacePrefixForUserTopics; this.transactionConfig = transactionConfig; @@ -115,6 +123,7 @@ protected TransactionCoordinator(TransactionConfig transactionConfig, this.transactionMarkerChannelManager = transactionMarkerChannelManager; this.scheduler = scheduler; this.time = time; + this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBufferFactory.apply(transactionConfig); } public static TransactionCoordinator of(String tenant, @@ -146,7 +155,10 @@ public static TransactionCoordinator of(String tenant, transactionStateManager, time, namespacePrefixForMetadata, - namespacePrefixForUserTopics); + namespacePrefixForUserTopics, + (config) -> new PulsarTopicProducerStateManagerSnapshotBuffer( + config.getTransactionProducerStateSnapshotTopicName(), txnTopicClient) + ); } /** @@ -909,6 +921,7 @@ public void shutdown() { producerIdManager.shutdown(); txnManager.shutdown(); transactionMarkerChannelManager.close(); + producerStateManagerSnapshotBuffer.shutdown(); scheduler.shutdown(); // TODO shutdown txn log.info("Shutdown transaction coordinator complete."); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java index 
40fef2aa2f..66411ad23b 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java
@@ -104,7 +104,7 @@ public void enqueueWriteTxnMarkers(final List txnMarkerEntries,
 
     @Override
     public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
-        log.info("[TransactionMarkerChannelHandler] channelActive to {}", channelHandlerContext.channel());
+        log.info("[TransactionMarkerChannelHandler] Connected to broker {}", channelHandlerContext.channel());
         handleAuthentication(channelHandlerContext);
         super.channelActive(channelHandlerContext);
     }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AbortedTxn.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AbortedTxn.java
new file mode 100644
index 0000000000..fe1e58995d
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AbortedTxn.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.storage;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+/**
+ * AbortedTxn is used to cache the aborted index.
+ */
+@Data
+@Accessors(fluent = true)
+@AllArgsConstructor
+public final class AbortedTxn {
+
+    private static final int VersionOffset = 0;
+    private static final int VersionSize = 2;
+    private static final int ProducerIdOffset = VersionOffset + VersionSize;
+    private static final int ProducerIdSize = 8;
+    private static final int FirstOffsetOffset = ProducerIdOffset + ProducerIdSize;
+    private static final int FirstOffsetSize = 8;
+    private static final int LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize;
+    private static final int LastOffsetSize = 8;
+    private static final int LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize;
+    private static final int LastStableOffsetSize = 8;
+    private static final int TotalSize = LastStableOffsetOffset + LastStableOffsetSize;
+
+    private static final Short CurrentVersion = 0;
+
+    private final long producerId;
+    private final long firstOffset;
+    private final long lastOffset;
+    private final long lastStableOffset;
+
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/CompletedTxn.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/CompletedTxn.java
new file mode 100644
index 0000000000..6a616da3f4
--- /dev/null
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/CompletedTxn.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.storage; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.experimental.Accessors; + +@Data +@Accessors(fluent = true) +@AllArgsConstructor +public final class CompletedTxn { + private long producerId; + private long firstOffset; + private long lastOffset; + private boolean isAborted; +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/MemoryProducerStateManagerSnapshotBuffer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/MemoryProducerStateManagerSnapshotBuffer.java new file mode 100644 index 0000000000..7a893b5fa8 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/MemoryProducerStateManagerSnapshotBuffer.java @@ -0,0 +1,40 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.streamnative.pulsar.handlers.kop.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MemoryProducerStateManagerSnapshotBuffer implements ProducerStateManagerSnapshotBuffer {
+    private Map<String, ProducerStateManagerSnapshot> latestSnapshots = new ConcurrentHashMap<>();
+
+    @Override
+    public CompletableFuture<Void> write(ProducerStateManagerSnapshot snapshot) {
+        return CompletableFuture.runAsync(() -> {
+            latestSnapshots.compute(snapshot.getTopicPartition(), (tp, current) -> {
+                if (current == null || current.getOffset() <= snapshot.getOffset()) {
+                    return snapshot;
+                } else {
+                    return current;
+                }
+            });
+        });
+    }
+
+    @Override
+    public CompletableFuture<ProducerStateManagerSnapshot> readLatestSnapshot(String topicPartition) {
+        return CompletableFuture.supplyAsync(() -> latestSnapshots.get(topicPartition));
+    }
+}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
index 9e8bba4a35..9d22c04a64 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java
@@ -21,6 +21,7 @@
 import io.netty.util.Recycler;
 import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
 import io.streamnative.pulsar.handlers.kop.KafkaTopicConsumerManager;
+import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
 import io.streamnative.pulsar.handlers.kop.KafkaTopicManager;
 import io.streamnative.pulsar.handlers.kop.MessageFetchContext;
 import io.streamnative.pulsar.handlers.kop.MessagePublishContext;
@@ -43,16 +44,19 @@
 import java.util.Optional;
 import java.util.StringJoiner;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.Getter;
 import lombok.ToString;
 import lombok.experimental.Accessors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.MathUtils;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -68,7 +72,10 @@
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -81,6 +88,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * Analyze result.
@@ -109,11 +117,25 @@ public class PartitionLog { private final Time time; private final TopicPartition topicPartition; private final String fullPartitionName; - private final AtomicReference> entryFormatter = new AtomicReference<>(); - private final ProducerStateManager producerStateManager; + @Getter + private volatile ProducerStateManager producerStateManager; private final ImmutableMap entryfilterMap; private final boolean preciseTopicPublishRateLimitingEnable; + private final KafkaTopicLookupService kafkaTopicLookupService; + + private final ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer; + + private final ExecutorService recoveryExecutor; + + @Getter + private volatile PersistentTopic persistentTopic; + + private final CompletableFuture initFuture = new CompletableFuture<>(); + + private volatile Map topicProperties; + + private volatile EntryFormatter entryFormatter; public PartitionLog(KafkaServiceConfiguration kafkaConfig, RequestStats requestStats, @@ -121,55 +143,93 @@ public PartitionLog(KafkaServiceConfiguration kafkaConfig, TopicPartition topicPartition, String fullPartitionName, ImmutableMap entryfilterMap, - ProducerStateManager producerStateManager) { + KafkaTopicLookupService kafkaTopicLookupService, + ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer, + OrderedExecutor recoveryExecutor) { this.kafkaConfig = kafkaConfig; this.entryfilterMap = entryfilterMap; this.requestStats = requestStats; this.time = time; this.topicPartition = topicPartition; this.fullPartitionName = fullPartitionName; - this.producerStateManager = producerStateManager; this.preciseTopicPublishRateLimitingEnable = kafkaConfig.isPreciseTopicPublishRateLimiterEnable(); + this.kafkaTopicLookupService = kafkaTopicLookupService; + this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer; + this.recoveryExecutor = recoveryExecutor.chooseThread(fullPartitionName); } - private CompletableFuture getEntryFormatter( - CompletableFuture> topicFuture) { - return entryFormatter.accumulateAndGet(null, (current, ___) -> { - if (current != null) { - return current; + public CompletableFuture initialise() { + loadTopicProperties().whenComplete((___, errorLoadTopic) -> { + if (errorLoadTopic != null) { + initFuture.completeExceptionally(errorLoadTopic); + return; } - return topicFuture.thenCompose((persistentTopic) -> { - if (!persistentTopic.isPresent()) { - throw new IllegalStateException("Topic " + fullPartitionName + " is not ready"); - } - TopicName logicalName = TopicName.get(persistentTopic.get().getName()); - TopicName actualName; - if (logicalName.isPartitioned()) { - actualName = TopicName.getPartitionedTopicName(persistentTopic.get().getName()); - } else { - actualName = logicalName; - } - CompletableFuture result = persistentTopic.get().getBrokerService() - .fetchPartitionedTopicMetadataAsync(actualName) - .thenApply(metadata -> { - if (metadata.partitions > 0) { - return buildEntryFormatter(metadata.properties); - } else { - return buildEntryFormatter(persistentTopic.get().getManagedLedger().getProperties()); - } + if (kafkaConfig.isKafkaTransactionCoordinatorEnabled()) { + producerStateManager + .recover(this, recoveryExecutor) + .thenRun(() -> initFuture.complete(this)) + .exceptionally(error -> { + initFuture.completeExceptionally(error); + return null; }); + } else { + initFuture.complete(this); + } + }); + return initFuture; + } + + public CompletableFuture awaitInitialisation() { + return initFuture; + } - result.exceptionally(ex -> { - // 
this error will happen in a separate thread, and during the execution of - // accumulateAndGet - // the only thing we can do is to clear the cache - log.error("Cannot create the EntryFormatter for {}", fullPartitionName, ex); - entryFormatter.set(null); - return null; + public boolean isInitialised() { + return initFuture.isDone() && !initFuture.isCompletedExceptionally(); + } + + public boolean isInitialisationFailed() { + return initFuture.isDone() && initFuture.isCompletedExceptionally(); + } + + private CompletableFuture loadTopicProperties() { + CompletableFuture> persistentTopicFuture = + kafkaTopicLookupService.getTopic(fullPartitionName, this); + return persistentTopicFuture + .thenCompose(topic -> fetchTopicProperties(topic)) + .thenAccept(properties -> { + this.topicProperties = properties; + log.info("Topic properties for {} are {}", fullPartitionName, properties); + this.entryFormatter = buildEntryFormatter(topicProperties); + this.producerStateManager = + new ProducerStateManager(fullPartitionName, + producerStateManagerSnapshotBuffer, + kafkaConfig.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds()); }); - return result; - }); - }); + } + + private CompletableFuture> fetchTopicProperties(Optional persistentTopic) { + if (!persistentTopic.isPresent()) { + log.info("Topic {} not loaded here", fullPartitionName); + return FutureUtil.failedFuture(new NotLeaderOrFollowerException()); + } + this.persistentTopic = persistentTopic.get(); + TopicName logicalName = TopicName.get(persistentTopic.get().getName()); + TopicName actualName; + if (logicalName.isPartitioned()) { + actualName = TopicName.getPartitionedTopicName(persistentTopic.get().getName()); + } else { + actualName = logicalName; + } + return persistentTopic.get().getBrokerService() + .fetchPartitionedTopicMetadataAsync(actualName) + .thenApply(metadata -> { + if (metadata.partitions > 0) { + return metadata.properties; + } else { + return persistentTopic.get().getManagedLedger().getProperties(); + } + }) + .thenApply(map -> map != null ? 
map : Collections.emptyMap());
+    }
 
     private EntryFormatter buildEntryFormatter(Map topicProperties) {
@@ -225,6 +285,8 @@ protected ReadRecordsResult newObject(Handle handle) {
         private Position lastPosition;
         private Errors errors;
 
+        private PartitionLog partitionLog;
+
         private ReadRecordsResult(Recycler.Handle recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
@@ -237,14 +299,16 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
                                             List abortedTransactions,
                                             long highWatermark,
                                             long lastStableOffset,
-                                            Position lastPosition) {
+                                            Position lastPosition,
+                                            PartitionLog partitionLog) {
             return ReadRecordsResult.get(
                 decodeResult,
                 abortedTransactions,
                 highWatermark,
                 lastStableOffset,
                 lastPosition,
-                null);
+                null,
+                partitionLog);
         }
 
         public static ReadRecordsResult get(DecodeResult decodeResult,
@@ -252,7 +316,8 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
                                             long highWatermark,
                                             long lastStableOffset,
                                             Position lastPosition,
-                                            Errors errors) {
+                                            Errors errors,
+                                            PartitionLog partitionLog) {
             ReadRecordsResult readRecordsResult = RECYCLER.get();
             readRecordsResult.decodeResult = decodeResult;
             readRecordsResult.abortedTransactions = abortedTransactions;
@@ -260,31 +325,35 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
             readRecordsResult.lastStableOffset = lastStableOffset;
             readRecordsResult.lastPosition = lastPosition;
             readRecordsResult.errors = errors;
+            readRecordsResult.partitionLog = partitionLog;
             return readRecordsResult;
         }
 
         public static ReadRecordsResult empty(long highWatermark,
-                                             long lastStableOffset,
-                                             Position lastPosition) {
+                                              long lastStableOffset,
+                                              Position lastPosition,
+                                              PartitionLog partitionLog) {
             return ReadRecordsResult.get(
                 DecodeResult.get(MemoryRecords.EMPTY),
                 Collections.emptyList(),
                 highWatermark,
                 lastStableOffset,
-                lastPosition);
+                lastPosition,
+                partitionLog);
         }
 
-        public static ReadRecordsResult error(Errors errors) {
-            return ReadRecordsResult.error(PositionImpl.EARLIEST, errors);
+        public static ReadRecordsResult error(Errors errors, PartitionLog partitionLog) {
+            return ReadRecordsResult.error(PositionImpl.EARLIEST, errors, partitionLog);
         }
 
-        public static ReadRecordsResult error(Position position, Errors errors) {
+        public static ReadRecordsResult error(Position position, Errors errors, PartitionLog partitionLog) {
             return ReadRecordsResult.get(null,
                 null,
                 -1,
                 -1,
                 position,
-                errors);
+                errors,
+                partitionLog);
         }
 
         public FetchResponse.PartitionData toPartitionData() {
@@ -321,6 +390,7 @@ public void recycle() {
             this.lastStableOffset = -1;
             this.highWatermark = -1;
             this.abortedTransactions = null;
+            this.partitionLog = null;
             if (this.decodeResult != null) {
                 this.decodeResult.recycle();
                 this.decodeResult = null;
@@ -336,8 +406,11 @@ public enum AppendOrigin {
         Log
     }
 
+    // TODO: the first and last offset only make sense here if there is only a single completed txn.
+    // It might make sense to refactor this method.
     public AnalyzeResult analyzeAndValidateProducerState(MemoryRecords records,
                                                          Optional firstOffset,
+                                                         Long lastOffset,
                                                          AppendOrigin origin) {
         Map updatedProducers = Maps.newHashMap();
         List completedTxns = Lists.newArrayList();
@@ -348,7 +421,12 @@ public AnalyzeResult analyzeAndValidateProducerState(MemoryRecords records,
             // compute the last stable offset without relying on additional index lookups.
Optional maybeCompletedTxn = updateProducers(batch, updatedProducers, firstOffset, origin); - maybeCompletedTxn.ifPresent(completedTxns::add); + maybeCompletedTxn.ifPresent(txn -> { + if (lastOffset != null) { + txn.lastOffset(lastOffset); + } + completedTxns.add(txn); + }); } } @@ -362,7 +440,7 @@ private Optional updateProducers( AppendOrigin origin) { Long producerId = batch.producerId(); ProducerAppendInfo appendInfo = - producers.computeIfAbsent(producerId, pid -> producerStateManager.prepareUpdate(producerId, origin)); + producers.computeIfAbsent(producerId, pid -> producerStateManager.prepareUpdate(pid, origin)); return appendInfo.append(batch, firstOffset); } @@ -386,6 +464,12 @@ public CompletableFuture appendRecords(final MemoryRecords records, final AppendRecordsContext appendRecordsContext) { CompletableFuture appendFuture = new CompletableFuture<>(); KafkaTopicManager topicManager = appendRecordsContext.getTopicManager(); + if (topicManager == null) { + log.error("topicManager is null for {}???", fullPartitionName, + new Exception("topicManager is null for " + fullPartitionName).fillInStackTrace()); + return CompletableFuture + .failedFuture(new KafkaStorageException("topicManager is null for " + fullPartitionName)); + } final long beforeRecordsProcess = time.nanoseconds(); try { final LogAppendInfo appendInfo = analyzeAndValidateRecords(records); @@ -398,38 +482,14 @@ public CompletableFuture appendRecords(final MemoryRecords records, MemoryRecords validRecords = trimInvalidBytes(records, appendInfo); // Append Message into pulsar - final CompletableFuture> topicFuture = - topicManager.getTopic(fullPartitionName); - if (topicFuture.isCompletedExceptionally()) { - topicFuture.exceptionally(e -> { - appendFuture.completeExceptionally(e); - return Optional.empty(); - }); - return appendFuture; - } - if (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent()) { - appendFuture.completeExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - return appendFuture; - } - CompletableFuture entryFormatterHandle = getEntryFormatter(topicFuture); - final Consumer> persistentTopicConsumer = persistentTopicOpt -> { - if (!persistentTopicOpt.isPresent()) { - appendFuture.completeExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - return; + final Consumer sequentialExecutor = ___ -> { + final ManagedLedger managedLedger = persistentTopic.getManagedLedger(); + if (entryFormatter instanceof KafkaMixedEntryFormatter) { + final long logEndOffset = MessageMetadataUtils.getLogEndOffset(managedLedger); + appendInfo.firstOffset(Optional.of(logEndOffset)); } - - final ManagedLedger managedLedger = persistentTopicOpt.get().getManagedLedger(); - entryFormatterHandle.whenComplete((entryFormatter, ee) ->{ - if (ee != null) { - appendFuture.completeExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - return; - } - if (entryFormatter instanceof KafkaMixedEntryFormatter) { - final long logEndOffset = MessageMetadataUtils.getLogEndOffset(managedLedger); - appendInfo.firstOffset(Optional.of(logEndOffset)); - } - final EncodeRequest encodeRequest = EncodeRequest.get(validRecords, appendInfo); + final EncodeRequest encodeRequest = EncodeRequest.get(validRecords, appendInfo); requestStats.getPendingTopicLatencyStats().registerSuccessfulEvent( time.nanoseconds() - beforeRecordsProcess, TimeUnit.NANOSECONDS); @@ -443,17 +503,16 @@ public CompletableFuture appendRecords(final MemoryRecords records, appendRecordsContext.getStartSendOperationForThrottling() 
.accept(encodeResult.getEncodedByteBuf().readableBytes()); - publishMessages(persistentTopicOpt, + publishMessages( appendFuture, appendInfo, encodeResult, appendRecordsContext); - }); }; appendRecordsContext.getPendingTopicFuturesMap() .computeIfAbsent(topicPartition, ignored -> new PendingTopicFutures(requestStats)) - .addListener(topicFuture, persistentTopicConsumer, appendFuture::completeExceptionally); + .addListener(initFuture, sequentialExecutor, appendFuture::completeExceptionally); } catch (Exception exception) { log.error("Failed to handle produce request for {}", topicPartition, exception); appendFuture.completeExceptionally(exception); @@ -462,23 +521,7 @@ public CompletableFuture appendRecords(final MemoryRecords records, return appendFuture; } - public Position getLastPosition(KafkaTopicManager topicManager) { - final CompletableFuture> topicFuture = - topicManager.getTopic(fullPartitionName); - if (topicFuture.isCompletedExceptionally()) { - return PositionImpl.EARLIEST; - } - if (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent()) { - return PositionImpl.EARLIEST; - } - Optional topicOpt = topicFuture.getNow(Optional.empty()); - if (topicOpt.isPresent()) { - return getLastPosition(topicOpt.get()); - } - return PositionImpl.EARLIEST; - } - - private Position getLastPosition(PersistentTopic persistentTopic) { + public Position getLastPosition() { return persistentTopic.getLastPosition(); } @@ -501,11 +544,11 @@ public CompletableFuture readRecords(final FetchRequest.Parti log.debug("Fetch for {}: no tcm for topic {} return NOT_LEADER_FOR_PARTITION.", topicPartition, fullPartitionName); } - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); return; } if (checkOffsetOutOfRange(tcm, offset, topicPartition, startPrepareMetadataNanos)) { - future.complete(ReadRecordsResult.error(Errors.OFFSET_OUT_OF_RANGE)); + future.complete(ReadRecordsResult.error(Errors.OFFSET_OUT_OF_RANGE, this)); return; } @@ -520,16 +563,16 @@ public CompletableFuture readRecords(final FetchRequest.Parti log.warn("KafkaTopicConsumerManager is closed, remove TCM of {}", fullPartitionName); registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); context.getSharedState().getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(fullPartitionName); - future.complete(ReadRecordsResult.error(Errors.NONE)); + future.complete(ReadRecordsResult.error(Errors.NONE, this)); return; } cursorFuture.thenAccept((cursorLongPair) -> { if (cursorLongPair == null) { log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. 
" - + "Fetch for topic return error.", offset, topicPartition); + + "Fetch for topic return error.", offset, topicPartition); registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); return; } final ManagedCursor cursor = cursorLongPair.getLeft(); @@ -546,37 +589,39 @@ public CompletableFuture readRecords(final FetchRequest.Parti ReadRecordsResult.empty( highWaterMark, firstUndecidedOffset, - tcm.getManagedLedger().getLastConfirmedEntry() + tcm.getManagedLedger().getLastConfirmedEntry(), this ) ); return; } } - readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes, topicManager) - .whenComplete((entries, throwable) -> { - if (throwable != null) { - tcm.deleteOneCursorAsync(cursorLongPair.getLeft(), - "cursor.readEntry fail. deleteCursor"); - if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException - || throwable instanceof ManagedLedgerException.ManagedLedgerFencedException) { - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); - return; - } - log.error("Read entry error on {}", partitionData, throwable); - future.complete(ReadRecordsResult.error(Errors.UNKNOWN_SERVER_ERROR)); - return; - } - long readSize = entries.stream().mapToLong(Entry::getLength).sum(); - limitBytes.addAndGet(-1 * readSize); - // Add new offset back to TCM after entries are read successfully - tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get())); - handleEntries(future, entries, partitionData, tcm, cursor, readCommitted, context); - }); + readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes, + fullPartitionName -> { + topicManager.invalidateCacheForFencedManagerLedgerOnTopic(fullPartitionName); + }).whenComplete((entries, throwable) -> { + if (throwable != null) { + tcm.deleteOneCursorAsync(cursorLongPair.getLeft(), + "cursor.readEntry fail. 
deleteCursor"); + if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException + || throwable instanceof ManagedLedgerException.ManagedLedgerFencedException) { + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); + return; + } + log.error("Read entry error on {}", partitionData, throwable); + future.complete(ReadRecordsResult.error(Errors.UNKNOWN_SERVER_ERROR, this)); + return; + } + long readSize = entries.stream().mapToLong(Entry::getLength).sum(); + limitBytes.addAndGet(-1 * readSize); + // Add new offset back to TCM after entries are read successfully + tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get())); + handleEntries(future, entries, partitionData, tcm, cursor, readCommitted, context); + }); }).exceptionally(ex -> { registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); context.getSharedState() .getKafkaTopicConsumerManagerCache().removeAndCloseByTopic(fullPartitionName); - future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER)); + future.complete(ReadRecordsResult.error(Errors.NOT_LEADER_OR_FOLLOWER, this)); return null; }); }); @@ -602,7 +647,9 @@ private boolean checkOffsetOutOfRange(KafkaTopicConsumerManager tcm, log.error("Received request for offset {} for partition {}, " + "but we only have entries less than {}.", offset, topicPartition, logEndOffset); - registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); + if (startPrepareMetadataNanos > 0) { + registerPrepareMetadataFailedEvent(startPrepareMetadataNanos); + } return true; } return false; @@ -626,11 +673,12 @@ private void handleEntries(final CompletableFuture future, final List committedEntries = readCommitted ? getCommittedEntries(entries, lso) : entries; if (log.isDebugEnabled()) { - log.debug("Read {} entries but only {} entries are committed", - entries.size(), committedEntries.size()); + log.debug("Read {} entries but only {} entries are committed, lso {}, highWatermark {}", + entries.size(), committedEntries.size(), lso, highWatermark); } if (committedEntries.isEmpty()) { - future.complete(ReadRecordsResult.error(tcm.getManagedLedger().getLastConfirmedEntry(), Errors.NONE)); + future.complete(ReadRecordsResult.error(tcm.getManagedLedger().getLastConfirmedEntry(), Errors.NONE, + this)); return; } @@ -641,14 +689,6 @@ private void handleEntries(final CompletableFuture future, final CompletableFuture groupNameFuture = kafkaConfig.isKopEnableGroupLevelConsumerMetrics() ? context.getCurrentConnectedGroupNameAsync() : CompletableFuture.completedFuture(null); - final CompletableFuture entryFormatterHandle = - getEntryFormatter(context.getTopicManager().getTopic(fullPartitionName)); - - entryFormatterHandle.whenComplete((entryFormatter, ee) -> { - if (ee != null) { - future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR)); - return; - } groupNameFuture.whenCompleteAsync((groupName, ex) -> { if (ex != null) { log.error("Get groupId failed.", ex); @@ -672,16 +712,13 @@ private void handleEntries(final CompletableFuture future, log.debug("Partition {} read entry completed in {} ns", topicPartition, MathUtils.nowInNano() - startDecodingEntriesNanos); } - - future.complete(ReadRecordsResult.get(decodeResult, abortedTransactions, highWatermark, lso, lastPosition)); + future.complete(ReadRecordsResult + .get(decodeResult, abortedTransactions, highWatermark, lso, lastPosition, this)); }, context.getDecodeExecutor()).exceptionally(ex -> { log.error("Partition {} read entry exceptionally. 
", topicPartition, ex); - future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR)); + future.complete(ReadRecordsResult.error(Errors.KAFKA_STORAGE_ERROR, this)); return null; }); - - - }); } private static byte getCompatibleMagic(short apiVersion) { @@ -736,7 +773,7 @@ private CompletableFuture> readEntries(final ManagedCursor cursor, final AtomicLong cursorOffset, final int maxReadEntriesNum, final long adjustedMaxBytes, - final KafkaTopicManager topicManager) { + final Consumer invalidateCacheOnTopic) { final OpStatsLogger messageReadStats = requestStats.getMessageReadStats(); // read readeEntryNum size entry. final long startReadingMessagesNanos = MathUtils.nowInNano(); @@ -793,7 +830,7 @@ public void readEntriesComplete(List entries, Object ctx) { public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { log.error("Error read entry for topic: {}", fullPartitionName); if (exception instanceof ManagedLedgerException.ManagedLedgerFencedException) { - topicManager.invalidateCacheForFencedManagerLedgerOnTopic(fullPartitionName); + invalidateCacheOnTopic.accept(fullPartitionName); } messageReadStats.registerFailedEvent( MathUtils.elapsedNanos(startReadingMessagesNanos), TimeUnit.NANOSECONDS); @@ -823,18 +860,10 @@ public void markDeleteFailed(ManagedLedgerException e, Object ctx) { }, null); } - private void publishMessages(final Optional persistentTopicOpt, - final CompletableFuture appendFuture, + private void publishMessages(final CompletableFuture appendFuture, final LogAppendInfo appendInfo, final EncodeResult encodeResult, final AppendRecordsContext appendRecordsContext) { - if (!persistentTopicOpt.isPresent()) { - encodeResult.recycle(); - // It will trigger a retry send of Kafka client - appendFuture.completeExceptionally(Errors.NOT_LEADER_OR_FOLLOWER.exception()); - return; - } - PersistentTopic persistentTopic = persistentTopicOpt.get(); checkAndRecordPublishQuota(persistentTopic, appendInfo.validBytes(), appendInfo.numMessages(), appendRecordsContext); if (persistentTopic.isSystemTopic()) { @@ -867,21 +896,8 @@ private void publishMessages(final Optional persistentTopicOpt, final long lastOffset = offset + numMessages - 1; AnalyzeResult analyzeResult = analyzeAndValidateProducerState( - encodeResult.getRecords(), Optional.of(offset), AppendOrigin.Client); - analyzeResult.updatedProducers().forEach((pid, producerAppendInfo) -> { - if (log.isDebugEnabled()) { - log.debug("Append pid: [{}], appendInfo: [{}], lastOffset: [{}]", - pid, producerAppendInfo, lastOffset); - } - producerStateManager.update(producerAppendInfo); - }); - analyzeResult.completedTxns().forEach(completedTxn -> { - // update to real last offset - completedTxn.lastOffset(lastOffset - 1); - long lastStableOffset = producerStateManager.lastStableOffset(completedTxn); - producerStateManager.updateTxnIndex(completedTxn, lastStableOffset); - producerStateManager.completeTxn(completedTxn); - }); + encodeResult.getRecords(), Optional.of(offset), lastOffset, AppendOrigin.Client); + updateProducerStateManager(lastOffset, analyzeResult); appendFuture.complete(offset); } else { @@ -895,7 +911,7 @@ private void publishMessages(final Optional persistentTopicOpt, } private void checkAndRecordPublishQuota(Topic topic, int msgSize, int numMessages, - AppendRecordsContext appendRecordsContext) { + AppendRecordsContext appendRecordsContext) { final boolean isPublishRateExceeded; if (preciseTopicPublishRateLimitingEnable) { boolean isPreciseTopicPublishRateExceeded = @@ -1045,4 +1061,271 @@ private 
MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info return MemoryRecords.readableRecords(validByteBuffer); } }
+
+ public CompletableFuture fetchOldestAvailableIndexFromTopic() {
+ final CompletableFuture future = new CompletableFuture<>();
+
+ // Build a temporary KafkaTopicConsumerManager; it is closed as soon as the returned future completes
+ KafkaTopicConsumerManager tcm = new KafkaTopicConsumerManager("purge-aborted-tx",
+ true, persistentTopic);
+ future.whenComplete((___, error) -> {
+ // release resources in any case
+ try {
+ tcm.close();
+ } catch (Exception err) {
+ log.error("Cannot safely close the temporary KafkaTopicConsumerManager for {}",
+ fullPartitionName, err);
+ }
+ });
+
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger();
+ if (managedLedger.getNumberOfEntries() == 0) {
+ long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger);
+ log.info("First offset for topic {} is {} as the topic is empty (numberOfEntries=0)",
+ fullPartitionName, currentOffset);
+ future.complete(currentOffset);
+
+ return future;
+ }
+ // the first position is a dummy entry (entryId -1)
+ PositionImpl firstPosition = managedLedger.getFirstPosition();
+ // look for the first entry with data
+ PositionImpl nextValidPosition = managedLedger.getNextValidPosition(firstPosition);
+
+ managedLedger.asyncReadEntry(nextValidPosition, new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ try {
+ long startOffset = MessageMetadataUtils.peekBaseOffsetFromEntry(entry);
+ log.info("First offset for topic {} is {} - position {}", fullPartitionName,
+ startOffset, entry.getPosition());
+ future.complete(startOffset);
+ } catch (Exception err) {
+ future.completeExceptionally(err);
+ } finally {
+ entry.release();
+ }
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+
+ return future;
+
+ }
+
+ public CompletableFuture takeProducerSnapshot() {
+ return initFuture.thenCompose((___) -> {
+ // the snapshot can be taken only on the same thread that is used for writes
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) getPersistentTopic().getManagedLedger();
+ Executor executorService = ml.getExecutor();
+ return this
+ .getProducerStateManager()
+ .takeSnapshot(executorService);
+ });
+ }
+
+ public CompletableFuture recoverTxEntries(
+ long offset,
+ Executor executor) {
+ if (!kafkaConfig.isKafkaTransactionCoordinatorEnabled()) {
+ // no need to scan the topic, because transactions are disabled
+ return CompletableFuture.completedFuture(0L);
+ }
+ return fetchOldestAvailableIndexFromTopic().thenCompose((minOffset -> {
+ log.info("start recoverTxEntries for {} at offset {} minOffset {}",
+ fullPartitionName, offset, minOffset);
+ final CompletableFuture future = new CompletableFuture<>();
+
+ // Build a temporary KafkaTopicConsumerManager; it is closed as soon as the returned future completes
+ KafkaTopicConsumerManager tcm = new KafkaTopicConsumerManager("recover-tx",
+ true, persistentTopic);
+ future.whenComplete((___, error) -> {
+ // release resources in any case
+ try {
+ tcm.close();
+ } catch (Exception err) {
+ log.error("Cannot safely close the temporary KafkaTopicConsumerManager for {}",
+ fullPartitionName, err);
+ }
+ });
+
+ final long offsetToStart;
+ if (checkOffsetOutOfRange(tcm, offset, topicPartition, -1)) {
+ offsetToStart = 0;
+ log.info("recoverTxEntries for {}: offset {} is out-of-range, "
+ + "maybe the topic has been deleted/recreated, "
+ + "starting recovery from {}",
+ topicPartition, offset, offsetToStart);
+ } else {
+ offsetToStart = offset;
+ }
+
+ producerStateManager.handleMissingDataBeforeRecovery(minOffset, offset);
+
+ if (log.isDebugEnabled()) {
+ log.debug("recoverTxEntries for {}: removing cursor from TCM for fetch offset {}",
+ topicPartition, offsetToStart);
+ }
+
+
+ final CompletableFuture> cursorFuture = tcm.removeCursorFuture(offsetToStart);
+
+ if (cursorFuture == null) {
+ // the TCM is already closed: fail with NOT_LEADER_OR_FOLLOWER so that the client retries the lookup
+ log.warn("KafkaTopicConsumerManager of {} is closed, cannot start recovery", fullPartitionName);
+ future.completeExceptionally(new NotLeaderOrFollowerException());
+ return future;
+ }
+
+ cursorFuture.thenAccept((cursorLongPair) -> {
+
+ if (cursorLongPair == null) {
+ log.warn("KafkaTopicConsumerManager.remove({}) returned null for topic {}, "
+ + "failing recovery with NOT_LEADER_OR_FOLLOWER.", offsetToStart, topicPartition);
+ future.completeExceptionally(new NotLeaderOrFollowerException());
+ return;
+ }
+ final ManagedCursor cursor = cursorLongPair.getLeft();
+ final AtomicLong cursorOffset = new AtomicLong(cursorLongPair.getRight());
+
+ AtomicLong entryCounter = new AtomicLong();
+ readNextEntriesForRecovery(cursor, cursorOffset, tcm, entryCounter,
+ future, executor);
+
+ }).exceptionally(ex -> {
+ future.completeExceptionally(new NotLeaderOrFollowerException());
+ return null;
+ });
+ return future;
+ }));
+ }
+
+
+ private void readNextEntriesForRecovery(ManagedCursor cursor, AtomicLong cursorOffset,
+ KafkaTopicConsumerManager tcm,
+ AtomicLong entryCounter,
+ CompletableFuture future, Executor executor) {
+ if (log.isDebugEnabled()) {
+ log.debug("readNextEntriesForRecovery {} cursorOffset {}", fullPartitionName, cursorOffset);
+ }
+ int maxReadEntriesNum = 200;
+ long adjustedMaxBytes = Long.MAX_VALUE;
+ readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes,
+ (partitionName) -> {})
+ .whenCompleteAsync((entries, throwable) -> {
+ if (throwable != null) {
+ log.error("Read entry error on {}", fullPartitionName, throwable);
+ tcm.deleteOneCursorAsync(cursor,
+ "cursor.readEntry fail. deleteCursor");
+ if (throwable instanceof ManagedLedgerException.CursorAlreadyClosedException
+ || throwable instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+ future.completeExceptionally(new NotLeaderOrFollowerException());
+ return;
+ }
+ future.completeExceptionally(new UnknownServerException(throwable));
+ return;
+ }
+
+ // Add new offset back to TCM after entries are read successfully
+ tcm.add(cursorOffset.get(), Pair.of(cursor, cursorOffset.get()));
+
+ if (entries.isEmpty()) {
+ if (log.isDebugEnabled()) {
+ log.debug("No more entries to recover for {}", fullPartitionName);
+ }
+ future.completeAsync(() -> entryCounter.get(), executor);
+ return;
+ }
+
+ CompletableFuture decodedEntries = new CompletableFuture<>();
+ decodeEntriesForRecovery(decodedEntries, entries);
+
+ decodedEntries.thenAccept((decodeResult) -> {
+ try {
+ MemoryRecords records = decodeResult.getRecords();
+ // When multiple entries are retrieved, the first batch's baseOffset is not
+ // necessarily the base offset of every record that follows.
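+ // MemoryRecords.firstBatch() returns null when the decoded records contain no batches, hence the Optional below.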
+ Optional firstOffset = Optional
+ .ofNullable(records.firstBatch())
+ .map(batch -> batch.baseOffset());
+
+ long[] lastOffsetHolder = {-1L};
+ records.batches().forEach(batch -> {
+ batch.forEach(record -> {
+ if (lastOffsetHolder[0] < record.offset()) {
+ lastOffsetHolder[0] = record.offset();
+ }
+ entryCounter.incrementAndGet();
+ });
+ });
+ long lastOffset = lastOffsetHolder[0];
+
+ if (log.isDebugEnabled()) {
+ log.debug("Read some entries while recovering {} firstOffset {} lastOffset {}",
+ fullPartitionName,
+ firstOffset.orElse(null), lastOffset);
+ }
+
+ // Get the relevant offsets from each record
+ AnalyzeResult analyzeResult = analyzeAndValidateProducerState(records,
+ Optional.empty(), null, AppendOrigin.Log);
+
+ updateProducerStateManager(lastOffset, analyzeResult);
+ if (log.isDebugEnabled()) {
+ log.debug("Completed recovery of batch {} {}", analyzeResult, fullPartitionName);
+ }
+ } finally {
+ decodeResult.recycle();
+ }
+ readNextEntriesForRecovery(cursor, cursorOffset, tcm, entryCounter, future, executor);
+
+ }).exceptionally(error -> {
+ log.error("Unexpected error while recovering {}", fullPartitionName, error);
+ future.completeExceptionally(error);
+ return null;
+ });
+ }, executor);
+ }
+
+ private void updateProducerStateManager(long lastOffset, AnalyzeResult analyzeResult) {
+ analyzeResult.updatedProducers().forEach((pid, producerAppendInfo) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Append pid: [{}], appendInfo: [{}], lastOffset: [{}]",
+ pid, producerAppendInfo, lastOffset);
+ }
+ producerStateManager.update(producerAppendInfo);
+ });
+ analyzeResult.completedTxns().forEach(completedTxn -> {
+ long lastStableOffset = producerStateManager.lastStableOffset(completedTxn);
+ producerStateManager.updateTxnIndex(completedTxn, lastStableOffset);
+ producerStateManager.completeTxn(completedTxn);
+ });
+ producerStateManager.updateMapEndOffset(lastOffset);
+
+ // opportunistically take a producer state snapshot if the configured interval has elapsed
+ producerStateManager.maybeTakeSnapshot(recoveryExecutor);
+ }
+
+ private void decodeEntriesForRecovery(final CompletableFuture future,
+ final List entries) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Read {} entries", entries.size());
+ }
+ final byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+ final long startDecodingEntriesNanos = MathUtils.nowInNano();
+ try {
+ DecodeResult decodeResult = entryFormatter.decode(entries, magic);
+ requestStats.getFetchDecodeStats().registerSuccessfulEvent(
+ MathUtils.elapsedNanos(startDecodingEntriesNanos), TimeUnit.NANOSECONDS);
+ future.complete(decodeResult);
+ } catch (Exception error) {
+ future.completeExceptionally(error);
+ }
+ }
+ }
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java index a54f9a93b4..5ee8a6aeeb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogManager.java @@ -16,18 +16,28 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
+import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
/** * Manage {@link PartitionLog}. */
@AllArgsConstructor
+@Slf4j
public class PartitionLogManager {
private final KafkaServiceConfiguration kafkaConfig;
@@ -36,32 +46,79 @@ public class PartitionLogManager {
private final Time time;
private final ImmutableMap entryfilterMap;
+ private final KafkaTopicLookupService kafkaTopicLookupService;
+
+ private final Function producerStateManagerSnapshotBuffer;
+
+ private final OrderedExecutor recoveryExecutor;
+
public PartitionLogManager(KafkaServiceConfiguration kafkaConfig,
RequestStats requestStats,
final ImmutableMap entryfilterMap,
- Time time) {
+ Time time,
+ KafkaTopicLookupService kafkaTopicLookupService,
+ Function producerStateManagerSnapshotBuffer,
+ OrderedExecutor recoveryExecutor) {
this.kafkaConfig = kafkaConfig;
this.requestStats = requestStats;
this.logMap = Maps.newConcurrentMap();
this.entryfilterMap = entryfilterMap;
this.time = time;
this.kafkaTopicLookupService = kafkaTopicLookupService;
this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer;
this.recoveryExecutor = recoveryExecutor;
}
public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix) {
String kopTopic = KopTopic.toString(topicPartition, namespacePrefix);
+ String tenant = TopicName.get(kopTopic).getTenant();
+ ProducerStateManagerSnapshotBuffer prodPerTenant = producerStateManagerSnapshotBuffer.apply(tenant);
+ PartitionLog res = logMap.computeIfAbsent(kopTopic, key -> {
+ PartitionLog partitionLog = new PartitionLog(kafkaConfig, requestStats,
+ time, topicPartition, key, entryfilterMap,
+ kafkaTopicLookupService,
+ prodPerTenant, recoveryExecutor);
+
+ CompletableFuture initialiseResult = partitionLog
+ .initialise();
- return logMap.computeIfAbsent(kopTopic, key -> {
- return new PartitionLog(kafkaConfig, requestStats, time, topicPartition, kopTopic, entryfilterMap,
- new ProducerStateManager(kopTopic));
+ initialiseResult.whenComplete((___, error) -> {
+ if (error != null) {
+ // recovery failed: remove this PartitionLog from the map so that a later access retries
+ log.error("Recovery of {} failed", key, error);
+ logMap.remove(key, partitionLog);
+ }
+ });
+
+ return partitionLog;
});
+ if (res.isInitialisationFailed()) {
+ log.error("Recovery of {} failed", kopTopic);
+ logMap.remove(kopTopic, res);
+ }
+ return res;
}
public PartitionLog removeLog(String topicName) {
+ log.info("removePartitionLog {}", topicName);
return logMap.remove(topicName);
}
public int size() {
return logMap.size();
}
+
+ public CompletableFuture takeProducerStateSnapshots() {
+ List> handles = new ArrayList<>();
+ logMap.values().forEach(log -> {
+ if (log.isInitialised()) {
+ handles.add(log
+ .getProducerStateManager()
+ .takeSnapshot(recoveryExecutor)
+ .thenApply(___ -> null));
+ }
+ });
+ return FutureUtil.waitForAll(handles);
+ }
}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java index 0629d99196..2745f4ad97 100644 ---
a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerAppendInfo.java @@ -40,7 +40,7 @@ public class ProducerAppendInfo { private final String topicPartition; // The id of the producer appending to the log - private final Long producerId; + private final long producerId; // The current entry associated with the producer id which contains metadata for a fixed number of // the most recent appends made by the producer. Validation of the first incoming append will @@ -69,10 +69,11 @@ public ProducerAppendInfo(String topicPartition, initUpdatedEntry(); } - private void checkProducerEpoch(Short producerEpoch) { - if (producerEpoch < updatedEntry.producerEpoch()) { - String message = String.format("Producer's epoch in %s is %s, which is smaller than the last seen " - + "epoch %s", topicPartition, producerEpoch, currentEntry.producerEpoch()); + private void checkProducerEpoch(short producerEpoch) { + if (updatedEntry.producerEpoch() != null + && producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Producer %s's epoch in %s is %s, which is smaller than the last seen " + + "epoch %s", producerId, topicPartition, producerEpoch, currentEntry.producerEpoch()); throw new IllegalArgumentException(message); } } @@ -126,9 +127,9 @@ public void updateCurrentTxnFirstOffset(Boolean isTransactional, long firstOffse public Optional appendEndTxnMarker( EndTransactionMarker endTxnMarker, - Short producerEpoch, - Long offset, - Long timestamp) { + short producerEpoch, + long offset, + long timestamp) { checkProducerEpoch(producerEpoch); // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java index b45b8630c3..735ce9987d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateEntry.java @@ -30,7 +30,7 @@ @AllArgsConstructor public class ProducerStateEntry { - private Long producerId; + private long producerId; private Short producerEpoch; private Integer coordinatorEpoch; private Long lastTimestamp; @@ -38,7 +38,8 @@ public class ProducerStateEntry { public boolean maybeUpdateProducerEpoch(Short producerEpoch) { - if (!this.producerEpoch.equals(producerEpoch)) { + if (this.producerEpoch == null + || !this.producerEpoch.equals(producerEpoch)) { this.producerEpoch = producerEpoch; return true; } else { @@ -52,7 +53,7 @@ public void update(ProducerStateEntry nextEntry) { this.lastTimestamp(nextEntry.lastTimestamp); } - public static ProducerStateEntry empty(Long producerId){ + public static ProducerStateEntry empty(long producerId){ return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, Optional.empty()); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java index 08ab544038..bb5aecef0d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java @@ -14,89 +14,27 @@ 
package io.streamnative.pulsar.handlers.kop.storage; import com.google.common.collect.Maps; -import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.experimental.Accessors; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.FetchResponse; -/** - * AbortedTxn is used cache the aborted index. - */ -@Data -@Accessors(fluent = true) -@AllArgsConstructor -class AbortedTxn { - - private static final int VersionOffset = 0; - private static final int VersionSize = 2; - private static final int ProducerIdOffset = VersionOffset + VersionSize; - private static final int ProducerIdSize = 8; - private static final int FirstOffsetOffset = ProducerIdOffset + ProducerIdSize; - private static final int FirstOffsetSize = 8; - private static final int LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize; - private static final int LastOffsetSize = 8; - private static final int LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize; - private static final int LastStableOffsetSize = 8; - private static final int TotalSize = LastStableOffsetOffset + LastStableOffsetSize; - - private static final Short CurrentVersion = 0; - - private final Long producerId; - private final Long firstOffset; - private final Long lastOffset; - private final Long lastStableOffset; - - protected ByteBuffer toByteBuffer() { - ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TotalSize); - buffer.putShort(CurrentVersion); - buffer.putLong(producerId); - buffer.putLong(firstOffset); - buffer.putLong(lastOffset); - buffer.putLong(lastStableOffset); - buffer.flip(); - return buffer; - } -} - -@Data -@Accessors(fluent = true) -@AllArgsConstructor -class CompletedTxn { - private Long producerId; - private Long firstOffset; - private Long lastOffset; - private Boolean isAborted; -} - -@Data -@Accessors(fluent = true) -@EqualsAndHashCode -class TxnMetadata { - private final long producerId; - private final long firstOffset; - private long lastOffset; - - public TxnMetadata(long producerId, long firstOffset) { - this.producerId = producerId; - this.firstOffset = firstOffset; - } -} - /** * Producer state manager. 
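* Tracks, for a single partition, each producer's epoch together with its ongoing and aborted
* transactions, mirroring Kafka's ProducerStateManager. Illustrative lifecycle under this patch
* (a sketch; "tp", "snapshotBuffer" and the 300-second interval are example values):
* <pre>
* ProducerStateManager psm = new ProducerStateManager(tp, snapshotBuffer, 300);
* psm.recover(partitionLog, executor)                 // apply the latest snapshot, then replay the log tail
*    .thenRun(() -> psm.maybeTakeSnapshot(executor)); // snapshot at most once per configured interval
* </pre>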
*/ @Slf4j public class ProducerStateManager {
+ @Getter
private final String topicPartition;
private final Map producers = Maps.newConcurrentMap();
@@ -104,8 +42,125 @@ public class ProducerStateManager {
private final TreeMap ongoingTxns = Maps.newTreeMap();
private final List abortedIndexList = new ArrayList<>();
- public ProducerStateManager(String topicPartition) {
+ private final ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer;
+
+ private final int kafkaTxnProducerStateTopicSnapshotIntervalSeconds;
+
+ private volatile long mapEndOffset = -1;
+
+ private long lastSnapshotTime;
+
+
+ public ProducerStateManager(String topicPartition,
+ ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer,
+ int kafkaTxnProducerStateTopicSnapshotIntervalSeconds) {
this.topicPartition = topicPartition;
+ this.producerStateManagerSnapshotBuffer = producerStateManagerSnapshotBuffer;
+ this.kafkaTxnProducerStateTopicSnapshotIntervalSeconds = kafkaTxnProducerStateTopicSnapshotIntervalSeconds;
+ this.lastSnapshotTime = System.currentTimeMillis();
+ }
+
+ public CompletableFuture recover(PartitionLog partitionLog, Executor executor) {
+ return producerStateManagerSnapshotBuffer
+ .readLatestSnapshot(topicPartition)
+ .thenCompose(snapshot -> applySnapshotAndRecover(snapshot, partitionLog, executor));
+ }
+
+ private CompletableFuture applySnapshotAndRecover(ProducerStateManagerSnapshot snapshot,
+ PartitionLog partitionLog,
+ Executor executor) {
+ long offsetPosition = 0;
+ synchronized (abortedIndexList) {
+ this.abortedIndexList.clear();
+ this.producers.clear();
+ this.ongoingTxns.clear();
+ if (snapshot != null) {
+ this.abortedIndexList.addAll(snapshot.getAbortedIndexList());
+ this.producers.putAll(snapshot.getProducers());
+ this.ongoingTxns.putAll(snapshot.getOngoingTxns());
+ this.mapEndOffset = snapshot.getOffset();
+ offsetPosition = snapshot.getOffset();
+ log.info("Recover topic {} from offset {}", topicPartition, offsetPosition);
+ log.info("Ongoing transactions after recovery {}", snapshot.getOngoingTxns());
+ log.info("Aborted transactions after recovery {}", snapshot.getAbortedIndexList());
+ } else {
+ log.info("No snapshot found for topic {}, recovering from the beginning", topicPartition);
+ }
+ }
+ long startRecovery = System.currentTimeMillis();
+ // recover from log
+ CompletableFuture result = partitionLog
+ .recoverTxEntries(offsetPosition, executor)
+ .thenApply(numEntries -> {
+ log.info("Recovery of {} finished. 
Scanned {} entries, time {} ms, new mapEndOffset {}", + topicPartition, + numEntries, + System.currentTimeMillis() - startRecovery, + mapEndOffset); + return null; + }); + + return result; + } + + public CompletableFuture takeSnapshot(Executor executor) { + CompletableFuture result = new CompletableFuture<>(); + executor.execute(new SafeRunnable() { + @Override + public void safeRun() { + if (mapEndOffset == -1) { + result.complete(null); + return; + } + ProducerStateManagerSnapshot snapshot = getProducerStateManagerSnapshot(); + log.info("Taking snapshot for {} at {}", topicPartition, snapshot); + producerStateManagerSnapshotBuffer + .write(snapshot) + .whenComplete((res, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + log.info("Snapshot for {} taken at offset {}", + topicPartition, snapshot.getOffset()); + result.complete(snapshot); + } + }); + } + }); + return result; + } + + void maybeTakeSnapshot(Executor executor) { + if (mapEndOffset == -1) { + return; + } + long now = System.currentTimeMillis(); + long deltaFromLast = (now - lastSnapshotTime) / 1000; + if (log.isDebugEnabled()) { + log.debug("maybeTakeSnapshot deltaFromLast {} vs kafkaTxnProducerStateTopicSnapshotIntervalSeconds {} ", + deltaFromLast, kafkaTxnProducerStateTopicSnapshotIntervalSeconds); + } + if (deltaFromLast < kafkaTxnProducerStateTopicSnapshotIntervalSeconds) { + return; + } + lastSnapshotTime = now; + + takeSnapshot(executor); + } + + private ProducerStateManagerSnapshot getProducerStateManagerSnapshot() { + ProducerStateManagerSnapshot snapshot; + synchronized (abortedIndexList) { + snapshot = new ProducerStateManagerSnapshot(topicPartition, + mapEndOffset, + new HashMap<>(producers), + new TreeMap<>(ongoingTxns), + new ArrayList<>(abortedIndexList)); + } + if (log.isDebugEnabled()) { + log.debug("Snapshot for {}: {}", topicPartition, snapshot); + } + return snapshot; } public ProducerAppendInfo prepareUpdate(Long producerId, PartitionLog.AppendOrigin origin) { @@ -120,7 +175,7 @@ public ProducerAppendInfo prepareUpdate(Long producerId, PartitionLog.AppendOrig */ public long lastStableOffset(CompletedTxn completedTxn) { for (TxnMetadata txnMetadata : ongoingTxns.values()) { - if (!completedTxn.producerId().equals(txnMetadata.producerId())) { + if (completedTxn.producerId() != txnMetadata.producerId()) { return txnMetadata.firstOffset(); } } @@ -129,6 +184,9 @@ public long lastStableOffset(CompletedTxn completedTxn) { public Optional firstUndecidedOffset() { Map.Entry entry = ongoingTxns.firstEntry(); + if (log.isDebugEnabled()) { + log.debug("firstUndecidedOffset {} (ongoingTxns {})", entry, ongoingTxns); + } if (entry == null) { return Optional.empty(); } @@ -173,10 +231,20 @@ public void update(ProducerAppendInfo appendInfo) { } } + public void updateMapEndOffset(long mapEndOffset) { + this.mapEndOffset = mapEndOffset; + } + public void updateTxnIndex(CompletedTxn completedTxn, long lastStableOffset) { if (completedTxn.isAborted()) { - abortedIndexList.add(new AbortedTxn(completedTxn.producerId(), completedTxn.firstOffset(), - completedTxn.lastOffset(), lastStableOffset)); + AbortedTxn abortedTxn = new AbortedTxn(completedTxn.producerId(), completedTxn.firstOffset(), + completedTxn.lastOffset(), lastStableOffset); + if (log.isDebugEnabled()) { + log.debug("Adding new AbortedTxn {}", abortedTxn); + } + synchronized (abortedIndexList) { + abortedIndexList.add(abortedTxn); + } } } @@ -200,4 +268,24 @@ public List getAbortedIndexList(long fetchOffs return 
abortedTransactions; }
+ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset) {
+ if (mapEndOffset == -1) {
+ // empty topic
+ return;
+ }
+ // topic has been trimmed
+ if (snapshotOffset < minOffset) {
+ log.info("{} handleMissingDataBeforeRecovery mapEndOffset {} snapshotOffset "
+ + "{} minOffset {} RESETTING STATE",
+ topicPartition,
+ mapEndOffset, snapshotOffset, minOffset);
+ // topic was not empty (mapEndOffset has some value)
+ // but there is no more data on the topic (trimmed?)
+ ongoingTxns.clear();
+ abortedIndexList.clear();
+ producers.clear();
+ mapEndOffset = -1;
+ }
+ }
+
}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java index d6016f625d..40d075840e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java @@ -18,6 +18,7 @@
import io.streamnative.pulsar.handlers.kop.DelayedFetch;
import io.streamnative.pulsar.handlers.kop.DelayedProduceAndFetch;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
+import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService;
import io.streamnative.pulsar.handlers.kop.MessageFetchContext;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
@@ -31,21 +32,26 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.client.api.PulsarClientException;
/** * Used to append records. Mapping to Kafka ReplicaManager.scala.
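* <p>With this change, reads are served only after a partition's producer state has been recovered.
* Fetch-path sketch (illustrative, not part of the patch itself):
* <pre>
* replicaManager.getPartitionLog(tp, namespacePrefix)
*         .awaitInitialisation()  // completes once producer state recovery is done
*         .thenCompose(log -> log.readRecords(fetchInfo, readCommitted, limitBytes, maxReadEntriesNum, ctx));
* </pre>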
@@ -55,6 +61,7 @@ public class ReplicaManager {
private final PartitionLogManager logManager;
private final DelayedOperationPurgatory producePurgatory;
private final DelayedOperationPurgatory fetchPurgatory;
+ private final String metadataNamespace;
public ReplicaManager(KafkaServiceConfiguration kafkaConfig,
@@ -62,22 +69,26 @@ public ReplicaManager(KafkaServiceConfiguration kafkaConfig,
Time time,
ImmutableMap entryfilterMap,
DelayedOperationPurgatory producePurgatory,
- DelayedOperationPurgatory fetchPurgatory) {
+ DelayedOperationPurgatory fetchPurgatory,
+ KafkaTopicLookupService kafkaTopicLookupService,
+ Function producerStateManagerSnapshotBuffer,
+ OrderedExecutor recoveryExecutor) {
this.logManager = new PartitionLogManager(kafkaConfig, requestStats, entryfilterMap,
- time);
+ time, kafkaTopicLookupService, producerStateManagerSnapshotBuffer, recoveryExecutor);
this.producePurgatory = producePurgatory;
this.fetchPurgatory = fetchPurgatory;
this.metadataNamespace = kafkaConfig.getKafkaMetadataNamespace();
}
- public PartitionLog getPartitionLog(TopicPartition topicPartition, String namespacePrefix) {
+ public PartitionLog getPartitionLog(TopicPartition topicPartition,
+ String namespacePrefix) {
return logManager.getLog(topicPartition, namespacePrefix);
}
public void removePartitionLog(String topicName) {
PartitionLog partitionLog = logManager.removeLog(topicName);
if (log.isDebugEnabled() && partitionLog != null) {
- log.debug("PartitionLog: {} has bean removed.", partitionLog);
+ log.debug("PartitionLog: {} has been removed.", topicName);
}
}
@@ -107,7 +118,7 @@ public void run() {
}
});
if (log.isDebugEnabled()) {
- log.debug("Complete handle appendRecords.");
+ log.debug("Complete handle appendRecords. {}", responseMap);
}
completableFuture.complete(responseMap);
@@ -130,63 +141,100 @@ public CompletableFuture>
final AppendRecordsContext appendRecordsContext) {
CompletableFuture> completableFuture = new CompletableFuture<>();
- final AtomicInteger topicPartitionNum = new AtomicInteger(entriesPerPartition.size());
- final Map responseMap = new ConcurrentHashMap<>();
+ try {
+ final AtomicInteger topicPartitionNum = new AtomicInteger(entriesPerPartition.size());
+ final Map responseMap = new ConcurrentHashMap<>();
- PendingProduceCallback complete =
- new PendingProduceCallback(topicPartitionNum, responseMap, completableFuture, entriesPerPartition);
- BiConsumer addPartitionResponse =
- (topicPartition, response) -> {
- responseMap.put(topicPartition, response);
- // reset topicPartitionNum
- int restTopicPartitionNum = topicPartitionNum.decrementAndGet();
- if (restTopicPartitionNum < 0) {
- return;
- }
- if (restTopicPartitionNum == 0) {
- // If all tasks are sent, cancel the timer tasks to avoid full gc or oom
- producePurgatory.checkAndComplete(new DelayedOperationKey.TopicPartitionOperationKey(topicPartition));
- complete.run();
- }
- };
- entriesPerPartition.forEach((topicPartition, memoryRecords) -> {
- String fullPartitionName = KopTopic.toString(topicPartition, namespacePrefix);
- // reject appending to internal topics if it is not allowed
- if (!internalTopicsAllowed && KopTopic.isInternalTopic(fullPartitionName, metadataNamespace)) {
- addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse(
- Errors.forException(new InvalidTopicException(
- String.format("Cannot append to internal topic %s", topicPartition.topic())))));
- } else {
- PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix);
- if (requiredAcks == 0) {
partitionLog.appendRecords(memoryRecords, origin, appendRecordsContext); + PendingProduceCallback complete = + new PendingProduceCallback(topicPartitionNum, responseMap, completableFuture, entriesPerPartition); + BiConsumer addPartitionResponse = + (topicPartition, response) -> { + if (log.isDebugEnabled()) { + log.debug("Completed produce for {}", topicPartition); + } + responseMap.put(topicPartition, response); + // reset topicPartitionNum + int restTopicPartitionNum = topicPartitionNum.decrementAndGet(); + if (restTopicPartitionNum < 0) { + return; + } + if (restTopicPartitionNum == 0) { + complete.run(); + } + }; + entriesPerPartition.forEach((topicPartition, memoryRecords) -> { + String fullPartitionName = KopTopic.toString(topicPartition, namespacePrefix); + // reject appending to internal topics if it is not allowed + if (!internalTopicsAllowed && KopTopic.isInternalTopic(fullPartitionName, metadataNamespace)) { + addPartitionResponse.accept(topicPartition, new ProduceResponse.PartitionResponse( + Errors.forException(new InvalidTopicException( + String.format("Cannot append to internal topic %s", topicPartition.topic()))))); } else { + PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix); + if (requiredAcks == 0) { + partitionLog.appendRecords(memoryRecords, origin, appendRecordsContext); + return; + } partitionLog.appendRecords(memoryRecords, origin, appendRecordsContext) - .thenAccept(offset -> addPartitionResponse.accept(topicPartition, - new ProduceResponse.PartitionResponse(Errors.NONE, offset, -1L, -1L))) - .exceptionally(ex -> { - addPartitionResponse.accept(topicPartition, - new ProduceResponse.PartitionResponse(Errors.forException(ex.getCause()))); - return null; - }); + .thenAccept(offset -> addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.NONE, offset, -1L, -1L))) + .exceptionally(ex -> { + if (isCannotLoadTopicError(ex)) { + log.error("Cannot load topic error while handling append for {}", + fullPartitionName, ex); + addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)); + } else if (ex.getCause() instanceof BrokerServiceException.PersistenceException) { + log.error("Persistence error while handling append for {}", fullPartitionName, ex); + // BrokerServiceException$PersistenceException: + // org.apache.bookkeeper.mledger.ManagedLedgerException: + // org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: + // org.apache.pulsar.metadata.api.MetadataStoreExcept + + addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)); + } else if (ex.getCause() instanceof PulsarClientException) { + log.error("Error on Pulsar Client while handling append for {}", + fullPartitionName, ex); + + addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.BROKER_NOT_AVAILABLE)); + } else if (ex.getCause() instanceof NotLeaderOrFollowerException) { + addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.forException(ex.getCause()))); + } else { + log.error("System error while handling append for {}", fullPartitionName, ex); + addPartitionResponse.accept(topicPartition, + new ProduceResponse.PartitionResponse(Errors.forException(ex.getCause()))); + } + return null; + }); } + }); + // delay produce + if (timeout <= 0) { + complete.run(); + } else { + // producePurgatory will retain a reference to the callback for timeout ms, 
+ // even if the operation succeeds
+ List delayedCreateKeys =
+ entriesPerPartition.keySet().stream()
+ .map(DelayedOperationKey.TopicPartitionOperationKey::new).collect(Collectors.toList());
+ DelayedProduceAndFetch delayedProduce = new DelayedProduceAndFetch(timeout, topicPartitionNum,
+ complete);
+ producePurgatory.tryCompleteElseWatch(delayedProduce, delayedCreateKeys);
}
- });
- // delay produce
- if (timeout <= 0) {
- complete.run();
- } else {
- // producePurgatory will retain a reference to the callback for timeout ms,
- // even if the operation succeeds
- List delayedCreateKeys =
- entriesPerPartition.keySet().stream()
- .map(DelayedOperationKey.TopicPartitionOperationKey::new).collect(Collectors.toList());
- DelayedProduceAndFetch delayedProduce = new DelayedProduceAndFetch(timeout, topicPartitionNum, complete);
- producePurgatory.tryCompleteElseWatch(delayedProduce, delayedCreateKeys);
+ } catch (Throwable error) {
+ log.error("Internal error", error);
+ completableFuture.completeExceptionally(error);
}
return completableFuture;
}
+ private static boolean isCannotLoadTopicError(Throwable error) {
+ String asString = String.valueOf(error);
+ return asString.contains("Failed to load topic within timeout");
+ }
public CompletableFuture> fetchMessage(
final long timeout,
@@ -260,12 +308,25 @@
};
readPartitionInfo.forEach((tp, fetchInfo) -> {
getPartitionLog(tp, context.getNamespacePrefix())
- .readRecords(fetchInfo, readCommitted,
- limitBytes, maxReadEntriesNum, context)
- .thenAccept(readResult -> {
- result.put(tp, readResult);
- complete.run();
+ .awaitInitialisation()
+ .whenComplete((partitionLog, failed) -> {
+ if (failed != null) {
+ result.put(tp,
+ PartitionLog.ReadRecordsResult
+ .error(Errors.forException(failed.getCause()), null));
+ complete.run();
+ return;
+ }
+ partitionLog
+ .readRecords(fetchInfo, readCommitted,
+ limitBytes, maxReadEntriesNum, context)
+ .thenAccept(readResult -> {
+ result.put(tp, readResult);
+ complete.run();
+ });
});
+ });
return resultFuture;
}
diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/TxnMetadata.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/TxnMetadata.java new file mode 100644 index 0000000000..b512d9139b --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/TxnMetadata.java @@ -0,0 +1,32 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.streamnative.pulsar.handlers.kop.storage; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; + +@Data +@Accessors(fluent = true) +@EqualsAndHashCode +public final class TxnMetadata { + private final long producerId; + private final long firstOffset; + private long lastOffset; + + public TxnMetadata(long producerId, long firstOffset) { + this.producerId = producerId; + this.firstOffset = firstOffset; + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index f86c2dfbb1..fa386b26f7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -229,6 +229,11 @@ public static JoinGroupResponse newJoinGroup(Errors errors, .setMetadata(entry.getValue()) ) .collect(Collectors.toList())); + + if (errors == Errors.COORDINATOR_LOAD_IN_PROGRESS) { + data.setThrottleTimeMs(1000); + } + return new JoinGroupResponse(data); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java index f1dcd8c06b..017039a1e0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtils.java @@ -40,7 +40,7 @@ public class MetadataUtils { public static String constructOffsetsTopicBaseName(String tenant, KafkaServiceConfiguration conf) { return tenant + "/" + conf.getKafkaMetadataNamespace() - + "/" + Topic.GROUP_METADATA_TOPIC_NAME; + + "/" + Topic.GROUP_METADATA_TOPIC_NAME; } public static String constructTxnLogTopicBaseName(String tenant, KafkaServiceConfiguration conf) { @@ -48,6 +48,11 @@ public static String constructTxnLogTopicBaseName(String tenant, KafkaServiceCon + "/" + Topic.TRANSACTION_STATE_TOPIC_NAME; } + public static String constructTxProducerStateTopicBaseName(String tenant, KafkaServiceConfiguration conf) { + return tenant + "/" + conf.getKafkaMetadataNamespace() + + "/__transaction_producer_state"; + } + public static String constructTxnProducerIdTopicBaseName(String tenant, KafkaServiceConfiguration conf) { return tenant + "/" + conf.getKafkaMetadataNamespace() + "/__transaction_producerid_generator"; @@ -62,6 +67,10 @@ public static String constructMetadataNamespace(String tenant, KafkaServiceConfi return tenant + "/" + conf.getKafkaMetadataNamespace(); } + public static String constructProducerIdTopicNamespace(String tenant, KafkaServiceConfiguration conf) { + return tenant + "/" + conf.getKafkaMetadataNamespace(); + } + public static String constructUserTopicsNamespace(String tenant, KafkaServiceConfiguration conf) { return tenant + "/" + conf.getKafkaNamespace(); } @@ -73,7 +82,7 @@ public static void createOffsetMetadataIfMissing(String tenant, PulsarAdmin puls KopTopic kopTopic = new KopTopic(constructOffsetsTopicBaseName(tenant, conf), constructMetadataNamespace(tenant, conf)); createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, kopTopic, - conf.getOffsetsTopicNumPartitions(), false); + conf.getOffsetsTopicNumPartitions(), true, false); } public static void createTxnMetadataIfMissing(String tenant, @@ -84,11 +93,17 @@ public static void 
createTxnMetadataIfMissing(String tenant, KopTopic kopTopic = new KopTopic(constructTxnLogTopicBaseName(tenant, conf), constructMetadataNamespace(tenant, conf)); createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, kopTopic, - conf.getKafkaTxnLogTopicNumPartitions(), false); + conf.getKafkaTxnLogTopicNumPartitions(), true, false); + KopTopic kopTopicProducerState = new KopTopic(constructTxProducerStateTopicBaseName(tenant, conf), + constructMetadataNamespace(tenant, conf)); + createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), pulsarAdmin, clusterData, conf, + kopTopicProducerState, conf.getKafkaTxnProducerStateTopicNumPartitions(), true, false); if (conf.isKafkaTransactionProducerIdsStoredOnPulsar()) { KopTopic producerIdKopTopic = new KopTopic(constructTxnProducerIdTopicBaseName(tenant, conf), - constructMetadataNamespace(tenant, conf)); - createTopicIfNotExist(pulsarAdmin, producerIdKopTopic.getFullName(), 1); + constructProducerIdTopicNamespace(tenant, conf)); + createKafkaMetadataIfMissing(tenant, conf.getKafkaMetadataNamespace(), + pulsarAdmin, clusterData, conf, producerIdKopTopic, + conf.getKafkaTxnLogTopicNumPartitions(), false, true); } } @@ -113,8 +128,9 @@ private static void createKafkaMetadataIfMissing(String tenant, KafkaServiceConfiguration conf, KopTopic kopTopic, int partitionNum, + boolean partitioned, boolean infiniteRetention) - throws PulsarAdminException { + throws PulsarAdminException { if (!conf.isKafkaManageSystemNamespaces()) { log.info("Skipping initialization of topic {} for tenant {}", kopTopic.getFullName(), tenant); return; @@ -159,7 +175,7 @@ private static void createKafkaMetadataIfMissing(String tenant, namespaceExists = true; // Check if the offsets topic exists and create it if not - createTopicIfNotExist(pulsarAdmin, kopTopic.getFullName(), partitionNum); + createTopicIfNotExist(conf, pulsarAdmin, kopTopic.getFullName(), partitionNum, partitioned); offsetsTopicExists = true; } catch (PulsarAdminException e) { if (e instanceof ConflictException) { @@ -168,7 +184,7 @@ private static void createKafkaMetadataIfMissing(String tenant, } log.error("Failed to successfully initialize Kafka Metadata {}", - kafkaMetadataNamespace, e); + kafkaMetadataNamespace, e); throw e; } finally { log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {}," @@ -311,18 +327,30 @@ public static void createKafkaNamespaceIfMissing(PulsarAdmin pulsarAdmin, } } - private static void createTopicIfNotExist(final PulsarAdmin admin, + private static void createTopicIfNotExist(final KafkaServiceConfiguration conf, + final PulsarAdmin admin, final String topic, - final int numPartitions) throws PulsarAdminException { - try { - admin.topics().createPartitionedTopic(topic, numPartitions); - } catch (PulsarAdminException.ConflictException e) { - log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage()); - } - try { - // Ensure all partitions are created - admin.topics().createMissedPartitions(topic); - } catch (PulsarAdminException ignored) { + final int numPartitions, + final boolean partitioned) throws PulsarAdminException { + if (partitioned) { + log.info("Creating partitioned topic {} (with {} partitions) if it does not exist", topic, numPartitions); + try { + admin.topics().createPartitionedTopic(topic, numPartitions); + } catch (PulsarAdminException.ConflictException e) { + log.info("Resources concurrent creating for topic : {}, caused by : {}", 
topic, e.getMessage());
+ }
+ try {
+ // Ensure all partitions are created
+ admin.topics().createMissedPartitions(topic);
+ } catch (PulsarAdminException ignored) {
+ }
+ } else {
+ log.info("Creating non-partitioned topic {} if it does not exist", topic);
+ try {
+ admin.topics().createNonPartitionedTopic(topic);
+ } catch (PulsarAdminException.ConflictException e) {
+ log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
+ }
}
}
@@ -334,6 +362,6 @@ public static void createSchemaRegistryMetadataIfMissing(String tenant,
KopTopic kopTopic = new KopTopic(constructSchemaRegistryTopicName(tenant, conf),
constructMetadataNamespace(tenant, conf));
createKafkaMetadataIfMissing(tenant, conf.getKopSchemaRegistryNamespace(), pulsarAdmin, clusterData,
- conf, kopTopic, 1, true);
+ conf, kopTopic, 1, false, true);
}
}
diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java index ab5e4bfc45..007452a9f2 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/PendingTopicFuturesTest.java @@ -13,10 +13,12 @@
*/
package io.streamnative.pulsar.handlers.kop;
+import static org.mockito.Mockito.mock;
+
+import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@@ -25,10 +27,10 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.testng.Assert;
import org.testng.annotations.Test;
+
/** * Test for PendingTopicFutures.
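* Topic futures now complete with a PartitionLog (mocked in these tests) instead of an Optional of PersistentTopic.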
*/ @@ -61,7 +63,7 @@ private static List range(int start, int end) { @Test(timeOut = 10000) void testNormalComplete() throws ExecutionException, InterruptedException { final PendingTopicFutures pendingTopicFutures = new PendingTopicFutures(null); - final CompletableFuture> topicFuture = new CompletableFuture<>(); + final CompletableFuture topicFuture = new CompletableFuture<>(); final List completedIndexes = new ArrayList<>(); final List changesOfPendingCount = new ArrayList<>(); int randomNum = ThreadLocalRandom.current().nextInt(0, 9); @@ -72,7 +74,7 @@ void testNormalComplete() throws ExecutionException, InterruptedException { topicFuture, ignored -> completedIndexes.add(index), (ignore) -> {}); changesOfPendingCount.add(pendingTopicFutures.size()); if (randomNum == i) { - topicFuture.complete(Optional.empty()); + topicFuture.complete(mock(PartitionLog.class)); } } @@ -94,7 +96,7 @@ void testNormalComplete() throws ExecutionException, InterruptedException { @Test(timeOut = 10000) void testExceptionalComplete() throws ExecutionException, InterruptedException { final PendingTopicFutures pendingTopicFutures = new PendingTopicFutures(null); - final CompletableFuture> topicFuture = new CompletableFuture<>(); + final CompletableFuture topicFuture = new CompletableFuture<>(); final List exceptionMessages = new ArrayList<>(); final List changesOfPendingCount = new ArrayList<>(); @@ -128,7 +130,7 @@ void testExceptionalComplete() throws ExecutionException, InterruptedException { @Test(timeOut = 10000) void testParallelAccess() throws ExecutionException, InterruptedException { final PendingTopicFutures pendingTopicFutures = new PendingTopicFutures(null); - final CompletableFuture> topicFuture = new CompletableFuture<>(); + final CompletableFuture topicFuture = new CompletableFuture<>(); final List completedIndexes = new CopyOnWriteArrayList<>(); int randomNum = ThreadLocalRandom.current().nextInt(0, 9); @@ -142,7 +144,7 @@ void testParallelAccess() throws ExecutionException, InterruptedException { topicFuture, ignored -> completedIndexes.add(index), (ignore) -> { }); if (randomNum == index) { - topicFuture.complete(Optional.empty()); + topicFuture.complete(mock(PartitionLog.class)); } })); } diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java index 3126872d0b..b9c05715ec 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java @@ -13,13 +13,17 @@ */ package io.streamnative.pulsar.handlers.kop.format; +import static org.mockito.Mockito.mock; + import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; -import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManager; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Optional; import java.util.Random; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; @@ -29,7 +33,6 @@ import org.apache.kafka.common.record.TimestampType; import 
org.apache.kafka.common.utils.Time; - /** * The performance test for {@link EntryFormatter#encode(EncodeRequest)}. */ @@ -48,7 +51,9 @@ public class EncodePerformanceTest { new TopicPartition("test", 1), "test", null, - new ProducerStateManager("test")); + mock(KafkaTopicLookupService.class), + new MemoryProducerStateManagerSnapshotBuffer(), + mock(OrderedExecutor.class)); public static void main(String[] args) { pulsarServiceConfiguration.setEntryFormat("pulsar"); diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java index 3b5511dc2b..089b6e2baa 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterTest.java @@ -20,8 +20,9 @@ import com.google.common.collect.ImmutableMap; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; -import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManager; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -29,6 +30,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.Entry; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -80,7 +82,9 @@ public class EntryFormatterTest { new TopicPartition("test", 1), "test", null, - new ProducerStateManager("test")); + mock(KafkaTopicLookupService.class), + new MemoryProducerStateManagerSnapshotBuffer(), + mock(OrderedExecutor.class)); private void init() { pulsarServiceConfiguration.setEntryFormat("pulsar"); diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java index 442f94034c..1ed3f7607c 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/utils/MetadataUtilsTest.java @@ -66,6 +66,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { .constructOffsetsTopicBaseName(conf.getKafkaMetadataTenant(), conf), namespacePrefix); final KopTopic txnTopic = new KopTopic(MetadataUtils .constructTxnLogTopicBaseName(conf.getKafkaMetadataTenant(), conf), namespacePrefix); + final KopTopic txnProducerStateTopic = new KopTopic(MetadataUtils + .constructTxProducerStateTopicBaseName(conf.getKafkaMetadataTenant(), conf), namespacePrefix); List emptyList = Lists.newArrayList(); @@ -83,6 +85,8 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { Topics mockTopics = mock(Topics.class); doReturn(offsetTopicMetadata).when(mockTopics).getPartitionedTopicMetadata(eq(offsetsTopic.getFullName())); doReturn(offsetTopicMetadata).when(mockTopics).getPartitionedTopicMetadata(eq(txnTopic.getFullName())); + doReturn(offsetTopicMetadata).when(mockTopics) + .getPartitionedTopicMetadata(eq(txnProducerStateTopic.getFullName())); PulsarAdmin mockPulsarAdmin = mock(PulsarAdmin.class); @@ -125,6 +129,8 @@ public void 
testCreateKafkaMetadataIfMissing() throws Exception { eq(offsetsTopic.getFullName()), eq(conf.getOffsetsTopicNumPartitions())); verify(mockTopics, times(1)).createPartitionedTopic( eq(txnTopic.getFullName()), eq(conf.getKafkaTxnLogTopicNumPartitions())); + verify(mockTopics, times(1)).createPartitionedTopic( + eq(txnProducerStateTopic.getFullName()), eq(conf.getKafkaTxnProducerStateTopicNumPartitions())); // check user topics namespace doesn't set the policy verify(mockNamespaces, times(1)).createNamespace(eq(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace()), any(Set.class)); @@ -172,11 +178,16 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { for (int i = 0; i < conf.getKafkaTxnLogTopicNumPartitions() - 2; i++) { incompletePartitionList.add(txnTopic.getPartitionName(i)); } + for (int i = 0; i < conf.getKafkaTxnProducerStateTopicNumPartitions() - 2; i++) { + incompletePartitionList.add(txnProducerStateTopic.getPartitionName(i)); + } doReturn(new PartitionedTopicMetadata(8)).when(mockTopics) .getPartitionedTopicMetadata(eq(offsetsTopic.getFullName())); doReturn(new PartitionedTopicMetadata(8)).when(mockTopics) .getPartitionedTopicMetadata(eq(txnTopic.getFullName())); + doReturn(new PartitionedTopicMetadata(8)).when(mockTopics) + .getPartitionedTopicMetadata(eq(txnProducerStateTopic.getFullName())); doReturn(incompletePartitionList).when(mockTopics).getList(eq(conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace())); @@ -184,10 +195,11 @@ public void testCreateKafkaMetadataIfMissing() throws Exception { MetadataUtils.createTxnMetadataIfMissing(conf.getKafkaMetadataTenant(), mockPulsarAdmin, clusterData, conf); verify(mockTenants, times(1)).updateTenant(eq(conf.getKafkaMetadataTenant()), any(TenantInfo.class)); - verify(mockNamespaces, times(2)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant() + verify(mockNamespaces, times(3)).setNamespaceReplicationClusters(eq(conf.getKafkaMetadataTenant() + "/" + conf.getKafkaMetadataNamespace()), any(Set.class)); verify(mockTopics, times(1)).createMissedPartitions(contains(offsetsTopic.getOriginalName())); verify(mockTopics, times(1)).createMissedPartitions(contains(txnTopic.getOriginalName())); + verify(mockTopics, times(1)).createMissedPartitions(contains(txnProducerStateTopic.getOriginalName())); } @Test(timeOut = 30000) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java index 76456410a3..c8c97c520e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java @@ -67,6 +67,13 @@ public static Object[][] batchSizeList() { @BeforeClass @Override protected void setup() throws Exception { + + this.conf.setDefaultNumberOfNamespaceBundles(4); + this.conf.setOffsetsTopicNumPartitions(50); + this.conf.setKafkaTxnLogTopicNumPartitions(50); + this.conf.setKafkaTransactionCoordinatorEnabled(true); + this.conf.setBrokerDeduplicationEnabled(true); + super.internalSetup(); log.info("success internal setup"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index 1a4e524c7d..be491e29e7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ 
b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -16,7 +16,10 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; @@ -37,6 +40,8 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import lombok.Getter; @@ -66,17 +71,20 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor; import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.data.ACL; +import org.awaitility.Awaitility; import org.testng.Assert; /** @@ -826,4 +834,56 @@ public TransactionCoordinator getTransactionCoordinator(String tenant) { } }); } + + + /** + * Execute the task that trims consumed ledgers. 
+ * @param topic the fully qualified name of the topic to trim + * @throws Exception + */ + public void trimConsumedLedgers(String topic) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + mapper.enable(SerializationFeature.INDENT_OUTPUT); + log.info("trimConsumedLedgers {}", topic); + log.info("Stats {}", + mapper.writeValueAsString(admin + .topics() + .getInternalStats(topic))); + TopicName topicName = TopicName.get(topic); + String namespace = topicName.getNamespace(); + + RetentionPolicies oldRetentionPolicies = admin.namespaces().getRetention(namespace); + Boolean deduplicationStatus = admin.namespaces().getDeduplicationStatus(namespace); + try { + admin.namespaces().setRetention(namespace, new RetentionPolicies(0, 0)); + admin.namespaces().setDeduplicationStatus(namespace, false); + + + KafkaTopicLookupService lookupService = new KafkaTopicLookupService(pulsar.getBrokerService()); + PersistentTopic topicHandle = lookupService.getTopic(topic, "test").get().get(); + + Awaitility.await().untilAsserted(() -> { + log.debug("Subscriptions {}", topicHandle.getSubscriptions()); + assertTrue(topicHandle.getSubscriptions().isEmpty()); + }); + + log.info("Stats {}", + mapper.writeValueAsString(admin + .topics() + .getInternalStats(topic))); + + CompletableFuture future = new CompletableFuture<>(); + topicHandle.getManagedLedger() + .getConfig().setRetentionTime(1, TimeUnit.SECONDS); + // let the shortened retention elapse before triggering the trim + Thread.sleep(2000); + topicHandle.getManagedLedger().trimConsumedLedgersInBackground(future); + future.get(10, TimeUnit.SECONDS); + } finally { + admin.namespaces().setRetention(namespace, oldRetentionPolicies); + if (deduplicationStatus != null) { + admin.namespaces().setDeduplicationStatus(namespace, deduplicationStatus); + } else { + admin.namespaces().removeDeduplicationStatus(namespace); + } + } + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java index 72f25d8a66..d231be51b8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java @@ -314,7 +314,7 @@ public void testUpdateGroupId() { }); } - @Test(timeOut = 20000, expectedExceptions = KeeperException.NoNodeException.class) + @Test(timeOut = 30000, expectedExceptions = KeeperException.NoNodeException.class) public void testFindTransactionCoordinatorShouldNotStoreGroupId() throws Exception { String kafkaServer = "localhost:" + getKafkaBrokerPort(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java index b7e1c55c08..b045307010 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java @@ -19,19 +19,25 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; +import static org.testng.Assert.fail; import com.google.common.collect.ImmutableMap; +import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState; import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager; +import io.streamnative.pulsar.handlers.kop.scala.Either; +import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; import
java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,6 +45,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -49,10 +57,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -70,10 +80,18 @@ public class TransactionTest extends KopProtocolHandlerTestBase { @Override protected void setup() throws Exception { this.conf.setDefaultNumberOfNamespaceBundles(4); - this.conf.setOffsetsTopicNumPartitions(50); - this.conf.setKafkaTxnLogTopicNumPartitions(50); + this.conf.setOffsetsTopicNumPartitions(10); + this.conf.setKafkaTxnLogTopicNumPartitions(10); + this.conf.setKafkaTxnProducerStateTopicNumPartitions(10); this.conf.setKafkaTransactionCoordinatorEnabled(true); this.conf.setBrokerDeduplicationEnabled(true); + + // enable tx expiration, but producers have + // a very long TRANSACTION_TIMEOUT_CONFIG + // so they won't expire by default + this.conf.setKafkaTransactionalIdExpirationMs(5000); + this.conf.setKafkaTransactionalIdExpirationEnable(true); + this.conf.setTopicLevelPoliciesEnabled(false); super.internalSetup(); log.info("success internal setup"); } @@ -141,7 +159,7 @@ public void testMultiCommits() throws Exception { }); } - public void basicProduceAndConsumeTest(String topicName, + private void basicProduceAndConsumeTest(String topicName, String transactionalId, String isolation, boolean isBatch) throws Exception { @@ -183,18 +201,44 @@ public void basicProduceAndConsumeTest(String topicName, } } - consumeTxnMessage(topicName, totalTxnCount * messageCountPerTxn, lastMessage, isolation); + final int expected; + switch (isolation) { + case "read_committed": + expected = totalTxnCount * messageCountPerTxn / 2; + break; + case "read_uncommitted": + expected = totalTxnCount * messageCountPerTxn; + break; + default: + expected = -1; + fail(); + } + consumeTxnMessage(topicName, expected, lastMessage, isolation); + } + + private List consumeTxnMessage(String topicName, + int totalMessageCount, + String lastMessage, + String isolation) throws InterruptedException { + return consumeTxnMessage(topicName, + totalMessageCount, + lastMessage, + isolation, + "test_consumer"); } - private void consumeTxnMessage(String topicName, - int totalMessageCount, - String lastMessage, - String isolation) { + private List 
consumeTxnMessage(String topicName, + int totalMessageCount, + String lastMessage, + String isolation, + String group) throws InterruptedException { @Cleanup - KafkaConsumer consumer = buildTransactionConsumer("test_consumer", isolation); + KafkaConsumer consumer = buildTransactionConsumer(group, isolation); consumer.subscribe(Collections.singleton(topicName)); - log.info("the last message is: {}", lastMessage); + List messages = new ArrayList<>(); + + log.info("waiting for message {} in topic {}", lastMessage, topicName); AtomicInteger receiveCount = new AtomicInteger(0); while (true) { ConsumerRecords consumerRecords = @@ -202,14 +246,16 @@ private void consumeTxnMessage(String topicName, boolean readFinish = false; for (ConsumerRecord record : consumerRecords) { - if (isolation.equals("read_committed")) { - assertFalse(record.value().contains("abort msg txnIndex")); - } log.info("Fetch for receive record offset: {}, key: {}, value: {}", record.offset(), record.key(), record.value()); + if (isolation.equals("read_committed")) { + assertFalse(record.value().contains("abort"), "in read_committed isolation " + + "we read a message that should have been aborted: " + record.value()); + } receiveCount.incrementAndGet(); + messages.add(record.value()); if (lastMessage.equalsIgnoreCase(record.value())) { - log.info("receive the last message"); + log.info("received the last message"); readFinish = true; } } @@ -219,14 +265,12 @@ private void consumeTxnMessage(String topicName, break; } } - log.info("Fetch for receive message finish. isolation: {}, receive count: {}", isolation, receiveCount.get()); - - if (isolation.equals("read_committed")) { - Assert.assertEquals(receiveCount.get(), totalMessageCount / 2); - } else { - Assert.assertEquals(receiveCount.get(), totalMessageCount); - } + log.info("Finished receiving messages. isolation: {}, receive count: {}, messages: {}", + isolation, receiveCount.get(), messages); + Assert.assertEquals(receiveCount.get(), totalMessageCount, "messages: " + messages); log.info("Fetch for finish consume messages.
isolation: {}", isolation); + + return messages; } @Test(timeOut = 1000 * 15) @@ -289,7 +333,7 @@ public void txnOffsetTest(String topic, int messageCnt, boolean isCommit) throws if (records.isEmpty()) { msgCnt.decrementAndGet(); } else { - Assert.fail("The transaction was committed, the consumer shouldn't receive any more messages."); + fail("The transaction was committed, the consumer shouldn't receive any more messages."); } } else { for (ConsumerRecord record : records) { @@ -301,6 +345,553 @@ public void txnOffsetTest(String topic, int messageCnt, boolean isCommit) throws } } + @DataProvider(name = "basicRecoveryTestAfterTopicUnloadNumTransactions") + protected static Object[][] basicRecoveryTestAfterTopicUnloadNumTransactions() { + // numTransactionsBetweenSnapshots + return new Object[][]{ + {0}, + {3}, + {5} + }; + } + + + @Test(timeOut = 1000 * 30, dataProvider = "basicRecoveryTestAfterTopicUnloadNumTransactions") + public void basicRecoveryTestAfterTopicUnload(int numTransactionsBetweenSnapshots) throws Exception { + + String topicName = "basicRecoveryTestAfterTopicUnload_" + numTransactionsBetweenSnapshots; + String transactionalId = "myProducer_" + UUID.randomUUID(); + String isolation = "read_committed"; + boolean isBatch = false; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + int totalTxnCount = 10; + int messageCountPerTxn = 20; + + String lastMessage = ""; + for (int txnIndex = 0; txnIndex < totalTxnCount; txnIndex++) { + producer.beginTransaction(); + + String contentBase; + if (txnIndex % 2 != 0) { + contentBase = "commit msg txnIndex %s messageIndex %s"; + } else { + contentBase = "abort msg txnIndex %s messageIndex %s"; + } + + for (int messageIndex = 0; messageIndex < messageCountPerTxn; messageIndex++) { + String msgContent = String.format(contentBase, txnIndex, messageIndex); + log.info("send txn message {}", msgContent); + lastMessage = msgContent; + if (isBatch) { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)); + } else { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)).get(); + } + } + producer.flush(); + + if (numTransactionsBetweenSnapshots > 0 + && (txnIndex % numTransactionsBetweenSnapshots) == 0) { + // force take snapshot + takeSnapshot(topicName); + } + + if (txnIndex % 2 != 0) { + producer.commitTransaction(); + } else { + producer.abortTransaction(); + } + } + + waitForTransactionsToBeInStableState(transactionalId); + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + + final int expected = totalTxnCount * messageCountPerTxn / 2; + consumeTxnMessage(topicName, expected, lastMessage, isolation); + } + + + private TransactionState dumpTransactionState(String transactionalId) { + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + TransactionCoordinator transactionCoordinator = + protocolHandler.getTransactionCoordinator(tenant); + Either> transactionState = + transactionCoordinator.getTxnManager().getTransactionState(transactionalId); + log.debug("transactionalId {} status {}", transactionalId, transactionState); + assertFalse(transactionState.isLeft(), "transaction " + + transactionalId + " error " + transactionState.getLeft()); + return transactionState.getRight().get().getTransactionMetadata().getState(); + } + + private void
waitForTransactionsToBeInStableState(String transactionalId) { + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + TransactionCoordinator transactionCoordinator = + protocolHandler.getTransactionCoordinator(tenant); + Awaitility.await().untilAsserted(() -> { + Either> transactionState = + transactionCoordinator.getTxnManager().getTransactionState(transactionalId); + log.debug("transactionalId {} status {}", transactionalId, transactionState); + assertFalse(transactionState.isLeft()); + TransactionState state = transactionState.getRight() + .get().getTransactionMetadata().getState(); + boolean isStable; + switch (state) { + case COMPLETE_COMMIT: + case COMPLETE_ABORT: + case EMPTY: + isStable = true; + break; + default: + isStable = false; + break; + } + assertTrue(isStable, "Transaction " + transactionalId + + " did not reach a stable state, it is still " + state); + }); + } + + private void takeSnapshot(String topicName) throws Exception { + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + + int numPartitions = + admin.topics().getPartitionedTopicMetadata(topicName).partitions; + for (int i = 0; i < numPartitions; i++) { + PartitionLog partitionLog = protocolHandler + .getReplicaManager() + .getPartitionLog(new TopicPartition(topicName, i), tenant + "/" + namespace); + + // the snapshot can only be taken on the single thread that is allowed + // to process mutations on the state + partitionLog + .takeProducerSnapshot() + .get(); + + } + } + + @Test(timeOut = 1000 * 30, dataProvider = "basicRecoveryTestAfterTopicUnloadNumTransactions") + public void basicTestWithTopicUnload(int numTransactionsBetweenUnloads) throws Exception { + + String topicName = "basicTestWithTopicUnload_" + numTransactionsBetweenUnloads; + String transactionalId = "myProducer_" + UUID.randomUUID(); + String isolation = "read_committed"; + boolean isBatch = false; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + int totalTxnCount = 10; + int messageCountPerTxn = 20; + + String lastMessage = ""; + for (int txnIndex = 0; txnIndex < totalTxnCount; txnIndex++) { + producer.beginTransaction(); + + String contentBase; + if (txnIndex % 2 != 0) { + contentBase = "commit msg txnIndex %s messageIndex %s"; + } else { + contentBase = "abort msg txnIndex %s messageIndex %s"; + } + + for (int messageIndex = 0; messageIndex < messageCountPerTxn; messageIndex++) { + String msgContent = String.format(contentBase, txnIndex, messageIndex); + log.info("send txn message {}", msgContent); + lastMessage = msgContent; + if (isBatch) { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)); + } else { + producer.send(new ProducerRecord<>(topicName, messageIndex, msgContent)).get(); + } + } + producer.flush(); + + if (numTransactionsBetweenUnloads > 0 + && (txnIndex % numTransactionsBetweenUnloads) == 0) { + + // dump the state before the unload, this helps troubleshooting + // problems in case of a flaky test + TransactionState transactionState = dumpTransactionState(transactionalId); + assertEquals(TransactionState.ONGOING, transactionState); + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + } + + if (txnIndex % 2 != 0) { + producer.commitTransaction(); + } else { + producer.abortTransaction(); + } + } + 
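+ // odd txnIndex values are committed and even ones aborted, so a read_committed + // consumer must see exactly half of the produced messages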
+ + final int expected = totalTxnCount * messageCountPerTxn / 2; + consumeTxnMessage(topicName, expected, lastMessage, isolation); + } + + @DataProvider(name = "takeSnapshotBeforeRecovery") + protected static Object[][] takeSnapshotBeforeRecovery() { + // takeSnapshotBeforeRecovery + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(timeOut = 1000 * 20, dataProvider = "takeSnapshotBeforeRecovery") + public void basicRecoveryAbortedTransaction(boolean takeSnapshotBeforeRecovery) throws Exception { + + String topicName = "basicRecoveryAbortedTransaction_" + takeSnapshotBeforeRecovery; + String transactionalId = "myProducer" + UUID.randomUUID(); + String isolation = "read_committed"; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + + producer.beginTransaction(); + + String firstMessage = "aborted msg 1"; + + producer.send(new ProducerRecord<>(topicName, 0, firstMessage)).get(); + producer.flush(); + // force take snapshot + takeSnapshot(topicName); + + // recovery will re-process the topic from this point onwards + String secondMessage = "aborted msg 2"; + producer.send(new ProducerRecord<>(topicName, 0, secondMessage)).get(); + + producer.abortTransaction(); + + producer.beginTransaction(); + String lastMessage = "committed msg"; + producer.send(new ProducerRecord<>(topicName, 0, "foo")).get(); + producer.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer.commitTransaction(); + + if (takeSnapshotBeforeRecovery) { + takeSnapshot(topicName); + } + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + + consumeTxnMessage(topicName, 2, lastMessage, isolation); + } + + @Test(timeOut = 1000 * 30, dataProvider = "takeSnapshotBeforeRecovery") + public void basicRecoveryAbortedTransactionDueToProducerFenced(boolean takeSnapshotBeforeRecovery) + throws Exception { + + String topicName = "basicRecoveryAbortedTransactionDueToProducerFenced_" + takeSnapshotBeforeRecovery; + String transactionalId = "myProducer" + UUID.randomUUID(); + String isolation = "read_committed"; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + producer.beginTransaction(); + + String firstMessage = "aborted msg 1"; + + producer.send(new ProducerRecord<>(topicName, 0, firstMessage)).get(); + producer.flush(); + // force take snapshot + takeSnapshot(topicName); + + // recovery will re-process the topic from this point onwards + String secondMessage = "aborted msg 2"; + producer.send(new ProducerRecord<>(topicName, 0, secondMessage)).get(); + + + @Cleanup + KafkaProducer producer2 = buildTransactionProducer(transactionalId); + producer2.initTransactions(); + + // the transaction is automatically aborted, because the first instance of the + // producer has been fenced + expectThrows(ProducerFencedException.class, () -> { + producer.commitTransaction(); + }); + + + producer2.beginTransaction(); + String lastMessage = "committed msg"; + producer2.send(new ProducerRecord<>(topicName, 0, "foo")).get(); + producer2.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer2.commitTransaction(); + + if (takeSnapshotBeforeRecovery) { + // force take snapshot + takeSnapshot(topicName); + } + + // unload the namespace, this will force a recovery + 
pulsar.getAdminClient().namespaces().unload(namespace); + + consumeTxnMessage(topicName, 2, lastMessage, isolation); + } + + + @Test(timeOut = 1000 * 30, dataProvider = "takeSnapshotBeforeRecovery") + public void basicRecoveryAbortedTransactionDueToProducerTimedOut(boolean takeSnapshotBeforeRecovery) + throws Exception { + + String topicName = "basicRecoveryAbortedTransactionDueToProducerTimedOut_" + takeSnapshotBeforeRecovery; + String transactionalId = "myProducer" + UUID.randomUUID(); + String isolation = "read_committed"; + + String namespace = TopicName.get(topicName).getNamespace(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId, 1000); + + producer.initTransactions(); + + producer.beginTransaction(); + + String firstMessage = "aborted msg 1"; + + producer.send(new ProducerRecord<>(topicName, 0, firstMessage)).get(); + producer.flush(); + // force take snapshot + takeSnapshot(topicName); + + // recovery will re-process the topic from this point onwards + String secondMessage = "aborted msg 2"; + producer.send(new ProducerRecord<>(topicName, 0, secondMessage)).get(); + + Thread.sleep(conf.getKafkaTransactionalIdExpirationMs() + 5000); + + // the transaction is automatically aborted because the producer timed out + expectThrows(ProducerFencedException.class, () -> { + producer.commitTransaction(); + }); + + @Cleanup + KafkaProducer producer2 = buildTransactionProducer(transactionalId, 1000); + producer2.initTransactions(); + producer2.beginTransaction(); + String lastMessage = "committed msg"; + producer2.send(new ProducerRecord<>(topicName, 0, "foo")).get(); + producer2.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer2.commitTransaction(); + + if (takeSnapshotBeforeRecovery) { + // force take snapshot + takeSnapshot(topicName); + } + + // unload the namespace, this will force a recovery + pulsar.getAdminClient().namespaces().unload(namespace); + + consumeTxnMessage(topicName, 2, lastMessage, isolation); + } + + /** + * TODO: Disabled for now, we need to introduce a UUID for the topic.
+ */ + @Test(timeOut = 1000 * 20, enabled = false) + public void basicRecoveryAfterDeleteCreateTopic() + throws Exception { + + String topicName = "basicRecoveryAfterDeleteCreateTopic"; + String transactionalId = "myProducer-deleteCreate"; + String isolation = "read_committed"; + + TopicName fullTopicName = TopicName.get(topicName); + + String namespace = fullTopicName.getNamespace(); + + // use the Kafka API, this way we assign a topic UUID + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties()); + kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 4, (short) 1))); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId, 1000); + + producer.initTransactions(); + + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + + producer.beginTransaction(); + + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.send(new ProducerRecord<>(topicName, 0, "deleted msg 1")).get(); + producer.flush(); + + // force take snapshot + takeSnapshot(topicName); + + String secondMessage = "deleted msg 2"; + producer.send(new ProducerRecord<>(topicName, 0, secondMessage)).get(); + producer.flush(); + + // verify that a consumer with read_uncommitted isolation can read all the messages + consumeTxnMessage(topicName, 10, secondMessage, "read_uncommitted", + "uncommitted_reader1"); + + for (int i = 0; i < 10; i++) { + log.info("************DELETE"); + } + // delete/create + pulsar.getAdminClient().namespaces().unload(namespace); + admin.topics().deletePartitionedTopic(topicName, true); + + // unfortunately the protocol handler is not notified of the deletion + // so we unload the namespace in order to clear local references/caches + pulsar.getAdminClient().namespaces().unload(namespace); + + protocolHandler.getReplicaManager().removePartitionLog(fullTopicName.getPartition(0).toString()); + protocolHandler.getReplicaManager().removePartitionLog(fullTopicName.getPartition(1).toString()); + protocolHandler.getReplicaManager().removePartitionLog(fullTopicName.getPartition(2).toString()); + protocolHandler.getReplicaManager().removePartitionLog(fullTopicName.getPartition(3).toString()); + + // create the topic again, using the kafka APIs + kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 4, (short) 1))); + + // the snapshot now points to an offset that doesn't make sense in the new topic + // because the new topic is empty + + @Cleanup + KafkaProducer producer2 = buildTransactionProducer(transactionalId, 1000); + producer2.initTransactions(); + producer2.beginTransaction(); + String lastMessage = "committed msg"; + + // this "send" triggers recovery of the ProducerStateManager on the topic + producer2.send(new ProducerRecord<>(topicName, 0, "good-message")).get(); + producer2.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); + producer2.commitTransaction(); + + consumeTxnMessage(topicName, 2, lastMessage, isolation, "readcommitter-reader-1"); + } + + 
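+ // The snapshot/recovery round-trip exercised by the tests above, in miniature; + // a sketch that reuses only methods already present in this file, not a runnable test: + // PartitionLog partitionLog = protocolHandler.getReplicaManager() + // .getPartitionLog(new TopicPartition(topicName, 0), namespacePrefix); + // partitionLog.takeProducerSnapshot().get(); // persist the producer state map + // pulsar.getAdminClient().namespaces().unload(namespace); // drop the in-memory state + // partitionLog = protocolHandler.getReplicaManager() + // .getPartitionLog(new TopicPartition(topicName, 0), namespacePrefix); + // partitionLog.awaitInitialisation().get(); // recovery replays from the snapshot + 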
@Test(timeOut = 60000) + public void testRecoverFromInvalidSnapshotAfterTrim() throws Exception { + + String topicName = "testRecoverFromInvalidSnapshotAfterTrim"; + String transactionalId = "myProducer_" + UUID.randomUUID(); + String isolation = "read_committed"; + + TopicName fullTopicName = TopicName.get(topicName); + + pulsar.getAdminClient().topics().createPartitionedTopic(topicName, 1); + + String namespace = fullTopicName.getNamespace(); + TopicPartition topicPartition = new TopicPartition(topicName, 0); + String namespacePrefix = namespace; + + KafkaProducer producer = buildTransactionProducer(transactionalId); + + producer.initTransactions(); + + producer.beginTransaction(); + producer.send(new ProducerRecord<>(topicName, 0, "aborted 1")).get(); // OFFSET 0 + producer.flush(); + producer.abortTransaction(); // OFFSET 1 + + producer.beginTransaction(); + String lastMessage = "msg1b"; + producer.send(new ProducerRecord<>(topicName, 0, "msg1")).get(); // OFFSET 2 + producer.send(new ProducerRecord<>(topicName, 0, lastMessage)).get(); // OFFSET 3 + producer.commitTransaction(); // OFFSET 4 + + assertEquals( + consumeTxnMessage(topicName, 2, lastMessage, isolation, "first_group"), + List.of("msg1", "msg1b")); + + waitForTransactionsToBeInStableState(transactionalId); + + // unload and reload in order to have at least 2 ledgers in the + // topic, this way we can drop the head ledger + admin.namespaces().unload(namespace); + + producer.beginTransaction(); + producer.send(new ProducerRecord<>(topicName, 0, "msg2")).get(); // OFFSET 5 + producer.send(new ProducerRecord<>(topicName, 0, "msg3")).get(); // OFFSET 6 + producer.commitTransaction(); // OFFSET 7 + + // take a snapshot now, it refers to the offset of the last written record + takeSnapshot(topicName); + + waitForTransactionsToBeInStableState(transactionalId); + + admin.namespaces().unload(namespace); + admin.lookups().lookupTopic(fullTopicName.getPartition(0).toString()); + + KafkaProtocolHandler protocolHandler = (KafkaProtocolHandler) + pulsar.getProtocolHandlers().protocol("kafka"); + PartitionLog partitionLog = protocolHandler + .getReplicaManager() + .getPartitionLog(topicPartition, namespacePrefix); + partitionLog.awaitInitialisation().get(); + assertEquals(0L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); + + // all the messages up to here will be trimmed + + trimConsumedLedgers(fullTopicName.getPartition(0).toString()); + + admin.namespaces().unload(namespace); + admin.lookups().lookupTopic(fullTopicName.getPartition(0).toString()); + + // continue writing, this triggers recovery + producer.beginTransaction(); + producer.send(new ProducerRecord<>(topicName, 0, "msg4")).get(); // OFFSET 8 + producer.send(new ProducerRecord<>(topicName, 0, "msg5")).get(); // OFFSET 9 + producer.commitTransaction(); // OFFSET 10 + producer.close(); + + partitionLog = protocolHandler + .getReplicaManager() + .getPartitionLog(topicPartition, namespacePrefix); + partitionLog.awaitInitialisation().get(); + assertEquals(8L, partitionLog.fetchOldestAvailableIndexFromTopic().get().longValue()); + + // use a new consumer group, it will read from the beginning of the topic + assertEquals( + consumeTxnMessage(topicName, 2, "msg5", isolation, "second_group"), + List.of("msg4", "msg5")); + + } + + + private List prepareData(String sourceTopicName, String messageContent, int messageCount) throws ExecutionException, InterruptedException { @@ -333,7 +924,7 @@ private void waitForTxnMarkerWriteComplete(Map } private KafkaProducer 
buildTransactionProducer(String transactionalId) { + return buildTransactionProducer(transactionalId, -1); + } + + private KafkaProducer buildTransactionProducer(String transactionalId, int txTimeout) { Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerAdder()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000 * 10); producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + if (txTimeout > 0) { + producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout); + } else { + // very long time-out + producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000); + } addCustomizeProps(producerProps); return new KafkaProducer<>(producerProps); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java index bf46f34d36..f320f98718 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionWithOAuthBearerAuthTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; @Slf4j public class TransactionWithOAuthBearerAuthTest extends TransactionTest { @@ -43,6 +44,11 @@ protected void setup() throws Exception { adminCredentialPath = HydraOAuthUtils.createOAuthClient(ADMIN_USER, ADMIN_SECRET); super.resetConfig(); + conf.setDefaultNumberOfNamespaceBundles(4); + conf.setKafkaMetadataNamespace("__kafka"); + conf.setOffsetsTopicNumPartitions(10); + conf.setKafkaTxnLogTopicNumPartitions(10); + conf.setKafkaTxnProducerStateTopicNumPartitions(10); conf.setKafkaTransactionCoordinatorEnabled(true); conf.setBrokerDeduplicationEnabled(true); conf.setAuthenticationEnabled(true); @@ -99,4 +105,10 @@ protected void addCustomizeProps(Properties properties) { )); } + @Test(enabled = false) + @Override + public void basicRecoveryAbortedTransactionDueToProducerTimedOut(boolean takeSnapshotBeforeRecovery) { + // this test is disabled in this suite because the token expires + } + } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java index fe6dd36b57..a9652d7330 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java @@ -33,6 +33,7 @@ import io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler; import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; import io.streamnative.pulsar.handlers.kop.scala.Either; +import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTime; import java.util.Collections; @@ -135,7 +136,9 @@ protected void 
initializeState() { transactionManager, time, METADATA_NAMESPACE_PREFIX, - NAMESPACE_PREFIX); + NAMESPACE_PREFIX, + (config) -> new MemoryProducerStateManagerSnapshotBuffer() + ); result = null; error = Errors.NONE; capturedTxn = ArgumentCaptor.forClass(TransactionMetadata.class); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java index 65d3de08f6..cce3aac4cb 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLogTest.java @@ -13,10 +13,14 @@ */ package io.streamnative.pulsar.handlers.kop.storage; +import static org.mockito.Mockito.mock; + import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; +import io.streamnative.pulsar.handlers.kop.KafkaTopicLookupService; import java.nio.ByteBuffer; import java.util.Arrays; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.record.CompressionType; @@ -46,7 +50,9 @@ public class PartitionLogTest { new TopicPartition("test", 1), "test", null, - new ProducerStateManager("test")); + mock(KafkaTopicLookupService.class), + new MemoryProducerStateManagerSnapshotBuffer(), + mock(OrderedExecutor.class)); @DataProvider(name = "compressionTypes") Object[] allCompressionTypes() { @@ -157,4 +163,4 @@ private MemoryRecords buildIdempotentRecords(int[] batchSizes) { return MemoryRecords.readableRecords(buffer); } -} \ No newline at end of file +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java index 6b3123bd46..4459742f13 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManagerTest.java @@ -46,6 +46,7 @@ public class ProducerStateManagerTest extends KopProtocolHandlerTestBase { private final Long producerId = 1L; private final MockTime time = new MockTime(); private ProducerStateManager stateManager; + private ProducerStateManagerSnapshotBuffer producerStateManagerSnapshotBuffer; @BeforeClass @Override @@ -59,7 +60,9 @@ protected void setup() throws Exception { @BeforeMethod protected void setUp() { - stateManager = new ProducerStateManager(partition.toString()); + producerStateManagerSnapshotBuffer = new MemoryProducerStateManagerSnapshotBuffer(); + stateManager = new ProducerStateManager(partition.toString(), producerStateManagerSnapshotBuffer, + conf.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds()); } @AfterMethod @@ -208,7 +211,8 @@ public void testNonTransactionalAppendWithOngoingTransaction() { @Test(timeOut = defaultTestTimeout) public void testSequenceNotValidatedForGroupMetadataTopic() { TopicPartition partition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); - stateManager = new ProducerStateManager(partition.toString()); + stateManager = new ProducerStateManager(partition.toString(), producerStateManagerSnapshotBuffer, + conf.getKafkaTxnProducerStateTopicSnapshotIntervalSeconds()); short epoch = 0; append(stateManager, producerId, epoch, 99L, time.milliseconds(), true, 
PartitionLog.AppendOrigin.Coordinator);
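For reference, the ProducerStateManager wiring repeated across these tests can be reproduced in isolation. A minimal sketch, assuming only the three-argument constructor and the in-memory buffer shown in the hunks above; the topic name and the 120-second snapshot interval are illustrative values, not KoP defaults:

    import io.streamnative.pulsar.handlers.kop.storage.MemoryProducerStateManagerSnapshotBuffer;
    import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManager;
    import io.streamnative.pulsar.handlers.kop.storage.ProducerStateManagerSnapshotBuffer;
    import org.apache.kafka.common.TopicPartition;

    public class ProducerStateManagerWiringSketch {
        public static void main(String[] args) {
            TopicPartition partition = new TopicPartition("test-topic", 0);
            // Heap-backed buffer, used by the tests wherever a topic-backed
            // ProducerStateManagerSnapshotBuffer is not required.
            ProducerStateManagerSnapshotBuffer buffer = new MemoryProducerStateManagerSnapshotBuffer();
            // The third argument is the snapshot interval in seconds; the tests read it from
            // KafkaServiceConfiguration#getKafkaTxnProducerStateTopicSnapshotIntervalSeconds.
            ProducerStateManager stateManager = new ProducerStateManager(partition.toString(), buffer, 120);
            System.out.println("created producer state manager for " + partition + ": " + stateManager);
        }
    }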