Skip to content

Commit

Permalink
Add Transformation support for tuples
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Oct 2, 2024
1 parent 5015220 commit f96fe2f
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey;
import org.opensearch.migrations.transform.IJsonTransformer;

import io.netty.buffer.ByteBuf;
import lombok.Lombok;
Expand All @@ -27,26 +28,31 @@ public class ResultsToLogsConsumer implements BiConsumer<SourceTargetCaptureTupl
public static final String TRANSACTION_SUMMARY_LOGGER = "TransactionSummaryLogger";
private static final String MISSING_STR = "-";
private static final ObjectMapper PLAIN_MAPPER = new ObjectMapper();
private static final IJsonTransformer NOOP_JSON_TRANSFORMER = new TransformationLoader().getTransformerFactoryLoader(
null, null, "NoopTransformerProvider");

private final Logger tupleLogger;
private final Logger progressLogger;
private final IJsonTransformer tupleTransformer;

private final AtomicInteger tupleCounter;

public ResultsToLogsConsumer() {
this(null, null);
this(null, null, null);

Check warning on line 41 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java#L41

Added line #L41 was not covered by tests
}

public ResultsToLogsConsumer(Logger tupleLogger, Logger progressLogger) {
public ResultsToLogsConsumer(Logger tupleLogger, Logger progressLogger, IJsonTransformer tupleTransformer) {
this.tupleLogger = tupleLogger != null ? tupleLogger : LoggerFactory.getLogger(OUTPUT_TUPLE_JSON_LOGGER);
this.progressLogger = progressLogger != null ? progressLogger : makeTransactionSummaryLogger();
tupleCounter = new AtomicInteger();
this.tupleTransformer = tupleTransformer != null ? tupleTransformer : NOOP_JSON_TRANSFORMER ;
}

// set this up so that the preamble prints out once, right after we have a logger
// if it's configured to output at all
private static Logger makeTransactionSummaryLogger() {
var logger = LoggerFactory.getLogger(TRANSACTION_SUMMARY_LOGGER);
logger.atInfo().setMessage("{}").addArgument(() -> getTransactionSummaryStringPreamble()).log();
logger.atInfo().setMessage("{}").addArgument(ResultsToLogsConsumer::getTransactionSummaryStringPreamble).log();
return logger;
}

Expand All @@ -73,43 +79,48 @@ private Map<String, Object> toJSONObject(SourceTargetCaptureTuple tuple, ParsedH
/**
* Writes a tuple object to an output stream as a JSON object.
* The JSON tuple is output on one line, and has several objects: "sourceRequest", "sourceResponse",
* "targetRequest", and "targetResponse". The "connectionId" is also included to aid in debugging.
* "targetRequest", and "targetResponses". The "connectionId", "numRequests", and "numErrors" are also included to aid in debugging.
* An example of the format is below.
* <p>
* {
* "sourceRequest": {
* "Request-URI": XYZ,
* "Method": XYZ,
* "HTTP-Version": XYZ
* "body": XYZ,
* "header-1": XYZ,
* "header-2": XYZ
* },
* "targetRequest": {
* "Request-URI": XYZ,
* "Method": XYZ,
* "HTTP-Version": XYZ
* "body": XYZ,
* "header-1": XYZ,
* "header-2": XYZ
* },
* "sourceResponse": {
* "HTTP-Version": ABC,
* "Status-Code": ABC,
* "Reason-Phrase": ABC,
* "response_time_ms": 123,
* "body": ABC,
* "header-1": ABC
* },
* "targetResponse": {
* "HTTP-Version": ABC,
* "Status-Code": ABC,
* "Reason-Phrase": ABC,
* "response_time_ms": 123,
* "body": ABC,
* "header-2": ABC
* },
* "connectionId": "0242acfffe1d0008-0000000c-00000003-0745a19f7c3c5fc9-121001ff.0"
* "sourceRequest": {
* "Request-URI": "XYZ",
* "Method": "XYZ",
* "HTTP-Version": "XYZ",
* "header-1": "XYZ",
* "header-2": "XYZ",
* "body": "BASE64ENCODEDSTRING"
* },
* "targetRequest": {
* "Request-URI": "XYZ",
* "Method": "XYZ",
* "HTTP-Version": "XYZ",
* "header-1": "XYZ",
* "header-2": "XYZ",
* "body": "BASE64ENCODEDSTRING"
* },
* "sourceResponse": {
* "response_time_ms": 0,
* "HTTP-Version": "XYZ",
* "Status-Code": XYZ,
* "Reason-Phrase": "XYZ",
* "header-1": "XYZ",
* "header-2": "XYZ",
* "body": "BASE64ENCODEDSTRING"
* },
* "targetResponses": [{
* "response_time_ms": 0,
* "HTTP-Version": "XYZ",
* "Status-Code": XYZ,
* "Reason-Phrase": "XYZ",
* "header-1": "XYZ",
* "header-2": "XYZ",
* "body": "BASE64ENCODEDSTRING"
* }],
* "connectionId": "XYZ",
* "numRequests": XYZ,
* "numErrors": XYZ
* "error": "XYZ"
* }
*
* @param tuple the RequestResponseResponseTriple object to be converted into json and written to the stream.
Expand All @@ -122,7 +133,9 @@ public void accept(SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts par
.log();
if (tupleLogger.isInfoEnabled()) {
try {
var tupleString = PLAIN_MAPPER.writeValueAsString(toJSONObject(tuple, parsedMessages));
var originalTuple = toJSONObject(tuple, parsedMessages);
var transformedTuple = tupleTransformer.transformJson(originalTuple);
var tupleString = PLAIN_MAPPER.writeValueAsString(transformedTuple);
tupleLogger.atInfo().setMessage("{}").addArgument(() -> tupleString).log();
} catch (Exception e) {
log.atError().setMessage("Exception converting tuple to string").setCause(e).log();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
Expand Down Expand Up @@ -136,31 +138,12 @@ public static class Parameters {
+ "(cannot be used with other auth arguments)")
String useSigV4ServiceAndRegion;

@Parameter(
required = false,
names = "--transformer-config-base64",
arity = 1,
description = "Configuration of message transformers. The same contents as --transformer-config but " +
"Base64 encoded so that the configuration is easier to pass as a command line parameter.")
String transformerConfigEncoded;
@ParametersDelegate

Check warning on line 141 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L141

Added line #L141 was not covered by tests
private RequestTransformationParams requestTransformationParams = new RequestTransformationParams();

@Parameter(
required = false,
names = "--transformer-config",
arity = 1,
description = "Configuration of message transformers. Either as a string that identifies the "
+ "transformer that should be run (with default settings) or as json to specify options "
+ "as well as multiple transformers to run in sequence. "
+ "For json, keys are the (simple) names of the loaded transformers and values are the "
+ "configuration passed to each of the transformers.")
String transformerConfig;
@ParametersDelegate

Check warning on line 144 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L144

Added line #L144 was not covered by tests
private TupleTransformationParams tupleTransformationParams = new TupleTransformationParams();

@Parameter(
required = false,
names = "--transformer-config-file",
arity = 1,
description = "Path to the JSON configuration file of message transformers.")
String transformerConfigFile;
@Parameter(
required = false,
names = "--user-agent",
Expand Down Expand Up @@ -257,6 +240,68 @@ public static class Parameters {
String otelCollectorEndpoint;
}

public interface TransformerParams {
String getTransformerConfigEncoded();
String getTransformerConfig();
String getTransformerConfigFile();
}

@Getter
public static class RequestTransformationParams implements TransformerParams {

Check warning on line 250 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L250

Added line #L250 was not covered by tests

@Parameter(
required = false,
names = "--transformer-config-encoded",
arity = 1,
description = "Configuration of message transformers. The same contents as --transformer-config but " +
"Base64 encoded so that the configuration is easier to pass as a command line parameter.")
private String transformerConfigEncoded;

Check warning on line 258 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L258

Added line #L258 was not covered by tests

@Parameter(
required = false,
names = "--transformer-config",
arity = 1,
description = "Configuration of message transformers. Either as a string that identifies the "
+ "transformer that should be run (with default settings) or as json to specify options "
+ "as well as multiple transformers to run in sequence. "
+ "For json, keys are the (simple) names of the loaded transformers and values are the "
+ "configuration passed to each of the transformers.")
private String transformerConfig;

Check warning on line 269 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L269

Added line #L269 was not covered by tests

@Parameter(
required = false,
names = "--transformer-config-file",
arity = 1,
description = "Path to the JSON configuration file of message transformers.")
private String transformerConfigFile;

Check warning on line 276 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L276

Added line #L276 was not covered by tests
}

@Getter
public static class TupleTransformationParams implements TransformerParams {

Check warning on line 280 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L280

Added line #L280 was not covered by tests

@Parameter(
required = false,
names = "--tuple-transformer-config-base64",
arity = 1,
description = "Base64 encoded transformer configuration for tuple transformation.")
private String transformerConfigEncoded;

Check warning on line 287 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L287

Added line #L287 was not covered by tests

@Parameter(
required = false,
names = "--tuple-transformer-config",
arity = 1,
description = "Transformer configuration string for tuple transformation.")
private String transformerConfig;

Check warning on line 294 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L294

Added line #L294 was not covered by tests

@Parameter(
required = false,
names = "--tuple-transformer-config-file",
arity = 1,
description = "Path to the JSON configuration file for tuple transformation.")
private String transformerConfigFile;

Check warning on line 301 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L301

Added line #L301 was not covered by tests
}


private static Parameters parseArgs(String[] args) {
Parameters p = new Parameters();
JCommander jCommander = new JCommander(p);
Expand All @@ -276,31 +321,31 @@ private static int isConfigured(String s) {
return (s == null || s.isBlank()) ? 0 : 1;
}

private static String getTransformerConfig(Parameters params) {
var configuredCount = isConfigured(params.transformerConfigFile) +
isConfigured(params.transformerConfigEncoded) +
isConfigured(params.transformerConfig);
private static String getTransformerConfig(TransformerParams params) {
var configuredCount = isConfigured(params.getTransformerConfigFile()) +
isConfigured(params.getTransformerConfigEncoded()) +
isConfigured(params.getTransformerConfig());

Check warning on line 327 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L325-L327

Added lines #L325 - L327 were not covered by tests
if (configuredCount > 1) {
System.err.println("Specify only one of --transformer-config-base64, --transformer-config or " +
"--transformer-config-file.");
System.exit(4);
}

if (params.transformerConfigFile != null && !params.transformerConfigFile.isBlank()) {
if (params.getTransformerConfigFile() != null && !params.getTransformerConfigFile().isBlank()) {
try {
return Files.readString(Paths.get(params.transformerConfigFile), StandardCharsets.UTF_8);
return Files.readString(Paths.get(params.getTransformerConfigFile()), StandardCharsets.UTF_8);

Check warning on line 336 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L336

Added line #L336 was not covered by tests
} catch (IOException e) {
System.err.println("Error reading transformer configuration file: " + e.getMessage());
System.exit(5);
}
}

if (params.transformerConfig != null && !params.transformerConfig.isBlank()) {
return params.transformerConfig;
if (params.getTransformerConfig() != null && !params.getTransformerConfig().isBlank()) {
return params.getTransformerConfig();

Check warning on line 344 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L344

Added line #L344 was not covered by tests
}

if (params.transformerConfigEncoded != null && !params.transformerConfigEncoded.isBlank()) {
return new String(Base64.getDecoder().decode(params.transformerConfigEncoded));
if (params.getTransformerConfigEncoded() != null && !params.getTransformerConfigEncoded().isBlank()) {
return new String(Base64.getDecoder().decode(params.getTransformerConfigEncoded()));

Check warning on line 348 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L348

Added line #L348 was not covered by tests
}

return null;
Expand Down Expand Up @@ -365,18 +410,26 @@ public static void main(String[] args) throws Exception {
var timeShifter = new TimeShifter(params.speedupFactor);
var serverTimeout = Duration.ofSeconds(params.targetServerResponseTimeoutSeconds);

String transformerConfig = getTransformerConfig(params);
if (transformerConfig != null) {
log.atInfo().setMessage(() -> "Transformations config string: " + transformerConfig).log();
String requestTransformerConfig = getTransformerConfig(params.requestTransformationParams);

Check warning on line 413 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L413

Added line #L413 was not covered by tests
if (requestTransformerConfig != null) {
log.atInfo().setMessage("Request Transformations config string: {}")
.addArgument(requestTransformerConfig).log();

Check warning on line 416 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L415-L416

Added lines #L415 - L416 were not covered by tests
}

String tupleTransformerConfig = getTransformerConfig(params.tupleTransformationParams);

Check warning on line 419 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L419

Added line #L419 was not covered by tests
if (requestTransformerConfig != null) {
log.atInfo().setMessage("Tuple Transformations config string: {}")
.addArgument(tupleTransformerConfig).log();

Check warning on line 422 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L421-L422

Added lines #L421 - L422 were not covered by tests
}

final var orderedRequestTracker = new OrderedWorkerTracker<Void>();
final var hostname = uri.getHost();

var tr = new TrafficReplayerTopLevel(
topContext,
uri,
authTransformer,
new TransformationLoader().getTransformerFactoryLoader(hostname, params.userAgent, transformerConfig),
new TransformationLoader().getTransformerFactoryLoader(hostname, params.userAgent, requestTransformerConfig),

Check warning on line 432 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L432

Added line #L432 was not covered by tests
TrafficReplayerTopLevel.makeNettyPacketConsumerConnectionPool(
uri,
params.allowInsecureConnections,
Expand Down Expand Up @@ -404,7 +457,8 @@ public static void main(String[] args) throws Exception {
}, ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS);

setupShutdownHookForReplayer(tr);
var tupleWriter = new TupleParserChainConsumer(new ResultsToLogsConsumer());
var tupleWriter = new TupleParserChainConsumer(new ResultsToLogsConsumer(null, null,
new TransformationLoader().getTransformerFactoryLoader(hostname, params.userAgent, requestTransformerConfig)));

Check warning on line 461 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L460-L461

Added lines #L460 - L461 were not covered by tests
tr.setupRunAndWaitForReplayWithShutdownChecks(
Duration.ofSeconds(params.observedPacketConnectionTimeout),
serverTimeout,
Expand Down
Loading

0 comments on commit f96fe2f

Please sign in to comment.