Skip to content
This repository has been archived by the owner on Dec 20, 2019. It is now read-only.

Commit

Permalink
Fixing bugs, changes in RemoteInvoker.
Browse files Browse the repository at this point in the history
  • Loading branch information
dajuric committed Dec 25, 2017
1 parent 1f7f16f commit 3768778
Show file tree
Hide file tree
Showing 21 changed files with 213 additions and 123 deletions.
Binary file removed .nuget/NuGet.exe
Binary file not shown.
11 changes: 1 addition & 10 deletions Deploy/Nuget/Push.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,10 @@
@echo off
timeout /T 5

:: settings
set nugetPath=%cd%\..\..\.nuget

::update NuGet
attrib -R "%nugetPath%\nuget.exe"
echo Updating NuGet...
"%nugetPath%\nuget.exe" update -Self

echo.
echo Pushing packages:
for /r %%f in (*.nupkg) do (
echo %%f
"%nugetPath%\nuget.exe" push "%%f" -Source https://www.nuget.org/api/v2/package
dotnet nuget push "%%f" --source https://www.nuget.org/api/v2/package
)

echo.
Expand Down
6 changes: 5 additions & 1 deletion Samples/ClientJs/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ static void Main(string[] args)

//start server and bind its local and remote API
var cts = new CancellationTokenSource();
var s = Server.ListenAsync("http://localhost:8001/", cts.Token, (c, ws) => c.Bind<LocalAPI, IRemoteAPI>(new LocalAPI()));
var s = Server.ListenAsync("http://localhost:8005/", cts.Token, (c, ws) =>
{
c.Bind<LocalAPI, IRemoteAPI>(new LocalAPI());
c.BindTimeout(TimeSpan.FromSeconds(1)); //close connection if no response after X seconds
});

Console.Write("Running: '{0}'. Press [Enter] to exit.", nameof(TestClientJs));
Console.ReadLine();
Expand Down
2 changes: 1 addition & 1 deletion Samples/ClientJs/Site/Index.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
}

//init API
var api = new LocalAPI("ws://localhost:8001");
var api = new LocalAPI("ws://localhost:8005");

