diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java index ddff142c1..6efe1d8d6 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java @@ -14,6 +14,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey; +import org.opensearch.migrations.transform.IJsonTransformer; import io.netty.buffer.ByteBuf; import lombok.Lombok; @@ -27,26 +28,31 @@ public class ResultsToLogsConsumer implements BiConsumer getTransactionSummaryStringPreamble()).log(); + logger.atInfo().setMessage("{}").addArgument(ResultsToLogsConsumer::getTransactionSummaryStringPreamble).log(); return logger; } @@ -73,43 +79,48 @@ private Map toJSONObject(SourceTargetCaptureTuple tuple, ParsedH /** * Writes a tuple object to an output stream as a JSON object. * The JSON tuple is output on one line, and has several objects: "sourceRequest", "sourceResponse", - * "targetRequest", and "targetResponse". The "connectionId" is also included to aid in debugging. + * "targetRequest", and "targetResponses". The "connectionId", "numRequests", and "numErrors" are also included to aid in debugging. * An example of the format is below. *

* { - * "sourceRequest": { - * "Request-URI": XYZ, - * "Method": XYZ, - * "HTTP-Version": XYZ - * "body": XYZ, - * "header-1": XYZ, - * "header-2": XYZ - * }, - * "targetRequest": { - * "Request-URI": XYZ, - * "Method": XYZ, - * "HTTP-Version": XYZ - * "body": XYZ, - * "header-1": XYZ, - * "header-2": XYZ - * }, - * "sourceResponse": { - * "HTTP-Version": ABC, - * "Status-Code": ABC, - * "Reason-Phrase": ABC, - * "response_time_ms": 123, - * "body": ABC, - * "header-1": ABC - * }, - * "targetResponse": { - * "HTTP-Version": ABC, - * "Status-Code": ABC, - * "Reason-Phrase": ABC, - * "response_time_ms": 123, - * "body": ABC, - * "header-2": ABC - * }, - * "connectionId": "0242acfffe1d0008-0000000c-00000003-0745a19f7c3c5fc9-121001ff.0" + * "sourceRequest": { + * "Request-URI": "XYZ", + * "Method": "XYZ", + * "HTTP-Version": "XYZ", + * "header-1": "XYZ", + * "header-2": "XYZ", + * "body": "BASE64ENCODEDSTRING" + * }, + * "targetRequest": { + * "Request-URI": "XYZ", + * "Method": "XYZ", + * "HTTP-Version": "XYZ", + * "header-1": "XYZ", + * "header-2": "XYZ", + * "body": "BASE64ENCODEDSTRING" + * }, + * "sourceResponse": { + * "response_time_ms": 0, + * "HTTP-Version": "XYZ", + * "Status-Code": XYZ, + * "Reason-Phrase": "XYZ", + * "header-1": "XYZ", + * "header-2": "XYZ", + * "body": "BASE64ENCODEDSTRING" + * }, + * "targetResponses": [{ + * "response_time_ms": 0, + * "HTTP-Version": "XYZ", + * "Status-Code": XYZ, + * "Reason-Phrase": "XYZ", + * "header-1": "XYZ", + * "header-2": "XYZ", + * "body": "BASE64ENCODEDSTRING" + * }], + * "connectionId": "XYZ", + * "numRequests": XYZ, + * "numErrors": XYZ + * "error": "XYZ" * } * * @param tuple the RequestResponseResponseTriple object to be converted into json and written to the stream. @@ -122,7 +133,9 @@ public void accept(SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts par .log(); if (tupleLogger.isInfoEnabled()) { try { - var tupleString = PLAIN_MAPPER.writeValueAsString(toJSONObject(tuple, parsedMessages)); + var originalTuple = toJSONObject(tuple, parsedMessages); + var transformedTuple = tupleTransformer.transformJson(originalTuple); + var tupleString = PLAIN_MAPPER.writeValueAsString(transformedTuple); tupleLogger.atInfo().setMessage("{}").addArgument(() -> tupleString).log(); } catch (Exception e) { log.atError().setMessage("Exception converting tuple to string").setCause(e).log(); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 73b88ad58..71ef7121c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -34,7 +34,9 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; +import com.beust.jcommander.ParametersDelegate; import io.netty.util.concurrent.DefaultThreadFactory; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -136,31 +138,12 @@ public static class Parameters { + "(cannot be used with other auth arguments)") String useSigV4ServiceAndRegion; - @Parameter( - required = false, - names = "--transformer-config-base64", - arity = 1, - description = "Configuration of message transformers. The same contents as --transformer-config but " + - "Base64 encoded so that the configuration is easier to pass as a command line parameter.") - String transformerConfigEncoded; + @ParametersDelegate + private RequestTransformationParams requestTransformationParams = new RequestTransformationParams(); - @Parameter( - required = false, - names = "--transformer-config", - arity = 1, - description = "Configuration of message transformers. Either as a string that identifies the " - + "transformer that should be run (with default settings) or as json to specify options " - + "as well as multiple transformers to run in sequence. " - + "For json, keys are the (simple) names of the loaded transformers and values are the " - + "configuration passed to each of the transformers.") - String transformerConfig; + @ParametersDelegate + private TupleTransformationParams tupleTransformationParams = new TupleTransformationParams(); - @Parameter( - required = false, - names = "--transformer-config-file", - arity = 1, - description = "Path to the JSON configuration file of message transformers.") - String transformerConfigFile; @Parameter( required = false, names = "--user-agent", @@ -257,6 +240,68 @@ public static class Parameters { String otelCollectorEndpoint; } + public interface TransformerParams { + String getTransformerConfigEncoded(); + String getTransformerConfig(); + String getTransformerConfigFile(); + } + + @Getter + public static class RequestTransformationParams implements TransformerParams { + + @Parameter( + required = false, + names = "--transformer-config-encoded", + arity = 1, + description = "Configuration of message transformers. The same contents as --transformer-config but " + + "Base64 encoded so that the configuration is easier to pass as a command line parameter.") + private String transformerConfigEncoded; + + @Parameter( + required = false, + names = "--transformer-config", + arity = 1, + description = "Configuration of message transformers. Either as a string that identifies the " + + "transformer that should be run (with default settings) or as json to specify options " + + "as well as multiple transformers to run in sequence. " + + "For json, keys are the (simple) names of the loaded transformers and values are the " + + "configuration passed to each of the transformers.") + private String transformerConfig; + + @Parameter( + required = false, + names = "--transformer-config-file", + arity = 1, + description = "Path to the JSON configuration file of message transformers.") + private String transformerConfigFile; + } + + @Getter + public static class TupleTransformationParams implements TransformerParams { + + @Parameter( + required = false, + names = "--tuple-transformer-config-base64", + arity = 1, + description = "Base64 encoded transformer configuration for tuple transformation.") + private String transformerConfigEncoded; + + @Parameter( + required = false, + names = "--tuple-transformer-config", + arity = 1, + description = "Transformer configuration string for tuple transformation.") + private String transformerConfig; + + @Parameter( + required = false, + names = "--tuple-transformer-config-file", + arity = 1, + description = "Path to the JSON configuration file for tuple transformation.") + private String transformerConfigFile; + } + + private static Parameters parseArgs(String[] args) { Parameters p = new Parameters(); JCommander jCommander = new JCommander(p); @@ -276,31 +321,31 @@ private static int isConfigured(String s) { return (s == null || s.isBlank()) ? 0 : 1; } - private static String getTransformerConfig(Parameters params) { - var configuredCount = isConfigured(params.transformerConfigFile) + - isConfigured(params.transformerConfigEncoded) + - isConfigured(params.transformerConfig); + private static String getTransformerConfig(TransformerParams params) { + var configuredCount = isConfigured(params.getTransformerConfigFile()) + + isConfigured(params.getTransformerConfigEncoded()) + + isConfigured(params.getTransformerConfig()); if (configuredCount > 1) { System.err.println("Specify only one of --transformer-config-base64, --transformer-config or " + "--transformer-config-file."); System.exit(4); } - if (params.transformerConfigFile != null && !params.transformerConfigFile.isBlank()) { + if (params.getTransformerConfigFile() != null && !params.getTransformerConfigFile().isBlank()) { try { - return Files.readString(Paths.get(params.transformerConfigFile), StandardCharsets.UTF_8); + return Files.readString(Paths.get(params.getTransformerConfigFile()), StandardCharsets.UTF_8); } catch (IOException e) { System.err.println("Error reading transformer configuration file: " + e.getMessage()); System.exit(5); } } - if (params.transformerConfig != null && !params.transformerConfig.isBlank()) { - return params.transformerConfig; + if (params.getTransformerConfig() != null && !params.getTransformerConfig().isBlank()) { + return params.getTransformerConfig(); } - if (params.transformerConfigEncoded != null && !params.transformerConfigEncoded.isBlank()) { - return new String(Base64.getDecoder().decode(params.transformerConfigEncoded)); + if (params.getTransformerConfigEncoded() != null && !params.getTransformerConfigEncoded().isBlank()) { + return new String(Base64.getDecoder().decode(params.getTransformerConfigEncoded())); } return null; @@ -365,10 +410,18 @@ public static void main(String[] args) throws Exception { var timeShifter = new TimeShifter(params.speedupFactor); var serverTimeout = Duration.ofSeconds(params.targetServerResponseTimeoutSeconds); - String transformerConfig = getTransformerConfig(params); - if (transformerConfig != null) { - log.atInfo().setMessage(() -> "Transformations config string: " + transformerConfig).log(); + String requestTransformerConfig = getTransformerConfig(params.requestTransformationParams); + if (requestTransformerConfig != null) { + log.atInfo().setMessage("Request Transformations config string: {}") + .addArgument(requestTransformerConfig).log(); } + + String tupleTransformerConfig = getTransformerConfig(params.tupleTransformationParams); + if (requestTransformerConfig != null) { + log.atInfo().setMessage("Tuple Transformations config string: {}") + .addArgument(tupleTransformerConfig).log(); + } + final var orderedRequestTracker = new OrderedWorkerTracker(); final var hostname = uri.getHost(); @@ -376,7 +429,7 @@ public static void main(String[] args) throws Exception { topContext, uri, authTransformer, - new TransformationLoader().getTransformerFactoryLoader(hostname, params.userAgent, transformerConfig), + new TransformationLoader().getTransformerFactoryLoader(hostname, params.userAgent, requestTransformerConfig), TrafficReplayerTopLevel.makeNettyPacketConsumerConnectionPool( uri, params.allowInsecureConnections, @@ -404,7 +457,8 @@ public static void main(String[] args) throws Exception { }, ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS); setupShutdownHookForReplayer(tr); - var tupleWriter = new TupleParserChainConsumer(new ResultsToLogsConsumer()); + var tupleWriter = new TupleParserChainConsumer(new ResultsToLogsConsumer(null, null, + new TransformationLoader().getTransformerFactoryLoader(hostname, params.userAgent, requestTransformerConfig))); tr.setupRunAndWaitForReplayWithShutdownChecks( Duration.ofSeconds(params.observedPacketConnectionTimeout), serverTimeout, diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java index 3dbab055f..eb8d37e87 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java @@ -7,6 +7,9 @@ import java.time.Instant; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Collectors; import com.fasterxml.jackson.core.JsonProcessingException; @@ -22,10 +25,12 @@ import org.opensearch.migrations.replay.datatypes.ByteBufList; import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus; import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKeyAndContext; +import org.opensearch.migrations.replay.tracing.IReplayContexts.ITupleHandlingContext; import org.opensearch.migrations.testutils.CloseableLogSetup; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.tracing.InstrumentationTest; import org.opensearch.migrations.tracing.TestContext; +import org.opensearch.migrations.transform.IJsonTransformer; import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; @@ -34,7 +39,7 @@ @WrapWithNettyLeakDetection(repetitions = 4) class ResultsToLogsConsumerTest extends InstrumentationTest { static { - // Synchronize logging to for assertions + // Synchronize logging for assertions LogManager.setFactory(new Log4jContextFactory(new ClassLoaderContextSelector())); } private static final String NODE_ID = "n"; @@ -82,7 +87,7 @@ public void testOutputterWithNulls() throws IOException { var responses = new TransformedTargetRequestAndResponseList(null, HttpRequestTransformationStatus.skipped()); var emptyTuple = new SourceTargetCaptureTuple(rootContext.getTestTupleContext(), null, responses, null); try (var closeableLogSetup = new CloseableLogSetup(calculateLoggerName(this.getClass()))) { - var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.getTestLogger(), null); + var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.getTestLogger(), null, null); var consumer = new TupleParserChainConsumer(resultsToLogsConsumer); consumer.accept(emptyTuple); Assertions.assertEquals(1, closeableLogSetup.getLogEvents().size()); @@ -99,7 +104,7 @@ public void testOutputterWithException() { var exception = new Exception(TEST_EXCEPTION_MESSAGE); var emptyTuple = new SourceTargetCaptureTuple(rootContext.getTestTupleContext(), null, responses, exception); try (var closeableLogSetup = new CloseableLogSetup(calculateLoggerName(this.getClass()))) { - var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.getTestLogger(), null); + var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.getTestLogger(), null, null); var consumer = new TupleParserChainConsumer(resultsToLogsConsumer); consumer.accept(emptyTuple); Assertions.assertEquals(1, closeableLogSetup.getLogEvents().size()); @@ -172,7 +177,7 @@ public void testOutputterForGet() throws IOException { " \"numRequests\":1," + " \"numErrors\":0\r\n" + "}"; - testOutputterForRequest("get_withAuthHeader.txt", EXPECTED_LOGGED_OUTPUT); + testOutputterForRequest("get_withAuthHeader.txt", EXPECTED_LOGGED_OUTPUT, null); } @Test @@ -231,10 +236,11 @@ public void testOutputterForPost() throws IOException { " \"numRequests\":1," + " \"numErrors\":0\r\n" + "}"; - testOutputterForRequest("post_formUrlEncoded_withFixedLength.txt", EXPECTED_LOGGED_OUTPUT); + testOutputterForRequest("post_formUrlEncoded_withFixedLength.txt", EXPECTED_LOGGED_OUTPUT, null); } - private void testOutputterForRequest(String requestResourceName, String expected) throws IOException { + + private void testOutputterForRequest(String requestResourceName, String expected, IJsonTransformer transformer) throws IOException { var trafficStreamKey = PojoTrafficStreamKeyAndContext.build( NODE_ID, "c", @@ -264,7 +270,7 @@ private void testOutputterForRequest(String requestResourceName, String expected targetResponses, null ); - var streamConsumer = new ResultsToLogsConsumer(closeableLogSetup.getTestLogger(), null); + var streamConsumer = new ResultsToLogsConsumer(closeableLogSetup.getTestLogger(), null, transformer); var consumer = new TupleParserChainConsumer(streamConsumer); consumer.accept(tuple); Assertions.assertEquals(1, closeableLogSetup.getLogEvents().size()); @@ -291,4 +297,122 @@ private void testOutputterForRequest(String requestResourceName, String expected static String normalizeJson(String input) throws JsonProcessingException { return mapper.writeValueAsString(mapper.readTree(input)); } + + @Test + @ResourceLock("TestContext") + public void testTransformerWithJsonJoltTransformer() throws IOException { + // Define a simple Jolt specification that adds a new field "transformed": true + String joltSpec = + "{\n" + + " \"operation\": \"default\",\n" + + " \"spec\": {\n" + + " \"transformed\": true\n" + + " }\n" + + "}\n"; + + String fullConfig = "[{\"JsonJoltTransformerProvider\": { \"script\": " + joltSpec + "}}]"; + + IJsonTransformer jsonJoltTransformer = new TransformationLoader().getTransformerFactoryLoader(null, null, fullConfig); + + try (var tupleContext = rootContext.getTestTupleContext(); + var closeableLogSetup = new CloseableLogSetup(calculateLoggerName(this.getClass()))) { + + SourceTargetCaptureTuple tuple = createSampleTuple(tupleContext); + + ResultsToLogsConsumer consumer = new ResultsToLogsConsumer(closeableLogSetup.getTestLogger(), null, jsonJoltTransformer); + + // Use TupleParserChainConsumer to accept the tuple + TupleParserChainConsumer chainConsumer = new TupleParserChainConsumer(consumer); + chainConsumer.accept(tuple); + + // Verify that one log event was captured + Assertions.assertEquals(1, closeableLogSetup.getLogEvents().size(), "Expected one log event to be captured"); + String loggedJson = closeableLogSetup.getLogEvents().get(0); + + // Deserialize the logged JSON + var loggedMap = mapper.readValue(loggedJson, Map.class); + + // Assert that the "transformed" field was added + Assertions.assertTrue(loggedMap.containsKey("transformed"), "The transformed field should be present"); + Assertions.assertEquals(true, loggedMap.get("transformed"), "The transformed field should be true"); + + // Optionally, assert that existing fields remain unchanged + Assertions.assertTrue(loggedMap.containsKey("sourceRequest"), "sourceRequest should be present"); + Assertions.assertTrue(loggedMap.containsKey("sourceResponse"), "sourceResponse should be present"); + Assertions.assertTrue(loggedMap.containsKey("targetRequest"), "targetRequest should be present"); + Assertions.assertTrue(loggedMap.containsKey("targetResponses"), "targetResponses should be present"); + Assertions.assertTrue(loggedMap.containsKey("connectionId"), "connectionId should be present"); + Assertions.assertTrue(loggedMap.containsKey("numRequests"), "numRequests should be present"); + Assertions.assertTrue(loggedMap.containsKey("numErrors"), "numErrors should be present"); + } + } + + private SourceTargetCaptureTuple createSampleTuple(ITupleHandlingContext tupleContext) throws IOException { + var trafficStreamKey = PojoTrafficStreamKeyAndContext.build( + NODE_ID, + "c", + 0, + rootContext::createTrafficStreamContextForTest + ); + + var sourcePair = new RequestResponsePacketPair(trafficStreamKey, Instant.EPOCH, 0, 1); + + Map sourceRequest = new HashMap<>(); + sourceRequest.put("Request-URI", "/api/source"); + sourceRequest.put("Method", "GET"); + sourceRequest.put("HTTP-Version", "HTTP/1.1"); + sourceRequest.put("header-1", "Value1"); + sourceRequest.put("header-2", "Value2"); + sourceRequest.put("body", Base64.getEncoder().encodeToString("source request body".getBytes(StandardCharsets.UTF_8))); + byte[] rawRequestData = new ObjectMapper().writeValueAsBytes(sourceRequest); + sourcePair.addRequestData(Instant.EPOCH, rawRequestData); + + Map sourceResponse = new HashMap<>(); + sourceResponse.put("response_time_ms", 150); + sourceResponse.put("HTTP-Version", "HTTP/1.1"); + sourceResponse.put("Status-Code", 200); + sourceResponse.put("Reason-Phrase", "OK"); + sourceResponse.put("header-1", "ValueX"); + sourceResponse.put("header-2", "ValueY"); + sourceResponse.put("body", Base64.getEncoder().encodeToString("source response body".getBytes(StandardCharsets.UTF_8))); + byte[] rawResponseData = new ObjectMapper().writeValueAsBytes(sourceResponse); + sourcePair.addResponseData(Instant.EPOCH, rawResponseData); + + Map targetRequest = new HashMap<>(); + targetRequest.put("Request-URI", "/api/target"); + targetRequest.put("Method", "POST"); + targetRequest.put("HTTP-Version", "HTTP/1.1"); + targetRequest.put("header-1", "ValueA"); + targetRequest.put("header-2", "ValueB"); + targetRequest.put("body", Base64.getEncoder().encodeToString("target request body".getBytes(StandardCharsets.UTF_8))); + byte[] targetRequestBytes = new ObjectMapper().writeValueAsBytes(targetRequest); + ByteBufList targetRequestData = new ByteBufList(); + targetRequestData.add(Unpooled.wrappedBuffer(targetRequestBytes)); + + Map targetResponse = new HashMap<>(); + targetResponse.put("response_time_ms", 200); + targetResponse.put("HTTP-Version", "HTTP/1.1"); + targetResponse.put("Status-Code", 201); + targetResponse.put("Reason-Phrase", "Created"); + targetResponse.put("header-1", "ValueM"); + targetResponse.put("header-2", "ValueN"); + targetResponse.put("body", Base64.getEncoder().encodeToString("target response body".getBytes(StandardCharsets.UTF_8))); + byte[] targetResponseBytes = new ObjectMapper().writeValueAsBytes(targetResponse); + var targetResponses = new ArrayList>(); + targetResponses.add(new AbstractMap.SimpleEntry<>(Instant.now(), targetResponseBytes)); + + AggregatedRawResponse aggregatedResponse = new AggregatedRawResponse(null, 13, Duration.ofMillis(200), targetResponses, null); + TransformedTargetRequestAndResponseList transformedTargetRequestAndResponseList = new TransformedTargetRequestAndResponseList( + targetRequestData, + HttpRequestTransformationStatus.skipped(), + aggregatedResponse + ); + + return new SourceTargetCaptureTuple( + tupleContext, + sourcePair, + transformedTargetRequestAndResponseList, + null // No top-level error + ); + } }