Skip to content
This repository has been archived by the owner on Dec 20, 2019. It is now read-only.

Commit

Permalink
Async function support; action support.
Browse files Browse the repository at this point in the history
  • Loading branch information
dajuric committed Oct 14, 2017
1 parent c2604b4 commit 051e219
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 36 deletions.
8 changes: 4 additions & 4 deletions Samples/ServerClientSample/Client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using WebsocketRPC;

namespace TestClient
{
public interface ILocalAPI
{
int LongRunningTask(int a, int b);
Task<int> LongRunningTask(int a, int b);
}

public class RemoteAPI //:IRemoteAPI
{
public bool WriteProgress(float progress)
public void WriteProgress(float progress)
{
Console.WriteLine("\rCompleted: {0}%.", progress * 100);
return true;
}
}

Expand All @@ -37,7 +37,7 @@ static void Main(string[] args)
c.OnOpen += async () =>
{
var r = await RPC.For<ILocalAPI>().CallAsync(x => x.LongRunningTask(5, 3));
Console.WriteLine("Result: " + r.First());
Console.WriteLine("Result: " + r.First().Result);
};
})
.Wait(0);
Expand Down
2 changes: 1 addition & 1 deletion Samples/ServerClientSample/Server/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace TestServer
{
interface IRemoteAPI
{
bool WriteProgress(float progress);
void WriteProgress(float progress);
}

class LocalAPI
Expand Down
3 changes: 2 additions & 1 deletion Source/WebsocketRPC/ClientServer/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ internal static async Task ListenReceiveAsync(Connection connection, Cancellatio
{
try
{
byte[] receiveBuffer = new byte[1024];
byte[] receiveBuffer = new byte[RPCSettings.MaxMessageSize];

while (webSocket.State == WebSocketState.Open)
{
Expand Down Expand Up @@ -178,6 +178,7 @@ internal static async Task ListenReceiveAsync(Connection connection, Cancellatio
{
while (ex.InnerException != null) ex = ex.InnerException;
connection.OnError?.Invoke(ex);
connection.OnClose?.Invoke();
//socket will be aborted -> no need to close manually
}
}
Expand Down
11 changes: 0 additions & 11 deletions Source/WebsocketRPC/ClientServer/Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,6 @@

namespace WebsocketRPC
{
//sample:
/*Server.ListenAsync("http://+:8001/", CancellationToken.None, c =>
{
c.OnOpenAsync = () => Task.Run(() => Console.WriteLine("Connection opened."));
c.OnErrorAsync = e => Task.Run(() => Console.WriteLine("Error: " + e.Message));
c.OnCloseAsync = () => Task.Run(() => Console.WriteLine("Connection closed."));
c.OnReceiveAsync = (s, d) => Task.Run(() => Console.WriteLine("Received: " + d.AsString(Encoding.UTF8)));
})
.Wait();*/

/// <summary>
/// Websocket server.
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions Source/WebsocketRPC/ConnectionBinders/Binder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,24 @@ public Binder(Connection connection, TObj obj)

public TObj Object { get; private set; }

public async Task CallAsync(Expression<Action<TInterface>> functionExpression)
{
await rInvoker.InvokeAsync(functionExpression);
}

public async Task<TResult> CallAsync<TResult>(Expression<Func<TInterface, TResult>> functionExpression)
{
return await rInvoker.InvokeAsync(functionExpression);
}

public async Task<Task> CallAsync(Expression<Func<TInterface, Task>> functionExpression)
{
return await rInvoker.InvokeAsync(functionExpression);
}

public async Task<Task<TResult>> CallAsync<TResult>(Expression<Func<TInterface, Task<TResult>>> functionExpression)
{
return await rInvoker.InvokeAsync(functionExpression);
}
}
}
25 changes: 24 additions & 1 deletion Source/WebsocketRPC/ConnectionBinders/BinderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,33 @@ public interface IRemoteBinder<T> : IBinder
/// <summary>
/// Calls the RPC method.
/// </summary>
/// <typeparam name="TResult">Result.</typeparam>
/// <param name="functionExpression">Method getter.</param>
/// <returns>RPC invoking task.</returns>
Task CallAsync(Expression<Action<T>> functionExpression);

/// <summary>
/// Calls the RPC method.
/// </summary>
/// <typeparam name="TResult">Result.</typeparam>
/// <param name="functionExpression">Method getter.</param>
/// <returns>RPC result.</returns>
Task<TResult> CallAsync<TResult>(Expression<Func<T, TResult>> functionExpression);


/// <summary>
/// Calls the RPC method.
/// </summary>
/// <param name="functionExpression">Method getter.</param>
/// <returns>RPC task result.</returns>
Task<Task> CallAsync(Expression<Func<T, Task>> functionExpression);

/// <summary>
/// Calls the RPC method.
/// </summary>
/// <typeparam name="TResult">Result.</typeparam>
/// <param name="functionExpression">Method getter.</param>
/// <returns>RPC result.</returns>
Task<Task<TResult>> CallAsync<TResult>(Expression<Func<T, Task<TResult>>> functionExpression);
}

/// <summary>
Expand Down
15 changes: 15 additions & 0 deletions Source/WebsocketRPC/ConnectionBinders/RemoteBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,24 @@ public RemoteBinder(Connection connection)
};
}

public async Task CallAsync(Expression<Action<TInterface>> functionExpression)
{
await rInvoker.InvokeAsync(functionExpression);
}

public async Task<TResult> CallAsync<TResult>(Expression<Func<TInterface, TResult>> functionExpression)
{
return await rInvoker.InvokeAsync(functionExpression);
}

public async Task<Task> CallAsync(Expression<Func<TInterface, Task>> functionExpression)
{
return await rInvoker.InvokeAsync(functionExpression);
}

public async Task<Task<TResult>> CallAsync<TResult>(Expression<Func<TInterface, Task<TResult>>> functionExpression)
{
return await rInvoker.InvokeAsync(functionExpression);
}
}
}
33 changes: 30 additions & 3 deletions Source/WebsocketRPC/Invokers/LocalInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,21 @@ public async Task<Response> InvokeAsync(TObj obj, Request clientMessage)

