Skip to content

Parallelization

Oddbjørn Bakke edited this page May 10, 2021 · 5 revisions

The code below might become the default for all plugins at some point. For now, to keep things simple, we only provide a synchronous implementation.

Please tell us if you do something similar to the example below and would like us to update the documentation and/or reference your project as an example.

For simple parallelization you can make an extension that executes each event on a ThreadPool thread instead.

// Entry point: creates the sample plugin and calls Run() on it.
static void Main(string[] args) => new SamplePlugin().Run();

/// <summary>
/// Example plugin that implements the asynchronous handler interface and wraps itself
/// in a <see cref="TouchPortalEventHandler"/> before handing it to the SDK client.
/// </summary>
public class SamplePlugin : ITouchPortalAsyncEventHandler
{
    private readonly ITouchPortalClient _client;

    /// <summary>
    /// Creates the client, passing it an adapter that wraps this plugin instance.
    /// </summary>
    public SamplePlugin()
    {
        //Static factory method TouchPortalFactory:CreateClient:
        //Optionally you can also provide a TouchPortalOptions,
        // or a loggerFactory parameter to configure the SDK
        _client = TouchPortalFactory.CreateClient(new TouchPortalEventHandler(this));
    }

    /// <summary>Identifier of this plugin.</summary>
    public string PluginId
    {
        get { return "Plugin.Id"; } //Replace "Plugin.Id" with your unique id.
    }

    /// <summary>Connects the client created in the constructor.</summary>
    public void Run() => _client.Connect();
    ...
}

/// <summary>
/// Task-returning counterpart of <c>ITouchPortalEventHandler</c>. An adapter forwards each
/// synchronous callback to the method of the same name declared here.
/// </summary>
/// <remarks>
/// Every event method receives the message to handle and an optional
/// <see cref="CancellationToken"/>, and returns the <see cref="Task"/> for the handling work.
/// </remarks>
public interface ITouchPortalAsyncEventHandler
{
    /// <summary>Identifier of the plugin; the adapter exposes it as its own <c>PluginId</c>.</summary>
    string PluginId { get; }

    // Typed event messages.

    /// <summary>Handles an <c>ActionEvent</c> message.</summary>
    Task OnActionEvent(ActionEvent message, CancellationToken cancellationToken = default);

    /// <summary>Handles a <c>BroadcastEvent</c> message.</summary>
    Task OnBroadcastEvent(BroadcastEvent message, CancellationToken cancellationToken = default);

    /// <summary>Handles an <c>InfoEvent</c> message.</summary>
    Task OnInfoEvent(InfoEvent message, CancellationToken cancellationToken = default);

    /// <summary>Handles a <c>ListChangeEvent</c> message.</summary>
    Task OnListChangedEvent(ListChangeEvent message, CancellationToken cancellationToken = default);

    /// <summary>Handles a <c>SettingsEvent</c> message.</summary>
    Task OnSettingsEvent(SettingsEvent message, CancellationToken cancellationToken = default);

    // String-based event messages.

    /// <summary>Handles the closed event; <paramref name="message"/> is passed through as a string.</summary>
    Task OnClosedEvent(string message, CancellationToken cancellationToken = default);

    /// <summary>Handles an event that has no typed callback; receives the message as a string.</summary>
    Task OnUnhandledEvent(string jsonMessage, CancellationToken cancellationToken = default);
}

/// <summary>
/// Adapter that implements the synchronous <c>ITouchPortalEventHandler</c> by starting the
/// matching <see cref="ITouchPortalAsyncEventHandler"/> method via <see cref="Task.Run(Func{Task}, CancellationToken)"/>
/// and returning to the caller without waiting for it to finish.
/// </summary>
public class TouchPortalEventHandler : ITouchPortalEventHandler
{
    private readonly ITouchPortalAsyncEventHandler _asyncEventHandler;
    private readonly CancellationToken _cancellationToken;

    /// <summary>Creates the adapter around an asynchronous handler.</summary>
    /// <param name="asyncEventHandler">Handler that receives every forwarded event.</param>
    /// <param name="cancellationToken">Token passed to <c>Task.Run</c> and to each handler call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="asyncEventHandler"/> is null.</exception>
    public TouchPortalEventHandler(ITouchPortalAsyncEventHandler asyncEventHandler,
                                   CancellationToken cancellationToken = default)
    {
        if (asyncEventHandler is null)
        {
            throw new ArgumentNullException(nameof(asyncEventHandler));
        }

        _asyncEventHandler = asyncEventHandler;
        _cancellationToken = cancellationToken;
    }

    /// <summary>Plugin id, taken from the wrapped asynchronous handler.</summary>
    string ITouchPortalEventHandler.PluginId
    {
        get { return _asyncEventHandler.PluginId; }
    }

    void ITouchPortalEventHandler.OnInfoEvent(InfoEvent message)
    {
        ExecuteEvent(message, _asyncEventHandler.OnInfoEvent);
    }

