diff --git a/.gitignore b/.gitignore index c2065bc..1ff5045 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ build/ !**/src/main/**/build/ !**/src/test/**/build/ +/kafka + ### STS ### .apt_generated .classpath diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d8b979a --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +start-kafka: + docker compose -f kafka/common.yml -f kafka/zookeeper.yml -f kafka/kafka_cluster.yml up -d + +stop-kafka: + docker compose -f kafka/common.yml -f kafka/zookeeper.yml -f kafka/kafka_cluster.yml down \ No newline at end of file diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/global/aop/LogAspect.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/global/aop/LogAspect.java index cc1cb72..b7b8474 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/global/aop/LogAspect.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/global/aop/LogAspect.java @@ -28,7 +28,7 @@ public class LogAspect { @Pointcut( - "execution(* com.univ.tracedin.domain..*(..)) && execution(* com.univ.tracedin.infra..*(..))&& !execution(* com.univ.tracedin.common..*(..))") + "execution(* com.univ.tracedin.domain..*(..)) || execution(* com.univ.tracedin.infra..*(..)) && !execution(* com.univ.tracedin.common..*(..))") public void all() {} @Pointcut( diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApi.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApi.java index 456a7fa..56bd437 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApi.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApi.java @@ -14,7 +14,7 @@ import com.univ.tracedin.api.metric.dto.AppendServiceMetricsRequest; import com.univ.tracedin.api.metric.dto.HttpRequestCountResponse; import com.univ.tracedin.domain.metric.ServiceMetricsService; -import com.univ.tracedin.domain.project.ServiceNode; +import com.univ.tracedin.domain.project.Node; @RestController @RequestMapping("/api/v1/metrics") @@ -29,9 +29,9 @@ public void appendMetrics(@RequestBody AppendServiceMetricsRequest requests) { } @GetMapping("/http-request-count") - public Response> getHttpRequestCount(ServiceNode serviceNode) { + public Response> getHttpRequestCount(Node node) { List responses = - serviceMetricService.getHttpRequestCount(serviceNode).stream() + serviceMetricService.getHttpRequestCount(node).stream() .map(HttpRequestCountResponse::from) .toList(); return Response.success(responses); diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApiDocs.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApiDocs.java index 5b078a5..26bcb3c 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApiDocs.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/metric/ServiceMetricsApiDocs.java @@ -5,7 +5,7 @@ import com.univ.tracedin.api.global.dto.Response; import com.univ.tracedin.api.metric.dto.AppendServiceMetricsRequest; import com.univ.tracedin.api.metric.dto.HttpRequestCountResponse; -import com.univ.tracedin.domain.project.ServiceNode; +import com.univ.tracedin.domain.project.Node; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -19,5 +19,5 @@ public interface ServiceMetricsApiDocs { @Operation( summary = "5시간 이내의 10분 별로 HTTP 요청 횟수 조회", description = "5시간 이내의 10분 별로 HTTP 요청 횟수 조회, 개발 기간에는 5시간 이내 조건 없음") - Response> getHttpRequestCount(ServiceNode serviceNode); + Response> getHttpRequestCount(Node node); } diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java index e26a177..00a3d87 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import lombok.RequiredArgsConstructor; @@ -17,9 +18,9 @@ import com.univ.tracedin.domain.auth.UserPrincipal; import com.univ.tracedin.domain.project.EndTimeBucket; import com.univ.tracedin.domain.project.NetworkTopology; +import com.univ.tracedin.domain.project.Node; import com.univ.tracedin.domain.project.ProjectKey; import com.univ.tracedin.domain.project.ProjectService; -import com.univ.tracedin.domain.project.ServiceNode; @RestController @RequiredArgsConstructor @@ -37,7 +38,7 @@ public Response createProject( } @GetMapping("/{projectKey}/service-nodes") - public Response> serviceNodes(@PathVariable String projectKey) { + public Response> serviceNodes(@PathVariable String projectKey) { return Response.success(projectService.getServiceNodeList(projectKey)); } @@ -47,7 +48,8 @@ public Response networkTopology(@PathVariable String projectKey } @GetMapping("/{projectKey}/hit-map") - public Response> hitMap(@PathVariable String projectKey) { - return Response.success(projectService.getTraceHitMap(projectKey)); + public Response> hitMap( + @PathVariable String projectKey, @RequestParam(required = false) String serviceName) { + return Response.success(projectService.getTraceHitMap(projectKey, serviceName)); } } diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java index 7a39406..3379439 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java @@ -7,8 +7,8 @@ import com.univ.tracedin.domain.auth.UserPrincipal; import com.univ.tracedin.domain.project.EndTimeBucket; import com.univ.tracedin.domain.project.NetworkTopology; +import com.univ.tracedin.domain.project.Node; import com.univ.tracedin.domain.project.ProjectKey; -import com.univ.tracedin.domain.project.ServiceNode; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -20,11 +20,11 @@ public interface ProjectApiDocs { Response createProject(CreateProjectRequest request, UserPrincipal currentUser); @Operation(summary = "서비스 리스트 조회", description = "프로젝트의 서비스 노드 리스트를 조회합니다.") - Response> serviceNodes(String projectKey); + Response> serviceNodes(String projectKey); @Operation(summary = "네트워크 토폴로지 조회", description = "프로젝트의 네트워크 토폴로지를 조회합니다.") Response networkTopology(String projectKey); - @Operation(summary = "히트맵 조회", description = "프로젝트의 히트맵을 조회합니다.(1시간별 트레이스의 응답시간 분포)") - Response> hitMap(String projectKey); + @Operation(summary = "히트맵 조회", description = "프로젝트의 히트맵을 조회합니다.(5분 별 트레이스의 응답시간 분포)") + Response> hitMap(String projectKey, String serviceName); } diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApi.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApi.java index dee9e54..76dd0d8 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApi.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApi.java @@ -13,7 +13,7 @@ import com.univ.tracedin.api.global.dto.Response; import com.univ.tracedin.api.span.dto.AppendSpanRequest; -import com.univ.tracedin.api.span.dto.ReadSpanRequest; +import com.univ.tracedin.api.span.dto.ReadTraceRequest; import com.univ.tracedin.api.span.dto.SpanTreeResponse; import com.univ.tracedin.api.span.dto.TraceResponse; import com.univ.tracedin.common.dto.SearchCursor; @@ -32,14 +32,14 @@ public class SpanApi implements SpanApiDocs { @PostMapping public void appendSpan(@RequestBody List request) { log.info("appendSpan request: {}", request.toString()); - spanService.appendSpan(request.stream().map(AppendSpanRequest::toSpan).toList()); + spanService.publishSpans(request.stream().map(AppendSpanRequest::toSpan).toList()); } @GetMapping("/traces") - public Response> getTraces( - ReadSpanRequest request, SearchCursor cursor) { + public Response> searchTraces( + ReadTraceRequest request, SearchCursor cursor) { SearchResult responses = - spanService.getTraces(request.toServiceNode(), cursor).map(TraceResponse::from); + spanService.getTraces(request.toSearchCond(), cursor).map(TraceResponse::from); return Response.success(responses); } diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApiDocs.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApiDocs.java index 63181ad..c3ff84a 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApiDocs.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/SpanApiDocs.java @@ -4,7 +4,7 @@ import com.univ.tracedin.api.global.dto.Response; import com.univ.tracedin.api.span.dto.AppendSpanRequest; -import com.univ.tracedin.api.span.dto.ReadSpanRequest; +import com.univ.tracedin.api.span.dto.ReadTraceRequest; import com.univ.tracedin.api.span.dto.SpanTreeResponse; import com.univ.tracedin.api.span.dto.TraceResponse; import com.univ.tracedin.common.dto.SearchCursor; @@ -20,7 +20,8 @@ public interface SpanApiDocs { void appendSpan(List request); @Operation(summary = "트레이스(트랜잭션) 조회 API", description = "프로젝트의 특정 서비스의 트레이스(트랜잭션)를 조회합니다.") - Response> getTraces(ReadSpanRequest request, SearchCursor cursor); + Response> searchTraces( + ReadTraceRequest request, SearchCursor cursor); @Operation( summary = "트레이스(트랜잭션) 내 스팬트리 조회 API", diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/AppendSpanRequest.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/AppendSpanRequest.java index 74e12d4..72637c9 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/AppendSpanRequest.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/AppendSpanRequest.java @@ -68,7 +68,7 @@ public Span toSpan() { } private SpanType getSpanType() { - SpanType type = spanType == null ? null : SpanType.fromValue(spanType); + SpanType type = spanType == null ? SpanType.UNKNOWN : SpanType.fromValue(spanType); if (attributes.data().containsKey("db.operation")) { type = SpanType.QUERY; } diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/ReadSpanRequest.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/ReadSpanRequest.java deleted file mode 100644 index 9da0ed9..0000000 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/ReadSpanRequest.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.univ.tracedin.api.span.dto; - -import com.univ.tracedin.domain.project.ServiceNode; - -public record ReadSpanRequest(String projectKey, String serviceName) { - - public ServiceNode toServiceNode() { - return ServiceNode.of(projectKey(), serviceName()); - } -} diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/ReadTraceRequest.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/ReadTraceRequest.java new file mode 100644 index 0000000..b3191da --- /dev/null +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/span/dto/ReadTraceRequest.java @@ -0,0 +1,16 @@ +package com.univ.tracedin.api.span.dto; + +import java.time.LocalDateTime; + +import com.univ.tracedin.domain.project.Node; +import com.univ.tracedin.domain.project.NodeType; +import com.univ.tracedin.domain.span.TraceSearchCond; + +public record ReadTraceRequest( + String projectKey, String serviceName, LocalDateTime startTime, LocalDateTime endTime) { + + public TraceSearchCond toSearchCond() { + return new TraceSearchCond( + Node.of(projectKey, serviceName, NodeType.SERVICE), startTime, endTime); + } +} diff --git a/tracedin-application/app-api/src/main/resources/application.yml b/tracedin-application/app-api/src/main/resources/application.yml index fed0793..5493aca 100644 --- a/tracedin-application/app-api/src/main/resources/application.yml +++ b/tracedin-application/app-api/src/main/resources/application.yml @@ -25,7 +25,7 @@ jwt: access-token: expiration: 25920000 server: - port: 8989 + port: 8089 --- spring: config: diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/global/BaseId.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/global/BaseId.java index 1d309b0..a14912c 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/global/BaseId.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/global/BaseId.java @@ -1,10 +1,11 @@ package com.univ.tracedin.domain.global; +import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @Getter -@NoArgsConstructor(access = lombok.AccessLevel.PROTECTED) +@NoArgsConstructor(access = AccessLevel.PROTECTED) public abstract class BaseId { private T value; diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/Metric.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/Metric.java index 438240d..a54f335 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/Metric.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/Metric.java @@ -7,10 +7,12 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; +import lombok.NoArgsConstructor; @Getter @Builder @AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor(access = AccessLevel.PROTECTED) public class Metric { private String name; diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetrics.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetrics.java index 6329c6f..01f6df3 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetrics.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetrics.java @@ -6,10 +6,12 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; +import lombok.NoArgsConstructor; @Getter @Builder @AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor(access = AccessLevel.PROTECTED) public class ServiceMetrics { private String projectKey; diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsAppender.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsAppender.java index bbbe840..3bf20a4 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsAppender.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsAppender.java @@ -1,5 +1,7 @@ package com.univ.tracedin.domain.metric; +import java.util.List; + import org.springframework.stereotype.Component; import lombok.RequiredArgsConstructor; @@ -10,7 +12,7 @@ public class ServiceMetricsAppender { private final ServiceMetricsRepository serviceMetricsRepository; - public void append(ServiceMetrics metrics) { - serviceMetricsRepository.save(metrics); + public void appendAll(List metrics) { + serviceMetricsRepository.saveAll(metrics); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsCollectedEvent.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsCollectedEvent.java new file mode 100644 index 0000000..553edef --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsCollectedEvent.java @@ -0,0 +1,14 @@ +package com.univ.tracedin.domain.metric; + +import java.io.Serializable; + +public record ServiceMetricsCollectedEvent(ServiceMetrics serviceMetrics) implements Serializable { + + public static ServiceMetricsCollectedEvent from(ServiceMetrics metrics) { + return new ServiceMetricsCollectedEvent(metrics); + } + + public String getKey() { + return serviceMetrics.getProjectKey() + "-" + serviceMetrics.getServiceName(); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsCollectedMessagePublisher.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsCollectedMessagePublisher.java new file mode 100644 index 0000000..eab5e0d --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsCollectedMessagePublisher.java @@ -0,0 +1,6 @@ +package com.univ.tracedin.domain.metric; + +public interface ServiceMetricsCollectedMessagePublisher { + + void publish(ServiceMetricsCollectedEvent serviceMetricsCollectedEvent); +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsReader.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsReader.java index 38e2eff..b0b66d9 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsReader.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsReader.java @@ -6,7 +6,7 @@ import lombok.RequiredArgsConstructor; -import com.univ.tracedin.domain.project.ServiceNode; +import com.univ.tracedin.domain.project.Node; @Component @RequiredArgsConstructor @@ -14,7 +14,7 @@ public class ServiceMetricsReader { private final ServiceMetricsRepository serviceMetricsRepository; - public List readHttpRequestCount(ServiceNode serviceNode) { - return serviceMetricsRepository.getHttpRequestCount(serviceNode); + public List readHttpRequestCount(Node node) { + return serviceMetricsRepository.getHttpRequestCount(node); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsRepository.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsRepository.java index ffb7c19..b0732cb 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsRepository.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsRepository.java @@ -2,11 +2,11 @@ import java.util.List; -import com.univ.tracedin.domain.project.ServiceNode; +import com.univ.tracedin.domain.project.Node; public interface ServiceMetricsRepository { - void save(ServiceMetrics metrics); + List getHttpRequestCount(Node node); - List getHttpRequestCount(ServiceNode serviceNode); + void saveAll(List metrics); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java index 0e22f9e..e2406f0 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java @@ -6,20 +6,20 @@ import lombok.RequiredArgsConstructor; -import com.univ.tracedin.domain.project.ServiceNode; +import com.univ.tracedin.domain.project.Node; @Service @RequiredArgsConstructor public class ServiceMetricsService { - private final ServiceMetricsAppender serviceMetricAppender; private final ServiceMetricsReader serviceMetricReader; + private final ServiceMetricsCollectedMessagePublisher serviceMetricsCollectedMessagePublisher; public void appendMetrics(ServiceMetrics metrics) { - serviceMetricAppender.append(metrics); + serviceMetricsCollectedMessagePublisher.publish(ServiceMetricsCollectedEvent.from(metrics)); } - public List getHttpRequestCount(ServiceNode serviceNode) { - return serviceMetricReader.readHttpRequestCount(serviceNode); + public List getHttpRequestCount(Node node) { + return serviceMetricReader.readHttpRequestCount(node); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/HitMapReader.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/HitMapReader.java index 82740a6..1a03c12 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/HitMapReader.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/HitMapReader.java @@ -4,5 +4,5 @@ public interface HitMapReader { - List read(String projectKey); + List read(String projectKey, String serviceName); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java index 91fb7c3..0d149a7 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java @@ -10,10 +10,10 @@ @AllArgsConstructor(access = AccessLevel.PRIVATE) public class NetworkTopology { - private final List serviceNodes; + private final List nodes; private final List edges; - public static NetworkTopology of(List serviceNodes, List edges) { - return new NetworkTopology(serviceNodes, edges); + public static NetworkTopology of(List nodes, List edges) { + return new NetworkTopology(nodes, edges); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/Node.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/Node.java new file mode 100644 index 0000000..5b20087 --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/Node.java @@ -0,0 +1,17 @@ +package com.univ.tracedin.domain.project; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class Node { + + private String projectKey; + private String name; + private NodeType nodeType; + + public static Node of(String projectKey, String serviceName, NodeType nodeType) { + return new Node(projectKey, serviceName, nodeType); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NodeType.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NodeType.java new file mode 100644 index 0000000..baffc0a --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NodeType.java @@ -0,0 +1,7 @@ +package com.univ.tracedin.domain.project; + +public enum NodeType { + SERVICE, + KAFKA, + DATABASE +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectId.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectId.java index 3bb68b0..1a8d722 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectId.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectId.java @@ -1,7 +1,11 @@ package com.univ.tracedin.domain.project; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + import com.univ.tracedin.domain.global.BaseId; +@NoArgsConstructor(access = AccessLevel.PROTECTED) public class ProjectId extends BaseId { public ProjectId(Long id) { diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectReader.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectReader.java index 831d331..2335222 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectReader.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectReader.java @@ -16,7 +16,7 @@ public List read(ProjectOwner projectOwner) { return projectRepository.getByOwner(projectOwner); } - public List readServiceNods(String projectKey) { + public List readServiceNods(String projectKey) { return projectRepository.findServiceNodeList(projectKey); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectRepository.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectRepository.java index a2946d8..5642d56 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectRepository.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectRepository.java @@ -8,5 +8,5 @@ public interface ProjectRepository { List getByOwner(ProjectOwner owner); - List findServiceNodeList(String projectKey); + List findServiceNodeList(String projectKey); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java index 9e16efa..063041f 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java @@ -25,7 +25,7 @@ public ProjectKey create(UserId creatorId, ProjectInfo projectInfo) { return projectAppender.append(user, projectInfo); } - public List getServiceNodeList(String projectKey) { + public List getServiceNodeList(String projectKey) { return projectReader.readServiceNods(projectKey); } @@ -33,7 +33,7 @@ public NetworkTopology getNetworkTopology(String projectKey) { return networkTopologyBuilder.build(projectKey); } - public List getTraceHitMap(String projectKey) { - return hitMapReader.read(projectKey); + public List getTraceHitMap(String projectKey, String serviceName) { + return hitMapReader.read(projectKey, serviceName); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ServiceNode.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ServiceNode.java deleted file mode 100644 index fdeb209..0000000 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ServiceNode.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.univ.tracedin.domain.project; - -import lombok.AllArgsConstructor; -import lombok.Getter; - -@Getter -@AllArgsConstructor -public class ServiceNode { - - private String projectKey; - private String name; - - public static ServiceNode of(String projectKey, String serviceName) { - return new ServiceNode(projectKey, serviceName); - } -} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/ConditionValidator.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/ConditionValidator.java new file mode 100644 index 0000000..262da82 --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/ConditionValidator.java @@ -0,0 +1,17 @@ +package com.univ.tracedin.domain.span; + +import org.springframework.stereotype.Component; + +@Component +public class ConditionValidator { + + public void validate(TraceSearchCond cond) { + if (cond.startTime() == null && cond.endTime() != null) { + throw new IllegalArgumentException("Invalid time condition"); + } + + if (cond.hasTimeRange() && cond.getEpochMillisStartTime() > cond.getEpochMillisEndTime()) { + throw new IllegalArgumentException("Invalid time condition"); + } + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/HitMapReaderImpl.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/HitMapReaderImpl.java index 1b2cc84..1595d85 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/HitMapReaderImpl.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/HitMapReaderImpl.java @@ -16,7 +16,7 @@ public class HitMapReaderImpl implements HitMapReader { private final SpanRepository spanRepository; @Override - public List read(String projectKey) { - return spanRepository.getTraceHitMapByProjectKey(projectKey); + public List read(String projectKey, String serviceName) { + return spanRepository.getTraceHitMapByProjectKey(projectKey, serviceName); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilderImpl.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilderImpl.java index c1bc51f..996c8e6 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilderImpl.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilderImpl.java @@ -1,9 +1,6 @@ package com.univ.tracedin.domain.span; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -11,27 +8,43 @@ import lombok.RequiredArgsConstructor; -import com.univ.tracedin.domain.project.Edge; -import com.univ.tracedin.domain.project.NetworkTopology; -import com.univ.tracedin.domain.project.NetworkTopologyBuilder; -import com.univ.tracedin.domain.project.ServiceNode; +import com.univ.tracedin.domain.project.*; @Component @RequiredArgsConstructor public class NetworkTopologyBuilderImpl implements NetworkTopologyBuilder { + private static final String KAFKA_NODE_NAME = "KAFKA"; private final SpanReader spanReader; @Override public NetworkTopology build(String projectKey) { - List clientSpans = spanReader.read(projectKey, SpanKind.CLIENT); - List serverSpans = spanReader.read(projectKey, SpanKind.SERVER); - return calculate(projectKey, clientSpans, serverSpans); + List clientSpans = spanReader.read(projectKey, SpanType.HTTP, SpanKind.CLIENT); + List serverSpans = spanReader.read(projectKey, SpanType.HTTP, SpanKind.SERVER); + List producerSpans = spanReader.read(projectKey, SpanType.UNKNOWN, SpanKind.PRODUCER); + List consumerSpans = spanReader.read(projectKey, SpanType.UNKNOWN, SpanKind.CONSUMER); + List dbSpans = spanReader.read(projectKey, SpanType.QUERY, SpanKind.CLIENT); + + Map nodeMap = new HashMap<>(); + Map edgeMap = new HashMap<>(); + + buildServiceNodesAndEdges(projectKey, clientSpans, serverSpans, nodeMap, edgeMap); + buildKafkaNodesAndEdges(projectKey, producerSpans, consumerSpans, nodeMap, edgeMap); + buildDatabaseNodesAndEdges(projectKey, dbSpans, nodeMap, edgeMap); + + List nodes = nodeMap.values().stream().toList(); + List edges = edgeMap.values().stream().toList(); + + return NetworkTopology.of(nodes, edges); } - public NetworkTopology calculate( - String projectKey, List clientSpans, List serverSpans) { - // CLIENT 스팬을 traceId와 spanId로 매핑 + private void buildServiceNodesAndEdges( + String projectKey, + List clientSpans, + List serverSpans, + Map nodeMap, + Map edgeMap) { + Map> clientSpanMap = clientSpans.stream() .collect( @@ -39,41 +52,88 @@ public NetworkTopology calculate( Span::getTraceId, Collectors.toMap(Span::getId, Function.identity()))); - // 노드 및 엣지 데이터를 저장할 맵 - Map nodeMap = new HashMap<>(); - Map edgeMap = new HashMap<>(); - for (Span serverSpan : serverSpans) { - TraceId traceId = serverSpan.getTraceId(); - SpanId serverParentId = serverSpan.getParentId(); - - Map clientSpansInTrace = clientSpanMap.get(traceId); + Map clientSpansInTrace = clientSpanMap.get(serverSpan.getTraceId()); if (clientSpansInTrace == null) { continue; } - Span clientSpan = clientSpansInTrace.get(serverParentId); - + Span clientSpan = clientSpansInTrace.get(serverSpan.getParentId()); if (clientSpan != null) { String sourceService = clientSpan.getServiceName(); String targetService = serverSpan.getServiceName(); - // 노드 추가 nodeMap.computeIfAbsent( - sourceService, service -> ServiceNode.of(projectKey, service)); + sourceService, service -> Node.of(projectKey, service, NodeType.SERVICE)); nodeMap.computeIfAbsent( - targetService, service -> ServiceNode.of(projectKey, service)); + targetService, service -> Node.of(projectKey, service, NodeType.SERVICE)); - // 엣지 추가 또는 업데이트 String edgeKey = sourceService + "->" + targetService; edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(sourceService, targetService)) .incrementRequestCount(); } } + } + + private void buildKafkaNodesAndEdges( + String projectKey, + List producerSpans, + List consumerSpans, + Map nodeMap, + Map edgeMap) { + + // Kafka 노드 추가 + nodeMap.computeIfAbsent( + KAFKA_NODE_NAME, kafka -> Node.of(projectKey, kafka, NodeType.KAFKA)); + + // 프로듀서 스팬 처리 + for (Span producerSpan : producerSpans) { + String producerService = producerSpan.getServiceName(); + + nodeMap.computeIfAbsent( + producerService, service -> Node.of(projectKey, service, NodeType.SERVICE)); + + String edgeKey = producerService + "->" + KAFKA_NODE_NAME; + edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(producerService, KAFKA_NODE_NAME)) + .incrementRequestCount(); + } + + // 컨슈머 스팬 처리 + for (Span consumerSpan : consumerSpans) { + String consumerService = consumerSpan.getServiceName(); + + nodeMap.computeIfAbsent( + consumerService, service -> Node.of(projectKey, service, NodeType.SERVICE)); + + String edgeKey = KAFKA_NODE_NAME + "->" + consumerService; + edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(KAFKA_NODE_NAME, consumerService)) + .incrementRequestCount(); + } + } + + private void buildDatabaseNodesAndEdges( + String projectKey, + List dbSpans, + Map nodeMap, + Map edgeMap) { - List serviceNodes = new ArrayList<>(nodeMap.values()); - List edges = new ArrayList<>(edgeMap.values()); + for (Span dbSpan : dbSpans) { + Map attributes = dbSpan.getAttributes().data(); + String serviceName = dbSpan.getServiceName(); + String dbSystem = (String) attributes.get("db.system"); + + if (dbSystem == null) { + continue; // db.name 속성이 없으면 스킵 + } - return NetworkTopology.of(serviceNodes, edges); + nodeMap.computeIfAbsent( + serviceName, service -> Node.of(projectKey, service, NodeType.SERVICE)); + + nodeMap.computeIfAbsent(dbSystem, db -> Node.of(projectKey, db, NodeType.DATABASE)); + + String edgeKey = serviceName + "->" + dbSystem; + edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(serviceName, dbSystem)) + .incrementRequestCount(); + } } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Span.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Span.java index 6dc354d..7d07173 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Span.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Span.java @@ -8,9 +8,11 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; +import lombok.NoArgsConstructor; @Builder @AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor(access = AccessLevel.PROTECTED) @Getter public class Span { diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanAppender.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanAppender.java index 17bbfde..7db8ba8 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanAppender.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanAppender.java @@ -12,7 +12,7 @@ public class SpanAppender { private final SpanRepository spanRepository; - public void append(List spans) { - spanRepository.save(spans); + public void appendAll(List spans) { + spanRepository.saveAll(spans); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanCollectedEvent.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanCollectedEvent.java new file mode 100644 index 0000000..a4db017 --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanCollectedEvent.java @@ -0,0 +1,15 @@ +package com.univ.tracedin.domain.span; + +import java.io.Serializable; +import java.util.List; + +public record SpanCollectedEvent(List spans) implements Serializable { + + public static SpanCollectedEvent from(List spans) { + return new SpanCollectedEvent(spans); + } + + public String getKey() { + return spans.stream().map(Span::getTraceId).findFirst().orElseThrow().getValue(); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanCollectedMessagePublisher.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanCollectedMessagePublisher.java new file mode 100644 index 0000000..2f567aa --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanCollectedMessagePublisher.java @@ -0,0 +1,6 @@ +package com.univ.tracedin.domain.span; + +public interface SpanCollectedMessagePublisher { + + void publish(SpanCollectedEvent event); +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanId.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanId.java index d6c5da1..2a1ca75 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanId.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanId.java @@ -1,7 +1,11 @@ package com.univ.tracedin.domain.span; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + import com.univ.tracedin.domain.global.BaseId; +@NoArgsConstructor(access = AccessLevel.PROTECTED) public class SpanId extends BaseId { public SpanId(String id) { diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanKind.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanKind.java index ec9a310..cc6120f 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanKind.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanKind.java @@ -3,6 +3,8 @@ public enum SpanKind { CLIENT, SERVER, + PRODUCER, + CONSUMER, INTERNAL; public static SpanKind fromValue(String value) { @@ -11,6 +13,6 @@ public static SpanKind fromValue(String value) { return type; } } - throw new IllegalArgumentException("Unknown span type: " + value); + throw new IllegalArgumentException("Unknown span kind: " + value); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java index a370e07..28d6bf5 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java @@ -8,7 +8,6 @@ import com.univ.tracedin.common.dto.SearchCursor; import com.univ.tracedin.common.dto.SearchResult; -import com.univ.tracedin.domain.project.ServiceNode; @Component @RequiredArgsConstructor @@ -16,12 +15,12 @@ public class SpanReader { private final SpanRepository spanRepository; - public List read(String projectKey, SpanKind spanKind) { - return spanRepository.findByProjectKeyAndSpanKind(projectKey, spanKind); + public List read(String projectKey, SpanType spanType, SpanKind spanKind) { + return spanRepository.findByProjectKeyAndSpanKind(projectKey, spanType, spanKind); } - public SearchResult read(ServiceNode serviceNode, SearchCursor cursor) { - return spanRepository.findTracesByServiceNode(serviceNode, cursor); + public SearchResult read(TraceSearchCond cond, SearchCursor cursor) { + return spanRepository.findTracesByNode(cond, cursor); } public List read(TraceId traceId) { diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java index 1ff82eb..c2325c2 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java @@ -5,19 +5,18 @@ import com.univ.tracedin.common.dto.SearchCursor; import com.univ.tracedin.common.dto.SearchResult; import com.univ.tracedin.domain.project.EndTimeBucket; -import com.univ.tracedin.domain.project.ServiceNode; public interface SpanRepository { - void save(List spans); + void saveAll(List spans); Span findById(String id); - List findByProjectKeyAndSpanKind(String projectKey, SpanKind spanKind); + List findByProjectKeyAndSpanKind(String projectKey, SpanType spanType, SpanKind spanKind); - SearchResult findTracesByServiceNode(ServiceNode serviceNode, SearchCursor cursor); + SearchResult findTracesByNode(TraceSearchCond cond, SearchCursor cursor); List findByTraceId(TraceId traceId); - List getTraceHitMapByProjectKey(String projectKey); + List getTraceHitMapByProjectKey(String projectKey, String serviceName); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanService.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanService.java index 1a68069..c54e113 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanService.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanService.java @@ -8,21 +8,22 @@ import com.univ.tracedin.common.dto.SearchCursor; import com.univ.tracedin.common.dto.SearchResult; -import com.univ.tracedin.domain.project.ServiceNode; @Service @RequiredArgsConstructor public class SpanService { - private final SpanAppender spanAppender; private final SpanReader spanReader; + private final ConditionValidator conditionValidator; + private final SpanCollectedMessagePublisher spanCollectedMessagePublisher; - public void appendSpan(List spans) { - spanAppender.append(spans); + public void publishSpans(List spans) { + spanCollectedMessagePublisher.publish(SpanCollectedEvent.from(spans)); } - public SearchResult getTraces(ServiceNode serviceNode, SearchCursor cursor) { - return spanReader.read(serviceNode, cursor); + public SearchResult getTraces(TraceSearchCond cond, SearchCursor cursor) { + conditionValidator.validate(cond); + return spanReader.read(cond, cursor); } public SpanTree getSpanTree(TraceId traceId) { diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanType.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanType.java index 4a18e7d..53f084c 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanType.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanType.java @@ -4,6 +4,7 @@ @Getter public enum SpanType { + UNKNOWN("unknown"), HTTP("http"), METHOD("method"), QUERY("query"); @@ -16,6 +17,7 @@ public enum SpanType { public static SpanType fromValue(String value) { for (SpanType type : SpanType.values()) { + if (type.value.equalsIgnoreCase(value)) { return type; } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/TraceId.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/TraceId.java index 82f17a8..67fef83 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/TraceId.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/TraceId.java @@ -1,7 +1,11 @@ package com.univ.tracedin.domain.span; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + import com.univ.tracedin.domain.global.BaseId; +@NoArgsConstructor(access = AccessLevel.PROTECTED) public class TraceId extends BaseId { public TraceId(String id) { diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/TraceSearchCond.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/TraceSearchCond.java new file mode 100644 index 0000000..58d45a1 --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/TraceSearchCond.java @@ -0,0 +1,21 @@ +package com.univ.tracedin.domain.span; + +import java.time.LocalDateTime; +import java.time.ZoneId; + +import com.univ.tracedin.domain.project.Node; + +public record TraceSearchCond(Node serviceNode, LocalDateTime startTime, LocalDateTime endTime) { + + public long getEpochMillisStartTime() { + return startTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } + + public long getEpochMillisEndTime() { + return endTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } + + public boolean hasTimeRange() { + return startTime != null && endTime != null; + } +} diff --git a/tracedin-infra/build.gradle b/tracedin-infra/build.gradle index 8f5e9dd..7ed5e00 100644 --- a/tracedin-infra/build.gradle +++ b/tracedin-infra/build.gradle @@ -10,6 +10,9 @@ dependencies { runtimeOnly 'com.mysql:mysql-connector-j' implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + //kakfa + implementation 'org.springframework.kafka:spring-kafka' + implementation('org.springframework.boot:spring-boot-starter-data-elasticsearch') { exclude group: 'co.elastic.clients', module: 'elasticsearch-java' } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaConsumer.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaConsumer.java new file mode 100644 index 0000000..d65f6d9 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaConsumer.java @@ -0,0 +1,9 @@ +package com.univ.tracedin.infra.kafka; + +import java.io.Serializable; +import java.util.List; + +public interface KafkaConsumer { + + void receive(List messages, List keys, List partitions, List offsets); +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaMessageHelper.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaMessageHelper.java new file mode 100644 index 0000000..69c3948 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaMessageHelper.java @@ -0,0 +1,36 @@ +package com.univ.tracedin.infra.kafka; + +import java.util.concurrent.CompletableFuture; + +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Component +@Slf4j +public class KafkaMessageHelper { + + public CompletableFuture> getKafkaCallback(T payload) { + return new CompletableFuture<>() { + @Override + public boolean complete(SendResult value) { + RecordMetadata metadata = value.getRecordMetadata(); + log.info( + "Kafka message sent successfully: Topic: {} Partition: {} Offset: {} Timestamp: {}", + metadata.topic(), + metadata.partition(), + metadata.offset(), + metadata.timestamp()); + return super.complete(value); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + log.error("Error while sending Kafka message: {}", payload.toString(), ex); + return super.completeExceptionally(ex); + } + }; + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducer.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducer.java new file mode 100644 index 0000000..7f81663 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducer.java @@ -0,0 +1,11 @@ +package com.univ.tracedin.infra.kafka; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; + +import org.springframework.kafka.support.SendResult; + +public interface KafkaProducer { + + void send(String topicName, K key, V message, CompletableFuture> callback); +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducerImpl.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducerImpl.java new file mode 100644 index 0000000..89ed5a0 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducerImpl.java @@ -0,0 +1,70 @@ +package com.univ.tracedin.infra.kafka; + +import java.io.Serializable; +import java.util.concurrent.CompletableFuture; + +import jakarta.annotation.PreDestroy; + +import org.springframework.kafka.KafkaException; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class KafkaProducerImpl + implements KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public KafkaProducerImpl(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @Override + public void send( + String topicName, K key, V message, CompletableFuture> callback) { + log.info("Sending message={} to topic={}", message, topicName); + + try { + CompletableFuture> kafkaResultFuture = + kafkaTemplate.send(topicName, key, message); + kafkaResultFuture.whenComplete( + (result, ex) -> { + if (ex != null) { + log.error( + "Error while sending message to kafka with key: {}, message: {} and exception: {}", + key, + message, + ex.getMessage()); + callback.completeExceptionally(ex); + } else { + log.info( + "Message sent successfully to kafka with key: {}, message: {}", + key, + message); + callback.complete(result); + } + }); + + } catch (KafkaException e) { + log.error( + "Error on kafka producer with key: {}, message: {} and exception: {}", + key, + message, + e.getMessage()); + throw new RuntimeException( + "Error on kafka producer with key: " + key + " and message: " + message); + } + } + + @PreDestroy + public void close() { + if (kafkaTemplate != null) { + log.info("Closing kafka producer!"); + kafkaTemplate.destroy(); + } + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/KafkaConsumerConfig.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..2879d47 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/KafkaConsumerConfig.java @@ -0,0 +1,82 @@ +package com.univ.tracedin.infra.kafka.config; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import com.univ.tracedin.infra.kafka.config.properties.KafkaConfigData; +import com.univ.tracedin.infra.kafka.config.properties.KafkaConsumerConfigData; + +@Configuration +public class KafkaConsumerConfig { + + private final KafkaConfigData kafkaConfigData; + private final KafkaConsumerConfigData kafkaConsumerConfigData; + + public KafkaConsumerConfig( + KafkaConfigData kafkaConfigData, KafkaConsumerConfigData kafkaConsumerConfigData) { + this.kafkaConfigData = kafkaConfigData; + this.kafkaConsumerConfigData = kafkaConsumerConfigData; + } + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers()); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + kafkaConsumerConfigData.getKeyDeserializer()); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + kafkaConsumerConfigData.getValueDeserializer()); + props.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + kafkaConsumerConfigData.getAutoOffsetReset()); + props.put( + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, + kafkaConsumerConfigData.getSessionTimeoutMs()); + props.put( + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, + kafkaConsumerConfigData.getHeartbeatIntervalMs()); + props.put( + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, + kafkaConsumerConfigData.getMaxPollIntervalMs()); + props.put( + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, + kafkaConsumerConfigData.getMaxPartitionFetchBytesDefault() + * kafkaConsumerConfigData.getMaxPartitionFetchBytesBoostFactor()); + props.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + kafkaConsumerConfigData.getMaxPollRecords()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + return props; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public KafkaListenerContainerFactory> + kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(kafkaConsumerConfigData.getBatchListener()); + factory.setConcurrency(kafkaConsumerConfigData.getConcurrencyLevel()); + factory.setAutoStartup(kafkaConsumerConfigData.getAutoStartup()); + factory.getContainerProperties().setPollTimeout(kafkaConsumerConfigData.getPollTimeoutMs()); + return factory; + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/KafkaProducerConfig.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..c3e708d --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,64 @@ +package com.univ.tracedin.infra.kafka.config; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import com.univ.tracedin.infra.kafka.config.properties.KafkaConfigData; +import com.univ.tracedin.infra.kafka.config.properties.KafkaProducerConfigData; + +@Configuration +public class KafkaProducerConfig { + + private final KafkaConfigData kafkaConfigData; + private final KafkaProducerConfigData kafkaProducerConfigData; + + public KafkaProducerConfig( + KafkaConfigData kafkaConfigData, KafkaProducerConfigData kafkaProducerConfigData) { + this.kafkaConfigData = kafkaConfigData; + this.kafkaProducerConfigData = kafkaProducerConfigData; + } + + @Bean + public Map producerConfig() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers()); + props.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + kafkaProducerConfigData.getKeySerializerClass()); + props.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + kafkaProducerConfigData.getValueSerializerClass()); + props.put( + ProducerConfig.BATCH_SIZE_CONFIG, + kafkaProducerConfigData.getBatchSize() + * kafkaProducerConfigData.getBatchSizeBoostFactor()); + props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfigData.getLingerMs()); + props.put( + ProducerConfig.COMPRESSION_TYPE_CONFIG, + kafkaProducerConfigData.getCompressionType()); + props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfigData.getAcks()); + props.put( + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, + kafkaProducerConfigData.getRequestTimeoutMs()); + props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfigData.getRetryCount()); + return props; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfig()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaConfigData.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaConfigData.java new file mode 100644 index 0000000..90286aa --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaConfigData.java @@ -0,0 +1,16 @@ +package com.univ.tracedin.infra.kafka.config.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka-config") +public class KafkaConfigData { + + private String bootstrapServers; + private Integer numOfPartitions; + private Short replicationFactor; +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaConsumerConfigData.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaConsumerConfigData.java new file mode 100644 index 0000000..88b57d3 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaConsumerConfigData.java @@ -0,0 +1,26 @@ +package com.univ.tracedin.infra.kafka.config.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka-consumer-config") +public class KafkaConsumerConfigData { + + private String keyDeserializer; + private String valueDeserializer; + private String autoOffsetReset; + private Boolean batchListener; + private Boolean autoStartup; + private Integer concurrencyLevel; + private Integer sessionTimeoutMs; + private Integer heartbeatIntervalMs; + private Integer maxPollIntervalMs; + private Long pollTimeoutMs; + private Integer maxPollRecords; + private Integer maxPartitionFetchBytesDefault; + private Integer maxPartitionFetchBytesBoostFactor; +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaProducerConfigData.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaProducerConfigData.java new file mode 100644 index 0000000..a2a4d59 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/config/properties/KafkaProducerConfigData.java @@ -0,0 +1,22 @@ +package com.univ.tracedin.infra.kafka.config.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka-producer-config") +public class KafkaProducerConfigData { + + private String keySerializerClass; + private String valueSerializerClass; + private String compressionType; + private String acks; + private Integer batchSize; + private Integer batchSizeBoostFactor; + private Integer lingerMs; + private Integer requestTimeoutMs; + private Integer retryCount; +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/listener/MetricCollectedKafkaListener.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/listener/MetricCollectedKafkaListener.java new file mode 100644 index 0000000..c106085 --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/listener/MetricCollectedKafkaListener.java @@ -0,0 +1,42 @@ +package com.univ.tracedin.infra.metric.messaging.listener; + +import java.util.List; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import com.univ.tracedin.domain.metric.ServiceMetrics; +import com.univ.tracedin.domain.metric.ServiceMetricsAppender; +import com.univ.tracedin.domain.metric.ServiceMetricsCollectedEvent; +import com.univ.tracedin.infra.kafka.KafkaConsumer; + +@Slf4j +@Component +@RequiredArgsConstructor +public class MetricCollectedKafkaListener implements KafkaConsumer { + + private final ServiceMetricsAppender serviceMetricsAppender; + + @Override + @KafkaListener( + id = "${kafka-consumer-config.service-metrics-group-id}", + topics = "${kafka.topic.service-metrics}") + public void receive( + List messages, + List keys, + List partitions, + List offsets) { + log.info( + "{} number of service metrics collected events received with keys:{}, partitions:{} and offsets: {}", + messages.size(), + keys.toString(), + partitions.toString(), + offsets.toString()); + List metrics = + messages.stream().map(ServiceMetricsCollectedEvent::serviceMetrics).toList(); + serviceMetricsAppender.appendAll(metrics); + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/publish/ServiceMetricsCollectedKafkaEventPublisher.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/publish/ServiceMetricsCollectedKafkaEventPublisher.java new file mode 100644 index 0000000..57a599c --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/publish/ServiceMetricsCollectedKafkaEventPublisher.java @@ -0,0 +1,34 @@ +package com.univ.tracedin.infra.metric.messaging.publish; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import com.univ.tracedin.domain.metric.ServiceMetricsCollectedEvent; +import com.univ.tracedin.domain.metric.ServiceMetricsCollectedMessagePublisher; +import com.univ.tracedin.infra.kafka.KafkaMessageHelper; +import com.univ.tracedin.infra.kafka.KafkaProducer; + +@Slf4j +@Component +@RequiredArgsConstructor +public class ServiceMetricsCollectedKafkaEventPublisher + implements ServiceMetricsCollectedMessagePublisher { + + private final KafkaProducer kafkaProducer; + private final KafkaMessageHelper kafkaMessageHelper; + + @Value("${kafka.topic.service-metrics}") + private String serviceMetricsTopic; + + @Override + public void publish(ServiceMetricsCollectedEvent serviceMetricsCollectedEvent) { + kafkaProducer.send( + serviceMetricsTopic, + serviceMetricsCollectedEvent.getKey(), + serviceMetricsCollectedEvent, + kafkaMessageHelper.getKafkaCallback(serviceMetricsCollectedEvent)); + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/repository/ServiceMetricsCoreRepository.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/repository/ServiceMetricsCoreRepository.java index 141fe54..b79d1cc 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/repository/ServiceMetricsCoreRepository.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/repository/ServiceMetricsCoreRepository.java @@ -11,7 +11,7 @@ import com.univ.tracedin.domain.metric.HttpRequestCount; import com.univ.tracedin.domain.metric.ServiceMetrics; import com.univ.tracedin.domain.metric.ServiceMetricsRepository; -import com.univ.tracedin.domain.project.ServiceNode; +import com.univ.tracedin.domain.project.Node; import com.univ.tracedin.infra.metric.document.ServiceMetricsDocument; @Repository @@ -21,13 +21,15 @@ public class ServiceMetricsCoreRepository implements ServiceMetricsRepository { private final ServiceMetricsElasticSearchRepository serviceMetricsElasticSearchRepository; - public void save(ServiceMetrics metrics) { - serviceMetricsElasticSearchRepository.save(ServiceMetricsDocument.from(metrics)); + public void saveAll(List metrics) { + List documents = + metrics.stream().map(ServiceMetricsDocument::from).toList(); + serviceMetricsElasticSearchRepository.saveAll(documents); } @Override - public List getHttpRequestCount(ServiceNode serviceNode) { + public List getHttpRequestCount(Node node) { return serviceMetricsElasticSearchRepository.getHttpRequestCount( - serviceNode.getProjectKey(), serviceNode.getName()); + node.getProjectKey(), node.getName()); } } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/project/repository/ProjectRepositoryAdapter.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/project/repository/ProjectRepositoryAdapter.java index cf1220e..c3b7ae9 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/project/repository/ProjectRepositoryAdapter.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/project/repository/ProjectRepositoryAdapter.java @@ -8,10 +8,11 @@ import lombok.RequiredArgsConstructor; +import com.univ.tracedin.domain.project.Node; +import com.univ.tracedin.domain.project.NodeType; import com.univ.tracedin.domain.project.Project; import com.univ.tracedin.domain.project.ProjectOwner; import com.univ.tracedin.domain.project.ProjectRepository; -import com.univ.tracedin.domain.project.ServiceNode; import com.univ.tracedin.infra.project.entity.ProjectEntity; import com.univ.tracedin.infra.span.repository.SpanElasticSearchRepository; @@ -36,8 +37,10 @@ public List getByOwner(ProjectOwner owner) { } @Override - public List findServiceNodeList(String projectKey) { + public List findServiceNodeList(String projectKey) { List serviceNames = spanElasticSearchRepository.findServiceNames(projectKey); - return serviceNames.stream().map(name -> ServiceNode.of(projectKey, name)).toList(); + return serviceNames.stream() + .map(name -> Node.of(projectKey, name, NodeType.SERVICE)) + .toList(); } } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/listener/SpanCollectedKafkaListener.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/listener/SpanCollectedKafkaListener.java new file mode 100644 index 0000000..78635eb --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/listener/SpanCollectedKafkaListener.java @@ -0,0 +1,42 @@ +package com.univ.tracedin.infra.span.messaging.listener; + +import java.util.List; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import com.univ.tracedin.domain.span.Span; +import com.univ.tracedin.domain.span.SpanAppender; +import com.univ.tracedin.domain.span.SpanCollectedEvent; +import com.univ.tracedin.infra.kafka.KafkaConsumer; + +@Slf4j +@Component +@RequiredArgsConstructor +public class SpanCollectedKafkaListener implements KafkaConsumer { + + private final SpanAppender spanAppender; + + @Override + @KafkaListener(id = "${kafka-consumer-config.span-group-id}", topics = "${kafka.topic.span}") + public void receive( + List messages, + List keys, + List partitions, + List offsets) { + log.info( + "{} number of span collected events received with keys:{}, partitions:{} and offsets: {}", + messages.size(), + keys.toString(), + partitions.toString(), + offsets.toString()); + + List spans = + messages.stream().map(SpanCollectedEvent::spans).flatMap(List::stream).toList(); + + spanAppender.appendAll(spans); + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/publisher/SpanCollectedKafkaEventPublisher.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/publisher/SpanCollectedKafkaEventPublisher.java new file mode 100644 index 0000000..ab26d6c --- /dev/null +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/publisher/SpanCollectedKafkaEventPublisher.java @@ -0,0 +1,30 @@ +package com.univ.tracedin.infra.span.messaging.publisher; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import com.univ.tracedin.domain.span.SpanCollectedEvent; +import com.univ.tracedin.domain.span.SpanCollectedMessagePublisher; +import com.univ.tracedin.infra.kafka.KafkaMessageHelper; +import com.univ.tracedin.infra.kafka.KafkaProducer; + +@Slf4j +@Component +@RequiredArgsConstructor +public class SpanCollectedKafkaEventPublisher implements SpanCollectedMessagePublisher { + + private final KafkaProducer kafkaProducer; + private final KafkaMessageHelper kafkaMessageHelper; + + @Value("${kafka.topic.span}") + private String spanTopic; + + @Override + public void publish(SpanCollectedEvent event) { + kafkaProducer.send( + spanTopic, event.getKey(), event, kafkaMessageHelper.getKafkaCallback(event)); + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java index 0bb367d..1a43700 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java @@ -10,12 +10,13 @@ import com.univ.tracedin.common.dto.SearchCursor; import com.univ.tracedin.common.dto.SearchResult; import com.univ.tracedin.domain.project.EndTimeBucket; -import com.univ.tracedin.domain.project.ServiceNode; import com.univ.tracedin.domain.span.Span; import com.univ.tracedin.domain.span.SpanKind; import com.univ.tracedin.domain.span.SpanRepository; +import com.univ.tracedin.domain.span.SpanType; import com.univ.tracedin.domain.span.Trace; import com.univ.tracedin.domain.span.TraceId; +import com.univ.tracedin.domain.span.TraceSearchCond; import com.univ.tracedin.infra.span.document.SpanDocument; @Transactional @@ -26,7 +27,7 @@ public class SpanCoreRepository implements SpanRepository { private final SpanElasticSearchRepository spanElasticSearchRepository; @Override - public void save(List spans) { + public void saveAll(List spans) { spanElasticSearchRepository.saveAll(spans.stream().map(SpanDocument::from).toList()); } @@ -39,20 +40,16 @@ public Span findById(String id) { } @Override - public List findByProjectKeyAndSpanKind(String projectKey, SpanKind spanKind) { - return spanElasticSearchRepository.search(projectKey, spanKind).stream() + public List findByProjectKeyAndSpanKind( + String projectKey, SpanType spanType, SpanKind spanKind) { + return spanElasticSearchRepository.search(projectKey, spanType, spanKind).stream() .map(SpanDocument::toSpan) .toList(); } @Override - public SearchResult findTracesByServiceNode( - ServiceNode serviceNode, SearchCursor cursor) { - return spanElasticSearchRepository.findTracesByServiceNode( - serviceNode.getProjectKey(), - serviceNode.getName(), - cursor.size(), - cursor.afterKey()); + public SearchResult findTracesByNode(TraceSearchCond cond, SearchCursor cursor) { + return spanElasticSearchRepository.findTracesByNode(cond, cursor.size(), cursor.afterKey()); } @Override @@ -63,7 +60,7 @@ public List findByTraceId(TraceId traceId) { } @Override - public List getTraceHitMapByProjectKey(String projectKey) { - return spanElasticSearchRepository.getTraceHitMapByProjectKey(projectKey); + public List getTraceHitMapByProjectKey(String projectKey, String serviceName) { + return spanElasticSearchRepository.getTraceHitMapByProjectKey(projectKey, serviceName); } } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java index 49d7ae1..1405e42 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java @@ -6,19 +6,21 @@ import com.univ.tracedin.common.dto.SearchResult; import com.univ.tracedin.domain.project.EndTimeBucket; import com.univ.tracedin.domain.span.SpanKind; +import com.univ.tracedin.domain.span.SpanType; import com.univ.tracedin.domain.span.Trace; +import com.univ.tracedin.domain.span.TraceSearchCond; import com.univ.tracedin.infra.span.document.SpanDocument; public interface SpanElasticSearchRepositoryCustom { - List search(String projectKey, SpanKind spanKind); + List search(String projectKey, SpanType spanType, SpanKind spanKind); List findServiceNames(String projectKey); - SearchResult findTracesByServiceNode( - String projectKey, String serviceName, int size, Map afterKey); + SearchResult findTracesByNode( + TraceSearchCond cond, int size, Map afterKey); List findByTraceId(String traceId); - List getTraceHitMapByProjectKey(String projectKey); + List getTraceHitMapByProjectKey(String projectKey, String serviceName); } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java index 4e036cc..1a5a47b 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java @@ -19,6 +19,7 @@ import com.univ.tracedin.domain.span.SpanType; import com.univ.tracedin.domain.span.Trace; import com.univ.tracedin.domain.span.TraceId; +import com.univ.tracedin.domain.span.TraceSearchCond; import com.univ.tracedin.infra.span.document.SpanDocument; import co.elastic.clients.elasticsearch.ElasticsearchClient; @@ -36,11 +37,13 @@ import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket; import co.elastic.clients.elasticsearch._types.aggregations.TopHitsAggregate; import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery; +import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery.Builder; import co.elastic.clients.elasticsearch._types.query_dsl.Query; import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders; import co.elastic.clients.elasticsearch.core.SearchResponse; import co.elastic.clients.elasticsearch.core.search.Hit; import co.elastic.clients.json.JsonData; +import io.micrometer.common.util.StringUtils; @Slf4j @RequiredArgsConstructor @@ -52,8 +55,8 @@ public class SpanElasticSearchRepositoryCustomImpl implements SpanElasticSearchR private final ElasticsearchClient client; @Override - public List search(String projectKey, SpanKind spanKind) { - Query spanQuery = createSpanQuery(projectKey, spanKind); + public List search(String projectKey, SpanType spanType, SpanKind spanKind) { + Query spanQuery = createSpanQuery(projectKey, spanType, spanKind); return executeESQuery( () -> @@ -102,8 +105,8 @@ public List findServiceNames(String projectKey) { } @Override - public SearchResult findTracesByServiceNode( - String projectKey, String serviceName, int size, Map afterKey) { + public SearchResult findTracesByNode( + TraceSearchCond cond, int size, Map afterKey) { return executeESQuery( () -> { SearchResponse response = @@ -111,9 +114,7 @@ public SearchResult findTracesByServiceNode( s -> s.index(INDEX_NAME) .size(0) - .query( - createServiceSpanQuery( - projectKey, serviceName)) + .query(createServiceSpanQuery(cond)) .aggregations( "by_trace", createTraceAggregation(size, afterKey)) @@ -153,8 +154,8 @@ public List findByTraceId(String traceId) { } @Override - public List getTraceHitMapByProjectKey(String projectKey) { - Query query = rootSpanQuery(projectKey); + public List getTraceHitMapByProjectKey(String projectKey, String serviceName) { + Query query = rootSpanQuery(projectKey, serviceName); Aggregation aggregation = histogramAggregation(); return executeESQuery( @@ -246,14 +247,26 @@ private Aggregation histogramAggregation() { .aggregations(Map.of("attributes_nested", attributesNestedAgg))); } - private Query rootSpanQuery(String projectKey) { + private Query rootSpanQuery(String projectKey, String serviceName) { + if (StringUtils.isBlank(serviceName)) { + return QueryBuilders.bool( + b -> + b.must( + QueryBuilders.term( + t -> t.field("projectKey").value(projectKey)), + QueryBuilders.term(t -> t.field("spanType").value("HTTP")), + QueryBuilders.term( + t -> + t.field("parentSpanId") + .value("0000000000000000")))); + } + return QueryBuilders.bool( b -> b.must( QueryBuilders.term(t -> t.field("projectKey").value(projectKey)), - QueryBuilders.term(t -> t.field("spanType").value("HTTP")), - QueryBuilders.term( - t -> t.field("parentSpanId").value("0000000000000000")))); + QueryBuilders.term(t -> t.field("serviceName").value(serviceName)), + QueryBuilders.term(t -> t.field("spanType").value("HTTP")))); } private List extractTraces(CompositeAggregate spansByTrace) { @@ -344,27 +357,39 @@ private Aggregation createTraceAggregation(int size, Map afterKe "attributes.data.http.status_code"))))))); } - private static Query createServiceSpanQuery(String projectKey, String serviceName) { - return QueryBuilders.bool( - b -> - b.must( - List.of( - QueryBuilders.term( - t -> t.field("projectKey").value(projectKey)), - QueryBuilders.term( - t -> t.field("serviceName").value(serviceName)), - QueryBuilders.term(t -> t.field("spanType").value("HTTP")), - QueryBuilders.term( - t -> - t.field("parentSpanId") - .value("0000000000000000"))))); + private Query createServiceSpanQuery(TraceSearchCond cond) { + Builder bool = QueryBuilders.bool(); + + bool.must( + QueryBuilders.term( + t -> t.field("projectKey").value(cond.serviceNode().getProjectKey()))); + bool.must( + QueryBuilders.term( + t -> t.field("serviceName").value(cond.serviceNode().getName()))); + bool.must(QueryBuilders.term(t -> t.field("spanType").value("HTTP"))); + bool.must(QueryBuilders.term(t -> t.field("kind").value("SERVER"))); + + if (cond.hasTimeRange()) { + bool.must( + QueryBuilders.range( + r -> + r.field("startEpochMillis") + .gte(JsonData.of(cond.getEpochMillisStartTime())))); + bool.must( + QueryBuilders.range( + r -> + r.field("endEpochMillis") + .lte(JsonData.of(cond.getEpochMillisEndTime())))); + } + + return bool.build()._toQuery(); } private Aggregation createServiceNameAggregation() { return Aggregation.of(a -> a.terms(t -> t.field("serviceName").size(100))); } - private Query createSpanQuery(String projectKey, SpanKind spanKind) { + private Query createSpanQuery(String projectKey, SpanType spanType, SpanKind spanKind) { return BoolQuery.of( b -> b.must( @@ -376,9 +401,7 @@ private Query createSpanQuery(String projectKey, SpanKind spanKind) { QueryBuilders.term( t -> t.field("spanType") - .value( - SpanType.HTTP - .name())), + .value(spanType.name())), QueryBuilders.term( t -> t.field("kind").value(spanKind.name())) // , diff --git a/tracedin-infra/src/main/resources/application-infra.yml b/tracedin-infra/src/main/resources/application-infra.yml index 45a90bb..484c499 100644 --- a/tracedin-infra/src/main/resources/application-infra.yml +++ b/tracedin-infra/src/main/resources/application-infra.yml @@ -20,11 +20,6 @@ elasticsearch: client: host: ${ELASTIC_HOST} port: 9200 - -redis: - host: localhost - port: 6379 - logging: level: tracer: trace @@ -35,6 +30,50 @@ logging: client: WIRE: trace +kafka: + topic: + span: spanCollectedEvent + service-metrics: serviceMetricsCollectedEvent + +kafka-config: + bootstrap-servers: localhost:19092, localhost:29092, localhost:39092 + num-of-partitions: 3 + replication-factor: 3 + +kafka-producer-config: + key-serializer-class: org.apache.kafka.common.serialization.StringSerializer + value-serializer-class: org.springframework.kafka.support.serializer.JsonSerializer + compression-type: none + acks: all + batch-size: 16384 + batch-size-boost-factor: 100 + linger-ms: 5 + request-timeout-ms: 60000 + retry-count: 5 + +kafka-consumer-config: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + span-group-id: tracedin-span + service-metrics-group-id: tracedin-service-metrics + auto-offset-reset: earliest + batch-listener: true + auto-startup: true + concurrency-level: 3 + session-timeout-ms: 10000 + heartbeat-interval-ms: 3000 + max-poll-interval-ms: 300000 + max-poll-records: 500 + max-partition-fetch-bytes-default: 1048576 + max-partition-fetch-bytes-boost-factor: 1 + poll-timeout-ms: 150 + +redis: + host: localhost + port: 6379 + + + --- spring: config: @@ -61,3 +100,41 @@ elasticsearch: redis: host: redis port: 6379 + +kafka: + topic: + span: spanCollectedEvent + service-metrics: serviceMetricsCollectedEvent + +kafka-config: + bootstrap-servers: kafka-broker-1:9092, kafka-broker-3:9092, kafka-broker-3:9092 + num-of-partitions: 3 + replication-factor: 3 + +kafka-producer-config: + key-serializer-class: org.apache.kafka.common.serialization.StringSerializer + value-serializer-class: org.springframework.kafka.support.serializer.JsonSerializer + compression-type: none + acks: all + batch-size: 16384 + batch-size-boost-factor: 100 + linger-ms: 5 + request-timeout-ms: 60000 + retry-count: 5 + +kafka-consumer-config: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + span-group-id: tracedin-span + service-metrics-group-id: tracedin-service-metrics + auto-offset-reset: earliest + batch-listener: true + auto-startup: true + concurrency-level: 3 + session-timeout-ms: 10000 + heartbeat-interval-ms: 3000 + max-poll-interval-ms: 300000 + max-poll-records: 500 + max-partition-fetch-bytes-default: 1048576 + max-partition-fetch-bytes-boost-factor: 1 + poll-timeout-ms: 150 \ No newline at end of file