Skip to content

Commit

Permalink
Attempts to fix replay recording performance issues.
Browse files Browse the repository at this point in the history
Replays now use a dedicated thread (rather than thread pool) for write operations.

Moved batch operations to this thread as well. They were previously happening during PVS. Looking at some trace files these compression ops can easily take 5+ ms in some cases, so moving them somewhere else is appreciated.

Added EventSource instrumentation for PVS and replay recording.
  • Loading branch information
PJB3005 committed Aug 27, 2023
1 parent 01546f3 commit 5e1d80b
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 32 deletions.
4 changes: 2 additions & 2 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ END TEMPLATE-->

### Other

*None yet*
* Performance improvements for replay recording.

### Internal

*None yet*
* Added some `EventSource` providers for PVS and replay recording: `Robust.Pvs` and `Robust.ReplayRecording`.


## 152.0.0
Expand Down
38 changes: 37 additions & 1 deletion Robust.Server/GameStates/ServerGameStateManager.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Linq;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand All @@ -24,7 +25,6 @@
using Prometheus;
using Robust.Server.Replays;
using Robust.Shared.Players;
using Robust.Shared.Map.Components;

namespace Robust.Server.GameStates
{
Expand Down Expand Up @@ -219,10 +219,16 @@ PvsThreadResources SendPlayer(int i, ParallelLoopState state, PvsThreadResources
{
try
{
var guid = i >= 0 ? players[i].UserId.UserId : default;

PvsEventSource.Log.WorkStart(_gameTiming.CurTick.Value, i, guid);

if (i >= 0)
SendStateUpdate(i, resource, inputSystem, players[i], pvsData, mQuery, tQuery, ref oldestAckValue);
else
_replay.Update();

PvsEventSource.Log.WorkStop(_gameTiming.CurTick.Value, i, guid);
}
catch (Exception e) // Catch EVERY exception
{
Expand Down Expand Up @@ -373,5 +379,35 @@ private void SendStateUpdate(int i,
_networkManager.ServerSendMessage(pvsMessage, channel);
}
}

/// <summary>
/// <see cref="System.Diagnostics.Tracing.EventSource"/> provider (name: "Robust.Pvs") that
/// brackets per-player PVS send work with start/stop events for ETW/EventPipe tracing.
/// </summary>
[EventSource(Name = "Robust.Pvs")]
public sealed class PvsEventSource : System.Diagnostics.Tracing.EventSource
{
    // Process-wide singleton instance used by the PVS send loop.
    public static PvsEventSource Log { get; } = new();

    /// <summary>PVS work for one player started this tick. A negative <paramref name="playerIdx"/>
    /// (with a default <paramref name="playerGuid"/>) denotes the replay update job.</summary>
    [Event(1)]
    public void WorkStart(uint tick, int playerIdx, Guid playerGuid) => WriteEvent(1, tick, playerIdx, playerGuid);

    /// <summary>PVS work for one player finished this tick; arguments mirror <see cref="WorkStart"/>.</summary>
    [Event(2)]
    public void WorkStop(uint tick, int playerIdx, Guid playerGuid) => WriteEvent(2, tick, playerIdx, playerGuid);

    /// <summary>
    /// Manual serializer for the (uint, int, Guid) argument combination: the base class has no
    /// matching fast-path overload, so this uses <see cref="WriteEventCore"/> with stack-allocated
    /// descriptors to avoid the allocating object[]-based WriteEvent fallback.
    /// </summary>
    [NonEvent]
    private unsafe void WriteEvent(int eventId, uint arg1, int arg2, Guid arg3)
    {
        if (IsEnabled())
        {
            // Descriptors point at the stack copies of the arguments; valid for the
            // duration of the WriteEventCore call only.
            var descrs = stackalloc EventData[3];

            descrs[0].DataPointer = (IntPtr)(&arg1);
            descrs[0].Size = 4;
            descrs[1].DataPointer = (IntPtr)(&arg2);
            descrs[1].Size = 4;
            descrs[2].DataPointer = (IntPtr)(&arg3);
            descrs[2].Size = 16;

            WriteEventCore(eventId, 3, descrs);
        }
    }
}
}
}
28 changes: 28 additions & 0 deletions Robust.Shared/Replays/SharedReplayRecordingManager.Events.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Diagnostics.Tracing;

namespace Robust.Shared.Replays;

internal abstract partial class SharedReplayRecordingManager
{
/// <summary>
/// <see cref="EventSource"/> provider (name: "Robust.ReplayRecording") for tracing replay
/// recording: execution of queued write actions, tick-batch writes, and write-queue
/// back-pressure.
/// </summary>
[EventSource(Name = "Robust.ReplayRecording")]
public sealed class RecordingEventSource : EventSource
{
    // Process-wide singleton instance, used both by the game thread and the write loop.
    public static RecordingEventSource Log { get; } = new();

