Skip to content

Commit

Permalink
Merge pull request #9 from pfpack/feature/async-pipeline-options
Browse files Browse the repository at this point in the history
Feature/async pipeline options
  • Loading branch information
pmosk authored Aug 24, 2023
2 parents fc3b41f + 5b3da07 commit 313e039
Show file tree
Hide file tree
Showing 25 changed files with 182 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ namespace System;
internal AsyncPipeline(AsyncPipeline<Result<TSuccess, TFailure>> pipeline)
=>
this.pipeline = pipeline;

public AsyncPipelineOptions Options
=>
pipeline.Options;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Diagnostics.CodeAnalysis;

namespace System;

partial struct AsyncPipeline<TSuccess, TFailure>
{
public AsyncPipeline<TSuccess, TFailure> Configure([AllowNull] AsyncPipelineOptions options)
=>
new(
pipeline.Configure(options));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Threading;
using System.Threading.Tasks;

namespace System
{
partial struct AsyncPipeline<TSuccess, TFailure>
{
public AsyncPipeline<TResultSuccess, TResultFailure> Fold<TResultSuccess, TResultFailure>(
Func<TSuccess, CancellationToken, Task<Result<TResultSuccess, TResultFailure>>> mapSuccessAsync,
Func<TFailure, CancellationToken, Task<Result<TResultSuccess, TResultFailure>>> mapFailureAsync)
where TResultFailure : struct
=>
InnerFold(
mapSuccessAsync ?? throw new ArgumentNullException(nameof(mapSuccessAsync)),
mapFailureAsync ?? throw new ArgumentNullException(nameof(mapFailureAsync)));

private AsyncPipeline<TResultSuccess, TResultFailure> InnerFold<TResultSuccess, TResultFailure>(
Func<TSuccess, CancellationToken, Task<Result<TResultSuccess, TResultFailure>>> mapSuccessAsync,
Func<TFailure, CancellationToken, Task<Result<TResultSuccess, TResultFailure>>> mapFailureAsync)
where TResultFailure : struct
=>
new(
pipeline.InternalPipe(
(r, t) => r.FoldAsync(
s => mapSuccessAsync.Invoke(s, t),
f => mapFailureAsync.Invoke(f, t))));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace System
{
partial struct AsyncPipeline<TSuccess, TFailure>
{
public AsyncPipeline<TResultSuccess, TResultFailure> Fold<TResultSuccess, TResultFailure>(
Func<TSuccess, Result<TResultSuccess, TResultFailure>> mapSuccess,
Func<TFailure, Result<TResultSuccess, TResultFailure>> mapFailure)
where TResultFailure : struct
=>
InnerFold(
mapSuccess ?? throw new ArgumentNullException(nameof(mapSuccess)),
mapFailure ?? throw new ArgumentNullException(nameof(mapFailure)));

private AsyncPipeline<TResultSuccess, TResultFailure> InnerFold<TResultSuccess, TResultFailure>(
Func<TSuccess, Result<TResultSuccess, TResultFailure>> mapSuccess,
Func<TFailure, Result<TResultSuccess, TResultFailure>> mapFailure)
where TResultFailure : struct
=>
new(
pipeline.InternalPipe(
r => r.Fold(mapSuccess, mapFailure)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Threading;
using System.Threading.Tasks;

namespace System
{
partial struct AsyncPipeline<TSuccess, TFailure>
{
public AsyncPipeline<TResultSuccess, TResultFailure> FoldValue<TResultSuccess, TResultFailure>(
Func<TSuccess, CancellationToken, ValueTask<Result<TResultSuccess, TResultFailure>>> mapSuccessAsync,
Func<TFailure, CancellationToken, ValueTask<Result<TResultSuccess, TResultFailure>>> mapFailureAsync)
where TResultFailure : struct
=>
InnerFoldValue(
mapSuccessAsync ?? throw new ArgumentNullException(nameof(mapSuccessAsync)),
mapFailureAsync ?? throw new ArgumentNullException(nameof(mapFailureAsync)));

private AsyncPipeline<TResultSuccess, TResultFailure> InnerFoldValue<TResultSuccess, TResultFailure>(
Func<TSuccess, CancellationToken, ValueTask<Result<TResultSuccess, TResultFailure>>> mapSuccessAsync,
Func<TFailure, CancellationToken, ValueTask<Result<TResultSuccess, TResultFailure>>> mapFailureAsync)
where TResultFailure : struct
=>
new(
pipeline.InternalPipeValue(
(r, t) => r.FoldValueAsync(
s => mapSuccessAsync.Invoke(s, t),
f => mapFailureAsync.Invoke(f, t))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ public AsyncPipeline<TResult> Fold<TResult>(
Func<TSuccess, CancellationToken, Task<TResult>> mapSuccessAsync,
Func<TFailure, CancellationToken, Task<TResult>> mapFailureAsync)
=>
InnerFilter(
InnerFold(
mapSuccessAsync ?? throw new ArgumentNullException(nameof(mapSuccessAsync)),
mapFailureAsync ?? throw new ArgumentNullException(nameof(mapFailureAsync)));

private AsyncPipeline<TResult> InnerFilter<TResult>(
private AsyncPipeline<TResult> InnerFold<TResult>(
Func<TSuccess, CancellationToken, Task<TResult>> mapSuccessAsync,
Func<TFailure, CancellationToken, Task<TResult>> mapFailureAsync)
=>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ public AsyncPipeline<TResult> FoldValue<TResult>(
Func<TSuccess, CancellationToken, ValueTask<TResult>> mapSuccessAsync,
Func<TFailure, CancellationToken, ValueTask<TResult>> mapFailureAsync)
=>
InnerFilterValue(
InnerFoldValue(
mapSuccessAsync ?? throw new ArgumentNullException(nameof(mapSuccessAsync)),
mapFailureAsync ?? throw new ArgumentNullException(nameof(mapFailureAsync)));

private AsyncPipeline<TResult> InnerFilterValue<TResult>(
private AsyncPipeline<TResult> InnerFoldValue<TResult>(
Func<TSuccess, CancellationToken, ValueTask<TResult>> mapSuccessAsync,
Func<TFailure, CancellationToken, ValueTask<TResult>> mapFailureAsync)
=>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,33 @@ private AsyncPipeline<TNextSuccess, TFailure> InnerForward<TNextSuccess, TNextFa
Func<TSuccess, CancellationToken, Task<Result<TNextSuccess, TNextFailure>>> nextAsync,
Func<TNextFailure, TFailure> mapFailure)
where TNextFailure : struct
=>
InnerPipeValue(
{
var continueOnCapturedContext = pipeline.Options.ContinueOnCapturedContext;

return InnerPipeValue(
(r, t) => r.ForwardValueAsync(
async s =>
{
var next = await nextAsync.Invoke(s, t).ConfigureAwait(false);
var next = await nextAsync.Invoke(s, t).ConfigureAwait(continueOnCapturedContext);
return next.MapFailure(mapFailure);
}));
}

private AsyncPipeline<TNextSuccess, TFailure> InnerForward<TNextSuccess, TNextFailure>(
Func<TSuccess, CancellationToken, Task<Result<TNextSuccess, TNextFailure>>> nextAsync,
Func<TNextFailure, CancellationToken, Task<TFailure>> mapFailureAsync)
where TNextFailure : struct
=>
InnerPipeValue(
{
var continueOnCapturedContext = pipeline.Options.ContinueOnCapturedContext;

return InnerPipeValue(
(r, t) => r.ForwardValueAsync(
async s =>
{
var next = await nextAsync.Invoke(s, t).ConfigureAwait(false);
return await next.MapFailureAsync(f => mapFailureAsync.Invoke(f, t)).ConfigureAwait(false);
return await next.MapFailureAsync(f => mapFailureAsync.Invoke(f, t)).ConfigureAwait(continueOnCapturedContext);
}));
}

private AsyncPipeline<TNextSuccess, TNextFailure> InnerForward<TNextSuccess, TNextFailure>(
Func<TSuccess, CancellationToken, Task<Result<TNextSuccess, TNextFailure>>> nextAsync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,33 @@ private AsyncPipeline<TNextSuccess, TFailure> InnerForwardValue<TNextSuccess, TN
Func<TSuccess, CancellationToken, ValueTask<Result<TNextSuccess, TNextFailure>>> nextAsync,
Func<TNextFailure, TFailure> mapFailure)
where TNextFailure : struct
=>
InnerPipeValue(
{
var continueOnCapturedContext = pipeline.Options.ContinueOnCapturedContext;

return InnerPipeValue(
(r, t) => r.ForwardValueAsync(
async s =>
{
var next = await nextAsync.Invoke(s, t).ConfigureAwait(false);
var next = await nextAsync.Invoke(s, t).ConfigureAwait(continueOnCapturedContext);
return next.MapFailure(mapFailure);
}));
}

private AsyncPipeline<TNextSuccess, TFailure> InnerForwardValue<TNextSuccess, TNextFailure>(
Func<TSuccess, CancellationToken, ValueTask<Result<TNextSuccess, TNextFailure>>> nextAsync,
Func<TNextFailure, CancellationToken, ValueTask<TFailure>> mapFailureAsync)
where TNextFailure : struct
=>
InnerPipeValue(
{
var continueOnCapturedContext = pipeline.Options.ContinueOnCapturedContext;

return InnerPipeValue(
(r, t) => r.ForwardValueAsync(
async s =>
{
var next = await nextAsync.Invoke(s, t).ConfigureAwait(false);
return await next.MapFailureValueAsync(f => mapFailureAsync.Invoke(f, t)).ConfigureAwait(false);
var next = await nextAsync.Invoke(s, t).ConfigureAwait(continueOnCapturedContext);
return await next.MapFailureValueAsync(f => mapFailureAsync.Invoke(f, t)).ConfigureAwait(continueOnCapturedContext);
}));
}

private AsyncPipeline<TNextSuccess, TNextFailure> InnerForwardValue<TNextSuccess, TNextFailure>(
Func<TSuccess, CancellationToken, ValueTask<Result<TNextSuccess, TNextFailure>>> nextAsync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ namespace System;

private readonly ValueTask<T> valueTask;

private readonly AsyncPipelineOptions? options;

private readonly CancellationToken cancellationToken;

// Creates a non-stopped pipeline
internal AsyncPipeline(ValueTask<T> valueTask, CancellationToken cancellationToken)
internal AsyncPipeline(ValueTask<T> valueTask, AsyncPipelineOptions? options, CancellationToken cancellationToken)
{
isStopped = false;
this.valueTask = valueTask;
this.options = options;
this.cancellationToken = cancellationToken;
}

Expand All @@ -29,5 +32,15 @@ private AsyncPipeline(int _)
isStopped = true;
valueTask = default;
cancellationToken = default;
options = null;
}

public AsyncPipelineOptions Options
=>
options ?? InnerEmptyOptions.Value;

private static class InnerEmptyOptions
{
internal static readonly AsyncPipelineOptions Value = new();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Diagnostics.CodeAnalysis;

namespace System;

partial struct AsyncPipeline<T>
{
public AsyncPipeline<T> Configure([AllowNull] AsyncPipelineOptions options)
=>
isStopped is false
? new(valueTask, options, cancellationToken)
: new(default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ private static EqualityComparer<ValueTask<T>> ValueTaskComparer
=>
EqualityComparer<ValueTask<T>>.Default;

private static EqualityComparer<AsyncPipelineOptions> OptionsComparer
=>
EqualityComparer<AsyncPipelineOptions>.Default;

private static EqualityComparer<CancellationToken> CancellationTokenComparer
=>
EqualityComparer<CancellationToken>.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public bool Equals(AsyncPipeline<T> other)
{
return
ValueTaskComparer.Equals(valueTask, other.valueTask) &&
OptionsComparer.Equals(Options, other.Options) &&
CancellationTokenComparer.Equals(cancellationToken, other.cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ private int NonStoppedHashCode()
HashCode.Combine(
EqualityContractHashCode(),
ValueTaskComparer.GetHashCode(valueTask),
OptionsComparer.GetHashCode(Options),
CancellationTokenComparer.GetHashCode(cancellationToken));

private static int StoppedHashCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ public AsyncPipeline<TResult> Pipe<TResult>(Func<T, CancellationToken, Task<TRes
internal AsyncPipeline<TResult> InternalPipe<TResult>(Func<T, CancellationToken, Task<TResult>> pipeAsync)
=>
isStopped is false
? new(InnerInvokeAsync(pipeAsync), cancellationToken)
? new(InnerInvokeAsync(pipeAsync), options, cancellationToken)
: new(default);

private async ValueTask<TResult> InnerInvokeAsync<TResult>(Func<T, CancellationToken, Task<TResult>> pipeAsync)
{
var result = await valueTask.ConfigureAwait(false);
return await pipeAsync.Invoke(result, cancellationToken).ConfigureAwait(false);
var result = await valueTask.ConfigureAwait(Options.ContinueOnCapturedContext);
return await pipeAsync.Invoke(result, cancellationToken).ConfigureAwait(Options.ContinueOnCapturedContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ public AsyncPipeline<TResult> Pipe<TResult>(Func<T, Task<TResult>> pipeAsync)
internal AsyncPipeline<TResult> InternalPipe<TResult>(Func<T, Task<TResult>> pipeAsync)
=>
isStopped is false
? new(InnerInvokeAsync(pipeAsync), cancellationToken)
? new(InnerInvokeAsync(pipeAsync), options, cancellationToken)
: new(default);

private async ValueTask<TResult> InnerInvokeAsync<TResult>(Func<T, Task<TResult>> pipeAsync)
{
var result = await valueTask.ConfigureAwait(false);
return await pipeAsync.Invoke(result).ConfigureAwait(false);
var result = await valueTask.ConfigureAwait(Options.ContinueOnCapturedContext);
return await pipeAsync.Invoke(result).ConfigureAwait(Options.ContinueOnCapturedContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ public AsyncPipeline<TResult> Pipe<TResult>(Func<T, TResult> pipe)
internal AsyncPipeline<TResult> InternalPipe<TResult>(Func<T, TResult> pipe)
=>
isStopped is false
? new(InnerInvokeAsync(pipe), cancellationToken)
? new(InnerInvokeAsync(pipe), options, cancellationToken)
: new(default);

private async ValueTask<TResult> InnerInvokeAsync<TResult>(Func<T, TResult> pipe)
{
var result = await valueTask.ConfigureAwait(false);
var result = await valueTask.ConfigureAwait(Options.ContinueOnCapturedContext);
return pipe.Invoke(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ public AsyncPipeline<TResult> PipeValue<TResult>(Func<T, CancellationToken, Valu
internal AsyncPipeline<TResult> InternalPipeValue<TResult>(Func<T, CancellationToken, ValueTask<TResult>> pipeAsync)
=>
isStopped is false
? new(InnerInvokeValueAsync(pipeAsync), cancellationToken)
? new(InnerInvokeValueAsync(pipeAsync), options, cancellationToken)
: new(default);

private async ValueTask<TResult> InnerInvokeValueAsync<TResult>(Func<T, CancellationToken, ValueTask<TResult>> pipeAsync)
{
var result = await valueTask.ConfigureAwait(false);
return await pipeAsync.Invoke(result, cancellationToken).ConfigureAwait(false);
var result = await valueTask.ConfigureAwait(Options.ContinueOnCapturedContext);
return await pipeAsync.Invoke(result, cancellationToken).ConfigureAwait(Options.ContinueOnCapturedContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ public AsyncPipeline<TResult> PipeValue<TResult>(Func<T, ValueTask<TResult>> pip
internal AsyncPipeline<TResult> InternalPipeValue<TResult>(Func<T, ValueTask<TResult>> pipeAsync)
=>
isStopped is false
? new(InnerInvokeValueAsync(pipeAsync), cancellationToken)
? new(InnerInvokeValueAsync(pipeAsync), options, cancellationToken)
: new(default);

private async ValueTask<TResult> InnerInvokeValueAsync<TResult>(Func<T, ValueTask<TResult>> pipeAsync)
{
var result = await valueTask.ConfigureAwait(false);
return await pipeAsync.Invoke(result).ConfigureAwait(false);
var result = await valueTask.ConfigureAwait(Options.ContinueOnCapturedContext);
return await pipeAsync.Invoke(result).ConfigureAwait(Options.ContinueOnCapturedContext);
}
}
2 changes: 1 addition & 1 deletion src/core-asyncpipeline/AsyncPipeline/AsyncPipeline.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<Description>EarlyFuncPack Core.AsyncPipeline is a core library for .NET consisting of asynchronous pipeline.</Description>
<RootNamespace>System</RootNamespace>
<AssemblyName>EarlyFuncPack.Core.AsyncPipeline</AssemblyName>
<Version>0.2.0</Version>
<Version>0.3.0-preview.1</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 313e039

Please sign in to comment.