Skip to content

Commit

Permalink
优化sql语句执行埋点
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Apr 2, 2024
1 parent 072c697 commit aa53ede
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 60 deletions.
112 changes: 63 additions & 49 deletions XCode/DataAccessLayer/DAL_DbOperate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ public Int32 SelectCount(String sql, CommandType type, params IDataParameter[] p
/// <returns></returns>
public Int32 Execute(String sql)
{
return ExecuteByCache(sql, "", "", (s, t, p) => Session.Execute(s));
return ExecuteByCache(sql, "", "", (s, t, p) => Session.Execute(s), nameof(Execute));
}

/// <summary>执行插入语句并返回新增行的自动编号</summary>
/// <param name="sql"></param>
/// <returns>新增行的自动编号</returns>
public Int64 InsertAndGetIdentity(String sql)
{
return ExecuteByCache(sql, "", "", (s, t, p) => Session.InsertAndGetIdentity(s));
return ExecuteByCache(sql, "", "", (s, t, p) => Session.InsertAndGetIdentity(s), nameof(InsertAndGetIdentity));
}

/// <summary>执行SQL查询,返回记录集</summary>
Expand All @@ -146,7 +146,7 @@ public DataSet Select(String sql, CommandType type, params IDataParameter[] ps)
/// <returns></returns>
public Int32 Execute(String sql, CommandType type, params IDataParameter[] ps)
{
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.Execute(s, t, p));
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.Execute(s, t, p), nameof(Execute));
}

/// <summary>执行插入语句并返回新增行的自动编号</summary>
Expand All @@ -156,7 +156,7 @@ public Int32 Execute(String sql, CommandType type, params IDataParameter[] ps)
/// <returns>新增行的自动编号</returns>
public Int64 InsertAndGetIdentity(String sql, CommandType type, params IDataParameter[] ps)
{
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.InsertAndGetIdentity(s, t, p));
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.InsertAndGetIdentity(s, t, p), nameof(InsertAndGetIdentity));
}

/// <summary>执行SQL查询,返回记录集</summary>
Expand All @@ -176,7 +176,7 @@ public DataSet Select(String sql, CommandType type, IDictionary<String, Object>
/// <returns></returns>
public Int32 Execute(String sql, CommandType type, IDictionary<String, Object> ps)
{
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.Execute(s, t, Db.CreateParameters(p)));
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.Execute(s, t, Db.CreateParameters(p)), nameof(Execute));
}

/// <summary>执行SQL语句,返回受影响的行数</summary>
Expand All @@ -190,7 +190,7 @@ public Int32 Execute(String sql, Int32 commandTimeout)
using var cmd = Session.CreateCommand(s);
if (t > 0) cmd.CommandTimeout = t;
return Session.Execute(cmd);
});
}, nameof(Execute));
}

/// <summary>执行SQL语句,返回结果中的第一行第一列</summary>
Expand All @@ -201,7 +201,7 @@ public Int32 Execute(String sql, Int32 commandTimeout)
/// <returns></returns>
public T ExecuteScalar<T>(String sql, CommandType type, IDictionary<String, Object> ps)
{
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.ExecuteScalar<T>(s, t, Db.CreateParameters(p)));
return ExecuteByCache(sql, type, ps, (s, t, p) => Session.ExecuteScalar<T>(s, t, Db.CreateParameters(p)), nameof(ExecuteScalar));
}
#endregion

Expand Down Expand Up @@ -252,15 +252,15 @@ public Task<Int64> SelectCountAsync(String sql, CommandType type, params IDataPa
/// <returns></returns>
public Task<Int32> ExecuteAsync(String sql)
{
return ExecuteByCacheAsync(sql, "", "", (s, t, p) => AsyncSession.ExecuteAsync(s));
return ExecuteByCacheAsync(sql, "", "", (s, t, p) => AsyncSession.ExecuteAsync(s), nameof(ExecuteAsync));
}

/// <summary>执行插入语句并返回新增行的自动编号</summary>
/// <param name="sql"></param>
/// <returns>新增行的自动编号</returns>
public Task<Int64> InsertAndGetIdentityAsync(String sql)
{
return ExecuteByCacheAsync(sql, "", "", (s, t, p) => AsyncSession.InsertAndGetIdentityAsync(s));
return ExecuteByCacheAsync(sql, "", "", (s, t, p) => AsyncSession.InsertAndGetIdentityAsync(s), nameof(InsertAndGetIdentityAsync));
}

/// <summary>执行SQL语句,返回受影响的行数</summary>
Expand All @@ -270,7 +270,7 @@ public Task<Int64> InsertAndGetIdentityAsync(String sql)
/// <returns></returns>
public Task<Int32> ExecuteAsync(String sql, CommandType type, params IDataParameter[]? ps)
{
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.ExecuteAsync(s, t, p));
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.ExecuteAsync(s, t, p), nameof(ExecuteAsync));
}

