Skip to content

Commit

Permalink
完成网关基础设置、优化服务端封包转发策略
Browse files Browse the repository at this point in the history
  • Loading branch information
laolarou726 committed Aug 7, 2023
1 parent ed24726 commit 12a5224
Show file tree
Hide file tree
Showing 32 changed files with 186 additions and 107 deletions.
2 changes: 1 addition & 1 deletion Common/Hive.Common.Codec.Bson/BsonPacketCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ReadOnlyMemory<byte> GetPacketFlagsMemory(ReadOnlyMemory<byte> payload)

public PacketFlags GetPacketFlags(ReadOnlyMemory<byte> data)
{
var flagsMemory = data.Slice(6, 4);
var flagsMemory = data.Slice(2, 4);
var flags = BitConverter.ToUInt32(flagsMemory.Span);

return (PacketFlags) flags;
Expand Down
9 changes: 9 additions & 0 deletions Common/Hive.Common.Networking.Abstractions/IClientManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public interface IClientManager<TSessionId, TSession>
where TSession : ISession<TSession>
where TSessionId : unmanaged
{
int SessionIdSize { get; }

/// <summary>
/// 根据会话获取编码后的 C2S(Client -> Server) 前缀
/// <para>这个方法通常被用于在网关服务器向具体服务器转发时注入的额外信息</para>
Expand All @@ -16,6 +18,13 @@ public interface IClientManager<TSessionId, TSession>
/// <returns>编码后的会话前缀</returns>
ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(TSession session);

/// <summary>
/// 从完整的封包中提取并解析会话 ID
/// </summary>
/// <param name="payload"></param>
/// <returns></returns>
TSessionId ResolveSessionPrefix(ReadOnlyMemory<byte> payload);

/// <summary>
/// 根据会话获取会话 ID
/// </summary>
Expand Down

This file was deleted.

3 changes: 3 additions & 0 deletions Common/Hive.Common.Networking.Shared/AbstractClientManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public abstract class AbstractClientManager<TSessionId, TSession> : IClientManag

private bool _isClientLinkHolderRunning;

public abstract int SessionIdSize { get; }

public event EventHandler<ClientConnectionChangedEventArgs<TSession>>? OnClientConnected;
public event EventHandler<ClientConnectionChangedEventArgs<TSession>>? OnClientDisconnected;

Expand Down Expand Up @@ -91,6 +93,7 @@ public virtual void StopClientConnectionHolder()
}

public abstract ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(TSession session);
public abstract TSessionId ResolveSessionPrefix(ReadOnlyMemory<byte> payload);

protected abstract void RegisterHeartBeatMessage(TSession session);
protected abstract void RegisterSigninMessage(TSession session);
Expand Down
47 changes: 35 additions & 12 deletions Common/Hive.Common.Networking.Shared/AbstractGatewayServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Hive.Framework.Networking.Abstractions;
using Hive.Framework.Networking.Abstractions.EventArgs;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.Framework.Networking.Shared;

Expand Down Expand Up @@ -79,7 +80,7 @@ protected virtual void AddPacketRoute(TId packetId, TSession session)
/// <para>客户端只应该在接受到该方法发送的消息后才开始进行数据传输,否则可能会导致前半部分数据丢失</para>
/// </summary>
/// <param name="session"></param>
protected abstract void NotifyClientCanStartTransmitMessage(TSession session);
protected abstract ValueTask NotifyClientCanStartTransmitMessage(TSession session);

/// <summary>
/// 服务器注册方法
Expand All @@ -90,14 +91,6 @@ protected virtual void AddPacketRoute(TId packetId, TSession session)
/// <param name="session"></param>
protected abstract void RegisterServerRegistrationMessage(TSession session);

/// <summary>
/// 服务器回复数据包注册方法
/// <para>一般情况下,此方法需要注册相应的数据包来帮助网关服务器向正确的客户端传输数据。</para>
/// <para>服务端回复数据包需实现 <see cref="IServerReplyPacket{TId}"/>,其中包括目标客户端 ID 以及数据包负载。 </para>
/// </summary>
/// <param name="session"></param>
protected abstract void RegisterServerReplyMessage(TSession session);

/// <summary>
/// 客户端数据包传送起始注册方法
/// <para>一般情况下,此方法需要在接受到相应的起始数据包后开始转发该客户端会话发送的所有请求,通过调用 <see cref="DoForwardDataToServerAsync"/> 来将数据包转发给对应服务器</para>
Expand Down Expand Up @@ -127,6 +120,7 @@ protected virtual void InvokeOnClientConnected(object sender, ClientConnectionCh
/// <summary>
/// 客户端数据转发方法
/// <para>一般情况下,此方法会拆解客户端数据包,并向其追加客户端会话 ID 等信息,并将数据包转发给相应服务器</para>
/// <para>注意:在调用该转发方法时,应始终保持 [封包 ID] 的下一位为 [客户端会话 ID],并且服务端也应该从 [客户端会话 ID] 部分开始解析自定义包头</para>
/// </summary>
/// <param name="session"></param>
/// <param name="data"></param>
Expand Down Expand Up @@ -157,12 +151,41 @@ protected virtual async ValueTask DoForwardDataToServerAsync(TSession session, R
await serverSession!.SendAsync(repackedData);
}

protected virtual async ValueTask DoForwardDataToClientAsync(IServerReplyPacket<TSessionId> packet)
/// <summary>
/// 服务端数据转发方法
/// <para>一般情况下,此方法会拆解服务端数据包,并解析其中包含的客户端会话 ID 等信息,并将数据包转发给相应客户端</para>
/// <para>注意:在调用该转发方法时,应始终保持 [封包 ID] 的下一位为 [客户端会话 ID],并且服务端也应该从 [客户端会话 ID] 部分开始解析自定义包头</para>
/// </summary>
/// <param name="session"></param>
/// <param name="data"></param>
/// <returns></returns>
protected virtual async ValueTask DoForwardDataToClientAsync(ReadOnlyMemory<byte> data)
{
if (!Acceptor.ClientManager.TryGetSession(packet.SendTo, out var session))
var packetIdMemory = PacketCodec.GetPacketIdMemory(data);
var packetFlags = PacketCodec.GetPacketFlags(data);

if (!packetFlags.HasFlag(PacketFlags.S2CPacket)) return;

var sessionId = Acceptor.ClientManager.ResolveSessionPrefix(data);

if (!Acceptor.ClientManager.TryGetSession(sessionId, out var session))
return;

await session!.SendAsync(packet.InnerPayload);
var newFlag = packetFlags | PacketFlags.Finalized;
var packetFlagsMemory = BitConverter.GetBytes((uint) newFlag);
var payload = data[(2 + 4 + packetIdMemory.Length + Acceptor.ClientManager.SessionIdSize)..];
var resultLength = packetFlagsMemory.Length + packetIdMemory.Length + payload.Length;
var lengthMemory = BitConverter.GetBytes((ushort)resultLength).AsMemory();

// [LENGTH (2) | PACKET_FLAGS (4) | PACKET_ID | SESSION_ID | PAYLOAD]
var repackedData =
MemoryHelper.CombineMemory(
lengthMemory,
packetFlagsMemory,
packetIdMemory,
payload);

await session!.SendAsync(repackedData);
}

protected virtual bool GetServerSession(TId packetId, bool useLoadBalancer, out TSession? serverSession)
Expand Down
7 changes: 4 additions & 3 deletions Common/Hive.Common.Networking.Shared/AbstractSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ protected async ValueTask ProcessPacket(ReadOnlyMemory<byte> payloadBytes)
var packetFlags = PacketCodec.GetPacketFlags(payloadBytes);
var id = PacketCodec.GetPacketId(idMemory);

if (RedirectReceivedData &&
(RedirectPacketIds?.Contains(id) ?? false) &&
!packetFlags.HasFlag(PacketFlags.ServerReply))
var isPacketFinalized = packetFlags.HasFlag(PacketFlags.Finalized);
var shouldRedirect = RedirectReceivedData && (RedirectPacketIds?.Contains(id) ?? false);

if ((shouldRedirect || packetFlags.HasFlag(PacketFlags.S2CPacket)) && !isPacketFinalized)
{
await InvokeDataReceivedEventAsync(idMemory, payloadBytes.ToArray().AsMemory());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Hive.Framework.Codec.Abstractions;
using Hive.Framework.Networking.Abstractions;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.Framework.Networking.Shared.Helpers;

Expand Down
2 changes: 1 addition & 1 deletion Common/Hive.Common.Networking.Tcp/TcpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public TcpSession(string addressWithPort, IPacketCodec<TId> packetCodec, IDataDi
protected override async ValueTask DispatchPacket(IPacketDecodeResult<object>? packet, Type? packetType = null)
{
if (packet == null) return;

await DataDispatcher.DispatchAsync(this, packet, packetType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Hive.Framework.Networking.Tests.Messages;
using Hive.Framework.Networking.Tests.Messages.BidirectionalPacket;
using Hive.Framework.Shared;
using System.Text;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Kcp;

Expand All @@ -18,7 +17,9 @@ protected override void RegisterHeartBeatMessage(KcpSession<ushort> session)
UpdateHeartBeatReceiveTime(sessionId);
});
}


public override int SessionIdSize => 16;

public int ConnectedClient { get; private set; }
public int SigninMessageVal { get; private set; }
public int SignOutMessageVal { get; private set; }
Expand All @@ -30,7 +31,16 @@ protected override void RegisterHeartBeatMessage(KcpSession<ushort> session)

public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(KcpSession<ushort> session)
{
return Encoding.ASCII.GetBytes(GetSessionId(session).ToString("N"));
return GetSessionId(session).ToByteArray();
}

public override Guid ResolveSessionPrefix(ReadOnlyMemory<byte> payload)
{
// [LENGTH (2) | PACKET_FLAGS (4) | PACKET_ID | SESSION_ID | PAYLOAD]
const int startIndex = 2 + 4 + sizeof(ushort);
var sessionIdMemory = payload.Slice(startIndex, 16);

return new Guid(sessionIdMemory.Span);
}

protected override void InvokeOnClientDisconnected(Guid sessionId, KcpSession<ushort> session, bool isClientRequest)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using Hive.Framework.Codec.Bson;
using Hive.Framework.Networking.Kcp;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Shared;
using System.Net;
using Hive.Common.Codec.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Kcp;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using Hive.Framework.Networking.Kcp;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Shared;
using System.Net;
using Hive.Common.Codec.MemoryPack;
using Hive.Common.Codec.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Kcp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Hive.Framework.Codec.Protobuf;
using Hive.Framework.Networking.Kcp;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;
using System.Net;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Kcp;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Runtime.Versioning;
using System.Text;
using Hive.Framework.Networking.Kcp;
using Hive.Framework.Networking.Quic;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Networking.Tests.Messages;
Expand All @@ -21,6 +22,8 @@ protected override void RegisterHeartBeatMessage(QuicSession<ushort> session)
});
}

