Skip to content
This repository has been archived by the owner on Dec 20, 2019. It is now read-only.

Commit

Permalink
Removing LocalRemoteBinder, concurent execution of the functions with…
Browse files Browse the repository at this point in the history
… the same name and the same connection, bug fixing.
  • Loading branch information
dajuric committed Dec 28, 2017
1 parent 3768778 commit 16b1ddf
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 186 deletions.
10 changes: 0 additions & 10 deletions Samples/Serialization/packages.config

This file was deleted.

7 changes: 2 additions & 5 deletions Source/WebSocketRPC.Base/ClientServer/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static async Task ConnectAsync(string uri, CancellationToken token, Actio
{
isClosedSuccessfully = await connectAsync(uri, token, onConnect, setOptions);
}
catch(Exception ex)
catch(Exception)
{
isClosedSuccessfully = false;
if (!reconnectOnError && !reconnectOnClose) throw;
Expand All @@ -86,7 +86,7 @@ static async Task<bool> connectAsync(string uri, CancellationToken token, Action
setOptions?.Invoke(webSocket.Options);
await webSocket.ConnectAsync(new Uri(uri), token);
}
catch(Exception ex)
catch(Exception)
{
webSocket?.Dispose();
throw;
Expand All @@ -98,8 +98,6 @@ static async Task<bool> connectAsync(string uri, CancellationToken token, Action
onConnection(connection);
await connection.ListenReceiveAsync(token);
}
catch (Exception ex)
{ }
finally
{
isClosedSuccessfully = webSocket.State != WebSocketState.Aborted;
Expand All @@ -109,6 +107,5 @@ static async Task<bool> connectAsync(string uri, CancellationToken token, Action
return isClosedSuccessfully;
}


}
}
7 changes: 4 additions & 3 deletions Source/WebSocketRPC.Base/ClientServer/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,9 @@ internal async Task ListenReceiveAsync(CancellationToken token)
}
else
{
Debug.WriteLine("Received: " + new ArraySegment<byte>(receiveBuffer, 0, count).ToString(Encoding.ASCII));
OnReceive?.Invoke(new ArraySegment<byte>(receiveBuffer, 0, count), receiveResult.MessageType == WebSocketMessageType.Text);
var segment = new ArraySegment<byte>(receiveBuffer, 0, count);
OnReceive?.Invoke(segment, receiveResult.MessageType == WebSocketMessageType.Text);
Debug.WriteLine("Received: " + segment.ToString(RPCSettings.Encoding));
}

if (token.IsCancellationRequested)
Expand All @@ -201,7 +202,7 @@ internal async Task ListenReceiveAsync(CancellationToken token)
/// Invokes the error event.
/// </summary>
/// <param name="ex">Exception.</param>
internal void InvokeErrorAsync(Exception ex)
internal void InvokeError(Exception ex)
{
OnError?.Invoke(ex);
}
Expand Down
6 changes: 3 additions & 3 deletions Source/WebSocketRPC.Base/Invokers/LocalInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static void verifyType(MethodInfo[] methodList)
{
//check constraints
if (typeof(TObj).IsInterface)
throw new Exception("The specified type must be a class.");
throw new Exception("The specified type must be a class or struct.");

var overloadedMethodNames = methodList.GroupBy(x => x.Name)
.DefaultIfEmpty()
Expand All @@ -70,7 +70,7 @@ static void verifyType(MethodInfo[] methodList)

var propertyList = typeof(TObj).GetProperties(BindingFlags.Public | BindingFlags.Instance);
if (propertyList.Any())
throw new NotSupportedException("The class must not declare any properties: " + String.Join(", ", propertyList.Select(x => x.Name)) + ".");
throw new NotSupportedException("Properties are not permitted: " + String.Join(", ", propertyList.Select(x => x.Name)) + ".");
}

public async Task<Response> InvokeAsync(TObj obj, Request clientMessage)
Expand All @@ -91,7 +91,7 @@ public async Task<Response> InvokeAsync(TObj obj, Request clientMessage)
return new Response
{
FunctionName = clientMessage.FunctionName,
CallIndex = clientMessage.CallIndex,
CallId = clientMessage.CallId,
ReturnValue = result,
Error = error?.Message
};
Expand Down
9 changes: 5 additions & 4 deletions Source/WebSocketRPC.Base/Invokers/Messages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Linq;

namespace WebSocketRPC
{
struct Request
{
public string FunctionName;
public int CallIndex;
public string CallId;
public JToken[] Arguments;

public static Request FromJson(string json)
Expand All @@ -44,7 +45,7 @@ public static Request FromJson(string json)
var r = new Request
{
FunctionName = root[nameof(FunctionName)]?.Value<string>(),
CallIndex = root[nameof(CallIndex)]?.Value<int>() ?? 0,
CallId = root[nameof(CallId)]?.Value<string>() ?? Guid.Empty.ToString(),
Arguments = root[nameof(Arguments)]?.Children().ToArray()
};

Expand All @@ -65,7 +66,7 @@ public string ToJson()
struct Response
{
public string FunctionName;
public int CallIndex;
public string CallId;
public JToken ReturnValue;
public string Error;

Expand All @@ -78,7 +79,7 @@ public static Response FromJson(string json)
var r = new Response
{
FunctionName = root[nameof(FunctionName)]?.Value<string>(),
CallIndex = root[nameof(CallIndex)]?.Value<int>() ?? 0,
CallId = root[nameof(CallId)]?.Value<string>() ?? Guid.Empty.ToString(),
ReturnValue = root[nameof(ReturnValue)]?.Value<JToken>(),
Error = root[nameof(Error)]?.Value<string>()
};
Expand Down
20 changes: 3 additions & 17 deletions Source/WebSocketRPC.Base/Invokers/RemoteInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ class RemoteInvoker<TInterface>

Func<Request, Task> sendAsync;
ConcurrentDictionary<string, TaskCompletionSource<Response>> runningMethods;
ConcurrentDictionary<string, SemaphoreSlim> methodConcurencySyncs;

public RemoteInvoker()
{
runningMethods = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
methodConcurencySyncs = new ConcurrentDictionary<string, SemaphoreSlim>();
if (verifiedTypes.Contains(typeof(TInterface))) return;

//verify constraints
Expand Down Expand Up @@ -75,7 +73,7 @@ public void Initialize(Func<Request, Task> sendAsync)

public void Receive(Response response)
{
var key = response.FunctionName + "-" + response.CallIndex;
var key = response.FunctionName + "-" + response.CallId;

lock (runningMethods)
{
Expand Down Expand Up @@ -127,38 +125,26 @@ public async Task<TResult> InvokeAsync<TResult>(Expression<Func<TInterface, Task
return result;
}

ConcurrentDictionary<string, int> callIndex = new ConcurrentDictionary<string, int>();
async Task<Response> invokeAsync(string name, params object[] args)
{
if (sendAsync == null)
throw new Exception("The invoker is not initialized.");

//Console.WriteLine("Queue: " + name + " Task: " + Thread.CurrentThread.ManagedThreadId);
//methodConcurencySyncs.GetOrAdd(name, new SemaphoreSlim(1));
//await methodConcurencySyncs[name].WaitAsync(); //wait for the previous task (functions with the same name are run sequentially)

var msg = new Request
{
FunctionName = name,
CallIndex = callIndex.AddOrUpdate(name, 0, (k, v) => v + 1),
CallId = Guid.NewGuid().ToString(),
Arguments = args.Select(a => JToken.FromObject(a, RPCSettings.Serializer)).ToArray()
};

//Console.WriteLine("Invoking: " + name + " Task: " + Thread.CurrentThread.ManagedThreadId);
var key = msg.FunctionName + "-" + msg.CallIndex;
var key = msg.FunctionName + "-" + msg.CallId;

runningMethods[key] = new TaskCompletionSource<Response>();
await sendAsync(msg);
await runningMethods[key].Task;

var response = runningMethods[key].Task.Result;
runningMethods.TryRemove(key, out TaskCompletionSource<Response> _);

//Console.WriteLine("End invoke: " + name + " Task: " + Thread.CurrentThread.ManagedThreadId);

key = msg.FunctionName + "-" + msg.CallIndex;
callIndex.TryRemove(key, out int _);
//methodConcurencySyncs[name].Release();
return response;
}

Expand Down
110 changes: 73 additions & 37 deletions Source/WebSocketRPC.Base/RPC.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,67 @@ public static class RPC
/// </summary>
internal static readonly List<IBinder> AllBinders = new List<IBinder>();

#region Bind

/// <summary>
/// Creates two-way RPC receiving-sending binding for the provided connection.
/// Creates one way RPC receiving binding for the provided connection.
/// </summary>
/// <typeparam name="TObj">Object type.</typeparam>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <param name="connection">Existing connection to bind to.</param>
/// <param name="obj">Object to bind to.</param>
/// <returns>Binder.</returns>
public static IBinder Bind<TObj, TInterface>(this Connection connection, TObj obj)
public static ILocalBinder<TObj> Bind<TObj>(this Connection connection, TObj obj)
{
return new LocalRemoteBinder<TObj, TInterface>(connection, obj);
if (AllBinders.ToArray().OfType<ILocalBinder<TObj>>().Any(x => x.Connection == connection))
throw new NotSupportedException("Only one local binder is permitted.");

return new LocalBinder<TObj>(connection, obj);
}

/// <summary>
/// Creates one way RPC receiving binding for the provided connection.
/// Creates one way RPC sending binding for the provided connection.
/// </summary>
/// <typeparam name="TObj">Object type.</typeparam>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <param name="connection">Existing connection to bind to.</param>
/// <param name="obj">Object to bind to.</param>
/// <returns>Binder.</returns>
public static IBinder Bind<TObj>(this Connection connection, TObj obj)
public static IRemoteBinder<TInterface> Bind<TInterface>(this Connection connection)
{
return new LocalBinder<TObj>(connection, obj);
if (AllBinders.ToArray().OfType<IRemoteBinder<TInterface>>().Any(x => x.Connection == connection))
throw new NotSupportedException("Only one remote binder is permitted.");

return new RemoteBinder<TInterface>(connection);
}

/// <summary>
/// Creates one way RPC sending binding for the provided connection.
/// Creates two way RPC sending binding for the provided connection.
/// <para>Shorthand for binding local and remote binder separately.</para>
/// </summary>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <param name="connection">Existing connection to bind to.</param>
/// <returns>Binder.</returns>
public static IBinder Bind<TInterface>(this Connection connection)
/// <returns>Loca and remote binder.</returns>
public static (ILocalBinder<TObj>, IRemoteBinder<TInterface>) Bind<TObj, TInterface>(this Connection connection, TObj obj)
{
return new RemoteBinder<TInterface>(connection);
return (connection.Bind(obj), connection.Bind<TInterface>());
}

#endregion


#region For

/// <summary>
/// Gets all two-way or one-way remote binders.
/// Gets all the binders associated with the specified connection.
/// </summary>
/// <param name="connection">Connection.</param>
/// <returns>Binders associated with the connection.</returns>
public static IEnumerable<IBinder> For(Connection connection)
{
return AllBinders.ToArray()
.Where(x => x.Connection == connection);
}

/// <summary>
/// Gets all one-way remote binders.
/// </summary>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <returns>Binders.</returns>
Expand All @@ -89,42 +112,38 @@ public static IEnumerable<IRemoteBinder<TInterface>> For<TInterface>()
}

/// <summary>
/// Gets all two-way binders associated with the specified object.
/// Gets all remote binders which connection also have local binder(s) associated with the specified object.
/// </summary>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <param name="obj">Target object.</param>
/// <returns>Binders.</returns>
/// <returns>Remote binders.</returns>
public static IEnumerable<IRemoteBinder<TInterface>> For<TInterface>(object obj)
{
var lBinderType = typeof(ILocalBinder<>).MakeGenericType(obj.GetType());
var lObjBinders = AllBinders.ToArray() // prevent 'Collection was modified'
.Where(x =>
{
var xType = x.GetType();

var binders = AllBinders.ToArray() // prevent 'Collection was modified'
.OfType<IRemoteBinder<TInterface>>()
.Where(x =>
{
var xType = x.GetType();
var isLocalBinder = lBinderType.IsAssignableFrom(xType);
if (!isLocalBinder) return false;

var isLocalBinder = lBinderType.IsAssignableFrom(xType);
if (!isLocalBinder) return false;
var isObjBinder = xType.GetProperty(nameof(ILocalBinder<object>.Object)).GetValue(x, null) == obj;
return isObjBinder;
});

var isObjBinder = xType.GetProperty(nameof(ILocalBinder<object>.Object)).GetValue(x, null) == obj;
return isObjBinder;
});

return binders;
}
var rObjBinders = AllBinders.ToArray() // prevent 'Collection was modified'
.OfType<IRemoteBinder<TInterface>>()
.Where(rb => lObjBinders.Any(lb => lb.Connection == rb.Connection));

/// <summary>
/// Gets whether the data contain RPC message or not.
/// </summary>
/// <param name="data">Received data.</param>
/// <returns>True if the data contain RPC message, false otherwise.</returns>
public static bool IsRpcMessage(this ArraySegment<byte> data)
{
var str = data.ToString(Encoding.ASCII);
return !Request.FromJson(str).IsEmpty || !Response.FromJson(str).IsEmpty;
return rObjBinders;
}

#endregion


#region Call

/// <summary>
/// Calls the remote method.
Expand Down Expand Up @@ -213,5 +232,22 @@ public static async Task<TResult[]> CallAsync<TInterface, TResult>(this IEnumera

return results;
}

#endregion

#region Misc

public static int ConnectionCount
{
get
{
return AllBinders.ToArray()
.Select(x => x.Connection)
.Distinct()
.Count();
}
}

#endregion
}
}
6 changes: 3 additions & 3 deletions Source/WebSocketRPC.Base/RpcBinders/Base/Binder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ public abstract class Binder : IBinder
protected Binder(Connection connection)
{
Connection = connection;
RPC.AllBinders.Add(this);
lock(RPC.AllBinders) RPC.AllBinders.Add(this);

Connection.OnClose += () =>
{
Debug.WriteLine("Close");

RPC.AllBinders.Remove(this);
lock (RPC.AllBinders) RPC.AllBinders.Remove(this);
};

Connection.OnError += e =>
{
Debug.WriteLine("Error");

RPC.AllBinders.Remove(this);
lock (RPC.AllBinders) RPC.AllBinders.Remove(this);
};
}
}
Expand Down
Loading

0 comments on commit 16b1ddf

Please sign in to comment.