/// <summary>执行插入语句并返回新增行的自动编号</summary>
Expand All @@ -280,7 +280,7 @@ public Task<Int32> ExecuteAsync(String sql, CommandType type, params IDataParame
/// <returns>新增行的自动编号</returns>
public Task<Int64> InsertAndGetIdentityAsync(String sql, CommandType type, params IDataParameter[]? ps)
{
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.InsertAndGetIdentityAsync(s, t, p));
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.InsertAndGetIdentityAsync(s, t, p), nameof(InsertAndGetIdentityAsync));
}

/// <summary>执行SQL语句,返回受影响的行数</summary>
Expand All @@ -290,7 +290,7 @@ public Task<Int64> InsertAndGetIdentityAsync(String sql, CommandType type, param
/// <returns></returns>
public Task<Int32> ExecuteAsync(String sql, CommandType type, IDictionary<String, Object> ps)
{
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.ExecuteAsync(s, t, Db.CreateParameters(p)));
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.ExecuteAsync(s, t, Db.CreateParameters(p)), nameof(ExecuteAsync));
}

/// <summary>执行SQL语句,返回受影响的行数</summary>
Expand All @@ -304,7 +304,7 @@ public Task<Int32> ExecuteAsync(String sql, Int32 commandTimeout)
using var cmd = Session.CreateCommand(s);
if (t > 0) cmd.CommandTimeout = t;
return AsyncSession.ExecuteAsync(cmd);
});
}, nameof(ExecuteAsync));
}

/// <summary>执行SQL语句,返回结果中的第一行第一列</summary>
Expand All @@ -315,7 +315,7 @@ public Task<Int32> ExecuteAsync(String sql, Int32 commandTimeout)
/// <returns></returns>
public Task<T> ExecuteScalarAsync<T>(String sql, CommandType type, IDictionary<String, Object> ps)
{
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.ExecuteScalarAsync<T>(s, t, Db.CreateParameters(p)));
return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => AsyncSession.ExecuteScalarAsync<T>(s, t, Db.CreateParameters(p)), nameof(ExecuteScalarAsync));
}
#endregion

Expand Down Expand Up @@ -427,13 +427,13 @@ private TResult QueryByCache<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1,
return rs;
}

private TResult ExecuteByCache<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, TResult> callback)
private TResult ExecuteByCache<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, TResult> callback, String action)
{
if (Db.Readonly) throw new InvalidOperationException($"数据连接[{ConnName}]只读,禁止执行{k1}");

//CheckDatabase();

var rs = Invoke(k1, k2, k3, callback, "Execute");
var rs = Invoke(k1, k2, k3, callback, action);

GetCache()?.Clear();

Expand All @@ -452,31 +452,15 @@ private TResult Invoke<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3
if (tracer != null)
{
sql = (k1 + "").Trim();
if (action == "Execute")
{
// 使用 Insert/Update/Delete 作为埋点操作名
var p = sql.IndexOf(' ');
if (p > 0) action = sql[..p];
}
else if (action.EqualIgnoreCase("Query", "Select"))
{
// 查询数据时,Group作为独立埋点操作名
if (sql.ToLower().Contains("group by"))
action = "Group";
}

traceName = $"db:{ConnName}:{action}";

var tables = GetTables(sql, true);
if (tables.Length > 0) traceName += ":" + tables.Join("-");
traceName = GetTraceName(sql, action);
}

// 使用k1参数作为tag,一般是sql
using var span = tracer?.NewSpan(traceName, sql);
try
{
var rs = callback(k1, k2, k3);
AppendTag(span, sql, rs);
AppendTag(span, sql, rs, action);

return rs;
}
Expand All @@ -487,7 +471,35 @@ private TResult Invoke<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3
}
}

