Skip to content

Commit

Permalink
Merge pull request #66 from doomchild/partition
Browse files Browse the repository at this point in the history
Added TaskExtras.Partition
  • Loading branch information
doomchild authored Jul 25, 2024
2 parents da2f768 + b02d229 commit 74cffc6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/TaskExtras.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace RLC.TaskChaining;
Expand Down Expand Up @@ -187,6 +189,37 @@ private static Task<T> DoRetry<T>(
));
}

/// <summary>
/// Partitions a collection of <see cref="Task{T}"/> into faulted and fulfilled lists.
/// </summary>
/// <typeparam name="T">The underlying type of the tasks in <paramref name="tasks"/>.</typeparam>
/// <param name="tasks">The collection of tasks to partition.</param>
/// <returns>A tuple of the partitioned tasks.</returns>
public static Task<(IEnumerable<Task<T>> Faulted, IEnumerable<Task<T>> Fulfilled)> Partition<T>(
IEnumerable<Task<T>> tasks
)
{
return tasks.Aggregate<Task<T>, Task<(IEnumerable<Task<T>> Faulted, IEnumerable<Task<T>> Fulfilled)>>(
Task.FromResult(((IEnumerable<Task<T>>)new List<Task<T>>(), (IEnumerable<Task<T>>)new List<Task<T>>())),
ZipTasksWith<T, (IEnumerable<Task<T>> Faulted, IEnumerable<Task<T>> Fulfilled), (IEnumerable<Task<T>> Faulted, IEnumerable<Task<T>> Fulfilled), Exception>
(
(values, v) => (values.Faulted, values.Fulfilled.Append(Task.FromResult(v)).ToList()),
(values, v) => (values.Faulted.Append<Task<T>>(Task.FromException<T>(v)).ToList(), values.Fulfilled)
)
);
}

private static Func<Task<TAccum>, Task<TValue>, Task<TResult>> ZipTasksWith<TValue, TAccum, TResult, TException>(
Func<TAccum, TValue, TResult> f,
Func<TAccum, TException, TResult> g
) where TException : Exception
{
return (b, a) => a.Then(
valueA => b.Then(valueB => f(valueB, valueA)),
error => b.Then(valueB => g(valueB, (TException)error))
);
}

/// <summary>
/// A function that performs retries of the <paramref name="supplier"/> if it fails.
/// </summary>
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/TaskExtrasPartitionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using RLC.TaskChaining;
using Xunit;

namespace RLC.TaskChainingTests;

public class PartitionTests
{
[Fact]
public async Task ItShouldPartition()
{
int expectedFulfills = 4;
int expectedFaults = 1;
List<Task<string>> tasks = new()
{
Task.FromResult("abc"),
Task.FromResult("def"),
Task.FromException<string>(new InvalidOperationException()),
Task.FromResult("ghi"),
TaskExtras.Defer(() => Task.FromResult("jkl"), TimeSpan.FromSeconds(1))
};

(IEnumerable<Task<string>> Faulted, IEnumerable<Task<string>> Fulfilled) partition = await TaskExtras.Partition(tasks);

Assert.Equal(expectedFaults, partition.Faulted.Count());
Assert.Equal(expectedFulfills, partition.Fulfilled.Count());
}
}

0 comments on commit 74cffc6

Please sign in to comment.