Skip to content

Commit

Permalink
优化性能、加入数据同步器(WIP)
Browse files Browse the repository at this point in the history
优化了数据复制的性能
加入了数据同步器的源生成器实现
修复了数据转发的bug
完成了广播包的实现
完成了无负载数据包的分发处理逻辑
加入了广播包的测试(WIP)
更新 Nuget
优化了部分 Socket 的发送逻辑
其它问题修复
  • Loading branch information
laolarou726 committed Aug 23, 2023
1 parent ee12d16 commit dae233e
Show file tree
Hide file tree
Showing 67 changed files with 1,699 additions and 170 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System;
using System.Buffers;
using System.Text;
using Hive.DataSynchronizer.Shared.Attributes;
using Hive.DataSynchronizer.Shared.UpdateInfo;
using Hive.Framework.Codec.Abstractions;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.DataSynchronizer.SourceGenerator.Tests
{
public class GuidUpdateInfo : AbstractUpdateInfoBase<Guid>
{
public GuidUpdateInfo(ushort objectSyncId, string propertyName, Guid newValue) : base(objectSyncId, propertyName, newValue)
{
}

public override ReadOnlyMemory<byte> Serialize<TId>(IPacketCodec<TId> codec)
{
// [LENGTH (2) | PACKET_FLAGS (4) | TYPE (2) | CONTENT]
var packetIdMemory = codec.PacketIdMapper.GetPacketIdMemory(typeof(UInt32UpdateInfo));
var packetFlags = PacketFlags.Broadcast | PacketFlags.S2CPacket;
var propertyNameMemory = Encoding.UTF8.GetBytes(PropertyName).AsSpan();

var totalLength = sizeof(ushort) + sizeof(uint) + packetIdMemory.Length + sizeof(ushort) + sizeof(uint) + propertyNameMemory.Length;
var result = new Memory<byte>(new byte[totalLength]);

var index = 0;

BitConverter.TryWriteBytes(
result.Span.SliceAndIncrement(ref index, sizeof(ushort)),
(ushort)totalLength);
BitConverter.TryWriteBytes(
result.Span.SliceAndIncrement(ref index, sizeof(uint)),
(uint)packetFlags);
packetIdMemory.Span
.CopyTo(result.Span.SliceAndIncrement(ref index, packetIdMemory.Length));
BitConverter.TryWriteBytes(
result.Span.SliceAndIncrement(ref index, sizeof(ushort)),
ObjectSyncId);
NewValue.ToByteArray().AsSpan()
.CopyTo(result.Span.SliceAndIncrement(ref index, 16));
propertyNameMemory
.CopyTo(result.Span.SliceAndIncrement(ref index, propertyNameMemory.Length));

return result;
}

public override AbstractUpdateInfoBase Deserialize(ReadOnlyMemory<byte> memory)
{
var index = 0;
var objectSyncId = BitConverter.ToUInt16(memory.Span.SliceAndIncrement(ref index, sizeof(ushort)));
var newValue = new Guid(memory.Span.SliceAndIncrement(ref index, 16));
var propertyName = Encoding.UTF8.GetString(memory.Span[index..]);

return new GuidUpdateInfo(objectSyncId, propertyName, newValue);
}
}

[DataSynchronizationObject(1)]
public partial class Class1
{
[DataSynchronizationProperty]
private int _in;

[DataSynchronizationProperty]
[UseCustomUpdateInfoType(typeof(GuidUpdateInfo))]
private Guid _guidTest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\DataSynchronizer\Hive.DataSynchronizer.Abstraction\Hive.DataSynchronizer.Abstraction.csproj" />
<ProjectReference Include="..\..\DataSynchronizer\Hive.DataSynchronizer.SourceGenerator\Hive.DataSynchronizer.SourceGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
<ProjectReference Include="..\..\DataSynchronizer\Hive.DataSynchronizer\Hive.DataSynchronizer.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface INetworkingTestProperties
public int AdderCount { get; }
public int AdderPackageReceiveCount { get; }
public int BidirectionalPacketAddResult { get; }
public PacketFlags NoPayloadPacketFlags { get; }
public int NoPayloadPacketCount { get; }
}

public abstract class AbstractNetworkingTestBase<TSession, TClient, TAcceptor, TClientManager>
Expand Down Expand Up @@ -138,20 +138,19 @@ public async Task NoPayloadPacketReceiveTest()
{
await Task.Delay(1000);

var sentCount = 0;

for (var i = -1; i <= 30; i++)
{
var flag = i == -1 ? 0 : (1 << i);

await Client.SendWithoutPayload(
Client.PacketCodec,
(PacketFlags)flag);
await Client.SendWithoutPayload(PacketFlags.None);
sentCount++;
}

await Task.Delay(3000);

Assert.Multiple(() =>
{
Assert.That((uint)ClientManager.NoPayloadPacketFlags, Is.EqualTo(int.MaxValue));
Assert.That((uint)ClientManager.NoPayloadPacketCount, Is.EqualTo(sentCount));
});
}

Expand All @@ -177,6 +176,8 @@ public async Task ReconnectTest()

ShouldSendHeartBeat = true;

await Task.Delay(1500);

Assert.That(ClientManager.ReconnectedClient, Is.EqualTo(1));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected override void RegisterHeartBeatMessage(KcpSession<ushort> session)
public int AdderCount { get; private set; }
public int AdderPackageReceiveCount { get; private set; }
public int BidirectionalPacketAddResult { get; private set; }
public PacketFlags NoPayloadPacketFlags { get; private set; }
public int NoPayloadPacketCount { get; private set; }

public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(KcpSession<ushort> session)
{
Expand Down Expand Up @@ -74,9 +74,9 @@ protected override void RegisterSigninMessage(KcpSession<ushort> session)
await kcpSession.SendAsync(new S2CTestPacket { ReversedRandomNumber = -message.Payload.RandomNumber }, PacketFlags.None);
});

