Skip to content

Commit

Permalink
fix(interactive): Standardized log for sls metric (#4073)
Browse files Browse the repository at this point in the history
- support passing the upstream traceId to the engine, and log the trace id in the info and metric logs
- print critical information in JSON format, which allows log frameworks such as SLS to parse it.
  • Loading branch information
bufapiqi authored Jul 23, 2024
1 parent c87d388 commit 1de43e9
Show file tree
Hide file tree
Showing 32 changed files with 988 additions and 69 deletions.
1 change: 1 addition & 0 deletions interactive_engine/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
<include>schema_common.proto</include>
<include>ddl_service.proto</include>
<include>write_service.proto</include>
<include>request_option.proto</include>
</includes>
</configuration>
<goals>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.alibaba.graphscope.groot.common.constant;

/**
 * Key names used for the fields of the JSON log entries.
 *
 * <p>All keys are declared {@code final} so that they cannot be reassigned at runtime.
 */
public class LogConstant {

    public static final String TRACE_ID = "traceId";

    public static final String UPSTREAM_ID = "upstreamId";

    public static final String QUERY_ID = "queryId";

    /**
     * The actual query statement.
     */
    public static final String QUERY = "query";

    public static final String SUCCESS = "success";

    public static final String ERROR_MESSAGE = "errorMessage";

    public static final String STACK_TRACE = "stackTrace";

    /**
     * The query plan.
     */
    public static final String IR_PLAN = "irPlan";

    /**
     * The stage at which the log is printed.
     * query: java/rust
     * write: writeKafka/consumeKafka/writeDb
     */
    public static final String STAGE = "stage";

    public static final String RESULT = "result";

    public static final String COST = "cost";

    public static final String START_TIME = "startMillis";

    public static final String END_TIME = "endMillis";

    /**
     * The log type: query/write.
     */
    public static final String LOG_TYPE = "logType";

    public static final String BATCH_SIZE = "batchSize";

    public static final String PARTITION_ID = "partitionId";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.alibaba.graphscope.groot.common.util;

import com.alibaba.graphscope.groot.common.constant.LogConstant;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Utils {

    private static final ObjectMapper jsonMapper = new ObjectMapper();
    private static final Logger defaultLogger = LoggerFactory.getLogger(Utils.class);

    /**
     * Assembles one metric log entry for monitoring and renders it as a JSON string.
     *
     * <p>Do not remove any existing field from the entry; adding new fields is fine.
     *
     * @param isSuccess whether the operation succeeded
     * @param traceId trace id written under the trace-id key
     * @param batchSize batch size; the field is omitted when {@code null}
     * @param partitionId partition id; the field is omitted when {@code null}
     * @param cost cost value written under the cost key
     * @param endTime end time; the field is omitted when {@code null}
     * @param stage stage name; the field is omitted when {@code null}
     * @param logType log type written under the log-type key
     * @return the JSON text, or an empty string if serialization throws
     */
    public static String buildMetricJsonLog(
            boolean isSuccess,
            String traceId,
            Integer batchSize,
            Integer partitionId,
            long cost,
            Long endTime,
            String stage,
            String logType) {
        final ObjectNode entry = jsonMapper.createObjectNode();

        // Insertion order determines the field order of the rendered JSON.
        entry.put(LogConstant.TRACE_ID, traceId);
        entry.put(LogConstant.SUCCESS, isSuccess);
        if (batchSize != null) {
            entry.put(LogConstant.BATCH_SIZE, batchSize);
        }
        if (partitionId != null) {
            entry.put(LogConstant.PARTITION_ID, partitionId);
        }
        if (endTime != null) {
            entry.put(LogConstant.END_TIME, endTime);
        }
        entry.put(LogConstant.COST, cost);
        if (stage != null) {
            entry.put(LogConstant.STAGE, stage);
        }
        entry.put(LogConstant.LOG_TYPE, logType);

        try {
            return jsonMapper.writeValueAsString(entry);
        } catch (Exception e) {
            // Best effort: report the failure and fall back to an empty log line.
            defaultLogger.error("JsonProcessingException!", e);
            return "";
        }
    }
}
4 changes: 4 additions & 0 deletions interactive_engine/compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<!-- Not managed by opentelemetry-bom -->
<groupId>io.opentelemetry.semconv</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package com.alibaba.graphscope.gremlin.plugin;

import com.alibaba.graphscope.groot.common.constant.LogConstant;
import com.alibaba.graphscope.groot.common.util.Utils;
import com.google.gson.JsonObject;

import io.opentelemetry.api.trace.Span;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,9 +34,25 @@ public class QueryLogger {
private final String query;
private final BigInteger queryId;

/**
* The trace id passed down from the upstream caller.
*/
private final String upstreamId;

private String irPlan;

/**
 * Creates a logger for a query that carries no upstream trace id.
 *
 * @param query the query text
 * @param queryId the id of the query
 */
public QueryLogger(String query, BigInteger queryId) {
    // Same state as the three-argument form with a null upstream id.
    this(query, queryId, null);
}

/**
 * Creates a logger for a query together with the trace id handed down by the upstream caller.
 *
 * @param query the query text
 * @param queryId the id of the query
 * @param upstreamId the upstream trace id, may be {@code null}
 */
public QueryLogger(String query, BigInteger queryId, String upstreamId) {
    this.upstreamId = upstreamId;
    this.queryId = queryId;
    this.query = query;
    // The plan is not known yet; it is supplied later through setIrPlan.
    this.irPlan = null;
}

public void debug(String format, Object... args) {
Expand All @@ -49,13 +71,61 @@ public void error(String format, Object... args) {
defaultLogger.error(this + " : " + format, args);
}

/**
 * Logs a failure at error level as a JSON object, attaching the throwable to the log call.
 *
 * <p>The JSON carries the upstream id (only when one is set), the current span's trace id,
 * a {@code false} success flag, the stage, the log type and the throwable's message.
 *
 * @param throwable the failure to report
 */
public void error(Throwable throwable) {
    final JsonObject payload = new JsonObject();
    if (upstreamId != null) {
        payload.addProperty(LogConstant.UPSTREAM_ID, upstreamId);
    }
    payload.addProperty(LogConstant.TRACE_ID, Span.current().getSpanContext().getTraceId());
    payload.addProperty(LogConstant.SUCCESS, false);
    payload.addProperty(LogConstant.STAGE, "java");
    payload.addProperty(LogConstant.LOG_TYPE, "query");
    payload.addProperty(LogConstant.ERROR_MESSAGE, throwable.getMessage());
    defaultLogger.error(payload.toString(), throwable);
}

/**
 * Writes a ready-made message to the default logger.
 *
 * @param message the text to log
 * @param success {@code true} logs at info level; {@code false} logs at error level
 * @param t the throwable attached to the error-level log; unused on success
 */
public void print(String message, boolean success, Throwable t) {
    if (!success) {
        defaultLogger.error(message, t);
        return;
    }
    defaultLogger.info(message);
}

/**
 * Writes a metric line prefixed with the query id and the query text.
 *
 * @param format the message pattern appended after the prefix
 * @param args the arguments for the pattern
 */
public void metricsInfo(String format, Object... args) {
    final String pattern = queryId + " | " + query + " | " + format;
    metricLogger.info(pattern, args);
}

/**
 * Writes a JSON metric entry for this query to the metric logger.
 *
 * <p>The entry uses the current span's trace id, the current wall-clock time as end time,
 * stage {@code "java"} and log type {@code "query"}; batch size and partition id are not set.
 *
 * @param isSucceed whether the query succeeded
 * @param cost the cost value to report
 */
public void metricsInfo(boolean isSucceed, long cost) {
    final String currentTraceId = Span.current().getSpanContext().getTraceId();
    final long endMillis = System.currentTimeMillis();
    metricLogger.info(
            Utils.buildMetricJsonLog(
                    isSucceed, currentTraceId, null, null, cost, endMillis, "java", "query"));
}

/**
 * Renders the logger as {@code [upstreamId=..., query='...', queryId=...]}.
 *
 * <p>The {@code upstreamId} part is emitted only when an upstream id is set.
 */
@Override
public String toString() {
    StringBuilder str = new StringBuilder();
    str.append("[");
    if (this.upstreamId != null) {
        str.append("upstreamId=").append(this.upstreamId).append(", ");
    }
    str.append("query='")
            .append(this.query)
            .append("'")
            .append(", queryId=")
            .append(this.queryId)
            .append("]");
    return str.toString();
}

public String getQuery() {
Expand All @@ -65,4 +135,16 @@ public String getQuery() {
public BigInteger getQueryId() {
return queryId;
}

/**
 * Stores the IR plan of the query so that it can be read back through {@code getIrPlan()}.
 *
 * @param irPlan the plan text
 */
public void setIrPlan(String irPlan) {
this.irPlan = irPlan;
}

/**
 * Returns the upstream trace id given at construction.
 *
 * @return the upstream id, or {@code null} when none was supplied
 */
public String getUpstreamId() {
return upstreamId;
}

/**
 * Returns the IR plan stored for this query.
 *
 * @return the plan text, or {@code null} if none has been set
 */
public String getIrPlan() {
return irPlan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,113 @@

package com.alibaba.graphscope.gremlin.plugin;

import static io.opentelemetry.api.common.AttributeKey.*;
import com.alibaba.graphscope.groot.common.constant.LogConstant;
import com.alibaba.graphscope.groot.common.util.JSON;
import com.google.gson.JsonObject;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.trace.Span;

import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;

public class QueryStatusCallback {
private final MetricsCollector metricsCollector;
private final QueryLogger queryLogger;

private LongHistogram queryHistogram;
// if the query takes longer than this threshold, a detailed log is printed
private long printThreshold;

/**
 * Creates a callback that reports the outcome of a single query.
 *
 * @param metricsCollector supplies the start time and elapsed time of the query
 * @param histogram histogram in which the elapsed time is recorded
 * @param queryLogger logger bound to the query
 * @param printThreshold elapsed-time threshold; a successful query that exceeds it is logged
 *     with full detail
 */
public QueryStatusCallback(
        MetricsCollector metricsCollector,
        LongHistogram histogram,
        QueryLogger queryLogger,
        long printThreshold) {
    this.metricsCollector = metricsCollector;
    this.queryLogger = queryLogger;
    this.queryHistogram = histogram;
    this.printThreshold = printThreshold;
}

/** Hook invoked when the query starts; currently does nothing. */
public void onStart() {}

public void onEnd(boolean isSucceed, @Nullable String msg) {
public void onErrorEnd(@Nullable String msg) {
this.metricsCollector.stop();
onErrorEnd(null, msg);
}

public void onErrorEnd(@Nullable Throwable t) {
this.metricsCollector.stop();
if (isSucceed) {
queryLogger.info("total execution time is {} ms", metricsCollector.getElapsedMillis());
onErrorEnd(t, null);
}

/**
 * Shared failure path: prints the detailed JSON log, records the elapsed time in the
 * histogram and emits the metric log entry.
 *
 * @param t the failure, may be {@code null}
 * @param msg the error message, used only when {@code t} is {@code null}
 */
private void onErrorEnd(Throwable t, String msg) {
    // When a throwable is supplied its message replaces msg.
    String errorMsg = msg;
    if (t != null) {
        errorMsg = t.getMessage();
    }
    JsonObject logJson = buildSimpleLog(false, metricsCollector.getElapsedMillis());
    fillLogDetail(logJson, errorMsg, metricsCollector.getStartMillis(), null);
    queryLogger.print(logJson.toString(), false, t);

    Attributes attrs =
            Attributes.builder()
                    .put("id", queryLogger.getQueryId().toString())
                    .put("query", queryLogger.getQuery())
                    .put("success", false)
                    // Use the resolved message so that failures reported through a
                    // throwable do not record an empty "message" attribute.
                    .put("message", errorMsg != null ? errorMsg : "")
                    .build();
    this.queryHistogram.record(metricsCollector.getElapsedMillis(), attrs);
    queryLogger.metricsInfo(false, metricsCollector.getElapsedMillis());
}

/**
 * Finishes a successful query: prints the JSON log, records the elapsed time in the
 * histogram and emits the metric log entry.
 *
 * <p>The log carries full detail (query, results, plan, stage, start time) only when the
 * elapsed time exceeds the print threshold.
 *
 * @param results the query results, included in the detailed log
 */
public void onSuccessEnd(List<Object> results) {
    this.metricsCollector.stop();
    JsonObject logJson = buildSimpleLog(true, metricsCollector.getElapsedMillis());
    if (this.metricsCollector.getElapsedMillis() > this.printThreshold) {
        fillLogDetail(logJson, null, metricsCollector.getStartMillis(), results);
    }
    queryLogger.print(logJson.toString(), true, null);

    Attributes attrs =
            Attributes.builder()
                    .put("id", queryLogger.getQueryId().toString())
                    .put("query", queryLogger.getQuery())
                    .put("success", true)
                    .put("message", "")
                    .build();
    this.queryHistogram.record(metricsCollector.getElapsedMillis(), attrs);
    queryLogger.metricsInfo(true, metricsCollector.getElapsedMillis());
}

/**
 * Builds the minimal JSON log: trace id, query id, success flag, upstream id (only when one
 * is set) and cost.
 *
 * @param isSucceed whether the query succeeded
 * @param elaspedMillis the elapsed time written as the cost
 * @return the JSON object holding the fields above
 */
private JsonObject buildSimpleLog(boolean isSucceed, long elaspedMillis) {
    final JsonObject json = new JsonObject();
    json.addProperty(LogConstant.TRACE_ID, Span.current().getSpanContext().getTraceId());
    json.addProperty(LogConstant.QUERY_ID, queryLogger.getQueryId());
    json.addProperty(LogConstant.SUCCESS, isSucceed);

    final String upstream = queryLogger.getUpstreamId();
    if (upstream != null) {
        json.addProperty(LogConstant.UPSTREAM_ID, upstream);
    }

    json.addProperty(LogConstant.COST, elaspedMillis);
    return json;
}

/**
 * Adds the detail fields to an existing JSON log: query text, serialized results and error
 * message (each only when non-null), IR plan, stage and start time.
 *
 * @param logJson the JSON object to extend in place
 * @param errorMessage the error message, skipped when {@code null}
 * @param startMillis the start time of the query
 * @param results the query results, skipped when {@code null}
 */
private void fillLogDetail(
        JsonObject logJson, String errorMessage, long startMillis, List<Object> results) {
    final QueryLogger source = queryLogger;

    logJson.addProperty(LogConstant.QUERY, source.getQuery());

    // Optional fields are written only when a value is available.
    if (null != results) {
        logJson.addProperty(LogConstant.RESULT, JSON.toJson(results));
    }
    if (null != errorMessage) {
        logJson.addProperty(LogConstant.ERROR_MESSAGE, errorMessage);
    }

    logJson.addProperty(LogConstant.IR_PLAN, source.getIrPlan());
    logJson.addProperty(LogConstant.STAGE, "java");
    logJson.addProperty(LogConstant.START_TIME, startMillis);
}

public QueryLogger getQueryLogger() {
Expand Down
Loading

0 comments on commit 1de43e9

Please sign in to comment.