public override int SessionIdSize => 16;

public int ConnectedClient { get; private set; }
public int SigninMessageVal { get; private set; }
public int SignOutMessageVal { get; private set; }
Expand All @@ -32,7 +35,16 @@ protected override void RegisterHeartBeatMessage(QuicSession<ushort> session)

public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(QuicSession<ushort> session)
{
return Encoding.ASCII.GetBytes(GetSessionId(session).ToString("N"));
return GetSessionId(session).ToByteArray();
}

public override Guid ResolveSessionPrefix(ReadOnlyMemory<byte> payload)
{
// [LENGTH (2) | PACKET_FLAGS (4) | PACKET_ID | SESSION_ID | PAYLOAD]
const int startIndex = 2 + 4 + sizeof(ushort);
var sessionIdMemory = payload.Slice(startIndex, 16);

return new Guid(sessionIdMemory.Span);
}

protected override void InvokeOnClientDisconnected(Guid sessionId, QuicSession<ushort> session, bool isClientRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
using Hive.Framework.Networking.Shared;
using System.Net;
using System.Runtime.Versioning;
using Hive.Framework.Shared;
using Hive.Common.Codec.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Quic;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using Hive.Framework.Networking.Quic;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Shared;
using System.Net;
using System.Runtime.Versioning;
using Hive.Common.Codec.MemoryPack;
using Hive.Common.Codec.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Quic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Hive.Framework.Codec.Protobuf;
using Hive.Framework.Networking.Quic;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;
using System.Net;
using System.Runtime.Versioning;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ protected override void RegisterHeartBeatMessage(TcpSession<ushort> session)
});
}

