Skip to content

Commit

Permalink
Implement tuple transformations
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Oct 5, 2024
1 parent 5015220 commit e0b1a7b
Show file tree
Hide file tree
Showing 34 changed files with 1,211 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public byte[][] getCopyOfPackets() {

public ByteBuf getResponseAsByteBuf() {
return packets == null ? Unpooled.EMPTY_BUFFER :
ByteBufList.asCompositeByteBufRetained(packets.stream()
ByteBufList.asCompositeByteBuf(packets.stream()
.map(Map.Entry::getValue).map(Unpooled::wrappedBuffer))
.asReadOnly();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,26 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import org.opensearch.migrations.replay.datahandlers.http.HttpJsonMessageWithFaultingPayload;
import org.opensearch.migrations.replay.datahandlers.http.HttpJsonRequestWithFaultingPayload;
import org.opensearch.migrations.replay.datahandlers.http.HttpJsonResponseWithFaultingPayload;
import org.opensearch.migrations.replay.datahandlers.http.NettyDecodedHttpRequestPreliminaryHandler;
import org.opensearch.migrations.replay.datahandlers.http.NettyDecodedHttpResponsePreliminaryHandler;
import org.opensearch.migrations.replay.datahandlers.http.NettyJsonBodyAccumulateHandler;
import org.opensearch.migrations.replay.datatypes.ByteBufList;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.util.NettyUtils;
import org.opensearch.migrations.replay.util.NonNullRefSafeHolder;
import org.opensearch.migrations.replay.util.RefSafeHolder;
import org.opensearch.migrations.replay.util.RefSafeStreamUtils;
import org.opensearch.migrations.transform.JsonKeysForHttpMessage;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.base64.Base64;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.base64.Base64Dialect;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -34,7 +45,6 @@
public class ParsedHttpMessagesAsDicts {
public static final String STATUS_CODE_KEY = "Status-Code";
public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
public static final int MAX_PAYLOAD_BYTES_TO_CAPTURE = 256 * 1024 * 1024;
public static final String EXCEPTION_KEY_STRING = "Exception";

public final Optional<Map<String, Object>> sourceRequestOp;
Expand Down Expand Up @@ -129,18 +139,12 @@ public static void fillStatusCodeMetrics(
context.setTargetStatus((Integer) targetResponseList.get(targetResponseList.size() - 1).get(STATUS_CODE_KEY));
}
}

private static Map<String, Object> fillMap(
LinkedHashMap<String, Object> map,
HttpHeaders headers,
ByteBuf content
) {
try (var encodedBufHolder = RefSafeHolder.create(Base64.encode(content, false, Base64Dialect.STANDARD))) {
private static String byteBufToBase64String(ByteBuf content) {
try (var encodedBufHolder = RefSafeHolder.create(io.netty.handler.codec.base64.Base64.encode(content.duplicate(),
false, Base64Dialect.STANDARD))) {
var encodedBuf = encodedBufHolder.get();
assert encodedBuf != null : "Base64.encode should not return null";
headers.entries().forEach(kvp -> map.put(kvp.getKey(), kvp.getValue()));
map.put("body", encodedBuf.toString(StandardCharsets.UTF_8));
return map;
return encodedBuf.toString(StandardCharsets.UTF_8);
}
}

Expand Down Expand Up @@ -170,21 +174,43 @@ private static Map<String, Object> convertRequest(
@NonNull List<byte[]> data
) {
return makeSafeMap(context, () -> {
var map = new LinkedHashMap<String, Object>();
try (
var bufStream = NettyUtils.createRefCntNeutralCloseableByteBufStream(data);
var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpRequestFromBufs(bufStream,
MAX_PAYLOAD_BYTES_TO_CAPTURE))
) {
var message = messageHolder.get();
try (var compositeByteBufHolder = NonNullRefSafeHolder.create(RefSafeStreamUtils.refSafeTransform(
data.stream(),
Unpooled::wrappedBuffer,
ByteBufList::asCompositeByteBufRetained
))) {

var channel = new EmbeddedChannel(
// IN ByteBuf, OUT DefaultHttpRequest | LastHttpContent
new HttpRequestDecoder(),
// IN: DefaultHttpRequest | LastHttpContent(Compressed), OUT: DefaultHttpRequest | LastHttpContent(Uncompressed)
new HttpContentDecompressor(),
// IN DefaultHttpRequest, OUT HttpJsonRequestWithFaultingPayload
new NettyDecodedHttpRequestPreliminaryHandler(
context.getLogicalEnclosingScope().createTransformationContext()),
// IN HttpJsonRequestWithFaultingPayload | HttpContent | LastHttpContent
// OUT HttpJsonRequestWithFaultingPayload
new NettyJsonBodyAccumulateHandler(
context.getLogicalEnclosingScope().createTransformationContext())
);

channel.writeInbound(compositeByteBufHolder.get().retainedDuplicate());
HttpJsonRequestWithFaultingPayload message = channel.readInbound();
channel.finishAndReleaseAll();

if (message != null) {
map.put("Request-URI", message.uri());
map.put("Method", message.method().toString());
map.put("HTTP-Version", message.protocolVersion().toString());
context.setMethod(message.method().toString());
context.setEndpoint(message.uri());
context.setHttpVersion(message.protocolVersion().toString());
return fillMap(map, message.headers(), message.content());
var map = new LinkedHashMap<>(message.headers());
map.put("Request-URI", message.path());
map.put("Method", message.method());
map.put("HTTP-Version", message.protocol());
context.setMethod(message.method());
context.setEndpoint(message.path());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
}
return map;
} else {
return Map.of(EXCEPTION_KEY_STRING, "Message couldn't be parsed as a full http message");
}
Expand All @@ -198,23 +224,56 @@ private static Map<String, Object> convertResponse(
Duration latency
) {
return makeSafeMap(context, () -> {
var map = new LinkedHashMap<String, Object>();
try (
var bufStream = NettyUtils.createRefCntNeutralCloseableByteBufStream(data);
var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpResponseFromBufs(bufStream,
MAX_PAYLOAD_BYTES_TO_CAPTURE))
) {
var message = messageHolder.get();
try (var compositeByteBufHolder = NonNullRefSafeHolder.create(
RefSafeStreamUtils.refSafeTransform(data.stream(),
Unpooled::wrappedBuffer,
ByteBufList::asCompositeByteBufRetained
))) {
var channel = new EmbeddedChannel(
// IN ByteBuf, OUT DefaultHttpResponse | LastHttpContent
new HttpResponseDecoder(),
// IN: DefaultHttpResponse | LastHttpContent(Compressed), OUT: DefaultHttpResponse | LastHttpContent(Uncompressed)
new HttpContentDecompressor(),
// IN DefaultHttpResponse, OUT HttpJsonResponseWithFaultingPayload
new NettyDecodedHttpResponsePreliminaryHandler(
context.getLogicalEnclosingScope().createTransformationContext()),
// IN HttpJsonResponseWithFaultingPayload | HttpContent | LastHttpContent
// OUT HttpJsonResponseWithFaultingPayload
new NettyJsonBodyAccumulateHandler(
context.getLogicalEnclosingScope().createTransformationContext())
);

channel.writeInbound(compositeByteBufHolder.get().retainedDuplicate());
HttpJsonResponseWithFaultingPayload message = channel.readInbound();
channel.finishAndReleaseAll();

if (message != null) {
map.put("HTTP-Version", message.protocolVersion());
map.put(STATUS_CODE_KEY, message.status().code());
map.put("Reason-Phrase", message.status().reasonPhrase());
var map = new LinkedHashMap<>(message.headers());
context.setHttpVersion(message.protocol());
map.put("HTTP-Version", message.protocol());
map.put(STATUS_CODE_KEY, Integer.parseInt(message.code()));
map.put("Reason-Phrase", message.reason());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
return fillMap(map, message.headers(), message.content());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
}
return map;
} else {
return Map.of(EXCEPTION_KEY_STRING, "Message couldn't be parsed as a full http message");
}
}
});
}

/**
 * If the message's payload carries an inlined binary body (a {@code ByteBuf} stored under
 * {@link JsonKeysForHttpMessage#INLINED_BINARY_BODY_DOCUMENT_KEY}), replace it in place with a
 * base64-encoded String under {@link JsonKeysForHttpMessage#INLINED_BASE64_BODY_DOCUMENT_KEY}
 * so the payload map can be serialized as JSON, then release the consumed buffer.
 * No-op when the payload is null or has no inlined binary body.
 *
 * @param message the parsed HTTP message whose payload may contain a binary body
 */
private static void encodeBinaryPayloadIfExists(HttpJsonMessageWithFaultingPayload message) {
    var payload = message.payload();
    // Single guard instead of nested ifs; hoisting payload() avoids repeated accessor calls.
    if (payload != null && payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)) {
        var binaryBody = (ByteBuf) payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY);
        payload.put(JsonKeysForHttpMessage.INLINED_BASE64_BODY_DOCUMENT_KEY,
            byteBufToBase64String(binaryBody));
        payload.remove(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY);
        // The map owned the buffer's reference; release it now that its contents are encoded.
        binaryBody.release();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey;
import org.opensearch.migrations.transform.IJsonTransformer;

import io.netty.buffer.ByteBuf;
import lombok.Lombok;
Expand All @@ -27,26 +28,27 @@ public class ResultsToLogsConsumer implements BiConsumer<SourceTargetCaptureTupl
public static final String TRANSACTION_SUMMARY_LOGGER = "TransactionSummaryLogger";
private static final String MISSING_STR = "-";
private static final ObjectMapper PLAIN_MAPPER = new ObjectMapper();
private static final IJsonTransformer NOOP_JSON_TRANSFORMER = new TransformationLoader().getTransformerFactoryLoader(
null, null, "NoopTransformerProvider");

private final Logger tupleLogger;
private final Logger progressLogger;
private final AtomicInteger tupleCounter;
private final IJsonTransformer tupleTransformer;

public ResultsToLogsConsumer() {
this(null, null);
}
private final AtomicInteger tupleCounter;

public ResultsToLogsConsumer(Logger tupleLogger, Logger progressLogger) {
public ResultsToLogsConsumer(Logger tupleLogger, Logger progressLogger, IJsonTransformer tupleTransformer) {
this.tupleLogger = tupleLogger != null ? tupleLogger : LoggerFactory.getLogger(OUTPUT_TUPLE_JSON_LOGGER);
this.progressLogger = progressLogger != null ? progressLogger : makeTransactionSummaryLogger();
tupleCounter = new AtomicInteger();
this.tupleTransformer = tupleTransformer != null ? tupleTransformer : NOOP_JSON_TRANSFORMER ;
}

// set this up so that the preamble prints out once, right after we have a logger
// if it's configured to output at all
private static Logger makeTransactionSummaryLogger() {
var logger = LoggerFactory.getLogger(TRANSACTION_SUMMARY_LOGGER);
logger.atInfo().setMessage("{}").addArgument(() -> getTransactionSummaryStringPreamble()).log();
logger.atInfo().setMessage("{}").addArgument(ResultsToLogsConsumer::getTransactionSummaryStringPreamble).log();
return logger;
}

Expand All @@ -73,43 +75,64 @@ private Map<String, Object> toJSONObject(SourceTargetCaptureTuple tuple, ParsedH
/**
* Writes a tuple object to an output stream as a JSON object.
* The JSON tuple is output on one line, and has several objects: "sourceRequest", "sourceResponse",
* "targetRequest", and "targetResponse". The "connectionId" is also included to aid in debugging.
* "targetRequest", and "targetResponses". The "connectionId", "numRequests", and "numErrors" are also included to aid in debugging.
* An example of the format is below.
* <p>
* {
* "sourceRequest": {
* "Request-URI": XYZ,
* "Method": XYZ,
* "HTTP-Version": XYZ
* "body": XYZ,
* "header-1": XYZ,
* "header-2": XYZ
* },
* "targetRequest": {
* "Request-URI": XYZ,
* "Method": XYZ,
* "HTTP-Version": XYZ
* "body": XYZ,
* "header-1": XYZ,
* "header-2": XYZ
* },
* "sourceResponse": {
* "HTTP-Version": ABC,
* "Status-Code": ABC,
* "Reason-Phrase": ABC,
* "response_time_ms": 123,
* "body": ABC,
* "header-1": ABC
* },
* "targetResponse": {
* "HTTP-Version": ABC,
* "Status-Code": ABC,
* "Reason-Phrase": ABC,
* "response_time_ms": 123,
* "body": ABC,
* "header-2": ABC
* },
* "connectionId": "0242acfffe1d0008-0000000c-00000003-0745a19f7c3c5fc9-121001ff.0"
* "sourceRequest": {
* "Request-URI": "/api/v1/resource",
* "Method": "POST",
* "HTTP-Version": "HTTP/1.1",
* "header-1": "Content-Type: application/json",
* "header-2": "Authorization: Bearer token",
* "payload": {
* "inlinedJsonBody": {
* "key1": "value1",
* "key2": "value2"
* }
* }
* },
* "targetRequest": {
* "Request-URI": "/api/v1/target",
* "Method": "GET",
* "HTTP-Version": "HTTP/1.1",
* "header-1": "Accept: application/json",
* "header-2": "Authorization: Bearer token",
* "payload": {
* "inlinedBinaryBody": "0101010101"
* }
* },
* "sourceResponse": {
* "response_time_ms": 150,
* "HTTP-Version": "HTTP/1.1",
* "Status-Code": 200,
* "Reason-Phrase": "OK",
* "header-1": "Content-Type: application/json",
* "header-2": "Cache-Control: no-cache",
* "payload": {
* "inlinedJsonBody": {
* "responseKey1": "responseValue1",
* "responseKey2": "responseValue2"
* }
* }
* },
* "targetResponses": [{
* "response_time_ms": 100,
* "HTTP-Version": "HTTP/1.1",
* "Status-Code": 201,
* "Reason-Phrase": "Created",
* "header-1": "Content-Type: application/json",
* "payload": {
* "inlinedJsonSequenceBodies": [
* {"sequenceKey1": "sequenceValue1"},
* {"sequenceKey2": "sequenceValue2"}
* ]
* }
* }],
* "connectionId": "conn-12345",
* "numRequests": 5,
* "numErrors": 1,
* "error": "TimeoutException: Request timed out"
* }
*
* @param tuple the RequestResponseResponseTriple object to be converted into json and written to the stream.
Expand All @@ -122,7 +145,9 @@ public void accept(SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts par
.log();
if (tupleLogger.isInfoEnabled()) {
try {
var tupleString = PLAIN_MAPPER.writeValueAsString(toJSONObject(tuple, parsedMessages));
var originalTuple = toJSONObject(tuple, parsedMessages);
var transformedTuple = tupleTransformer.transformJson(originalTuple);
var tupleString = PLAIN_MAPPER.writeValueAsString(transformedTuple);
tupleLogger.atInfo().setMessage("{}").addArgument(() -> tupleString).log();
} catch (Exception e) {
log.atError().setMessage("Exception converting tuple to string").setCause(e).log();
Expand Down
Loading

0 comments on commit e0b1a7b

Please sign in to comment.