Skip to content
This repository has been archived by the owner on Dec 20, 2019. It is now read-only.

Commit

Permalink
Async events, improvements in exception handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
dajuric committed Dec 29, 2017
1 parent 4ff6874 commit 219fbc8
Show file tree
Hide file tree
Showing 13 changed files with 389 additions and 73 deletions.
5 changes: 3 additions & 2 deletions Samples/RawMsgJs/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using WebSocketRPC;

namespace RawMsgJs
Expand Down Expand Up @@ -30,8 +31,8 @@ static void Main(string[] args)
c.BindTimeout(TimeSpan.FromSeconds(30));

c.OnOpen += async () => await c.SendAsync("Hello from server using WebSocketRPC");
c.OnClose += () => Console.WriteLine("Connection closed.");
c.OnError += e => Console.WriteLine("Error: " + e.Message);
c.OnClose += () => { Console.WriteLine("Connection closed."); return Task.FromResult(true); };
c.OnError += e => { Console.WriteLine("Error: " + e.Message); return Task.FromResult(true); };

c.OnReceive += async msg =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace WebSocketRPC
/// <summary>
/// Provides the helper methods.
/// </summary>
public static class HelperExtensions
static class HelperExtensions
{
/// <summary>
/// Converts the specified binary data to a string data using the specified encoding.
Expand Down
212 changes: 155 additions & 57 deletions Source/WebSocketRPC.Base/ClientServer/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;

namespace WebSocketRPC
{
Expand All @@ -38,6 +39,8 @@ namespace WebSocketRPC
/// </summary>
public class Connection
{
#region Global settings

static int maxMessageSize = 64 * 1024; //x KiB
/// <summary>
/// Gets or sets the maximum message size in bytes [1..Int32.MaxValue].
Expand Down Expand Up @@ -72,6 +75,8 @@ public static Encoding Encoding

static string messageToBig = "The message exceeds the maximum allowed message size: {0} bytes.";

#endregion

WebSocket socket;
TaskQueue sendTaskQueue;

Expand All @@ -92,22 +97,99 @@ internal protected Connection(WebSocket socket, IReadOnlyDictionary<string, stri
/// </summary>
public IReadOnlyDictionary<string, string> Cookies { get; private set; }

#region Events

/// <summary>
/// Message receive event. Message is decoded using <seealso cref="Encoding"/>.
/// </summary>
public event Action<string> OnReceive;
public event Func<string, Task> OnReceive;
/// <summary>
/// Open event.
/// </summary>
public event Action OnOpen;
public event Func<Task> OnOpen;
/// <summary>
/// Close event.
/// </summary>
public event Action OnClose;
public event Func<Task> OnClose;
/// <summary>
/// Error event Args: exception.
/// </summary>
public event Action<Exception> OnError;
public event Func<Exception, Task> OnError;

/// <summary>
/// Invokes the error event.
/// </summary>
/// <param name="exception">Exception.</param>
internal void InvokeOnError(Exception exception)
{
if (OnError == null || exception == null)
return;

try
{
var members = OnError.GetInvocationList().Cast<Func<Exception, Task>>();

Task.WhenAll(members.Select(x => x(exception)))
.Wait(0);
}
catch (Exception ex) when (ex.InnerException is TaskCanceledException)
{ }
}

private void invokeOnOpen()
{
if (OnOpen == null)
return;

try
{
var members = OnOpen.GetInvocationList().Cast<Func<Task>>();

Task.WhenAll(members.Select(x=> x()))
.ContinueWith(t => InvokeOnError(t.Exception), TaskContinuationOptions.OnlyOnFaulted)
.Wait(0);
}
catch (Exception ex) when (ex.InnerException is TaskCanceledException)
{ }
}

private void invokeOnReceive(string msg)
{
if (OnReceive == null)
return;

try
{
var members = OnReceive.GetInvocationList().Cast<Func<string, Task>>();

Task.WhenAll(members.Select(x => x(msg)))
.ContinueWith(t => InvokeOnError(t.Exception), TaskContinuationOptions.OnlyOnFaulted)
.Wait(0);
}
catch (Exception ex) when (ex.InnerException is TaskCanceledException)
{ }
}

private void invokeOnClose()
{
if (OnClose == null)
return;

try
{
var members = OnClose.GetInvocationList().Cast<Func<Task>>();

Task.WhenAll(members.Select(x => x()))
.ContinueWith(t => InvokeOnError(t.Exception), TaskContinuationOptions.OnlyOnFaulted)
.Wait(0);
}
catch (Exception ex) when (ex.InnerException is TaskCanceledException)
{ }
}

#endregion

#region Send

/// <summary>
/// Sends the specified data as the text message type.
Expand Down Expand Up @@ -151,6 +233,10 @@ async Task sendAsync(ArraySegment<byte> data, WebSocketMessageType msgType)
}
}

#endregion

#region Close

/// <summary>
/// Closes the connection.
/// </summary>
Expand All @@ -164,15 +250,28 @@ public async Task CloseAsync(WebSocketCloseStatus closeStatus = WebSocketCloseSt
if (socket.State == WebSocketState.Open || socket.State == WebSocketState.CloseReceived)
await socket.CloseOutputAsync(closeStatus, statusDescription, CancellationToken.None);
}
catch
{ } //do not propagate the exception
catch(Exception ex)
{
InvokeOnError(ex);
}
finally
{
OnClose?.Invoke();
invokeOnClose();
clearEvents();
}
}

private void clearEvents()
{
OnClose = null;
OnError = null;
OnReceive = null;
}

#endregion

#region Receive (listen)

/// <summary>
/// Listens for the receive messages for the specified connection.
/// </summary>
Expand All @@ -184,68 +283,67 @@ internal async Task ListenReceiveAsync(CancellationToken token)
{
try
{
OnOpen?.Invoke();
byte[] receiveBuffer = new byte[MaxMessageSize];

while (socket.State == WebSocketState.Open)
{
WebSocketReceiveResult receiveResult = null;
var count = 0;
do
{
var segment = new ArraySegment<byte>(receiveBuffer, count, MaxMessageSize - count);
receiveResult = await socket.ReceiveAsync(segment, CancellationToken.None);
count += receiveResult.Count;

if (count >= MaxMessageSize)
{
await CloseAsync(WebSocketCloseStatus.MessageTooBig, String.Format(messageToBig, MaxMessageSize));
return;
}
}
while (receiveResult?.EndOfMessage == false);


if (receiveResult.MessageType == WebSocketMessageType.Close)
{
await CloseAsync();
}
else
{
var segment = new ArraySegment<byte>(receiveBuffer, 0, count);
var msg = segment.ToString(Encoding);
OnReceive?.Invoke(msg);
Debug.WriteLine("Received: " + msg);
}

if (token.IsCancellationRequested)
break;
}
await listenReceiveAsync(token);
}
catch (Exception ex)
{
OnError?.Invoke(ex);
InvokeOnError(ex);
await CloseAsync(WebSocketCloseStatus.InternalServerError, ex.Message);
//socket will be aborted -> no need to close manually
}
}
}

