Skip to content

Commit

Permalink
Subscription: fix ConcurrentModificationException for `ConsumerGroupM…
Browse files Browse the repository at this point in the history
…eta` & fix the logic of `isTopicSubscribedByConsumerGroup` & avoid consumer from outputting too much content in string form (#14425)
  • Loading branch information
VGalaxies authored Dec 14, 2024
1 parent f68726f commit e80b992
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.apache.iotdb.session.subscription.util.PollTimer;
import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
Expand Down Expand Up @@ -1463,8 +1464,11 @@ protected Map<String, String> coreReportMessage() {
result.put("consumerGroupId", consumerGroupId);
result.put("isClosed", isClosed.toString());
result.put("fileSaveDir", fileSaveDir);
result.put("inFlightFilesCommitContextSet", inFlightFilesCommitContextSet.toString());
result.put("subscribedTopicNames", subscribedTopics.keySet().toString());
result.put(
"inFlightFilesCommitContextSet",
CollectionUtils.getLimitedString(inFlightFilesCommitContextSet, 32));
result.put(
"subscribedTopicNames", CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.session.subscription.util;

import java.util.Collection;
import java.util.stream.Collectors;

public class CollectionUtils {

public static String getLimitedString(final Collection<?> collection, final int limit) {
return collection.stream().limit(limit).collect(Collectors.toList())
+ (collection.size() > limit ? " ... (" + (collection.size() - limit) + " more)" : "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,30 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumerGroupMeta {

private String consumerGroupId;
private long creationTime;
private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet = new HashMap<>();
private Map<String, ConsumerMeta> consumerIdToConsumerMeta = new HashMap<>();
private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet;
private Map<String, ConsumerMeta> consumerIdToConsumerMeta;

public ConsumerGroupMeta() {
// Empty constructor
this.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
this.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
}

public ConsumerGroupMeta(
final String consumerGroupId, final long creationTime, final ConsumerMeta firstConsumerMeta) {
this();

this.consumerGroupId = consumerGroupId;
this.creationTime = creationTime;
this.topicNameToSubscribedConsumerIdSet = new HashMap<>();
this.consumerIdToConsumerMeta = new HashMap<>();

consumerIdToConsumerMeta.put(firstConsumerMeta.getConsumerId(), firstConsumerMeta);
}
Expand All @@ -61,8 +62,9 @@ public ConsumerGroupMeta deepCopy() {
final ConsumerGroupMeta copied = new ConsumerGroupMeta();
copied.consumerGroupId = consumerGroupId;
copied.creationTime = creationTime;
copied.topicNameToSubscribedConsumerIdSet = new HashMap<>(topicNameToSubscribedConsumerIdSet);
copied.consumerIdToConsumerMeta = new HashMap<>(consumerIdToConsumerMeta);
copied.topicNameToSubscribedConsumerIdSet =
new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
copied.consumerIdToConsumerMeta = new ConcurrentHashMap<>(consumerIdToConsumerMeta);
return copied;
}

Expand Down Expand Up @@ -149,8 +151,12 @@ public Set<String> getTopicsSubscribedByConsumer(final String consumerId) {
return topics;
}

public Set<String> getTopicsSubscribedByConsumerGroup() {
return topicNameToSubscribedConsumerIdSet.keySet();
public boolean isTopicSubscribedByConsumerGroup(final String topic) {
final Set<String> subscribedConsumerIdSet = topicNameToSubscribedConsumerIdSet.get(topic);
if (Objects.isNull(subscribedConsumerIdSet)) {
return false;
}
return !subscribedConsumerIdSet.isEmpty();
}

public void addSubscription(final String consumerId, final Set<String> topics) {
Expand Down Expand Up @@ -228,7 +234,7 @@ public static ConsumerGroupMeta deserialize(final InputStream inputStream) throw
consumerGroupMeta.consumerGroupId = ReadWriteIOUtils.readString(inputStream);
consumerGroupMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);

consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new HashMap<>();
consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(inputStream);
Expand All @@ -242,7 +248,7 @@ public static ConsumerGroupMeta deserialize(final InputStream inputStream) throw
consumerGroupMeta.topicNameToSubscribedConsumerIdSet.put(key, value);
}

consumerGroupMeta.consumerIdToConsumerMeta = new HashMap<>();
consumerGroupMeta.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(inputStream);
Expand All @@ -259,7 +265,7 @@ public static ConsumerGroupMeta deserialize(final ByteBuffer byteBuffer) {
consumerGroupMeta.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
consumerGroupMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);

consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new HashMap<>();
consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(byteBuffer);
Expand All @@ -273,7 +279,7 @@ public static ConsumerGroupMeta deserialize(final ByteBuffer byteBuffer) {
consumerGroupMeta.topicNameToSubscribedConsumerIdSet.put(key, value);
}

consumerGroupMeta.consumerIdToConsumerMeta = new HashMap<>();
consumerGroupMeta.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(byteBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public boolean isEmpty() {

public Set<String> getSubscribedConsumerGroupIds(final String topicName) {
return consumerGroupIdToConsumerGroupMetaMap.entrySet().stream()
.filter(entry -> entry.getValue().getTopicsSubscribedByConsumerGroup().contains(topicName))
.filter(entry -> entry.getValue().isTopicSubscribedByConsumerGroup(topicName))
.map(Entry::getKey)
.collect(Collectors.toSet());
}
Expand All @@ -129,13 +129,12 @@ public boolean isTopicSubscribedByConsumerGroup(
return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId)
&& consumerGroupIdToConsumerGroupMetaMap
.get(consumerGroupId)
.getTopicsSubscribedByConsumerGroup()
.contains(topicName);
.isTopicSubscribedByConsumerGroup(topicName);
}

public boolean isTopicSubscribedByConsumerGroup(final String topicName) {
return consumerGroupIdToConsumerGroupMetaMap.values().stream()
.anyMatch(meta -> meta.getTopicsSubscribedByConsumerGroup().contains(topicName));
.anyMatch(meta -> meta.isTopicSubscribedByConsumerGroup(topicName));
}

///////////////////////////////// Snapshot /////////////////////////////////
Expand Down

0 comments on commit e80b992

Please sign in to comment.