    void ITouchPortalEventHandler.OnListChangedEvent(ListChangeEvent message)
    {
        ExecuteEvent(message, _asyncEventHandler.OnListChangedEvent);
    }

    void ITouchPortalEventHandler.OnBroadcastEvent(BroadcastEvent message)
    {
        ExecuteEvent(message, _asyncEventHandler.OnBroadcastEvent);
    }

    void ITouchPortalEventHandler.OnSettingsEvent(SettingsEvent message)
    {
        ExecuteEvent(message, _asyncEventHandler.OnSettingsEvent);
    }

    void ITouchPortalEventHandler.OnActionEvent(ActionEvent message)
    {
        ExecuteEvent(message, _asyncEventHandler.OnActionEvent);
    }

    void ITouchPortalEventHandler.OnClosedEvent(string message)
    {
        ExecuteEvent(message, _asyncEventHandler.OnClosedEvent);
    }

    void ITouchPortalEventHandler.OnUnhandledEvent(string jsonMessage)
    {
        ExecuteEvent(jsonMessage, _asyncEventHandler.OnUnhandledEvent);
    }

    /// <summary>
    /// Starts <paramref name="callback"/> for <paramref name="payload"/> through <c>Task.Run</c>.
    /// The method is <c>async void</c> because the interface callbacks return void, so every
    /// exception is caught here rather than escaping to the caller.
    /// </summary>
    async void ExecuteEvent<TMessage>(TMessage payload, Func<TMessage, CancellationToken, Task> callback)
    {
        try
        {
            Task work = Task.Run(() => callback(payload, _cancellationToken), _cancellationToken);
            await work.ConfigureAwait(false);
        }
        catch
        {
            //ignore, log or something with the exception.
        }
    }
}

If you want throttling, you can look into SemaphoreSlim or similar alternatives to get full control over the execution of your actions.

/// <summary>
/// Adapter that implements the synchronous <c>ITouchPortalEventHandler</c> on top of an
/// <see cref="ITouchPortalAsyncEventHandler"/>, using a <see cref="SemaphoreSlim"/> to cap
/// how many handler calls run at the same time.
/// </summary>
public class TouchPortalAsyncEventHandler : ITouchPortalEventHandler
{
    private readonly SemaphoreSlim _throttling;
    private readonly ITouchPortalAsyncEventHandler _asyncEventHandler;
    private readonly CancellationToken _cancellationToken;

    /// <summary>Plugin id, taken from the wrapped asynchronous handler.</summary>
    string ITouchPortalEventHandler.PluginId => _asyncEventHandler.PluginId;

    /// <summary>Creates the throttling adapter around an asynchronous handler.</summary>
    /// <param name="asyncEventHandler">Handler that receives every forwarded event.</param>
    /// <param name="cancellationToken">Token used while waiting for a slot and passed to each handler call.</param>
    /// <param name="parallelism">
    /// Degree of parallelism for all actions (defaults to 5).
    /// Use 1 if you only want one action to run at each time, and then not lock the listener thread as a result.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="asyncEventHandler"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="parallelism"/> is less than 1.</exception>
    public TouchPortalAsyncEventHandler(ITouchPortalAsyncEventHandler asyncEventHandler,
                                        CancellationToken cancellationToken = default,
                                        int parallelism = 5)
    {
        _asyncEventHandler = asyncEventHandler
            ?? throw new ArgumentNullException(nameof(asyncEventHandler));

        if (parallelism < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(parallelism), parallelism,
                "The degree of parallelism must be at least 1.");
        }

        _throttling = new SemaphoreSlim(parallelism, parallelism);
        _cancellationToken = cancellationToken;
    }

    /// <summary>
    /// Waits for a free slot, then starts <paramref name="executeEvent"/> for
    /// <paramref name="message"/> through <c>Task.Run</c>. The method is <c>async void</c>
    /// because the interface callbacks return void, so every exception is caught here.
    /// </summary>
    async void ExecuteEvent<TMessage>(TMessage message, Func<TMessage, CancellationToken, Task> executeEvent)
    {
        //Tracks whether this call actually holds a slot. If WaitAsync throws (e.g. on cancellation)
        //no slot was taken, and releasing anyway would either raise the limit or throw SemaphoreFullException.
        var slotAcquired = false;

        try
        {
            //Can be done in SamplePlugin's methods to adjust per TouchPortal action
            await _throttling.WaitAsync(_cancellationToken).ConfigureAwait(false);
            slotAcquired = true;

            await Task.Run(() => executeEvent(message, _cancellationToken), _cancellationToken)
                .ConfigureAwait(false);
        }
        catch
        {
            //ignore, log or something with the exception.
        }
        finally
        {
            //Important to release in a finally block, to avoid deadlocks,
            //but only when the slot was really acquired.
            if (slotAcquired)
            {
                _throttling.Release();
            }
        }
    }
    ...
}
Clone this wiki locally