private void AppendTag(ISpan? span, String sql, Object? rs)
private String GetTraceName(String sql, String action)
{
var act = (action + "").TrimEnd("Async");
if (act.EqualIgnoreCase("InsertAndGetIdentity"))
{
act = "Insert";
}
else if (act.EqualIgnoreCase("Execute", "ExecuteScalar"))
{
// 使用 Insert/Update/Delete 作为埋点操作名
var p = sql.IndexOf(' ');
if (p > 0) act = sql[..p];
}
else if (act.EqualIgnoreCase("Query", "Select"))
{
// 查询数据时,Group作为独立埋点操作名
if (sql.ToLower().Contains("group by"))
act = "Group";
}

var traceName = $"db:{ConnName}:{act}";

var tables = GetTables(sql, true);
if (tables.Length > 0) traceName += ":" + tables.Join("-");

return traceName;
}

private void AppendTag(ISpan? span, String sql, Object? rs, String action)
{
if (span == null) return;

Expand All @@ -514,6 +526,12 @@ private void AppendTag(ISpan? span, String sql, Object? rs)
else
span.Tag = $"{sql} [rows={rows}]";
}
else if (action == nameof(InsertAndGetIdentity) || action == nameof(InsertAndGetIdentityAsync))
{
if (rs.ToInt() > 0) span.Value = 1;

span.Tag = $"{sql} [id={rs}]";
}
else
{
// 数值和标签都记录结果,大概率是受影响行数
Expand Down Expand Up @@ -563,13 +581,13 @@ private async Task<TResult> QueryByCacheAsync<T1, T2, T3, TResult>(T1 k1, T2 k2,
return rs;
}

private async Task<TResult> ExecuteByCacheAsync<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, Task<TResult>> callback)
private async Task<TResult> ExecuteByCacheAsync<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, Task<TResult>> callback, String action)
{
if (Db.Readonly) throw new InvalidOperationException($"数据连接[{ConnName}]只读,禁止执行{k1}");

//CheckDatabase();

var rs = await InvokeAsync(k1, k2, k3, callback, "Execute");
var rs = await InvokeAsync(k1, k2, k3, callback, action);

GetCache()?.Clear();

Expand All @@ -588,25 +606,15 @@ private async Task<TResult> InvokeAsync<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3
if (tracer != null)
{
sql = (k1 + "").Trim();
if (action == "Execute")
{
// 使用 Insert/Update/Delete 作为埋点操作名
var p = sql.IndexOf(' ');
if (p > 0) action = sql[..p];
}

traceName = $"db:{ConnName}:{action}";

var tables = GetTables(sql, true);
if (tables.Length > 0) traceName += ":" + tables.Join("-");
traceName = GetTraceName(sql, action);
}

// 使用k1参数作为tag,一般是sql
using var span = tracer?.NewSpan(traceName, sql);
try
{
var rs = await callback(k1, k2, k3);
AppendTag(span, sql, rs);
AppendTag(span, sql, rs, action);

return rs;
}
Expand Down Expand Up @@ -698,6 +706,9 @@ public Boolean SuspendReadOnly(Int32 delayTime = 0)
{
if (_reads == null) return false;

var tracer = Tracer ?? GlobalTracer;
using var span = tracer?.NewSpan($"db:{ConnName}:SuspendReadOnly", "delayTime=" + delayTime);

_bakReads = _reads;
_reads = null;

Expand All @@ -716,6 +727,9 @@ public Boolean ResumeReadOnly()
{
if (_bakReads == null) return false;

var tracer = Tracer ?? GlobalTracer;
using var span = tracer?.NewSpan($"db:{ConnName}:ResumeReadOnly");

_reads = _bakReads;
_bakReads = null;

Expand Down
21 changes: 10 additions & 11 deletions XCode/DataAccessLayer/DAL_Mapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ private static Boolean IsValueTuple(Type type)
/// <param name="param">参数对象</param>
/// <returns></returns>
public Int32 Execute(String sql, Object? param = null) =>
//var ps = param?.ToDictionary();
ExecuteByCache(sql, "", param, (s, t, p) => Session.Execute(s, CommandType.Text, Db.CreateParameters(p)));
ExecuteByCache(sql, "", param, (s, t, p) => Session.Execute(s, CommandType.Text, Db.CreateParameters(p)), nameof(Execute));

/// <summary>执行Sql并返回数据读取器</summary>
/// <param name="sql"></param>
Expand Down Expand Up @@ -181,7 +180,7 @@ public T ExecuteScalar<T>(String sql, Object? param = null) =>
/// <param name="param">参数对象</param>
/// <returns></returns>
public Task<Int32> ExecuteAsync(String sql, Object? param = null) =>
ExecuteByCacheAsync(sql, "", param, (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, Db.CreateParameters(p)));
ExecuteByCacheAsync(sql, "", param, (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, Db.CreateParameters(p)), nameof(ExecuteAsync));

/// <summary>执行Sql并返回数据读取器</summary>
/// <param name="sql"></param>
Expand Down Expand Up @@ -249,7 +248,7 @@ public Int32 Insert(Object data, String? tableName = null)
var vs = dps.Join(",", e => e.ParameterName);
var sql = $"Insert Into {tableName}({ns}) Values({vs})";

return ExecuteByCache(sql, "", dps, (s, t, p) => Session.Execute(s, CommandType.Text, p));
return ExecuteByCache(sql, "", dps, (s, t, p) => Session.Execute(s, CommandType.Text, p), nameof(Insert));
}

/// <summary>插入数据表。多行数据循环插入,非批量</summary>
Expand Down Expand Up @@ -284,7 +283,7 @@ public Int32 Insert(IModel data, IDataTable table, IDataColumn[]? columns = null
var sql = builder.GetSql(Db, table, columns, data);
if (sql.IsNullOrEmpty()) return 0;

return ExecuteByCache(sql, "", builder.Parameters, (s, t, p) => Session.Execute(s, CommandType.Text, p));
return ExecuteByCache(sql, "", builder.Parameters, (s, t, p) => Session.Execute(s, CommandType.Text, p), nameof(Insert));
}

/// <summary>更新数据。不支持自动识别主键</summary>
Expand Down Expand Up @@ -374,7 +373,7 @@ public Int32 Update(Object data, Object where, String? tableName = null)

var sql = sb.Put(true);

return ExecuteByCache(sql, "", dps.ToArray(), (s, t, p) => Session.Execute(s, CommandType.Text, p));
return ExecuteByCache(sql, "", dps.ToArray(), (s, t, p) => Session.Execute(s, CommandType.Text, p), nameof(Update));
}
/// <summary>更细数据。无实体</summary>
/// <param name="data">实体对象</param>
Expand Down Expand Up @@ -426,7 +425,7 @@ public Int32 Update(IModel data, IDataTable table, IDataColumn[] columns, IColle
dps.Add(Db.CreateParameter(dc.Name, val, dc));
}

return ExecuteByCache(sql, "", dps.ToArray(), (s, t, p) => Session.Execute(s, CommandType.Text, p));
return ExecuteByCache(sql, "", dps.ToArray(), (s, t, p) => Session.Execute(s, CommandType.Text, p), nameof(Update));
}

