From 1de43e941ce6427daa7ee80aaad5f65e8d702a6b Mon Sep 17 00:00:00 2001 From: bufapiqi Date: Tue, 23 Jul 2024 16:02:04 +0800 Subject: [PATCH] fix(interactive): Standardized log for sls metric (#4073) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Support passing the upstream traceId to the engine, and log the trace id in both the info and metric logs. - Print critical info in JSON format so that log frameworks such as SLS can parse it. --- interactive_engine/common/pom.xml | 1 + .../groot/common/constant/LogConstant.java | 50 +++ .../graphscope/groot/common/util/Utils.java | 63 ++++ interactive_engine/compiler/pom.xml | 4 + .../gremlin/plugin/QueryLogger.java | 84 ++++- .../gremlin/plugin/QueryStatusCallback.java | 89 +++++- .../processor/IrStandardOpProcessor.java | 77 ++++- .../processor/AbstractResultProcessor.java | 9 +- .../resultx/GremlinResultProcessor.java | 7 +- .../java/com/alibaba/pegasus/RpcClient.java | 4 +- .../engine/pegasus/pegasus/src/lib.rs | 12 +- .../engine/pegasus/pegasus/src/worker.rs | 7 +- .../executor/engine/pegasus/server/src/rpc.rs | 6 +- interactive_engine/groot-client/pom.xml | 3 +- .../graphscope/groot/sdk/GrootClient.java | 298 ++++++++++++++++++ .../graphscope/groot/sdk/RequestOptions.java | 89 ++++++ .../graphscope/groot/sdk/schema/Schema.java | 17 +- .../groot/coordinator/SchemaManager.java | 8 +- .../groot/frontend/ClientWriteService.java | 36 ++- .../groot/frontend/GrootDdlService.java | 10 +- .../groot/frontend/write/GraphWriter.java | 4 + .../groot/frontend/write/KafkaAppender.java | 7 + .../groot/operation/OperationBatch.java | 23 +- .../groot/operation/StoreDataBatch.java | 29 +- .../groot/schema/request/DdlRequestBatch.java | 23 +- .../groot/store/KafkaProcessor.java | 1 + .../graphscope/groot/store/StoreService.java | 25 +- proto/ddl_service.proto | 31 +- proto/groot/sdk/model.proto | 2 + proto/groot/store_write_service.proto | 1 + proto/request_option.proto | 34 ++ proto/write_service.proto | 3 + 32 files changed, 988 insertions(+), 69 deletions(-) create mode 100644 interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/constant/LogConstant.java create mode 100644 interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/Utils.java create mode 100644 interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/RequestOptions.java create mode 100644 proto/request_option.proto diff --git a/interactive_engine/common/pom.xml b/interactive_engine/common/pom.xml index bce589c7aca1..38c3464d449a 100644 --- a/interactive_engine/common/pom.xml +++ b/interactive_engine/common/pom.xml @@ -120,6 +120,7 @@ schema_common.proto ddl_service.proto write_service.proto + request_option.proto diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/constant/LogConstant.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/constant/LogConstant.java new file mode 100644 index 000000000000..bc5f8831719a --- /dev/null +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/constant/LogConstant.java @@ -0,0 +1,50 @@ +package com.alibaba.graphscope.groot.common.constant; + +public class LogConstant { + + public static String TRACE_ID = "traceId"; + + public static String UPSTREAM_ID = "upstreamId"; + + public static String QUERY_ID = "queryId"; + + /** + * The actual query statement + */ + public static String QUERY = "query"; + + public static String SUCCESS = "success"; + + public static String ERROR_MESSAGE = "errorMessage"; + + 
public static String STACK_TRACE = "stackTrace"; + + /** + * The query plan + */ + public static String IR_PLAN = "irPlan"; + + /** + * The stage at which the log is printed + * query: java/rust + * write: writeKafka/consumeKafka/writeDb + */ + public static String STAGE = "stage"; + + public static String RESULT = "result"; + + public static String COST = "cost"; + + public static String START_TIME = "startMillis"; + + public static String END_TIME = "endMillis"; + + /** + * Log type: query/write + */ + public static String LOG_TYPE = "logType"; + + public static String BATCH_SIZE = "batchSize"; + + public static String PARTITION_ID = "partitionId"; +} diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/Utils.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/Utils.java new file mode 100644 index 000000000000..e31447be3552 --- /dev/null +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/util/Utils.java @@ -0,0 +1,63 @@ +package com.alibaba.graphscope.groot.common.util; + +import com.alibaba.graphscope.groot.common.constant.LogConstant; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Utils { + + private static final ObjectMapper jsonMapper = new ObjectMapper(); + private static final Logger defaultLogger = LoggerFactory.getLogger(Utils.class); + + /** + * Build the metric JSON log for monitoring. + * Please do not delete any existing field; + * new fields may be added. + * @param isSuccess + * @param traceId + * @param batchSize + * @param partitionId + * @param cost + * @param endTime + * @param stage + * @param logType + * @return + */ + public static String buildMetricJsonLog( + boolean isSuccess, + String traceId, + Integer batchSize, + Integer partitionId, + long cost, + Long endTime, + String stage, + String logType) { + String jsonLog = ""; + ObjectNode metricJsonLog = jsonMapper.createObjectNode(); + metricJsonLog.put(LogConstant.TRACE_ID, traceId); + metricJsonLog.put(LogConstant.SUCCESS, isSuccess); + if (batchSize != null) { + metricJsonLog.put(LogConstant.BATCH_SIZE, batchSize); + } + if (partitionId != null) { + metricJsonLog.put(LogConstant.PARTITION_ID, partitionId); + } + if (endTime != null) { + metricJsonLog.put(LogConstant.END_TIME, endTime); + } + metricJsonLog.put(LogConstant.COST, cost); + if (stage != null) { + metricJsonLog.put(LogConstant.STAGE, stage); + } + metricJsonLog.put(LogConstant.LOG_TYPE, logType); + try { + jsonLog = jsonMapper.writeValueAsString(metricJsonLog); + } catch (Exception e) { + defaultLogger.error("JsonProcessingException!", e); + } + return jsonLog; + } +} diff --git a/interactive_engine/compiler/pom.xml b/interactive_engine/compiler/pom.xml index fe5ed32f9566..124837e7de22 100644 --- a/interactive_engine/compiler/pom.xml +++ b/interactive_engine/compiler/pom.xml @@ -146,6 +146,10 @@ io.opentelemetry opentelemetry-api + + io.opentelemetry + opentelemetry-sdk + io.opentelemetry.semconv diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java index c2fd396e9614..6d286af496a0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryLogger.java @@ -16,6 +16,12 @@ package 
com.alibaba.graphscope.gremlin.plugin; +import com.alibaba.graphscope.groot.common.constant.LogConstant; +import com.alibaba.graphscope.groot.common.util.Utils; +import com.google.gson.JsonObject; + +import io.opentelemetry.api.trace.Span; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,9 +34,25 @@ public class QueryLogger { private final String query; private final BigInteger queryId; + /** + * The traceId passed down from the upstream caller + */ + private final String upstreamId; + + private String irPlan; + public QueryLogger(String query, BigInteger queryId) { this.query = query; this.queryId = queryId; + this.irPlan = null; + this.upstreamId = null; + } + + public QueryLogger(String query, BigInteger queryId, String upstreamId) { + this.query = query; + this.queryId = queryId; + this.upstreamId = upstreamId; + this.irPlan = null; } public void debug(String format, Object... args) { @@ -49,13 +71,61 @@ public void error(String format, Object... args) { defaultLogger.error(this + " : " + format, args); } + public void error(Throwable throwable) { + JsonObject errorJson = new JsonObject(); + String traceId = Span.current().getSpanContext().getTraceId(); + if (this.upstreamId != null) { + errorJson.addProperty(LogConstant.UPSTREAM_ID, this.upstreamId); + } + errorJson.addProperty(LogConstant.TRACE_ID, traceId); + errorJson.addProperty(LogConstant.SUCCESS, false); + errorJson.addProperty(LogConstant.STAGE, "java"); + errorJson.addProperty(LogConstant.LOG_TYPE, "query"); + errorJson.addProperty(LogConstant.ERROR_MESSAGE, throwable.getMessage()); + defaultLogger.error(errorJson.toString(), throwable); + } + + public void print(String message, boolean success, Throwable t) { + if (success) { + defaultLogger.info(message); + } else { + defaultLogger.error(message, t); + } + } + public void metricsInfo(String format, Object... 
args) { metricLogger.info(queryId + " | " + query + " | " + format, args); } + public void metricsInfo(boolean isSucceed, long cost) { + String traceId = Span.current().getSpanContext().getTraceId(); + String metricJson = + Utils.buildMetricJsonLog( + isSucceed, + traceId, + null, + null, + cost, + System.currentTimeMillis(), + "java", + "query"); + metricLogger.info(metricJson); + } + @Override public String toString() { - return "[" + "query='" + query + '\'' + ", queryId=" + queryId + ']'; + StringBuilder str = new StringBuilder(); + str.append("["); + if (this.upstreamId != null) { + str.append("upstreamId=").append(this.upstreamId).append(", "); + } + str.append("query='") + .append(this.query) + .append("'") + .append(", queryId=") + .append(this.queryId) + .append("]"); + return str.toString(); } public String getQuery() { @@ -65,4 +135,16 @@ public String getQuery() { public BigInteger getQueryId() { return queryId; } + + public void setIrPlan(String irPlan) { + this.irPlan = irPlan; + } + + public String getUpstreamId() { + return upstreamId; + } + + public String getIrPlan() { + return irPlan; + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java index 5d417d3858b8..3ed116befb19 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/QueryStatusCallback.java @@ -16,50 +16,113 @@ package com.alibaba.graphscope.gremlin.plugin; -import static io.opentelemetry.api.common.AttributeKey.*; +import com.alibaba.graphscope.groot.common.constant.LogConstant; +import com.alibaba.graphscope.groot.common.util.JSON; +import com.google.gson.JsonObject; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.trace.Span; -import org.apache.commons.lang3.StringUtils; import org.checkerframework.checker.nullness.qual.Nullable; +import java.util.List; + public class QueryStatusCallback { private final MetricsCollector metricsCollector; private final QueryLogger queryLogger; private LongHistogram queryHistogram; + // if query cost large than threshold, will print detail log + private long printThreshold; public QueryStatusCallback( - MetricsCollector metricsCollector, LongHistogram histogram, QueryLogger queryLogger) { + MetricsCollector metricsCollector, + LongHistogram histogram, + QueryLogger queryLogger, + long printThreshold) { this.metricsCollector = metricsCollector; this.queryLogger = queryLogger; this.queryHistogram = histogram; + this.printThreshold = printThreshold; } public void onStart() {} - public void onEnd(boolean isSucceed, @Nullable String msg) { + public void onErrorEnd(@Nullable String msg) { + this.metricsCollector.stop(); + onErrorEnd(null, msg); + } + + public void onErrorEnd(@Nullable Throwable t) { this.metricsCollector.stop(); - if (isSucceed) { - queryLogger.info("total execution time is {} ms", metricsCollector.getElapsedMillis()); + onErrorEnd(t, null); + } + + private void onErrorEnd(Throwable t, String msg) { + String errorMsg = msg; + if (t != null) { + errorMsg = t.getMessage(); } + JsonObject logJson = buildSimpleLog(false, metricsCollector.getElapsedMillis()); + fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis(), null); + queryLogger.print(logJson.toString(), 
false, t); Attributes attrs = Attributes.builder() .put("id", queryLogger.getQueryId().toString()) .put("query", queryLogger.getQuery()) - .put("success", isSucceed) + .put("success", false) .put("message", msg != null ? msg : "") .build(); this.queryHistogram.record(metricsCollector.getElapsedMillis(), attrs); + queryLogger.metricsInfo(false, metricsCollector.getElapsedMillis()); + } + + public void onSuccessEnd(List results) { + this.metricsCollector.stop(); + JsonObject logJson = buildSimpleLog(true, metricsCollector.getElapsedMillis()); + if (this.metricsCollector.getElapsedMillis() > this.printThreshold) { + fillLogDetail(logJson, null, metricsCollector.getStartMillis(), results); + } + queryLogger.print(logJson.toString(), true, null); - queryLogger.metricsInfo( - "{} | {} | {} | {}", - isSucceed, - metricsCollector.getElapsedMillis(), - metricsCollector.getStartMillis(), - msg != null ? msg : StringUtils.EMPTY); + Attributes attrs = + Attributes.builder() + .put("id", queryLogger.getQueryId().toString()) + .put("query", queryLogger.getQuery()) + .put("success", true) + .put("message", "") + .build(); + this.queryHistogram.record(metricsCollector.getElapsedMillis(), attrs); + queryLogger.metricsInfo(true, metricsCollector.getElapsedMillis()); + } + + private JsonObject buildSimpleLog(boolean isSucceed, long elaspedMillis) { + String traceId = Span.current().getSpanContext().getTraceId(); + JsonObject simpleJson = new JsonObject(); + simpleJson.addProperty(LogConstant.TRACE_ID, traceId); + simpleJson.addProperty(LogConstant.QUERY_ID, queryLogger.getQueryId()); + simpleJson.addProperty(LogConstant.SUCCESS, isSucceed); + if (queryLogger.getUpstreamId() != null) { + simpleJson.addProperty(LogConstant.UPSTREAM_ID, queryLogger.getUpstreamId()); + } + simpleJson.addProperty(LogConstant.COST, elaspedMillis); + return simpleJson; + } + + private void fillLogDetail( + JsonObject logJson, String errorMessage, long startMillis, List results) { + logJson.addProperty(LogConstant.QUERY, queryLogger.getQuery()); + if (results != null) { + logJson.addProperty(LogConstant.RESULT, JSON.toJson(results)); + } + if (errorMessage != null) { + logJson.addProperty(LogConstant.ERROR_MESSAGE, errorMessage); + } + logJson.addProperty(LogConstant.IR_PLAN, queryLogger.getIrPlan()); + logJson.addProperty(LogConstant.STAGE, "java"); + logJson.addProperty(LogConstant.START_TIME, startMillis); } public QueryLogger getQueryLogger() { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index 733ac40bd3cf..4f0025743d86 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -60,11 +60,9 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.*; import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.trace.IdGenerator; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import 
org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; @@ -84,6 +82,8 @@ import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor; import org.apache.tinkerpop.gremlin.structure.Graph; import org.codehaus.groovy.control.MultipleCompilationErrorsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigInteger; @@ -99,6 +99,8 @@ import javax.script.SimpleBindings; public class IrStandardOpProcessor extends StandardOpProcessor { + private static final Logger defaultLogger = + LoggerFactory.getLogger(IrStandardOpProcessor.class); protected final Graph graph; protected final GraphTraversalSource g; protected final Configs configs; @@ -113,6 +115,13 @@ public class IrStandardOpProcessor extends StandardOpProcessor { protected final ExecutionClient executionClient; Tracer tracer; LongHistogram queryHistogram; + /** + * for success query + * Print if the threshold is exceeded + */ + private long printThreshold; + + private IdGenerator opentelemetryIdGenerator; public IrStandardOpProcessor( Configs configs, @@ -138,6 +147,8 @@ public IrStandardOpProcessor( this.idGenerator = idGenerator; this.queryCache = queryCache; this.executionClient = executionClient; + this.printThreshold = Long.parseLong(configs.get("query.print.threshold.ms", "200")); + this.opentelemetryIdGenerator = IdGenerator.random(); initTracer(); initMetrics(); } @@ -151,6 +162,12 @@ protected void evalOpInternal( GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get(); Map args = msg.getArgs(); String script = (String) args.get("gremlin"); + Map bindings = + args.get("bindings") == null ? null : (Map) args.get("bindings"); + String upTraceId = + (bindings == null || bindings.get("X-Trace-ID") == null) + ? 
null + : String.valueOf(bindings.get("X-Trace-ID")); String defaultValidateQuery = "''"; // ad-hoc handling for connection validation @@ -170,7 +187,7 @@ && irMeta.getSchema().getEdgeList().isEmpty()) { language = AntlrGremlinScriptEngineFactory.LANGUAGE_NAME; } - QueryStatusCallback statusCallback = createQueryStatusCallback(script, jobId); + QueryStatusCallback statusCallback = createQueryStatusCallback(script, jobId, upTraceId); QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout()); GremlinExecutor.LifeCycle lifeCycle; switch (language) { @@ -209,7 +226,7 @@ metaQueryCallback.afterExec(irMeta); // TimeoutException has been handled in ResultProcessor, skip it here if (t != null && !(t instanceof TimeoutException)) { - statusCallback.onEnd(false, t.getMessage()); + statusCallback.onErrorEnd(t.getMessage()); Optional possibleTemporaryException = determineIfTemporaryException(t); if (possibleTemporaryException.isPresent()) { @@ -294,7 +311,7 @@ return null; }); } catch (RejectedExecutionException var17) { - statusCallback.getQueryLogger().error(var17.getMessage()); + statusCallback.getQueryLogger().error(var17); ctx.writeAndFlush( ResponseMessage.build(msg) .code(ResponseStatusCode.TOO_MANY_REQUESTS) @@ -305,7 +322,26 @@ protected QueryStatusCallback createQueryStatusCallback(String query, BigInteger queryId) { return new QueryStatusCallback( - new MetricsCollector(evalOpTimer), queryHistogram, new QueryLogger(query, queryId)); + new MetricsCollector(evalOpTimer), + queryHistogram, + new QueryLogger(query, queryId), + this.printThreshold); + } + + /** + * + * @param query + * @param queryId + * @param upTraceId the traceId passed down from upstream, used for end-to-end tracing + * @return + */ + protected QueryStatusCallback createQueryStatusCallback( + String query, BigInteger queryId, String upTraceId) { + return new QueryStatusCallback( + new MetricsCollector(evalOpTimer), + queryHistogram, + new QueryLogger(query, queryId, upTraceId), + printThreshold); } protected GremlinExecutor.LifeCycle createLifeCycle( @@ -380,9 +416,10 @@ protected void processTraversal( // print script and jobName with ir plan queryLogger.info("Submitted query"); // Too verbose, since all identical queries produce identical plans, it's no need to print - // every plan in production. 
+ // every plan in production.de String irPlanStr = irPlan.getPlanAsJson(); queryLogger.debug("ir plan {}", irPlanStr); + queryLogger.setIrPlan(irPlanStr); byte[] physicalPlanBytes = irPlan.toPhysicalBytes(queryConfigs); irPlan.close(); @@ -403,8 +440,26 @@ protected void processTraversal( .setAll(PegasusClient.Empty.newBuilder().build()) .build(); request = request.toBuilder().setConf(jobConfig).build(); - Span outgoing = - tracer.spanBuilder("frontend/query").setSpanKind(SpanKind.CLIENT).startSpan(); + Span outgoing; + // if exist up trace, useUpTraceId as current traceId + if (TraceId.isValid(queryLogger.getUpstreamId())) { + SpanContext spanContext = + SpanContext.createFromRemoteParent( + queryLogger.getUpstreamId(), + this.opentelemetryIdGenerator.generateSpanId(), + TraceFlags.getDefault(), + TraceState.getDefault()); + outgoing = + tracer.spanBuilder("frontend/query") + .setParent( + io.opentelemetry.context.Context.current() + .with(Span.wrap(spanContext))) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + } else { + outgoing = + tracer.spanBuilder("frontend/query").setSpanKind(SpanKind.CLIENT).startSpan(); + } try (Scope ignored = outgoing.makeCurrent()) { outgoing.setAttribute("query.id", queryLogger.getQueryId().toString()); outgoing.setAttribute("query.statement", queryLogger.getQuery()); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index 611e81a93444..67f5c28eff68 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ -131,8 +131,11 @@ public void request() { default: errorCode = ResponseStatusCode.SERVER_ERROR; } - errorMsg = (errorMsg == null) ? t.getMessage() : errorMsg; - statusCallback.onEnd(false, errorMsg); + if (errorMsg == null) { + statusCallback.onErrorEnd(t); + } else { + statusCallback.onErrorEnd(errorMsg); + } writeResult.writeAndFlush( ResponseMessage.build(writeResult.getRequestMessage()) .code(errorCode) @@ -167,7 +170,7 @@ public void process(PegasusClient.JobResponse response) { } public void finish() { - statusCallback.onEnd(true, null); + statusCallback.onSuccessEnd(resultCollectors); aggregateResults(); writeResult.writeAndFlush( ResponseMessage.build(writeResult.getRequestMessage()) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java index d85fc0e2a081..61d8bc849fe7 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java @@ -117,8 +117,11 @@ public void request() { default: errorCode = ResponseStatusCode.SERVER_ERROR; } - errorMsg = (errorMsg == null) ? 
t.getMessage() : errorMsg; - statusCallback.onEnd(false, errorMsg); + if (errorMsg == null) { + statusCallback.onErrorEnd(t); + } else { + statusCallback.onErrorEnd(errorMsg); + } ctx.writeAndFlush( ResponseMessage.build(ctx.getRequestMessage()) .code(errorCode) diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java index 0e81a0be6810..62939eac6953 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java @@ -24,6 +24,7 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +105,8 @@ public void onError(Throwable throwable) { @Override public void onCompleted() { if (counter.decrementAndGet() == 0) { - logger.info("finish get job response from all servers"); + String traceId = Span.current().getSpanContext().getTraceId(); + logger.info("trace: {}, finish get job response from all servers", traceId); try { processor.finish(); } catch (Throwable t) { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs index d2012d9bf702..cbd49ff95da6 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs @@ -284,6 +284,10 @@ where } let worker_ids = workers.unwrap(); let tracer = global::tracer("executor"); + let current_cx = opentelemetry::Context::current(); + let current_span = current_cx.span(); + let trace_id = current_span.span_context().trace_id(); + let trace_id_hex = format!("{:x}", trace_id); let mut workers = Vec::new(); for worker_id in worker_ids { @@ -304,7 +308,13 @@ where return Ok(()); } - info!("spawn job_{}({}) with {} workers;", conf.job_name, conf.job_id, workers.len()); + info!( + "trace_id:{}, spawn job_{}({}) with {} workers;", + trace_id_hex, + conf.job_name, + conf.job_id, + workers.len() + ); match pegasus_executor::spawn_batch(workers) { Ok(_) => Ok(()), diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs index 456ca60be756..d0d76dd4429b 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs @@ -247,13 +247,16 @@ impl Task for Worker { } let _ctx = WorkerContext::new(&mut self.resources, &mut self.keyed_resources); + let trace_id = self.span.span_context().trace_id(); + let trace_id_hex = format!("{:x}", trace_id); match self.task.execute() { Ok(state) => { if TaskState::Finished == state { let elapsed = self.start.elapsed().as_millis(); info_worker!( - "job({}) '{}' finished, used {:?} ms;", + "trace_id:{}, job({}) '{}' finished, used {:?} ms;", + trace_id_hex, self.id.job_id, self.conf.job_name, elapsed @@ -276,7 +279,7 @@ impl Task for Worker { } } Err(e) => { - error_worker!("job({}) execute error: {}", self.id.job_id, e); + error_worker!("trace_id:{}, job({}) execute error: {}", trace_id_hex, self.id.job_id, e); self.span .set_status(trace::Status::error(format!("Execution error: {}", e))); self.span.end(); diff --git 
a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs index f5011fafd8cf..e4f6c361aae2 100644 --- a/interactive_engine/executor/engine/pegasus/server/src/rpc.rs +++ b/interactive_engine/executor/engine/pegasus/server/src/rpc.rs @@ -218,7 +218,6 @@ where } let conf = parse_conf_req(conf.unwrap()); - info!("job conf {:?}", conf); pegasus::wait_servers_ready(conf.servers()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let rpc_sink = RpcSink::new(conf.job_id, tx); @@ -231,6 +230,9 @@ where .span_builder("JobService/submit") .with_kind(SpanKind::Server) .start_with_context(&tracer, &parent_ctx); + let trace_id = span.span_context().trace_id(); + let trace_id_hex = format!("{:x}", trace_id); + info!("trace_id : {}, job conf {:?}", trace_id_hex, conf); span.set_attributes(vec![ KeyValue::new("job.name", conf.job_name.clone()), KeyValue::new("job.id", conf.job_id.to_string()), @@ -240,7 +242,7 @@ where let ret = pegasus::run_opt(conf, sink, move |worker| service.assemble(&job, worker)); if let Err(e) = ret { - error!("submit job {} failure: {:?}", job_id, e); + error!("trace_id:{}, submit job {} failure: {:?}", trace_id_hex, job_id, e); Err(Status::unknown(format!("submit job error {}", e))) } else { Ok(Response::new(UnboundedReceiverStream::new(rx))) diff --git a/interactive_engine/groot-client/pom.xml b/interactive_engine/groot-client/pom.xml index 9b46320d90ed..05e0c3e27b3c 100644 --- a/interactive_engine/groot-client/pom.xml +++ b/interactive_engine/groot-client/pom.xml @@ -96,7 +96,8 @@ graph_def.proto schema_common.proto ddl_service.proto - write_service.proto + write_service.proto + request_option.proto diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index d0e1abd56a90..971dea16d561 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -61,10 +61,22 @@ public com.alibaba.graphscope.proto.GraphDefPb submitSchema(Schema schema) { return response.getGraphDef(); } + public com.alibaba.graphscope.proto.GraphDefPb submitSchema( + Schema schema, RequestOptions options) { + BatchSubmitRequest request = schema.toProto(options); + BatchSubmitResponse response = ddlStub.batchSubmit(request); + return response.getGraphDef(); + } + public com.alibaba.graphscope.proto.GraphDefPb submitSchema(Schema.Builder schema) { return submitSchema(schema.build()); } + public com.alibaba.graphscope.proto.GraphDefPb submitSchema( + Schema.Builder schema, RequestOptions options) { + return submitSchema(schema.build(), options); + } + private BatchWriteRequest.Builder getNewWriteBuilder() { String clientId = writeStub.getClientId(GetClientIdRequest.newBuilder().build()).getClientId(); @@ -94,17 +106,37 @@ private long modifyVertex(Vertex vertex, WriteTypePb writeType) { return submit(request); } + private long modifyVertex(Vertex vertex, WriteTypePb writeType, RequestOptions options) { + WriteRequestPb request = vertex.toWriteRequest(writeType); + return submit(request, options); + } + private long modifyVertex(List vertices, WriteTypePb writeType) { List requests = getVertexWriteRequestPbs(vertices, writeType); return submit(requests); } + private long modifyVertex( + List vertices, WriteTypePb writeType, RequestOptions 
options) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + return submit(requests, options); + } + private void modifyVertex( Vertex vertex, StreamObserver callback, WriteTypePb writeType) { WriteRequestPb request = vertex.toWriteRequest(writeType); submit(request, callback); } + private void modifyVertex( + Vertex vertex, + StreamObserver callback, + WriteTypePb writeType, + RequestOptions options) { + WriteRequestPb request = vertex.toWriteRequest(writeType); + submit(request, options, callback); + } + private void modifyVertex( List vertices, StreamObserver callback, @@ -113,28 +145,65 @@ private void modifyVertex( submit(requests, callback); } + private void modifyVertex( + List vertices, + StreamObserver callback, + WriteTypePb writeType, + RequestOptions options) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + submit(requests, options, callback); + } + private long modifyEdge(Edge edge, WriteTypePb writeType) { WriteRequestPb request = edge.toWriteRequest(writeType); return submit(request); } + private long modifyEdge(Edge edge, WriteTypePb writeType, RequestOptions options) { + WriteRequestPb request = edge.toWriteRequest(writeType); + return submit(request, options); + } + private long modifyEdge(List edges, WriteTypePb writeType) { List requests = getEdgeWriteRequestPbs(edges, writeType); return submit(requests); } + private long modifyEdge(List edges, WriteTypePb writeType, RequestOptions options) { + List requests = getEdgeWriteRequestPbs(edges, writeType); + return submit(requests, options); + } + private void modifyEdge( Edge edge, StreamObserver callback, WriteTypePb writeType) { WriteRequestPb request = edge.toWriteRequest(writeType); submit(request, callback); } + private void modifyEdge( + Edge edge, + StreamObserver callback, + WriteTypePb writeType, + RequestOptions options) { + WriteRequestPb request = edge.toWriteRequest(writeType); + submit(request, options, callback); + } + private void modifyEdge( List edges, StreamObserver callback, WriteTypePb writeType) { List requests = getEdgeWriteRequestPbs(edges, writeType); submit(requests, callback); } + private void modifyEdge( + List edges, + StreamObserver callback, + WriteTypePb writeType, + RequestOptions options) { + List requests = getEdgeWriteRequestPbs(edges, writeType); + submit(requests, options, callback); + } + private long modifyVerticesAndEdge( List vertices, List edges, WriteTypePb writeType) { List requests = getVertexWriteRequestPbs(vertices, writeType); @@ -142,6 +211,16 @@ private long modifyVerticesAndEdge( return submit(requests); } + private long modifyVerticesAndEdge( + List vertices, + List edges, + WriteTypePb writeType, + RequestOptions options) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + requests.addAll(getEdgeWriteRequestPbs(edges, writeType)); + return submit(requests, options); + } + private void modifyVerticesAndEdge( List vertices, List edges, @@ -152,33 +231,83 @@ private void modifyVerticesAndEdge( submit(requests, callback); } + private void modifyVerticesAndEdge( + List vertices, + List edges, + StreamObserver callback, + WriteTypePb writeType, + RequestOptions options) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + requests.addAll(getEdgeWriteRequestPbs(edges, writeType)); + submit(requests, options, callback); + } + public long addVerticesAndEdges(List vertices, List edges) { return modifyVerticesAndEdge(vertices, edges, WriteTypePb.INSERT); } + public long addVerticesAndEdges( + List vertices, List 
edges, RequestOptions options) { + return modifyVerticesAndEdge(vertices, edges, WriteTypePb.INSERT, options); + } + public long updateVerticesAndEdges(List vertices, List edges) { return modifyVerticesAndEdge(vertices, edges, WriteTypePb.UPDATE); } + public long updateVerticesAndEdges( + List vertices, List edges, RequestOptions options) { + return modifyVerticesAndEdge(vertices, edges, WriteTypePb.UPDATE, options); + } + public long deleteVerticesAndEdges(List vertices, List edges) { return modifyVerticesAndEdge(vertices, edges, WriteTypePb.DELETE); } + public long deleteVerticesAndEdges( + List vertices, List edges, RequestOptions options) { + return modifyVerticesAndEdge(vertices, edges, WriteTypePb.DELETE, options); + } + public void addVerticesAndEdges( List vertices, List edges, StreamObserver callback) { modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.INSERT); } + public void addVerticesAndEdges( + List vertices, + List edges, + StreamObserver callback, + RequestOptions options) { + modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.INSERT, options); + } + public void updateVerticesAndEdges( List vertices, List edges, StreamObserver callback) { modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.UPDATE); } + public void updateVerticesAndEdges( + List vertices, + List edges, + StreamObserver callback, + RequestOptions options) { + modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.UPDATE, options); + } + public void deleteVerticesAndEdges( List vertices, List edges, StreamObserver callback) { modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.DELETE); } + public void deleteVerticesAndEdges( + List vertices, + List edges, + StreamObserver callback, + RequestOptions options) { + modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.DELETE, options); + } + /** * Add vertex by realtime write * @param vertex vertex that contains label and pk properties and other properties @@ -187,18 +316,38 @@ public long addVertex(Vertex vertex) { return modifyVertex(vertex, WriteTypePb.INSERT); } + public long addVertex(Vertex vertex, RequestOptions options) { + return modifyVertex(vertex, WriteTypePb.INSERT, options); + } + public long addVertices(List vertices) { return modifyVertex(vertices, WriteTypePb.INSERT); } + public long addVertices(List vertices, RequestOptions options) { + return modifyVertex(vertices, WriteTypePb.INSERT, options); + } + public void addVertex(Vertex vertex, StreamObserver callback) { modifyVertex(vertex, callback, WriteTypePb.INSERT); } + public void addVertex( + Vertex vertex, StreamObserver callback, RequestOptions options) { + modifyVertex(vertex, callback, WriteTypePb.INSERT, options); + } + public void addVertices(List vertices, StreamObserver callback) { modifyVertex(vertices, callback, WriteTypePb.INSERT); } + public void addVertices( + List vertices, + StreamObserver callback, + RequestOptions options) { + modifyVertex(vertices, callback, WriteTypePb.INSERT, options); + } + /** * Update existed vertex by realtime write * @param vertex vertex that contains label and pk properties and other properties @@ -207,18 +356,38 @@ public long updateVertex(Vertex vertex) { return modifyVertex(vertex, WriteTypePb.UPDATE); } + public long updateVertex(Vertex vertex, RequestOptions options) { + return modifyVertex(vertex, WriteTypePb.UPDATE, options); + } + public long updateVertices(List vertices) { return modifyVertex(vertices, WriteTypePb.UPDATE); } + public long updateVertices(List vertices, RequestOptions options) { 
+ return modifyVertex(vertices, WriteTypePb.UPDATE, options); + } + public void updateVertex(Vertex vertex, StreamObserver callback) { modifyVertex(vertex, callback, WriteTypePb.UPDATE); } + public void updateVertex( + Vertex vertex, StreamObserver callback, RequestOptions options) { + modifyVertex(vertex, callback, WriteTypePb.UPDATE, options); + } + public void updateVertices(List vertices, StreamObserver callback) { modifyVertex(vertices, callback, WriteTypePb.UPDATE); } + public void updateVertices( + List vertices, + StreamObserver callback, + RequestOptions options) { + modifyVertex(vertices, callback, WriteTypePb.UPDATE, options); + } + /** * Delete vertex by its primary key * @param vertex vertex that contains label and primary key properties @@ -227,26 +396,54 @@ public long deleteVertex(Vertex vertex) { return modifyVertex(vertex, WriteTypePb.DELETE); } + public long deleteVertex(Vertex vertex, RequestOptions options) { + return modifyVertex(vertex, WriteTypePb.DELETE, options); + } + public long deleteVertices(List vertices) { return modifyVertex(vertices, WriteTypePb.DELETE); } + public long deleteVertices(List vertices, RequestOptions options) { + return modifyVertex(vertices, WriteTypePb.DELETE, options); + } + public void deleteVertex(Vertex vertex, StreamObserver callback) { modifyVertex(vertex, callback, WriteTypePb.DELETE); } + public void deleteVertex( + Vertex vertex, StreamObserver callback, RequestOptions options) { + modifyVertex(vertex, callback, WriteTypePb.DELETE, options); + } + public void deleteVertices(List vertices, StreamObserver callback) { modifyVertex(vertices, callback, WriteTypePb.DELETE); } + public void deleteVertices( + List vertices, + StreamObserver callback, + RequestOptions options) { + modifyVertex(vertices, callback, WriteTypePb.DELETE, options); + } + public long clearVertexProperty(Vertex vertex) { return modifyVertex(vertex, WriteTypePb.CLEAR_PROPERTY); } + public long clearVertexProperty(Vertex vertex, RequestOptions options) { + return modifyVertex(vertex, WriteTypePb.CLEAR_PROPERTY, options); + } + public long clearVertexProperties(List vertices) { return modifyVertex(vertices, WriteTypePb.CLEAR_PROPERTY); } + public long clearVertexProperties(List vertices, RequestOptions options) { + return modifyVertex(vertices, WriteTypePb.CLEAR_PROPERTY, options); + } + /** * Add edge by realtime write * @param edge edge that contains label, src vertex label and pk, dst label and pk, and properties @@ -255,18 +452,36 @@ public long addEdge(Edge edge) { return modifyEdge(edge, WriteTypePb.INSERT); } + public long addEdge(Edge edge, RequestOptions options) { + return modifyEdge(edge, WriteTypePb.INSERT, options); + } + public long addEdges(List edges) { return modifyEdge(edges, WriteTypePb.INSERT); } + public long addEdges(List edges, RequestOptions options) { + return modifyEdge(edges, WriteTypePb.INSERT, options); + } + public void addEdge(Edge edge, StreamObserver callback) { modifyEdge(edge, callback, WriteTypePb.INSERT); } + public void addEdge( + Edge edge, StreamObserver callback, RequestOptions options) { + modifyEdge(edge, callback, WriteTypePb.INSERT, options); + } + public void addEdges(List edges, StreamObserver callback) { modifyEdge(edges, callback, WriteTypePb.INSERT); } + public void addEdges( + List edges, StreamObserver callback, RequestOptions options) { + modifyEdge(edges, callback, WriteTypePb.INSERT, options); + } + /** * Update existed edge by realtime write * @param edge edge that contains label, src vertex label and pk, dst 
label and pk, and properties @@ -275,18 +490,36 @@ public long updateEdge(Edge edge) { return modifyEdge(edge, WriteTypePb.UPDATE); } + public long updateEdge(Edge edge, RequestOptions options) { + return modifyEdge(edge, WriteTypePb.UPDATE, options); + } + public long updateEdges(List edges) { return modifyEdge(edges, WriteTypePb.UPDATE); } + public long updateEdges(List edges, RequestOptions options) { + return modifyEdge(edges, WriteTypePb.UPDATE, options); + } + public void updateEdge(Edge edge, StreamObserver callback) { modifyEdge(edge, callback, WriteTypePb.UPDATE); } + public void updateEdge( + Edge edge, StreamObserver callback, RequestOptions options) { + modifyEdge(edge, callback, WriteTypePb.UPDATE, options); + } + public void updateEdges(List edges, StreamObserver callback) { modifyEdge(edges, callback, WriteTypePb.UPDATE); } + public void updateEdges( + List edges, StreamObserver callback, RequestOptions options) { + modifyEdge(edges, callback, WriteTypePb.UPDATE, options); + } + /** * Delete an edge by realtime write * @param edge edge that contains label, src vertex label and pk, dst label and pk, no properties required @@ -295,26 +528,52 @@ public long deleteEdge(Edge edge) { return modifyEdge(edge, WriteTypePb.DELETE); } + public long deleteEdge(Edge edge, RequestOptions options) { + return modifyEdge(edge, WriteTypePb.DELETE, options); + } + public long deleteEdges(List edges) { return modifyEdge(edges, WriteTypePb.DELETE); } + public long deleteEdges(List edges, RequestOptions options) { + return modifyEdge(edges, WriteTypePb.DELETE, options); + } + public void deleteEdge(Edge edge, StreamObserver callback) { modifyEdge(edge, callback, WriteTypePb.DELETE); } + public void deleteEdge( + Edge edge, StreamObserver callback, RequestOptions options) { + modifyEdge(edge, callback, WriteTypePb.DELETE, options); + } + public void deleteEdges(List edges, StreamObserver callback) { modifyEdge(edges, callback, WriteTypePb.DELETE); } + public void deleteEdges( + List edges, StreamObserver callback, RequestOptions options) { + modifyEdge(edges, callback, WriteTypePb.DELETE, options); + } + public long clearEdgeProperty(Edge edge) { return modifyEdge(edge, WriteTypePb.CLEAR_PROPERTY); } + public long clearEdgeProperty(Edge edge, RequestOptions options) { + return modifyEdge(edge, WriteTypePb.CLEAR_PROPERTY, options); + } + public long clearEdgeProperties(List edges) { return modifyEdge(edges, WriteTypePb.CLEAR_PROPERTY); } + public long clearEdgeProperties(List edges, RequestOptions options) { + return modifyEdge(edges, WriteTypePb.CLEAR_PROPERTY, options); + } + /** * Commit the realtime write transaction. * @return The snapshot_id. 
The data committed would be available after a while, or you could remoteFlush(snapshot_id) @@ -326,6 +585,13 @@ private long submit(WriteRequestPb request) { return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId(); } + private long submit(WriteRequestPb request, RequestOptions options) { + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addWriteRequests(request); + batchWriteBuilder.setRequestOptions(options.toWriteRequest()); + return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId(); + } + private long submit(List requests) { if (requests.isEmpty()) { return 0; @@ -335,12 +601,32 @@ private long submit(List requests) { return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId(); } + private long submit(List requests, RequestOptions options) { + if (requests.isEmpty()) { + return 0; + } + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addAllWriteRequests(requests); + batchWriteBuilder.setRequestOptions(options.toWriteRequest()); + return writeStub.batchWrite(batchWriteBuilder.build()).getSnapshotId(); + } + private void submit(WriteRequestPb request, StreamObserver callback) { BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); batchWriteBuilder.addWriteRequests(request); asyncWriteStub.batchWrite(batchWriteBuilder.build(), callback); } + private void submit( + WriteRequestPb request, + RequestOptions options, + StreamObserver callback) { + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addWriteRequests(request); + batchWriteBuilder.setRequestOptions(options.toWriteRequest()); + asyncWriteStub.batchWrite(batchWriteBuilder.build(), callback); + } + private void submit( List requests, StreamObserver callback) { if (!requests.isEmpty()) { @@ -350,6 +636,18 @@ private void submit( } } + private void submit( + List requests, + RequestOptions options, + StreamObserver callback) { + if (!requests.isEmpty()) { + BatchWriteRequest.Builder batchWriteBuilder = getNewWriteBuilder(); + batchWriteBuilder.addAllWriteRequests(requests); + batchWriteBuilder.setRequestOptions(options.toWriteRequest()); + asyncWriteStub.batchWrite(batchWriteBuilder.build(), callback); + } + } + public GraphDefPb getSchema() { GetSchemaResponse response = this.clientStub.getSchema(GetSchemaRequest.newBuilder().build()); diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/RequestOptions.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/RequestOptions.java new file mode 100644 index 000000000000..6820cf963db8 --- /dev/null +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/RequestOptions.java @@ -0,0 +1,89 @@ +package com.alibaba.graphscope.groot.sdk; + +import com.alibaba.graphscope.proto.groot.AttributeValue; +import com.alibaba.graphscope.proto.groot.RequestOptionsPb; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Options that can be supplied on a per request basis. 
+ */ +public class RequestOptions { + + public static final RequestOptions EMPTY = RequestOptions.build().create(); + + private final Map parameters; + private final String traceId; + + private RequestOptions(final Builder builder) { + this.parameters = builder.parameters; + this.traceId = builder.traceId; + } + + public Optional> getParameters() { + return Optional.ofNullable(parameters); + } + + public Optional getTraceId() { + return Optional.ofNullable(traceId); + } + + public RequestOptionsPb toWriteRequest() { + RequestOptionsPb.Builder builder = RequestOptionsPb.newBuilder(); + if (traceId != null) { + builder.setTraceId(traceId); + } + if (parameters != null) { + for (Map.Entry entry : parameters.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + AttributeValue.Builder valueBuilder = AttributeValue.newBuilder(); + if (value instanceof Long) { + valueBuilder.setLongValue((long) value); + continue; + } + if (value instanceof Integer) { + valueBuilder.setIntValue((int) value); + continue; + } + if (value instanceof Double) { + valueBuilder.setDoubleValue((double) value); + continue; + } + if (value instanceof String) { + valueBuilder.setStringValue((String) value); + } + builder.putAttributes(key, valueBuilder.build()); + } + } + return builder.build(); + } + + public static Builder build() { + return new Builder(); + } + + public static final class Builder { + private Map parameters = null; + private String traceId = null; + + public Builder addParameter(final String name, final Object value) { + if (null == parameters) { + parameters = new HashMap<>(); + } + parameters.put(name, value); + return this; + } + + public Builder traceId(final String traceId) { + this.traceId = traceId; + return this; + } + + public RequestOptions create() { + return new RequestOptions(this); + } + } +} diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Schema.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Schema.java index c5d5aa61a3d4..15e9564a78cd 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Schema.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Schema.java @@ -1,5 +1,6 @@ package com.alibaba.graphscope.groot.sdk.schema; +import com.alibaba.graphscope.groot.sdk.RequestOptions; import com.alibaba.graphscope.proto.groot.*; import com.alibaba.graphscope.proto.groot.BatchSubmitRequest.DDLRequest; @@ -108,8 +109,23 @@ public static Builder newBuilder() { return new Builder(); } + public BatchSubmitRequest toProto(RequestOptions options) { + if (options == null) { + return toProto(); + } + BatchSubmitRequest.Builder builder = BatchSubmitRequest.newBuilder(); + builder.setRequestOptions(options.toWriteRequest()); + buildProto(builder); + return builder.build(); + } + public BatchSubmitRequest toProto() { BatchSubmitRequest.Builder builder = BatchSubmitRequest.newBuilder(); + buildProto(builder); + return builder.build(); + } + + private void buildProto(BatchSubmitRequest.Builder builder) { for (VertexLabel label : vertexLabels) { CreateVertexTypeRequest.Builder typeBuilder = CreateVertexTypeRequest.newBuilder(); typeBuilder.setTypeDef(label.toProto()); @@ -172,7 +188,6 @@ public BatchSubmitRequest toProto() { builder.addValue(ddlRequestBuilder); } } - return builder.build(); } public static class Builder { diff --git 
a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java index 7a0e53b09330..9fff27c729ca 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java @@ -230,8 +230,10 @@ public void submitBatchDdl( String sessionId, DdlRequestBatch ddlRequestBatch, CompletionCallback callback) { + String traceId = ddlRequestBatch.getTraceId(); logger.info( - "submitBatchDdl requestId [{}], sessionId [{}], request body [{}]", + "submitBatchDdl, traceId [{}] requestId [{}], sessionId [{}], request body [{}]", + traceId, requestId, sessionId, ddlRequestBatch.toProto()); @@ -261,6 +263,7 @@ public void submitBatchDdl( OperationBatch operationBatch = OperationBatch.newBuilder(ddlOperations) .setLatestSnapshotId(currentWriteSnapshotId) + .setTraceId(traceId) .build(); batchId = this.ddlWriter.writeOperations(requestId, operationBatch); } finally { @@ -278,7 +281,8 @@ public void submitBatchDdl( callback.onCompleted(snapshotId); } catch (Exception e) { logger.error( - "Error in Ddl requestId [{}], sessionId [{}]", + "Error in Ddl traceId[{}], requestId [{}], sessionId [{}]", + traceId, requestId, sessionId, e); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java index 3c8a6e8a0dec..86d0e0f98184 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java @@ -1,6 +1,7 @@ package com.alibaba.graphscope.groot.frontend; import com.alibaba.graphscope.groot.CompletionCallback; +import com.alibaba.graphscope.groot.common.util.Utils; import com.alibaba.graphscope.groot.common.util.UuidUtils; import com.alibaba.graphscope.groot.frontend.write.GraphWriter; import com.alibaba.graphscope.groot.frontend.write.WriteRequest; @@ -18,6 +19,7 @@ public class ClientWriteService extends ClientWriteGrpc.ClientWriteImplBase { private static final Logger logger = LoggerFactory.getLogger(ClientWriteService.class); + private static Logger metricLogger = LoggerFactory.getLogger("MetricLog"); private final GraphWriter graphWriter; @@ -36,8 +38,11 @@ public void getClientId( @Override public void batchWrite( BatchWriteRequest request, StreamObserver responseObserver) { + long startBatchWrite = System.currentTimeMillis(); String requestId = UuidUtils.getBase64UUIDString(); String writeSession = request.getClientId(); + RequestOptionsPb optionsPb = request.getRequestOptions(); + String upTraceId = optionsPb == null ? 
null : optionsPb.getTraceId(); int count = request.getWriteRequestsCount(); List writeRequests = new ArrayList<>(count); logger.debug( @@ -53,9 +58,22 @@ public void batchWrite( requestId, writeSession, writeRequests, + optionsPb, new CompletionCallback() { @Override public void onCompleted(Long res) { + long current = System.currentTimeMillis(); + String metricJson = + Utils.buildMetricJsonLog( + true, + upTraceId, + count, + null, + (current - startBatchWrite), + current, + "writeKafka", + "write"); + metricLogger.info(metricJson); responseObserver.onNext( BatchWriteResponse.newBuilder().setSnapshotId(res).build()); responseObserver.onCompleted(); @@ -63,6 +81,18 @@ public void onCompleted(Long res) { @Override public void onError(Throwable t) { + long current = System.currentTimeMillis(); + String metricJson = + Utils.buildMetricJsonLog( + false, + upTraceId, + count, + null, + (current - startBatchWrite), + current, + "writeKafka", + "write"); + metricLogger.info(metricJson); logger.error( "batch write error. request {} session {}", requestId, @@ -77,7 +107,11 @@ public void onError(Throwable t) { } catch (Exception e) { logger.error( - "batchWrite failed. request [{}] session [{}]", requestId, writeSession, e); + "batchWrite failed. traceId[{}] request [{}] session [{}]", + upTraceId, + requestId, + writeSession, + e); responseObserver.onError( Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java index 188ed62ec5a3..ec95ded3ee11 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/GrootDdlService.java @@ -54,10 +54,13 @@ public GrootDdlService(SnapshotCache snapshotCache, BatchDdlClient batchDdlClien @Override public void batchSubmit( BatchSubmitRequest request, StreamObserver responseObserver) { + RequestOptionsPb requestOptionsPb = request.getRequestOptions(); + String traceId = requestOptionsPb == null ? null : requestOptionsPb.getTraceId(); try { boolean simple = request.getSimpleResponse(); DdlRequestBatch.Builder builder = DdlRequestBatch.newBuilder(); - logger.info("Received DDL request: " + request); + builder.setTraceId(traceId); + logger.info("traceId: [{}], Received DDL request: [{}]", traceId, request); for (BatchSubmitRequest.DDLRequest ddlRequest : request.getValueList()) { switch (ddlRequest.getValueCase()) { case CREATE_VERTEX_TYPE_REQUEST: @@ -153,7 +156,10 @@ public void batchSubmit( StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw)); String trace = sw.toString(); - logger.error("Exception occurred when processing batch DDL request", e); + logger.error( + "Exception occurred when processing batch DDL request. 
traceId:[{}]", + traceId, + e); responseObserver.onError(Status.INTERNAL.withDescription(trace).asRuntimeException()); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index 2f69ad5b93b2..380a02cd15c6 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -17,6 +17,7 @@ import com.alibaba.graphscope.groot.operation.OperationType; import com.alibaba.graphscope.groot.operation.VertexId; import com.alibaba.graphscope.groot.operation.dml.*; +import com.alibaba.graphscope.proto.groot.RequestOptionsPb; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.common.Attributes; @@ -87,9 +88,11 @@ public void writeBatch( String requestId, String writeSession, List writeRequests, + RequestOptionsPb optionsPb, CompletionCallback callback) { GraphSchema schema = snapshotCache.getSnapshotWithSchema().getGraphDef(); OperationBatch.Builder batchBuilder = OperationBatch.newBuilder(); + String upTraceId = optionsPb == null ? null : optionsPb.getTraceId(); for (WriteRequest writeRequest : writeRequests) { OperationType operationType = writeRequest.getOperationType(); DataRecord dataRecord = writeRequest.getDataRecord(); @@ -123,6 +126,7 @@ public void writeBatch( "Invalid operationType [" + operationType + "]"); } } + batchBuilder.setTraceId(upTraceId); OperationBatch operationBatch = batchBuilder.build(); long startTime = System.currentTimeMillis(); AttributesBuilder attrs = Attributes.builder(); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java index adc98d787134..1776aa2b003d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java @@ -200,6 +200,7 @@ public Map splitBatch(OperationBatch operationB Map storeToBatchBuilder = new HashMap<>(); Function storeDataBatchBuilderFunc = k -> OperationBatch.newBuilder(); + String traceId = operationBatch.getTraceId(); for (OperationBlob operationBlob : operationBatch) { long partitionKey = operationBlob.getPartitionKey(); if (partitionKey == -1L) { @@ -208,6 +209,9 @@ public Map splitBatch(OperationBatch operationB OperationBatch.Builder batchBuilder = storeToBatchBuilder.computeIfAbsent(i, storeDataBatchBuilderFunc); batchBuilder.addOperationBlob(operationBlob); + if (traceId != null) { + batchBuilder.setTraceId(traceId); + } } } else { int partitionId = @@ -216,6 +220,9 @@ public Map splitBatch(OperationBatch operationB OperationBatch.Builder batchBuilder = storeToBatchBuilder.computeIfAbsent(storeId, storeDataBatchBuilderFunc); batchBuilder.addOperationBlob(operationBlob); + if (traceId != null) { + batchBuilder.setTraceId(traceId); + } } } return storeToBatchBuilder; diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java index 3a3be247c52b..921161f8271c 100644 
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java
index 3a3be247c52b..921161f8271c 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/OperationBatch.java
@@ -25,10 +25,13 @@ public class OperationBatch implements Iterable {
 
     private final long latestSnapshotId;
     private final List operationBlobs;
+    private final String traceId;
 
-    private OperationBatch(long latestSnapshotId, List operationBlobs) {
+    private OperationBatch(
+            long latestSnapshotId, List operationBlobs, String traceId) {
         this.latestSnapshotId = latestSnapshotId;
         this.operationBlobs = operationBlobs;
+        this.traceId = traceId;
     }
 
     public static OperationBatch parseProto(OperationBatchPb proto) {
@@ -38,7 +41,7 @@ public static OperationBatch parseProto(OperationBatchPb proto) {
         for (OperationPb operationPb : operationPbs) {
             operationBlobs.add(OperationBlob.parseProto(operationPb));
         }
-        return new OperationBatch(latestSnapshotId, operationBlobs);
+        return new OperationBatch(latestSnapshotId, operationBlobs, proto.getTraceId());
     }
 
     public int getOperationCount() {
@@ -54,6 +57,10 @@ public long getLatestSnapshotId() {
         return latestSnapshotId;
     }
 
+    public String getTraceId() {
+        return traceId;
+    }
+
     public OperationBlob getOperationBlob(int i) {
         return operationBlobs.get(i);
     }
@@ -61,6 +68,9 @@ public OperationBlob getOperationBlob(int i) {
     public OperationBatchPb toProto() {
         OperationBatchPb.Builder builder = OperationBatchPb.newBuilder();
         builder.setLatestSnapshotId(latestSnapshotId);
+        if (this.traceId != null) {
+            builder.setTraceId(traceId);
+        }
         for (OperationBlob operationBlob : operationBlobs) {
             builder.addOperations(operationBlob.toProto());
         }
@@ -89,10 +99,12 @@ public static class Builder {
 
         private boolean built = false;
         private long latestSnapshotId;
+        private String traceId;
         private List operationBlobs;
 
         private Builder() {
             this.latestSnapshotId = 0L;
+            this.traceId = null;
             this.operationBlobs = new ArrayList<>();
         }
 
@@ -120,9 +132,14 @@ public Builder setLatestSnapshotId(long latestSnapshotId) {
             return this;
         }
 
+        public Builder setTraceId(String traceId) {
+            this.traceId = traceId;
+            return this;
+        }
+
         public OperationBatch build() {
             this.built = true;
-            return new OperationBatch(latestSnapshotId, operationBlobs);
+            return new OperationBatch(latestSnapshotId, operationBlobs, traceId);
         }
     }
 }
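As a quick sanity check on the OperationBatch change, a minimal sketch (not part of this patch) showing that a trace id set through the builder survives the proto round trip; only methods visible in the diff above are used, and the literal ids are placeholders.

    // The trace id set on the builder is written by toProto() and read back by parseProto().
    OperationBatch batch =
            OperationBatch.newBuilder()
                    .setLatestSnapshotId(42L)
                    .setTraceId("trace-abc-123")
                    .build();
    OperationBatch restored = OperationBatch.parseProto(batch.toProto());
    assert "trace-abc-123".equals(restored.getTraceId());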
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/StoreDataBatch.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/StoreDataBatch.java
index 5c52423885c0..aa076cca774d 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/StoreDataBatch.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/StoreDataBatch.java
@@ -30,14 +30,16 @@ public class StoreDataBatch {
     // List [ partition -> OperationBatch ]
     private final List> dataBatch;
     private int size;
+    private final String traceId;
 
     private StoreDataBatch(
             String requestId,
             int queueId,
             long snapshotId,
             long offset,
-            List> dataBatch) {
-        this(requestId, queueId, snapshotId, offset, dataBatch, -1);
+            List> dataBatch,
+            String traceId) {
+        this(requestId, queueId, snapshotId, offset, dataBatch, -1, traceId);
     }
 
     private StoreDataBatch(
@@ -46,13 +48,15 @@ private StoreDataBatch(
             long snapshotId,
             long offset,
             List> dataBatch,
-            int size) {
+            int size,
+            String traceId) {
         this.requestId = requestId;
         this.queueId = queueId;
         this.snapshotId = snapshotId;
         this.offset = offset;
         this.dataBatch = Collections.unmodifiableList(new ArrayList<>(dataBatch));
         this.size = size;
+        this.traceId = traceId;
     }
 
     public static StoreDataBatch parseProto(StoreDataBatchPb proto) {
@@ -68,7 +72,8 @@ public static StoreDataBatch parseProto(StoreDataBatchPb proto) {
                     .forEach((pid, pb) -> batch.put(pid, OperationBatch.parseProto(pb)));
             dataBatch.add(batch);
         }
-        return new StoreDataBatch(requestId, queueId, snapshotId, offset, dataBatch);
+        return new StoreDataBatch(
+                requestId, queueId, snapshotId, offset, dataBatch, proto.getTraceId());
     }
 
     public String getRequestId() {
@@ -111,6 +116,9 @@ public StoreDataBatchPb toProto() {
                         .setQueueId(queueId)
                         .setSnapshotId(snapshotId)
                         .setOffset(offset);
+        if (traceId != null) {
+            builder.setTraceId(traceId);
+        }
         for (Map batch : dataBatch) {
             PartitionToBatchPb.Builder batchBuilder = PartitionToBatchPb.newBuilder();
             batch.forEach((pid, ops) -> batchBuilder.putPartitionToBatch(pid, ops.toProto()));
@@ -131,11 +139,13 @@ public static class Builder {
         private List> dataBatch;
         private Map partitionBatchBuilder;
         private int size;
+        private String traceId;
 
         public Builder() {
             this.dataBatch = new ArrayList<>();
             this.partitionBatchBuilder = new HashMap<>();
             this.size = 0;
+            this.traceId = null;
         }
 
         public Builder requestId(String requestId) {
@@ -158,6 +168,11 @@ public Builder offset(long offset) {
             return this;
         }
 
+        public Builder traceId(String traceId) {
+            this.traceId = traceId;
+            return this;
+        }
+
         private Builder addBatch(Map batch) {
             this.dataBatch.add(batch);
             return this;
@@ -179,6 +194,9 @@ public Builder addOperation(int partitionId, OperationBlob operationBlob) {
                     partitionBatchBuilder.computeIfAbsent(
                             partitionId, k -> OperationBatch.newBuilder());
             builder.addOperationBlob(operationBlob);
+            if (traceId != null) {
+                builder.setTraceId(traceId);
+            }
             this.size++;
             return this;
         }
@@ -189,7 +207,8 @@ public StoreDataBatch build() {
                 partitionBatchBuilder.forEach((pid, builder) -> batch.put(pid, builder.build()));
                 addBatch(batch);
             }
-            return new StoreDataBatch(requestId, queueId, snapshotId, offset, dataBatch, size);
+            return new StoreDataBatch(
+                    requestId, queueId, snapshotId, offset, dataBatch, size, traceId);
         }
     }
 }
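One ordering detail worth noting in the builder above: addOperation(...) copies the trace id into the per-partition OperationBatch builders only when it has already been set, so traceId(...) should be called before operations are added (as KafkaProcessor does below). A small sketch, not part of this patch, with queueId, snapshotId, offset, partitionId, traceId and operationBlob as placeholder variables:

    // Set traceId before addOperation so each partition's OperationBatch carries it too.
    StoreDataBatch storeDataBatch =
            new StoreDataBatch.Builder()
                    .requestId("req-1")
                    .queueId(queueId)
                    .snapshotId(snapshotId)
                    .offset(offset)
                    .traceId(traceId)
                    .addOperation(partitionId, operationBlob)
                    .build();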
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/request/DdlRequestBatch.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/request/DdlRequestBatch.java
index 5d908d58bcdf..09a12fc4deae 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/request/DdlRequestBatch.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/request/DdlRequestBatch.java
@@ -26,8 +26,11 @@ public class DdlRequestBatch implements Iterable {
 
     private List ddlRequestBlobs;
 
-    private DdlRequestBatch(List ddlRequestBlobs) {
+    private String traceId;
+
+    private DdlRequestBatch(List ddlRequestBlobs, String traceId) {
         this.ddlRequestBlobs = ddlRequestBlobs;
+        this.traceId = traceId;
     }
 
     public static DdlRequestBatch parseProto(DdlRequestBatchPb proto) {
@@ -36,17 +39,24 @@ public static DdlRequestBatch parseProto(DdlRequestBatchPb proto) {
         for (DdlRequestPb ddlRequestPb : ddlRequestPbs) {
             ddlRequestBlobs.add(DdlRequestBlob.parseProto(ddlRequestPb));
         }
-        return new DdlRequestBatch(ddlRequestBlobs);
+        return new DdlRequestBatch(ddlRequestBlobs, proto.getTraceId());
     }
 
     public DdlRequestBatchPb toProto() {
         DdlRequestBatchPb.Builder builder = DdlRequestBatchPb.newBuilder();
+        if (this.traceId != null) {
+            builder.setTraceId(traceId);
+        }
         for (DdlRequestBlob blob : ddlRequestBlobs) {
             builder.addDdlRequests(blob.toProto());
         }
         return builder.build();
     }
 
+    public String getTraceId() {
+        return traceId;
+    }
+
     @Override
     public Iterator iterator() {
         return ddlRequestBlobs.iterator();
@@ -70,9 +80,11 @@ public static Builder newBuilder() {
 
     public static class Builder {
         private List ddlRequestBlobs;
+        private String traceId;
 
         private Builder() {
             this.ddlRequestBlobs = new ArrayList<>();
+            this.traceId = null;
         }
 
         public Builder addDdlRequest(AbstractDdlRequest ddlRequest) {
@@ -80,8 +92,13 @@ public Builder addDdlRequest(AbstractDdlRequest ddlRequest) {
             return this;
         }
 
+        public Builder setTraceId(String traceId) {
+            this.traceId = traceId;
+            return this;
+        }
+
         public DdlRequestBatch build() {
-            return new DdlRequestBatch(new ArrayList<>(ddlRequestBlobs));
+            return new DdlRequestBatch(new ArrayList<>(ddlRequestBlobs), traceId);
         }
     }
 }
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java
index 0ee4c1dd7f59..826f8ae3f020 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java
@@ -225,6 +225,7 @@ private void processRecord(ConsumerRecord record) {
                         .requestId("")
                         .queueId(storeId)
                         .snapshotId(snapshotId)
+                        .traceId(operationBatch.getTraceId())
                         .offset(offset);
         for (OperationBlob operationBlob : operationBatch) {
             long partitionKey = operationBlob.getPartitionKey();
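A corresponding sketch (not part of this patch) for the DDL path: the new setter tags a DdlRequestBatch, and the id survives serialization because toProto() only copies it when non-null. Here ddlRequest stands for any concrete AbstractDdlRequest.

    // Attach a trace id to a DDL batch and read it back after a proto round trip.
    DdlRequestBatch ddlBatch =
            DdlRequestBatch.newBuilder()
                    .addDdlRequest(ddlRequest)
                    .setTraceId("trace-abc-123")
                    .build();
    String restoredId = DdlRequestBatch.parseProto(ddlBatch.toProto()).getTraceId();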
diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java
index 57f419af00c9..42e8341f925b 100644
--- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java
+++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java
@@ -19,6 +19,7 @@
 import com.alibaba.graphscope.groot.common.config.StoreConfig;
 import com.alibaba.graphscope.groot.common.exception.GrootException;
 import com.alibaba.graphscope.groot.common.util.ThreadFactoryUtils;
+import com.alibaba.graphscope.groot.common.util.Utils;
 import com.alibaba.graphscope.groot.meta.MetaService;
 import com.alibaba.graphscope.groot.operation.OperationBatch;
 import com.alibaba.graphscope.groot.operation.StoreDataBatch;
@@ -59,6 +60,7 @@ public class StoreService {
     private static final Logger logger = LoggerFactory.getLogger(StoreService.class);
+    private static final Logger metricLogger = LoggerFactory.getLogger("MetricLog");
 
     private final Configs storeConfigs;
     private final int storeId;
@@ -258,16 +260,21 @@ private Map writeStore(
                             if (partition.writeBatch(snapshotId, batch)) {
                                 hasDdl.set(true);
                             }
+                            metricLogger.info(
+                                    buildMetricJsonLog(true, batch, start, partitionId));
                            attrs.put("success", true).put("message", "");
                            this.writeHistogram.record(
                                    System.currentTimeMillis() - start, attrs.build());
                            this.writeCounter.add(batch.getOperationCount(), attrs.build());
                        }
                    } catch (Exception ex) {
+                        metricLogger.info(buildMetricJsonLog(false, batch, start, partitionId));
                        logger.error(
-                                "write to partition [{}] failed, snapshotId [{}].",
+                                "write to partition [{}] failed, snapshotId [{}], traceId"
+                                        + " [{}].",
                                partitionId,
                                snapshotId,
+                                batch.getTraceId(),
                                ex);
                        attrs.put("message", ex.getMessage());
                        String msg = "Not supported operation in secondary mode";
@@ -298,6 +305,22 @@ private Map writeStore(
         return batchNeedRetry;
     }
 
+    private String buildMetricJsonLog(
+            boolean succeed, OperationBatch operationBatch, long start, int partitionId) {
+        String traceId = operationBatch.getTraceId();
+        long current = System.currentTimeMillis();
+        int batchSize = operationBatch.getOperationCount();
+        return Utils.buildMetricJsonLog(
+                succeed,
+                traceId,
+                batchSize,
+                partitionId,
+                (current - start),
+                current,
+                "writeDb",
+                "write");
+    }
+
     public GraphDefPb getGraphDefBlob() throws IOException {
         GraphPartition graphPartition = this.idToPartition.get(0);
         return graphPartition.getGraphDefBlob();
     }
diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 3fcb5a421b86..6e6a43adc96d 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -17,6 +17,8 @@ package gs.rpc.groot;
 import "graph_def.proto";
 import "schema_common.proto";
 
+import "request_option.proto";
+
 option java_package = "com.alibaba.graphscope.proto.groot";
 option java_multiple_files = true;
 
@@ -27,21 +29,22 @@ service GrootDdlService {
 }
 
 message BatchSubmitRequest {
-  int32 format_version = 1;
-  bool simple_response = 2;
-  message DDLRequest {
-    oneof value {
-      CreateVertexTypeRequest create_vertex_type_request = 1;
-      CreateEdgeTypeRequest create_edge_type_request = 2;
-      AddEdgeKindRequest add_edge_kind_request = 3;
-      RemoveEdgeKindRequest remove_edge_kind_request = 4;
-      DropVertexTypeRequest drop_vertex_type_request = 5;
-      DropEdgeTypeRequest drop_edge_type_request = 6;
-      AddVertexTypePropertiesRequest add_vertex_type_properties_request = 7;
-      AddEdgeTypePropertiesRequest add_edge_type_properties_request = 8;
+    int32 format_version = 1;
+    bool simple_response = 2;
+    message DDLRequest {
+        oneof value {
+            CreateVertexTypeRequest create_vertex_type_request = 1;
+            CreateEdgeTypeRequest create_edge_type_request = 2;
+            AddEdgeKindRequest add_edge_kind_request = 3;
+            RemoveEdgeKindRequest remove_edge_kind_request = 4;
+            DropVertexTypeRequest drop_vertex_type_request = 5;
+            DropEdgeTypeRequest drop_edge_type_request = 6;
+            AddVertexTypePropertiesRequest add_vertex_type_properties_request = 7;
+            AddEdgeTypePropertiesRequest add_edge_type_properties_request = 8;
+        }
     }
-  }
-  repeated DDLRequest value = 3;
+    repeated DDLRequest value = 3;
+    gs.rpc.groot.RequestOptionsPb request_options = 4;
 }
 
 message BatchSubmitResponse {
diff --git a/proto/groot/sdk/model.proto b/proto/groot/sdk/model.proto
index 7cb670496aea..81ddf100c245 100644
--- a/proto/groot/sdk/model.proto
+++ b/proto/groot/sdk/model.proto
@@ -31,6 +31,7 @@ message OperationPb {
 message OperationBatchPb {
   int64 latestSnapshotId = 1;
   repeated OperationPb operations = 2;
+  string traceId = 3;
 }
 
 message LogEntryPb {
@@ -102,6 +103,7 @@ message DdlRequestPb {
 
 message DdlRequestBatchPb {
   repeated DdlRequestPb ddlRequests = 1;
+  string traceId = 2;
 }
 
 message DdlOperationPb {
diff --git a/proto/groot/store_write_service.proto b/proto/groot/store_write_service.proto
index 7586e3110325..bf9a43f5e145 100644
--- a/proto/groot/store_write_service.proto
+++ b/proto/groot/store_write_service.proto
@@ -31,6 +31,7 @@ message StoreDataBatchPb {
   int64 snapshotId = 3;
   int64 offset = 4;
   repeated PartitionToBatchPb dataBatch = 5;
+  string traceId = 6;
 }
 
 message WriteStoreRequest {
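On the wire, clients fill the new request_options field through the usual protobuf-generated accessors. A hedged sketch for the DDL entry point, assuming the standard codegen names (setRequestOptions, setTraceId) for the request_options field added to ddl_service.proto above and the RequestOptionsPb message defined in request_option.proto below; the DDLRequest payload is elided.

    import com.alibaba.graphscope.proto.groot.BatchSubmitRequest;
    import com.alibaba.graphscope.proto.groot.RequestOptionsPb;

    // Attach per-request options, including the trace id, to a DDL submission.
    BatchSubmitRequest request =
            BatchSubmitRequest.newBuilder()
                    .setRequestOptions(
                            RequestOptionsPb.newBuilder().setTraceId("trace-abc-123"))
                    .build();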
diff --git a/proto/request_option.proto b/proto/request_option.proto
new file mode 100644
index 000000000000..2f5d4a81db48
--- /dev/null
+++ b/proto/request_option.proto
@@ -0,0 +1,34 @@
+// Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+syntax = "proto3";
+package gs.rpc.groot;
+
+option java_package = "com.alibaba.graphscope.proto.groot";
+option java_multiple_files = true;
+
+message RequestOptionsPb {
+    map attributes = 1;
+    string traceId = 2;
+}
+
+message AttributeValue {
+    oneof value {
+        string string_value = 1;
+        int32 int_value = 2;
+        int64 long_value = 3;
+        double double_value = 4;
+    }
+}
\ No newline at end of file
diff --git a/proto/write_service.proto b/proto/write_service.proto
index c1ad2013d3d4..7d5153113a0e 100644
--- a/proto/write_service.proto
+++ b/proto/write_service.proto
@@ -16,6 +16,8 @@ syntax = "proto3";
 
 package gs.rpc.groot;
 
+import "request_option.proto";
+
 option java_package = "com.alibaba.graphscope.proto.groot";
 option java_multiple_files = true;
 
@@ -45,6 +47,7 @@ message GetClientIdResponse {
 message BatchWriteRequest {
   string client_id = 1;
   repeated WriteRequestPb write_requests = 2;
+  gs.rpc.groot.RequestOptionsPb request_options = 3;
 }
 
 message BatchWriteResponse {