public override int SessionIdSize => 16;

public int ConnectedClient { get; private set; }
public int SigninMessageVal { get; private set; }
public int SignOutMessageVal { get; private set; }
Expand All @@ -32,6 +34,15 @@ public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(TcpSession<ushor
return GetSessionId(session).ToByteArray();
}

public override Guid ResolveSessionPrefix(ReadOnlyMemory<byte> payload)
{
// [LENGTH (2) | PACKET_FLAGS (4) | PACKET_ID | SESSION_ID | PAYLOAD]
const int startIndex = 2 + 4 + sizeof(ushort);
var sessionIdMemory = payload.Slice(startIndex, 16);

return new Guid(sessionIdMemory.Span);
}

protected override void InvokeOnClientDisconnected(Guid sessionId, TcpSession<ushort> session, bool isClientRequest)
{
base.InvokeOnClientDisconnected(sessionId, session, isClientRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Hive.Framework.Codec.Bson;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Networking.Tcp;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;
using System.Net;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Tcp;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using Hive.Framework.Networking.Shared;
using Hive.Framework.Networking.Tcp;
using Hive.Framework.Shared;
using System.Net;
using Hive.Common.Codec.MemoryPack;
using Hive.Common.Codec.Shared;
using Hive.Framework.Shared.Helpers;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Tcp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Hive.Framework.Codec.Protobuf;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Networking.Tcp;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;
using System.Net;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Tcp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ protected override void RegisterHeartBeatMessage(UdpSession<ushort> session)
});
}