private String GetUpdateSql(IDataTable table, IDataColumn[] columns, ICollection<String> updateColumns, ICollection<String> addColumns, ICollection<String> ps)
Expand Down Expand Up @@ -506,7 +505,7 @@ public Int32 Delete(String tableName, Object where)
}
var sql = sb.Put(true);

return ExecuteByCache(sql, "", dps.ToArray(), (s, t, p) => Session.Execute(s, CommandType.Text, p));
return ExecuteByCache(sql, "", dps.ToArray(), (s, t, p) => Session.Execute(s, CommandType.Text, p), nameof(Delete));
}

/// <summary>插入数据</summary>
Expand All @@ -524,7 +523,7 @@ public Task<Int32> InsertAsync(Object data, String? tableName = null)
var vs = dps.Join(",", e => e.ParameterName);
var sql = $"Insert Into {tableName}({ns}) Values({vs})";

return ExecuteByCacheAsync(sql, "", dps, (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, p));
return ExecuteByCacheAsync(sql, "", dps, (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, p), nameof(InsertAsync));
}

/// <summary>更新数据。不支持自动识别主键</summary>
Expand Down Expand Up @@ -586,7 +585,7 @@ public Task<Int32> UpdateAsync(Object data, Object where, String? tableName = nu

var sql = sb.Put(true);

return ExecuteByCacheAsync(sql, "", dps.ToArray(), (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, p));
return ExecuteByCacheAsync(sql, "", dps.ToArray(), (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, p), nameof(UpdateAsync));
}

/// <summary>删除数据</summary>
Expand Down Expand Up @@ -618,7 +617,7 @@ public Task<Int32> DeleteAsync(String tableName, Object where)
}
var sql = sb.Put(true);

return ExecuteByCacheAsync(sql, "", dps.ToArray(), (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, p));
return ExecuteByCacheAsync(sql, "", dps.ToArray(), (s, t, p) => AsyncSession.ExecuteAsync(s, CommandType.Text, p), nameof(DeleteAsync));
}

/// <summary>插入数据</summary>
Expand Down

0 comments on commit aa53ede

Please sign in to comment.