/// <summary>
/// Invokes the error event.
/// </summary>
/// <param name="ex">Exception.</param>
internal void InvokeError(Exception ex)
async Task listenReceiveAsync(CancellationToken token)
{
OnError?.Invoke(ex);
}
invokeOnOpen();
byte[] receiveBuffer = new byte[maxMessageSize];

private void clearEvents()
{
OnClose = null;
OnError = null;
OnReceive = null;
while (socket.State == WebSocketState.Open)
{
//receive
WebSocketReceiveResult receiveResult = null;
var count = 0;
do
{
var segment = new ArraySegment<byte>(receiveBuffer, count, maxMessageSize - count);
receiveResult = await socket.ReceiveAsync(segment, CancellationToken.None);
count += receiveResult.Count;

if (count >= maxMessageSize)
{
await CloseAsync(WebSocketCloseStatus.MessageTooBig, String.Format(messageToBig, maxMessageSize));
return;
}
}
while (receiveResult?.EndOfMessage == false);

//process response
switch (receiveResult.MessageType)
{
case WebSocketMessageType.Close:
await CloseAsync();
break;
case WebSocketMessageType.Binary:
InvokeOnError(new NotSupportedException($"Binary messages are not supported. Received {count} bytes."));
Debug.WriteLine("Received binary.");
break;
default:
var segment = new ArraySegment<byte>(receiveBuffer, 0, count);
var msg = segment.ToString(encoding);

invokeOnReceive(msg);
Debug.WriteLine("Received: " + msg);
break;
}

//check if cancellation is requested
if (token.IsCancellationRequested)
break;
}
}

#endregion
}
}

10 changes: 8 additions & 2 deletions Source/WebSocketRPC.Base/Features/TimeoutBinder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Timers;

namespace WebSocketRPC
Expand Down Expand Up @@ -28,11 +29,12 @@ public TimeoutBinder(Connection connection, TimeSpan timeout, string closeMessag
timer.AutoReset = false;
timer.Elapsed += Timer_Elapsed;

connection.OnOpen += () => timer.Enabled = true;
connection.OnOpen += () => { timer.Enabled = true; return Task.FromResult(true); };
connection.OnReceive += msg =>
{
timer.Enabled = false;
timer.Enabled = true;
return Task.FromResult(true);
};
}

Expand Down Expand Up @@ -61,7 +63,11 @@ public static void BindTimeout(this Connection connection, TimeSpan timeout, str
var binder = new TimeoutBinder(connection, timeout, closeMessage);

lock (timeoutBinders) timeoutBinders.Add(binder);
connection.OnClose += () => { lock (timeoutBinders) timeoutBinders.Remove(binder); };
connection.OnClose += () =>
{
lock (timeoutBinders) timeoutBinders.Remove(binder);
return Task.FromResult(true);
};
}
}
}
Loading

0 comments on commit 219fbc8

Please sign in to comment.