public override int SessionIdSize => 16;

public int ConnectedClient { get; private set; }
public int SigninMessageVal { get; private set; }
public int SignOutMessageVal { get; private set; }
Expand All @@ -30,7 +32,16 @@ protected override void RegisterHeartBeatMessage(UdpSession<ushort> session)

public override ReadOnlyMemory<byte> GetEncodedC2SSessionPrefix(UdpSession<ushort> session)
{
return Encoding.ASCII.GetBytes(GetSessionId(session).ToString("N"));
return GetSessionId(session).ToByteArray();
}

public override Guid ResolveSessionPrefix(ReadOnlyMemory<byte> payload)
{
// [LENGTH (2) | PACKET_FLAGS (4) | PACKET_ID | SESSION_ID | PAYLOAD]
const int startIndex = 2 + 4 + sizeof(ushort);
var sessionIdMemory = payload.Slice(startIndex, 16);

return new Guid(sessionIdMemory.Span);
}

protected override void InvokeOnClientDisconnected(Guid sessionId, UdpSession<ushort> session, bool isClientRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Hive.Framework.Codec.Bson;
using Hive.Framework.Networking.Shared;
using Hive.Framework.Networking.Udp;
using Hive.Framework.Shared;
using Hive.Framework.Shared.Helpers;
using System.Net;

namespace Hive.Framework.Networking.Tests.BasicNetworking.Udp;
Expand Down
Loading

0 comments on commit 12a5224

Please sign in to comment.