try
{
var returnVal = await invokeAsync(methods[functionName], obj, argObjs);
var result = (returnVal != null) ? JToken.FromObject(returnVal, RPCSettings.Serializer) : null;
var hasResult = methods[functionName].ReturnType != typeof(void) &&
methods[functionName].ReturnType != typeof(Task);

JToken result = null;
if (hasResult)
{
var returnVal = await invokeWithResultAsync(methods[functionName], obj, argObjs);
result = (returnVal != null) ? JToken.FromObject(returnVal, RPCSettings.Serializer) : null;
}
else
{
await invokeAsync(methods[functionName], obj, argObjs);
result = JToken.FromObject(true);
}

return (result, null);
}
catch (Exception ex)
Expand All @@ -106,7 +119,19 @@ public async Task<Response> InvokeAsync(TObj obj, Request clientMessage)
}
}

async Task<object> invokeAsync(MethodInfo method, TObj obj, object[] args)
async Task invokeAsync(MethodInfo method, TObj obj, object[] args)
{
object returnVal = method.Invoke(obj, args);

//async method support
if (method.GetCustomAttribute<AsyncStateMachineAttribute>() != null)
{
var task = (Task)returnVal;
await task.ConfigureAwait(false);
}
}

async Task<object> invokeWithResultAsync(MethodInfo method, TObj obj, object[] args)
{
object returnVal = method.Invoke(obj, args);

Expand All @@ -122,5 +147,7 @@ async Task<object> invokeAsync(MethodInfo method, TObj obj, object[] args)

return returnVal;
}


}
}
49 changes: 42 additions & 7 deletions Source/WebsocketRPC/Invokers/RemoteInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,52 @@ public void Receive(Response response)
runningMethods[response.FunctionName].SetResult(response);
}


public async Task InvokeAsync(Expression<Action<TInterface>> functionExpression)
{
var (funcName, argVals) = getFunctionInfo(functionExpression);
var response = await invokeAsync(funcName, argVals);

if (response.Error != null)
throw new Exception(response.Error);
}

public async Task<TResult> InvokeAsync<TResult>(Expression<Func<TInterface, TResult>> functionExpression)
{
var (funcName, argVals) = getFunctionInfo(functionExpression);
var r = await invokeAsync<TResult>(funcName, argVals);
return r.Result;
var response = await invokeAsync(funcName, argVals);

var result = response.ReturnValue.ToObject<TResult>(RPCSettings.Serializer);
if (response.Error != null)
throw new Exception(response.Error);

return result;
}

public async Task<Task> InvokeAsync(Expression<Func<TInterface, Task>> functionExpression)
{
var (funcName, argVals) = getFunctionInfo(functionExpression);
var response = await invokeAsync(funcName, argVals);

if (response.Error != null)
throw new Exception(response.Error);

return Task.FromResult(true);
}

public async Task<Task<TResult>> InvokeAsync<TResult>(Expression<Func<TInterface, Task<TResult>>> functionExpression)
{
var (funcName, argVals) = getFunctionInfo(functionExpression);
var response = await invokeAsync(funcName, argVals);

var result = response.ReturnValue.ToObject<TResult>(RPCSettings.Serializer);
if (response.Error != null)
throw new Exception(response.Error);

return Task.FromResult(result);
}

