Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix AsyncQueueSegmentDispatcher performance issue #585

Merged
merged 1 commit into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/SkyApm.Abstractions/Common/Tags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ namespace SkyApm.Common
public static class Tags
{
public static readonly string URL = "url";

public static readonly string PATH = "path";

public static readonly string PATH = "path";

public static readonly string HTTP_METHOD = "http.method";

Expand All @@ -40,9 +39,9 @@ public static class Tags
public static readonly string DB_TYPE = "db.type";

public static readonly string DB_INSTANCE = "db.instance";

public static readonly string DB_STATEMENT = "db.statement";

public static readonly string DB_BIND_VARIABLES = "db.bind_vars";

public static readonly string MQ_TOPIC = "mq.topic";
Expand All @@ -59,4 +58,4 @@ public static class Tags

public static readonly string CACHE_CMD = "cache.cmd";
}
}
}
29 changes: 18 additions & 11 deletions src/SkyApm.Abstractions/Config/TransportConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,33 @@

namespace SkyApm.Config
{
/// <summary>
/// Well-known protocol version identifiers, used as the value of
/// <c>TransportConfig.ProtocolVersion</c>.
/// </summary>
public static class ProtocolVersions
{
    /// <summary>
    /// Identifier of protocol version 8; always the string "v8".
    /// </summary>
    public static string V8 => "v8";
}

[Config("SkyWalking", "Transport")]
public class TransportConfig
{
public int QueueSize { get; set; } = 30000;
public string ProtocolVersion { get; set; } = ProtocolVersions.V8;

/// <summary>
/// Flush Interval Millisecond
/// TotalQueueSize = QueueSize * Parallel
/// Elements will be dropped if queues are full.
/// </summary>
public int Interval { get; set; } = 3000;
public int QueueSize { get; set; } = 10000;

/// <summary>
/// Data queued beyond this time will be discarded.
/// TotalBatchSize = BatchSize * Parallel
/// </summary>
public int BatchSize { get; set; } = 3000;
public int BatchSize { get; set; } = 2000;

public string ProtocolVersion { get; set; } = ProtocolVersions.V8;
}
public int Parallel { get; set; } = 5;

public static class ProtocolVersions
{
public static string V8 { get; } = "v8";
/// <summary>
/// Maximum time, in milliseconds, to wait for a batch report to complete before moving on to the next batch.
/// -1 waits indefinitely for the previous batch to complete.
/// </summary>
public int Interval { get; set; } = 50;
}
}
}
4 changes: 0 additions & 4 deletions src/SkyApm.Abstractions/Transport/ISegmentDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*
*/

using System.Threading;
using System.Threading.Tasks;
using SkyApm.Tracing.Segments;

namespace SkyApm.Transport
Expand All @@ -26,8 +24,6 @@ public interface ISegmentDispatcher
{
bool Dispatch(SegmentContext segmentContext);

Task Flush(CancellationToken token = default(CancellationToken));

void Close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private static IServiceCollection AddSkyAPMCore(this IServiceCollection services
services.AddSingleton<IExecutionService, RegisterService>();
services.AddSingleton<IExecutionService, LogReportService>();
services.AddSingleton<IExecutionService, PingService>();
services.AddSingleton<IExecutionService, SegmentReportService>();
services.AddSingleton<SegmentReportService>();
services.AddSingleton<IExecutionService, CLRStatsService>();
services.AddSingleton<IInstrumentStartup, InstrumentStartup>();
services.AddSingleton<IRuntimeEnvironment>(RuntimeEnvironment.Instance);
Expand Down Expand Up @@ -164,4 +164,4 @@ private static IServiceCollection AddSkyApmLogging(this IServiceCollection servi
return services;
}
}
}
}
24 changes: 8 additions & 16 deletions src/SkyApm.Core/Service/SegmentReportService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*
*/

using System;
using System.Threading;
using System.Threading.Tasks;
using SkyApm.Config;
Expand All @@ -25,33 +24,26 @@

