Skip to content

Commit

Permalink
Merge pull request #6 from bridgefield/cleanup
Browse files Browse the repository at this point in the history
cleanup for first release
  • Loading branch information
apfohl authored Jan 13, 2022
2 parents 84f76ba + f087fef commit 2032bd4
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 79 deletions.
14 changes: 7 additions & 7 deletions FoundationalBits.Spec/Messaging/Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public static class messages_are_dispatched
[Test]
public static async Task to_an_interested_subscriber()
{
var cut = new AgentBasedMessageBus();
var cut = MessageBus.Create();
var monitor = new HandleMonitor<object>();
cut.Subscribe(monitor);
await cut.Publish(new object());
Expand All @@ -27,7 +27,7 @@ public static class a_subscriber_error
[TestCaseSource(nameof(ErrorSubscribers))]
public static void is_propagated_to_sender(object subscriber)
{
var cut = new AgentBasedMessageBus();
var cut = MessageBus.Create();
cut.Subscribe(subscriber);

Assert.CatchAsync<DispatchFailed>(() => cut.Publish(new object()))
Expand All @@ -38,7 +38,7 @@ public static void is_propagated_to_sender(object subscriber)
public static void does_not_prevent_dispatch_to_other_subscribers(
object subscriber)
{
var cut = new AgentBasedMessageBus();
var cut = MessageBus.Create();
cut.Subscribe(subscriber);
var monitor = new HandleMonitor<object>();
cut.Subscribe(monitor);
Expand All @@ -64,7 +64,7 @@ public static void are_collected_and_propagated_to_sender()
new ThrowingHandler<object>(),
new AsyncThrowingHandler<object>()
};
var cut = new AgentBasedMessageBus();
var cut = MessageBus.Create();
errorSubscribers.ForEach(s => cut.Subscribe(s));
Assert.CatchAsync<DispatchFailed>(() => cut.Publish(new object()))
.Should().Match<DispatchFailed>(e => HasMultipleDispatchErrors(e));
Expand All @@ -81,7 +81,7 @@ public static class messages_are_not_dispatched
[Test]
public static async Task to_garbage_collected_subscribers()
{
var cut = new AgentBasedMessageBus();
var cut = MessageBus.Create();
var monitor = new HandleMonitor<object>();
await cut.SubscribeWeak(new TestHandler<object>(monitor)).WaitForCollection();
await cut.Publish(new object());
Expand All @@ -91,7 +91,7 @@ public static async Task to_garbage_collected_subscribers()
[Test]
public static async Task to_unsubscribed_subscribers()
{
var cut = new AgentBasedMessageBus();
var cut = MessageBus.Create();
var monitor = new HandleMonitor<object>();
cut.SubscribeUnsubscribe(new TestHandler<object>(monitor));
await cut.Publish(new object());
Expand All @@ -101,7 +101,7 @@ public static async Task to_unsubscribed_subscribers()
[Test]
public static async Task to_uninterested_subscribers()
{
var cut = new AgentBasedMessageBus();
var cut = MessageBus.Create();
var monitor = new HandleMonitor<int>();
var subscriber = new TestHandler<int>(monitor);
cut.Subscribe(subscriber);
Expand Down
2 changes: 2 additions & 0 deletions FoundationalBits.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/UserDictionary/Words/=bridgefield/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
18 changes: 18 additions & 0 deletions FoundationalBits/Agent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;
using bridgefield.FoundationalBits.Agents;

namespace bridgefield.FoundationalBits
{
public static class Agent
{
public static IAgent<TCommand, TReply> Start<TState, TCommand, TReply>(
TState initialState,
Func<TState, TCommand, Task<(TState newState, TReply reply)>> update)
=> new StatefulAgent<TState, TCommand, TReply>(initialState, update);

public static IAgent<TCommand, TReply> Start<TCommand, TReply>(
Func<TCommand, Task<TReply>> update)
=> new StatelessAgent<TCommand, TReply>(update);
}
}
25 changes: 10 additions & 15 deletions FoundationalBits/Agents/StatefulAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,30 @@

namespace bridgefield.FoundationalBits.Agents
{
public sealed class StatefulAgent<TState, TCommand, TReply> : IAgent<TCommand, TReply>
internal sealed class StatefulAgent<TState, TCommand, TReply> : IAgent<TCommand, TReply>
{
private readonly ActionBlock<(TCommand command, TaskCompletionSource<TReply> task)> actions;
private readonly ActionBlock<(TCommand command, TaskCompletionSource<TReply> task)> actionBlock;

public StatefulAgent(TState initialState,
Func<TState, TCommand, Task<(TState newState, TReply reply)>> processor)
{
var state = initialState;

actions = new ActionBlock<(TCommand command, TaskCompletionSource<TReply> task)>(
actionBlock = new(
data => processor(state, data.command)
.ContinueWith(task =>
{
if (task.IsFaulted)
.HandleResult(
r =>
{
data.task.SetException(task.Exception);
}
else
{
state = task.Result.newState;
data.task.SetResult(task.Result.reply);
}
}));
state = r.newState;
data.task.SetResult(r.reply);
},
data.task.SetException));
}

public Task<TReply> Tell(TCommand command)
{
var completionSource = new TaskCompletionSource<TReply>(TaskCreationOptions.RunContinuationsAsynchronously);
actions.Post((command, completionSource));
actionBlock.Post((command, completionSource));
return completionSource.Task;
}
}
Expand Down
22 changes: 7 additions & 15 deletions FoundationalBits/Agents/StatelessAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,22 @@

namespace bridgefield.FoundationalBits.Agents
{
public sealed class StatelessAgent<TCommand, TReply> : IAgent<TCommand, TReply>
internal sealed class StatelessAgent<TCommand, TReply> : IAgent<TCommand, TReply>
{
private readonly ActionBlock<(TCommand command, TaskCompletionSource<TReply> task)> actions;
private readonly ActionBlock<(TCommand command, TaskCompletionSource<TReply> task)> actionBlock;

public StatelessAgent(Func<TCommand, Task<TReply>> processor) =>
actions = new ActionBlock<(TCommand command, TaskCompletionSource<TReply> task)>(
actionBlock = new(
data => processor(data.command)
.ContinueWith(task =>
{
if (task.IsFaulted)
{
data.task.SetException(task.Exception);
}
else
{
data.task.SetResult(task.Result);
}
})
.HandleResult(
data.task.SetResult,
data.task.SetException)
);

public Task<TReply> Tell(TCommand command)
{
var completionSource = new TaskCompletionSource<TReply>(TaskCreationOptions.RunContinuationsAsynchronously);
actions.Post((command, completionSource));
actionBlock.Post((command, completionSource));
return completionSource.Task;
}
}
Expand Down
24 changes: 24 additions & 0 deletions FoundationalBits/Agents/Tasks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Threading.Tasks;

namespace bridgefield.FoundationalBits.Agents
{
internal static class Tasks
{
public static Task HandleResult<T>(
this Task<T> task,
Action<T> onSuccess,
Action<Exception> onError) =>
task.ContinueWith(t =>
{
if (t.IsFaulted)
{
onError(task.Exception);
}
else
{
onSuccess(task.Result);
}
});
}
}
4 changes: 2 additions & 2 deletions FoundationalBits/FoundationalBits.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="bridgefield.MonadicBits" Version="0.4.0"/>
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="6.0.0"/>
<PackageReference Include="bridgefield.MonadicBits" Version="0.4.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="6.0.0" />
</ItemGroup>
</Project>
10 changes: 0 additions & 10 deletions FoundationalBits/IAgent.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
using System;
using System.Threading.Tasks;
using bridgefield.FoundationalBits.Agents;

namespace bridgefield.FoundationalBits
{
// ReSharper disable once TypeParameterCanBeVariant
public interface IAgent<TCommand, TReply>
{
Task<TReply> Tell(TCommand command);

static IAgent<TCommand, TReply> Start<TState>(
TState initialState,
Func<TState, TCommand, Task<(TState newState, TReply reply)>> update)
=> new StatefulAgent<TState, TCommand, TReply>(initialState, update);

static IAgent<TCommand, TReply> Start(Func<TCommand, Task<TReply>> update)
=> new StatelessAgent<TCommand, TReply>(update);
}
}
9 changes: 9 additions & 0 deletions FoundationalBits/MessageBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using bridgefield.FoundationalBits.Messaging;

namespace bridgefield.FoundationalBits
{
public static class MessageBus
{
public static IMessageBus Create() => new AgentBasedMessageBus();
}
}
7 changes: 4 additions & 3 deletions FoundationalBits/Messaging/AgentBasedMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@

namespace bridgefield.FoundationalBits.Messaging
{
public sealed class AgentBasedMessageBus : IMessageBus
internal sealed class AgentBasedMessageBus : IMessageBus
{
private sealed record State(ImmutableList<Subscription> Subscriptions);

private readonly IAgent<SubscriptionCommand, IEnumerable<Subscription>> agent;

public AgentBasedMessageBus() =>
agent = IAgent<SubscriptionCommand, IEnumerable<Subscription>>.Start(
agent = Agent.Start<State, SubscriptionCommand, IEnumerable<Subscription>>(
new State(ImmutableList<Subscription>.Create()),
(state, command) => command.Execute(state));
(state, command) => command.Execute(state)
);

public void Subscribe(object subscriber) =>
Subscribe(subscriber, SubscriptionLifecycle.GarbageCollected);
Expand Down
32 changes: 7 additions & 25 deletions FoundationalBits/Messaging/ImmutableList.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using System.Collections;
using System.Collections.Generic;

Expand All @@ -17,23 +16,12 @@ namespace bridgefield.FoundationalBits.Messaging
public ImmutableList<T> Remove(T item) => bucket.Remove(item).ToList();
public static ImmutableList<T> Create() => new EmptyBucket().ToList();

public TResult Match<TResult>(
Func<TResult> empty,
Func<T, ImmutableList<T>, TResult> headAndTail) =>
bucket.Match(
empty,
(h, t) => headAndTail(h, t.ToList()));

private interface IBucket
{
IEnumerable<T> AsEnumerable();
ImmutableList<T> ToList();
IBucket Add(T item) => new TailedBucket(item, this);
IBucket Remove(T item);

TResult Match<TResult>(
Func<TResult> empty,
Func<T, IBucket, TResult> headAndTail);
}

private sealed record EmptyBucket : IBucket
Expand All @@ -45,46 +33,40 @@ public IEnumerable<T> AsEnumerable()

public ImmutableList<T> ToList() => new(this);
public IBucket Remove(T item) => this;

public TResult Match<TResult>(Func<TResult> empty, Func<T, IBucket, TResult> headAndTail) =>
empty();
}

private sealed record TailedBucket(T Head, IBucket Tail) : IBucket
{
public IEnumerable<T> AsEnumerable() =>
new TailedBucketEnumerable(this);
new ValueEnumerable(this);

public ImmutableList<T> ToList() => new(this);

public IBucket Remove(T item) =>
Equals(item, Head)
? Tail.Remove(item)
: new TailedBucket(Head, Tail.Remove(item));

public TResult Match<TResult>(
Func<TResult> empty,
Func<T, IBucket, TResult> headAndTail) => headAndTail(Head, Tail);
}

private sealed class TailedBucketEnumerable : IEnumerable<T>
private sealed class ValueEnumerable : IEnumerable<T>
{
private readonly TailedBucket bucket;

public TailedBucketEnumerable(TailedBucket bucket) => this.bucket = bucket;
public ValueEnumerable(TailedBucket bucket) => this.bucket = bucket;

public IEnumerator<T> GetEnumerator() => new TailedBucketEnumerator(bucket);
public IEnumerator<T> GetEnumerator() => new ValueEnumerator(bucket);

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

private sealed class TailedBucketEnumerator : IEnumerator<T>
private sealed class ValueEnumerator : IEnumerator<T>
{
private TailedBucket currentBucket;
private bool started;
private bool hasMore = true;

public TailedBucketEnumerator(TailedBucket currentBucket) => this.currentBucket = currentBucket;
public ValueEnumerator(TailedBucket currentBucket) =>
this.currentBucket = currentBucket;

public bool MoveNext()
{
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Useful bits for foundational groundwork in C# applications.

...

var messageBus = new AgentBasedMessageBus()
var messageBus = MessageBus.Create()
messageBus.Subscribe(
new Receiver(),
SubscriptionLifecycle.ExplicitUnsubscribe);
Expand All @@ -35,7 +35,7 @@ Useful bits for foundational groundwork in C# applications.
Decrement,
Current,
}
var counter = IAgent<CounterCommand,int>.Start<int>(
var counter = Agent.Start<CounterCommand,int,int>(
0,
(current,command) => command switch{
CounterCommand.Increment => (current+1, current+1),
Expand Down

0 comments on commit 2032bd4

Please sign in to comment.