Built out as much as I can for the test case
Signed-off-by: Peter Nied <[email protected]>
peternied committed May 21, 2024
1 parent aa17a7b commit 707f7a4
Showing 1 changed file with 66 additions and 7 deletions.
73 changes: 66 additions & 7 deletions RFS/src/test/java/com/rfs/integration/SnapshotStateTest.java
@@ -1,25 +1,35 @@
package com.rfs.integration;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.util.IOUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import com.rfs.common.ConnectionDetails;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.IndexMetadata;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;


import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.io.File;
@@ -28,14 +38,20 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

public class SnapshotStateTest {


private static final Logger logger = LoggerFactory.getLogger(SnapshotStateTest.class);

private final File SNAPSHOT_DIR = new File("./snapshotRepo");
private static GenericContainer<?> clusterContainer;
private static CloseableHttpClient httpClient;
private static String clusterUrl;

@SuppressWarnings("resource")
@BeforeEach
public void setUp() throws Exception {
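// Start each test from an empty host snapshot directory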
IOUtils.rm(Path.of(SNAPSHOT_DIR.getAbsolutePath()));
@@ -46,6 +62,7 @@ public void setUp() throws Exception {
.withEnv("discovery.type", "single-node")
.withEnv("path.repo", "/usr/share/elasticsearch/snapshots")
.withFileSystemBind(SNAPSHOT_DIR.getAbsolutePath(), "/usr/share/elasticsearch/snapshots", BindMode.READ_WRITE) // absolute host path: a bare name would be treated as a named volume
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forHttp("/").forPort(9200).forStatusCode(200).withStartupTimeout(Duration.ofMinutes(1)));

clusterContainer.start();
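
// (The rest of setUp, tearDown, and the suite's HTTP helpers are collapsed out of this
// diff view.) Below is a hedged sketch of what createDocument and takeSnapshot plausibly
// look like, inferred from the Apache HttpClient imports and the call sites in the test;
// the repository name "test-repo", request bodies, and response handling are assumptions,
// not the committed implementations.
private void createDocument(final String index, final String docId, final String body) throws Exception {
    final var request = new HttpPut(clusterUrl + "/" + index + "/_doc/" + docId);
    request.setHeader("Content-Type", "application/json");
    request.setEntity(new StringEntity(body));
    try (CloseableHttpResponse response = httpClient.execute(request)) {
        EntityUtils.consume(response.getEntity()); // drain so the pooled connection is reusable
    }
}

private void takeSnapshot(final String snapshotName, final String indices) throws Exception {
    // Register a file-system repository backed by the bind-mounted path.repo directory
    final var repoRequest = new HttpPut(clusterUrl + "/_snapshot/test-repo");
    repoRequest.setHeader("Content-Type", "application/json");
    repoRequest.setEntity(new StringEntity(
            "{\"type\":\"fs\",\"settings\":{\"location\":\"/usr/share/elasticsearch/snapshots\"}}"));
    try (CloseableHttpResponse response = httpClient.execute(repoRequest)) {
        EntityUtils.consume(response.getEntity());
    }

    // wait_for_completion=true keeps the call synchronous, so the repo files are on disk
    // before the test reads them back
    final var snapshotRequest = new HttpPut(
            clusterUrl + "/_snapshot/test-repo/" + snapshotName + "?wait_for_completion=true");
    snapshotRequest.setHeader("Content-Type", "application/json");
    snapshotRequest.setEntity(new StringEntity("{\"indices\":\"" + indices + "\"}"));
    try (CloseableHttpResponse response = httpClient.execute(snapshotRequest)) {
        EntityUtils.consume(response.getEntity());
    }
}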
@@ -123,15 +140,57 @@ public void SingleSnapshot_SingleDocument() throws Exception {
// PSEUDO: Create 1 index with 1 document
createDocument("my-index", "doc1", "{\"foo\":\"bar\"}");
// PSEUDO: Save snapshot1
final var snapshotName = "snapshot-1";
takeSnapshot(snapshotName, "my-index");
// PSEUDO: Start RFS worker reader, point to snapshot1
var repo = new FileSystemRepo(Path.of(SNAPSHOT_DIR.getAbsolutePath()));
var snapShotProvider = new SnapshotRepoProvider_ES_7_10(repo);
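// Load the metadata of every index captured in the snapshot via the ES 7.10 repo format readers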
List<IndexMetadata.Data> indices = snapShotProvider.getIndicesInSnapshot(snapshotName)
.stream()
.map(index -> {
try {
return new IndexMetadataFactory_ES_7_10().fromRepo(repo, snapShotProvider, snapshotName, index.getName());
} catch (final Exception e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());

var sourceFileBasePath = new File("./unpacked");

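// Unpack each shard of each index from the snapshot blobs into local Lucene files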
for (final IndexMetadata.Data index : indices) {
for (int shardId = 0; shardId < index.getNumberOfShards(); shardId++) {
var shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, snapShotProvider, snapshotName, index.getName(), shardId);
SnapshotShardUnpacker.unpack(repo, shardMetadata, Paths.get(sourceFileBasePath.getAbsolutePath()), Integer.MAX_VALUE);
}
}

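// Stand-in for the target cluster; the reindexer below is driven against this mock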
var targetConnection = mock(ConnectionDetails.class);

for (final IndexMetadata.Data index : indices) {
for (int shardId = 0; shardId < index.getNumberOfShards(); shardId++) {
final var documents = new LuceneDocumentsReader().readDocuments(Paths.get(sourceFileBasePath.getAbsolutePath()), index.getName(), shardId);

final var finalShardId = shardId; // Define in local context for the lambda
DocumentReindexer.reindex(index.getName(), documents, targetConnection)
.doOnError(error -> logger.error("Error during reindexing: " + error))
.doOnSuccess(done -> logger.info("Reindexing completed for index " + index.getName() + ", shard " + finalShardId))
// Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
// shouldn't be done quite this way in the real RFS Worker.
.block();
}
}

// PSEUDO: Attach a sink to inspect all of the operations performed on the target cluster

// Action
// PSEUDO: Start reindex on the worker
// PSEUDO: Wait until the operations sink has settled with the expected operations.

// Validation
// verify(targetConnection); Not viable


// PSEUDO: Read the actions from the sink
// PSEUDO: Flush all read-only operations from the sink, such as refreshes, searches, etc.
// PSEUDO: Scan the sink for ONLY the following:
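// (The committed list of expected operations is collapsed out of this diff view.)
// Hedged sketch of the sink idea above, assuming the sink is a plain recorder of the
// HTTP operations the reindexer emits; the "METHOD path" encoding, the sample entries,
// and the GET-means-read-only filter are illustrative assumptions, not committed RFS APIs.
final var operationsSink = new java.util.ArrayList<String>();
operationsSink.add("GET /my-index/_search");   // read-only traffic to be flushed
operationsSink.add("PUT /my-index/_doc/doc1"); // the one expected document write
final var writeOperations = operationsSink.stream()
        .filter(op -> !op.startsWith("GET ")) // flush refreshes, searches, etc.
        .collect(Collectors.toList());
assertThat(writeOperations, equalTo(List.of("PUT /my-index/_doc/doc1")));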