namespace SkyApm.Service
{
public class SegmentReportService : ExecutionService
/// <summary>
/// deprecated
/// </summary>
public class SegmentReportService
{
protected readonly ILogger Logger;
private readonly TransportConfig _config;
private readonly ISegmentDispatcher _dispatcher;

public SegmentReportService(IConfigAccessor configAccessor, ISegmentDispatcher dispatcher,
IRuntimeEnvironment runtimeEnvironment, ILoggerFactory loggerFactory)
: base(runtimeEnvironment, loggerFactory)
{
_dispatcher = dispatcher;
_config = configAccessor.Get<TransportConfig>();
Period = TimeSpan.FromMilliseconds(_config.Interval);
}

protected override TimeSpan DueTime { get; } = TimeSpan.FromSeconds(3);

protected override TimeSpan Period { get; }

protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
return _dispatcher.Flush(cancellationToken);
_dispatcher = dispatcher;
}

protected override Task Stopping(CancellationToken cancellationToke)
protected Task Stopping(CancellationToken cancellationToke)
{
_dispatcher.Close();
return Task.CompletedTask;
}
}
}
}
130 changes: 102 additions & 28 deletions src/SkyApm.Core/Transport/AsyncQueueSegmentDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
Expand All @@ -32,72 +33,145 @@ public class AsyncQueueSegmentDispatcher : ISegmentDispatcher
private readonly TransportConfig _config;
private readonly ISegmentReporter _segmentReporter;
private readonly ISegmentContextMapper _segmentContextMapper;
private readonly ConcurrentQueue<SegmentRequest> _segmentQueue;
private readonly IRuntimeEnvironment _runtimeEnvironment;
private readonly CancellationTokenSource _cancellation;
private int _offset;
private readonly Random _random;
private long _produceCount = 0L;
private long _consumeCount = 0L;
private long _dropCount = 0L;
private readonly BlockingCollection<SegmentRequest>[] _queueArray;
private readonly long[] _countArray;

public AsyncQueueSegmentDispatcher(IConfigAccessor configAccessor,
ISegmentReporter segmentReporter, IRuntimeEnvironment runtimeEnvironment,
ISegmentContextMapper segmentContextMapper, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger(typeof(AsyncQueueSegmentDispatcher));
_config = configAccessor.Get<TransportConfig>();
_segmentReporter = segmentReporter;
_segmentContextMapper = segmentContextMapper;
_runtimeEnvironment = runtimeEnvironment;
_logger = loggerFactory.CreateLogger(typeof(AsyncQueueSegmentDispatcher));
_config = configAccessor.Get<TransportConfig>();
_segmentQueue = new ConcurrentQueue<SegmentRequest>();
_cancellation = new CancellationTokenSource();
_random = new Random();
_queueArray = new BlockingCollection<SegmentRequest>[_config.Parallel];
_countArray = new long[_config.Parallel];
for (int i = 0; i < _config.Parallel; ++ i)
{
_queueArray[i] = new BlockingCollection<SegmentRequest>(_config.QueueSize);
_countArray[i] = 0;
}
for (int i = 0; i < _config.Parallel; ++ i)
{
int taskId = i;
Task.Run(() => Flush(taskId));
}
Task.Run(() => Statistics());
}

public bool Dispatch(SegmentContext segmentContext)
{
if (!_runtimeEnvironment.Initialized || segmentContext == null || !segmentContext.Sampled)
return false;

// todo performance optimization for ConcurrentQueue
if (_config.QueueSize < _offset || _cancellation.IsCancellationRequested)
if (_cancellation.IsCancellationRequested)
return false;

var segment = _segmentContextMapper.Map(segmentContext);

if (segment == null)
return false;

_segmentQueue.Enqueue(segment);
int queueId = _random.Next(_config.Parallel);

Interlocked.Increment(ref _offset);
bool result = _queueArray[queueId].TryAdd(segment, 0);

_logger.Debug($"Dispatch trace segment. [SegmentId]={segmentContext.SegmentId}.");
return true;
if (result)
{
Interlocked.Add(ref _produceCount, 1);
Interlocked.Add(ref _countArray[queueId], 1);
}
else
{
Interlocked.Add(ref _dropCount, 1);
}

_logger.Debug($"Dispatch trace segment. [SegmentId]={segmentContext.SegmentId},[result=]{result}.");

return result;
}

public Task Flush(CancellationToken token = default(CancellationToken))
private void Flush(int taskId)
{
// todo performance optimization for ConcurrentQueue
//var queued = _segmentQueue.Count;
//var limit = queued <= _config.PendingSegmentLimit ? queued : _config.PendingSegmentLimit;
var limit = _config.BatchSize;
var index = 0;
var segments = new List<SegmentRequest>(limit);
while (index++ < limit && _segmentQueue.TryDequeue(out var request))
while (!_cancellation.IsCancellationRequested)
{
segments.Add(request);
Interlocked.Decrement(ref _offset);
// handle dedicated queue
{
int count = DoFlush(taskId, taskId, 2000);
if (count > 0)
{
continue;
}
}
// handle other queue
{
int queueId = _random.Next(_config.Parallel);
if (queueId == taskId)
{
continue;
}
DoFlush(taskId, queueId, 0);
}
}
}

