
Commit

fix: trace line read optimization
Qinyouzeng committed Jul 12, 2024
1 parent db2131f commit 88bae9d
Showing 16 changed files with 347 additions and 102 deletions.
@@ -29,5 +29,7 @@ public class RequestTraceListDto : Pagination<RequestTraceListDto>

public int? LatMax { get; set; }

public bool HasPage { get; set; } = true;

public string SpanId { get; set; }
}
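Note: HasPage defaults to true so existing callers keep today's behaviour, while SpanId lets a trace list be narrowed to a single span. A hedged usage sketch follows (the property values are illustrative, and the assumption is that the query side skips its total-count query when HasPage is false, mirroring the BaseApmRequestDto change later in this commit):

// Illustrative only: request one page of traces for a specific span and
// opt out of the expensive total-count query (assumption: HasPage gates it).
var request = new RequestTraceListDto
{
    SpanId = "0af7651916cd43dd", // hypothetical span id
    HasPage = false              // caller does not render a total, so skip count(1)
};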
@@ -11,6 +11,9 @@ public static void Init(MasaStackClickhouseConnection connection, string suffix,
InitErrorTable(connection, suffix, logTable);
InitAppTable(connection, suffix, logTable, traceTable);
InitAggregateTable(connection);
//InitDurationTable(connection);
//InitDurationCountTable(connection);
//InitDurationCountTable1(connection);
}

private static void InitErrorTable(MasaStackClickhouseConnection connection, string? suffix = null, string? appLogTable = null)
@@ -188,4 +191,142 @@ Order by (`Brand`,`Model`)
SETTINGS index_granularity = 8192";
ClickhouseInit.InitTable(connection, Constants.ModelsTable, sql);
}

private static void InitDurationTable(MasaStackClickhouseConnection connection)
{
var table = Constants.DurationTable;
var sql = $@"create table {Constants.DurationTable}(
Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
ServiceName String CODEC(ZSTD(1)),
`Resource.service.namespace` String CODEC(ZSTD(1)),
`Attributes.http.method` String CODEC(ZSTD(1)),
`Attributes.http.status_code` String CODEC(ZSTD(1)),
`Attributes.http.target` String CODEC(ZSTD(1)),
TraceId String CODEC(ZSTD(1)),
Duration Int64 CODEC(ZSTD(1))
)
ENGINE = MergeTree
ORDER BY (
Timestamp,
ServiceName,
TraceId,
`Attributes.http.target`,
`Attributes.http.status_code`,
`Attributes.http.method`,
`Resource.service.namespace`)
TTL toDateTime(Timestamp) + toIntervalDay(30)
SETTINGS index_granularity = 8192,
ttl_only_drop_parts = 1;";
var sqlView =
$@"CREATE MATERIALIZED VIEW {table.Replace(".", ".v_")} TO {table}
AS
select Timestamp,ServiceName,`Resource.service.namespace`,`Attributes.http.method`,`Attributes.http.status_code`,`Attributes.http.target`,TraceId,Duration
from
{MasaStackClickhouseConnection.TraceHttpServerTable}
";
ClickhouseInit.InitTable(connection, table, sql);
ClickhouseInit.InitTable(connection, table.Replace(".", ".v_"), sqlView);
}
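Note: the duration table above is a plain MergeTree kept in sync by a materialized view over the HTTP-server trace table, and the 30-day TTL with ttl_only_drop_parts = 1 lets ClickHouse expire whole parts instead of rewriting rows. A minimal reader-side sketch, not part of this commit (the helper name and the p95 query are assumptions; dividing by 1000000 assumes Duration is stored in nanoseconds, as in the source table):

// Illustrative only: per-service p95 latency read straight from the mirrored duration rows.
private static string BuildDurationP95Sql(string where) =>
    $@"select ServiceName, floor(quantile(0.95)(Duration / 1000000)) as P95
    from {Constants.DurationTable}
    where {where}
    group by ServiceName";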

private static void InitDurationCountTable(MasaStackClickhouseConnection connection)
{
var table = Constants.DurationCountTable1;
var sql = $@"create table {Constants.DurationCountTable1}(
Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
ServiceName String CODEC(ZSTD(1)),
`Resource.service.namespace` String CODEC(ZSTD(1)),
`Attributes.http.method` String CODEC(ZSTD(1)),
`Attributes.http.target` String CODEC(ZSTD(1)),
Duration Int64 CODEC(ZSTD(1)),
Total AggregateFunction(count,UInt8)
)
ENGINE = AggregatingMergeTree
--PARTITION BY toYYYYMM(Timestamp)
ORDER BY (
Timestamp,
ServiceName,
`Attributes.http.target`,
`Attributes.http.method`,
`Resource.service.namespace`)
TTL toDateTime(Timestamp) + toIntervalDay(30)
SETTINGS index_granularity = 8192";
var sqlView =
$@"CREATE MATERIALIZED VIEW {table.Replace(".", ".v_")} TO {table}
AS
SELECT
toStartOfInterval(Timestamp,toIntervalMinute(1)) AS Timestamp,
ServiceName,
`Resource.service.namespace`,
`Attributes.http.method`,
`Attributes.http.target`,
floor(Duration/1000000) as Duration,
countState(1) AS Total
FROM {Constants.DurationTable}
GROUP BY
ServiceName,
`Resource.service.namespace`,
`Attributes.http.target`,
`Attributes.http.method`,
Timestamp,
Duration";
ClickhouseInit.InitTable(connection, table, sql);
ClickhouseInit.InitTable(connection, table.Replace(".", ".v_"), sqlView);
}
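Note: because Total is declared as AggregateFunction(count, UInt8), the materialized view must write partial states with countState(...) and every reader must finalize them with countMerge(...); the same -State/-Merge pairing is what the query layer below relies on for avgMerge, sumMerge and quantileMerge against the aggregate tables. A hedged read-side sketch (the helper name and query shape are assumptions; only the table and column names come from the DDL above):

// Illustrative only: finalize the pre-aggregated request counts per endpoint.
// countMerge collapses the count states inserted by the materialized view.
private static string BuildEndpointThroughputSql(string where) =>
    $@"select `Attributes.http.target`, `Attributes.http.method`, countMerge(Total) as Total
    from {Constants.DurationCountTable1}
    where {where}
    group by `Attributes.http.target`, `Attributes.http.method`";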

private static void InitDurationCountTable1(MasaStackClickhouseConnection connection)
{
foreach (var item in Constants.DicDurationCountTable)
{
InitDurationCountTable(connection, item.Key, item.Value);
}
}

private static void InitDurationCountTable(MasaStackClickhouseConnection connection, string interval, string tableName)
{
var viewTable = tableName.Replace(".", ".v_");
var sql = new string[] {
$@"CREATE TABLE {tableName}
(
Timestamp DateTime64(9) CODEC(Delta(8), ZSTD(1)),
ServiceName String CODEC(ZSTD(1)),
`Resource.service.namespace` String CODEC(ZSTD(1)),
`Attributes.http.method` String CODEC(ZSTD(1)),
`Attributes.http.target` String CODEC(ZSTD(1)),
Duration Int64 CODEC(ZSTD(1)),
Total AggregateFunction(count,UInt8)
)
ENGINE = AggregatingMergeTree
--PARTITION BY toYYYYMM(Timestamp)
ORDER BY (
Timestamp,
ServiceName,
`Attributes.http.target`,
`Attributes.http.method`,
`Resource.service.namespace`)
TTL toDateTime(Timestamp) + toIntervalDay(30)
SETTINGS index_granularity = 8192;",
$@"CREATE MATERIALIZED VIEW {viewTable} TO {tableName}
AS
SELECT
toStartOfInterval(Timestamp,INTERVAL {interval}) AS Timestamp,
ServiceName,
ResourceAttributes['service.namespace'] AS `Resource.service.namespace`,
SpanAttributes['http.method'] AS `Attributes.http.method`,
SpanAttributes['http.target'] AS `Attributes.http.target`,
floor(Duration/1000000) as Duration,
countState(1) AS Total
FROM {MasaStackClickhouseConnection.TraceSourceTable}
WHERE SpanKind = 'SPAN_KIND_SERVER'
GROUP BY
ServiceName,
`Resource.service.namespace`,
`Attributes.http.target`,
`Attributes.http.method`,
Timestamp,
Duration"
};

ClickhouseInit.InitTable(connection, tableName, sql[0]);
ClickhouseInit.InitTable(connection, viewTable, sql[1]);
}
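Note: InitDurationCountTable1 creates one pre-aggregated count table per entry in Constants.DicDurationCountTable, so every supported interval ("1 minute", "30 minute", "1 hour", ...) gets its own rollup fed directly from the raw trace source. A sketch of how a caller might pick the rollup for a requested interval (the helper and its fallback rule are assumptions, not part of this commit):

// Illustrative only: resolve the per-interval rollup table for a query window,
// falling back to the finest rollup when the requested interval is not pre-built.
private static string ResolveDurationCountTable(string interval)
{
    var table = Constants.GetDurationCountTable(interval);
    return string.IsNullOrEmpty(table) ? Constants.GetDurationCountTable("1 minute") : table;
}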
}
@@ -60,10 +60,14 @@ public Task<EndpointLatencyDistributionDto> EndpointLatencyDistributionAsync(Apm
{
query.IsTrace = true;
var (where, ors, parameters) = AppendWhere(query);
var period = GetPeriod(query);
var tableName = Constants.GetAggregateTable(period);
var result = new EndpointLatencyDistributionDto();
var p95 = Convert.ToDouble(Scalar($"select floor(quantile(0.95)(Duration/{MILLSECOND})) p95 from {MasaStackClickhouseConnection.TraceHttpServerTable} where {where}", parameters));

var p95 = Convert.ToDouble(Scalar($"select floor(quantileMerge(P95)/{MILLSECOND}) p95 from {tableName} where {where}", parameters));
if (p95 is not double.NaN)
result.P95 = (long)Math.Floor(p95);

var sql = $@"select Duration/{MILLSECOND},count(1) total from {MasaStackClickhouseConnection.TraceHttpServerTable} where {where} group by Duration order by Duration";
var list = new List<ChartPointDto>();
lock (lockObj)
@@ -290,14 +294,25 @@ public async Task<PaginatedListBase<TraceResponseDto>> TraceLatencyDetailAsync(A

public Task<PaginatedListBase<SimpleTraceListDto>> GetSimpleTraceListAsync(ApmEndpointRequestDto query)
{
query.IsServer = default;
query.IsTrace = true;
//query.IsServer = default;
//query.IsTrace = true;
var (where, ors, parameters) = AppendWhere(query);
var orderBy = GetOrderBy(query, new());
//
PaginatedListBase<SimpleTraceListDto> result = new() { };
if (query.HasPage)
{
var sql1 = CombineOrs($@"select TraceId from {MasaStackClickhouseConnection.TraceHttpServerTable} where {where}", ors);
var countSql = $"select count(1) from(select TraceId from {sql1} group by TraceId)";
//var sql1 = CombineOrs($@"select countMerge(Total) as Total from {Constants.DurationCountTable1} where {where}", ors);
//var countSql = $"select sum(Total) from({sql1})";
result.Total = Convert.ToInt64(Scalar(countSql, parameters));
}
//Constants.DurationTable

var sql = CombineOrs($@"select TraceId,Duration,Timestamp from {MasaStackClickhouseConnection.TraceHttpServerTable} where {where}", ors);
var countSql = $"select count(1) from (select DISTINCT TraceId from {sql})";
sql = $"select DISTINCT TraceId,Duration,Timestamp from {sql} {orderBy} @limit";
PaginatedListBase<SimpleTraceListDto> result = new() { Total = Convert.ToInt64(Scalar(countSql, parameters)) };
sql = $"select TraceId,Duration,Timestamp from {sql} group by TraceId,Duration,Timestamp {orderBy} @limit";

SetData(sql, parameters, result, query, ToSampleTraceListDto);
return Task.FromResult(result);
}
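Note: the read optimization here is that the total-count query over grouped TraceIds now runs only when query.HasPage is true, and the DISTINCT projection is replaced by a GROUP BY on TraceId, Duration, Timestamp. A hedged caller-side sketch (values are illustrative):

// Illustrative only: a caller that pages manually and never displays a total
// can turn the count query off entirely.
var query = new ApmEndpointRequestDto
{
    HasPage = false // assumption: result.Total stays at its default and only page rows are read
};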
@@ -308,7 +323,7 @@ private static SimpleTraceListDto ToSampleTraceListDto(IDataReader reader)
long ns = Convert.ToInt64(reader["Duration"]);
var result = new SimpleTraceListDto
{
TraceId = reader[StorageConst.Current.TraceId].ToString(),
TraceId = reader[StorageConst.Current.TraceId].ToString()!,
Timestamp = startTime,
EndTimestamp = startTime.AddMilliseconds(ns / 1e6),
};
@@ -485,42 +500,34 @@ public Task<IEnumerable<ChartLineDto>> ChartDataAsync(BaseApmRequestDto query)
query.IsTrace = true;
bool isInstrument = !string.IsNullOrEmpty(query.Queries);
query.IsMetric = !isInstrument;
var period = GetPeriod(query);
var tableName = Constants.GetAggregateTable(period);
var (where, ors, parameters) = AppendWhere(query);
bool isEndpoint = query is ApmEndpointRequestDto;
string groupAppend = isEndpoint ? ",`Attributes.http.target`,`Attributes.http.method`" : string.Empty;
string sql;
var period = GetPeriod(query);

var result = new List<ChartLineDto>();
if (isInstrument)
{
var minites = (long)(query.End - query.Start).TotalMinutes;
if (minites == 0) minites = 1;
sql = $@"select
`time` as Timestamp,
Latency,
Throughput,
Failed,
P99,
P95,
ServiceName{groupAppend}
from
(select
toStartOfInterval(`Timestamp` , INTERVAL {period} ) as `time`,
floor(avg(Duration/{MILLSECOND})) `Latency`,
round(count(1)*1.0/{minites},2) `Throughput`,
round(sum(has(['{string.Join("','", query.GetErrorStatusCodes())}'],`Attributes.http.status_code`))*100.0/if(count(1)=0,1,count(1)),2) `Failed`,
floor(quantile(0.99)(Duration/{MILLSECOND})) `P99`,
floor(quantile(0.95)(Duration/{MILLSECOND})) `P95`,
ServiceName{groupAppend}
from {MasaStackClickhouseConnection.TraceHttpServerTable} where {where}
group by ServiceName{groupAppend},`time`
order by ServiceName{groupAppend},`time`
) t
";
var sql2 = CombineOrs($"select DISTINCT ServiceName{groupAppend} from {MasaStackClickhouseConnection.TraceHttpServerTable} where {where}", ors);
sql = @$"select a.* from(select
Timestamp,
avgMerge(Latency) as Latency,
countMerge(Throughput) as Throughput,
sumMerge(Failed) as Failed,
floor(quantileMerge(P99)/{MILLSECOND}) as P99,
floor(quantileMerge(P95)/{MILLSECOND}) as P95,
ServiceName
{groupAppend}
from {tableName}
where {where}
group by ServiceName{groupAppend},Timestamp) a,
(select DISTINCT ServiceName{groupAppend} from ({sql2})) b where a.ServiceName=b.ServiceName{(isEndpoint ? " and a.`Attributes.http.target`=b.`Attributes.http.target` and a.`Attributes.http.method`=b.`Attributes.http.method`" : "")}
order by ServiceName{groupAppend},Timestamp";
}
else
{
var tableName = Constants.GetAggregateTable(period);
sql = $@"select
Timestamp,
floor(Latency1/{MILLSECOND}) as Latency,
@@ -707,6 +714,7 @@ private static void AppendDuration(ApmTraceLatencyRequestDto? query, StringBuild

private IDataReader Query(string sql, IEnumerable<ClickHouseParameter> parameters)
{
var start = DateTime.Now;
try
{
command.CommandText = sql;
@@ -718,6 +726,13 @@ private IDataReader Query(string sql, IEnumerable<ClickHouseParameter> parameter
_logger.LogError(ex, "execute sql error:{Sqlraw}", sql);
throw;
}
finally
{
var end = DateTime.Now;
var duration = (end - start).TotalSeconds;
if (duration > 3)
_logger.LogWarning("ClickHouse query slow {Duration}s, rawSql:{RawSql}, parameters:{Parameters}", duration, sql, parameters);
}
}
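Note: the slow-query guard above measures elapsed time with DateTime.Now, which follows the wall clock; a monotonic alternative, shown only as a sketch of the same 3-second threshold (not what this commit does, and the error-logging catch is omitted for brevity), is Stopwatch:

// Illustrative only: the same slow-query warning driven by a monotonic Stopwatch.
private IDataReader QueryTimed(string sql, IEnumerable<ClickHouseParameter> parameters)
{
    var sw = System.Diagnostics.Stopwatch.StartNew();
    try
    {
        command.CommandText = sql;
        SetParameters(parameters);
        return command.ExecuteReader();
    }
    finally
    {
        sw.Stop();
        if (sw.Elapsed.TotalSeconds > 3)
            _logger.LogWarning("ClickHouse query slow {Duration}s, rawSql:{RawSql}", sw.Elapsed.TotalSeconds, sql);
    }
}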

private object Scalar(string sql, IEnumerable<ClickHouseParameter> parameters)
Expand All @@ -726,7 +741,18 @@ private object Scalar(string sql, IEnumerable<ClickHouseParameter> parameters)
{
command.CommandText = sql;
SetParameters(parameters);
return command.ExecuteScalar()!;
var start = DateTime.Now;
try
{
return command.ExecuteScalar()!;
}
finally
{
var end = DateTime.Now;
var duration = (end - start).TotalSeconds;
if (duration > 3)
_logger.LogWarning("ClickHouse query slow {Duration}s, rawSql:{RawSql}, parameters:{Parameters}", duration, sql, parameters);
}
}
}

@@ -8,6 +8,11 @@ internal static class Constants
public static string ErrorTable { get; private set; }

private const string AggregateTable = "otel_trace_metrics@interval";
private const string DurationCountTable = "otel_traces_spans_duration@interval_count";

public static string DurationTable { get; private set; }

public static string DurationCountTable1 { get; private set; }

public static string ModelsTable { get; private set; }

@@ -18,26 +23,42 @@ private static string GetAggregateTable(string interval, string suffix)
return AggregateTable.Replace("@interval", interval.Replace(" ", $"_")) + "_" + suffix;
}

private static string GetDurationCountTable(string interval, string suffix)
{
if (string.IsNullOrEmpty(interval))
interval = INTERVALS[0];
return DurationCountTable.Replace("@interval", interval.Replace(" ", $"_")) + "_" + suffix;
}

public static string GetAggregateTable(string interval)
{
return DicAggregateTable.TryGetValue(interval, out var table) ? table : default!;
}

public static string GetDurationCountTable(string interval)
{
return DicDurationCountTable.TryGetValue(interval, out var table) ? table : default!;
}

public static readonly int[] DefaultErrorStatus = new int[] { 400, 500, 501, 502, 503, 504, 505 };

public static readonly string[] INTERVALS = new string[] { "1 minute", "30 minute", "1 hour", "1 day", "1 week", "1 month" };

public static readonly Dictionary<string, string> DicAggregateTable = new();
public static readonly Dictionary<string, string> DicDurationCountTable = new();

public static void Init(string database, string suffix)
{
if (!string.IsNullOrEmpty(database))
database = $"{database}.";
ErrorTable = $"{database}otel_errors_{suffix}";
ModelsTable = $"{database}tsc_phone_models_{suffix}";
DurationTable = $"{database}otel_traces_spans_duration_{suffix}";
DurationCountTable1 = $"{database}otel_traces_spans_duration_count_{suffix}";
ModelsTable = $"{database}tsc_phone_models_{suffix}";
foreach (var key in INTERVALS)
{
DicAggregateTable.Add(key, database + GetAggregateTable(key, suffix));
DicDurationCountTable.Add(key, database + GetDurationCountTable(key, suffix));
}
}
}
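Note: Init now registers one duration table plus one pre-aggregated count table per interval, keyed by the same INTERVALS list used for the trace-metrics rollups. A hedged sketch of the resulting lookups (database and suffix values are hypothetical):

// Illustrative only: after initialization, rollup tables are resolved per interval string.
Constants.Init("otel", "prod");                          // hypothetical database and suffix
var fine = Constants.GetDurationCountTable("1 minute");  // finest rollup
var coarse = Constants.GetDurationCountTable("1 hour");  // hourly rollup
// Unknown intervals return null (default!), so callers should validate the interval first.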
@@ -25,6 +25,8 @@ public class BaseApmRequestDto : RequestPageBase

public string StatusCodes { get; set; }

public bool HasPage { get; set; } = true;

internal int[] GetErrorStatusCodes() => string.IsNullOrEmpty(StatusCodes) ? Constants.DefaultErrorStatus : StatusCodes.Split(',').Select(s => Convert.ToInt32(s)).Where(num => num != 0).ToArray();

internal bool? IsServer { get; set; } = true;
@@ -5,7 +5,7 @@ namespace Masa.Tsc.Storage.Clickhouse.Extensions;

public static class ClickhouseInit
{
private static ILogger Logger { get; set; }
internal static ILogger Logger { get; set; }

internal static MasaStackClickhouseConnection Connection { get; private set; }