//implement the interface
api.writeProgress = function (p)
Expand Down
4 changes: 2 additions & 2 deletions Samples/MultiService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class Program
static void Main(string[] args)
{
//generate js code
File.WriteAllText($"../Site/{nameof(NumericService)}.js", RPCJs.GenerateCallerWithDoc<NumericService>());
File.WriteAllText($"../Site/{nameof(TextService)}.js", RPCJs.GenerateCallerWithDoc<TextService>());
File.WriteAllText($"./Site/{nameof(NumericService)}.js", RPCJs.GenerateCallerWithDoc<NumericService>());
File.WriteAllText($"./Site/{nameof(TextService)}.js", RPCJs.GenerateCallerWithDoc<TextService>());

//start server and bind its local and remote APIs
var cts = new CancellationTokenSource();
Expand Down
2 changes: 1 addition & 1 deletion Source/WebSocketRPC.AspCore/WebSokcetRPCMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task Invoke(HttpContext context)
try
{
onConnect(context, connection);
await Connection.ListenReceiveAsync(connection, CancellationToken.None);
await connection.ListenReceiveAsync(CancellationToken.None);
}
finally
{
Expand Down
20 changes: 10 additions & 10 deletions Source/WebSocketRPC.Base/ClientServer/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
#endregion

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -43,14 +41,14 @@ public static class Client
/// <param name="uri">The target uri of the format: "ws://(address)/[path]".</param>
/// <param name="token">Cancellation token.</param>
/// <param name="onConnect">Action executed when connection is established.</param>
/// <param name="setOptions">Websocket option set method.</param>
/// <param name="reconnectOnError">True to reconnect on error, false otherwise.</param>
/// <param name="reconnectOnClose">True to reconnect on normal close request, false otherwise.</param>
/// <param name="secondsBetweenReconnect">The number of seconds between two reconnect attempts.</param>
/// <param name="setOptions">Websocket option set method.</param>
/// <returns>Client task.</returns>
/// <exception cref="Exception">Socket connection exception thrown in case when <paramref name="reconnectOnError"/> and <paramref name="reconnectOnClose"/> is set to false.</exception>
public static async Task ConnectAsync(string uri, CancellationToken token, Action<Connection> onConnect, Action<ClientWebSocketOptions> setOptions = null,
bool reconnectOnError = true, bool reconnectOnClose = false, int secondsBetweenReconnect = 0)
public static async Task ConnectAsync(string uri, CancellationToken token, Action<Connection> onConnect, bool reconnectOnError = true,
bool reconnectOnClose = false, int secondsBetweenReconnect = 0, Action<ClientWebSocketOptions> setOptions = null)
{
var isClosedSuccessfully = true;
var shouldReconnect = false;
Expand All @@ -61,10 +59,10 @@ public static async Task ConnectAsync(string uri, CancellationToken token, Actio
{
isClosedSuccessfully = await connectAsync(uri, token, onConnect, setOptions);
}
catch (Exception ex)
catch(Exception ex)
{
isClosedSuccessfully = false;
if (!reconnectOnError && !reconnectOnClose) throw ex;
if (!reconnectOnError && !reconnectOnClose) throw;
}

if (token.IsCancellationRequested)
Expand All @@ -88,18 +86,20 @@ static async Task<bool> connectAsync(string uri, CancellationToken token, Action
setOptions?.Invoke(webSocket.Options);
await webSocket.ConnectAsync(new Uri(uri), token);
}
catch (Exception ex)
catch(Exception ex)
{
webSocket?.Dispose();
throw ex;
throw;
}

var connection = new Connection(webSocket, CookieUtils.GetCookies(webSocket.Options.Cookies?.GetCookies(new Uri(uri))));
try
{
onConnection(connection);
await Connection.ListenReceiveAsync(connection, token);
await connection.ListenReceiveAsync(token);
}
catch (Exception ex)
{ }
finally
{
isClosedSuccessfully = webSocket.State != WebSocketState.Aborted;
Expand Down
59 changes: 21 additions & 38 deletions Source/WebSocketRPC.Base/ClientServer/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace WebSocketRPC
/// </summary>
public class Connection
{
internal static int MaxMessageSize { get; set; } = 64 * 1024; //x KiB
internal static int MaxMessageSize { get; set; } = 64 * 1024; //x KiB
static string messageToBig = "The message exceeds the maximum allowed message size: {0} bytes.";

WebSocket socket;
Expand All @@ -59,7 +59,7 @@ internal protected Connection(WebSocket socket, IReadOnlyDictionary<string, stri
/// <summary>
/// Gets the cookie collection.
/// </summary>
public IReadOnlyDictionary<string, string> Cookies { get; private set; }
public IReadOnlyDictionary<string, string> Cookies { get; private set; }

/// <summary>
/// Message receive event. Args: message, is text message.
Expand All @@ -78,27 +78,6 @@ internal protected Connection(WebSocket socket, IReadOnlyDictionary<string, stri
/// </summary>
public event Action<Exception> OnError;

/// <summary>
/// Sends the specified data.
/// </summary>
/// <param name="data">Binary data to send.</param>
/// <returns>True if the operation was successful, false otherwise.</returns>
public async Task<bool> SendAsync(ArraySegment<byte> data)
{
if (socket.State != WebSocketState.Open)
return false;

if (data.Count >= MaxMessageSize)
{
await CloseAsync(WebSocketCloseStatus.MessageTooBig, String.Format(messageToBig, MaxMessageSize));
return false;
}

Debug.WriteLine("Sending binary data.");
await sendTaskQueue.Enqueue(() => sendAsync(data, WebSocketMessageType.Binary));
return true;
}

/// <summary>
/// Sends the specified data.
/// </summary>
Expand All @@ -119,17 +98,21 @@ public async Task<bool> SendAsync(string data, Encoding e)
}

Debug.WriteLine("Sending: " + data);
//sendAsync(segment, WebSocketMessageType.Text).Wait();
await sendTaskQueue.Enqueue(() => sendAsync(segment, WebSocketMessageType.Text));
return true;
}

async Task sendAsync(ArraySegment<byte> data, WebSocketMessageType msgType)
{
if (socket.State != WebSocketState.Open)
return;

try
{
await socket.SendAsync(data, msgType, true, CancellationToken.None);
}
catch(Exception ex)
catch (Exception ex)
{
if (socket.State != WebSocketState.Open)
await CloseAsync(WebSocketCloseStatus.InternalServerError, ex.Message);
Expand Down Expand Up @@ -161,31 +144,30 @@ public async Task CloseAsync(WebSocketCloseStatus closeStatus = WebSocketCloseSt
/// <summary>
/// Listens for the receive messages for the specified connection.
/// </summary>
/// <param name="connection">Connection.</param>
/// <param name="token">Cancellation token.</param>
/// <returns>Task.</returns>
internal static async Task ListenReceiveAsync(Connection connection, CancellationToken token)
internal async Task ListenReceiveAsync(CancellationToken token)
{
var webSocket = connection.socket;
using (var registration = token.Register(() => connection.CloseAsync().Wait()))
using (var registration = token.Register(() => CloseAsync().Wait()))
{
try
{
connection.OnOpen?.Invoke();
byte[] receiveBuffer = new byte[RPCSettings.MaxMessageSize];
while (webSocket.State == WebSocketState.Open)
OnOpen?.Invoke();
byte[] receiveBuffer = new byte[MaxMessageSize];

while (socket.State == WebSocketState.Open)
{
WebSocketReceiveResult receiveResult = null;
var count = 0;
do
{
receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), CancellationToken.None);
var segment = new ArraySegment<byte>(receiveBuffer, count, MaxMessageSize - count);
receiveResult = await socket.ReceiveAsync(segment, CancellationToken.None);
count += receiveResult.Count;

if (count >= MaxMessageSize)
{
await connection.CloseAsync(WebSocketCloseStatus.MessageTooBig, String.Format(messageToBig, MaxMessageSize));
await CloseAsync(WebSocketCloseStatus.MessageTooBig, String.Format(messageToBig, MaxMessageSize));
return;
}
}
Expand All @@ -194,12 +176,12 @@ internal static async Task ListenReceiveAsync(Connection connection, Cancellatio

if (receiveResult.MessageType == WebSocketMessageType.Close)
{
await connection.CloseAsync();
await CloseAsync();
}
else
{
Debug.WriteLine("Received: " + new ArraySegment<byte>(receiveBuffer, 0, count).ToString(Encoding.ASCII));
connection.OnReceive?.Invoke(new ArraySegment<byte>(receiveBuffer, 0, count), receiveResult.MessageType == WebSocketMessageType.Text);
OnReceive?.Invoke(new ArraySegment<byte>(receiveBuffer, 0, count), receiveResult.MessageType == WebSocketMessageType.Text);
}

if (token.IsCancellationRequested)
Expand All @@ -208,8 +190,8 @@ internal static async Task ListenReceiveAsync(Connection connection, Cancellatio
}
catch (Exception ex)
{
connection.OnError?.Invoke(ex);
await connection.CloseAsync(WebSocketCloseStatus.InternalServerError, ex.Message);
OnError?.Invoke(ex);
await CloseAsync(WebSocketCloseStatus.InternalServerError, ex.Message);
//socket will be aborted -> no need to close manually
}
}
Expand All @@ -232,3 +214,4 @@ private void clearEvents()
}
}
}

67 changes: 67 additions & 0 deletions Source/WebSocketRPC.Base/Features/TimeoutBinder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Timers;

namespace WebSocketRPC
{
/// <summary>
/// Timeout binder.
/// </summary>
class TimeoutBinder
{
Timer timer;
Connection connection;
string closeMessage;

/// <summary>
/// Creates new timeout binder.
/// </summary>
/// <param name="connection">Connection.</param>
/// <param name="timeout">Idle timeout.</param>
/// <param name="closeMessage">Close message.</param>
public TimeoutBinder(Connection connection, TimeSpan timeout, string closeMessage)
{
this.connection = connection;
this.closeMessage = closeMessage;

timer = new Timer(timeout.TotalMilliseconds);
timer.AutoReset = false;
timer.Elapsed += Timer_Elapsed;

connection.OnOpen += () => timer.Enabled = true;
connection.OnReceive += (msg, isText) =>
{
timer.Enabled = false;
timer.Enabled = true;
};
}

private async void Timer_Elapsed(object sender, ElapsedEventArgs e)
{
timer.Dispose();
await connection.CloseAsync(System.Net.WebSockets.WebSocketCloseStatus.PolicyViolation, closeMessage);
}
}

/// <summary>
/// Idle-timeout binder extension.
/// </summary>
public static class TimeoutBinderExtension
{
internal static List<TimeoutBinder> timeoutBinders = new List<TimeoutBinder>();

/// <summary>
/// Binds a new idle-timeout binder to the provided connection.
/// </summary>
/// <param name="connection">Connection.</param>
/// <param name="timeout">Idle-timeout. Interval is reset on each received message.</param>
/// <param name="closeMessage">Message sent if connection is closed due to timeout.</param>
public static void BindTimeout(this Connection connection, TimeSpan timeout, string closeMessage = "Idle time elapsed.")
{
var binder = new TimeoutBinder(connection, timeout, closeMessage);

lock (timeoutBinders) timeoutBinders.Add(binder);
connection.OnClose += () => { lock (timeoutBinders) timeoutBinders.Remove(binder); };
}
}
}
13 changes: 9 additions & 4 deletions Source/WebSocketRPC.Base/Invokers/LocalInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NameInfoPairs = System.Collections.Generic.Dictionary<string, System.Reflection.MethodInfo>;

Expand All @@ -42,7 +41,7 @@ class LocalInvoker<TObj>

public LocalInvoker()
{
cache.TryGetValue(typeof(TObj), out methods);
lock(cache) cache.TryGetValue(typeof(TObj), out methods);
if (methods != null) return;

var methodList = typeof(TObj).GetMethods(BindingFlags.Public | BindingFlags.Instance);
Expand All @@ -52,7 +51,7 @@ public LocalInvoker()

//initialize and cache it
methods = methodList.ToDictionary(x => x.Name, x => x);
cache[typeof(TObj)] = methods;
lock(cache) cache[typeof(TObj)] = methods;
}

static void verifyType(MethodInfo[] methodList)
Expand Down Expand Up @@ -89,7 +88,13 @@ public async Task<Response> InvokeAsync(TObj obj, Request clientMessage)
error = ex;
}

return new Response { FunctionName = clientMessage.FunctionName, ReturnValue = result, Error = error?.Message };
return new Response
{
FunctionName = clientMessage.FunctionName,
CallIndex = clientMessage.CallIndex,
ReturnValue = result,
Error = error?.Message
};
}

async Task<JToken> invokeAsync(TObj obj, string functionName, JToken[] args)
Expand Down
Loading

0 comments on commit 3768778

Please sign in to comment.