Skip to content

Commit

Permalink
Add the ability to transform a DiagnosticTrackableCompletableFuture i…
Browse files Browse the repository at this point in the history
…nto a string with a specified "value formatter"

Use that formatAsString() method to format render the type of AggregateTransfromedResponse that was produced by the CompletableFuture.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Jun 28, 2023
1 parent e15b1c2 commit 8c2e006
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ private void runReplayWithIOStreams(Duration observedPacketConnectionTimeout,
AtomicInteger successCount = new AtomicInteger();
AtomicInteger exceptionCount = new AtomicInteger();
var tupleWriter = new SourceTargetCaptureTuple.TupleToFileWriter(bufferedOutputStream);
ConcurrentHashMap<HttpMessageAndTimestamp, DiagnosticTrackableCompletableFuture<String,? extends AggregatedRawResponse>> requestFutureMap =
ConcurrentHashMap<HttpMessageAndTimestamp, DiagnosticTrackableCompletableFuture<String,AggregatedTransformedResponse>> requestFutureMap =
new ConcurrentHashMap<>();
ConcurrentHashMap<HttpMessageAndTimestamp, DiagnosticTrackableCompletableFuture<String,? extends AggregatedRawResponse>>
ConcurrentHashMap<HttpMessageAndTimestamp, DiagnosticTrackableCompletableFuture<String,AggregatedTransformedResponse>>
requestToFinalWorkFuturesMap = new ConcurrentHashMap<>();
CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator =
new CapturedTrafficToHttpTransactionAccumulator(observedPacketConnectionTimeout,
Expand All @@ -180,25 +180,31 @@ private void runReplayWithIOStreams(Duration observedPacketConnectionTimeout,
if (log.isTraceEnabled()) {
log.trace("Done receiving captured stream for this "+rrPair.requestData);
}
DiagnosticTrackableCompletableFuture<String,AggregatedRawResponse> resultantCf =
DiagnosticTrackableCompletableFuture<String,AggregatedTransformedResponse> resultantCf =
requestFutureMap.get(rrPair.requestData)
.map(f->
f.handle((summary, t) -> {
try {
AggregatedRawResponse rval =
AggregatedTransformedResponse rval =
packageAndWriteResponse(tupleWriter, rrPair, summary, t);
successCount.incrementAndGet();
return rval;
} catch (Exception e) {
exceptionCount.incrementAndGet();
throw e;
} finally {
log.error("removing rrPair.requestData for " +
rrPair.connectionId);
requestToFinalWorkFuturesMap.remove(rrPair.requestData);
log.error("removed rrPair.requestData to requestToFinalWorkFuturesMap for " +
rrPair.connectionId);
}
}), ()->"TrafficReplayer.runReplayWithIOStreams.progressTracker");
requestToFinalWorkFuturesMap.put(rrPair.requestData, resultantCf);
if (!resultantCf.future.isDone()) {
log.error("Adding " + rrPair.connectionId + " to requestToFinalWorkFuturesMap");
requestToFinalWorkFuturesMap.put(rrPair.requestData, resultantCf);
if (resultantCf.future.isDone()) {
requestToFinalWorkFuturesMap.remove(rrPair.requestData);
}
}
}
);
try {
Expand Down Expand Up @@ -244,37 +250,55 @@ private void runReplayWithIOStreams(Duration observedPacketConnectionTimeout,
private void
waitForRemainingWork(Level logLevel, @NonNull Duration timeout,
@NonNull ConcurrentHashMap<HttpMessageAndTimestamp,
DiagnosticTrackableCompletableFuture<String, ? extends AggregatedRawResponse>>
DiagnosticTrackableCompletableFuture<String, AggregatedTransformedResponse>>
requestToFinalWorkFuturesMap)
throws ExecutionException, InterruptedException, TimeoutException {
var allRemainingWorkArray =
(DiagnosticTrackableCompletableFuture<String, AggregatedTransformedResponse>[])
requestToFinalWorkFuturesMap.values().toArray(DiagnosticTrackableCompletableFuture[]::new);
log.atLevel(logLevel).log("All remaining work to wait on " + allRemainingWorkArray.length + " items: " +
Arrays.stream(allRemainingWorkArray).map(cf->cf.toString()).collect(Collectors.joining("\n")));

Arrays.stream(allRemainingWorkArray)
.map(dcf->dcf.formatAsString(TrafficReplayer::formatWorkItem))
.collect(Collectors.joining("\n")));

// remember, this block is ONLY for the leftover items. Lots of other items have been processed
// and were removed from the live map (hopefully)
var allWorkFuture =
StringTrackableCompletableFuture.allOf(allRemainingWorkArray, ()->"TrafficReplayer.AllWorkFinished");
var allWorkFuture = StringTrackableCompletableFuture.allOf(allRemainingWorkArray, () -> "TrafficReplayer.AllWorkFinished");
try {
allWorkFuture.composeHandleApplication((t, v) -> {
log.info("stopping packetHandlerFactory's group");
packetHandlerFactory.stopGroup();
// squash exceptions for individual requests
return StringTrackableCompletableFuture.completedFuture(null, () -> "finished all work");
}, () -> "TrafficReplayer.PacketHandlerFactory->stopGroup")
.get(timeout);
allWorkFuture.get(timeout);
} catch (TimeoutException e) {
allWorkFuture.future.cancel(true);
var didCancel = allWorkFuture.future.cancel(true);
if (!didCancel) {
assert allWorkFuture.future.isDone() : "expected future to have finished if cancel didn't succeed";
// continue with the rest of the function
} else {
throw e;
}
}
allWorkFuture.composeHandleApplication((t, v) -> {
log.info("stopping packetHandlerFactory's group");
packetHandlerFactory.stopGroup();
// squash exceptions for individual requests
return StringTrackableCompletableFuture.completedFuture(null, () -> "finished all work");
}, () -> "TrafficReplayer.PacketHandlerFactory->stopGroup");
}

private static String formatWorkItem(CompletableFuture<?> cf) {
try {
var resultValue = cf.get();
if (resultValue instanceof AggregatedTransformedResponse) {
return "" + ((AggregatedTransformedResponse) resultValue).getTransformationStatus();
}
return null;
} catch (ExecutionException | InterruptedException e) {
return e.getMessage();
}
}

private static AggregatedRawResponse
private static AggregatedTransformedResponse
packageAndWriteResponse(SourceTargetCaptureTuple.TupleToFileWriter tripleWriter,
RequestResponsePacketPair rrPair,
AggregatedRawResponse summary,
AggregatedTransformedResponse summary,
Throwable t) {
log.trace("done sending and finalizing data to the packet handler");
SourceTargetCaptureTuple requestResponseTriple;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.migrations.replay.util;

import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
Expand All @@ -21,6 +22,7 @@

@Slf4j
public class DiagnosticTrackableCompletableFuture<D, T> {

public final CompletableFuture<T> future;
private final RecursiveImmutableChain<AbstractMap.SimpleEntry<CompletableFuture,Supplier<D>>> diagnosticSupplierChain;

Expand Down Expand Up @@ -56,27 +58,27 @@ private DiagnosticTrackableCompletableFuture(
}

public DiagnosticTrackableCompletableFuture(@NonNull CompletableFuture<T> future, Supplier<D> diagnosticSupplier) {
this(future, new RecursiveImmutableChain(makePair(future, diagnosticSupplier), null));
this(future, new RecursiveImmutableChain(makeDiagnosticPair(future, diagnosticSupplier), null));
}

private static <T,D> AbstractMap.SimpleEntry<CompletableFuture,Supplier<D>>
makePair(CompletableFuture<T> future, Supplier<D> diagnosticSupplier) {
makeDiagnosticPair(CompletableFuture<T> future, Supplier<D> diagnosticSupplier) {
return new AbstractMap.SimpleEntry(future, diagnosticSupplier);
}

public <U> DiagnosticTrackableCompletableFuture<D, U>
map(Function<CompletableFuture<T>, CompletableFuture<U>> fn, Supplier<D> diagnosticSupplier) {
var newCf = fn.apply(future);
return new DiagnosticTrackableCompletableFuture<>(newCf,
this.diagnosticSupplierChain.chain(makePair(newCf, diagnosticSupplier)));
this.diagnosticSupplierChain.chain(makeDiagnosticPair(newCf, diagnosticSupplier)));
}

public <U> DiagnosticTrackableCompletableFuture<D, U>
thenCompose(Function<? super T, ? extends DiagnosticTrackableCompletableFuture<D, U>> fn,
Supplier<D> diagnosticSupplier) {
var newCf = this.future.thenCompose(v->fn.apply(v).future);
return new DiagnosticTrackableCompletableFuture<>(newCf,
this.diagnosticSupplierChain.chain(makePair(newCf, diagnosticSupplier)));
this.diagnosticSupplierChain.chain(makeDiagnosticPair(newCf, diagnosticSupplier)));
}

/**
Expand All @@ -102,7 +104,7 @@ public DiagnosticTrackableCompletableFuture(@NonNull CompletableFuture<T> future
return wcf.future;
});
return new DiagnosticTrackableCompletableFuture<>(newCf,
this.diagnosticSupplierChain.chain(makePair(newCf, diagnosticSupplier)));
this.diagnosticSupplierChain.chain(makeDiagnosticPair(newCf, diagnosticSupplier)));
}

public T get() throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -130,12 +132,25 @@ public Stream<AbstractMap.SimpleEntry<CompletableFuture,Supplier<D>>> diagnostic

@Override
public String toString() {
var strList = diagnosticStream().map(kvp->formatDiagnostics(kvp)).collect(Collectors.toList());
return formatAsString(x->null);
}

public String formatAsString(Function<CompletableFuture,String> resultFormatter) {
var strList = diagnosticStream().map(kvp->formatDiagnostics(kvp, resultFormatter)).collect(Collectors.toList());
Collections.reverse(strList);
return strList.stream().collect(Collectors.joining("->"));
}

protected String formatDiagnostics(AbstractMap.SimpleEntry<CompletableFuture, Supplier<D>> kvp) {
return "" + kvp.getValue().get() + (kvp.getKey().isDone() ? "[^]" : "[…]");
@SneakyThrows
protected String formatDiagnostics(AbstractMap.SimpleEntry<CompletableFuture, Supplier<D>> kvp,
Function<CompletableFuture,String> resultFormatter) {
var diagnosticInfo = kvp.getValue().get();
return "" + diagnosticInfo +
(kvp.getKey().isDone() ? formatWithDefault(resultFormatter, kvp.getKey()) : "[…]");
}

private static String formatWithDefault(Function<CompletableFuture,String> formatter, CompletableFuture cf) {
var str = formatter.apply(cf);
return "[" + (str == null ? "^" : str) + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public static <U> StringTrackableCompletableFuture<U> completedFuture(U v, Suppl
return new StringTrackableCompletableFuture<>(CompletableFuture.completedFuture(v), diagnosticSupplier);
}

public static <D> StringTrackableCompletableFuture<Void>
allOf(DiagnosticTrackableCompletableFuture<D,Void>[] allRemainingWorkArray, Supplier<String> diagnosticSupplier) {
public static <U> StringTrackableCompletableFuture<Void>
allOf(DiagnosticTrackableCompletableFuture<String,? extends U>[] allRemainingWorkArray, Supplier<String> diagnosticSupplier) {
return new StringTrackableCompletableFuture<>(
CompletableFuture.allOf(Arrays.stream(allRemainingWorkArray)
.map(tcf->tcf.future).toArray(CompletableFuture[]::new)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

class StringTrackableCompletableFutureTest {
Expand Down Expand Up @@ -50,14 +51,26 @@ public void futureWithThreeStages() throws Exception {
notifyAndCheckNewDiagnosticValue(stcf1, notifier1, "A[^]");
Assertions.assertEquals("A[^]->B[…]", stcf2.toString());
Assertions.assertEquals("A[^]->B[…]->C[…]", stcf3.toString());
Assertions.assertEquals("A[1]->B[…]->C[…]",
stcf3.formatAsString(StringTrackableCompletableFutureTest::formatCompletableFuture));
notifyAndCheckNewDiagnosticValue(stcf2, notifier2, "A[^]->B[^]");
Assertions.assertEquals("A[^]", stcf1.toString());
Assertions.assertEquals("A[^]->B[^]->C[…]", stcf3.toString());
Assertions.assertEquals("A[1]->B[11]->C[…]",
stcf3.formatAsString(StringTrackableCompletableFutureTest::formatCompletableFuture));
notifyAndCheckNewDiagnosticValue(stcf3, notifier3, "A[^]->B[^]->C[^]");
Assertions.assertEquals("A[^]", stcf1.toString());
Assertions.assertEquals("A[^]->B[^]", stcf2.toString());
}

public static String formatCompletableFuture(CompletableFuture<?> cf) {
try {
return "" + cf.get();
} catch (ExecutionException | InterruptedException e) {
return "EXCEPTION";
}
}

private void notifyAndCheckNewDiagnosticValue(DiagnosticTrackableCompletableFuture<String, Integer> stcf,
CompletableFuture lockObject, String expectedValue) throws Exception {
notify(lockObject);
Expand Down

0 comments on commit 8c2e006

Please sign in to comment.