session.OnReceive<INoPayloadPacketPlaceHolder>((result, _) =>
session.OnReceive<INoPayloadPacketPlaceHolder>((_, _) =>
{
NoPayloadPacketFlags |= result.Flags;
NoPayloadPacketCount++;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected override void RegisterHeartBeatMessage(QuicSession<ushort> session)
public int AdderCount { get; private set; }
public int AdderPackageReceiveCount { get; private set; }
public int BidirectionalPacketAddResult { get; private set; }
public PacketFlags NoPayloadPacketFlags { get; private set; }
public int NoPayloadPacketCount { get; private set; }

public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(QuicSession<ushort> session)
{
Expand Down Expand Up @@ -76,9 +76,9 @@ protected override void RegisterSigninMessage(QuicSession<ushort> session)
await quicSession.SendAsync(new S2CTestPacket { ReversedRandomNumber = -message.Payload.RandomNumber }, PacketFlags.None);
});

session.OnReceive<INoPayloadPacketPlaceHolder>((result, _) =>
session.OnReceive<INoPayloadPacketPlaceHolder>((_, _) =>
{
NoPayloadPacketFlags |= result.Flags;
NoPayloadPacketCount++;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected override void RegisterHeartBeatMessage(TcpSession<ushort> session)
public int AdderCount { get; private set; }
public int AdderPackageReceiveCount { get; private set; }
public int BidirectionalPacketAddResult { get; private set; }
public PacketFlags NoPayloadPacketFlags { get; private set; }
public int NoPayloadPacketCount { get; private set; }

public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(TcpSession<ushort> session)
{
Expand Down Expand Up @@ -74,9 +74,9 @@ protected override void RegisterSigninMessage(TcpSession<ushort> session)
await tcpSession.SendAsync(new S2CTestPacket { ReversedRandomNumber = -message.Payload.RandomNumber }, PacketFlags.None);
});

session.OnReceive<INoPayloadPacketPlaceHolder>((result, _) =>
session.OnReceive<INoPayloadPacketPlaceHolder>((_, _) =>
{
NoPayloadPacketFlags |= result.Flags;
NoPayloadPacketCount++;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected override void RegisterHeartBeatMessage(UdpSession<ushort> session)
public int AdderCount { get; private set; }
public int AdderPackageReceiveCount { get; private set; }
public int BidirectionalPacketAddResult { get; private set; }
public PacketFlags NoPayloadPacketFlags { get; private set; }
public int NoPayloadPacketCount { get; private set; }

public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(UdpSession<ushort> session)
{
Expand Down Expand Up @@ -74,9 +74,9 @@ protected override void RegisterSigninMessage(UdpSession<ushort> session)
await udpSession.SendAsync(new S2CTestPacket { ReversedRandomNumber = -message.Payload.RandomNumber }, PacketFlags.None);
});

session.OnReceive<INoPayloadPacketPlaceHolder>((result, _) =>
session.OnReceive<INoPayloadPacketPlaceHolder>((_, _) =>
{
NoPayloadPacketFlags |= result.Flags;
NoPayloadPacketCount++;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ protected override void RegisterClientStartTransmitMessage(TcpSession<ushort> se

private async Task TcpServerSessionOnOnDataReceived(object? sender, ReceivedDataEventArgs e)
{
await DoForwardDataToClientAsync(e.Data);
await DoForwardDataToClientAsync(e);
}

private async Task TcpClientSessionOnOnDataReceived(object? sender, ReceivedDataEventArgs e)
{
await DoForwardDataToServerAsync((TcpSession<ushort>)sender!, e.Data);
await DoForwardDataToServerAsync((TcpSession<ushort>)sender!, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void Setup()
_packetIdMapper.Register<ClientCanTransmitMessage>();
_packetIdMapper.Register<ServerRedirectTestMessage1>();
_packetIdMapper.Register<ServerRedirectTestMessage2>();
_packetIdMapper.Register<ServerBroadcastTestMessage>();

_clientPacketCodec = new ProtoBufPacketCodec(_packetIdMapper);
_serverPacketCodec = new ProtoBufPacketCodec(_packetIdMapper, new IPacketPrefixResolver[]
Expand Down Expand Up @@ -381,4 +382,52 @@ await client.SendAsync(new ServerRedirectTestMessage2
Assert.That(client2Counter, Is.EqualTo(client2LocalCounter));
});
}

[Test]
[Order(8)]
public async Task ServerBroadcastWithPayloadTest()
{
await Task.Delay(1000);

var client1ReceivedNumber = 0;
var client2ReceivedNumber = 0;

_client1.OnReceive<ServerBroadcastTestMessage>((result, _) =>
{
client1ReceivedNumber += result.Payload.Number;
});

_client2.OnReceive<ServerBroadcastTestMessage>((result, _) =>
{
client2ReceivedNumber += result.Payload.Number;
});

await Task.Delay(500);

var localSentNumber = 0;

for (var i = 0; i < 100; i++)
{
var rnd = Random.Shared.Next(-100, 100);

localSentNumber += rnd;

await _server.SendAsync(
new ServerBroadcastTestMessage
{
Number = rnd
},
PacketFlags.Broadcast | PacketFlags.S2CPacket);
await Task.Delay(10);
}

await Task.Delay(8000);

Assert.Multiple(() =>
{
Assert.That(client1ReceivedNumber, Is.EqualTo(localSentNumber));
Assert.That(client2ReceivedNumber, Is.EqualTo(localSentNumber));
Assert.That(client1ReceivedNumber, Is.EqualTo(client2ReceivedNumber));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.7.1" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="NUnit.Analyzers" Version="3.6.1">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using MemoryPack;
using ProtoBuf;

namespace Hive.Framework.Networking.Tests.Messages;

[ProtoContract]
[MemoryPackable]
public partial class ServerBroadcastTestMessage
{
[ProtoMember(1)]
public int Number { get; set; }
}
3 changes: 1 addition & 2 deletions Codecs/Hive.Codec.Abstractions/IPacketCodec.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Buffers;
using Hive.Framework.Shared;

namespace Hive.Framework.Codec.Abstractions
Expand All @@ -20,7 +19,7 @@ public interface IPacketCodec<TId> where TId : unmanaged
ReadOnlyMemory<byte> GetPacketFlagsMemory(ReadOnlyMemory<byte> payload);
PacketFlags GetPacketFlags(ReadOnlyMemory<byte> data);

SerializedPacketMemory Encode<T>(T obj, PacketFlags flags);
ReadOnlyMemory<byte> Encode<T>(T obj, PacketFlags flags);

PacketDecodeResultWithId<TId> Decode(ReadOnlySpan<byte> data);
}
Expand Down
1 change: 1 addition & 0 deletions Codecs/Hive.Codec.Abstractions/IPacketIdMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface IPacketIdMapper<TId> where TId : unmanaged
void Register(Type type, out TId id);

TId GetPacketId(Type type);
ReadOnlyMemory<byte> GetPacketIdMemory(Type type);
Type GetPacketType(TId id);
}
}
14 changes: 7 additions & 7 deletions Codecs/Hive.Codec.Bson/BsonPacketCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public PacketFlags GetPacketFlags(ReadOnlyMemory<byte> data)
return (PacketFlags) flags;
}

public SerializedPacketMemory Encode<T>(T obj, PacketFlags flags)
public ReadOnlyMemory<byte> Encode<T>(T obj, PacketFlags flags)
{
var dataSpan = obj.ToBson().AsSpan();

Expand All @@ -63,26 +63,26 @@ public SerializedPacketMemory Encode<T>(T obj, PacketFlags flags)

// [LENGTH (2) | PACKET_FLAGS (4) | TYPE (2) | CONTENT]
var index = 0;
var result = MemoryPool<byte>.Shared.Rent(2 + 4 + 2 + dataSpan.Length);
var result = new Memory<byte>(new byte[2 + 4 + 2 + dataSpan.Length]);

BitConverter.TryWriteBytes(
result.Memory.Span.SliceAndIncrement(ref index, sizeof(ushort)),
result.Span.SliceAndIncrement(ref index, sizeof(ushort)),
(ushort)(dataSpan.Length + 4 + 2));

// Packet Flags
BitConverter.TryWriteBytes(
result.Memory.Span.SliceAndIncrement(ref index, sizeof(uint)),
result.Span.SliceAndIncrement(ref index, sizeof(uint)),
(uint)flags);

// Packet Id
BitConverter.TryWriteBytes(
result.Memory.Span.SliceAndIncrement(ref index, sizeof(ushort)),
result.Span.SliceAndIncrement(ref index, sizeof(ushort)),
packetId);

// Packet Payload
dataSpan.CopyTo(result.Memory.Span.SliceAndIncrement(ref index, dataSpan.Length));
dataSpan.CopyTo(result.Span.SliceAndIncrement(ref index, dataSpan.Length));

return new SerializedPacketMemory(index, result);
return result;
}

public unsafe PacketDecodeResultWithId<ushort> Decode(ReadOnlySpan<byte> data)
Expand Down
Loading

0 comments on commit dae233e

Please sign in to comment.