// send async
private int DoFlush(int taskId, int queueId, int timeout)
{
var segments = new List<SegmentRequest>(_config.BatchSize);
for (int i = 0; i < _config.BatchSize; ++ i)
{
if (!_queueArray[queueId].TryTake(out var request, timeout))
{
// segments is not full
break;
}
segments.Add(request);
}
if (segments.Count > 0)
_segmentReporter.ReportAsync(segments, token);

Interlocked.Exchange(ref _offset, _segmentQueue.Count);

return Task.CompletedTask;
{
try
{
Task[] task = new Task[1];
task[0] = _segmentReporter.ReportAsync(segments, new CancellationToken());
Task.WaitAll(task, _config.Interval);
}
catch (Exception e)
{
_logger.Error("Task.WaitAll failed." + taskId + "," + queueId + "," + segments.Count, e);
}
Interlocked.Add(ref _consumeCount, segments.Count);
Interlocked.Add(ref _countArray[queueId], 0 - segments.Count);
}
return segments.Count;
}

/// <summary>
/// Requests shutdown of the dispatcher by cancelling its internal cancellation source.
/// Code that checks <c>_cancellation.IsCancellationRequested</c> observes the request
/// on its next check.
/// </summary>
public void Close() => _cancellation.Cancel();

/// <summary>
/// Logs the produce/consume/drop counters and the per-queue counts once per minute
/// until the dispatcher's cancellation source is cancelled.
/// </summary>
private void Statistics()
{
    var period = TimeSpan.FromMinutes(1);

    while (!_cancellation.IsCancellationRequested)
    {
        // The counters are updated with Interlocked.Add from other threads; Interlocked.Read
        // guarantees an untorn 64-bit read even on 32-bit runtimes.
        long produced = Interlocked.Read(ref _produceCount);
        long consumed = Interlocked.Read(ref _consumeCount);
        long dropped = Interlocked.Read(ref _dropCount);
        string detail = String.Join(",", _countArray);

        string message =
            "statistics." +
            "produce=" + produced + "," +
            "consume=" + consumed + "," +
            "drop=" + dropped + "," +
            "detail=[" + detail + "],";
        _logger.Information(message);

        // Wait on the cancellation handle instead of Thread.Sleep so that cancellation
        // ends the wait immediately rather than after up to a minute.
        _cancellation.Token.WaitHandle.WaitOne(period);
    }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public static IConfigurationBuilder AddSkyWalkingDefaultConfig(this IConfigurati
{ "SkyWalking:Logging:RollOnFileSizeLimit", configuration?.GetSection("SkyWalking:Logging:RollOnFileSizeLimit").Value ?? "false" },
{ "SkyWalking:Logging:RetainedFileCountLimit", configuration?.GetSection("SkyWalking:Logging:RetainedFileCountLimit").Value ?? "10" },
{ "SkyWalking:Logging:RetainedFileTimeLimit", configuration?.GetSection("SkyWalking:Logging:RetainedFileTimeLimit").Value ?? "864000000" },
{ "SkyWalking:Transport:Interval", configuration?.GetSection("SkyWalking:Transport:Interval").Value ?? "3000" },
{ "SkyWalking:Transport:ProtocolVersion", configuration?.GetSection("SkyWalking:Transport:ProtocolVersion").Value ?? ProtocolVersions.V8 },
{ "SkyWalking:Transport:QueueSize", configuration?.GetSection("SkyWalking:Transport:QueueSize").Value ?? "30000" },
{ "SkyWalking:Transport:BatchSize", configuration?.GetSection("SkyWalking:Transport:BatchSize").Value ?? "3000" },
{ "SkyWalking:Transport:ProtocolVersion", configuration?.GetSection("SkyWalking:Transport:ProtocolVersion").Value ?? ProtocolVersions.V8 },
{ "SkyWalking:Transport:QueueSize", configuration?.GetSection("SkyWalking:Transport:QueueSize").Value ?? "10000" },
{ "SkyWalking:Transport:BatchSize", configuration?.GetSection("SkyWalking:Transport:BatchSize").Value ?? "2000" },
{ "SkyWalking:Transport:Parallel", configuration?.GetSection("SkyWalking:Transport:Parallel").Value ?? "5" },
{ "SkyWalking:Transport:Interval", configuration?.GetSection("SkyWalking:Transport:Interval").Value ?? "50" },
{ "SkyWalking:Transport:gRPC:Servers", configuration?.GetSection("SkyWalking:Transport:gRPC:Servers").Value ?? "localhost:11800" },
{ "SkyWalking:Transport:gRPC:Timeout", configuration?.GetSection("SkyWalking:Transport:gRPC:Timeout").Value ?? "10000" },
{ "SkyWalking:Transport:gRPC:ReportTimeout", configuration?.GetSection("SkyWalking:Transport:gRPC:ReportTimeout").Value ?? "600000" },
Expand Down Expand Up @@ -84,3 +85,4 @@ private static string BuildDefaultServiceInstanceName()
}
}
}

Loading