Skip to content

Commit

Permalink
fix bug: snappy compress causes connection close.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmoonlight committed Jul 24, 2024
1 parent 27af6ca commit 9f4e4e2
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 112 deletions.
76 changes: 29 additions & 47 deletions src/NSmartProxy.ClientRouter/Router.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using NSmartProxy.ClientRouter.Dispatchers;
using NSmartProxy.Data.Models;
using NSmartProxy.Infrastructure;
using System.IO.Compression;

namespace NSmartProxy.Client
{
Expand Down Expand Up @@ -97,7 +96,6 @@ public Router()
public Router(INSmartLogger logger) : this()
{
Logger = logger;
Global.Logger = logger;
}

//public Router SetConfiguration(string configstr)
Expand Down Expand Up @@ -458,7 +456,7 @@ private async Task OpenTransmission(int appId, TcpClient providerClient)
return;//tcp 开启隧道,并且不再利用此连接
case ControlMethod.ForceClose:
Logger.Info("客户端在别处被抢登,当前被强制下线。");
await Close();
Close();
return;
default: throw new Exception("非法请求:" + buffer[0]);
}
Expand Down Expand Up @@ -651,35 +649,28 @@ private async Task TcpTransferAsync(TcpClient providerClient, TcpClient toTarget
private async Task StreamTransfer(CancellationToken ct, NetworkStream fromStream, NetworkStream toStream,
string epString, ClientApp item)
{
//if (item.IsCompress)
//{
// await StringUtil.DecompressInSnappierAsync(fromStream, toStream, ct);
//}
//else
byte[] buffer = new byte[Global.ClientTunnelBufferSize];
using (fromStream)
{
byte[] buffer = new byte[Global.ClientTunnelBufferSize];
int bytesRead;
if (item.IsCompress)
{
using (GZipStream gZipStream = new GZipStream(fromStream, CompressionMode.Decompress))
while (!ct.IsCancellationRequested)
{
int bytesRead;
while ((bytesRead = await gZipStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
}
//Array.Resize(array: ref buffer, newSize: bytesRead);//此处存在copy,需要看看snappy是否支持偏移量数组
byte[] bufferCompressed = await fromStream.ReadNextQLengthBytes();
if (bufferCompressed.Length == 0) break;
var compressBuffer = StringUtil.DecompressInSnappier(bufferCompressed);
bytesRead = compressBuffer.Length;
await toStream.WriteAsync(compressBuffer, 0, bytesRead, ct).ConfigureAwait(false);
}
}
else
{
using (fromStream)
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
int bytesRead;
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);

}
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
}
}
}
Expand All @@ -692,35 +683,26 @@ await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false))
private async Task ToStaticTransfer(CancellationToken ct, NetworkStream fromStream, NetworkStream toStream,
string epString, ClientApp item)
{
//if (item.IsCompress)
//{
// await StringUtil.CompressInSnappierAsync(fromStream, toStream, ct);
//}
//else
byte[] buffer = new byte[Global.ClientTunnelBufferSize];
using (fromStream)
{
byte[] buffer = new byte[Global.ClientTunnelBufferSize];
if (item.IsCompress)
int bytesRead;
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
using(GZipStream gZipStream = new GZipStream(fromStream, CompressionMode.Compress))
if (item.IsCompress)
{
int bytesRead;
while ((bytesRead =await gZipStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
}
Array.Resize(array: ref buffer, newSize: bytesRead);//此处存在copy,需要看看snappy是否支持偏移量数组
var compressInSnappy = StringUtil.CompressInSnappier(buffer);
//var compressedBuffer = compressInSnappy.ContentBytes;
bytesRead = compressInSnappy.Length;
//TODO 封包传送
if (ct.IsCancellationRequested) { Global.Logger.Info("=传输外部中止="); return; }
await toStream.WriteQLengthBytes(compressInSnappy).ConfigureAwait(false);
}
}
else
{
using (fromStream)
else
{
int bytesRead;
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{

await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
}
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
}
}
}
Expand Down
58 changes: 55 additions & 3 deletions src/NSmartProxy.Infrastructure/Extensions/StreamExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ public static async Task WriteDLengthBytes(this Stream stream, byte[] bytes)
await stream.WriteAsync(bytes);
}

