Skip to content

Commit

Permalink
Add unit tests for topic emptiness class
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 27, 2023
1 parent abb69fa commit 81968bf
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ public synchronized boolean isTopicEmpty() {
}

if (currentThreadId != topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId() ||
topicEmptinessMetadata.isCheckDurationExceeded(System.currentTimeMillis())) {
topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis())) {
return topicEmptinessMetadata.isTopicEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.concurrent.ConcurrentHashMap;

public class TopicEmptinessMetadata {
private static final long IS_EMPTY_CHECK_INTERVAL_MS = 60000L;
static final long IS_EMPTY_CHECK_INTERVAL_MS = 60000L;

private long lastIsEmptyCheckTime;
private Long topicEmptyCheckingOwnerThreadId;
Expand Down Expand Up @@ -41,11 +41,15 @@ public Long getTopicEmptyCheckingOwnerThreadId() {
return this.topicEmptyCheckingOwnerThreadId;
}

public ConcurrentHashMap<TopicPartition, Boolean> getTopicPartitionToIsEmpty() {
return this.topicPartitionToIsEmpty;
}

public boolean isTopicEmpty() {
return topicPartitionToIsEmpty.values().stream().allMatch(isEmpty -> isEmpty);
}

public boolean isCheckDurationExceeded(final long epochTimestamp) {
public boolean isWithinCheckInterval(final long epochTimestamp) {
return epochTimestamp < lastIsEmptyCheckTime + IS_EMPTY_CHECK_INTERVAL_MS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.opensearch.dataprepper.plugins.kafka.consumer;

import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata.IS_EMPTY_CHECK_INTERVAL_MS;

public class TopicEmptinessMetadataTest {
@Mock
private TopicPartition topicPartition;
@Mock
private TopicPartition topicPartition2;

private TopicEmptinessMetadata topicEmptinessMetadata;

@BeforeEach
void setup() {
MockitoAnnotations.openMocks(this);
this.topicEmptinessMetadata = new TopicEmptinessMetadata();
}

@Test
void updateTopicEmptinessStatus_AddEntry() {
topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false);
assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().containsKey(topicPartition), equalTo(true));
assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().get(topicPartition), equalTo(false));
}

@Test
void updateTopicEmptinessStatus_UpdateEntry() {
topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false);
assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().containsKey(topicPartition), equalTo(true));
assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().get(topicPartition), equalTo(false));

topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true);
assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().containsKey(topicPartition), equalTo(true));
assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().get(topicPartition), equalTo(true));
}

@Test
void isTopicEmpty_NoItems() {
assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(true));
}

@Test
void isTopicEmpty_OnePartition_IsNotEmpty() {
topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false);
assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(false));
}

@Test
void isTopicEmpty_OnePartition_IsEmpty() {
topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true);
assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(true));
}

@Test
void isTopicEmpty_MultiplePartitions_OneNotEmpty() {
topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true);
topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition2, false);
assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(false));
}

@Test
void isCheckDurationExceeded_NoPreviousChecks() {
assertThat(topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis()), equalTo(false));
}

@Test
void isCheckDurationExceeded_CurrentTimeBeforeLastCheck() {
final long time = System.currentTimeMillis();
topicEmptinessMetadata.setLastIsEmptyCheckTime(time);
assertThat(topicEmptinessMetadata.isWithinCheckInterval(time - 1), equalTo(true));
}

@Test
void isCheckDurationExceeded_CurrentTimeAfterLastCheck_BeforeInterval() {
final long time = System.currentTimeMillis();
topicEmptinessMetadata.setLastIsEmptyCheckTime(time);
assertThat(topicEmptinessMetadata.isWithinCheckInterval((time + IS_EMPTY_CHECK_INTERVAL_MS) - 1), equalTo(true));
}

@Test
void isCheckDurationExceeded_CurrentTimeAfterLastCheck_AtInterval() {
final long time = System.currentTimeMillis();
topicEmptinessMetadata.setLastIsEmptyCheckTime(time);
assertThat(topicEmptinessMetadata.isWithinCheckInterval(time + IS_EMPTY_CHECK_INTERVAL_MS), equalTo(false));
}

@Test
void isCheckDurationExceeded_CurrentTimeAfterLastCheck_AfterInterval() {
final long time = System.currentTimeMillis();
topicEmptinessMetadata.setLastIsEmptyCheckTime(time);
assertThat(topicEmptinessMetadata.isWithinCheckInterval(time + IS_EMPTY_CHECK_INTERVAL_MS + 1), equalTo(false));
}
}

0 comments on commit 81968bf

Please sign in to comment.