Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[bugfix] AppendRecordsContext cannot be Recyclable
Browse files Browse the repository at this point in the history
(cherry picked from commit 0b6fd77)
  • Loading branch information
eolivelli authored and gaoran10 committed Jul 20, 2023
1 parent 1dda879 commit fec2a25
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
PartitionLog.AppendOrigin.Client,
appendRecordsContext
).whenComplete((response, ex) -> {
appendRecordsContext.recycle();
if (ex != null) {
resultFuture.completeExceptionally(ex.getCause());
return;
Expand Down Expand Up @@ -2449,7 +2448,6 @@ protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest
PartitionLog.AppendOrigin.Coordinator,
appendRecordsContext
).whenComplete((result, ex) -> {
appendRecordsContext.recycle();
if (ex != null) {
log.error("[{}] Append txn marker ({}) failed.", ctx.channel(), marker, ex);
Map<TopicPartition, Errors> currentErrors = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,59 +14,39 @@
package io.streamnative.pulsar.handlers.kop.storage;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Recycler;
import io.streamnative.pulsar.handlers.kop.KafkaTopicManager;
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
import java.util.Map;
import java.util.function.Consumer;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;

/**
* AppendRecordsContext is use for pass parameters to ReplicaManager, to avoid long parameter lists.
*/
@Slf4j
@AllArgsConstructor
@Getter
public class AppendRecordsContext {
private static final Recycler<AppendRecordsContext> RECYCLER = new Recycler<AppendRecordsContext>() {
protected AppendRecordsContext newObject(Handle<AppendRecordsContext> handle) {
return new AppendRecordsContext(handle);
}
};

private final Recycler.Handle<AppendRecordsContext> recyclerHandle;
private KafkaTopicManager topicManager;
private Consumer<Integer> startSendOperationForThrottling;
private Consumer<Integer> completeSendOperationForThrottling;
private Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap;
private ChannelHandlerContext ctx;

private AppendRecordsContext(Recycler.Handle<AppendRecordsContext> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

// recycler and get for this object
public static AppendRecordsContext get(final KafkaTopicManager topicManager,
final Consumer<Integer> startSendOperationForThrottling,
final Consumer<Integer> completeSendOperationForThrottling,
final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap,
final ChannelHandlerContext ctx) {
AppendRecordsContext context = RECYCLER.get();
context.topicManager = topicManager;
context.startSendOperationForThrottling = startSendOperationForThrottling;
context.completeSendOperationForThrottling = completeSendOperationForThrottling;
context.pendingTopicFuturesMap = pendingTopicFuturesMap;
context.ctx = ctx;

return context;
}

public void recycle() {
topicManager = null;
startSendOperationForThrottling = null;
completeSendOperationForThrottling = null;
pendingTopicFuturesMap = null;
recyclerHandle.recycle(this);
ctx = null;
return new AppendRecordsContext(topicManager,
startSendOperationForThrottling,
completeSendOperationForThrottling,
pendingTopicFuturesMap,
ctx);
}

}

0 comments on commit fec2a25

Please sign in to comment.