/// <summary>
/// 写入动态长度的字节,头四字节存放长度
/// </summary>
/// <param name="stream"></param>
/// <param name="bytes"></param>
/// <returns></returns>
public static async Task WriteQLengthBytes(this Stream stream, byte[] bytes)
{
stream.Write(StringUtil.IntTo4Bytes(bytes.Length), 0, 4);
await stream.WriteAsync(bytes);
}

/// <summary>
/// 读取动态长度的字节,头两字节存放长度
/// </summary>
Expand All @@ -147,9 +159,49 @@ public static async Task<byte[]> ReadNextDLengthBytes(this Stream stream)
var readByte = await stream.ReadAsync(bt2, 0, 2);
byte[] bytes = null;
if (readByte > 0)
{//TODO 7这种写法会不会有问题
bytes = new byte[StringUtil.DoubleBytesToInt(bt2)];
await stream.ReadAsync(bytes, 0, StringUtil.DoubleBytesToInt(bt2));
{
int length = BitConverter.ToInt16(bt2, 0);
bytes = new byte[length];
await stream.ReadAsync(bytes, 0, length);
}
return bytes;
}

/// <summary>
/// 读取动态长度的字节,头四字节存放长度,这种方式最大支持2G的数据
/// 这种比ReadNextDLengthBytes支持更大数据,但是读取时候会造成中断,需要多次读取
/// </summary>
/// <param name="stream"></param>
/// <param name="bytes"></param>
/// <returns></returns>
public static async Task<byte[]> ReadNextQLengthBytes(this Stream stream)
{
byte[] bt2 = new byte[4];
var readByte = await stream.ReadAsync(bt2, 0, 4);
byte[] bytes = null;
if (readByte > 0)
{
int length = BitConverter.ToInt32(bt2, 0);
bytes = new byte[length];
int readedByteCount = await stream.ReadAsync(bytes, 0, length);
if (readedByteCount == 0)
{
return new byte[0];
}
int restLength = length - readedByteCount;
while (restLength > 0)
{
readedByteCount += await stream.ReadAsync(bytes, readedByteCount, restLength);
if (readedByteCount == 0)
{
return new byte[0];
}
restLength = length - readedByteCount;
}
}
else
{
return new byte[0];
}
return bytes;
}
Expand Down
12 changes: 12 additions & 0 deletions src/NSmartProxy.Infrastructure/StringUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,20 @@ public async static Task DecompressInSnappierAsync(Stream inputStream, Stream ou
await _snappyCompressor.Value.DecompressAsync(inputStream, outputStream, ct);
}

public static byte[] CompressInSnappier(byte[] inputBytes)
{
return _snappyCompressor.Value.Compress(inputBytes);
}

public static byte[] DecompressInSnappier(byte[] inputBytes)
{
return _snappyCompressor.Value.Decompress(inputBytes);
}

internal static byte[] IntTo4Bytes(int length)
{
return BitConverter.GetBytes(length);
}


/// <summary>
Expand Down
107 changes: 45 additions & 62 deletions src/NSmartProxy/Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
using NSmartProxy.Database;
using NSmartProxy.Infrastructure.Extension;
using System.Collections.Concurrent;
using System.IO.Compression;