async Task<(TResult Result, Exception Error)> invokeAsync<TResult>(string name, params object[] args)
async Task<Response> invokeAsync(string name, params object[] args)
{
if (sendAsync == null)
throw new Exception("The invoker is not initialized.");
Expand All @@ -108,10 +146,7 @@ public async Task<TResult> InvokeAsync<TResult>(Expression<Func<TInterface, TRes
var response = runningMethods[name].Task.Result;
runningMethods.Remove(name);

var result = response.ReturnValue.ToObject<TResult>(RPCSettings.Serializer);
var ex = response.Error;

return (result, (ex != null) ? new Exception(ex) : null);
return response;
}

//the idea is taken from: https://stackoverflow.com/questions/3766698/get-end-values-from-lambda-expressions-method-parameters?rq=1
Expand Down
80 changes: 72 additions & 8 deletions Source/WebsocketRPC/RPC.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static IBinder Bind<TInterface>(this Connection connection)
}

/// <summary>
/// Gets all two-way or one-way sending binders.
/// Gets all two-way or one-way remote binders.
/// </summary>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <returns>Binders.</returns>
Expand Down Expand Up @@ -113,14 +113,45 @@ public static IEnumerable<IRemoteBinder<TInterface>> For<TInterface>(object obj)
return binders;
}

/// <summary>
/// Gets whether the data contain RPC message or not.
/// </summary>
/// <param name="data">Received data.</param>
/// <returns>True if the data contain RPC message, false otherwise.</returns>
public static bool IsRPC(this ArraySegment<byte> data)
{
var str = data.ToString(Encoding.ASCII);
return !Request.FromJson(str).IsEmpty || !Response.FromJson(str).IsEmpty;
}


/// <summary>
/// Calls the remote method.
/// </summary>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <param name="binders">Remote binder collection.</param>
/// <param name="functionExpression">Method getter.</param>
/// <returns>The RPC task.</returns>
public static async Task CallAsync<TInterface>(this IEnumerable<IRemoteBinder<TInterface>> binders, Expression<Action<TInterface>> functionExpression)
{
var tasks = new List<Task>();
foreach (var b in binders)
{
var t = b.CallAsync(functionExpression);
tasks.Add(t);
}

await Task.WhenAll(tasks);
}

/// <summary>
/// Calls the remote method.
/// </summary>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <typeparam name="TResult">Method result type.</typeparam>
/// <param name="binders">Remote binder collection.</param>
/// <param name="functionExpression">Method getter.</param>
/// <returns>The collection of the RPC invoking tasks.</returns>
/// <returns>The collection of results.</returns>
public static async Task<TResult[]> CallAsync<TInterface, TResult>(this IEnumerable<IRemoteBinder<TInterface>> binders, Expression<Func<TInterface, TResult>> functionExpression)
{
var tasks = new List<Task<TResult>>();
Expand All @@ -139,14 +170,47 @@ public static async Task<TResult[]> CallAsync<TInterface, TResult>(this IEnumera
}

/// <summary>
/// Gets whether the data contain RPC message or not.
/// Calls the remote method.
/// </summary>
/// <param name="data">Received data.</param>
/// <returns>True if the data contain RPC message, false otherwise.</returns>
public static bool IsRPC(this ArraySegment<byte> data)
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <param name="binders">Remote binder collection.</param>
/// <param name="functionExpression">Method getter.</param>
/// <returns>The RPC task.</returns>
public static async Task CallAsync<TInterface>(this IEnumerable<IRemoteBinder<TInterface>> binders, Expression<Func<TInterface, Task>> functionExpression)
{
var str = data.ToString(Encoding.ASCII);
return !Request.FromJson(str).IsEmpty || !Response.FromJson(str).IsEmpty;
var tasks = new List<Task>();
foreach (var b in binders)
{
var t = b.CallAsync(functionExpression);
tasks.Add(t);
}

await Task.WhenAll(tasks);
}

/// <summary>
/// Calls the remote method.
/// </summary>
/// <typeparam name="TInterface">Interface type.</typeparam>
/// <typeparam name="TResult">Method result type.</typeparam>
/// <param name="binders">Remote binder collection.</param>
/// <param name="functionExpression">Method getter.</param>
/// <returns>The collection of results.</returns>
public static async Task<Task<TResult>[]> CallAsync<TInterface, TResult>(this IEnumerable<IRemoteBinder<TInterface>> binders, Expression<Func<TInterface, Task<TResult>>> functionExpression)
{
var tasks = new List<Task<Task<TResult>>>();
foreach (var b in binders)
{
var t = b.CallAsync(functionExpression);
tasks.Add(t);
}

await Task.WhenAll(tasks);
var results = tasks.Where(x => x.Status == TaskStatus.RanToCompletion)
.Select(x => x.Result)
.ToArray();

return results;
}
}
}

0 comments on commit 051e219

Please sign in to comment.