Skip to content

Commit

Permalink
fix possible deadlock from hard wait on dispatch result
Browse files Browse the repository at this point in the history
- replace WaitAll with WhenAll to be able to await the result
  • Loading branch information
Dirk-Peters authored and apfohl committed Apr 3, 2022
1 parent 2032bd4 commit af4e4b4
Showing 1 changed file with 29 additions and 6 deletions.
35 changes: 29 additions & 6 deletions FoundationalBits/Messaging/AgentBasedMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MonadicBits;

namespace bridgefield.FoundationalBits.Messaging
{
using static Functional;

internal sealed class AgentBasedMessageBus : IMessageBus
{
private sealed record State(ImmutableList<Subscription> Subscriptions);
Expand Down Expand Up @@ -33,22 +36,42 @@ public async Task Publish(object argument)
{
try
{
Task.WaitAll(
var result = await Task.WhenAll(
(await agent.Tell(new SelectSubscriptions(argument.GetType())))
.Select(s => s
.Handler(argument.GetType())
.Match(
h => h.Post(argument),
() => agent.Tell(new RemoveSubscription(s))))
.Select(s => Dispatch(argument, s))
.ToArray()
);

var errors = result.SelectMany(error => error.ToEnumerable()).ToList();
if (errors.Any())
{
throw new AggregateException(errors);
}
}
catch (Exception exception)
{
throw DispatchFailed.Handle(exception);
}
}

private async Task<Maybe<DispatchFailed>> Dispatch(object argument, Subscription s)
{
try
{
await s
.Handler(argument.GetType())
.Match(
h => h.Post(argument),
() => agent.Tell(new RemoveSubscription(s)));
}
catch (DispatchFailed error)
{
return error;
}

return Nothing;
}

private abstract record SubscriptionCommand
{
public abstract Task<(State, IEnumerable<Subscription>)> Execute(State state);
Expand Down

0 comments on commit af4e4b4

Please sign in to comment.