namespace NSmartProxy
{
Expand Down Expand Up @@ -473,7 +472,7 @@ private async Task ProcessConsumeTcpRequestAsync(int consumerPort, TcpClient con
//TODO 如果NSPApp中是http,则需要进一步分离,通过GetHTTPClient来分出对应的client以建立隧道
//II.弹出先前已经准备好的socket
tunnel.ClientServerClient = s2pClient;
CancellationTokenSource transfering = new CancellationTokenSource();
CancellationTokenSource transfering = new CancellationTokenSource();
transferTokenDic.TryAdd(transfering.GetHashCode(), transfering);
Server.Logger.Debug("记录一个连接的中断控制token:" + transfering.Token.GetHashCode().ToString());
//✳关键过程✳
Expand Down Expand Up @@ -864,95 +863,79 @@ async Task OpenTcpTransmission(Stream consumerStream, Stream providerStream,

}


private async Task StreamTransfer(CancellationToken ct, Stream fromStream, Stream toStream, NSPApp nspApp)
{
//if (nspApp.IsCompress)
//{
// await StringUtil.DecompressInSnappierAsync(fromStream, toStream, ct);
//}
//else
using (fromStream)
{
using (fromStream)
byte[] buffer = new byte[Global.ServerTunnelBufferSize];
try
{
byte[] buffer = new byte[Global.ServerTunnelBufferSize];
int bytesRead;
if (nspApp.IsCompress)
{
//gzip
using (var gzipStream = new GZipStream(fromStream, CompressionMode.Decompress))
while (!ct.IsCancellationRequested)
{
int bytesRead;
while ((bytesRead = await gzipStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
ServerContext.TotalReceivedBytes += bytesRead; //下行
}
//Array.Resize(array: ref buffer, newSize: bytesRead);//此处存在copy,需要看看snappy是否支持偏移量数组
byte[] bufferCompressed = await fromStream.ReadNextQLengthBytes();
if (bufferCompressed.Length == 0) break;
var compressBuffer = StringUtil.DecompressInSnappier(bufferCompressed);
bytesRead = compressBuffer.Length;
await toStream.WriteAsync(compressBuffer, 0, bytesRead, ct).ConfigureAwait(false);
}
}
else
{
try
{
int bytesRead;
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
ServerContext.TotalSentBytes += bytesRead; //上行
}
}
catch (Exception ioe)
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
if (ioe is IOException) { return; } //Suppress this exception.
throw;

await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);

ServerContext.TotalSentBytes += bytesRead; //上行
}
}
}
catch (Exception ioe)
{
if (ioe is IOException) { return; } //Suppress this exception.
throw;
}
}
}


private async Task ToStaticTransfer(CancellationToken ct, Stream fromStream, Stream toStream, NSPApp nspApp)
{


using (fromStream)
{
using (fromStream)
byte[] buffer = new byte[Global.ServerTunnelBufferSize];
try
{
if (nspApp.IsCompress)
int bytesRead;
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
//gzip
using (var gzipStream = new GZipStream(fromStream, CompressionMode.Compress))
if (nspApp.IsCompress)
{
byte[] buffer = new byte[Global.ServerTunnelBufferSize];
int bytesRead;
while ((bytesRead = await gzipStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
ServerContext.TotalReceivedBytes += bytesRead; //下行
}
Array.Resize(array: ref buffer, newSize: bytesRead);//此处存在copy,需要看看snappy是否支持偏移量数组
var compressInSnappy = StringUtil.CompressInSnappier(buffer);
//var compressedBuffer = compressInSnappy.ContentBytes;
bytesRead = compressInSnappy.Length;
//TODO 封包传送
if (ct.IsCancellationRequested) { Global.Logger.Info("=传输外部中止="); return; }
await toStream.WriteQLengthBytes(compressInSnappy).ConfigureAwait(false);
}
}
else
{
byte[] buffer = new byte[Global.ServerTunnelBufferSize];
try
{
int bytesRead;
while ((bytesRead =
await fromStream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false)) != 0)
{
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
ServerContext.TotalReceivedBytes += bytesRead; //下行
}
}
catch (Exception ioe)
else
{
if (ioe is IOException) { return; } //Suppress this exception.
throw;
await toStream.WriteAsync(buffer, 0, bytesRead, ct).ConfigureAwait(false);
}
ServerContext.TotalReceivedBytes += bytesRead; //下行
}
}
catch (Exception ioe)
{
if (ioe is IOException) { return; } //Suppress this exception.
throw;
}
}
}

Expand Down

0 comments on commit 9f4e4e2

Please sign in to comment.