diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/client/AnomalyDetector.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/client/AnomalyDetector.java index d3dfeac..88e500b 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/client/AnomalyDetector.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/client/AnomalyDetector.java @@ -11,9 +11,18 @@ public class AnomalyDetector implements AnomalyDetectionClient { @Override public AnomalyTraceResult detect(List traceSpans) { + + List anomalySpans = + traceSpans.stream().filter(span -> span.getTiming().duration() > 1000).toList(); + + boolean isAnomaly = !anomalySpans.isEmpty(); + return new AnomalyTraceResult( - true, - "1206887328-a7863a66-528e-4f37-b805-04e1314fb924", - traceSpans.stream().map(span -> span.getId().getValue()).toList()); + isAnomaly, + traceSpans.stream() + .map(Span::getProjectKey) + .findAny() + .orElse("1206887328-a7863a66-528e-4f37-b805-04e1314fb924"), + anomalySpans.stream().map(span -> span.getId().getValue()).toList()); } } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/messaging/streams/AnomalyTraceStreamProcessor.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/messaging/streams/AnomalyTraceStreamProcessor.java index aeded32..14368a0 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/messaging/streams/AnomalyTraceStreamProcessor.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/anomaly/messaging/streams/AnomalyTraceStreamProcessor.java @@ -42,7 +42,6 @@ void process(KStream stream) { JsonSerde> listOfSpanSerde = new JsonSerde<>(new TypeReference<>() {}); // Stream processor - SpanCollectedEvent를 받아서 TraceId로 그루핑한뒤 각 TraceId에 대한 Span들을 List로 묶어서 - // KTable로 만듦 KStream spanStream = stream.flatMap( (key, value) -> { @@ -57,6 +56,7 @@ void process(KStream stream) { TimeWindows timeWindows = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(30)); + // KTable로 만듦 KTable, List> traceTable = spanStream .groupByKey()