From 0672f8ac1df353492dde0dd676e21e62597303f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B0=95=EC=A7=80=EC=9B=90?= Date: Fri, 1 Nov 2024 15:48:25 +0900 Subject: [PATCH] =?UTF-8?q?feat=20:=20=EC=84=9C=EB=B9=84=EC=8A=A4=20?= =?UTF-8?q?=EC=97=94=EB=93=9C=ED=8F=AC=EC=9D=B8=ED=8A=B8=20=EB=A6=AC?= =?UTF-8?q?=EC=8A=A4=ED=8A=B8=20=EC=A1=B0=ED=9A=8C=20API=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 네트워크 토폴로지 로직 리팩토링 CompletableFuture를 이용해서 병렬로 처리 및 Builder 클래스를 통해 추상화하여 가독성 향상 --- infra/kafka/kafka_cluster.yml | 1 + .../univ/tracedin/api/project/ProjectApi.java | 29 ++- .../tracedin/api/project/ProjectApiDocs.java | 8 +- .../api/project/dto/ServiceSearchRequest.java | 13 ++ .../api/project/dto/TraceSearchRequest.java | 10 +- .../domain/metric/ServiceMetricsService.java | 1 - .../tracedin/domain/project/EndPointUrl.java | 16 ++ .../domain/project/NetworkTopology.java | 14 +- .../project/NetworkTopologyAnalyer.java | 8 + .../project/NetworkTopologyBuilder.java | 6 - .../domain/project/ProjectAnalyzer.java | 4 + .../domain/project/ProjectService.java | 32 +-- .../project/ServiceSearchCondition.java | 26 +++ .../domain/project/TraceSearchCondition.java | 25 ++- .../NetworkTopologyBuildException.java | 13 ++ .../project/exception/ProjectErrorCode.java | 1 + .../domain/span/AbstractTopologyBuilder.java | 32 +++ .../domain/span/NetworkTopologyBuilder.java | 135 +++++++++++++ .../univ/tracedin/domain/span/SpanReader.java | 26 +++ .../tracedin/domain/span/SpanRepository.java | 4 + .../domain/span/SpanStatisticsAnalyzer.java | 172 ++++------------ .../univ/tracedin/domain/span/Topology.java | 23 +++ .../infra/kafka/KafkaMessageHelper.java | 11 +- .../infra/kafka/KafkaProducerImpl.java | 4 +- .../publish/ServiceMetricsKafkaPublisher.java | 3 +- .../publisher/SpanKafkaEventPublisher.java | 3 +- .../span/repository/SpanCoreRepository.java | 11 +- .../SpanElasticSearchRepositoryCustom.java | 4 + ...SpanElasticSearchRepositoryCustomImpl.java | 191 +++++++++++------- 29 files changed, 564 insertions(+), 262 deletions(-) create mode 100644 tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/ServiceSearchRequest.java create mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/project/EndPointUrl.java create mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyAnalyer.java delete mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyBuilder.java create mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ServiceSearchCondition.java create mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/NetworkTopologyBuildException.java create mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/span/AbstractTopologyBuilder.java create mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilder.java create mode 100644 tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Topology.java diff --git a/infra/kafka/kafka_cluster.yml b/infra/kafka/kafka_cluster.yml index b85371d..e03d84f 100755 --- a/infra/kafka/kafka_cluster.yml +++ b/infra/kafka/kafka_cluster.yml @@ -2,6 +2,7 @@ services: kafka-broker-1: image: confluentinc/cp-kafka:latest hostname: kafka-broker-1 + restart: always depends_on: - zookeeper ports: diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java index 96083a1..47d7d6f 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApi.java @@ -18,8 +18,9 @@ import com.univ.tracedin.api.project.dto.CreateProjectRequest; import com.univ.tracedin.api.project.dto.NodeResponse; import com.univ.tracedin.api.project.dto.ProjectResponse; +import com.univ.tracedin.api.project.dto.ServiceSearchRequest; import com.univ.tracedin.api.project.dto.TraceSearchRequest; -import com.univ.tracedin.domain.project.NetworkTopology; +import com.univ.tracedin.domain.project.EndPointUrl; import com.univ.tracedin.domain.project.ProjectId; import com.univ.tracedin.domain.project.ProjectKey; import com.univ.tracedin.domain.project.ProjectMember.MemberRole; @@ -27,6 +28,7 @@ import com.univ.tracedin.domain.project.ProjectService; import com.univ.tracedin.domain.project.ProjectStatistic; import com.univ.tracedin.domain.project.ProjectStatistic.StatisticsType; +import com.univ.tracedin.domain.span.Topology; import com.univ.tracedin.domain.user.UserId; @RestController @@ -36,6 +38,7 @@ public class ProjectApi implements ProjectApiDocs { private final ProjectService projectService; + @Override @PostMapping public Response createProject( @RequestBody CreateProjectRequest request, Long userId) { @@ -43,35 +46,50 @@ public Response createProject( projectService.create(UserId.from(userId), request.toProjectInfo())); } + @Override @GetMapping public Response> projectList(Long userId) { - List responses = + final List responses = projectService.getProjectList(UserId.from(userId)).stream() .map(ProjectResponse::from) .toList(); return Response.success(responses); } + @Override @DeleteMapping("/{projectId}") public Response deleteProject(@PathVariable Long projectId) { projectService.deleteProject(ProjectId.from(projectId)); return Response.success(); } + @Override @GetMapping("/{projectKey}/service-nodes") public Response> serviceNodes(@PathVariable String projectKey) { - List responses = + final List responses = projectService.getServiceNodeList(ProjectKey.from(projectKey)).stream() .map(NodeResponse::from) .toList(); return Response.success(responses); } + @Override + @GetMapping("/service-endpoints") + public Response> serviceEndpoints(ServiceSearchRequest request) { + final List response = + projectService.getServiceEndpoints(request.toCondition()).stream() + .map(EndPointUrl::value) + .toList(); + return Response.success(response); + } + + @Override @GetMapping("/{projectKey}/network-topology") - public Response networkTopology(@PathVariable String projectKey) { + public Response networkTopology(@PathVariable String projectKey) { return Response.success(projectService.getNetworkTopology(ProjectKey.from(projectKey))); } + @Override @GetMapping("/statistics/{statisticsType}") public Response> statistics( @PathVariable StatisticsType statisticsType, TraceSearchRequest request) { @@ -79,6 +97,7 @@ public Response> statistics( projectService.getStatistics(request.toCondition(), statisticsType)); } + @Override @PostMapping("/{projectId}/members") public Response addMember(@PathVariable Long projectId, AddMemberRequest request) { projectService.addMember( @@ -86,12 +105,14 @@ public Response addMember(@PathVariable Long projectId, AddMemberRequest r return Response.success(); } + @Override @DeleteMapping("/members/{projectMemberId}") public Response removeMember(@PathVariable Long projectMemberId) { projectService.removeMember(ProjectMemberId.from(projectMemberId)); return Response.success(); } + @Override @PatchMapping("/members/{projectMemberId}") public Response changeRole(@PathVariable Long projectMemberId, MemberRole targetRole) { projectService.changeRole(ProjectMemberId.from(projectMemberId), targetRole); diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java index acdd752..fa519ea 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/ProjectApiDocs.java @@ -9,12 +9,13 @@ import com.univ.tracedin.api.project.dto.CreateProjectRequest; import com.univ.tracedin.api.project.dto.NodeResponse; import com.univ.tracedin.api.project.dto.ProjectResponse; +import com.univ.tracedin.api.project.dto.ServiceSearchRequest; import com.univ.tracedin.api.project.dto.TraceSearchRequest; -import com.univ.tracedin.domain.project.NetworkTopology; import com.univ.tracedin.domain.project.ProjectKey; import com.univ.tracedin.domain.project.ProjectMember.MemberRole; import com.univ.tracedin.domain.project.ProjectStatistic; import com.univ.tracedin.domain.project.ProjectStatistic.StatisticsType; +import com.univ.tracedin.domain.span.Topology; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -34,8 +35,11 @@ public interface ProjectApiDocs { @Operation(summary = "서비스 리스트 조회", description = "프로젝트의 서비스 노드 리스트를 조회합니다.") Response> serviceNodes(String projectKey); + @Operation(summary = "서비스의 엔드포인트 리스트 조회", description = "프로젝트의 서비스의 엔드포인트 리스트를 조회합니다.") + Response> serviceEndpoints(ServiceSearchRequest request); + @Operation(summary = "네트워크 토폴로지 조회", description = "프로젝트의 네트워크 토폴로지를 조회합니다.") - Response networkTopology(String projectKey); + Response networkTopology(String projectKey); @Operation( summary = "프로젝트 통계 조회", diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/ServiceSearchRequest.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/ServiceSearchRequest.java new file mode 100644 index 0000000..2637108 --- /dev/null +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/ServiceSearchRequest.java @@ -0,0 +1,13 @@ +package com.univ.tracedin.api.project.dto; + +import com.univ.tracedin.domain.project.ProjectKey; +import com.univ.tracedin.domain.project.ServiceSearchCondition; + +public record ServiceSearchRequest(String projectKey, String serviceName) { + public ServiceSearchCondition toCondition() { + return ServiceSearchCondition.builder() + .projectKey(ProjectKey.from(projectKey)) + .serviceName(serviceName) + .build(); + } +} diff --git a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/TraceSearchRequest.java b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/TraceSearchRequest.java index 72789ed..6fb1eee 100644 --- a/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/TraceSearchRequest.java +++ b/tracedin-application/app-api/src/main/java/com/univ/tracedin/api/project/dto/TraceSearchRequest.java @@ -5,6 +5,7 @@ import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import com.univ.tracedin.domain.project.EndPointUrl; import com.univ.tracedin.domain.project.ProjectKey; import com.univ.tracedin.domain.project.TraceSearchCondition; @@ -18,8 +19,13 @@ public record TraceSearchRequest( LocalDateTime endTime) { public TraceSearchCondition toCondition() { - return new TraceSearchCondition( - new ProjectKey(projectKey), serviceName, endPointUrl, startTime, endTime); + return TraceSearchCondition.builder() + .projectKey(ProjectKey.from(projectKey)) + .serviceName(serviceName) + .endPointUrl(EndPointUrl.from(endPointUrl)) + .startTime(startTime) + .endTime(endTime) + .build(); } // 종료 시간만 있을 수 없도록 검증 diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java index 7f7f1ae..2bd437d 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/metric/ServiceMetricsService.java @@ -8,7 +8,6 @@ @RequiredArgsConstructor public class ServiceMetricsService { - private final ServiceMetricsReader serviceMetricReader; private final ServiceMetricsMessagePublisher serviceMetricsMessagePublisher; public void appendMetrics(ServiceMetrics metrics) { diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/EndPointUrl.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/EndPointUrl.java new file mode 100644 index 0000000..8e6fcd5 --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/EndPointUrl.java @@ -0,0 +1,16 @@ +package com.univ.tracedin.domain.project; + +import java.util.regex.Pattern; + +public record EndPointUrl(String value) { + + private static final Pattern ENDPOINT_PATTERN = + Pattern.compile("^(https?://)([a-zA-Z0-9.-]+)(:[0-9]+)?(/[a-zA-Z0-9._-]+)*$"); + + public static EndPointUrl from(String url) { + if (url == null || !ENDPOINT_PATTERN.matcher(url).matches()) { + throw new IllegalArgumentException("Invalid endpoint URL format: " + url); + } + return new EndPointUrl(url); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java index 957e5f2..87d25f0 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopology.java @@ -7,20 +7,18 @@ import lombok.AllArgsConstructor; import lombok.Getter; +import com.univ.tracedin.domain.span.Topology; + @Getter -@AllArgsConstructor(access = AccessLevel.PRIVATE) -public class NetworkTopology { +@AllArgsConstructor +public final class NetworkTopology implements Topology { private final List nodes; private final List edges; - public static NetworkTopology of(List nodes, List edges) { - return new NetworkTopology(nodes, edges); - } - @Getter @AllArgsConstructor - public static class Node { + public static final class Node { private ProjectKey projectKey; private String name; @@ -62,7 +60,7 @@ public enum NodeType { @Getter @AllArgsConstructor(access = AccessLevel.PRIVATE) - public static class Edge { + public static final class Edge { private String source; private String target; diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyAnalyer.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyAnalyer.java new file mode 100644 index 0000000..9082fdc --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyAnalyer.java @@ -0,0 +1,8 @@ +package com.univ.tracedin.domain.project; + +import com.univ.tracedin.domain.span.Topology; + +public interface NetworkTopologyAnalyer { + + Topology analyze(Project projectKey); +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyBuilder.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyBuilder.java deleted file mode 100644 index 3b10d98..0000000 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/NetworkTopologyBuilder.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.univ.tracedin.domain.project; - -public interface NetworkTopologyBuilder { - - NetworkTopology build(Project projectKey); -} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectAnalyzer.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectAnalyzer.java index ac9c24e..70c6782 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectAnalyzer.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectAnalyzer.java @@ -1,7 +1,11 @@ package com.univ.tracedin.domain.project; +import java.util.List; + public interface ProjectAnalyzer { ProjectStatistic analyze( TraceSearchCondition cond, ProjectStatistic.StatisticsType statisticsType); + + List getEndpoints(ServiceSearchCondition cond); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java index 8b718ba..65f0d1d 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ProjectService.java @@ -10,6 +10,7 @@ import lombok.RequiredArgsConstructor; +import com.univ.tracedin.domain.span.Topology; import com.univ.tracedin.domain.user.User; import com.univ.tracedin.domain.user.UserId; import com.univ.tracedin.domain.user.UserReader; @@ -24,53 +25,58 @@ public class ProjectService { private final ProjectDeleter projectDeleter; private final ProjectValidator projectValidator; private final ProjectMemberManager projectMemberManager; - private final NetworkTopologyBuilder networkTopologyBuilder; + private final NetworkTopologyAnalyer networkTopologyAnalyer; private final ProjectAnalyzer projectAnalyzer; public ProjectKey create(UserId creatorId, ProjectInfo projectInfo) { - User user = userReader.read(creatorId); + final User user = userReader.read(creatorId); return projectAppender.append(user, projectInfo); } public List getProjectList(UserId userId) { - User user = userReader.read(userId); + final User user = userReader.read(userId); return projectReader.readAll(user); } public List getServiceNodeList(ProjectKey projectKey) { - Project project = projectReader.readByKey(projectKey); + final Project project = projectReader.readByKey(projectKey); return projectReader.readServiceNods(project); } - public NetworkTopology getNetworkTopology(ProjectKey projectKey) { - Project project = projectReader.readByKey(projectKey); - return networkTopologyBuilder.build(project); + public List getServiceEndpoints(ServiceSearchCondition cond) { + projectValidator.validate(cond.getProjectKey()); + return projectAnalyzer.getEndpoints(cond); + } + + public Topology getNetworkTopology(ProjectKey projectKey) { + final Project project = projectReader.readByKey(projectKey); + return networkTopologyAnalyer.analyze(project); } public ProjectStatistic getStatistics( TraceSearchCondition cond, StatisticsType statisticsType) { - projectValidator.validate(cond.projectKey()); + projectValidator.validate(cond.getProjectKey()); return projectAnalyzer.analyze(cond, statisticsType); } public void addMember(ProjectId projectId, String targetMemberEmail, MemberRole role) { - Project project = projectReader.read(projectId); - User targetUser = userReader.read(targetMemberEmail); + final Project project = projectReader.read(projectId); + final User targetUser = userReader.read(targetMemberEmail); projectMemberManager.add(project, targetUser, role); } public void removeMember(ProjectMemberId projectMemberId) { - ProjectMember projectMember = projectMemberManager.read(projectMemberId); + final ProjectMember projectMember = projectMemberManager.read(projectMemberId); projectMemberManager.remove(projectMember); } public void changeRole(ProjectMemberId projectMemberId, MemberRole role) { - ProjectMember projectMember = projectMemberManager.read(projectMemberId); + final ProjectMember projectMember = projectMemberManager.read(projectMemberId); projectMemberManager.changeRole(projectMember, role); } public void deleteProject(ProjectId projectId) { - Project project = projectReader.read(projectId); + final Project project = projectReader.read(projectId); projectMemberManager.removeAll(project); projectDeleter.delete(project); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ServiceSearchCondition.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ServiceSearchCondition.java new file mode 100644 index 0000000..132c0ea --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/ServiceSearchCondition.java @@ -0,0 +1,26 @@ +package com.univ.tracedin.domain.project; + +import lombok.Builder; +import lombok.Getter; +import lombok.experimental.SuperBuilder; + +import io.micrometer.common.util.StringUtils; + +@Getter +@SuperBuilder +@Builder +public class ServiceSearchCondition { + protected ProjectKey projectKey; + protected String serviceName; + + protected ServiceSearchCondition(ProjectKey projectKey, String serviceName) { + this.projectKey = projectKey; + this.serviceName = serviceName; + } + + protected ServiceSearchCondition() {} + + public boolean hasServiceName() { + return StringUtils.isNotBlank(serviceName); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/TraceSearchCondition.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/TraceSearchCondition.java index 0e3c455..40b3729 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/TraceSearchCondition.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/TraceSearchCondition.java @@ -3,14 +3,21 @@ import java.time.LocalDateTime; import java.time.ZoneId; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + import io.micrometer.common.util.StringUtils; -public record TraceSearchCondition( - ProjectKey projectKey, - String serviceName, - String endPointUrl, - LocalDateTime startTime, - LocalDateTime endTime) { +@Getter +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +public class TraceSearchCondition extends ServiceSearchCondition { + private EndPointUrl endPointUrl; + private LocalDateTime startTime; + private LocalDateTime endTime; public long getEpochMillisStartTime() { return startTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); @@ -20,15 +27,11 @@ public long getEpochMillisEndTime() { return endTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); } - public boolean hasServiceName() { - return StringUtils.isNotBlank(serviceName); - } - public boolean hasTimeRange() { return startTime != null && endTime != null; } public boolean hasEndPointUrl() { - return StringUtils.isNotBlank(endPointUrl); + return StringUtils.isNotBlank(endPointUrl.value()); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/NetworkTopologyBuildException.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/NetworkTopologyBuildException.java new file mode 100644 index 0000000..5b2a0e6 --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/NetworkTopologyBuildException.java @@ -0,0 +1,13 @@ +package com.univ.tracedin.domain.project.exception; + +import com.univ.tracedin.common.exception.DomainException; + +public final class NetworkTopologyBuildException extends DomainException { + + public static final NetworkTopologyBuildException EXCEPTION = + new NetworkTopologyBuildException(); + + public NetworkTopologyBuildException() { + super(ProjectErrorCode.NETWORK_TOPOLOGY_BUILD_FAILED); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/ProjectErrorCode.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/ProjectErrorCode.java index 9de9497..10f3813 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/ProjectErrorCode.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/project/exception/ProjectErrorCode.java @@ -12,6 +12,7 @@ public enum ProjectErrorCode implements BaseErrorCode { PROJECT_NOT_FOUND(NOT_FOUND, "PROJECT_404_1", "해당 ID의 프로젝트를 찾지 못했습니다."), PROJECT_MEMBER_NOT_FOUND(NOT_FOUND, "PROJECT_MEMBER_404_1", "해당 ID의 프로젝트 멤버를 찾지 못했습니다."), PROJECT_KEY_NOT_FOUND(NOT_FOUND, "PROJECT_KEY_404_1", "해당 키의 프로젝트를 찾지 못했습니다."), + NETWORK_TOPOLOGY_BUILD_FAILED(500, "NETWORK_TOPOLOGY_500_1", "네트워크 토폴로지를 생성하는데 실패했습니다."), ; private final Integer status; private final String errorCode; diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/AbstractTopologyBuilder.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/AbstractTopologyBuilder.java new file mode 100644 index 0000000..2a7722f --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/AbstractTopologyBuilder.java @@ -0,0 +1,32 @@ +package com.univ.tracedin.domain.span; + +import java.util.HashMap; +import java.util.Map; + +import com.univ.tracedin.domain.project.NetworkTopology.Edge; +import com.univ.tracedin.domain.project.NetworkTopology.Node; +import com.univ.tracedin.domain.project.NetworkTopology.NodeType; +import com.univ.tracedin.domain.project.ProjectKey; + +public abstract class AbstractTopologyBuilder { + + protected static final String KAFKA_NODE_NAME = "Kafka"; + + public ProjectKey projectKey; + public Map nodes = new HashMap<>(); + public Map edges = new HashMap<>(); + + public AbstractTopologyBuilder projectKey(ProjectKey projectKey) { + this.projectKey = projectKey; + return this; + } + + protected void addNode(String nodeName, NodeType nodeType) { + nodes.computeIfAbsent(nodeName, name -> Node.of(projectKey, name, nodeType)); + } + + protected void addEdge(String source, String target) { + final String edgeKey = source + "->" + target; + edges.computeIfAbsent(edgeKey, k -> Edge.init(source, target)).incrementRequestCount(); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilder.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilder.java new file mode 100644 index 0000000..13b0bb1 --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/NetworkTopologyBuilder.java @@ -0,0 +1,135 @@ +package com.univ.tracedin.domain.span; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.univ.tracedin.domain.project.NetworkTopology.NodeType; +import com.univ.tracedin.domain.project.ProjectKey; + +public final class NetworkTopologyBuilder extends AbstractTopologyBuilder { + + private List clientSpans; + private List serverSpans; + private List dbSpans; + private List producerSpans; + private List consumerSpans; + + @Override + public NetworkTopologyBuilder projectKey(ProjectKey projectKey) { + return (NetworkTopologyBuilder) super.projectKey(projectKey); + } + + public NetworkTopologyBuilder client(List clientSpans) { + this.clientSpans = clientSpans; + return this; + } + + public NetworkTopologyBuilder server(List serverSpans) { + this.serverSpans = serverSpans; + return this; + } + + public NetworkTopologyBuilder db(List dbSpans) { + this.dbSpans = dbSpans; + return this; + } + + public NetworkTopologyBuilder producer(List producerSpans) { + this.producerSpans = producerSpans; + return this; + } + + public NetworkTopologyBuilder consumer(List consumerSpans) { + this.consumerSpans = consumerSpans; + return this; + } + + public Topology build() { + if (clientSpans != null && serverSpans != null) { + buildServiceTopology(); + } + + if (hasKafka()) { + buildKafkaTopology(); + } + + if (hasDb()) { + buildDatabaseTopology(); + } + + return Topology.of(nodes, edges); + } + + private void buildServiceTopology() { + final Map> clientSpanMap = + clientSpans.stream() + .collect( + Collectors.groupingBy( + Span::getTraceId, + Collectors.toMap(Span::getId, Function.identity()))); + + for (Span serverSpan : serverSpans) { + final Map clientSpansInTrace = clientSpanMap.get(serverSpan.getTraceId()); + + if (clientSpansInTrace == null) { + continue; + } + + final Span clientSpan = clientSpansInTrace.get(serverSpan.getParentId()); + + if (clientSpan != null) { + final String sourceService = clientSpan.getServiceName(); + final String targetService = serverSpan.getServiceName(); + + addNode(sourceService, NodeType.SERVICE); + addNode(targetService, NodeType.SERVICE); + addEdge(sourceService, targetService); + } + } + } + + private void buildKafkaTopology() { + addNode(KAFKA_NODE_NAME, NodeType.KAFKA); + if (producerSpans != null && !producerSpans.isEmpty()) { + processKafkaSpans(producerSpans, true); + } + if (consumerSpans != null && !consumerSpans.isEmpty()) { + processKafkaSpans(consumerSpans, false); + } + } + + private void processKafkaSpans(List spans, boolean isProducer) { + for (Span span : spans) { + final String serviceName = span.getServiceName(); + addNode(serviceName, NodeType.SERVICE); + + if (isProducer) { + addEdge(serviceName, KAFKA_NODE_NAME); + } else { + addEdge(KAFKA_NODE_NAME, serviceName); + } + } + } + + private void buildDatabaseTopology() { + for (Span dbSpan : dbSpans) { + final String dbSystem = (String) dbSpan.getAttributes().data().get("db.system"); + final String serviceName = dbSpan.getServiceName(); + + addNode(serviceName, NodeType.SERVICE); + addNode(dbSystem, NodeType.DATABASE); + addEdge(serviceName, dbSystem); + } + } + + private boolean hasKafka() { + return (producerSpans != null && !producerSpans.isEmpty()) + || (consumerSpans != null && !consumerSpans.isEmpty()); + } + + private boolean hasDb() { + return dbSpans != null && !dbSpans.isEmpty(); + } +} diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java index a88b0bf..bfb48fb 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanReader.java @@ -1,6 +1,7 @@ package com.univ.tracedin.domain.span; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.springframework.stereotype.Component; @@ -21,6 +22,31 @@ public List read(ProjectKey projectKey, SpanType spanType, SpanKind spanKi return spanRepository.findByProjectKeyAndSpanKind(projectKey, spanType, spanKind); } + public CompletableFuture> readAsync( + ProjectKey projectKey, SpanType spanType, SpanKind spanKind) { + return CompletableFuture.completedFuture(read(projectKey, spanType, spanKind)); + } + + public CompletableFuture> readClientSpans(ProjectKey projectKey) { + return readAsync(projectKey, SpanType.HTTP, SpanKind.CLIENT); + } + + public CompletableFuture> readServerSpans(ProjectKey projectKey) { + return readAsync(projectKey, SpanType.HTTP, SpanKind.SERVER); + } + + public CompletableFuture> readProducerSpans(ProjectKey projectKey) { + return readAsync(projectKey, SpanType.UNKNOWN, SpanKind.PRODUCER); + } + + public CompletableFuture> readConsumerSpans(ProjectKey projectKey) { + return readAsync(projectKey, SpanType.UNKNOWN, SpanKind.CONSUMER); + } + + public CompletableFuture> readDbSpans(ProjectKey projectKey) { + return readAsync(projectKey, SpanType.QUERY, SpanKind.CLIENT); + } + public SearchResult read(TraceSearchCondition cond, SearchCursor cursor) { return spanRepository.findTracesByNode(cond, cursor); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java index 739c3b4..797873a 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanRepository.java @@ -4,8 +4,10 @@ import com.univ.tracedin.common.dto.SearchCursor; import com.univ.tracedin.common.dto.SearchResult; +import com.univ.tracedin.domain.project.EndPointUrl; import com.univ.tracedin.domain.project.HttpTps; import com.univ.tracedin.domain.project.ProjectKey; +import com.univ.tracedin.domain.project.ServiceSearchCondition; import com.univ.tracedin.domain.project.StatusCodeDistribution; import com.univ.tracedin.domain.project.TraceHipMap; import com.univ.tracedin.domain.project.TraceSearchCondition; @@ -30,4 +32,6 @@ List findByProjectKeyAndSpanKind( StatusCodeDistribution getStatusCodeDistribution(TraceSearchCondition cond); List getHttpTps(TraceSearchCondition cond); + + List getEndpoints(ServiceSearchCondition cond); } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanStatisticsAnalyzer.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanStatisticsAnalyzer.java index dd934af..92b692b 100644 --- a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanStatisticsAnalyzer.java +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/SpanStatisticsAnalyzer.java @@ -1,32 +1,26 @@ package com.univ.tracedin.domain.span; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.concurrent.CompletableFuture; import org.springframework.stereotype.Component; import lombok.RequiredArgsConstructor; -import com.univ.tracedin.domain.project.NetworkTopology; -import com.univ.tracedin.domain.project.NetworkTopology.Edge; -import com.univ.tracedin.domain.project.NetworkTopology.Node; -import com.univ.tracedin.domain.project.NetworkTopology.NodeType; -import com.univ.tracedin.domain.project.NetworkTopologyBuilder; +import com.univ.tracedin.domain.project.EndPointUrl; +import com.univ.tracedin.domain.project.NetworkTopologyAnalyer; import com.univ.tracedin.domain.project.Project; import com.univ.tracedin.domain.project.ProjectAnalyzer; -import com.univ.tracedin.domain.project.ProjectKey; import com.univ.tracedin.domain.project.ProjectStatistic; import com.univ.tracedin.domain.project.ProjectStatistic.StatisticsType; +import com.univ.tracedin.domain.project.ServiceSearchCondition; import com.univ.tracedin.domain.project.TraceSearchCondition; +import com.univ.tracedin.domain.project.exception.NetworkTopologyBuildException; @Component @RequiredArgsConstructor -public class SpanStatisticsAnalyzer implements ProjectAnalyzer, NetworkTopologyBuilder { +public class SpanStatisticsAnalyzer implements ProjectAnalyzer, NetworkTopologyAnalyer { - private static final String KAFKA_NODE_NAME = "KAFKA"; private final SpanReader spanReader; private final SpanRepository spanRepository; @@ -46,129 +40,39 @@ public ProjectStatistic analyze(TraceSearchCondition cond, StatisticsType sta } @Override - public NetworkTopology build(Project project) { - List clientSpans = - spanReader.read(project.getProjectKey(), SpanType.HTTP, SpanKind.CLIENT); - List serverSpans = - spanReader.read(project.getProjectKey(), SpanType.HTTP, SpanKind.SERVER); - List producerSpans = - spanReader.read(project.getProjectKey(), SpanType.UNKNOWN, SpanKind.PRODUCER); - List consumerSpans = - spanReader.read(project.getProjectKey(), SpanType.UNKNOWN, SpanKind.CONSUMER); - List dbSpans = - spanReader.read(project.getProjectKey(), SpanType.QUERY, SpanKind.CLIENT); - - Map nodeMap = new HashMap<>(); - Map edgeMap = new HashMap<>(); - - buildServiceNodesAndEdges( - project.getProjectKey(), clientSpans, serverSpans, nodeMap, edgeMap); - buildKafkaNodesAndEdges( - project.getProjectKey(), producerSpans, consumerSpans, nodeMap, edgeMap); - buildDatabaseNodesAndEdges(project.getProjectKey(), dbSpans, nodeMap, edgeMap); - - List nodes = nodeMap.values().stream().toList(); - List edges = edgeMap.values().stream().toList(); - - return NetworkTopology.of(nodes, edges); - } - - private void buildServiceNodesAndEdges( - ProjectKey projectKey, - List clientSpans, - List serverSpans, - Map nodeMap, - Map edgeMap) { - - Map> clientSpanMap = - clientSpans.stream() - .collect( - Collectors.groupingBy( - Span::getTraceId, - Collectors.toMap(Span::getId, Function.identity()))); - - for (Span serverSpan : serverSpans) { - Map clientSpansInTrace = clientSpanMap.get(serverSpan.getTraceId()); - if (clientSpansInTrace == null) { - continue; - } - - Span clientSpan = clientSpansInTrace.get(serverSpan.getParentId()); - if (clientSpan != null) { - String sourceService = clientSpan.getServiceName(); - String targetService = serverSpan.getServiceName(); - - nodeMap.computeIfAbsent( - sourceService, service -> Node.of(projectKey, service, NodeType.SERVICE)); - nodeMap.computeIfAbsent( - targetService, service -> Node.of(projectKey, service, NodeType.SERVICE)); - - String edgeKey = sourceService + "->" + targetService; - edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(sourceService, targetService)) - .incrementRequestCount(); - } - } - } - - private void buildKafkaNodesAndEdges( - ProjectKey projectKey, - List producerSpans, - List consumerSpans, - Map nodeMap, - Map edgeMap) { - - // Kafka 노드 추가 - nodeMap.computeIfAbsent( - KAFKA_NODE_NAME, kafka -> Node.of(projectKey, kafka, NodeType.KAFKA)); - - // 프로듀서 스팬 처리 - for (Span producerSpan : producerSpans) { - String producerService = producerSpan.getServiceName(); - - nodeMap.computeIfAbsent( - producerService, service -> Node.of(projectKey, service, NodeType.SERVICE)); - - String edgeKey = producerService + "->" + KAFKA_NODE_NAME; - edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(producerService, KAFKA_NODE_NAME)) - .incrementRequestCount(); - } - - // 컨슈머 스팬 처리 - for (Span consumerSpan : consumerSpans) { - String consumerService = consumerSpan.getServiceName(); - - nodeMap.computeIfAbsent( - consumerService, service -> Node.of(projectKey, service, NodeType.SERVICE)); - - String edgeKey = KAFKA_NODE_NAME + "->" + consumerService; - edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(KAFKA_NODE_NAME, consumerService)) - .incrementRequestCount(); - } + public List getEndpoints(ServiceSearchCondition cond) { + return spanRepository.getEndpoints(cond); } - private void buildDatabaseNodesAndEdges( - ProjectKey projectKey, - List dbSpans, - Map nodeMap, - Map edgeMap) { - - for (Span dbSpan : dbSpans) { - Map attributes = dbSpan.getAttributes().data(); - String serviceName = dbSpan.getServiceName(); - String dbSystem = (String) attributes.get("db.system"); - - if (dbSystem == null) { - continue; // db.name 속성이 없으면 스킵 - } - - nodeMap.computeIfAbsent( - serviceName, service -> Node.of(projectKey, service, NodeType.SERVICE)); - - nodeMap.computeIfAbsent(dbSystem, db -> Node.of(projectKey, db, NodeType.DATABASE)); - - String edgeKey = serviceName + "->" + dbSystem; - edgeMap.computeIfAbsent(edgeKey, k -> Edge.init(serviceName, dbSystem)) - .incrementRequestCount(); - } + @Override + public Topology analyze(Project project) { + final var clientSpansFuture = spanReader.readClientSpans(project.getProjectKey()); + final var serverSpansFuture = spanReader.readServerSpans(project.getProjectKey()); + final var producerSpansFuture = spanReader.readProducerSpans(project.getProjectKey()); + final var consumerSpansFuture = spanReader.readConsumerSpans(project.getProjectKey()); + final var dbSpansFuture = spanReader.readDbSpans(project.getProjectKey()); + + return CompletableFuture.allOf( + clientSpansFuture, + serverSpansFuture, + producerSpansFuture, + consumerSpansFuture, + dbSpansFuture) + .thenApply( + v -> { + try { + return Topology.builder() + .projectKey(project.getProjectKey()) + .client(clientSpansFuture.join()) + .server(serverSpansFuture.join()) + .producer(producerSpansFuture.join()) + .consumer(consumerSpansFuture.join()) + .db(dbSpansFuture.join()) + .build(); + } catch (Exception e) { + throw NetworkTopologyBuildException.EXCEPTION; + } + }) + .join(); } } diff --git a/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Topology.java b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Topology.java new file mode 100644 index 0000000..2a4a2dc --- /dev/null +++ b/tracedin-domain/src/main/java/com/univ/tracedin/domain/span/Topology.java @@ -0,0 +1,23 @@ +package com.univ.tracedin.domain.span; + +import java.util.List; +import java.util.Map; + +import com.univ.tracedin.domain.project.NetworkTopology; +import com.univ.tracedin.domain.project.NetworkTopology.Edge; +import com.univ.tracedin.domain.project.NetworkTopology.Node; + +public interface Topology { + + static Topology of(Map nodes, Map edges) { + return of(List.copyOf(nodes.values()), List.copyOf(edges.values())); + } + + static Topology of(List nodes, List edges) { + return new NetworkTopology(nodes, edges); + } + + static NetworkTopologyBuilder builder() { + return new NetworkTopologyBuilder(); + } +} diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaMessageHelper.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaMessageHelper.java index 253e6fa..89604d4 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaMessageHelper.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaMessageHelper.java @@ -4,19 +4,20 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.kafka.support.SendResult; -import org.springframework.stereotype.Component; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -@Component @Slf4j -public class KafkaMessageHelper { +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public final class KafkaMessageHelper { - public CompletableFuture> getKafkaCallback(V payload) { + public static CompletableFuture> getKafkaCallback(V payload) { return new CompletableFuture<>() { @Override public boolean complete(SendResult value) { - RecordMetadata metadata = value.getRecordMetadata(); + final RecordMetadata metadata = value.getRecordMetadata(); log.info( "Kafka message sent successfully: Topic: {} Partition: {} Offset: {} Timestamp: {}", metadata.topic(), diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducerImpl.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducerImpl.java index 4fa8b03..6c9118d 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducerImpl.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/kafka/KafkaProducerImpl.java @@ -26,10 +26,10 @@ public KafkaProducerImpl(KafkaTemplate kafkaTemplate) { @Override public void send( String topicName, K key, V message, CompletableFuture> callback) { - log.info("Sending message={} to topic={}", message, topicName); + log.info("Sending message={} to topic={}", message.getClass().getSimpleName(), topicName); try { - CompletableFuture> kafkaResultFuture = + final CompletableFuture> kafkaResultFuture = kafkaTemplate.send(topicName, key, message); kafkaResultFuture.whenComplete( (result, ex) -> { diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/publish/ServiceMetricsKafkaPublisher.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/publish/ServiceMetricsKafkaPublisher.java index f77c93d..25492f3 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/publish/ServiceMetricsKafkaPublisher.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/metric/messaging/publish/ServiceMetricsKafkaPublisher.java @@ -17,7 +17,6 @@ public class ServiceMetricsKafkaPublisher implements ServiceMetricsMessagePublisher { private final KafkaProducer kafkaProducer; - private final KafkaMessageHelper kafkaMessageHelper; @Value("${kafka.topic.service-metrics}") private String serviceMetricsTopic; @@ -27,6 +26,6 @@ public void publish(ServiceMetricsCollectedEvent serviceMetricsCollectedEvent) { serviceMetricsTopic, serviceMetricsCollectedEvent.getKey(), serviceMetricsCollectedEvent, - kafkaMessageHelper.getKafkaCallback(serviceMetricsCollectedEvent)); + KafkaMessageHelper.getKafkaCallback(serviceMetricsCollectedEvent)); } } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/publisher/SpanKafkaEventPublisher.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/publisher/SpanKafkaEventPublisher.java index aa59ce1..5dbdf1b 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/publisher/SpanKafkaEventPublisher.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/messaging/publisher/SpanKafkaEventPublisher.java @@ -18,7 +18,6 @@ public class SpanKafkaEventPublisher implements SpanMessagePublisher { private final KafkaProducer kafkaProducer; - private final KafkaMessageHelper kafkaMessageHelper; @Value("${kafka.topic.span}") private String spanTopic; @@ -26,6 +25,6 @@ public class SpanKafkaEventPublisher implements SpanMessagePublisher { @Override public void publish(SpanCollectedEvent event) { kafkaProducer.send( - spanTopic, event.getKey(), event, kafkaMessageHelper.getKafkaCallback(event)); + spanTopic, event.getKey(), event, KafkaMessageHelper.getKafkaCallback(event)); } } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java index d2250b7..9aede08 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanCoreRepository.java @@ -9,8 +9,10 @@ import com.univ.tracedin.common.dto.SearchCursor; import com.univ.tracedin.common.dto.SearchResult; +import com.univ.tracedin.domain.project.EndPointUrl; import com.univ.tracedin.domain.project.HttpTps; import com.univ.tracedin.domain.project.ProjectKey; +import com.univ.tracedin.domain.project.ServiceSearchCondition; import com.univ.tracedin.domain.project.StatusCodeDistribution; import com.univ.tracedin.domain.project.StatusCodeDistribution.StatusCodeBucket; import com.univ.tracedin.domain.project.TraceHipMap; @@ -73,14 +75,14 @@ public List findByTraceId(TraceId traceId) { @Override public TraceHipMap getTraceHitMap(TraceSearchCondition cond) { - List traceHitMapByProjectKey = + final List traceHitMapByProjectKey = spanElasticSearchRepository.getTraceHitMapByProjectKey(cond); return TraceHipMap.from(traceHitMapByProjectKey); } @Override public StatusCodeDistribution getStatusCodeDistribution(TraceSearchCondition cond) { - List statusCodeDistribution = + final List statusCodeDistribution = spanElasticSearchRepository.getStatusCodeDistribution(cond); return StatusCodeDistribution.from(statusCodeDistribution); @@ -90,4 +92,9 @@ public StatusCodeDistribution getStatusCodeDistribution(TraceSearchCondition con public List getHttpTps(TraceSearchCondition cond) { return spanElasticSearchRepository.getHttpTps(cond); } + + @Override + public List getEndpoints(ServiceSearchCondition cond) { + return spanElasticSearchRepository.getEndpointUrls(cond); + } } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java index bb1eb1c..815f27c 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustom.java @@ -4,7 +4,9 @@ import java.util.Map; import com.univ.tracedin.common.dto.SearchResult; +import com.univ.tracedin.domain.project.EndPointUrl; import com.univ.tracedin.domain.project.HttpTps; +import com.univ.tracedin.domain.project.ServiceSearchCondition; import com.univ.tracedin.domain.project.StatusCodeDistribution.StatusCodeBucket; import com.univ.tracedin.domain.project.TraceHipMap.EndTimeBucket; import com.univ.tracedin.domain.project.TraceSearchCondition; @@ -29,4 +31,6 @@ SearchResult searchTracesByNode( List getStatusCodeDistribution(TraceSearchCondition cond); List getHttpTps(TraceSearchCondition cond); + + List getEndpointUrls(ServiceSearchCondition cond); } diff --git a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java index 68fe2a1..c0d6e51 100644 --- a/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java +++ b/tracedin-infra/src/main/java/com/univ/tracedin/infra/span/repository/SpanElasticSearchRepositoryCustomImpl.java @@ -14,7 +14,9 @@ import lombok.extern.slf4j.Slf4j; import com.univ.tracedin.common.dto.SearchResult; +import com.univ.tracedin.domain.project.EndPointUrl; import com.univ.tracedin.domain.project.HttpTps; +import com.univ.tracedin.domain.project.ServiceSearchCondition; import com.univ.tracedin.domain.project.StatusCodeDistribution.StatusCodeBucket; import com.univ.tracedin.domain.project.TraceHipMap.EndTimeBucket; import com.univ.tracedin.domain.project.TraceHipMap.ResponseTimeBucket; @@ -52,14 +54,14 @@ @RequiredArgsConstructor public class SpanElasticSearchRepositoryCustomImpl implements SpanElasticSearchRepositoryCustom { - private final String INDEX_NAME = "span"; - private final String SERVICE_AGGREGATION_KEY = "service_nodes"; + private static final String INDEX_NAME = "span"; + private static final String SERVICE_AGGREGATION_KEY = "service_nodes"; private final ElasticsearchClient client; @Override public List search(String projectKey, SpanType spanType, SpanKind spanKind) { - Query spanQuery = createSpanQuery(projectKey, spanType, spanKind); + final Query spanQuery = createSpanQuery(projectKey, spanType, spanKind); return executeESQuery( () -> @@ -78,7 +80,7 @@ public List search(String projectKey, SpanType spanType, SpanKind public List findServiceNames(String projectKey) { return executeESQuery( () -> { - SearchResponse response = + final SearchResponse response = client.search( s -> s.index(INDEX_NAME) @@ -112,12 +114,12 @@ public SearchResult searchTracesByNode( TraceSearchCondition cond, int size, Map afterKey) { return executeESQuery( () -> { - SearchResponse response = + final SearchResponse response = client.search( s -> s.index(INDEX_NAME) .size(0) - .query(createServiceSpanQuery(cond)) + .query(createSearchQuery(cond)) .aggregations( "by_trace", createTraceAggregation(size, afterKey)) @@ -131,23 +133,23 @@ public SearchResult searchTracesByNode( "traceId")))), Void.class); - CompositeAggregate spansByTrace = + final CompositeAggregate spansByTrace = response.aggregations().get("by_trace").composite(); - List traces = extractTraces(spansByTrace); - long traceCount = + final List traces = extractTraces(spansByTrace); + final long traceCount = response.aggregations().get("trace_count").cardinality().value(); - Map nextAfterKey = toObjectMap(spansByTrace.afterKey()); + final Map nextAfterKey = toObjectMap(spansByTrace.afterKey()); return SearchResult.success(traces, nextAfterKey, traceCount); }); } @Override public List findByTraceId(String traceId) { - Query query = QueryBuilders.term(t -> t.field("traceId").value(traceId)); + final Query query = QueryBuilders.term(t -> t.field("traceId").value(traceId)); return executeESQuery( () -> { - SearchResponse response = + final SearchResponse response = client.search( s -> s.index(INDEX_NAME).size(100).query(query), SpanDocument.class); @@ -160,12 +162,12 @@ public List findByTraceId(String traceId) { public List getTraceHitMapByProjectKey(TraceSearchCondition cond) { return executeESQuery( () -> { - SearchResponse response = + final SearchResponse response = client.search( s -> s.index(INDEX_NAME) .size(0) - .query(createServiceSpanQuery(cond)) + .query(createSearchQuery(cond)) .aggregations( "by_end_time", histogramAggregation()), Void.class); @@ -178,12 +180,12 @@ public List getTraceHitMapByProjectKey(TraceSearchCondition cond) public List getStatusCodeDistribution(TraceSearchCondition cond) { return executeESQuery( () -> { - SearchResponse response = + final SearchResponse response = client.search( s -> s.index(INDEX_NAME) .size(0) - .query(createServiceSpanQuery(cond)) + .query(createSearchQuery(cond)) .aggregations( "nested_attributes", statusCodeDistributionAggregation()), @@ -196,12 +198,12 @@ public List getStatusCodeDistribution(TraceSearchCondition con public List getHttpTps(TraceSearchCondition cond) { return executeESQuery( () -> { - SearchResponse response = + final SearchResponse response = client.search( s -> s.index(INDEX_NAME) .size(0) - .query(createServiceSpanQuery(cond)) + .query(createSearchQuery(cond)) .aggregations("http_tps", httpTpsHistogram()), Void.class); @@ -209,7 +211,58 @@ public List getHttpTps(TraceSearchCondition cond) { }); } - private Aggregation httpTpsHistogram() { + @Override + public List getEndpointUrls(ServiceSearchCondition cond) { + return executeESQuery( + () -> { + final SearchResponse response = + client.search( + s -> + s.index(INDEX_NAME) + .size(0) + .query(createSearchQuery(cond)) + .aggregations( + "nested_attributes", + endpointsAggregation()), + Void.class); + + return parseEndpointUrls(response); + }); + } + + private static List parseEndpointUrls(SearchResponse response) { + return response + .aggregations() + .get("nested_attributes") + .nested() + .aggregations() + .get("http_url_agg") + .sterms() + .buckets() + .array() + .stream() + .map(bucket -> EndPointUrl.from(bucket.key().stringValue())) + .toList(); + } + + private static Aggregation endpointsAggregation() { + return Aggregation.of( + a -> + a.nested(n -> n.path("attributes")) + .aggregations( + Map.of( + "http_url_agg", + Aggregation.of( + agg -> + agg.terms( + t -> + t.field( + "attributes.data.http.url.keyword") + .size( + 100)))))); + } + + private static Aggregation httpTpsHistogram() { return Aggregation.of( agg -> agg.dateHistogram( @@ -242,8 +295,8 @@ private Aggregation httpTpsHistogram() { "params.count / 600")))))))); } - private List parseHttpTpsResponse(SearchResponse response) { - List httpTpsList = new ArrayList<>(); + private static List parseHttpTpsResponse(SearchResponse response) { + final List httpTpsList = new ArrayList<>(); response.aggregations() .get("http_tps") .dateHistogram() @@ -251,15 +304,16 @@ private List parseHttpTpsResponse(SearchResponse response) { .array() .forEach( bucket -> { - long startEpochMillis = bucket.key(); - double tps = bucket.aggregations().get("tps").simpleValue().value(); + final long startEpochMillis = bucket.key(); + final double tps = + bucket.aggregations().get("tps").simpleValue().value(); httpTpsList.add(HttpTps.of(startEpochMillis, tps)); }); httpTpsList.sort(Comparator.comparing(HttpTps::timestamp).reversed()); return httpTpsList; } - private Aggregation statusCodeDistributionAggregation() { + private static Aggregation statusCodeDistributionAggregation() { return Aggregation.of( a -> a.nested(n -> n.path("attributes")) @@ -283,12 +337,12 @@ private Aggregation statusCodeDistributionAggregation() { 10)))))); } - private List parseStatusCodeDistributionResponse( + private static List parseStatusCodeDistributionResponse( SearchResponse response) { - List statusCodeBuckets = new ArrayList<>(); + final List statusCodeBuckets = new ArrayList<>(); // 어그리게이션 이름을 통해 결과 추출 - NestedAggregate nestedAttributes = + final NestedAggregate nestedAttributes = response.aggregations().get("nested_attributes").nested(); nestedAttributes @@ -299,40 +353,41 @@ private List parseStatusCodeDistributionResponse( .array() .forEach( bucket -> { - String statusCode = bucket.key().stringValue(); - long count = bucket.docCount(); - StatusCodeBucket statusCodeBucket = + final String statusCode = bucket.key().stringValue(); + final long count = bucket.docCount(); + final StatusCodeBucket statusCodeBucket = StatusCodeBucket.of(statusCode, count); statusCodeBuckets.add(statusCodeBucket); }); return statusCodeBuckets; } - private List parseHistogramResponse(SearchResponse response) { - List endTimeBuckets = new ArrayList<>(); + private static List parseHistogramResponse(SearchResponse response) { + final List endTimeBuckets = new ArrayList<>(); - Aggregate byEndTimeAgg = response.aggregations().get("by_end_time"); - DateHistogramAggregate dateHistogram = byEndTimeAgg.dateHistogram(); + final Aggregate byEndTimeAgg = response.aggregations().get("by_end_time"); + final DateHistogramAggregate dateHistogram = byEndTimeAgg.dateHistogram(); for (DateHistogramBucket dateBucket : dateHistogram.buckets().array()) { - long endEpochMillis = dateBucket.key(); + final long endEpochMillis = dateBucket.key(); - Map attributesNested = + final Map attributesNested = dateBucket.aggregations().get("attributes_nested").nested().aggregations(); - HistogramAggregate histogram = attributesNested.get("by_response_time").histogram(); + final HistogramAggregate histogram = + attributesNested.get("by_response_time").histogram(); - List responseTimeBuckets = new ArrayList<>(); + final List responseTimeBuckets = new ArrayList<>(); for (HistogramBucket histogramBucket : histogram.buckets().array()) { - double responseTime = histogramBucket.key(); - long count = histogramBucket.docCount(); - ResponseTimeBucket responseTimeBucket = + final double responseTime = histogramBucket.key(); + final long count = histogramBucket.docCount(); + final ResponseTimeBucket responseTimeBucket = ResponseTimeBucket.from(responseTime, count); responseTimeBuckets.add(responseTimeBucket); } - long errorCount = attributesNested.get("error_status_count").filter().docCount(); - EndTimeBucket endTimeBucket = + final long errorCount = attributesNested.get("error_status_count").filter().docCount(); + final EndTimeBucket endTimeBucket = EndTimeBucket.from(endEpochMillis, errorCount, responseTimeBuckets); endTimeBuckets.add(endTimeBucket); } @@ -340,8 +395,8 @@ private List parseHistogramResponse(SearchResponse response return endTimeBuckets; } - private Aggregation histogramAggregation() { - Aggregation byResponseTimeAgg = + private static Aggregation histogramAggregation() { + final Aggregation byResponseTimeAgg = Aggregation.of( a -> a.histogram( @@ -350,7 +405,7 @@ private Aggregation histogramAggregation() { .interval(200.0) .minDocCount(1))); - Aggregation errorStatusCountAgg = + final Aggregation errorStatusCountAgg = Aggregation.of( a -> a.filter( @@ -361,7 +416,7 @@ private Aggregation histogramAggregation() { "attributes.data.http.status_code") .gte(JsonData.of(400))))); - Aggregation attributesNestedAgg = + final Aggregation attributesNestedAgg = Aggregation.of( a -> a.nested(n -> n.path("attributes")) @@ -382,7 +437,7 @@ private Aggregation histogramAggregation() { .aggregations(Map.of("attributes_nested", attributesNestedAgg))); } - private Query nestedHttpUrlTermQuery(String endPointUrl) { + private static Query nestedHttpUrlTermQuery(String endPointUrl) { return QueryBuilders.nested( n -> n.path("attributes") @@ -393,14 +448,14 @@ private Query nestedHttpUrlTermQuery(String endPointUrl) { .value(endPointUrl)))); } - private List extractTraces(CompositeAggregate spansByTrace) { - List traces = new ArrayList<>(); + private static List extractTraces(CompositeAggregate spansByTrace) { + final List traces = new ArrayList<>(); spansByTrace .buckets() .array() .forEach( bucket -> { - TopHitsAggregate spanDetails = + final TopHitsAggregate spanDetails = bucket.aggregations().get("span_details").topHits(); spanDetails .hits() @@ -410,7 +465,7 @@ private List extractTraces(CompositeAggregate spansByTrace) { if (hit.source() == null) { return; } - Trace trace = + final Trace trace = mapToTrace( hit.source() .toJson() @@ -422,7 +477,7 @@ private List extractTraces(CompositeAggregate spansByTrace) { return traces; } - private Aggregation createTraceAggregation(int size, Map afterKey) { + private static Aggregation createTraceAggregation(int size, Map afterKey) { return Aggregation.of( a -> a.composite( @@ -482,42 +537,42 @@ private Aggregation createTraceAggregation(int size, Map afterKe "attributes.data.anomaly"))))))); } - private Query createServiceSpanQuery(TraceSearchCondition cond) { - Builder bool = QueryBuilders.bool(); + private static Query createSearchQuery(ServiceSearchCondition cond) { + final Builder bool = QueryBuilders.bool(); if (cond.hasServiceName()) { - bool.must(QueryBuilders.term(t -> t.field("serviceName").value(cond.serviceName()))); + bool.must(QueryBuilders.term(t -> t.field("serviceName").value(cond.getServiceName()))); - if (cond.hasEndPointUrl()) { - bool.must(nestedHttpUrlTermQuery(cond.endPointUrl())); + if (cond instanceof TraceSearchCondition traceCond && traceCond.hasEndPointUrl()) { + bool.must(nestedHttpUrlTermQuery(traceCond.getEndPointUrl().value())); } } - if (cond.hasTimeRange()) { + if (cond instanceof TraceSearchCondition traceCond && traceCond.hasTimeRange()) { bool.must( QueryBuilders.range( r -> r.field("startEpochMillis") - .gte(JsonData.of(cond.getEpochMillisStartTime()))), + .gte(JsonData.of(traceCond.getEpochMillisStartTime()))), QueryBuilders.range( r -> r.field("endEpochMillis") - .lte(JsonData.of(cond.getEpochMillisEndTime())))); + .lte(JsonData.of(traceCond.getEpochMillisEndTime())))); } bool.must( - QueryBuilders.term(t -> t.field("projectKey").value(cond.projectKey().value())), + QueryBuilders.term(t -> t.field("projectKey").value(cond.getProjectKey().value())), QueryBuilders.term(t -> t.field("spanType").value("HTTP")), QueryBuilders.term(t -> t.field("kind").value("SERVER"))); return bool.build()._toQuery(); } - private Aggregation createServiceNameAggregation() { + private static Aggregation createServiceNameAggregation() { return Aggregation.of(a -> a.terms(t -> t.field("serviceName").size(100))); } - private Query createSpanQuery(String projectKey, SpanType spanType, SpanKind spanKind) { + private static Query createSpanQuery(String projectKey, SpanType spanType, SpanKind spanKind) { return BoolQuery.of( b -> b.must( @@ -544,7 +599,7 @@ private Query createSpanQuery(String projectKey, SpanType spanType, SpanKind spa ._toQuery(); } - private Map toFieldValueMap(Map afterKey) { + private static Map toFieldValueMap(Map afterKey) { return afterKey.entrySet().stream() .map( entry -> { @@ -568,13 +623,13 @@ private Map toFieldValueMap(Map afterKey) { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - private Map toObjectMap(Map afterKey) { + private static Map toObjectMap(Map afterKey) { return afterKey.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue()._get())); } - private Trace mapToTrace(JsonObject source) { - JsonObject data = source.getJsonObject("attributes").getJsonObject("data"); + private static Trace mapToTrace(JsonObject source) { + final JsonObject data = source.getJsonObject("attributes").getJsonObject("data"); return Trace.builder() .id(TraceId.from(source.getString("traceId")))