Skip to content

Commit

Permalink
Improved keep alive logic
Browse files Browse the repository at this point in the history
  • Loading branch information
alessiobianchini committed Dec 31, 2024
1 parent 00ca710 commit dd06bc2
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 21 deletions.
96 changes: 91 additions & 5 deletions Tunnelize/Services/TunnelManager.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Net.WebSockets;
using System;
using System.Net.WebSockets;
using System.Text;

public class TunnelManager
{
private readonly Dictionary<string, WebSocket> _tunnels = new();
private readonly Dictionary<string, DateTime> _lastActivityTracker = new(); // Track last activity per tunnel

public async Task HandleTunnelConnection(string tunnelId, WebSocket webSocket)
{
Expand All @@ -12,8 +14,76 @@ public async Task HandleTunnelConnection(string tunnelId, WebSocket webSocket)
if (!_tunnels.ContainsKey(tunnelId))
{
_tunnels[tunnelId] = webSocket;
_lastActivityTracker[tunnelId] = DateTime.UtcNow;
}

var buffer = new byte[1024 * 1024];
var cts = new CancellationTokenSource();
var keepAliveInterval = TimeSpan.FromMinutes(20);

_ = Task.Run(async () =>
{
while (webSocket.State == WebSocketState.Open)
{
try
{
if (_lastActivityTracker.ContainsKey(tunnelId) && DateTime.UtcNow - _lastActivityTracker[tunnelId] >= keepAliveInterval)
{
Console.WriteLine($"[DEBUG] No activity for 5 minutes. Sending ping to client for tunnel {tunnelId}");
await webSocket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes("ping")), WebSocketMessageType.Text, true, CancellationToken.None);

try
{
while (true)
{
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), cts.Token);

if (result.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed connection", CancellationToken.None);
throw new Exception($"WebSocket connection closed by client for tunnel {tunnelId}");
}

if (result.MessageType == WebSocketMessageType.Text)
{
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);

if (message == "pong")
{
Console.WriteLine($"[DEBUG] Pong received from client for tunnel {tunnelId}");
}
else
{
Console.WriteLine($"[DEBUG] Received message for tunnel {tunnelId}: {message}");
}

if (_lastActivityTracker.ContainsKey(tunnelId))
{
_lastActivityTracker[tunnelId] = DateTime.UtcNow;
}
}

if (result.EndOfMessage)
{
break;
}
}
}
catch (OperationCanceledException)
{
throw new Exception("Timeout while waiting for pong from WebSocket client.");
}
}
await Task.Delay(TimeSpan.FromSeconds(30), cts.Token);
}
catch (Exception ex)
{
Console.WriteLine($"[ERROR] Ping failed for tunnel {tunnelId}: {ex.Message}");
break;
}
}
});

try
{
while (webSocket.State == WebSocketState.Open)
Expand All @@ -26,11 +96,18 @@ public async Task HandleTunnelConnection(string tunnelId, WebSocket webSocket)
}
finally
{
cts.Cancel();

if (_tunnels.ContainsKey(tunnelId))
{
_tunnels.Remove(tunnelId);
Console.WriteLine($"[INFO] Tunnel {tunnelId} removed.");
}

if (_lastActivityTracker.ContainsKey(tunnelId))
{
_lastActivityTracker.Remove(tunnelId);
}
}
}

Expand All @@ -45,6 +122,11 @@ public async Task ForwardRequestToClient(string tunnelId, string message)
var buffer = Encoding.UTF8.GetBytes(message);

await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);

if (_lastActivityTracker.ContainsKey(tunnelId))
{
_lastActivityTracker[tunnelId] = DateTime.UtcNow;
}
}

public async Task<string> ForwardRequestToWSClient(string tunnelId, string message)
Expand All @@ -59,9 +141,14 @@ public async Task<string> ForwardRequestToWSClient(string tunnelId, string messa

await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);

if (_lastActivityTracker.ContainsKey(tunnelId))
{
_lastActivityTracker[tunnelId] = DateTime.UtcNow;
}

var responseBuffer = new byte[1024 * 1024 * 5];
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var completeResponse = new List<byte>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var completeResponse = new List<byte>();

try
{
Expand Down Expand Up @@ -91,5 +178,4 @@ public async Task<string> ForwardRequestToWSClient(string tunnelId, string messa
var jsonResponse = Encoding.UTF8.GetString(completeResponse.ToArray());
return jsonResponse;
}

}
}
23 changes: 7 additions & 16 deletions TunnelizeClient/src/tunnelize.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,9 @@ function connectToWebSocket(protocol, port, tunnelId = null) {
const MAX_BUFFER_SIZE = 1024 * 1024 * 5;
let wssUrl = !!tunnelId ? `wss://${url}/ws/${tunnelId}` : `wss://${url}/ws`;
const ws = new WebSocket(wssUrl, { maxPayload: MAX_BUFFER_SIZE });
let isAlive = true;


ws.on('open', () => {
console.info('[INFO] Connection established with the proxy');
setInterval(() => {
console.info(`[DEBUG] Sending ping. Connection alive: ${isAlive}`);
if (isAlive) {
ws.ping();
isAlive = false;
} else {
console.error('[ERROR] WebSocket connection appears to be dead. Reconnecting...');
ws.terminate();
}
}, 60000);
});

ws.on('pong', () => {
isAlive = true;
});

ws.on('close', () => {
Expand All @@ -47,6 +32,12 @@ function connectToWebSocket(protocol, port, tunnelId = null) {
ws.on('message', (data) => {
const message = data.toString('utf8');

if (message === 'ping') {
console.debug('[DEBUG] Ping received from proxy. Sending pong...');
ws.send('pong', { fin: true });
return;
}

if (isTunnelId(message)) {
console.log(`\n✅ Tunnel ID received: ${message}\nYou can use now https://${url}/${message}/*?param=abc`);
} else {
Expand Down

0 comments on commit dd06bc2

Please sign in to comment.