From 4e37aae15d3cc8c80d90b80d2979f2cf65f8fffb Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Mon, 23 Sep 2024 18:29:21 -0400 Subject: [PATCH 1/3] Linting changes Signed-off-by: Greg Schohn --- .../migrations/RfsMigrateDocuments.java | 3 +- .../bulkload/common/ByteArrayIndexInput.java | 6 +- .../common/LuceneDocumentsReader.java | 101 ++++++++------- .../bulkload/common/OpenSearchClient.java | 121 ++++++++---------- .../bulkload/common/RestClient.java | 1 - .../migrations/bulkload/common/S3Repo.java | 85 ++++++------ .../bulkload/common/SnapshotCreator.java | 8 +- .../bulkload/common/SourceRepoAccessor.java | 4 +- .../version_universal/RemoteReaderClient.java | 4 +- .../OpenSearchWorkCoordinator.java | 4 +- .../bulkload/worker/DocumentsRunner.java | 5 +- .../traffic/source/BlockingTrafficSource.java | 4 +- .../utils/kafka/Base64Formatter.java | 6 +- 13 files changed, 174 insertions(+), 178 deletions(-) diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index 1e47b08d9..c1ab52b97 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -263,7 +263,8 @@ public static DocumentsRunner.CompletionStatus run(Function length) { - throw new EOFException("seek past EOF"); + throw new EOFException(); } pos = (int) l; } @@ -78,7 +78,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw @Override public byte readByte() throws IOException { if (pos >= offset + length) { - throw new EOFException("seek past EOF"); + throw new EOFException(); } return bytes[offset + pos++]; } @@ -86,7 +86,7 @@ public byte readByte() throws IOException { @Override public void readBytes(final byte[] b, final int offset, int len) throws IOException { if (pos + len > this.offset + length) { - throw new EOFException("seek past EOF"); + throw new EOFException(); } System.arraycopy(bytes, this.offset + pos, b, offset, len); pos += len; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java index f87502629..780b714d2 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java @@ -6,6 +6,7 @@ import java.util.concurrent.Callable; import java.util.function.Function; +import jdk.jshell.spi.ExecutionControl; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; @@ -154,61 +155,65 @@ protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletes } protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean isLive) { + Document document; try { - Document document = reader.document(docId); - String id = null; - BytesRef sourceBytes = null; - try { - for (var field : document.getFields()) { - String fieldName = field.name(); - switch (fieldName) { - case "_id": { - // ES 6+ - var idBytes = field.binaryValue(); - id = Uid.decodeId(idBytes.bytes); - break; - } - case "_uid": { - // ES 5 - id = field.stringValue(); - break; - } - case "_source": { - // All versions (?) 
- sourceBytes = field.binaryValue(); - break; - } - } - } - if (id == null) { - log.atError().setMessage("Document with index" + docId + " does not have an id. Skipping").log(); - return null; // Skip documents with missing id - } + document = reader.document(docId); + } catch (IOException e) { + log.atError().setMessage("Failed to read document at Lucene index location {}").addArgument(docId).setCause(e).log(); + return null; + } - if (sourceBytes == null || sourceBytes.bytes.length == 0) { - log.atWarn().setMessage("Document {} doesn't have the _source field enabled").addArgument(id).log(); - return null; // Skip these + String id = null; + BytesRef sourceBytes = null; + try { + for (var field : document.getFields()) { + String fieldName = field.name(); + switch (fieldName) { + case "_id": { + // ES 6+ + var idBytes = field.binaryValue(); + id = Uid.decodeId(idBytes.bytes); + break; + } + case "_uid": { + // ES 5 + id = field.stringValue(); + break; + } + case "_source": { + // All versions (?) + sourceBytes = field.binaryValue(); + break; + } + default: + break; } - - log.atDebug().setMessage("Reading document {}").addArgument(id).log(); - } catch (Exception e) { - StringBuilder errorMessage = new StringBuilder(); - errorMessage.append("Unable to parse Document id from Document. The Document's Fields: "); - document.getFields().forEach(f -> errorMessage.append(f.name()).append(", ")); - log.atError().setMessage(errorMessage.toString()).setCause(e).log(); - return null; // Skip documents with invalid id + } + if (id == null) { + log.atError().setMessage("Document with index" + docId + " does not have an id. Skipping").log(); + return null; // Skip documents with missing id } - if (!isLive) { - log.atDebug().setMessage("Document {} is not live").addArgument(id).log(); - return null; // Skip these + if (sourceBytes == null || sourceBytes.bytes.length == 0) { + log.atWarn().setMessage("Document {} doesn't have the _source field enabled").addArgument(id).log(); + return null; // Skip these } - log.atDebug().setMessage("Document {} read successfully").addArgument(id).log(); - return new RfsLuceneDocument(id, sourceBytes.utf8ToString()); - } catch (Exception e) { - log.atError().setMessage("Failed to read document at Lucene index location {}").addArgument(docId).setCause(e).log(); - return null; + log.atDebug().setMessage("Reading document {}").addArgument(id).log(); + } catch (RuntimeException e) { + StringBuilder errorMessage = new StringBuilder(); + errorMessage.append("Unable to parse Document id from Document. 
The Document's Fields: ");
+            document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
+            log.atError().setMessage(errorMessage.toString()).setCause(e).log();
+            return null; // Skip documents with invalid id
+        }
+
+        if (!isLive) {
+            log.atDebug().setMessage("Document {} is not live").addArgument(id).log();
+            return null; // Skip these
         }
+
+        log.atDebug().setMessage("Document {} read successfully").addArgument(id).log();
+        return new RfsLuceneDocument(id, sourceBytes.utf8ToString());
     }
 }
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
index 2f9e5d289..5047021df 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
@@ -32,23 +32,26 @@ public class OpenSearchClient {
 
     protected static final ObjectMapper objectMapper = new ObjectMapper();
 
-    private static final int defaultMaxRetryAttempts = 3;
-    private static final Duration defaultBackoff = Duration.ofSeconds(1);
-    private static final Duration defaultMaxBackoff = Duration.ofSeconds(10);
-    private static final Retry snapshotRetryStrategy = Retry.backoff(defaultMaxRetryAttempts, defaultBackoff)
-        .maxBackoff(defaultMaxBackoff);
-    protected static final Retry checkIfItemExistsRetryStrategy = Retry.backoff(defaultMaxRetryAttempts, defaultBackoff)
-        .maxBackoff(defaultMaxBackoff);
-    private static final Retry createItemExistsRetryStrategy = Retry.backoff(defaultMaxRetryAttempts, defaultBackoff)
-        .maxBackoff(defaultMaxBackoff)
-        .filter(throwable -> !(throwable instanceof InvalidResponse)); // Do not retry on this exception
-
-    private static final int bulkMaxRetryAttempts = 15;
-    private static final Duration bulkBackoff = Duration.ofSeconds(2);
-    private static final Duration bulkMaxBackoff = Duration.ofSeconds(60);
+    private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+    private static final Duration DEFAULT_BACKOFF = Duration.ofSeconds(1);
+    private static final Duration DEFAULT_MAX_BACKOFF = Duration.ofSeconds(10);
+    private static final Retry SNAPSHOT_RETRY_STRATEGY = Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF)
+        .maxBackoff(DEFAULT_MAX_BACKOFF);
+    protected static final Retry CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY =
+        Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF)
+            .maxBackoff(DEFAULT_MAX_BACKOFF);
+    private static final Retry CREATE_ITEM_EXISTS_RETRY_STRATEGY =
+        Retry.backoff(DEFAULT_MAX_RETRY_ATTEMPTS, DEFAULT_BACKOFF)
+            .maxBackoff(DEFAULT_MAX_BACKOFF)
+            .filter(throwable -> !(throwable instanceof InvalidResponse)); // Do not retry on this exception
+
+    private static final int BULK_MAX_RETRY_ATTEMPTS = 15;
+    private static final Duration BULK_BACKOFF = Duration.ofSeconds(2);
+    private static final Duration BULK_MAX_BACKOFF = Duration.ofSeconds(60);
     /** Retries for up 10 minutes */
-    private static final Retry bulkRetryStrategy = Retry.backoff(bulkMaxRetryAttempts, bulkBackoff)
-        .maxBackoff(bulkMaxBackoff);
+    private static final Retry BULK_RETRY_STRATEGY = Retry.backoff(BULK_MAX_RETRY_ATTEMPTS, BULK_BACKOFF)
+        .maxBackoff(BULK_MAX_BACKOFF);
+    public static final String SNAPSHOT_PREFIX_STR = "_snapshot/";
 
     static {
         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -76,7 +79,7 @@ public Version getClusterVersion() {
                 }
             })
             .doOnError(e -> log.error(e.getMessage()))
-
.retryWhen(checkIfItemExistsRetryStrategy) + .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) .block(); } @@ -198,19 +201,12 @@ private Optional createObjectIdempotent( new InvalidResponse("Create object failed for " + objectPath + "\r\n" + resp.body, resp) ); } else { - String errorMessage = ("Could not create object: " - + objectPath - + ". Response Code: " - + resp.statusCode - + ", Response Message: " - + resp.statusText - + ", Response Body: " - + resp.body); + String errorMessage = "Could not create object: " + objectPath + ". " + getString(resp); return Mono.error(new OperationFailed(errorMessage, resp)); } }) .doOnError(e -> log.error(e.getMessage())) - .retryWhen(createItemExistsRetryStrategy) + .retryWhen(CREATE_ITEM_EXISTS_RETRY_STRATEGY) .block(); return Optional.of(settings); @@ -221,35 +217,39 @@ private Optional createObjectIdempotent( return Optional.empty(); } + private static String getString(HttpResponse resp) { + return "Response Code: " + + resp.statusCode + + ", Response Message: " + + resp.statusText + + ", Response Body: " + + resp.body; + } + private boolean hasObjectCheck( String objectPath, IRfsContexts.ICheckedIdempotentPutRequestContext context ) { var requestContext = Optional.ofNullable(context) - .map(c -> c.createCheckRequestContext()) + .map(IRfsContexts.ICheckedIdempotentPutRequestContext::createCheckRequestContext) .orElse(null); var getResponse = client.getAsync(objectPath, requestContext) .flatMap(resp -> { - if (resp.statusCode == HttpURLConnection.HTTP_NOT_FOUND || resp.statusCode == HttpURLConnection.HTTP_OK) { + if (resp.statusCode == HttpURLConnection.HTTP_NOT_FOUND || + resp.statusCode == HttpURLConnection.HTTP_OK) + { return Mono.just(resp); } else { - String errorMessage = ("Could not create object: " - + objectPath - + ". Response Code: " - + resp.statusCode - + ", Response Message: " - + resp.statusText - + ", Response Body: " - + resp.body); + String errorMessage = "Could not create object: " + objectPath + ". " + getString(resp); return Mono.error(new OperationFailed(errorMessage, resp)); } }) .doOnError(e -> log.error(e.getMessage())) - .retryWhen(checkIfItemExistsRetryStrategy) + .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) .block(); - assert getResponse != null : ("getResponse should not be null; it should either be a valid response or an exception" - + " should have been thrown."); + assert getResponse != null : ("getResponse should not be null; it should either be a valid response or " + + "an exception should have been thrown."); return getResponse.statusCode == HttpURLConnection.HTTP_OK; } @@ -261,24 +261,17 @@ public void registerSnapshotRepo( ObjectNode settings, IRfsContexts.ICreateSnapshotContext context ) { - String targetPath = "_snapshot/" + repoName; + String targetPath = SNAPSHOT_PREFIX_STR + repoName; client.putAsync(targetPath, settings.toString(), context.createRegisterRequest()).flatMap(resp -> { if (resp.statusCode == HttpURLConnection.HTTP_OK) { return Mono.just(resp); } else { - String errorMessage = ("Could not register snapshot repo: " - + targetPath - + ". Response Code: " - + resp.statusCode - + ", Response Message: " - + resp.statusText - + ", Response Body: " - + resp.body); + String errorMessage = "Could not register snapshot repo: " + targetPath + ". 
" + getString(resp); return Mono.error(new OperationFailed(errorMessage, resp)); } }) .doOnError(e -> log.error(e.getMessage())) - .retryWhen(snapshotRetryStrategy) + .retryWhen(SNAPSHOT_RETRY_STRATEGY) .block(); } @@ -291,24 +284,17 @@ public void createSnapshot( ObjectNode settings, IRfsContexts.ICreateSnapshotContext context ) { - String targetPath = "_snapshot/" + repoName + "/" + snapshotName; + String targetPath = SNAPSHOT_PREFIX_STR + repoName + "/" + snapshotName; client.putAsync(targetPath, settings.toString(), context.createSnapshotContext()).flatMap(resp -> { if (resp.statusCode == HttpURLConnection.HTTP_OK) { return Mono.just(resp); } else { - String errorMessage = ("Could not create snapshot: " - + targetPath - + ". Response Code: " - + resp.statusCode - + ", Response Message: " - + resp.statusText - + ", Response Body: " - + resp.body); + String errorMessage = "Could not create snapshot: " + targetPath + "." + getString(resp); return Mono.error(new OperationFailed(errorMessage, resp)); } }) .doOnError(e -> log.error(e.getMessage())) - .retryWhen(snapshotRetryStrategy) + .retryWhen(SNAPSHOT_RETRY_STRATEGY) .block(); } @@ -321,7 +307,7 @@ public Optional getSnapshotStatus( String snapshotName, IRfsContexts.ICreateSnapshotContext context ) { - String targetPath = "_snapshot/" + repoName + "/" + snapshotName; + String targetPath = SNAPSHOT_PREFIX_STR + repoName + "/" + snapshotName; var getResponse = client.getAsync(targetPath, context.createGetSnapshotContext()).flatMap(resp -> { if (resp.statusCode == HttpURLConnection.HTTP_OK || resp.statusCode == HttpURLConnection.HTTP_NOT_FOUND) { return Mono.just(resp); @@ -336,11 +322,11 @@ public Optional getSnapshotStatus( } }) .doOnError(e -> log.error(e.getMessage())) - .retryWhen(snapshotRetryStrategy) + .retryWhen(SNAPSHOT_RETRY_STRATEGY) .block(); - assert getResponse != null : ("getResponse should not be null; it should either be a valid response or an exception" - + " should have been thrown."); + assert getResponse != null : ("getResponse should not be null; it should either be a valid response or an " + + "exception should have been thrown."); if (getResponse.statusCode == HttpURLConnection.HTTP_OK) { try { return Optional.of(objectMapper.readValue(getResponse.body, ObjectNode.class)); @@ -360,10 +346,12 @@ public Optional getSnapshotStatus( } Retry getBulkRetryStrategy() { - return bulkRetryStrategy; + return BULK_RETRY_STRATEGY; } - public Mono sendBulkRequest(String indexName, List docs, IRfsContexts.IRequestContext context) { + public Mono sendBulkRequest(String indexName, List docs, + IRfsContexts.IRequestContext context) + { final var docsMap = docs.stream().collect(Collectors.toMap(d -> d.getDocId(), d -> d)); return Mono.defer(() -> { final String targetPath = indexName + "/_bulk"; @@ -380,7 +368,8 @@ public Mono sendBulkRequest(String indexName, List { - var resp = new BulkResponse(response.statusCode, response.statusText, response.headers, response.body); + var resp = + new BulkResponse(response.statusCode, response.statusText, response.headers, response.body); if (!resp.hasBadStatusCode() && !resp.hasFailedOperations()) { return Mono.just(resp); } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RestClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RestClient.java index 9b3f9fb08..fb47d4984 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RestClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RestClient.java @@ -48,7 
+48,6 @@ public class RestClient { private static final String USER_AGENT_HEADER_NAME = HttpHeaderNames.USER_AGENT.toString(); private static final String CONTENT_TYPE_HEADER_NAME = HttpHeaderNames.CONTENT_TYPE.toString(); - private static final String CONTENT_ENCODING_HEADER_NAME = HttpHeaderNames.CONTENT_ENCODING.toString(); private static final String ACCEPT_ENCODING_HEADER_NAME = HttpHeaderNames.ACCEPT_ENCODING.toString(); private static final String HOST_HEADER_NAME = HttpHeaderNames.HOST.toString(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java index 6627d6753..bca74027a 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java @@ -6,6 +6,7 @@ import java.util.Comparator; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,11 +27,12 @@ import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest; @ToString(onlyExplicitlyIncluded = true) +@Slf4j public class S3Repo implements SourceRepo { - private static final Logger logger = LogManager.getLogger(S3Repo.class); private static final double S3_TARGET_THROUGHPUT_GIBPS = 8.0; // Arbitrarily chosen private static final long S3_MAX_MEMORY_BYTES = 1024L * 1024 * 1024; // Arbitrarily chosen private static final long S3_MINIMUM_PART_SIZE_BYTES = 8L * 1024 * 1024; // Default, but be explicit + public static final String INDICES_PREFIX_STR = "indices/"; private final Path s3LocalDir; @ToString.Include @@ -80,11 +82,12 @@ private void ensureFileExistsLocally(S3Uri s3Uri, Path localPath) { ensureS3LocalDirectoryExists(localPath.getParent()); if (doesFileExistLocally(localPath)) { - logger.debug("File already exists locally: {}", localPath); + log.atDebug().setMessage("File already exists locally: {}").addArgument(localPath).log(); return; } - logger.info("Downloading file from S3: {} to {}", s3Uri.uri, localPath); + log.atInfo() + .setMessage("Downloading file from S3: {} to {}").addArgument(s3Uri.uri).addArgument(localPath).log(); GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(s3Uri.bucketName).key(s3Uri.key).build(); s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toFile(localPath)).join(); @@ -147,7 +150,7 @@ public Path getSnapshotMetadataFilePath(String snapshotId) { @Override public Path getIndexMetadataFilePath(String indexId, String indexFileId) { - String suffix = "indices/" + indexId + "/meta-" + indexFileId + ".dat"; + String suffix = INDICES_PREFIX_STR + indexId + "/meta-" + indexFileId + ".dat"; Path filePath = s3LocalDir.resolve(suffix); S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix); ensureFileExistsLocally(fileUri, filePath); @@ -156,14 +159,13 @@ public Path getIndexMetadataFilePath(String indexId, String indexFileId) { @Override public Path getShardDirPath(String indexId, int shardId) { - String suffix = "indices/" + indexId + "/" + shardId; - Path shardDirPath = s3LocalDir.resolve(suffix); - return shardDirPath; + String suffix = INDICES_PREFIX_STR + indexId + "/" + shardId; + return s3LocalDir.resolve(suffix); } @Override public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) { - String suffix = "indices/" + indexId + "/" + shardId + "/snap-" + snapshotId + ".dat"; + String suffix = INDICES_PREFIX_STR + indexId + "/" + shardId 
+ "/snap-" + snapshotId + ".dat"; Path filePath = s3LocalDir.resolve(suffix); S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix); ensureFileExistsLocally(fileUri, filePath); @@ -172,7 +174,7 @@ public Path getShardMetadataFilePath(String snapshotId, String indexId, int shar @Override public Path getBlobFilePath(String indexId, int shardId, String blobName) { - String suffix = "indices/" + indexId + "/" + shardId + "/" + blobName; + String suffix = INDICES_PREFIX_STR + indexId + "/" + shardId + "/" + blobName; Path filePath = s3LocalDir.resolve(suffix); S3Uri fileUri = new S3Uri(s3RepoUri.uri + "/" + suffix); ensureFileExistsLocally(fileUri, filePath); @@ -181,39 +183,38 @@ public Path getBlobFilePath(String indexId, int shardId, String blobName) { @Override public void prepBlobFiles(ShardMetadata shardMetadata) { - S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3Client).build(); - - Path shardDirPath = getShardDirPath(shardMetadata.getIndexId(), shardMetadata.getShardId()); - ensureS3LocalDirectoryExists(shardDirPath); - - String blobFilesS3Prefix = s3RepoUri.key - + "indices/" - + shardMetadata.getIndexId() - + "/" - + shardMetadata.getShardId() - + "/"; - - logger.info( - "Downloading blob files from S3: s3://%s/%s to %s", - s3RepoUri.bucketName, - blobFilesS3Prefix, - shardDirPath - ); - DirectoryDownload directoryDownload = transferManager.downloadDirectory( - DownloadDirectoryRequest.builder() - .destination(shardDirPath) - .bucket(s3RepoUri.bucketName) - .listObjectsV2RequestTransformer(l -> l.prefix(blobFilesS3Prefix)) - .build() - ); - - // Wait for the transfer to complete - CompletedDirectoryDownload completedDirectoryDownload = directoryDownload.completionFuture().join(); - - logger.info("Blob file download(s) complete"); - - // Print out any failed downloads - completedDirectoryDownload.failedTransfers().forEach(logger::error); + try (S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3Client).build()) { + + Path shardDirPath = getShardDirPath(shardMetadata.getIndexId(), shardMetadata.getShardId()); + ensureS3LocalDirectoryExists(shardDirPath); + + String blobFilesS3Prefix = s3RepoUri.key + + INDICES_PREFIX_STR + + shardMetadata.getIndexId() + + "/" + + shardMetadata.getShardId() + + "/"; + + log.atInfo().setMessage("Downloading blob files from S3: s3://%s/%s to %s") + .addArgument(s3RepoUri.bucketName) + .addArgument(blobFilesS3Prefix) + .addArgument(shardDirPath).log(); + DirectoryDownload directoryDownload = transferManager.downloadDirectory( + DownloadDirectoryRequest.builder() + .destination(shardDirPath) + .bucket(s3RepoUri.bucketName) + .listObjectsV2RequestTransformer(l -> l.prefix(blobFilesS3Prefix)) + .build() + ); + + // Wait for the transfer to complete + CompletedDirectoryDownload completedDirectoryDownload = directoryDownload.completionFuture().join(); + + log.atInfo().setMessage(()->"Blob file download(s) complete").log(); + + // Print out any failed downloads + completedDirectoryDownload.failedTransfers().forEach(x->log.error("{}", x)); + } } public static class CannotFindSnapshotRepoRoot extends RfsException { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java index 53942e4e6..4e4d3c494 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java @@ -6,6 +6,7 @@ 
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -20,9 +21,10 @@ public abstract class SnapshotCreator { private final OpenSearchClient client; private final IRfsContexts.ICreateSnapshotContext context; + @Getter private final String snapshotName; - public SnapshotCreator(String snapshotName, OpenSearchClient client, IRfsContexts.ICreateSnapshotContext context) { + protected SnapshotCreator(String snapshotName, OpenSearchClient client, IRfsContexts.ICreateSnapshotContext context) { this.snapshotName = snapshotName; this.client = client; this.context = context; @@ -34,10 +36,6 @@ public String getRepoName() { return "migration_assistant_repo"; } - public String getSnapshotName() { - return snapshotName; - } - public void registerRepo() { ObjectNode settings = getRequestBodyForRegisterRepo(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SourceRepoAccessor.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SourceRepoAccessor.java index 0a9d4a0ba..a9f6fd1fb 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SourceRepoAccessor.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SourceRepoAccessor.java @@ -9,7 +9,7 @@ public abstract class SourceRepoAccessor { private final SourceRepo repo; - public SourceRepoAccessor(SourceRepo repo) { + protected SourceRepoAccessor(SourceRepo repo) { this.repo = repo; } @@ -19,7 +19,7 @@ public Path getRepoRootDir() { public InputStream getSnapshotRepoDataFile() { return load(repo.getSnapshotRepoDataFilePath()); - }; + } public InputStream getGlobalMetadataFile(String snapshotId) { return load(repo.getGlobalMetadataFilePath(snapshotId)); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java index a0be6f10d..47a8abf57 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java @@ -36,7 +36,7 @@ public ObjectNode getClusterData() { .flatMap(this::getJsonForTemplateApis) .map(json -> Map.entry(entry.getKey(), json)) .doOnError(e -> log.error("Error fetching template {}: {}", entry.getKey(), e.getMessage())) - .retryWhen(checkIfItemExistsRetryStrategy) + .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) ) .collectMap(Entry::getKey, Entry::getValue) .block(); @@ -74,7 +74,7 @@ public ObjectNode getIndexes() { .getAsync(endpoint, null) .flatMap(this::getJsonForIndexApis) .doOnError(e -> log.error(e.getMessage())) - .retryWhen(checkIfItemExistsRetryStrategy) + .retryWhen(CHECK_IF_ITEM_EXISTS_RETRY_STRATEGY) ) .collectList() .block(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index ec3ec39f6..480b882e6 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -683,7 +683,7 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker 
leaseChecker,
             ctx.addTraceException(e, false);
             var sleepBeforeNextRetryDuration = Duration.ofMillis(
                 Math.min(MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL,
-                    (long) (Math.pow(2.0, retries-1) * ACQUIRE_WORK_RETRY_BASE_MS)));
+                    (long) (Math.pow(2.0, (double) (retries-1)) * ACQUIRE_WORK_RETRY_BASE_MS)));
             leaseChecker.checkRetryWaitTimeOrThrow(e, retries-1, sleepBeforeNextRetryDuration);
 
             log.atWarn().setMessage(() -> "Couldn't complete work assignment due to exception. "
@@ -711,7 +711,7 @@ public PotentialClockDriftDetectedException(String s, long timestampEpochSeconds
 
     @AllArgsConstructor
     public static class ResponseException extends Exception {
-        final AbstractedHttpClient.AbstractHttpResponse response;
+        final transient AbstractedHttpClient.AbstractHttpResponse response;
 
         @Override
         public String getMessage() {
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java
index 45c87a3aa..958b5d3e6 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java
@@ -49,7 +49,10 @@ public CompletionStatus migrateNextShard(
         return workCoordinator.ensurePhaseCompletion(wc -> {
             try {
                 return wc.acquireNextWorkItem(maxInitialLeaseDuration, context::createOpeningContext);
-            } catch (Exception e) {
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw Lombok.sneakyThrow(e);
+            } catch (IOException e) {
                 throw Lombok.sneakyThrow(e);
             }
         }, new IWorkCoordinator.WorkAcquisitionOutcomeVisitor<>() {
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java
index 1c48a431d..aacafc91c 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.java
@@ -194,7 +194,9 @@ private Void blockIfNeeded(ITrafficSourceContexts.IReadChunkContext readContext)
                     .setMessage(() -> "acquiring readGate semaphore with timeout=" + waitIntervalMs)
                     .log();
                 try (var waitContext = blockContext.createWaitForSignalContext()) {
-                    readGate.tryAcquire(waitIntervalMs, TimeUnit.MILLISECONDS);
+                    var didAcquire = readGate.tryAcquire(waitIntervalMs, TimeUnit.MILLISECONDS);
+                    log.atTrace().setMessage("semaphore ")
+                        .addArgument(() -> (didAcquire ? "" : "not ") + "acquired").log();
                 }
             }
         }
diff --git a/libraries/kafkaCommandLineFormatter/src/main/java/org/opensearch/migrations/utils/kafka/Base64Formatter.java b/libraries/kafkaCommandLineFormatter/src/main/java/org/opensearch/migrations/utils/kafka/Base64Formatter.java
index 02c5a4f5c..d0bde42b9 100644
--- a/libraries/kafkaCommandLineFormatter/src/main/java/org/opensearch/migrations/utils/kafka/Base64Formatter.java
+++ b/libraries/kafkaCommandLineFormatter/src/main/java/org/opensearch/migrations/utils/kafka/Base64Formatter.java
@@ -9,9 +9,7 @@ public class Base64Formatter implements MessageFormatter {
 
     @Override
-    public void writeTo(ConsumerRecord record, PrintStream out) {
-        out.println(Base64.getEncoder().encodeToString(record.value()));
+    public void writeTo(ConsumerRecord kafkaRecord, PrintStream out) {
+        out.println(Base64.getEncoder().encodeToString(kafkaRecord.value()));
     }
-
-    @Override public void close() {}
 }

From 3600c6052bbc55f9b7cec7e129c1f5d0e6e5dd88 Mon Sep 17 00:00:00 2001
From: Greg Schohn
Date: Mon, 23 Sep 2024 23:20:34 -0400
Subject: [PATCH 2/3] A round of linting changes across the codebase.

Signed-off-by: Greg Schohn
---
 .../ParallelDocumentMigrationsTest.java       |   4 +-
 .../migrations/bulkload/SourceTestBase.java   |   4 +-
 .../bulkload/common/InvalidResponse.java      |  22 ++--
 .../common/LuceneDocumentsReader.java         |   1 -
 .../bulkload/common/OpenSearchClient.java     |   2 +-
 .../migrations/bulkload/common/S3Repo.java    |   5 +-
 .../bulkload/common/SnapshotCreator.java      |   3 +-
 .../bulkload/tracing/IRfsContexts.java        |  12 +-
 .../tracing/IWorkCoordinationContexts.java    |  26 ++--
 .../bulkload/tracing/RfsContexts.java         |  10 +-
 .../tracing/WorkCoordinationContexts.java     |  24 ++--
 .../transformers/TransformFunctions.java      |  40 +++---
 .../Transformer_ES_6_8_to_OS_2_11.java        |  26 ++--
 .../Transformer_ES_7_10_OS_2_11.java          |  57 ++++----
 .../SnapshotRepoProvider_ES_7_10.java         |   5 +-
 .../version_universal/RemoteDataProvider.java |   4 +-
 .../version_universal/RemoteReaderClient.java |  10 +-
 .../OpenSearchWorkCoordinator.java            | 123 ++++++------------
 .../bulkload/worker/IndexRunner.java          |   2 +-
 .../bulkload/worker/ShardWorkPreparer.java    |   2 +-
 .../cluster/ClusterProviderRegistry.java      |   2 +-
 .../tracing/IMetadataMigrationContexts.java   |  12 +-
 .../tracing/MetadataMigrationContexts.java    |   8 +-
 .../tracing/DocumentMigrationContexts.java    |  10 +-
 .../tracing/IDocumentMigrationContexts.java   |  12 +-
 ...hannelConnectionCaptureSerializerTest.java |   4 +-
 .../netty/tracing/IWireCaptureContexts.java   |  18 +--
 .../netty/tracing/WireCaptureContexts.java    |  18 +--
 .../proxyserver/CaptureProxySetupTest.java    |   2 +-
 .../netty/NettyScanningHttpProxyTest.java     |   4 +-
 .../replay/AggregatedRawResponse.java         |   1 +
 .../replay/AggregatedRawResult.java           |   2 +-
 .../replay/datahandlers/http/EndOfInput.java  |   2 +-
 .../replay/http/retries/DefaultRetry.java     |   4 +
 .../replay/tracing/IReplayContexts.java       |  38 +++---
 .../replay/tracing/ReplayContexts.java        |  38 +++---
 .../replay/HttpByteBufFormatterTest.java      |   8 +-
 .../replay/ResultsToLogsConsumerTest.java     |   2 +-
 .../NettyPacketToHttpConsumerTest.java        |   4 +-
 .../HttpJsonTransformingConsumerTest.java     |   2 +-
 .../testutils/SimpleHttpClientForTesting.java |   4 +-
 41 files changed, 261 insertions(+), 316 deletions(-)

diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java
index 8cb185683..d07b11052 100644
---
a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java @@ -34,10 +34,10 @@ @Slf4j public class ParallelDocumentMigrationsTest extends SourceTestBase { - final static List SOURCE_IMAGES = List.of( + static final List SOURCE_IMAGES = List.of( SearchClusterContainer.ES_V7_10_2 ); - final static List TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0); + static final List TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0); public static Stream makeDocumentMigrationArgs() { List sourceImageArgs = SOURCE_IMAGES.stream() diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java index 24223d259..4c5e78480 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java @@ -49,9 +49,9 @@ @Slf4j public class SourceTestBase { public static final String GENERATOR_BASE_IMAGE = "migrations/elasticsearch_client_test_console:latest"; - public final static int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; + public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; public static final String SOURCE_SERVER_ALIAS = "source"; - public final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; + public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) { return new Object[] { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java index 21f55415b..cb3d8f408 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java @@ -19,7 +19,7 @@ public class InvalidResponse extends RfsException { private static final Pattern unknownSetting = Pattern.compile("unknown setting \\[(.+)\\].+"); private static final ObjectMapper objectMapper = new ObjectMapper(); - private final HttpResponse response; + private final transient HttpResponse response; public InvalidResponse(String message, HttpResponse response) { super(message); @@ -41,22 +41,18 @@ public Set getIllegalArguments() { errorBody.map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add); // Check root cause errors - errorBody.map(node -> node.get("root_cause")).ifPresent(nodes -> { + errorBody.map(node -> node.get("root_cause")).ifPresent(nodes -> nodes.forEach( - node -> { - Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add); - } - ); - }); + node -> Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add) + ) + ); // Check all suppressed errors - errorBody.map(node -> node.get("suppressed")).ifPresent(nodes -> { + errorBody.map(node -> node.get("suppressed")).ifPresent(nodes -> nodes.forEach( - node -> { - Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add); - } - ); - }); + node -> + 
Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add) + )); var onlyExpectedErrors = interimResults.stream() .map(Entry::getKey) diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java index 780b714d2..74d161b5e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java @@ -6,7 +6,6 @@ import java.util.concurrent.Callable; import java.util.function.Function; -import jdk.jshell.spi.ExecutionControl; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index 5047021df..ee4eb39f9 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -451,7 +451,7 @@ public String getFailureMessage() { } public static class OperationFailed extends RfsException { - public final HttpResponse response; + public final transient HttpResponse response; public OperationFailed(String message, HttpResponse response) { super(message +"\nBody:\n" + response); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java index bca74027a..52fd7f758 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java @@ -6,13 +6,10 @@ import java.util.Comparator; import java.util.Optional; -import lombok.extern.slf4j.Slf4j; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import org.opensearch.migrations.bulkload.models.ShardMetadata; import lombok.ToString; +import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java index 4e4d3c494..5c5c9987e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java @@ -6,12 +6,13 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.migrations.bulkload.tracing.IRfsContexts; +import lombok.Getter; + public abstract class SnapshotCreator { private static final Logger logger = LogManager.getLogger(SnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java index f2c51008e..d9d6ac606 100644 --- 
a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java @@ -3,22 +3,22 @@ import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IRfsContexts { - public static class ActivityNames { +public interface IRfsContexts { + class ActivityNames { private ActivityNames() {} public static final String HTTP_REQUEST = "httpRequest"; public static final String CHECK_THEN_PUT_REQUESTS = "checkThenPutRequest"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} public static final String BYTES_READ = "bytesRead"; public static final String BYTES_SENT = "bytesSent"; } - public interface IRequestContext extends IScopedInstrumentationAttributes { + interface IRequestContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.HTTP_REQUEST; void addBytesSent(int i); @@ -26,7 +26,7 @@ public interface IRequestContext extends IScopedInstrumentationAttributes { void addBytesRead(int i); } - public interface ICheckedIdempotentPutRequestContext extends IScopedInstrumentationAttributes { + interface ICheckedIdempotentPutRequestContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.CHECK_THEN_PUT_REQUESTS; IRequestContext createCheckRequestContext(); @@ -34,7 +34,7 @@ public interface ICheckedIdempotentPutRequestContext extends IScopedInstrumentat IRequestContext createPutContext(); } - public interface ICreateSnapshotContext extends IScopedInstrumentationAttributes { + interface ICreateSnapshotContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = IMetadataMigrationContexts.ActivityNames.CREATE_SNAPSHOT; IRequestContext createRegisterRequest(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java index e5796be32..3781e60c9 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java @@ -3,9 +3,9 @@ import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IWorkCoordinationContexts { +public interface IWorkCoordinationContexts { - public static class ActivityNames { + class ActivityNames { public static final String COORDINATION_INITIALIZATION = "workCoordinationInitialization"; public static final String CREATE_UNASSIGNED_WORK_ITEM = "createUnassignedWork"; public static final String PENDING_WORK_CHECK = "pendingWorkCheck"; @@ -17,7 +17,7 @@ public static class ActivityNames { private ActivityNames() {} } - public static class MetricNames { + class MetricNames { public static final String NEXT_WORK_ASSIGNED = "nextWorkAssignedCount"; public static final String NO_NEXT_WORK_AVAILABLE = "noNextWorkAvailableCount"; public static final String RECOVERABLE_CLOCK_ERROR = "recoverableClockErrorCount"; @@ -26,37 +26,37 @@ public static class MetricNames { private MetricNames() {} } - public interface IRetryableActivityContext extends IScopedInstrumentationAttributes { + interface IRetryableActivityContext extends IScopedInstrumentationAttributes { void recordRetry(); void 
recordFailure(); } - public interface IInitializeCoordinatorStateContext extends IRetryableActivityContext { + interface IInitializeCoordinatorStateContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.COORDINATION_INITIALIZATION; } - public interface ICreateUnassignedWorkItemContext extends IRetryableActivityContext { + interface ICreateUnassignedWorkItemContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.CREATE_UNASSIGNED_WORK_ITEM; } - public interface IPendingWorkItemsContext extends IScopedInstrumentationAttributes { + interface IPendingWorkItemsContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.PENDING_WORK_CHECK; IRefreshContext getRefreshContext(); } - public interface IRefreshContext extends IRetryableActivityContext { + interface IRefreshContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.SYNC_REFRESH_CLUSTER; } - public interface IBaseAcquireWorkContext extends IRetryableActivityContext {} + interface IBaseAcquireWorkContext extends IRetryableActivityContext {} - public interface IAcquireSpecificWorkContext extends IBaseAcquireWorkContext { + interface IAcquireSpecificWorkContext extends IBaseAcquireWorkContext { String ACTIVITY_NAME = ActivityNames.ACQUIRE_SPECIFIC_WORK; } - public interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { + interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { String ACTIVITY_NAME = ActivityNames.ACQUIRE_NEXT_WORK; IRefreshContext getRefreshContext(); @@ -70,13 +70,13 @@ public interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { void recordFailure(OpenSearchWorkCoordinator.PotentialClockDriftDetectedException e); } - public interface ICompleteWorkItemContext extends IRetryableActivityContext { + interface ICompleteWorkItemContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.COMPLETE_WORK; IRefreshContext getRefreshContext(); } - public interface IScopedWorkContext extends IScopedInstrumentationAttributes { + interface IScopedWorkContext extends IScopedInstrumentationAttributes { C createOpeningContext(); ICompleteWorkItemContext createCloseContet(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java index 90e8a677f..925a0f3be 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java @@ -13,13 +13,11 @@ import lombok.Getter; import lombok.NonNull; -public class RfsContexts extends IRfsContexts { +public interface RfsContexts extends IRfsContexts { - private RfsContexts() {} + String COUNT_UNITS = "count"; - public static final String COUNT_UNITS = "count"; - - public static class GenericRequestContext extends BaseSpanContext + class GenericRequestContext extends BaseSpanContext implements IRfsContexts.IRequestContext { @@ -97,7 +95,7 @@ public void addBytesRead(int i) { } } - public static class CheckedIdempotentPutRequestContext extends BaseSpanContext + class CheckedIdempotentPutRequestContext extends BaseSpanContext implements IRfsContexts.ICheckedIdempotentPutRequestContext { @Getter diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java index c86c9741a..e8cc5fafe 100644 
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java @@ -12,11 +12,9 @@ import lombok.Getter; import lombok.NonNull; -public class WorkCoordinationContexts extends IWorkCoordinationContexts { - private WorkCoordinationContexts() {} - +public interface WorkCoordinationContexts extends IWorkCoordinationContexts { @AllArgsConstructor - public static class RetryLabels { + class RetryLabels { CommonScopedMetricInstruments.ScopeLabels scopeLabels; public final String retry; public final String failure; @@ -30,7 +28,7 @@ private static RetryLabels autoLabels(String activityName) { ); } - public static class RetryMetricInstruments extends CommonScopedMetricInstruments { + class RetryMetricInstruments extends CommonScopedMetricInstruments { public final LongCounter retryCounter; public final LongCounter failureCounter; @@ -41,7 +39,7 @@ private RetryMetricInstruments(Meter meter, RetryLabels retryLabels) { } } - public interface RetryableActivityContextMetricMixin + interface RetryableActivityContextMetricMixin extends IRetryableActivityContext { T getRetryMetrics(); @@ -60,7 +58,7 @@ default void recordFailure() { } @Getter - public static class InitializeCoordinatorStateContext extends BaseSpanContext + class InitializeCoordinatorStateContext extends BaseSpanContext implements IInitializeCoordinatorStateContext, RetryableActivityContextMetricMixin { @@ -101,7 +99,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class CreateUnassignedWorkItemContext extends BaseSpanContext + class CreateUnassignedWorkItemContext extends BaseSpanContext implements ICreateUnassignedWorkItemContext, RetryableActivityContextMetricMixin { @@ -138,7 +136,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class PendingItems extends BaseSpanContext + class PendingItems extends BaseSpanContext implements IPendingWorkItemsContext { final IScopedInstrumentationAttributes enclosingScope; @@ -180,7 +178,7 @@ public MetricInstruments getMetrics() { } @Getter - public static class Refresh extends BaseSpanContext + class Refresh extends BaseSpanContext implements IRefreshContext, RetryableActivityContextMetricMixin { @@ -217,7 +215,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class AcquireSpecificWorkContext extends BaseSpanContext + class AcquireSpecificWorkContext extends BaseSpanContext implements IAcquireSpecificWorkContext, RetryableActivityContextMetricMixin { @@ -254,7 +252,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class AcquireNextWorkItemContext extends BaseSpanContext + class AcquireNextWorkItemContext extends BaseSpanContext implements IAcquireNextWorkItemContext, RetryableActivityContextMetricMixin { @@ -326,7 +324,7 @@ public void recordFailure(OpenSearchWorkCoordinator.PotentialClockDriftDetectedE } @Getter - public static class CompleteWorkItemContext extends BaseSpanContext + class CompleteWorkItemContext extends BaseSpanContext implements ICompleteWorkItemContext, RetryableActivityContextMetricMixin { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java index 845da3d2f..c8a19369a 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java +++ 
b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java @@ -9,6 +9,12 @@ public class TransformFunctions { private static final ObjectMapper mapper = new ObjectMapper(); + public static final String MAPPINGS_KEY_STR = "mappings"; + public static final String SETTINGS_KEY_STR = "settings"; + public static final String NUMBER_OF_REPLICAS_KEY_STR = "number_of_replicas"; + public static final String INDEX_KEY_STR = "index"; + + private TransformFunctions() {} public static Transformer getTransformer( Version sourceVersion, @@ -61,13 +67,11 @@ public static ObjectNode convertFlatSettingsToTree(ObjectNode flatSettings) { * - [{"audit_message":{"properties":{"address":{"type":"text"}}}}] */ public static void removeIntermediateMappingsLevels(ObjectNode root) { - if (root.has("mappings")) { - try { - ArrayNode mappingsList = (ArrayNode) root.get("mappings"); - root.set("mappings", getMappingsFromBeneathIntermediate(mappingsList)); - } catch (ClassCastException e) { - // mappings isn't an array - return; + if (root.has(MAPPINGS_KEY_STR)) { + var val = root.get(MAPPINGS_KEY_STR); + if (val instanceof ArrayNode) { + ArrayNode mappingsList = (ArrayNode) val; + root.set(MAPPINGS_KEY_STR, getMappingsFromBeneathIntermediate(mappingsList)); } } } @@ -94,13 +98,13 @@ public static ObjectNode getMappingsFromBeneathIntermediate(ArrayNode mappingsRo public static void removeIntermediateIndexSettingsLevel(ObjectNode root) { // Remove the intermediate key "index" under "settings", will start like: // {"index":{"number_of_shards":"1","number_of_replicas":"1"}} - if (root.has("settings")) { - ObjectNode settingsRoot = (ObjectNode) root.get("settings"); - if (settingsRoot.has("index")) { - ObjectNode indexSettingsRoot = (ObjectNode) settingsRoot.get("index"); + if (root.has(SETTINGS_KEY_STR)) { + ObjectNode settingsRoot = (ObjectNode) root.get(SETTINGS_KEY_STR); + if (settingsRoot.has(INDEX_KEY_STR)) { + ObjectNode indexSettingsRoot = (ObjectNode) settingsRoot.get(INDEX_KEY_STR); settingsRoot.setAll(indexSettingsRoot); - settingsRoot.remove("index"); - root.set("settings", settingsRoot); + settingsRoot.remove(INDEX_KEY_STR); + root.set(SETTINGS_KEY_STR, settingsRoot); } } } @@ -112,20 +116,20 @@ public static void removeIntermediateIndexSettingsLevel(ObjectNode root) { * the minimum number of replicas being 2. */ public static void fixReplicasForDimensionality(ObjectNode root, int dimensionality) { - if (root.has("settings")) { - ObjectNode settingsRoot = (ObjectNode) root.get("settings"); - if (settingsRoot.has("number_of_replicas")) { + if (root.has(SETTINGS_KEY_STR)) { + ObjectNode settingsRoot = (ObjectNode) root.get(SETTINGS_KEY_STR); + if (settingsRoot.has(NUMBER_OF_REPLICAS_KEY_STR)) { // dimensionality must be at least 1 dimensionality = Math.max(dimensionality, 1); // If the total number of copies requested in the original settings is not a multiple of the // dimensionality, then up it to the next largest multiple of the dimensionality. - int numberOfCopies = settingsRoot.get("number_of_replicas").asInt() + 1; + int numberOfCopies = settingsRoot.get(NUMBER_OF_REPLICAS_KEY_STR).asInt() + 1; int remainder = numberOfCopies % dimensionality; int newNumberOfCopies = (remainder > 0) ? 
(numberOfCopies + dimensionality - remainder) : numberOfCopies; int newNumberOfReplicas = newNumberOfCopies - 1; - settingsRoot.put("number_of_replicas", newNumberOfReplicas); + settingsRoot.put(NUMBER_OF_REPLICAS_KEY_STR, newNumberOfReplicas); } } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java index 18997190c..16087ac2f 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java @@ -4,8 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.migrations.bulkload.models.GlobalMetadata; import org.opensearch.migrations.bulkload.models.IndexMetadata; @@ -15,14 +13,16 @@ import org.opensearch.migrations.transformation.entity.Index; import org.opensearch.migrations.transformation.rules.IndexMappingTypeRemoval; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class Transformer_ES_6_8_to_OS_2_11 implements Transformer { - private static final Logger logger = LogManager.getLogger(Transformer_ES_6_8_to_OS_2_11.class); private static final ObjectMapper mapper = new ObjectMapper(); private final List> indexTransformations = List.of(new IndexMappingTypeRemoval()); private final List> indexTemplateTransformations = List.of(new IndexMappingTypeRemoval()); - private int awarenessAttributeDimensionality; + private final int awarenessAttributeDimensionality; public Transformer_ES_6_8_to_OS_2_11(int awarenessAttributeDimensionality) { this.awarenessAttributeDimensionality = awarenessAttributeDimensionality; @@ -39,7 +39,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) { templatesRoot.fields().forEachRemaining(template -> { var templateCopy = (ObjectNode) template.getValue().deepCopy(); var indexTemplate = (Index) () -> templateCopy; - transformIndex(indexTemplate, IndexType.Template); + transformIndex(indexTemplate, IndexType.TEMPLATE); templates.set(template.getKey(), indexTemplate.getRawJson()); }); newRoot.set("templates", templates); @@ -63,32 +63,34 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) { @Override public IndexMetadata transformIndexMetadata(IndexMetadata index) { var copy = index.deepCopy(); - transformIndex(copy, IndexType.Concrete); + transformIndex(copy, IndexType.CONCRETE); return new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName()); } private void transformIndex(Index index, IndexType type) { - logger.debug("Original Object: " + index.getRawJson().toString()); + log.atDebug().setMessage(()->"Original Object: {}").addArgument(index.getRawJson().toString()).log(); var newRoot = index.getRawJson(); switch (type) { - case Concrete: + case CONCRETE: indexTransformations.forEach(transformer -> transformer.applyTransformation(index)); break; - case Template: + case TEMPLATE: indexTemplateTransformations.forEach(transformer -> transformer.applyTransformation(index)); break; + default: + throw new IllegalArgumentException("Unknown type: " + type); } newRoot.set("settings", TransformFunctions.convertFlatSettingsToTree((ObjectNode) newRoot.get("settings"))); TransformFunctions.removeIntermediateIndexSettingsLevel(newRoot); 
// run before fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality); - logger.debug("Transformed Object: " + newRoot.toString()); + log.atDebug().setMessage(()->"Transformed Object: {}").addArgument(newRoot.toString()).log(); } private enum IndexType { - Concrete, - Template; + CONCRETE, + TEMPLATE; } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java index 51f036fb8..429748429 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java @@ -1,18 +1,19 @@ package org.opensearch.migrations.bulkload.transformers; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.migrations.bulkload.models.GlobalMetadata; import org.opensearch.migrations.bulkload.models.IndexMetadata; import org.opensearch.migrations.bulkload.version_os_2_11.GlobalMetadataData_OS_2_11; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class Transformer_ES_7_10_OS_2_11 implements Transformer { - private static final Logger logger = LogManager.getLogger(Transformer_ES_7_10_OS_2_11.class); - private static final ObjectMapper mapper = new ObjectMapper(); - private int awarenessAttributeDimensionality; + public static final String INDEX_TEMPLATE_KEY_STR = "index_template"; + public static final String TEMPLATES_KEY_STR = "templates"; + public static final String COMPONENT_TEMPLATE_KEY_STR = "component_template"; + private final int awarenessAttributeDimensionality; public Transformer_ES_7_10_OS_2_11(int awarenessAttributeDimensionality) { this.awarenessAttributeDimensionality = awarenessAttributeDimensionality; @@ -23,24 +24,24 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { ObjectNode root = metaData.toObjectNode().deepCopy(); // Transform the legacy templates - if (root.get("templates") != null) { - ObjectNode templatesRoot = (ObjectNode) root.get("templates").deepCopy(); + if (root.get(TEMPLATES_KEY_STR) != null) { + ObjectNode templatesRoot = (ObjectNode) root.get(TEMPLATES_KEY_STR).deepCopy(); templatesRoot.fieldNames().forEachRemaining(templateName -> { ObjectNode template = (ObjectNode) templatesRoot.get(templateName); - logger.info("Transforming template: " + templateName); - logger.debug("Original template: " + template.toString()); + log.atInfo().setMessage("Transforming template: {}").addArgument(templateName).log(); + log.atDebug().setMessage("Original template: {}").addArgument(template).log(); TransformFunctions.removeIntermediateIndexSettingsLevel(template); // run before fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(templatesRoot, awarenessAttributeDimensionality); - logger.debug("Transformed template: " + template.toString()); + log.atDebug().setMessage("Transformed template: {}").addArgument(template).log(); templatesRoot.set(templateName, template); }); - root.set("templates", templatesRoot); + root.set(TEMPLATES_KEY_STR, templatesRoot); } // Transform the index templates - if (root.get("index_template") != null) { - ObjectNode indexTemplatesRoot = (ObjectNode) root.get("index_template").deepCopy(); - ObjectNode 
indexTemplateValuesRoot = (ObjectNode) indexTemplatesRoot.get("index_template"); + if (root.get(INDEX_TEMPLATE_KEY_STR) != null) { + ObjectNode indexTemplatesRoot = (ObjectNode) root.get(INDEX_TEMPLATE_KEY_STR).deepCopy(); + ObjectNode indexTemplateValuesRoot = (ObjectNode) indexTemplatesRoot.get(INDEX_TEMPLATE_KEY_STR); indexTemplateValuesRoot.fieldNames().forEachRemaining(templateName -> { ObjectNode template = (ObjectNode) indexTemplateValuesRoot.get(templateName); ObjectNode templateSubRoot = (ObjectNode) template.get("template"); @@ -49,21 +50,21 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { return; } - logger.info("Transforming template: " + templateName); - logger.debug("Original template: " + template.toString()); + log.atInfo().setMessage("Transforming index template: {}").addArgument(templateName).log(); + log.atDebug().setMessage("Original index template: {}").addArgument(template).log(); TransformFunctions.removeIntermediateIndexSettingsLevel(templateSubRoot); // run before // fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(templateSubRoot, awarenessAttributeDimensionality); - logger.debug("Transformed template: " + template.toString()); + log.atDebug().setMessage("Transformed index template: {}").addArgument(template).log(); indexTemplateValuesRoot.set(templateName, template); }); - root.set("index_template", indexTemplatesRoot); + root.set(INDEX_TEMPLATE_KEY_STR, indexTemplatesRoot); } // Transform the component templates - if (root.get("component_template") != null) { - ObjectNode componentTemplatesRoot = (ObjectNode) root.get("component_template").deepCopy(); - ObjectNode componentTemplateValuesRoot = (ObjectNode) componentTemplatesRoot.get("component_template"); + if (root.get(COMPONENT_TEMPLATE_KEY_STR) != null) { + ObjectNode componentTemplatesRoot = (ObjectNode) root.get(COMPONENT_TEMPLATE_KEY_STR).deepCopy(); + ObjectNode componentTemplateValuesRoot = (ObjectNode) componentTemplatesRoot.get(COMPONENT_TEMPLATE_KEY_STR); componentTemplateValuesRoot.fieldNames().forEachRemaining(templateName -> { ObjectNode template = (ObjectNode) componentTemplateValuesRoot.get(templateName); ObjectNode templateSubRoot = (ObjectNode) template.get("template"); @@ -72,13 +73,13 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { return; } - logger.info("Transforming template: " + templateName); - logger.debug("Original template: " + template.toString()); + log.atInfo().setMessage("Transforming component template: {}").addArgument(templateName).log(); + log.atDebug().setMessage("Original component template: {}").addArgument(template).log(); // No transformation needed for component templates - logger.debug("Transformed template: " + template.toString()); + log.atDebug().setMessage("Transformed component template: {}").addArgument(template).log(); componentTemplateValuesRoot.set(templateName, template); }); - root.set("component_template", componentTemplatesRoot); + root.set(COMPONENT_TEMPLATE_KEY_STR, componentTemplatesRoot); } return new GlobalMetadataData_OS_2_11(root); @@ -86,7 +87,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { @Override public IndexMetadata transformIndexMetadata(IndexMetadata indexData) { - logger.debug("Original Object: " + indexData.getRawJson().toString()); + log.atDebug().setMessage("Original Object: {}").addArgument(indexData.getRawJson()).log(); var copy = indexData.deepCopy(); var newRoot = copy.getRawJson(); @@ -96,7 +97,7 @@ public IndexMetadata 
transformIndexMetadata(IndexMetadata indexData) { TransformFunctions.removeIntermediateIndexSettingsLevel(newRoot); // run before fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality); - logger.debug("Transformed Object: " + newRoot.toString()); + log.atDebug().setMessage("Transformed Object: {}").addArgument(newRoot).log(); return copy; } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java index e6a72c4b2..adac50139 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java @@ -38,13 +38,12 @@ public List getIndicesInSnapshot(String snapshotName) { .orElse(null); if (targetSnapshot != null) { - targetSnapshot.getIndexMetadataLookup().keySet().forEach(indexId -> { + targetSnapshot.getIndexMetadataLookup().keySet().forEach(indexId -> getRepoData().getIndices().forEach((indexName, rawIndex) -> { if (indexId.equals(rawIndex.getId())) { matchedIndices.add(SnapshotRepoData_ES_7_10.Index.fromRawIndex(indexName, rawIndex)); } - }); - }); + })); } return matchedIndices; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java index a48019486..9cddfcb59 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java @@ -24,9 +24,7 @@ public List getSnapshots() { @Override public List getIndicesInSnapshot(String snapshotName) { var indexes = new ArrayList(); - indexData.fields().forEachRemaining(index -> { - indexes.add(new RemoteIndexSnapshotData(index.getKey())); - }); + indexData.fields().forEachRemaining(index -> indexes.add(new RemoteIndexSnapshotData(index.getKey()))); return indexes; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java index 47a8abf57..e52576899 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java @@ -89,16 +89,14 @@ public ObjectNode getIndexes() { ObjectNode combineIndexDetails(List indexDetailsResponse) { var combinedDetails = objectMapper.createObjectNode(); - indexDetailsResponse.stream().forEach(detailsResponse -> { + indexDetailsResponse.stream().forEach(detailsResponse -> detailsResponse.fields().forEachRemaining(indexDetails -> { var indexName = indexDetails.getKey(); combinedDetails.putIfAbsent(indexName, objectMapper.createObjectNode()); var existingIndexDetails = (ObjectNode)combinedDetails.get(indexName); - indexDetails.getValue().fields().forEachRemaining(details -> { - existingIndexDetails.set(details.getKey(), details.getValue()); - }); - }); - }); + indexDetails.getValue().fields().forEachRemaining(details -> + existingIndexDetails.set(details.getKey(), details.getValue())); + })); return combinedDetails; } diff --git 
a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index 480b882e6..4ccd61780 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -27,11 +27,11 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator { public static final String INDEX_NAME = ".migrations_working_state"; public static final int MAX_REFRESH_RETRIES = 6; public static final int MAX_SETUP_RETRIES = 6; - final long ACQUIRE_WORK_RETRY_BASE_MS = 10; + static final long ACQUIRE_WORK_RETRY_BASE_MS = 10; // we'll retry lease acquisitions for up to - final int MAX_DRIFT_RETRIES = 13; // last delay before failure: 40 seconds - final int MAX_MALFORMED_ASSIGNED_WORK_DOC_RETRIES = 17; // last delay before failure: 655.36 seconds - final int MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL = 60 * 1000; + static final int MAX_DRIFT_RETRIES = 13; // last delay before failure: 40 seconds + static final int MAX_MALFORMED_ASSIGNED_WORK_DOC_RETRIES = 17; // last delay before failure: 655.36 seconds + static final int MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL = 60 * 1000; public static final String SCRIPT_VERSION_TEMPLATE = "{SCRIPT_VERSION}"; public static final String WORKER_ID_TEMPLATE = "{WORKER_ID}"; @@ -115,6 +115,9 @@ public OpenSearchWorkCoordinator( this.objectMapper = new ObjectMapper(); } + /** + * IWorkCoordinator extends AutoCloseable but this class has no resources that it owns that need to be closed. + */ @Override public void close() throws Exception { } @@ -130,14 +133,10 @@ public void setup(Supplier 0 && " + // don't obtain a lease lock - " ctx._source." - + COMPLETED_AT_FIELD_NAME - + " == null) {" + " ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" + // already done - " if (ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " == params.workerId && " - + " ctx._source." - + EXPIRATION_FIELD_NAME - + " > serverTimeSeconds) {" + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " + + " ctx._source." + EXPIRATION_FIELD_NAME + " > serverTimeSeconds) {" + // count as an update to force the caller to lookup the expiration time, but no need to modify it " ctx.op = \\\"update\\\";" - + " } else if (ctx._source." - + EXPIRATION_FIELD_NAME - + " < serverTimeSeconds && " - + // is expired - " ctx._source." - + EXPIRATION_FIELD_NAME - + " < newExpiration) {" - + // sanity check - " ctx._source." - + EXPIRATION_FIELD_NAME - + " = newExpiration;" - + " ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " = params.workerId;" + + " } else if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired + " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check + " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" + + " ctx._source." 
+ LEASE_HOLDER_ID_FIELD_NAME + " = params.workerId;" + " ctx._source.numAttempts += 1;" + " } else {" + " ctx.op = \\\"noop\\\";" @@ -394,29 +363,18 @@ public void completeWorkItem( + " \"script\": {\n" + " \"lang\": \"painless\",\n" + " \"params\": { \n" - + " \"clientTimestamp\": " - + CLIENT_TIMESTAMP_TEMPLATE - + ",\n" - + " \"workerId\": \"" - + WORKER_ID_TEMPLATE - + "\"\n" + + " \"clientTimestamp\": " + CLIENT_TIMESTAMP_TEMPLATE + ",\n" + + " \"workerId\": \"" + WORKER_ID_TEMPLATE + "\"\n" + " },\n" + " \"source\": \"" - + " if (ctx._source.scriptVersion != \\\"" - + SCRIPT_VERSION_TEMPLATE - + "\\\") {" + + " if (ctx._source.scriptVersion != \\\"" + SCRIPT_VERSION_TEMPLATE + "\\\") {" + " throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx.source.scriptVersion);" + " } " - + " if (ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " != params.workerId) {" + + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " != params.workerId) {" + " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " + \\\" not \\\" + params.workerId);" + + LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);" + " } else {" - + " ctx._source." - + COMPLETED_AT_FIELD_NAME - + " = System.currentTimeMillis() / 1000;" + + " ctx._source." + COMPLETED_AT_FIELD_NAME + " = System.currentTimeMillis() / 1000;" + " }" + "\"\n" + " }\n" @@ -460,16 +418,12 @@ private int numWorkItemsArePending( + " \"bool\": {" + " \"must\": [" + " { \"exists\":" - + " { \"field\": \"" - + EXPIRATION_FIELD_NAME - + "\"}" + + " { \"field\": \"" + EXPIRATION_FIELD_NAME + "\"}" + " }" + " ]," + " \"must_not\": [" + " { \"exists\":" - + " { \"field\": \"" - + COMPLETED_AT_FIELD_NAME - + "\"}" + + " { \"field\": \"" + COMPLETED_AT_FIELD_NAME + "\"}" + " }" + " ]" + " }" @@ -522,8 +476,7 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException + "\"query\": {" + " \"function_score\": {\n" + QUERY_INCOMPLETE_EXPIRED_ITEMS_STR + "," + " \"random_score\": {},\n" - + " \"boost_mode\": \"replace\"\n" - + // Try to avoid the workers fighting for the same work items + + " \"boost_mode\": \"replace\"\n" + // Try to avoid the workers fighting for the same work items " }" + "}," + "\"size\": 1,\n" @@ -543,10 +496,8 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException + " throw new IllegalArgumentException(\\\"The current times indicated between the client and server are too different.\\\");" + " }" + " long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.numAttempts)) * params.expirationWindow);" - + " if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " - + // is expired - " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" - + // sanity check + + " if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired + " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" + " ctx._source." 
+ LEASE_HOLDER_ID_FIELD_NAME + " = params.workerId;" + " ctx._source.numAttempts += 1;" @@ -683,7 +634,7 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker, ctx.addTraceException(e, false); var sleepBeforeNextRetryDuration = Duration.ofMillis( Math.min(MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL, - (long) (Math.pow(2.0, (double) (retries-1)) * ACQUIRE_WORK_RETRY_BASE_MS))); + (long) (Math.pow(2.0, (retries-1)) * ACQUIRE_WORK_RETRY_BASE_MS))); leaseChecker.checkRetryWaitTimeOrThrow(e, retries-1, sleepBeforeNextRetryDuration); log.atWarn().setMessage(() -> "Couldn't complete work assignment due to exception. " @@ -695,8 +646,8 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker, @AllArgsConstructor private static class MaxTriesExceededException extends Exception { - Object suppliedValue; - Object transformedValue; + final transient Object suppliedValue; + final transient Object transformedValue; } @Getter diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java index 68374ee4c..69f061f82 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java @@ -29,7 +29,7 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte // TODO - parallelize this, maybe ~400-1K requests per thread and do it asynchronously BiConsumer logger = (indexName, accepted) -> { - if (!accepted) { + if (Boolean.FALSE.equals(accepted)) { log.info("Index " + indexName + " rejected by allowlist"); } }; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java index e10689dff..7e79418ae 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java @@ -100,7 +100,7 @@ private static void prepareShardWorkItems( SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider(); BiConsumer logger = (indexName, accepted) -> { - if (!accepted) { + if (Boolean.FALSE.equals(accepted)) { log.info("Index " + indexName + " rejected by allowlist"); } }; diff --git a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java index 47a567f37..1981b1779 100644 --- a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java +++ b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java @@ -31,7 +31,7 @@ private List getProviders() { } /** - * Gets a snapshot resource provider for the given version and source repo + * Gets a snapshot resource provider for the given version and source repo * @param version The version of the source cluster * @param repo The source repo that contains of the snapshot * @return The snapshot resource provider diff --git a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java index 5aad062f6..60fffdd03 100644 --- a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java +++ 
b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java @@ -3,13 +3,13 @@ import org.opensearch.migrations.bulkload.tracing.IRfsContexts; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IMetadataMigrationContexts { +public interface IMetadataMigrationContexts { - public interface ITemplateContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { + interface ITemplateContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { String ACTIVITY_NAME = ActivityNames.MIGRATE_INDEX_TEMPLATE; } - public interface IClusterMetadataContext extends IScopedInstrumentationAttributes { + interface IClusterMetadataContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.MIGRATE_METADATA; ITemplateContext createMigrateLegacyTemplateContext(); @@ -19,11 +19,11 @@ public interface IClusterMetadataContext extends IScopedInstrumentationAttribute IRfsContexts.ICheckedIdempotentPutRequestContext createMigrateTemplateContext(); } - public interface ICreateIndexContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { + interface ICreateIndexContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { String ACTIVITY_NAME = ActivityNames.CREATE_INDEX; } - public static class ActivityNames { + class ActivityNames { public static final String CREATE_SNAPSHOT = "createSnapshot"; public static final String CREATE_INDEX = "createIndex"; public static final String MIGRATE_METADATA = "migrateMetadata"; @@ -32,7 +32,7 @@ public static class ActivityNames { private ActivityNames() {} } - public static class MetricNames { + class MetricNames { private MetricNames() {} } diff --git a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java index 2af5be910..4387305bf 100644 --- a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java @@ -11,8 +11,8 @@ import lombok.NonNull; -public class MetadataMigrationContexts { - public static class ClusterMetadataContext extends BaseSpanContext +public interface MetadataMigrationContexts { + class ClusterMetadataContext extends BaseSpanContext implements IMetadataMigrationContexts.IClusterMetadataContext { @@ -70,7 +70,7 @@ public IRfsContexts.ICheckedIdempotentPutRequestContext createMigrateTemplateCon } } - public static class MigrateTemplateContext extends BaseNestedSpanContext< + class MigrateTemplateContext extends BaseNestedSpanContext< RootMetadataMigrationContext, IMetadataMigrationContexts.IClusterMetadataContext> implements IMetadataMigrationContexts.ITemplateContext { @@ -113,7 +113,7 @@ public IRfsContexts.IRequestContext createPutContext() { } } - public static class CreateIndexContext extends BaseSpanContext + class CreateIndexContext extends BaseSpanContext implements IMetadataMigrationContexts.ICreateIndexContext { diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java index ef03e882f..7f3ffb246 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java @@ -13,9 +13,9 @@ import 
lombok.NonNull; -public class DocumentMigrationContexts extends IDocumentMigrationContexts { +public interface DocumentMigrationContexts extends IDocumentMigrationContexts { - public static abstract class BaseDocumentMigrationContext extends BaseSpanContext { + abstract class BaseDocumentMigrationContext extends BaseSpanContext { protected BaseDocumentMigrationContext(RootDocumentMigrationContext rootScope) { super(rootScope); } @@ -25,7 +25,7 @@ public RootWorkCoordinationContext getWorkCoordinationRootContext() { } } - public static class ShardSetupAttemptContext extends BaseDocumentMigrationContext + class ShardSetupAttemptContext extends BaseDocumentMigrationContext implements IShardSetupAttemptContext { protected ShardSetupAttemptContext(RootDocumentMigrationContext rootScope) { @@ -75,7 +75,7 @@ public IAddShardWorkItemContext createShardWorkItemContext() { } } - public static class AddShardWorkItemContext extends BaseNestedSpanContext< + class AddShardWorkItemContext extends BaseNestedSpanContext< RootDocumentMigrationContext, IShardSetupAttemptContext> implements IAddShardWorkItemContext { @@ -115,7 +115,7 @@ public IWorkCoordinationContexts.ICreateUnassignedWorkItemContext createUnassign } - public static class DocumentReindexContext extends BaseDocumentMigrationContext implements IDocumentReindexContext { + class DocumentReindexContext extends BaseDocumentMigrationContext implements IDocumentReindexContext { protected DocumentReindexContext(RootDocumentMigrationContext rootScope) { super(rootScope); diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java index 78d4ed42b..9a3cb580d 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java @@ -4,9 +4,9 @@ import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IDocumentMigrationContexts { +public interface IDocumentMigrationContexts { - public static class ActivityNames { + class ActivityNames { private ActivityNames() {} public static final String DOCUMENT_REINDEX = "documentReindex"; @@ -14,11 +14,11 @@ private ActivityNames() {} public static final String ADD_SHARD_WORK_ITEM = "addShardWorkItem"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} } - public interface IShardSetupAttemptContext extends IScopedInstrumentationAttributes { + interface IShardSetupAttemptContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.SHARD_SETUP_ATTEMPT; IWorkCoordinationContexts.IAcquireSpecificWorkContext createWorkAcquisitionContext(); @@ -28,13 +28,13 @@ public interface IShardSetupAttemptContext extends IScopedInstrumentationAttribu IAddShardWorkItemContext createShardWorkItemContext(); } - public interface IAddShardWorkItemContext extends IScopedInstrumentationAttributes { + interface IAddShardWorkItemContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.ADD_SHARD_WORK_ITEM; IWorkCoordinationContexts.ICreateUnassignedWorkItemContext createUnassignedWorkItemContext(); } - public interface IDocumentReindexContext + interface IDocumentReindexContext extends IWorkCoordinationContexts.IScopedWorkContext { String ACTIVITY_NAME = 
ActivityNames.DOCUMENT_REINDEX; diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index c8bf853a7..2f080f403 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -49,8 +49,8 @@ class StreamChannelConnectionCaptureSerializerTest { // Reference Timestamp chosen in the future with nanosecond precision resemble an upper bound on space overhead public static final Instant REFERENCE_TIMESTAMP = Instant.parse("2999-01-01T23:59:59.98765432Z"); - private final static String FAKE_EXCEPTION_DATA = "abcdefghijklmnop"; - private final static String FAKE_READ_PACKET_DATA = "ABCDEFGHIJKLMNOP"; + private static final String FAKE_EXCEPTION_DATA = "abcdefghijklmnop"; + private static final String FAKE_READ_PACKET_DATA = "ABCDEFGHIJKLMNOP"; private static int getEstimatedTrafficStreamByteSize(int readWriteEventCount, int averageDataPacketSize) { var fixedTimestamp = Timestamp.newBuilder() diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java index 4e02b0e9f..02b5ab430 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java @@ -4,9 +4,9 @@ import org.opensearch.migrations.tracing.IWithTypedEnclosingScope; import org.opensearch.migrations.tracing.commoncontexts.IHttpTransactionContext; -public abstract class IWireCaptureContexts { +public interface IWireCaptureContexts { - public static class ActivityNames { + class ActivityNames { private ActivityNames() {} public static final String BLOCKED = "blocked"; @@ -15,7 +15,7 @@ private ActivityNames() {} public static final String GATHERING_RESPONSE = "gatheringResponse"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} public static final String UNREGISTERED = "unregistered"; @@ -28,7 +28,7 @@ private MetricNames() {} public static final String BYTES_WRITTEN = "bytesWritten"; } - public interface ICapturingConnectionContext + interface ICapturingConnectionContext extends org.opensearch.migrations.tracing.commoncontexts.IConnectionContext { IHttpMessageContext createInitialRequestContext(); @@ -38,7 +38,7 @@ public interface ICapturingConnectionContext void onRemoved(); } - public interface IHttpMessageContext + interface IHttpMessageContext extends IHttpTransactionContext, IWithStartTimeAndAttributes, @@ -52,7 +52,7 @@ public interface IHttpMessageContext IRequestContext createNextRequestContext(); } - public interface IRequestContext extends IHttpMessageContext { + interface IRequestContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.GATHERING_REQUEST; default String getActivityName() { @@ -68,7 +68,7 @@ default String getActivityName() { void onBytesRead(int size); } - public interface IBlockingContext 
extends IHttpMessageContext { + interface IBlockingContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.BLOCKED; default String getActivityName() { @@ -76,7 +76,7 @@ default String getActivityName() { } } - public interface IWaitingForResponseContext extends IHttpMessageContext { + interface IWaitingForResponseContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.WAITING_FOR_RESPONSE; default String getActivityName() { @@ -84,7 +84,7 @@ default String getActivityName() { } } - public interface IResponseContext extends IHttpMessageContext { + interface IResponseContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.GATHERING_RESPONSE; default String getActivityName() { diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java index c593a92c8..aec642c87 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java @@ -10,11 +10,11 @@ import lombok.Getter; import lombok.NonNull; -public class WireCaptureContexts extends IWireCaptureContexts { - public static final String COUNT_UNITS = "count"; - public static final String BYTES_UNIT = "bytes"; +public interface WireCaptureContexts extends IWireCaptureContexts { + String COUNT_UNITS = "count"; + String BYTES_UNIT = "bytes"; - public static class ConnectionContext extends org.opensearch.migrations.trafficcapture.tracing.ConnectionContext + class ConnectionContext extends org.opensearch.migrations.trafficcapture.tracing.ConnectionContext implements IWireCaptureContexts.ICapturingConnectionContext { public ConnectionContext(IRootWireLoggingContext rootInstrumentationScope, String connectionId, String nodeId) { @@ -64,7 +64,7 @@ public void onRemoved() { } @Getter - public abstract static class HttpMessageContext extends BaseNestedSpanContext< + abstract static class HttpMessageContext extends BaseNestedSpanContext< RootWireLoggingContext, IConnectionContext> implements IWireCaptureContexts.IHttpMessageContext { @@ -118,7 +118,7 @@ public IWireCaptureContexts.IRequestContext createNextRequestContext() { } } - public static class RequestContext extends HttpMessageContext implements IWireCaptureContexts.IRequestContext { + class RequestContext extends HttpMessageContext implements IWireCaptureContexts.IRequestContext { public RequestContext( RootWireLoggingContext rootWireLoggingContext, IConnectionContext enclosingScope, @@ -176,7 +176,7 @@ public void onBytesRead(int size) { } } - public static class BlockingContext extends HttpMessageContext implements IWireCaptureContexts.IBlockingContext { + class BlockingContext extends HttpMessageContext implements IWireCaptureContexts.IBlockingContext { public BlockingContext( RootWireLoggingContext rootWireLoggingContext, IConnectionContext enclosingScope, @@ -206,7 +206,7 @@ public RequestContext.MetricInstruments getMetrics() { } } - public static class WaitingForResponseContext extends HttpMessageContext + class WaitingForResponseContext extends HttpMessageContext implements IWireCaptureContexts.IWaitingForResponseContext { public WaitingForResponseContext( @@ -238,7 +238,7 @@ public RequestContext.MetricInstruments getMetrics() { } 
} - public static class ResponseContext extends HttpMessageContext implements IWireCaptureContexts.IResponseContext { + class ResponseContext extends HttpMessageContext implements IWireCaptureContexts.IResponseContext { public ResponseContext( RootWireLoggingContext rootWireLoggingContext, IConnectionContext enclosingScope, diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java index 4e5ff226e..088a371e7 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java @@ -14,7 +14,7 @@ public class CaptureProxySetupTest { - public final static String kafkaBrokerString = "invalid:9092"; + public static final String kafkaBrokerString = "invalid:9092"; public static final String TLS_PROTOCOLS_KEY = "plugins.security.ssl.http.enabled_protocols"; @Test diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java index 7523583c2..6579a2501 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java @@ -37,7 +37,7 @@ @Slf4j class NettyScanningHttpProxyTest { - private final static String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + private static final String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + "Host: localhost\r\n" + "User-Agent: UnitTest\r\n" + "DumbAndLongHeaderValue-0: 0\r\n" @@ -63,7 +63,7 @@ class NettyScanningHttpProxyTest { + "Accept-Encoding: gzip, x-gzip, deflate\r\n" + "Connection: keep-alive\r\n" + "\r\n"; - private final static String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + private static final String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + "Content-transfer-encoding: chunked\r\n" + "Date: Thu, 08 Jun 2023 23:06:23 GMT\r\n" + // This should be OK since it's always the same length diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java index 7cc561cc8..4de7951b0 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java @@ -34,6 +34,7 @@ public Builder(Instant requestSendTime) { super(requestSendTime); } + @Override public AggregatedRawResponse build() { return new AggregatedRawResponse( rawResponse, diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java index a62cd21d0..ff83b5ffe 100644 --- 
a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java @@ -53,7 +53,7 @@ public B addErrorCause(Throwable t) { } public B addResponsePacket(byte[] packet) { - return (B) addResponsePacket(packet, Instant.now()); + return addResponsePacket(packet, Instant.now()); } public B addResponsePacket(byte[] packet, Instant timestamp) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java index 4e145df4b..a879ba7e7 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java @@ -6,4 +6,4 @@ * is closed. It allows the NettySendByteBufsToPacketHandlerHandler class to determine * whether all the contents were received or if there was an error in-flight. */ -public class EndOfInput {} +public final class EndOfInput {} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java index c25dbe5dd..dcf1ca62d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java @@ -54,6 +54,10 @@ public static boolean retryIsUnnecessaryGivenStatusCode(int statusCode) { return shouldRetry(targetRequestBytes, currentResponse, reconstructedSourceTransactionFuture); } + /** + * @param targetRequestBytes the raw request as it was sent to the target cluster, which can be useful because + * of the HTTP verb and path. 
+ */ public TrackedFuture shouldRetry(ByteBuf targetRequestBytes, AggregatedRawResponse currentResponse, diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java index ac82fbf39..b380fc617 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java @@ -11,9 +11,9 @@ import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; import org.opensearch.migrations.tracing.IWithTypedEnclosingScope; -public abstract class IReplayContexts { +public interface IReplayContexts { - public static class ActivityNames { + class ActivityNames { private ActivityNames() {} public static final String CHANNEL = "channel"; @@ -33,7 +33,7 @@ private ActivityNames() {} public static final String TUPLE_COMPARISON = "comparingResults"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} public static final String KAFKA_RECORD_READ = "kafkaRecordsRead"; @@ -68,9 +68,9 @@ private MetricNames() {} public static final String TUPLE_COMPARISON = "tupleComparison"; } - public interface IAccumulationScope extends IScopedInstrumentationAttributes {} + interface IAccumulationScope extends IScopedInstrumentationAttributes {} - public interface IChannelKeyContext + interface IChannelKeyContext extends IAccumulationScope, org.opensearch.migrations.tracing.commoncontexts.IConnectionContext { @@ -98,7 +98,7 @@ default String getNodeId() { void addFailedChannelCreation(); } - public interface ISocketContext extends IAccumulationScope, IWithTypedEnclosingScope { + interface ISocketContext extends IAccumulationScope, IWithTypedEnclosingScope { public static final String ACTIVITY_NAME = ActivityNames.TCP_CONNECTION; @Override @@ -107,7 +107,7 @@ default String getActivityName() { } } - public interface IKafkaRecordContext extends IAccumulationScope, IWithTypedEnclosingScope { + interface IKafkaRecordContext extends IAccumulationScope, IWithTypedEnclosingScope { String ACTIVITY_NAME = ActivityNames.RECORD_LIFETIME; @Override @@ -127,7 +127,7 @@ default AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) ITrafficStreamsLifecycleContext createTrafficLifecyleContext(ITrafficStreamKey tsk); } - public interface ITrafficStreamsLifecycleContext + interface ITrafficStreamsLifecycleContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -156,7 +156,7 @@ IReplayerHttpTransactionContext createHttpTransactionContext( ); } - public interface IReplayerHttpTransactionContext + interface IReplayerHttpTransactionContext extends org.opensearch.migrations.tracing.commoncontexts.IHttpTransactionContext, IAccumulationScope, @@ -212,7 +212,7 @@ default AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) ITupleHandlingContext createTupleContext(); } - public interface IRequestAccumulationContext + interface IRequestAccumulationContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -224,7 +224,7 @@ default String getActivityName() { } } - public interface IResponseAccumulationContext + interface IResponseAccumulationContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -236,7 +236,7 @@ default String getActivityName() { } } - public interface IRequestTransformationContext + interface 
IRequestTransformationContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -276,7 +276,7 @@ default String getActivityName() { void aggregateOutputChunk(int sizeInBytes); } - public interface IScheduledContext + interface IScheduledContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -288,7 +288,7 @@ default String getActivityName() { } } - public interface ITargetRequestContext + interface ITargetRequestContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -312,7 +312,7 @@ default String getActivityName() { IReceivingHttpResponseContext createHttpReceivingContext(); } - public interface IRequestConnectingContext + interface IRequestConnectingContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -324,7 +324,7 @@ default String getActivityName() { } } - public interface IRequestSendingContext + interface IRequestSendingContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -336,7 +336,7 @@ default String getActivityName() { } } - public interface IWaitingForHttpResponseContext + interface IWaitingForHttpResponseContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -348,7 +348,7 @@ default String getActivityName() { } } - public interface IReceivingHttpResponseContext + interface IReceivingHttpResponseContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -360,7 +360,7 @@ default String getActivityName() { } } - public interface ITupleHandlingContext + interface ITupleHandlingContext extends IAccumulationScope, IWithTypedEnclosingScope { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java index 942141000..ac7784de2 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java @@ -23,15 +23,13 @@ import lombok.Getter; import lombok.NonNull; import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -@Slf4j -public abstract class ReplayContexts extends IReplayContexts { +public interface ReplayContexts extends IReplayContexts { - public static final String COUNT_UNIT_STR = "count"; - public static final String BYTES_UNIT_STR = "bytes"; + String COUNT_UNIT_STR = "count"; + String BYTES_UNIT_STR = "bytes"; - public static class SocketContext extends DirectNestedSpanContext< + class SocketContext extends DirectNestedSpanContext< RootReplayerContext, ChannelKeyContext, IChannelKeyContext> implements ISocketContext { @@ -74,7 +72,7 @@ public MetricInstruments getMetrics() { } } - public static class ChannelKeyContext extends BaseNestedSpanContext< + class ChannelKeyContext extends BaseNestedSpanContext< RootReplayerContext, IScopedInstrumentationAttributes> implements IReplayContexts.IChannelKeyContext { @Getter @@ -135,7 +133,7 @@ public void addFailedChannelCreation() { } } - public static class KafkaRecordContext extends BaseNestedSpanContext + class KafkaRecordContext extends BaseNestedSpanContext implements IReplayContexts.IKafkaRecordContext { @@ -189,7 +187,7 @@ public IReplayContexts.ITrafficStreamsLifecycleContext createTrafficLifecyleCont } } - public static class TrafficStreamLifecycleContext extends BaseNestedSpanContext< + class TrafficStreamLifecycleContext extends BaseNestedSpanContext< RootReplayerContext, IScopedInstrumentationAttributes> implements 
IReplayContexts.ITrafficStreamsLifecycleContext { private final ITrafficStreamKey trafficStreamKey; @@ -255,7 +253,7 @@ public IReplayContexts.IChannelKeyContext getLogicalEnclosingScope() { } } - public static class HttpTransactionContext extends BaseNestedSpanContext< + class HttpTransactionContext extends BaseNestedSpanContext< RootReplayerContext, IReplayContexts.ITrafficStreamsLifecycleContext> implements IReplayContexts.IReplayerHttpTransactionContext { final UniqueReplayerRequestKey replayerRequestKey; @@ -347,7 +345,7 @@ public IReplayContexts.ITupleHandlingContext createTupleContext() { } } - public static class RequestAccumulationContext extends DirectNestedSpanContext< + class RequestAccumulationContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IRequestAccumulationContext { @@ -371,7 +369,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class ResponseAccumulationContext extends DirectNestedSpanContext< + class ResponseAccumulationContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IResponseAccumulationContext { @@ -395,7 +393,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class RequestTransformationContext extends DirectNestedSpanContext< + class RequestTransformationContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IRequestTransformationContext { @@ -547,7 +545,7 @@ public void aggregateOutputChunk(int sizeInBytes) { } } - public static class ScheduledContext extends DirectNestedSpanContext< + class ScheduledContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IScheduledContext { @@ -586,7 +584,7 @@ public void sendMeterEventsForEnd() { } } - public static class TargetRequestContext extends DirectNestedSpanContext< + class TargetRequestContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.ITargetRequestContext { @@ -656,7 +654,7 @@ public IReplayContexts.IWaitingForHttpResponseContext createWaitingForResponseCo } } - public static class RequestConnectingContext extends DirectNestedSpanContext< + class RequestConnectingContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IRequestConnectingContext { @@ -680,7 +678,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class RequestSendingContext extends DirectNestedSpanContext< + class RequestSendingContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IRequestSendingContext { @@ -704,7 +702,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class WaitingForHttpResponseContext extends DirectNestedSpanContext< + class WaitingForHttpResponseContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IWaitingForHttpResponseContext { @@ -729,7 +727,7 @@ private 
MetricInstruments(Meter meter, String activityName) { } - public static class ReceivingHttpResponseContext extends DirectNestedSpanContext< + class ReceivingHttpResponseContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IReceivingHttpResponseContext { @@ -756,7 +754,7 @@ private MetricInstruments(Meter meter, String activityName) { @Getter @Setter - public static class TupleHandlingContext extends DirectNestedSpanContext< + class TupleHandlingContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.ITupleHandlingContext { diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java index 77b948da0..a5929530b 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java @@ -29,13 +29,13 @@ public static void setup() { CountingNettyResourceLeakDetector.activate(); } - final static String SAMPLE_REQUEST_STRING = "GET / HTTP/1.1\r\n" + static final String SAMPLE_REQUEST_STRING = "GET / HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: Keep-Alive\r\n" + "User-Agent: UnitTest\r\n" + "\r\n"; - final static String SAMPLE_REQUEST_AS_BLOCKS = "[G],[E],[T],[ ],[/],[ ],[H],[T],[T],[P],[/],[1],[.],[1]," + static final String SAMPLE_REQUEST_AS_BLOCKS = "[G],[E],[T],[ ],[/],[ ],[H],[T],[T],[P],[/],[1],[.],[1]," + "[\r],[\n]," + "[H],[o],[s],[t],[:],[ ],[l],[o],[c],[a],[l],[h],[o],[s],[t]," + "[\r],[\n]," @@ -45,13 +45,13 @@ public static void setup() { + "[\r],[\n]," + "[\r],[\n]"; - final static String SAMPLE_REQUEST_AS_PARSED_HTTP = "GET / HTTP/1.1\r\n" + static final String SAMPLE_REQUEST_AS_PARSED_HTTP = "GET / HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: Keep-Alive\r\n" + "User-Agent: UnitTest\r\n" + "\r\n"; - final static String SAMPLE_REQUEST_AS_PARSED_HTTP_SORTED = "GET / HTTP/1.1\r\n" + static final String SAMPLE_REQUEST_AS_PARSED_HTTP_SORTED = "GET / HTTP/1.1\r\n" + "Connection: Keep-Alive\r\n" + "Host: localhost\r\n" + "User-Agent: UnitTest\r\n" diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java index 610c63baa..3dbab055f 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java @@ -41,7 +41,7 @@ class ResultsToLogsConsumerTest extends InstrumentationTest { private static final ObjectMapper mapper = new ObjectMapper(); public static final String TEST_EXCEPTION_MESSAGE = "TEST_EXCEPTION"; - public final static String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + public static final String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + "Content-transfer-encoding: chunked\r\n" + "Date: Thu, 08 Jun 2023 23:06:23 GMT\r\n" + // This should be OK since it's always the same length diff --git 
a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index c13950705..1c0803db6 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -63,13 +63,13 @@ public class NettyPacketToHttpConsumerTest extends InstrumentationTest { public static final int LARGE_RESPONSE_CONTENT_LENGTH = 2 * 1024 * 1024; public static final int LARGE_RESPONSE_LENGTH = LARGE_RESPONSE_CONTENT_LENGTH + 107; - final static String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + static final String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + "Connection: Keep-Alive\r\n" + "Host: localhost\r\n" + "User-Agent: UnitTest\r\n" + "Connection: Keep-Alive\r\n" + "\r\n"; - public final static String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + public static final String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Funtime: checkIt!\r\n" + "transfer-encoding: chunked\r\n" diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java index ea1fcb28f..073d882e4 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java @@ -37,7 +37,7 @@ @WrapWithNettyLeakDetection class HttpJsonTransformingConsumerTest extends InstrumentationTest { - private final static String NDJSON_TEST_REQUEST = ( + private static final String NDJSON_TEST_REQUEST = ( "POST /test HTTP/1.1\r\n" + "Host: foo.example\r\n" + "Content-Type: application/json\r\n" + diff --git a/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java b/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java index c1d47f859..bef187b18 100644 --- a/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java +++ b/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java @@ -37,8 +37,8 @@ */ public class SimpleHttpClientForTesting implements AutoCloseable { - private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5); - private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5); + private static final Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5); + private static final Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5); private final CloseableHttpClient httpClient; From ace3c370aa388b720859ef9105597dd17960fb5f Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Tue, 24 Sep 2024 16:11:21 -0400 Subject: [PATCH 3/3] Next tranche of linting fixes. 
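This tranche swaps hand-rolled log4j Logger fields for Lombok's @Slf4j, renames pattern constants to UPPER_SNAKE_CASE, removes EphemeralSourceRepoAccessor, and tightens a handful of small idioms. A minimal sketch of the constant rename, taken from the InvalidResponse change below:

    // before
    private static final Pattern unknownSetting = Pattern.compile("unknown setting \\[(.+)\\].+");
    // after
    private static final Pattern UNKNOWN_SETTING = Pattern.compile("unknown setting \\[(.+)\\].+");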
Signed-off-by: Greg Schohn
---
 .../common/EphemeralSourceRepoAccessor.java   | 54 -------------------
 .../bulkload/common/InvalidResponse.java      |  4 +-
 .../bulkload/common/SnapshotCreator.java      | 19 ++++---
 .../common/SnapshotShardUnpacker.java         | 11 ++--
 .../workcoordination/IWorkCoordinator.java    |  3 ++
 .../OpenSearchWorkCoordinator.java            |  7 ---
 .../bulkload/worker/IndexRunner.java          |  6 +--
 .../reindexer/FailedRequestsLogger.java       | 14 ++---
 .../reindexer/FailedRequestsLoggerTest.java   |  4 +-
 .../FileConnectionCaptureFactory.java         | 20 +++----
 .../proxyserver/TestHeaderRewrites.java       |  8 +--
 .../migrations/replay/ReplayEngine.java       |  2 +-
 .../opensearch/migrations/replay/Utils.java   |  2 +
 .../replay/tracing/ChannelContextManager.java |  2 +-
 .../JsonJoltTransformerProvider.java          |  2 +-
 15 files changed, 50 insertions(+), 108 deletions(-)
 delete mode 100644 RFS/src/main/java/org/opensearch/migrations/bulkload/common/EphemeralSourceRepoAccessor.java

diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/EphemeralSourceRepoAccessor.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/EphemeralSourceRepoAccessor.java
deleted file mode 100644
index 1f59f441c..000000000
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/EphemeralSourceRepoAccessor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.opensearch.migrations.bulkload.common;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/*
- * Provides access to the underlying files in the source repo and deletes the files after the Stream is closed. This
- * is useful/interesting in the case where the files are large/numerous and you can easily re-acquire them - such as
- * if they are being loaded from S3.
- *
- * TODO: find a better approach to this (see https://opensearch.atlassian.net/browse/MIGRATIONS-1786)
- */
-public class EphemeralSourceRepoAccessor extends SourceRepoAccessor {
-    private static final Logger logger = LogManager.getLogger(EphemeralSourceRepoAccessor.class);
-
-    public EphemeralSourceRepoAccessor(SourceRepo repo) {
-        super(repo);
-    }
-
-    @Override
-    protected InputStream load(Path path) {
-        try {
-            return new EphemeralFileInputStream(path);
-        } catch (Exception e) {
-            throw new CouldNotLoadRepoFile("Could not load file: " + path, e);
-        }
-    }
-
-    public static class EphemeralFileInputStream extends FileInputStream {
-        private final Path filePath;
-
-        public EphemeralFileInputStream(Path filePath) throws IOException {
-            super(filePath.toFile());
-            this.filePath = filePath;
-        }
-
-        @Override
-        public void close() throws IOException {
-            try {
-                super.close();
-            } finally {
-                logger.info("Deleting local file: {}", filePath);
-                logger.warn("See: https://opensearch.atlassian.net/browse/MIGRATIONS-1786");
-                Files.deleteIfExists(filePath);
-            }
-        }
-    }
-}
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java
index cb3d8f408..4d40aa234 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java
@@ -17,7 +17,7 @@
 
 @Slf4j
 public class InvalidResponse extends RfsException {
-    private static final Pattern unknownSetting = Pattern.compile("unknown setting \\[(.+)\\].+");
+    private static final Pattern UNKNOWN_SETTING = Pattern.compile("unknown setting \\[(.+)\\].+");
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
     private final transient HttpResponse response;
@@ -78,7 +78,7 @@ private static Map.Entry getUnknownSetting(JsonNode json) {
             }
             return Map.entry(typeNode, reasonNode);
         }).map(entry -> {
-            var matcher = unknownSetting.matcher(entry.getValue().asText());
+            var matcher = UNKNOWN_SETTING.matcher(entry.getValue().asText());
             if (!matcher.matches()) {
                 return null;
             }
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java
index 5c5c9987e..8cbef4a84 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -13,8 +14,8 @@
 
 import lombok.Getter;
 
+@Slf4j
 public abstract class SnapshotCreator {
-    private static final Logger logger = LogManager.getLogger(SnapshotCreator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
     static {
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -43,9 +44,9 @@ public void registerRepo() {
         // Register the repo; it's fine if it already exists
         try {
             client.registerSnapshotRepo(getRepoName(), settings, context);
-            logger.info("Snapshot repo registration successful");
+            log.atInfo().setMessage("Snapshot repo registration successful").log();
         } catch (Exception e) {
-            logger.error("Snapshot repo registration failed", e);
+            log.atError().setMessage("Snapshot repo registration failed").setCause(e).log();
             throw new RepoRegistrationFailed(getRepoName());
         }
     }
@@ -60,9 +61,9 @@ public void createSnapshot() {
         // Create the snapshot; idempotent operation
         try {
             client.createSnapshot(getRepoName(), snapshotName, body, context);
-            logger.info("Snapshot {} creation initiated", snapshotName);
+            log.atInfo().setMessage("Snapshot {} creation initiated").addArgument(snapshotName).log();
         } catch (Exception e) {
-            logger.error("Snapshot {} creation failed", snapshotName, e);
+            log.atError().setMessage("Snapshot {} creation failed").addArgument(snapshotName).setCause(e).log();
             throw new SnapshotCreationFailed(snapshotName);
         }
     }
@@ -72,12 +73,12 @@ public boolean isSnapshotFinished() {
         try {
             response = client.getSnapshotStatus(getRepoName(), snapshotName, context);
         } catch (Exception e) {
-            logger.error("Failed to get snapshot status", e);
+            log.atError().setMessage("Failed to get snapshot status").setCause(e).log();
             throw new SnapshotStatusCheckFailed(snapshotName);
         }
 
         if (response.isEmpty()) {
-            logger.error("Snapshot {} does not exist", snapshotName);
+            log.atError().setMessage("Snapshot {} does not exist").addArgument(snapshotName).log();
             throw new SnapshotDoesNotExist(snapshotName);
         }
 
@@ -91,7 +92,9 @@ public boolean isSnapshotFinished() {
         } else if (state.equals("IN_PROGRESS")) {
             return false;
         } else {
-            logger.error("Snapshot {} has failed with state {}", snapshotName, state);
+            log.atError().setMessage("Snapshot {} has failed with state {}")
+                .addArgument(snapshotName)
+                .addArgument(state).log();
             throw new SnapshotCreationFailed(snapshotName);
         }
     }
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotShardUnpacker.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotShardUnpacker.java
index 6869d81a8..01dbbe6a0 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotShardUnpacker.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotShardUnpacker.java
@@ -5,6 +5,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.FSDirectory;
@@ -19,8 +20,8 @@
 import lombok.RequiredArgsConstructor;
 
 @RequiredArgsConstructor
+@Slf4j
 public class SnapshotShardUnpacker {
-    private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class);
     private final SourceRepoAccessor repoAccessor;
     private final Path luceneFilesBasePath;
     private final ShardMetadata shardMetadata;
@@ -52,11 +53,9 @@ public Path unpack() {
             Files.createDirectories(luceneIndexDir);
             try (FSDirectory primaryDirectory = FSDirectory.open(luceneIndexDir, lockFactory)) {
                 for (ShardFileInfo fileMetadata : shardMetadata.getFiles()) {
-                    logger.info(
-                        "Unpacking - Blob Name: {}, Lucene Name: {}",
-                        fileMetadata.getName(),
-                        fileMetadata.getPhysicalName()
-                    );
+                    log.atInfo().setMessage("Unpacking - Blob Name: {}, Lucene Name: {}")
+                        .addArgument(fileMetadata.getName())
+                        .addArgument(fileMetadata.getPhysicalName()).log();
                     try (
                         IndexOutput indexOutput = primaryDirectory.createOutput(
                             fileMetadata.getPhysicalName(),
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java
index df0b0d34b..4c861164d 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java
@@ -177,4 +177,7 @@ public T visit(WorkAcquisitionOutcomeVisitor v) throws IOException, Inter
         }
     }
 
+    @Override
+    default void close() throws Exception {
+    }
 }
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java
index 4ccd61780..c57e60a6e 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java
@@ -115,13 +115,6 @@ public OpenSearchWorkCoordinator(
         this.objectMapper = new ObjectMapper();
     }
 
-    /**
-     * IWorkCoordinator extends AutoCloseable but this class has no resources that it owns that need to be closed.
-     */
-    @Override
-    public void close() throws Exception {
-    }
-
     public void setup(Supplier contextSupplier)
         throws IOException, InterruptedException {
         var body = "{\n"
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java
index 69f061f82..9097d2d68 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java
@@ -30,7 +30,7 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte
 
         BiConsumer logger = (indexName, accepted) -> {
             if (Boolean.FALSE.equals(accepted)) {
-                log.info("Index " + indexName + " rejected by allowlist");
+                log.atInfo().setMessage("Index {} rejected by allowlist").addArgument(indexName).log();
             }
         };
         var results = IndexMetadataResults.builder();
@@ -44,11 +44,11 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte
                 var transformedRoot = transformer.transformIndexMetadata(indexMetadata);
                 var created = indexCreator.create(transformedRoot, mode, context);
                 if (created) {
-                    log.debug("Index " + indexName + " created successfully");
+                    log.atDebug().setMessage("Index {} created successfully").addArgument(indexName).log();
                     results.indexName(indexName);
                     transformedRoot.getAliases().fieldNames().forEachRemaining(results::alias);
                 } else {
-                    log.warn("Index " + indexName + " already existed; no work required");
+                    log.atWarn().setMessage("Index {} already existed; no work required").addArgument(indexName).log();
                 }
             });
         return results.build();
diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/FailedRequestsLogger.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/FailedRequestsLogger.java
index d6be23530..d657794df 100644
--- a/RFS/src/main/java/org/opensearch/migrations/reindexer/FailedRequestsLogger.java
+++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/FailedRequestsLogger.java
@@ -6,12 +6,12 @@
 
 import org.opensearch.migrations.bulkload.common.OpenSearchClient.OperationFailed;
 
+import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class FailedRequestsLogger {
-    static final String LOGGER_NAME = "FailedRequestsLogger";
-    private final Logger logger = LoggerFactory.getLogger(LOGGER_NAME);
 
     public void logBulkFailure(
         String indexName,
@@ -28,23 +28,23 @@ public void logBulkFailure(
             .map(response -> response.body);
         if (responseBody.isPresent()) {
-            logger.atInfo()
+            log.atInfo()
                 .setMessage(
                     "Bulk request failed for {} index on {} items, reason {}, bulk request body followed by response:\n{}\n{}"
                 )
                 .addArgument(indexName)
                 .addArgument(failedItemCounter::getAsInt)
                 .addArgument(rootCause.getMessage())
-                .addArgument(bulkRequestBodySupplier::get)
+                .addArgument(bulkRequestBodySupplier)
                 .addArgument(responseBody::get)
                 .log();
         } else {
-            logger.atInfo()
+            log.atInfo()
                 .setMessage("Bulk request failed for {} index on {} documents, reason {}, bulk request body:\n{}")
                 .addArgument(indexName)
                 .addArgument(failedItemCounter::getAsInt)
-                .addArgument(rootCause.getMessage())
-                .addArgument(bulkRequestBodySupplier::get)
+                .addArgument(() -> Optional.ofNullable(rootCause).map(Throwable::getMessage).orElse("[NULL]"))
+                .addArgument(bulkRequestBodySupplier)
                 .log();
         }
     }
diff --git a/RFS/src/test/java/org/opensearch/migrations/reindexer/FailedRequestsLoggerTest.java b/RFS/src/test/java/org/opensearch/migrations/reindexer/FailedRequestsLoggerTest.java
index ef44deec1..55bed16da 100644
--- a/RFS/src/test/java/org/opensearch/migrations/reindexer/FailedRequestsLoggerTest.java
+++ b/RFS/src/test/java/org/opensearch/migrations/reindexer/FailedRequestsLoggerTest.java
@@ -15,7 +15,7 @@ class FailedRequestsLoggerTest {
 
     @Test
     void testLogBulkFailure_withNoBody() {
-        try (var logs = new CloseableLogSetup("FailedRequestsLogger")) {
+        try (var logs = new CloseableLogSetup(FailedRequestsLogger.class.getName())) {
             var logger = new FailedRequestsLogger();
 
             var indexName = "myIndexName";
@@ -40,7 +40,7 @@ void testLogBulkFailure_withNoBody() {
 
     @Test
     void testLogBulkFailure_withResponseBody() {
-        try (var logs = new CloseableLogSetup("FailedRequestsLogger")) {
+        try (var logs = new CloseableLogSetup(FailedRequestsLogger.class.getName())) {
             var logger = new FailedRequestsLogger();
 
             var indexName = "yourIndexName";
diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java
index 69a591c4c..d72cfaf03 100644
--- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java
+++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java
@@ -19,13 +19,12 @@
  * Reference implementation of a TrafficStream protobuf-encoded sink.
  * TrafficStreams are dumped to individual files that are named according to the TrafficStream id.
  *
- * @deprecated - This class is NOT meant to be used for production.
+ * WARNING: This class is NOT intended to be used for production.
 */
 @Slf4j
-@Deprecated(since = "0.1", forRemoval = false)
 public class FileConnectionCaptureFactory implements IConnectionCaptureFactory {
     private final BiFunction outputStreamCreator;
-    private String nodeId;
+    private final String nodeId;
     private final int bufferSize;
 
     public FileConnectionCaptureFactory(
@@ -72,14 +71,11 @@ public CompletableFuture kickoffCloseStream(CodedOutputStreamHolder output
         var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder;
 
         return CompletableFuture.runAsync(() -> {
             try {
-                FileOutputStream fs = outputStreamCreator.apply(connectionId, index);
-                var bb = osh.getByteBuffer();
-                byte[] filledBytes = Arrays.copyOfRange(bb.array(), 0, bb.position());
-                fs.write(filledBytes);
-                fs.flush();
-                log.warn(
-                    "NOT removing the CodedOutputStream from the WeakHashMap, which is a memory leak. Doing this until the system knows when to properly flush buffers"
-                );
+                try (FileOutputStream fs = outputStreamCreator.apply(connectionId, index)) {
+                    var bb = osh.getByteBuffer();
+                    fs.write(Arrays.copyOfRange(bb.array(), 0, bb.position()));
+                    fs.flush();
+                }
             } catch (IOException e) {
                 throw Lombok.sneakyThrow(e);
             }
@@ -88,7 +84,7 @@
     }
 
     @Override
-    public IChannelConnectionCaptureSerializer createOffloader(IConnectionContext ctx) {
+    public IChannelConnectionCaptureSerializer createOffloader(IConnectionContext ctx) {
         final var connectionId = ctx.getConnectionId();
         return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId, new StreamManager(connectionId));
     }
diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java
index a110a3362..20221a807 100644
--- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java
+++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/TestHeaderRewrites.java
@@ -25,10 +25,10 @@ public class TestHeaderRewrites {
 
     public static final String ONLY_FOR_HEADERS_VALUE = "this is only for headers";
-public static final String BODY_WITH_HEADERS_CONTENTS = "\n" +
-    "body: should stay\n" +
-    "body: untouched\n" +
-    "body:\n";
+    public static final String BODY_WITH_HEADERS_CONTENTS = "\n" +
+        "body: should stay\n" +
+        "body: untouched\n" +
+        "body:\n";
 
     @Test
     public void testHeaderRewrites() throws Exception {
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java
index 4e2b198aa..d95b29854 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java
@@ -88,7 +88,7 @@ private void updateContentTimeControllerWhenIdling() {
             return;
         }
         var currentSourceTimeOp = timeShifter.transformRealTimeToSourceTime(Instant.now());
-        if (!currentSourceTimeOp.isPresent()) {
+        if (currentSourceTimeOp.isEmpty()) {
             // do nothing - the traffic source shouldn't be blocking initially.
             // Leave it manage its own initialization since we don't have any better information about what a
             // start time might be yet.
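
A note on the FailedRequestsLogger hunk earlier in this patch: wrapping an
argument in a Supplier defers rendering until the level is known to be
enabled, and the Optional wrapper makes a null root cause safe. A minimal
self-contained sketch of that idiom (a hypothetical class, not part of this
change set):

    import java.util.Optional;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class LazyLogArgumentSketch {
        public static void main(String[] args) {
            Throwable rootCause = null; // may be absent if no exception was recorded
            log.atInfo()
                .setMessage("Bulk request failed, reason {}")
                // the lambda runs only if INFO is enabled, and a missing cause
                // degrades to a placeholder instead of a NullPointerException
                .addArgument(() -> Optional.ofNullable(rootCause)
                    .map(Throwable::getMessage)
                    .orElse("[NULL]"))
                .log();
        }
    }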
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Utils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Utils.java
index 3da161544..a9005b107 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Utils.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Utils.java
@@ -24,6 +24,8 @@ public class Utils {
     public static final int MAX_PAYLOAD_BYTES_TO_PRINT = 100 * 1024 * 1024; // 100MiB based on
     // https://docs.aws.amazon.com/opensearch-service/latest/developerguide/limits.html#network-limits
 
+    private Utils() {}
+
     public static Instant setIfLater(AtomicReference referenceValue, Instant pointInTime) {
         return referenceValue.updateAndGet(
             existingInstant -> existingInstant.isBefore(pointInTime) ? pointInTime : existingInstant
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java
index 711b1d1de..ddd172895 100644
--- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java
+++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java
@@ -20,7 +20,7 @@
     private static class RefCountedContext {
         final IReplayContexts.IChannelKeyContext context;
         private int refCount;
 
-        private RefCountedContext(IReplayContexts.IChannelKeyContext context) {
+        RefCountedContext(IReplayContexts.IChannelKeyContext context) {
             this.context = context;
         }
diff --git a/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonJoltTransformerProvider.java b/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonJoltTransformerProvider.java
index 27ad19efb..458c41f4a 100644
--- a/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonJoltTransformerProvider.java
+++ b/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonJoltTransformerProvider.java
@@ -62,7 +62,7 @@ private String getConfigUsageStr() {
             + "Each of the Maps should have one key-value, either \"canned\" or \"script\". "
             + "Canned values should be a string that specifies the name of the pre-built transformation to use "
             + Arrays.stream(JsonJoltTransformBuilder.CANNED_OPERATION.values())
-                .map(e -> e.toString())
+                .map(Object::toString)
                 .collect(Collectors.joining(","))
             + ". "
             + "Script values should be a fully-formed inlined Jolt transformation in json form. "
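
One last pattern worth calling out, from the IWorkCoordinator and
OpenSearchWorkCoordinator hunks above: the no-op close() moved from the
implementation onto the interface as a default method, so implementations
that own no resources inherit it for free. A simplified sketch under that
assumption (the real interface declares many more members):

    // An AutoCloseable subinterface can supply a no-op default close();
    // only implementations that actually hold resources need to override it.
    interface WorkCoordinator extends AutoCloseable {
        @Override
        default void close() throws Exception {
            // no resources held by default
        }
    }

    class InMemoryCoordinator implements WorkCoordinator {
        // inherits the no-op close(), so try-with-resources still works:
        //     try (WorkCoordinator c = new InMemoryCoordinator()) { ... }
    }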