diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java index 8cb185683..d07b11052 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java @@ -34,10 +34,10 @@ @Slf4j public class ParallelDocumentMigrationsTest extends SourceTestBase { - final static List SOURCE_IMAGES = List.of( + static final List SOURCE_IMAGES = List.of( SearchClusterContainer.ES_V7_10_2 ); - final static List TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0); + static final List TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0); public static Stream makeDocumentMigrationArgs() { List sourceImageArgs = SOURCE_IMAGES.stream() diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java index 24223d259..4c5e78480 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java @@ -49,9 +49,9 @@ @Slf4j public class SourceTestBase { public static final String GENERATOR_BASE_IMAGE = "migrations/elasticsearch_client_test_console:latest"; - public final static int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; + public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; public static final String SOURCE_SERVER_ALIAS = "source"; - public final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; + public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) { return new Object[] { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java index 21f55415b..cb3d8f408 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/InvalidResponse.java @@ -19,7 +19,7 @@ public class InvalidResponse extends RfsException { private static final Pattern unknownSetting = Pattern.compile("unknown setting \\[(.+)\\].+"); private static final ObjectMapper objectMapper = new ObjectMapper(); - private final HttpResponse response; + private final transient HttpResponse response; public InvalidResponse(String message, HttpResponse response) { super(message); @@ -41,22 +41,18 @@ public Set getIllegalArguments() { errorBody.map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add); // Check root cause errors - errorBody.map(node -> node.get("root_cause")).ifPresent(nodes -> { + errorBody.map(node -> node.get("root_cause")).ifPresent(nodes -> nodes.forEach( - node -> { - Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add); - } - ); - }); + node -> Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add) + ) + ); // Check all suppressed errors - errorBody.map(node -> node.get("suppressed")).ifPresent(nodes -> { + errorBody.map(node -> node.get("suppressed")).ifPresent(nodes -> nodes.forEach( - node -> { - Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add); - } - ); - }); + node -> + Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add) + )); var onlyExpectedErrors = interimResults.stream() .map(Entry::getKey) diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java index 780b714d2..74d161b5e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java @@ -6,7 +6,6 @@ import java.util.concurrent.Callable; import java.util.function.Function; -import jdk.jshell.spi.ExecutionControl; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index 5047021df..ee4eb39f9 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -451,7 +451,7 @@ public String getFailureMessage() { } public static class OperationFailed extends RfsException { - public final HttpResponse response; + public final transient HttpResponse response; public OperationFailed(String message, HttpResponse response) { super(message +"\nBody:\n" + response); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java index bca74027a..52fd7f758 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java @@ -6,13 +6,10 @@ import java.util.Comparator; import java.util.Optional; -import lombok.extern.slf4j.Slf4j; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import org.opensearch.migrations.bulkload.models.ShardMetadata; import lombok.ToString; +import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java index 4e4d3c494..5c5c9987e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/SnapshotCreator.java @@ -6,12 +6,13 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.migrations.bulkload.tracing.IRfsContexts; +import lombok.Getter; + public abstract class SnapshotCreator { private static final Logger logger = LogManager.getLogger(SnapshotCreator.class); private static final ObjectMapper mapper = new ObjectMapper(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java index f2c51008e..d9d6ac606 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IRfsContexts.java @@ -3,22 +3,22 @@ import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IRfsContexts { - public static class ActivityNames { +public interface IRfsContexts { + class ActivityNames { private ActivityNames() {} public static final String HTTP_REQUEST = "httpRequest"; public static final String CHECK_THEN_PUT_REQUESTS = "checkThenPutRequest"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} public static final String BYTES_READ = "bytesRead"; public static final String BYTES_SENT = "bytesSent"; } - public interface IRequestContext extends IScopedInstrumentationAttributes { + interface IRequestContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.HTTP_REQUEST; void addBytesSent(int i); @@ -26,7 +26,7 @@ public interface IRequestContext extends IScopedInstrumentationAttributes { void addBytesRead(int i); } - public interface ICheckedIdempotentPutRequestContext extends IScopedInstrumentationAttributes { + interface ICheckedIdempotentPutRequestContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.CHECK_THEN_PUT_REQUESTS; IRequestContext createCheckRequestContext(); @@ -34,7 +34,7 @@ public interface ICheckedIdempotentPutRequestContext extends IScopedInstrumentat IRequestContext createPutContext(); } - public interface ICreateSnapshotContext extends IScopedInstrumentationAttributes { + interface ICreateSnapshotContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = IMetadataMigrationContexts.ActivityNames.CREATE_SNAPSHOT; IRequestContext createRegisterRequest(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java index e5796be32..3781e60c9 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java @@ -3,9 +3,9 @@ import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IWorkCoordinationContexts { +public interface IWorkCoordinationContexts { - public static class ActivityNames { + class ActivityNames { public static final String COORDINATION_INITIALIZATION = "workCoordinationInitialization"; public static final String CREATE_UNASSIGNED_WORK_ITEM = "createUnassignedWork"; public static final String PENDING_WORK_CHECK = "pendingWorkCheck"; @@ -17,7 +17,7 @@ public static class ActivityNames { private ActivityNames() {} } - public static class MetricNames { + class MetricNames { public static final String NEXT_WORK_ASSIGNED = "nextWorkAssignedCount"; public static final String NO_NEXT_WORK_AVAILABLE = "noNextWorkAvailableCount"; public static final String RECOVERABLE_CLOCK_ERROR = "recoverableClockErrorCount"; @@ -26,37 +26,37 @@ public static class MetricNames { private MetricNames() {} } - public interface IRetryableActivityContext extends IScopedInstrumentationAttributes { + interface IRetryableActivityContext extends IScopedInstrumentationAttributes { void recordRetry(); void recordFailure(); } - public interface IInitializeCoordinatorStateContext extends IRetryableActivityContext { + interface IInitializeCoordinatorStateContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.COORDINATION_INITIALIZATION; } - public interface ICreateUnassignedWorkItemContext extends IRetryableActivityContext { + interface ICreateUnassignedWorkItemContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.CREATE_UNASSIGNED_WORK_ITEM; } - public interface IPendingWorkItemsContext extends IScopedInstrumentationAttributes { + interface IPendingWorkItemsContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.PENDING_WORK_CHECK; IRefreshContext getRefreshContext(); } - public interface IRefreshContext extends IRetryableActivityContext { + interface IRefreshContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.SYNC_REFRESH_CLUSTER; } - public interface IBaseAcquireWorkContext extends IRetryableActivityContext {} + interface IBaseAcquireWorkContext extends IRetryableActivityContext {} - public interface IAcquireSpecificWorkContext extends IBaseAcquireWorkContext { + interface IAcquireSpecificWorkContext extends IBaseAcquireWorkContext { String ACTIVITY_NAME = ActivityNames.ACQUIRE_SPECIFIC_WORK; } - public interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { + interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { String ACTIVITY_NAME = ActivityNames.ACQUIRE_NEXT_WORK; IRefreshContext getRefreshContext(); @@ -70,13 +70,13 @@ public interface IAcquireNextWorkItemContext extends IBaseAcquireWorkContext { void recordFailure(OpenSearchWorkCoordinator.PotentialClockDriftDetectedException e); } - public interface ICompleteWorkItemContext extends IRetryableActivityContext { + interface ICompleteWorkItemContext extends IRetryableActivityContext { String ACTIVITY_NAME = ActivityNames.COMPLETE_WORK; IRefreshContext getRefreshContext(); } - public interface IScopedWorkContext extends IScopedInstrumentationAttributes { + interface IScopedWorkContext extends IScopedInstrumentationAttributes { C createOpeningContext(); ICompleteWorkItemContext createCloseContet(); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java index 90e8a677f..925a0f3be 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RfsContexts.java @@ -13,13 +13,11 @@ import lombok.Getter; import lombok.NonNull; -public class RfsContexts extends IRfsContexts { +public interface RfsContexts extends IRfsContexts { - private RfsContexts() {} + String COUNT_UNITS = "count"; - public static final String COUNT_UNITS = "count"; - - public static class GenericRequestContext extends BaseSpanContext + class GenericRequestContext extends BaseSpanContext implements IRfsContexts.IRequestContext { @@ -97,7 +95,7 @@ public void addBytesRead(int i) { } } - public static class CheckedIdempotentPutRequestContext extends BaseSpanContext + class CheckedIdempotentPutRequestContext extends BaseSpanContext implements IRfsContexts.ICheckedIdempotentPutRequestContext { @Getter diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java index c86c9741a..e8cc5fafe 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java @@ -12,11 +12,9 @@ import lombok.Getter; import lombok.NonNull; -public class WorkCoordinationContexts extends IWorkCoordinationContexts { - private WorkCoordinationContexts() {} - +public interface WorkCoordinationContexts extends IWorkCoordinationContexts { @AllArgsConstructor - public static class RetryLabels { + class RetryLabels { CommonScopedMetricInstruments.ScopeLabels scopeLabels; public final String retry; public final String failure; @@ -30,7 +28,7 @@ private static RetryLabels autoLabels(String activityName) { ); } - public static class RetryMetricInstruments extends CommonScopedMetricInstruments { + class RetryMetricInstruments extends CommonScopedMetricInstruments { public final LongCounter retryCounter; public final LongCounter failureCounter; @@ -41,7 +39,7 @@ private RetryMetricInstruments(Meter meter, RetryLabels retryLabels) { } } - public interface RetryableActivityContextMetricMixin + interface RetryableActivityContextMetricMixin extends IRetryableActivityContext { T getRetryMetrics(); @@ -60,7 +58,7 @@ default void recordFailure() { } @Getter - public static class InitializeCoordinatorStateContext extends BaseSpanContext + class InitializeCoordinatorStateContext extends BaseSpanContext implements IInitializeCoordinatorStateContext, RetryableActivityContextMetricMixin { @@ -101,7 +99,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class CreateUnassignedWorkItemContext extends BaseSpanContext + class CreateUnassignedWorkItemContext extends BaseSpanContext implements ICreateUnassignedWorkItemContext, RetryableActivityContextMetricMixin { @@ -138,7 +136,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class PendingItems extends BaseSpanContext + class PendingItems extends BaseSpanContext implements IPendingWorkItemsContext { final IScopedInstrumentationAttributes enclosingScope; @@ -180,7 +178,7 @@ public MetricInstruments getMetrics() { } @Getter - public static class Refresh extends BaseSpanContext + class Refresh extends BaseSpanContext implements IRefreshContext, RetryableActivityContextMetricMixin { @@ -217,7 +215,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class AcquireSpecificWorkContext extends BaseSpanContext + class AcquireSpecificWorkContext extends BaseSpanContext implements IAcquireSpecificWorkContext, RetryableActivityContextMetricMixin { @@ -254,7 +252,7 @@ public MetricInstruments getRetryMetrics() { } @Getter - public static class AcquireNextWorkItemContext extends BaseSpanContext + class AcquireNextWorkItemContext extends BaseSpanContext implements IAcquireNextWorkItemContext, RetryableActivityContextMetricMixin { @@ -326,7 +324,7 @@ public void recordFailure(OpenSearchWorkCoordinator.PotentialClockDriftDetectedE } @Getter - public static class CompleteWorkItemContext extends BaseSpanContext + class CompleteWorkItemContext extends BaseSpanContext implements ICompleteWorkItemContext, RetryableActivityContextMetricMixin { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java index 845da3d2f..c8a19369a 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java @@ -9,6 +9,12 @@ public class TransformFunctions { private static final ObjectMapper mapper = new ObjectMapper(); + public static final String MAPPINGS_KEY_STR = "mappings"; + public static final String SETTINGS_KEY_STR = "settings"; + public static final String NUMBER_OF_REPLICAS_KEY_STR = "number_of_replicas"; + public static final String INDEX_KEY_STR = "index"; + + private TransformFunctions() {} public static Transformer getTransformer( Version sourceVersion, @@ -61,13 +67,11 @@ public static ObjectNode convertFlatSettingsToTree(ObjectNode flatSettings) { * - [{"audit_message":{"properties":{"address":{"type":"text"}}}}] */ public static void removeIntermediateMappingsLevels(ObjectNode root) { - if (root.has("mappings")) { - try { - ArrayNode mappingsList = (ArrayNode) root.get("mappings"); - root.set("mappings", getMappingsFromBeneathIntermediate(mappingsList)); - } catch (ClassCastException e) { - // mappings isn't an array - return; + if (root.has(MAPPINGS_KEY_STR)) { + var val = root.get(MAPPINGS_KEY_STR); + if (val instanceof ArrayNode) { + ArrayNode mappingsList = (ArrayNode) val; + root.set(MAPPINGS_KEY_STR, getMappingsFromBeneathIntermediate(mappingsList)); } } } @@ -94,13 +98,13 @@ public static ObjectNode getMappingsFromBeneathIntermediate(ArrayNode mappingsRo public static void removeIntermediateIndexSettingsLevel(ObjectNode root) { // Remove the intermediate key "index" under "settings", will start like: // {"index":{"number_of_shards":"1","number_of_replicas":"1"}} - if (root.has("settings")) { - ObjectNode settingsRoot = (ObjectNode) root.get("settings"); - if (settingsRoot.has("index")) { - ObjectNode indexSettingsRoot = (ObjectNode) settingsRoot.get("index"); + if (root.has(SETTINGS_KEY_STR)) { + ObjectNode settingsRoot = (ObjectNode) root.get(SETTINGS_KEY_STR); + if (settingsRoot.has(INDEX_KEY_STR)) { + ObjectNode indexSettingsRoot = (ObjectNode) settingsRoot.get(INDEX_KEY_STR); settingsRoot.setAll(indexSettingsRoot); - settingsRoot.remove("index"); - root.set("settings", settingsRoot); + settingsRoot.remove(INDEX_KEY_STR); + root.set(SETTINGS_KEY_STR, settingsRoot); } } } @@ -112,20 +116,20 @@ public static void removeIntermediateIndexSettingsLevel(ObjectNode root) { * the minimum number of replicas being 2. */ public static void fixReplicasForDimensionality(ObjectNode root, int dimensionality) { - if (root.has("settings")) { - ObjectNode settingsRoot = (ObjectNode) root.get("settings"); - if (settingsRoot.has("number_of_replicas")) { + if (root.has(SETTINGS_KEY_STR)) { + ObjectNode settingsRoot = (ObjectNode) root.get(SETTINGS_KEY_STR); + if (settingsRoot.has(NUMBER_OF_REPLICAS_KEY_STR)) { // dimensionality must be at least 1 dimensionality = Math.max(dimensionality, 1); // If the total number of copies requested in the original settings is not a multiple of the // dimensionality, then up it to the next largest multiple of the dimensionality. - int numberOfCopies = settingsRoot.get("number_of_replicas").asInt() + 1; + int numberOfCopies = settingsRoot.get(NUMBER_OF_REPLICAS_KEY_STR).asInt() + 1; int remainder = numberOfCopies % dimensionality; int newNumberOfCopies = (remainder > 0) ? (numberOfCopies + dimensionality - remainder) : numberOfCopies; int newNumberOfReplicas = newNumberOfCopies - 1; - settingsRoot.put("number_of_replicas", newNumberOfReplicas); + settingsRoot.put(NUMBER_OF_REPLICAS_KEY_STR, newNumberOfReplicas); } } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java index 18997190c..16087ac2f 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_6_8_to_OS_2_11.java @@ -4,8 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.migrations.bulkload.models.GlobalMetadata; import org.opensearch.migrations.bulkload.models.IndexMetadata; @@ -15,14 +13,16 @@ import org.opensearch.migrations.transformation.entity.Index; import org.opensearch.migrations.transformation.rules.IndexMappingTypeRemoval; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class Transformer_ES_6_8_to_OS_2_11 implements Transformer { - private static final Logger logger = LogManager.getLogger(Transformer_ES_6_8_to_OS_2_11.class); private static final ObjectMapper mapper = new ObjectMapper(); private final List> indexTransformations = List.of(new IndexMappingTypeRemoval()); private final List> indexTemplateTransformations = List.of(new IndexMappingTypeRemoval()); - private int awarenessAttributeDimensionality; + private final int awarenessAttributeDimensionality; public Transformer_ES_6_8_to_OS_2_11(int awarenessAttributeDimensionality) { this.awarenessAttributeDimensionality = awarenessAttributeDimensionality; @@ -39,7 +39,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) { templatesRoot.fields().forEachRemaining(template -> { var templateCopy = (ObjectNode) template.getValue().deepCopy(); var indexTemplate = (Index) () -> templateCopy; - transformIndex(indexTemplate, IndexType.Template); + transformIndex(indexTemplate, IndexType.TEMPLATE); templates.set(template.getKey(), indexTemplate.getRawJson()); }); newRoot.set("templates", templates); @@ -63,32 +63,34 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) { @Override public IndexMetadata transformIndexMetadata(IndexMetadata index) { var copy = index.deepCopy(); - transformIndex(copy, IndexType.Concrete); + transformIndex(copy, IndexType.CONCRETE); return new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName()); } private void transformIndex(Index index, IndexType type) { - logger.debug("Original Object: " + index.getRawJson().toString()); + log.atDebug().setMessage(()->"Original Object: {}").addArgument(index.getRawJson().toString()).log(); var newRoot = index.getRawJson(); switch (type) { - case Concrete: + case CONCRETE: indexTransformations.forEach(transformer -> transformer.applyTransformation(index)); break; - case Template: + case TEMPLATE: indexTemplateTransformations.forEach(transformer -> transformer.applyTransformation(index)); break; + default: + throw new IllegalArgumentException("Unknown type: " + type); } newRoot.set("settings", TransformFunctions.convertFlatSettingsToTree((ObjectNode) newRoot.get("settings"))); TransformFunctions.removeIntermediateIndexSettingsLevel(newRoot); // run before fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality); - logger.debug("Transformed Object: " + newRoot.toString()); + log.atDebug().setMessage(()->"Transformed Object: {}").addArgument(newRoot.toString()).log(); } private enum IndexType { - Concrete, - Template; + CONCRETE, + TEMPLATE; } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java index 51f036fb8..429748429 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/Transformer_ES_7_10_OS_2_11.java @@ -1,18 +1,19 @@ package org.opensearch.migrations.bulkload.transformers; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.migrations.bulkload.models.GlobalMetadata; import org.opensearch.migrations.bulkload.models.IndexMetadata; import org.opensearch.migrations.bulkload.version_os_2_11.GlobalMetadataData_OS_2_11; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class Transformer_ES_7_10_OS_2_11 implements Transformer { - private static final Logger logger = LogManager.getLogger(Transformer_ES_7_10_OS_2_11.class); - private static final ObjectMapper mapper = new ObjectMapper(); - private int awarenessAttributeDimensionality; + public static final String INDEX_TEMPLATE_KEY_STR = "index_template"; + public static final String TEMPLATES_KEY_STR = "templates"; + public static final String COMPONENT_TEMPLATE_KEY_STR = "component_template"; + private final int awarenessAttributeDimensionality; public Transformer_ES_7_10_OS_2_11(int awarenessAttributeDimensionality) { this.awarenessAttributeDimensionality = awarenessAttributeDimensionality; @@ -23,24 +24,24 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { ObjectNode root = metaData.toObjectNode().deepCopy(); // Transform the legacy templates - if (root.get("templates") != null) { - ObjectNode templatesRoot = (ObjectNode) root.get("templates").deepCopy(); + if (root.get(TEMPLATES_KEY_STR) != null) { + ObjectNode templatesRoot = (ObjectNode) root.get(TEMPLATES_KEY_STR).deepCopy(); templatesRoot.fieldNames().forEachRemaining(templateName -> { ObjectNode template = (ObjectNode) templatesRoot.get(templateName); - logger.info("Transforming template: " + templateName); - logger.debug("Original template: " + template.toString()); + log.atInfo().setMessage("Transforming template: {}").addArgument(templateName).log(); + log.atDebug().setMessage("Original template: {}").addArgument(template).log(); TransformFunctions.removeIntermediateIndexSettingsLevel(template); // run before fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(templatesRoot, awarenessAttributeDimensionality); - logger.debug("Transformed template: " + template.toString()); + log.atDebug().setMessage("Transformed template: {}").addArgument(template).log(); templatesRoot.set(templateName, template); }); - root.set("templates", templatesRoot); + root.set(TEMPLATES_KEY_STR, templatesRoot); } // Transform the index templates - if (root.get("index_template") != null) { - ObjectNode indexTemplatesRoot = (ObjectNode) root.get("index_template").deepCopy(); - ObjectNode indexTemplateValuesRoot = (ObjectNode) indexTemplatesRoot.get("index_template"); + if (root.get(INDEX_TEMPLATE_KEY_STR) != null) { + ObjectNode indexTemplatesRoot = (ObjectNode) root.get(INDEX_TEMPLATE_KEY_STR).deepCopy(); + ObjectNode indexTemplateValuesRoot = (ObjectNode) indexTemplatesRoot.get(INDEX_TEMPLATE_KEY_STR); indexTemplateValuesRoot.fieldNames().forEachRemaining(templateName -> { ObjectNode template = (ObjectNode) indexTemplateValuesRoot.get(templateName); ObjectNode templateSubRoot = (ObjectNode) template.get("template"); @@ -49,21 +50,21 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { return; } - logger.info("Transforming template: " + templateName); - logger.debug("Original template: " + template.toString()); + log.atInfo().setMessage("Transforming index template: {}").addArgument(templateName).log(); + log.atDebug().setMessage("Original index template: {}").addArgument(template).log(); TransformFunctions.removeIntermediateIndexSettingsLevel(templateSubRoot); // run before // fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(templateSubRoot, awarenessAttributeDimensionality); - logger.debug("Transformed template: " + template.toString()); + log.atDebug().setMessage("Transformed index template: {}").addArgument(template).log(); indexTemplateValuesRoot.set(templateName, template); }); - root.set("index_template", indexTemplatesRoot); + root.set(INDEX_TEMPLATE_KEY_STR, indexTemplatesRoot); } // Transform the component templates - if (root.get("component_template") != null) { - ObjectNode componentTemplatesRoot = (ObjectNode) root.get("component_template").deepCopy(); - ObjectNode componentTemplateValuesRoot = (ObjectNode) componentTemplatesRoot.get("component_template"); + if (root.get(COMPONENT_TEMPLATE_KEY_STR) != null) { + ObjectNode componentTemplatesRoot = (ObjectNode) root.get(COMPONENT_TEMPLATE_KEY_STR).deepCopy(); + ObjectNode componentTemplateValuesRoot = (ObjectNode) componentTemplatesRoot.get(COMPONENT_TEMPLATE_KEY_STR); componentTemplateValuesRoot.fieldNames().forEachRemaining(templateName -> { ObjectNode template = (ObjectNode) componentTemplateValuesRoot.get(templateName); ObjectNode templateSubRoot = (ObjectNode) template.get("template"); @@ -72,13 +73,13 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { return; } - logger.info("Transforming template: " + templateName); - logger.debug("Original template: " + template.toString()); + log.atInfo().setMessage("Transforming component template: {}").addArgument(templateName).log(); + log.atDebug().setMessage("Original component template: {}").addArgument(template).log(); // No transformation needed for component templates - logger.debug("Transformed template: " + template.toString()); + log.atDebug().setMessage("Transformed component template: {}").addArgument(template).log(); componentTemplateValuesRoot.set(templateName, template); }); - root.set("component_template", componentTemplatesRoot); + root.set(COMPONENT_TEMPLATE_KEY_STR, componentTemplatesRoot); } return new GlobalMetadataData_OS_2_11(root); @@ -86,7 +87,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) { @Override public IndexMetadata transformIndexMetadata(IndexMetadata indexData) { - logger.debug("Original Object: " + indexData.getRawJson().toString()); + log.atDebug().setMessage("Original Object: {}").addArgument(indexData.getRawJson()).log(); var copy = indexData.deepCopy(); var newRoot = copy.getRawJson(); @@ -96,7 +97,7 @@ public IndexMetadata transformIndexMetadata(IndexMetadata indexData) { TransformFunctions.removeIntermediateIndexSettingsLevel(newRoot); // run before fixNumberOfReplicas TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality); - logger.debug("Transformed Object: " + newRoot.toString()); + log.atDebug().setMessage("Transformed Object: {}").addArgument(newRoot).log(); return copy; } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java index e6a72c4b2..adac50139 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotRepoProvider_ES_7_10.java @@ -38,13 +38,12 @@ public List getIndicesInSnapshot(String snapshotName) { .orElse(null); if (targetSnapshot != null) { - targetSnapshot.getIndexMetadataLookup().keySet().forEach(indexId -> { + targetSnapshot.getIndexMetadataLookup().keySet().forEach(indexId -> getRepoData().getIndices().forEach((indexName, rawIndex) -> { if (indexId.equals(rawIndex.getId())) { matchedIndices.add(SnapshotRepoData_ES_7_10.Index.fromRawIndex(indexName, rawIndex)); } - }); - }); + })); } return matchedIndices; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java index a48019486..9cddfcb59 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteDataProvider.java @@ -24,9 +24,7 @@ public List getSnapshots() { @Override public List getIndicesInSnapshot(String snapshotName) { var indexes = new ArrayList(); - indexData.fields().forEachRemaining(index -> { - indexes.add(new RemoteIndexSnapshotData(index.getKey())); - }); + indexData.fields().forEachRemaining(index -> indexes.add(new RemoteIndexSnapshotData(index.getKey()))); return indexes; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java index 47a8abf57..e52576899 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReaderClient.java @@ -89,16 +89,14 @@ public ObjectNode getIndexes() { ObjectNode combineIndexDetails(List indexDetailsResponse) { var combinedDetails = objectMapper.createObjectNode(); - indexDetailsResponse.stream().forEach(detailsResponse -> { + indexDetailsResponse.stream().forEach(detailsResponse -> detailsResponse.fields().forEachRemaining(indexDetails -> { var indexName = indexDetails.getKey(); combinedDetails.putIfAbsent(indexName, objectMapper.createObjectNode()); var existingIndexDetails = (ObjectNode)combinedDetails.get(indexName); - indexDetails.getValue().fields().forEachRemaining(details -> { - existingIndexDetails.set(details.getKey(), details.getValue()); - }); - }); - }); + indexDetails.getValue().fields().forEachRemaining(details -> + existingIndexDetails.set(details.getKey(), details.getValue())); + })); return combinedDetails; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index 480b882e6..4ccd61780 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -27,11 +27,11 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator { public static final String INDEX_NAME = ".migrations_working_state"; public static final int MAX_REFRESH_RETRIES = 6; public static final int MAX_SETUP_RETRIES = 6; - final long ACQUIRE_WORK_RETRY_BASE_MS = 10; + static final long ACQUIRE_WORK_RETRY_BASE_MS = 10; // we'll retry lease acquisitions for up to - final int MAX_DRIFT_RETRIES = 13; // last delay before failure: 40 seconds - final int MAX_MALFORMED_ASSIGNED_WORK_DOC_RETRIES = 17; // last delay before failure: 655.36 seconds - final int MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL = 60 * 1000; + static final int MAX_DRIFT_RETRIES = 13; // last delay before failure: 40 seconds + static final int MAX_MALFORMED_ASSIGNED_WORK_DOC_RETRIES = 17; // last delay before failure: 655.36 seconds + static final int MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL = 60 * 1000; public static final String SCRIPT_VERSION_TEMPLATE = "{SCRIPT_VERSION}"; public static final String WORKER_ID_TEMPLATE = "{WORKER_ID}"; @@ -115,6 +115,9 @@ public OpenSearchWorkCoordinator( this.objectMapper = new ObjectMapper(); } + /** + * IWorkCoordinator extends AutoCloseable but this class has no resources that it owns that need to be closed. + */ @Override public void close() throws Exception { } @@ -130,14 +133,10 @@ public void setup(Supplier 0 && " + // don't obtain a lease lock - " ctx._source." - + COMPLETED_AT_FIELD_NAME - + " == null) {" + " ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" + // already done - " if (ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " == params.workerId && " - + " ctx._source." - + EXPIRATION_FIELD_NAME - + " > serverTimeSeconds) {" + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " + + " ctx._source." + EXPIRATION_FIELD_NAME + " > serverTimeSeconds) {" + // count as an update to force the caller to lookup the expiration time, but no need to modify it " ctx.op = \\\"update\\\";" - + " } else if (ctx._source." - + EXPIRATION_FIELD_NAME - + " < serverTimeSeconds && " - + // is expired - " ctx._source." - + EXPIRATION_FIELD_NAME - + " < newExpiration) {" - + // sanity check - " ctx._source." - + EXPIRATION_FIELD_NAME - + " = newExpiration;" - + " ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " = params.workerId;" + + " } else if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired + " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check + " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" + + " ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " = params.workerId;" + " ctx._source.numAttempts += 1;" + " } else {" + " ctx.op = \\\"noop\\\";" @@ -394,29 +363,18 @@ public void completeWorkItem( + " \"script\": {\n" + " \"lang\": \"painless\",\n" + " \"params\": { \n" - + " \"clientTimestamp\": " - + CLIENT_TIMESTAMP_TEMPLATE - + ",\n" - + " \"workerId\": \"" - + WORKER_ID_TEMPLATE - + "\"\n" + + " \"clientTimestamp\": " + CLIENT_TIMESTAMP_TEMPLATE + ",\n" + + " \"workerId\": \"" + WORKER_ID_TEMPLATE + "\"\n" + " },\n" + " \"source\": \"" - + " if (ctx._source.scriptVersion != \\\"" - + SCRIPT_VERSION_TEMPLATE - + "\\\") {" + + " if (ctx._source.scriptVersion != \\\"" + SCRIPT_VERSION_TEMPLATE + "\\\") {" + " throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx.source.scriptVersion);" + " } " - + " if (ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " != params.workerId) {" + + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " != params.workerId) {" + " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source." - + LEASE_HOLDER_ID_FIELD_NAME - + " + \\\" not \\\" + params.workerId);" + + LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);" + " } else {" - + " ctx._source." - + COMPLETED_AT_FIELD_NAME - + " = System.currentTimeMillis() / 1000;" + + " ctx._source." + COMPLETED_AT_FIELD_NAME + " = System.currentTimeMillis() / 1000;" + " }" + "\"\n" + " }\n" @@ -460,16 +418,12 @@ private int numWorkItemsArePending( + " \"bool\": {" + " \"must\": [" + " { \"exists\":" - + " { \"field\": \"" - + EXPIRATION_FIELD_NAME - + "\"}" + + " { \"field\": \"" + EXPIRATION_FIELD_NAME + "\"}" + " }" + " ]," + " \"must_not\": [" + " { \"exists\":" - + " { \"field\": \"" - + COMPLETED_AT_FIELD_NAME - + "\"}" + + " { \"field\": \"" + COMPLETED_AT_FIELD_NAME + "\"}" + " }" + " ]" + " }" @@ -522,8 +476,7 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException + "\"query\": {" + " \"function_score\": {\n" + QUERY_INCOMPLETE_EXPIRED_ITEMS_STR + "," + " \"random_score\": {},\n" - + " \"boost_mode\": \"replace\"\n" - + // Try to avoid the workers fighting for the same work items + + " \"boost_mode\": \"replace\"\n" + // Try to avoid the workers fighting for the same work items " }" + "}," + "\"size\": 1,\n" @@ -543,10 +496,8 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException + " throw new IllegalArgumentException(\\\"The current times indicated between the client and server are too different.\\\");" + " }" + " long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.numAttempts)) * params.expirationWindow);" - + " if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " - + // is expired - " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" - + // sanity check + + " if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired + " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" + " ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " = params.workerId;" + " ctx._source.numAttempts += 1;" @@ -683,7 +634,7 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker, ctx.addTraceException(e, false); var sleepBeforeNextRetryDuration = Duration.ofMillis( Math.min(MAX_ASSIGNED_DOCUMENT_NOT_FOUND_RETRY_INTERVAL, - (long) (Math.pow(2.0, (double) (retries-1)) * ACQUIRE_WORK_RETRY_BASE_MS))); + (long) (Math.pow(2.0, (retries-1)) * ACQUIRE_WORK_RETRY_BASE_MS))); leaseChecker.checkRetryWaitTimeOrThrow(e, retries-1, sleepBeforeNextRetryDuration); log.atWarn().setMessage(() -> "Couldn't complete work assignment due to exception. " @@ -695,8 +646,8 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker, @AllArgsConstructor private static class MaxTriesExceededException extends Exception { - Object suppliedValue; - Object transformedValue; + final transient Object suppliedValue; + final transient Object transformedValue; } @Getter diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java index 68374ee4c..69f061f82 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java @@ -29,7 +29,7 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte // TODO - parallelize this, maybe ~400-1K requests per thread and do it asynchronously BiConsumer logger = (indexName, accepted) -> { - if (!accepted) { + if (Boolean.FALSE.equals(accepted)) { log.info("Index " + indexName + " rejected by allowlist"); } }; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java index e10689dff..7e79418ae 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java @@ -100,7 +100,7 @@ private static void prepareShardWorkItems( SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider(); BiConsumer logger = (indexName, accepted) -> { - if (!accepted) { + if (Boolean.FALSE.equals(accepted)) { log.info("Index " + indexName + " rejected by allowlist"); } }; diff --git a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java index 47a567f37..1981b1779 100644 --- a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java +++ b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java @@ -31,7 +31,7 @@ private List getProviders() { } /** - * Gets a snapshot resource provider for the given version and source repo + * Gets a snapshot resource provider for the given version and source repo * @param version The version of the source cluster * @param repo The source repo that contains of the snapshot * @return The snapshot resource provider diff --git a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java index 5aad062f6..60fffdd03 100644 --- a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/IMetadataMigrationContexts.java @@ -3,13 +3,13 @@ import org.opensearch.migrations.bulkload.tracing.IRfsContexts; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IMetadataMigrationContexts { +public interface IMetadataMigrationContexts { - public interface ITemplateContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { + interface ITemplateContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { String ACTIVITY_NAME = ActivityNames.MIGRATE_INDEX_TEMPLATE; } - public interface IClusterMetadataContext extends IScopedInstrumentationAttributes { + interface IClusterMetadataContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.MIGRATE_METADATA; ITemplateContext createMigrateLegacyTemplateContext(); @@ -19,11 +19,11 @@ public interface IClusterMetadataContext extends IScopedInstrumentationAttribute IRfsContexts.ICheckedIdempotentPutRequestContext createMigrateTemplateContext(); } - public interface ICreateIndexContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { + interface ICreateIndexContext extends IRfsContexts.ICheckedIdempotentPutRequestContext { String ACTIVITY_NAME = ActivityNames.CREATE_INDEX; } - public static class ActivityNames { + class ActivityNames { public static final String CREATE_SNAPSHOT = "createSnapshot"; public static final String CREATE_INDEX = "createIndex"; public static final String MIGRATE_METADATA = "migrateMetadata"; @@ -32,7 +32,7 @@ public static class ActivityNames { private ActivityNames() {} } - public static class MetricNames { + class MetricNames { private MetricNames() {} } diff --git a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java index 2af5be910..4387305bf 100644 --- a/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/metadata/tracing/MetadataMigrationContexts.java @@ -11,8 +11,8 @@ import lombok.NonNull; -public class MetadataMigrationContexts { - public static class ClusterMetadataContext extends BaseSpanContext +public interface MetadataMigrationContexts { + class ClusterMetadataContext extends BaseSpanContext implements IMetadataMigrationContexts.IClusterMetadataContext { @@ -70,7 +70,7 @@ public IRfsContexts.ICheckedIdempotentPutRequestContext createMigrateTemplateCon } } - public static class MigrateTemplateContext extends BaseNestedSpanContext< + class MigrateTemplateContext extends BaseNestedSpanContext< RootMetadataMigrationContext, IMetadataMigrationContexts.IClusterMetadataContext> implements IMetadataMigrationContexts.ITemplateContext { @@ -113,7 +113,7 @@ public IRfsContexts.IRequestContext createPutContext() { } } - public static class CreateIndexContext extends BaseSpanContext + class CreateIndexContext extends BaseSpanContext implements IMetadataMigrationContexts.ICreateIndexContext { diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java index ef03e882f..7f3ffb246 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java @@ -13,9 +13,9 @@ import lombok.NonNull; -public class DocumentMigrationContexts extends IDocumentMigrationContexts { +public interface DocumentMigrationContexts extends IDocumentMigrationContexts { - public static abstract class BaseDocumentMigrationContext extends BaseSpanContext { + abstract class BaseDocumentMigrationContext extends BaseSpanContext { protected BaseDocumentMigrationContext(RootDocumentMigrationContext rootScope) { super(rootScope); } @@ -25,7 +25,7 @@ public RootWorkCoordinationContext getWorkCoordinationRootContext() { } } - public static class ShardSetupAttemptContext extends BaseDocumentMigrationContext + class ShardSetupAttemptContext extends BaseDocumentMigrationContext implements IShardSetupAttemptContext { protected ShardSetupAttemptContext(RootDocumentMigrationContext rootScope) { @@ -75,7 +75,7 @@ public IAddShardWorkItemContext createShardWorkItemContext() { } } - public static class AddShardWorkItemContext extends BaseNestedSpanContext< + class AddShardWorkItemContext extends BaseNestedSpanContext< RootDocumentMigrationContext, IShardSetupAttemptContext> implements IAddShardWorkItemContext { @@ -115,7 +115,7 @@ public IWorkCoordinationContexts.ICreateUnassignedWorkItemContext createUnassign } - public static class DocumentReindexContext extends BaseDocumentMigrationContext implements IDocumentReindexContext { + class DocumentReindexContext extends BaseDocumentMigrationContext implements IDocumentReindexContext { protected DocumentReindexContext(RootDocumentMigrationContext rootScope) { super(rootScope); diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java index 78d4ed42b..9a3cb580d 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java @@ -4,9 +4,9 @@ import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; -public abstract class IDocumentMigrationContexts { +public interface IDocumentMigrationContexts { - public static class ActivityNames { + class ActivityNames { private ActivityNames() {} public static final String DOCUMENT_REINDEX = "documentReindex"; @@ -14,11 +14,11 @@ private ActivityNames() {} public static final String ADD_SHARD_WORK_ITEM = "addShardWorkItem"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} } - public interface IShardSetupAttemptContext extends IScopedInstrumentationAttributes { + interface IShardSetupAttemptContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.SHARD_SETUP_ATTEMPT; IWorkCoordinationContexts.IAcquireSpecificWorkContext createWorkAcquisitionContext(); @@ -28,13 +28,13 @@ public interface IShardSetupAttemptContext extends IScopedInstrumentationAttribu IAddShardWorkItemContext createShardWorkItemContext(); } - public interface IAddShardWorkItemContext extends IScopedInstrumentationAttributes { + interface IAddShardWorkItemContext extends IScopedInstrumentationAttributes { String ACTIVITY_NAME = ActivityNames.ADD_SHARD_WORK_ITEM; IWorkCoordinationContexts.ICreateUnassignedWorkItemContext createUnassignedWorkItemContext(); } - public interface IDocumentReindexContext + interface IDocumentReindexContext extends IWorkCoordinationContexts.IScopedWorkContext { String ACTIVITY_NAME = ActivityNames.DOCUMENT_REINDEX; diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index c8bf853a7..2f080f403 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -49,8 +49,8 @@ class StreamChannelConnectionCaptureSerializerTest { // Reference Timestamp chosen in the future with nanosecond precision resemble an upper bound on space overhead public static final Instant REFERENCE_TIMESTAMP = Instant.parse("2999-01-01T23:59:59.98765432Z"); - private final static String FAKE_EXCEPTION_DATA = "abcdefghijklmnop"; - private final static String FAKE_READ_PACKET_DATA = "ABCDEFGHIJKLMNOP"; + private static final String FAKE_EXCEPTION_DATA = "abcdefghijklmnop"; + private static final String FAKE_READ_PACKET_DATA = "ABCDEFGHIJKLMNOP"; private static int getEstimatedTrafficStreamByteSize(int readWriteEventCount, int averageDataPacketSize) { var fixedTimestamp = Timestamp.newBuilder() diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java index 4e02b0e9f..02b5ab430 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/IWireCaptureContexts.java @@ -4,9 +4,9 @@ import org.opensearch.migrations.tracing.IWithTypedEnclosingScope; import org.opensearch.migrations.tracing.commoncontexts.IHttpTransactionContext; -public abstract class IWireCaptureContexts { +public interface IWireCaptureContexts { - public static class ActivityNames { + class ActivityNames { private ActivityNames() {} public static final String BLOCKED = "blocked"; @@ -15,7 +15,7 @@ private ActivityNames() {} public static final String GATHERING_RESPONSE = "gatheringResponse"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} public static final String UNREGISTERED = "unregistered"; @@ -28,7 +28,7 @@ private MetricNames() {} public static final String BYTES_WRITTEN = "bytesWritten"; } - public interface ICapturingConnectionContext + interface ICapturingConnectionContext extends org.opensearch.migrations.tracing.commoncontexts.IConnectionContext { IHttpMessageContext createInitialRequestContext(); @@ -38,7 +38,7 @@ public interface ICapturingConnectionContext void onRemoved(); } - public interface IHttpMessageContext + interface IHttpMessageContext extends IHttpTransactionContext, IWithStartTimeAndAttributes, @@ -52,7 +52,7 @@ public interface IHttpMessageContext IRequestContext createNextRequestContext(); } - public interface IRequestContext extends IHttpMessageContext { + interface IRequestContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.GATHERING_REQUEST; default String getActivityName() { @@ -68,7 +68,7 @@ default String getActivityName() { void onBytesRead(int size); } - public interface IBlockingContext extends IHttpMessageContext { + interface IBlockingContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.BLOCKED; default String getActivityName() { @@ -76,7 +76,7 @@ default String getActivityName() { } } - public interface IWaitingForResponseContext extends IHttpMessageContext { + interface IWaitingForResponseContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.WAITING_FOR_RESPONSE; default String getActivityName() { @@ -84,7 +84,7 @@ default String getActivityName() { } } - public interface IResponseContext extends IHttpMessageContext { + interface IResponseContext extends IHttpMessageContext { String ACTIVITY_NAME = ActivityNames.GATHERING_RESPONSE; default String getActivityName() { diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java index c593a92c8..aec642c87 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/tracing/WireCaptureContexts.java @@ -10,11 +10,11 @@ import lombok.Getter; import lombok.NonNull; -public class WireCaptureContexts extends IWireCaptureContexts { - public static final String COUNT_UNITS = "count"; - public static final String BYTES_UNIT = "bytes"; +public interface WireCaptureContexts extends IWireCaptureContexts { + String COUNT_UNITS = "count"; + String BYTES_UNIT = "bytes"; - public static class ConnectionContext extends org.opensearch.migrations.trafficcapture.tracing.ConnectionContext + class ConnectionContext extends org.opensearch.migrations.trafficcapture.tracing.ConnectionContext implements IWireCaptureContexts.ICapturingConnectionContext { public ConnectionContext(IRootWireLoggingContext rootInstrumentationScope, String connectionId, String nodeId) { @@ -64,7 +64,7 @@ public void onRemoved() { } @Getter - public abstract static class HttpMessageContext extends BaseNestedSpanContext< + abstract static class HttpMessageContext extends BaseNestedSpanContext< RootWireLoggingContext, IConnectionContext> implements IWireCaptureContexts.IHttpMessageContext { @@ -118,7 +118,7 @@ public IWireCaptureContexts.IRequestContext createNextRequestContext() { } } - public static class RequestContext extends HttpMessageContext implements IWireCaptureContexts.IRequestContext { + class RequestContext extends HttpMessageContext implements IWireCaptureContexts.IRequestContext { public RequestContext( RootWireLoggingContext rootWireLoggingContext, IConnectionContext enclosingScope, @@ -176,7 +176,7 @@ public void onBytesRead(int size) { } } - public static class BlockingContext extends HttpMessageContext implements IWireCaptureContexts.IBlockingContext { + class BlockingContext extends HttpMessageContext implements IWireCaptureContexts.IBlockingContext { public BlockingContext( RootWireLoggingContext rootWireLoggingContext, IConnectionContext enclosingScope, @@ -206,7 +206,7 @@ public RequestContext.MetricInstruments getMetrics() { } } - public static class WaitingForResponseContext extends HttpMessageContext + class WaitingForResponseContext extends HttpMessageContext implements IWireCaptureContexts.IWaitingForResponseContext { public WaitingForResponseContext( @@ -238,7 +238,7 @@ public RequestContext.MetricInstruments getMetrics() { } } - public static class ResponseContext extends HttpMessageContext implements IWireCaptureContexts.IResponseContext { + class ResponseContext extends HttpMessageContext implements IWireCaptureContexts.IResponseContext { public ResponseContext( RootWireLoggingContext rootWireLoggingContext, IConnectionContext enclosingScope, diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java index 4e5ff226e..088a371e7 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxySetupTest.java @@ -14,7 +14,7 @@ public class CaptureProxySetupTest { - public final static String kafkaBrokerString = "invalid:9092"; + public static final String kafkaBrokerString = "invalid:9092"; public static final String TLS_PROTOCOLS_KEY = "plugins.security.ssl.http.enabled_protocols"; @Test diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java index 7523583c2..6579a2501 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java @@ -37,7 +37,7 @@ @Slf4j class NettyScanningHttpProxyTest { - private final static String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + private static final String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + "Host: localhost\r\n" + "User-Agent: UnitTest\r\n" + "DumbAndLongHeaderValue-0: 0\r\n" @@ -63,7 +63,7 @@ class NettyScanningHttpProxyTest { + "Accept-Encoding: gzip, x-gzip, deflate\r\n" + "Connection: keep-alive\r\n" + "\r\n"; - private final static String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + private static final String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + "Content-transfer-encoding: chunked\r\n" + "Date: Thu, 08 Jun 2023 23:06:23 GMT\r\n" + // This should be OK since it's always the same length diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java index 7cc561cc8..4de7951b0 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResponse.java @@ -34,6 +34,7 @@ public Builder(Instant requestSendTime) { super(requestSendTime); } + @Override public AggregatedRawResponse build() { return new AggregatedRawResponse( rawResponse, diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java index a62cd21d0..ff83b5ffe 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AggregatedRawResult.java @@ -53,7 +53,7 @@ public B addErrorCause(Throwable t) { } public B addResponsePacket(byte[] packet) { - return (B) addResponsePacket(packet, Instant.now()); + return addResponsePacket(packet, Instant.now()); } public B addResponsePacket(byte[] packet, Instant timestamp) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java index 4e145df4b..a879ba7e7 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/EndOfInput.java @@ -6,4 +6,4 @@ * is closed. It allows the NettySendByteBufsToPacketHandlerHandler class to determine * whether all the contents were received or if there was an error in-flight. */ -public class EndOfInput {} +public final class EndOfInput {} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java index c25dbe5dd..dcf1ca62d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java @@ -54,6 +54,10 @@ public static boolean retryIsUnnecessaryGivenStatusCode(int statusCode) { return shouldRetry(targetRequestBytes, currentResponse, reconstructedSourceTransactionFuture); } + /** + * @param targetRequestBytes the raw request as it was sent to the target cluster, which can be useful because + * of the HTTP verb and path. + */ public TrackedFuture shouldRetry(ByteBuf targetRequestBytes, AggregatedRawResponse currentResponse, diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java index ac82fbf39..b380fc617 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/IReplayContexts.java @@ -11,9 +11,9 @@ import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes; import org.opensearch.migrations.tracing.IWithTypedEnclosingScope; -public abstract class IReplayContexts { +public interface IReplayContexts { - public static class ActivityNames { + class ActivityNames { private ActivityNames() {} public static final String CHANNEL = "channel"; @@ -33,7 +33,7 @@ private ActivityNames() {} public static final String TUPLE_COMPARISON = "comparingResults"; } - public static class MetricNames { + class MetricNames { private MetricNames() {} public static final String KAFKA_RECORD_READ = "kafkaRecordsRead"; @@ -68,9 +68,9 @@ private MetricNames() {} public static final String TUPLE_COMPARISON = "tupleComparison"; } - public interface IAccumulationScope extends IScopedInstrumentationAttributes {} + interface IAccumulationScope extends IScopedInstrumentationAttributes {} - public interface IChannelKeyContext + interface IChannelKeyContext extends IAccumulationScope, org.opensearch.migrations.tracing.commoncontexts.IConnectionContext { @@ -98,7 +98,7 @@ default String getNodeId() { void addFailedChannelCreation(); } - public interface ISocketContext extends IAccumulationScope, IWithTypedEnclosingScope { + interface ISocketContext extends IAccumulationScope, IWithTypedEnclosingScope { public static final String ACTIVITY_NAME = ActivityNames.TCP_CONNECTION; @Override @@ -107,7 +107,7 @@ default String getActivityName() { } } - public interface IKafkaRecordContext extends IAccumulationScope, IWithTypedEnclosingScope { + interface IKafkaRecordContext extends IAccumulationScope, IWithTypedEnclosingScope { String ACTIVITY_NAME = ActivityNames.RECORD_LIFETIME; @Override @@ -127,7 +127,7 @@ default AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) ITrafficStreamsLifecycleContext createTrafficLifecyleContext(ITrafficStreamKey tsk); } - public interface ITrafficStreamsLifecycleContext + interface ITrafficStreamsLifecycleContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -156,7 +156,7 @@ IReplayerHttpTransactionContext createHttpTransactionContext( ); } - public interface IReplayerHttpTransactionContext + interface IReplayerHttpTransactionContext extends org.opensearch.migrations.tracing.commoncontexts.IHttpTransactionContext, IAccumulationScope, @@ -212,7 +212,7 @@ default AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) ITupleHandlingContext createTupleContext(); } - public interface IRequestAccumulationContext + interface IRequestAccumulationContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -224,7 +224,7 @@ default String getActivityName() { } } - public interface IResponseAccumulationContext + interface IResponseAccumulationContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -236,7 +236,7 @@ default String getActivityName() { } } - public interface IRequestTransformationContext + interface IRequestTransformationContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -276,7 +276,7 @@ default String getActivityName() { void aggregateOutputChunk(int sizeInBytes); } - public interface IScheduledContext + interface IScheduledContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -288,7 +288,7 @@ default String getActivityName() { } } - public interface ITargetRequestContext + interface ITargetRequestContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -312,7 +312,7 @@ default String getActivityName() { IReceivingHttpResponseContext createHttpReceivingContext(); } - public interface IRequestConnectingContext + interface IRequestConnectingContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -324,7 +324,7 @@ default String getActivityName() { } } - public interface IRequestSendingContext + interface IRequestSendingContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -336,7 +336,7 @@ default String getActivityName() { } } - public interface IWaitingForHttpResponseContext + interface IWaitingForHttpResponseContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -348,7 +348,7 @@ default String getActivityName() { } } - public interface IReceivingHttpResponseContext + interface IReceivingHttpResponseContext extends IAccumulationScope, IWithTypedEnclosingScope { @@ -360,7 +360,7 @@ default String getActivityName() { } } - public interface ITupleHandlingContext + interface ITupleHandlingContext extends IAccumulationScope, IWithTypedEnclosingScope { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java index 942141000..ac7784de2 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ReplayContexts.java @@ -23,15 +23,13 @@ import lombok.Getter; import lombok.NonNull; import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -@Slf4j -public abstract class ReplayContexts extends IReplayContexts { +public interface ReplayContexts extends IReplayContexts { - public static final String COUNT_UNIT_STR = "count"; - public static final String BYTES_UNIT_STR = "bytes"; + String COUNT_UNIT_STR = "count"; + String BYTES_UNIT_STR = "bytes"; - public static class SocketContext extends DirectNestedSpanContext< + class SocketContext extends DirectNestedSpanContext< RootReplayerContext, ChannelKeyContext, IChannelKeyContext> implements ISocketContext { @@ -74,7 +72,7 @@ public MetricInstruments getMetrics() { } } - public static class ChannelKeyContext extends BaseNestedSpanContext< + class ChannelKeyContext extends BaseNestedSpanContext< RootReplayerContext, IScopedInstrumentationAttributes> implements IReplayContexts.IChannelKeyContext { @Getter @@ -135,7 +133,7 @@ public void addFailedChannelCreation() { } } - public static class KafkaRecordContext extends BaseNestedSpanContext + class KafkaRecordContext extends BaseNestedSpanContext implements IReplayContexts.IKafkaRecordContext { @@ -189,7 +187,7 @@ public IReplayContexts.ITrafficStreamsLifecycleContext createTrafficLifecyleCont } } - public static class TrafficStreamLifecycleContext extends BaseNestedSpanContext< + class TrafficStreamLifecycleContext extends BaseNestedSpanContext< RootReplayerContext, IScopedInstrumentationAttributes> implements IReplayContexts.ITrafficStreamsLifecycleContext { private final ITrafficStreamKey trafficStreamKey; @@ -255,7 +253,7 @@ public IReplayContexts.IChannelKeyContext getLogicalEnclosingScope() { } } - public static class HttpTransactionContext extends BaseNestedSpanContext< + class HttpTransactionContext extends BaseNestedSpanContext< RootReplayerContext, IReplayContexts.ITrafficStreamsLifecycleContext> implements IReplayContexts.IReplayerHttpTransactionContext { final UniqueReplayerRequestKey replayerRequestKey; @@ -347,7 +345,7 @@ public IReplayContexts.ITupleHandlingContext createTupleContext() { } } - public static class RequestAccumulationContext extends DirectNestedSpanContext< + class RequestAccumulationContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IRequestAccumulationContext { @@ -371,7 +369,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class ResponseAccumulationContext extends DirectNestedSpanContext< + class ResponseAccumulationContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IResponseAccumulationContext { @@ -395,7 +393,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class RequestTransformationContext extends DirectNestedSpanContext< + class RequestTransformationContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IRequestTransformationContext { @@ -547,7 +545,7 @@ public void aggregateOutputChunk(int sizeInBytes) { } } - public static class ScheduledContext extends DirectNestedSpanContext< + class ScheduledContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.IScheduledContext { @@ -586,7 +584,7 @@ public void sendMeterEventsForEnd() { } } - public static class TargetRequestContext extends DirectNestedSpanContext< + class TargetRequestContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.ITargetRequestContext { @@ -656,7 +654,7 @@ public IReplayContexts.IWaitingForHttpResponseContext createWaitingForResponseCo } } - public static class RequestConnectingContext extends DirectNestedSpanContext< + class RequestConnectingContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IRequestConnectingContext { @@ -680,7 +678,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class RequestSendingContext extends DirectNestedSpanContext< + class RequestSendingContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IRequestSendingContext { @@ -704,7 +702,7 @@ private MetricInstruments(Meter meter, String activityName) { } } - public static class WaitingForHttpResponseContext extends DirectNestedSpanContext< + class WaitingForHttpResponseContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IWaitingForHttpResponseContext { @@ -729,7 +727,7 @@ private MetricInstruments(Meter meter, String activityName) { } - public static class ReceivingHttpResponseContext extends DirectNestedSpanContext< + class ReceivingHttpResponseContext extends DirectNestedSpanContext< RootReplayerContext, TargetRequestContext, IReplayContexts.ITargetRequestContext> implements IReplayContexts.IReceivingHttpResponseContext { @@ -756,7 +754,7 @@ private MetricInstruments(Meter meter, String activityName) { @Getter @Setter - public static class TupleHandlingContext extends DirectNestedSpanContext< + class TupleHandlingContext extends DirectNestedSpanContext< RootReplayerContext, HttpTransactionContext, IReplayContexts.IReplayerHttpTransactionContext> implements IReplayContexts.ITupleHandlingContext { diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java index 77b948da0..a5929530b 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java @@ -29,13 +29,13 @@ public static void setup() { CountingNettyResourceLeakDetector.activate(); } - final static String SAMPLE_REQUEST_STRING = "GET / HTTP/1.1\r\n" + static final String SAMPLE_REQUEST_STRING = "GET / HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: Keep-Alive\r\n" + "User-Agent: UnitTest\r\n" + "\r\n"; - final static String SAMPLE_REQUEST_AS_BLOCKS = "[G],[E],[T],[ ],[/],[ ],[H],[T],[T],[P],[/],[1],[.],[1]," + static final String SAMPLE_REQUEST_AS_BLOCKS = "[G],[E],[T],[ ],[/],[ ],[H],[T],[T],[P],[/],[1],[.],[1]," + "[\r],[\n]," + "[H],[o],[s],[t],[:],[ ],[l],[o],[c],[a],[l],[h],[o],[s],[t]," + "[\r],[\n]," @@ -45,13 +45,13 @@ public static void setup() { + "[\r],[\n]," + "[\r],[\n]"; - final static String SAMPLE_REQUEST_AS_PARSED_HTTP = "GET / HTTP/1.1\r\n" + static final String SAMPLE_REQUEST_AS_PARSED_HTTP = "GET / HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: Keep-Alive\r\n" + "User-Agent: UnitTest\r\n" + "\r\n"; - final static String SAMPLE_REQUEST_AS_PARSED_HTTP_SORTED = "GET / HTTP/1.1\r\n" + static final String SAMPLE_REQUEST_AS_PARSED_HTTP_SORTED = "GET / HTTP/1.1\r\n" + "Connection: Keep-Alive\r\n" + "Host: localhost\r\n" + "User-Agent: UnitTest\r\n" diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java index 610c63baa..3dbab055f 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java @@ -41,7 +41,7 @@ class ResultsToLogsConsumerTest extends InstrumentationTest { private static final ObjectMapper mapper = new ObjectMapper(); public static final String TEST_EXCEPTION_MESSAGE = "TEST_EXCEPTION"; - public final static String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + public static final String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + "Content-transfer-encoding: chunked\r\n" + "Date: Thu, 08 Jun 2023 23:06:23 GMT\r\n" + // This should be OK since it's always the same length diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index c13950705..1c0803db6 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -63,13 +63,13 @@ public class NettyPacketToHttpConsumerTest extends InstrumentationTest { public static final int LARGE_RESPONSE_CONTENT_LENGTH = 2 * 1024 * 1024; public static final int LARGE_RESPONSE_LENGTH = LARGE_RESPONSE_CONTENT_LENGTH + 107; - final static String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + static final String EXPECTED_REQUEST_STRING = "GET / HTTP/1.1\r\n" + "Connection: Keep-Alive\r\n" + "Host: localhost\r\n" + "User-Agent: UnitTest\r\n" + "Connection: Keep-Alive\r\n" + "\r\n"; - public final static String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + public static final String EXPECTED_RESPONSE_STRING = "HTTP/1.1 200 OK\r\n" + "Content-Type: text/plain\r\n" + "Funtime: checkIt!\r\n" + "transfer-encoding: chunked\r\n" diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java index ea1fcb28f..073d882e4 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java @@ -37,7 +37,7 @@ @WrapWithNettyLeakDetection class HttpJsonTransformingConsumerTest extends InstrumentationTest { - private final static String NDJSON_TEST_REQUEST = ( + private static final String NDJSON_TEST_REQUEST = ( "POST /test HTTP/1.1\r\n" + "Host: foo.example\r\n" + "Content-Type: application/json\r\n" + diff --git a/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java b/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java index c1d47f859..bef187b18 100644 --- a/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java +++ b/testHelperFixtures/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java @@ -37,8 +37,8 @@ */ public class SimpleHttpClientForTesting implements AutoCloseable { - private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5); - private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5); + private static final Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5); + private static final Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5); private final CloseableHttpClient httpClient;