From 650ab62d004edce24981f935af6fda40eefa14c7 Mon Sep 17 00:00:00 2001 From: Hankunming <1109939087@qq.com> Date: Tue, 17 Dec 2024 19:29:34 +0800 Subject: [PATCH] feat: support reset offset for lite pull consumer --- .../client/impl/factory/MQClientInstance.java | 16 ++++++- .../impl/factory/MQClientInstanceTest.java | 44 +++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index eba654c22d0..868914d14f7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -46,6 +46,7 @@ import org.apache.rocketmq.client.impl.MQAdminImpl; import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; @@ -1248,12 +1249,25 @@ public String findBrokerAddrByTopic(final String topic) { public synchronized void resetOffset(String topic, String group, Map offsetTable) { DefaultMQPushConsumerImpl consumer = null; + DefaultLitePullConsumerImpl litePullConsumer; try { MQConsumerInner impl = this.consumerTable.get(group); if (impl instanceof DefaultMQPushConsumerImpl) { consumer = (DefaultMQPushConsumerImpl) impl; + } else if (impl instanceof DefaultLitePullConsumerImpl) { + litePullConsumer = (DefaultLitePullConsumerImpl) impl; + litePullConsumer.pause(offsetTable.keySet()); + for (Entry messageQueueLongEntry : offsetTable.entrySet()) { + try { + litePullConsumer.seek(messageQueueLongEntry.getKey(), messageQueueLongEntry.getValue()); + } catch (MQClientException e) { + log.warn("[reset-offset] reset offset failed, topic={}, group={}, mq={}", topic, group, messageQueueLongEntry.getValue()); + } + } + litePullConsumer.resume(offsetTable.keySet()); + return; } else { - log.info("[reset-offset] consumer does not exist. group={}", group); + log.info("[reset-offset] consumer does not support reset offset. group={}", group); return; } consumer.suspend(); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java index d71bc25b9b3..317c7434d4a 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java @@ -26,10 +26,12 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.MQConsumerInner; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.RebalanceImpl; +import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -102,6 +104,12 @@ public class MQClientInstanceTest { @Mock private ClientConfig clientConfig; + @Mock + private DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + + @Mock + private RebalancePushImpl rebalancePushImpl; + private final MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); private final String topic = "FooBar"; @@ -501,4 +509,40 @@ private List createBrokerDatas() { brokerData.setBrokerAddrs(brokerAddrs); return Collections.singletonList(brokerData); } + + @Test + public void testResetOffsetForDefaultMQPushConsumer() { + Map offsetTable = new HashMap<>(); + MessageQueue messageQueue = new MessageQueue(topic, defaultBroker, 0); + offsetTable.put(messageQueue, 100L); + ConcurrentHashMap processQueueTable = new ConcurrentHashMap<>(); + processQueueTable.put(messageQueue, new ProcessQueue()); + + when(defaultMQPushConsumerImpl.getRebalanceImpl()).thenReturn(rebalancePushImpl); + when(rebalancePushImpl.getProcessQueueTable()).thenReturn(processQueueTable); + + mqClientInstance.registerConsumer(group, defaultMQPushConsumerImpl); + mqClientInstance.resetOffset(topic, group, offsetTable); + + verify(defaultMQPushConsumerImpl).suspend(); + verify(defaultMQPushConsumerImpl).resume(); + verify(rebalancePushImpl).removeUnnecessaryMessageQueue(any(MessageQueue.class), any(ProcessQueue.class)); + verify(defaultMQPushConsumerImpl, times(1)).updateConsumeOffset(any(MessageQueue.class), anyLong()); + } + + @Test + public void testResetOffsetForDefaultLitePullConsumer() throws MQClientException { + Map offsetTable = new HashMap<>(); + MessageQueue messageQueue = new MessageQueue(topic, defaultBroker, 0); + offsetTable.put(messageQueue, 100L); + + DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = mock(DefaultLitePullConsumerImpl.class); + + mqClientInstance.registerConsumer(group, defaultLitePullConsumerImpl); + mqClientInstance.resetOffset(topic, group, offsetTable); + + verify(defaultLitePullConsumerImpl).pause(any(Set.class)); + verify(defaultLitePullConsumerImpl).seek(any(MessageQueue.class), anyLong()); + verify(defaultLitePullConsumerImpl).resume(any(Set.class)); + } }