    /// <summary>Queued write action number <paramref name="task"/> started executing on the write loop.</summary>
    [Event(1)]
    public void WriteTaskStart(int task) => WriteEvent(1, task);

    /// <summary>Queued write action number <paramref name="task"/> finished executing.</summary>
    [Event(2)]
    public void WriteTaskStop(int task) => WriteEvent(2, task);

    /// <summary>Tick batch <paramref name="index"/> is being handed off for writing.</summary>
    [Event(3)]
    public void WriteBatchStart(int index) => WriteEvent(3, index);

    /// <summary>Tick batch <paramref name="index"/> was written;
    /// <paramref name="uncompressed"/>/<paramref name="compressed"/> are the payload sizes.</summary>
    [Event(4)]
    public void WriteBatchStop(int index, int uncompressed, int compressed) =>
        WriteEvent(4, index, uncompressed, compressed);

    /// <summary>The bounded write channel was full, forcing the producer to block.</summary>
    [Event(5)]
    public void WriteQueueBlocked() => WriteEvent(5);
}
}
77 changes: 70 additions & 7 deletions Robust.Shared/Replays/SharedReplayRecordingManager.Write.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Channels;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
Expand All @@ -25,7 +26,7 @@ internal abstract partial class SharedReplayRecordingManager
// and even then not for much longer than a couple hundred ms at most.
private readonly List<Task> _finalizingWriteTasks = new();

private static void WriteYaml(RecordingState state, ResPath path, YamlDocument data)
private void WriteYaml(RecordingState state, ResPath path, YamlDocument data)
{
var memStream = new MemoryStream();
using var writer = new StreamWriter(memStream);
Expand All @@ -43,7 +44,7 @@ private void WriteSerializer<T>(RecordingState state, ResPath path, T obj)
WriteBytes(state, path, memStream.AsMemory());
}

private static void WritePooledBytes(
private void WritePooledBytes(
RecordingState state,
ResPath path,
byte[] bytes,
Expand All @@ -67,6 +68,45 @@ private static void WritePooledBytes(
});
}

/// <summary>
/// Queues a replay tick batch to be ZStd-compressed and written into the replay zip on the
/// dedicated write thread.
/// </summary>
/// <param name="state">Active recording state; supplies the compression context, zip archive
/// and the running size counters.</param>
/// <param name="path">Relative path of the entry inside the replay zip.</param>
/// <param name="bytes">Buffer rented from <c>ArrayPool&lt;byte&gt;.Shared</c> holding the batch
/// payload. Ownership transfers to the queued action, which returns it to the pool.</param>
/// <param name="length">Number of valid payload bytes in <paramref name="bytes"/>.</param>
private void WriteTickBatch(
    RecordingState state,
    ResPath path,
    byte[] bytes,
    int length)
{
    DebugTools.Assert(path.IsRelative, "Zip path should be relative");

    // Everything below runs later, on the replay write thread.
    WriteQueueTask(state, () =>
    {
        byte[]? buf = null;
        try
        {
            // Compress stream to buffer.
            // First 4 bytes of buffer are reserved for the length of the uncompressed stream.
            var bound = ZStd.CompressBound(length);
            buf = ArrayPool<byte>.Shared.Rent(4 + bound);
            var compressedLength = state.CompressionContext.Compress2(
                buf.AsSpan(4, bound),
                bytes.AsSpan(0, length));
            BitConverter.TryWriteBytes(buf, length);
            // Size counters are read asynchronously from the game thread (see WriteBatch),
            // hence the interlocked updates.
            Interlocked.Add(ref state.UncompressedSize, length);
            Interlocked.Add(ref state.CompressedSize, compressedLength);
            // NoCompression: the payload was already ZStd-compressed above.
            var entry = state.Zip.CreateEntry(path.ToString(), CompressionLevel.NoCompression);
            using var stream = entry.Open();
            stream.Write(buf, 0, compressedLength + 4);
        }
        finally
        {
            // Return both the caller's payload buffer and our compression buffer to the pool.
            ArrayPool<byte>.Shared.Return(bytes);
            if (buf != null)
                ArrayPool<byte>.Shared.Return(buf);
        }
    });
}

private void WriteToml(RecordingState state, IEnumerable<string> enumerable, ResPath path)
{
var memStream = new MemoryStream();
Expand All @@ -75,7 +115,7 @@ private void WriteToml(RecordingState state, IEnumerable<string> enumerable, Res
WriteBytes(state, path, memStream.AsMemory());
}

private static void WriteBytes(
private void WriteBytes(
RecordingState recState,
ResPath path,
ReadOnlyMemory<byte> bytes,
Expand All @@ -91,14 +131,18 @@ private static void WriteBytes(
});
}

/// <summary>
/// Enqueues an action onto the replay write channel, blocking the caller if the
/// channel is currently full.
/// </summary>
/// <param name="recState">Recording state owning the write channel.</param>
/// <param name="a">Action to execute on the write loop.</param>
private void WriteQueueTask(RecordingState recState, Action a)
{
    var pending = recState.WriteCommandChannel.WriteAsync(a);

    // Fast path: the channel had room and the write finished synchronously.
    if (pending.IsCompletedSuccessfully)
        return;

    // Channel is full. Synchronous waiting is safe here: the writing code doesn't
    // rely on the synchronization context.
    RecordingEventSource.Log.WriteQueueBlocked();
    _sawmill.Warning("Forced to wait on replay write queue. Consider increasing replay.write_channel_size!");
    pending.AsTask().Wait();
}

protected void UpdateWriteTasks()
Expand Down Expand Up @@ -154,23 +198,42 @@ public Task WaitWriteTasks()
return Task.WhenAll(_finalizingWriteTasks);
}

#pragma warning disable RA0004
/// <summary>
/// Body of the dedicated replay write thread: drains queued write actions until the
/// channel is completed, then signals <paramref name="taskCompletionSource"/>.
/// </summary>
/// <param name="taskCompletionSource">Completed (or faulted) when the loop exits.</param>
/// <param name="reader">Channel the game thread queues write actions onto.</param>
/// <param name="archive">Replay zip archive; disposed by this loop on exit.</param>
/// <param name="compressionContext">ZStd context; disposed by this loop on exit.</param>
private static void WriteQueueLoop(
    TaskCompletionSource taskCompletionSource,
    ChannelReader<Action> reader,
    ZipArchive archive,
    ZStdCompressionContext compressionContext)
{
    try
    {
        var i = 0;
        while (true)
        {
            // Block this dedicated thread until work is available or the channel completes.
            // GetAwaiter().GetResult() (rather than .Result) rethrows the original exception
            // instead of an AggregateException wrapper, so the TCS below reports the real error.
            var result = reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult();

            if (!result)
                break;

            // Drain everything currently buffered without allocating a Task per item.
            while (reader.TryRead(out var action))
            {
                RecordingEventSource.Log.WriteTaskStart(i);
                action();
                RecordingEventSource.Log.WriteTaskStop(i);

                i += 1;
            }
        }

        taskCompletionSource.TrySetResult();
    }
    catch (Exception e)
    {
        taskCompletionSource.TrySetException(e);
    }
    finally
    {
        // This thread owns the archive and compression context once recording starts;
        // dispose them no matter how the loop exits.
        archive.Dispose();
        compressionContext.Dispose();
    }
}
#pragma warning restore RA0004
}
58 changes: 36 additions & 22 deletions Robust.Shared/Replays/SharedReplayRecordingManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Linq;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Robust.Shared.Asynchronous;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void StopRecording()

