Skip to content

Commit

Permalink
Fix reported replication factor of segment with zero required replicas (
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz authored Jul 31, 2023
1 parent c648b1c commit e9b4f1e
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.ServerHolder;
Expand Down Expand Up @@ -194,18 +195,24 @@ private boolean moveSegment(DataSegment segment, ServerHolder serverA, ServerHol
@Override
public void replicateSegment(DataSegment segment, Map<String, Integer> tierToReplicaCount)
{
// Identify empty tiers and determine total required replicas
final Set<String> allTiersInCluster = Sets.newHashSet(cluster.getTierNames());
tierToReplicaCount.forEach((tier, requiredReplicas) -> {
reportTierCapacityStats(segment, requiredReplicas, tier);

SegmentReplicaCount replicaCount = replicaCountMap.computeIfAbsent(segment.getId(), tier);
replicaCount.setRequired(requiredReplicas, tierToHistoricalCount.getOrDefault(tier, 0));
if (tierToReplicaCount == null || tierToReplicaCount.isEmpty()) {
// Track the counts for a segment even if it requires 0 replicas on all tiers
replicaCountMap.computeIfAbsent(segment.getId(), DruidServer.DEFAULT_TIER);
} else {
// Identify empty tiers and determine total required replicas
tierToReplicaCount.forEach((tier, requiredReplicas) -> {
reportTierCapacityStats(segment, requiredReplicas, tier);

if (!allTiersInCluster.contains(tier)) {
tiersWithNoServer.add(tier);
}
});
SegmentReplicaCount replicaCount = replicaCountMap.computeIfAbsent(segment.getId(), tier);
replicaCount.setRequired(requiredReplicas, tierToHistoricalCount.getOrDefault(tier, 0));

if (!allTiersInCluster.contains(tier)) {
tiersWithNoServer.add(tier);
}
});
}

SegmentReplicaCount replicaCountInCluster = replicaCountMap.getTotal(segment.getId());
final int replicaSurplus = replicaCountInCluster.loadedNotDropping()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
Expand Down Expand Up @@ -1262,6 +1265,39 @@ public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace()
EasyMock.verify(mockPeon);
}

@Test
public void testSegmentWithZeroRequiredReplicasHasZeroReplicationFactor()
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new ForeverLoadRule(Collections.emptyMap(), false)
)
).anyTimes();
EasyMock.replay(databaseRuleManager);

final DruidCluster cluster = DruidCluster
.builder()
.add(createServerHolder("server", "normal", new TestLoadQueuePeon()))
.build();

final DataSegment segment = usedSegments.get(0);
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(cluster, segment)
.withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(loadQueueManager)
.build();
params = ruleRunner.run(params);

Assert.assertNotNull(params);
SegmentReplicationStatus replicationStatus = params.getSegmentReplicationStatus();
Assert.assertNotNull(replicationStatus);

SegmentReplicaCount replicaCounts = replicationStatus.getReplicaCountsInCluster(segment.getId());
Assert.assertNotNull(replicaCounts);
Assert.assertEquals(0, replicaCounts.required());
Assert.assertEquals(0, replicaCounts.totalLoaded());
Assert.assertEquals(0, replicaCounts.requiredAndLoadable());
}

private CoordinatorRunStats runDutyAndGetStats(DruidCoordinatorRuntimeParams params)
{
params = ruleRunner.run(params);
Expand Down

0 comments on commit e9b4f1e

Please sign in to comment.