Skip to content

Commit

Permalink
StreamT lifting streams of M
Browse files Browse the repository at this point in the history
  • Loading branch information
louthy committed Sep 16, 2024
1 parent b249a87 commit 9413846
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 4 deletions.
2 changes: 1 addition & 1 deletion LanguageExt.Core/Effects/StreamT/DSL/StreamT.DSL.Main.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ from ml in runListT
MNil<A> =>
Empty.runListT,

MCons<M, A>(var h, var t) =>
MCons<M, A>(_, var t) =>
new StreamMainT<M, A>(t).runListT,

MIter<M, A> iter =>
Expand Down
27 changes: 27 additions & 0 deletions LanguageExt.Core/Effects/StreamT/StreamT.Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ public static StreamT<M, A> lift<M, A>(IAsyncEnumerable<A> items)
where M : Monad<M> =>
StreamT<M, A>.Lift(items);

/// <summary>
/// Lift an async-enumerable into the stream
/// </summary>
/// <param name="stream">Sequence to lift</param>
/// <returns>Stream transformer</returns>
public static StreamT<M, A> liftM<M, A>(IAsyncEnumerable<K<M, A>> items)
where M : Monad<M> =>
StreamT<M, A>.LiftM(items);

/// <summary>
/// Lift an enumerable into the stream
/// </summary>
Expand All @@ -46,6 +55,15 @@ public static StreamT<M, A> lift<M, A>(IEnumerable<A> items)
where M : Monad<M> =>
StreamT<M, A>.Lift(items);

/// <summary>
/// Lift an enumerable into the stream
/// </summary>
/// <param name="stream">Sequence to lift</param>
/// <returns>Stream transformer</returns>
public static StreamT<M, A> liftM<M, A>(IEnumerable<K<M, A>> items)
where M : Monad<M> =>
StreamT<M, A>.LiftM(items);

/// <summary>
/// Lift a (possibly lazy) sequence into the stream
/// </summary>
Expand All @@ -55,6 +73,15 @@ public static StreamT<M, A> lift<M, A>(Seq<A> items)
where M : Monad<M> =>
StreamT<M, A>.Lift(items);

/// <summary>
/// Lift a (possibly lazy) sequence into the stream
/// </summary>
/// <param name="list">Sequence to lift</param>
/// <returns>Stream transformer</returns>
public static StreamT<M, A> liftM<M, A>(Seq<K<M, A>> items)
where M : Monad<M> =>
StreamT<M, A>.LiftM(items);

/// <summary>
/// Lift an effect into the stream
/// </summary>
Expand Down
39 changes: 36 additions & 3 deletions LanguageExt.Core/Effects/StreamT/StreamT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ public static StreamT<M, A> LiftF<F>(K<F, A> foldable)
public static StreamT<M, A> Lift(IAsyncEnumerable<A> stream) =>
new StreamAsyncEnumerableT<M, A>(stream);

/// <summary>
/// Lift an async-enumerable into the stream
/// </summary>
/// <param name="stream">Sequence to lift</param>
/// <returns>Stream transformer</returns>
public static StreamT<M, A> LiftM(IAsyncEnumerable<K<M, A>> stream) =>
(from ma in StreamT<M, K<M, A>>.Lift(stream)
from a in Lift(ma)
where true
select a).As();

/// <summary>
/// Lift an enumerable into the stream
/// </summary>
Expand All @@ -78,18 +89,40 @@ public static StreamT<M, A> Lift(IEnumerable<A> stream) =>
StreamT.pure<M, Unit>(default) // HACK: forces re-evaluation of the enumerable
.Bind(_ => new StreamEnumerableT<M, A>(stream));

/// <summary>
/// Lift an enumerable into the stream
/// </summary>
/// <param name="stream">Sequence to lift</param>
/// <returns>Stream transformer</returns>
public static StreamT<M, A> LiftM(IEnumerable<K<M, A>> stream) =>
(from ma in StreamT<M, K<M, A>>.Lift(stream)
from a in Lift(ma)
where true
select a).As();

/// <summary>
/// Lift a (possibly lazy) sequence into the stream
/// </summary>
/// <param name="list">Sequence to lift</param>
/// <param name="stream">Sequence to lift</param>
/// <returns>Stream transformer</returns>
public static StreamT<M, A> Lift(Seq<A> list) =>
list switch
public static StreamT<M, A> Lift(Seq<A> stream) =>
stream switch
{
[] => Empty,
var (x, xs) => new StreamMainT<M, A>(M.Pure<MList<A>>(new MCons<M, A>(x, Lift(xs).runListT)))
};

/// <summary>
/// Lift a (possibly lazy) sequence into the stream
/// </summary>
/// <param name="stream">Sequence to lift</param>
/// <returns>Stream transformer</returns>
public static StreamT<M, A> LiftM(Seq<K<M, A>> stream) =>
(from ma in StreamT<M, K<M, A>>.Lift(stream)
from a in Lift(ma)
where true
select a).As();

/// <summary>
/// Lift an effect into the stream
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Samples/Streams/Menu.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ from ky in readKey >> green
ConsoleKey.D6 => Folding.run,
ConsoleKey.D7 => Merging.run,
ConsoleKey.D8 => Zipping.run,
ConsoleKey.D9 => OptionalItems.run,
ConsoleKey.X => RecursionIO.run,
_ => unitIO
}
Expand All @@ -36,5 +37,6 @@ from _1 in run
writeLine("6. Folding") >>
writeLine("7. Merging") >>
writeLine("8. Zipping") >>
writeLine("9. Optional items") >>
writeLine("Enter a number for the example you wish to run");
}
34 changes: 34 additions & 0 deletions Samples/Streams/OptionalItems.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using LanguageExt;
using static LanguageExt.Prelude;

namespace Streams;

public static class OptionalItems
{
public static IO<Unit> run =>
from _1 in example(100).Iter().Run()
from _2 in Console.writeLine("done")
select unit;

static StreamT<OptionT<IO>, Unit> example(int n) =>
from x in StreamT.liftM(getOptionsAsync(n))
from _ in Console.write($"{x} ")
where true
select unit;

static bool isAllowed(int x) =>
x != 20;

static async IAsyncEnumerable<OptionT<IO ,int>> getOptionsAsync(int n)
{
foreach (var x in Range(1, n))
{
var option = isAllowed(x)
? OptionT.lift(IO.pure(x))
: OptionT<IO, int>.None;

var r = await Task.FromResult(option);
yield return r;
}
}
}

0 comments on commit 9413846

Please sign in to comment.