try
{
WriteGameState(continueRecording: false);
WriteBatch(continueRecording: false);
_sawmill.Info("Replay recording stopped!");
}
catch
Expand Down Expand Up @@ -137,7 +138,7 @@ public void Update(GameState? state)
_sawmill.Info("Reached requested replay recording length. Stopping recording.");

if (!continueRecording || _recState.Buffer.Length > _tickBatchSize)
WriteGameState(continueRecording);
WriteBatch(continueRecording);
}
catch (Exception e)
{
Expand Down Expand Up @@ -201,12 +202,19 @@ public virtual bool TryStartRecording(
new BoundedChannelOptions(NetConf.GetCVar(CVars.ReplayWriteChannelSize))
{
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = false
SingleWriter = true
}
);

var writeTask = Task.Run(() => WriteQueueLoop(commandQueue.Reader, zip));
var writeTaskTcs = new TaskCompletionSource();
// This is on its own thread instead of the thread pool.
// Official SS14 servers write replays to an NFS mount,
// which causes some write calls to have significant latency (~1s).
// We want to avoid clogging thread pool threads with that, so...
var writeThread = new Thread(() => WriteQueueLoop(writeTaskTcs, commandQueue.Reader, zip, context));
writeThread.Priority = ThreadPriority.BelowNormal;
writeThread.Name = "Replay Recording Thread";
writeThread.Start();

_recState = new RecordingState(
zip,
Expand All @@ -216,7 +224,7 @@ public virtual bool TryStartRecording(
Timing.CurTime,
recordingEnd,
commandQueue.Writer,
writeTask,
writeTaskTcs.Task,
directory,
filePath,
state
Expand Down Expand Up @@ -252,26 +260,33 @@ public void RecordReplayMessage(object obj)
_queuedMessages.Add(obj);
}

private void WriteGameState(bool continueRecording = true)
private void WriteBatch(bool continueRecording = true)
{
DebugTools.Assert(_recState != null);

var batchIndex = _recState.Index++;
RecordingEventSource.Log.WriteBatchStart(batchIndex);

_recState.Buffer.Position = 0;

// Compress stream to buffer.
// First 4 bytes of buffer are reserved for the length of the uncompressed stream.
var bound = ZStd.CompressBound((int)_recState.Buffer.Length);
var buf = ArrayPool<byte>.Shared.Rent(4 + bound);
var length = _recState.CompressionContext.Compress2(buf.AsSpan(4, bound), _recState.Buffer.AsSpan());
BitConverter.TryWriteBytes(buf, (int)_recState.Buffer.Length);
WritePooledBytes(
_recState,
ReplayZipFolder / $"{DataFilePrefix}{_recState.Index++}.{Ext}",
buf, 4 + length, CompressionLevel.NoCompression);
var uncompressed = _recState.Buffer.AsSpan();
var poolData = ArrayPool<byte>.Shared.Rent(uncompressed.Length);
uncompressed.CopyTo(poolData);

_recState.UncompressedSize += (int)_recState.Buffer.Length;
_recState.CompressedSize += length;
if (_recState.UncompressedSize >= _maxUncompressedSize || _recState.CompressedSize >= _maxCompressedSize)
WriteTickBatch(
_recState,
ReplayZipFolder / $"{DataFilePrefix}{batchIndex}.{Ext}",
poolData,
uncompressed.Length);

// Note: these values are ASYNCHRONOUSLY updated from the replay write thread.
// This means reading them here won't get the most up-to-date values,
// and we'll probably always be off-by-one.
// That's considered acceptable.
var uncompressedSize = Interlocked.Read(ref _recState.UncompressedSize);
var compressedSize = Interlocked.Read(ref _recState.CompressedSize);

if (uncompressedSize >= _maxUncompressedSize || compressedSize >= _maxCompressedSize)
{
_sawmill.Info("Reached max replay recording size. Stopping recording.");
continueRecording = false;
Expand All @@ -288,8 +303,7 @@ protected virtual void Reset()
if (_recState == null)
return;

_recState.CompressionContext.Dispose();
// File stream is always disposed from the worker task.
// File stream & compression context are always disposed from the dedicated write thread.
_recState.WriteCommandChannel.Complete();

_recState = null;
Expand Down

0 comments on commit 5e1d80b

Please sign in to comment.