diff --git a/LanguageExt.Core/Effects/StreamT/DSL/StreamT.DSL.Main.cs b/LanguageExt.Core/Effects/StreamT/DSL/StreamT.DSL.Main.cs index 7f1298b7b..16abcbe90 100644 --- a/LanguageExt.Core/Effects/StreamT/DSL/StreamT.DSL.Main.cs +++ b/LanguageExt.Core/Effects/StreamT/DSL/StreamT.DSL.Main.cs @@ -22,7 +22,7 @@ from ml in runListT MNil => Empty.runListT, - MCons(var h, var t) => + MCons(_, var t) => new StreamMainT(t).runListT, MIter iter => diff --git a/LanguageExt.Core/Effects/StreamT/StreamT.Module.cs b/LanguageExt.Core/Effects/StreamT/StreamT.Module.cs index 69e2e5334..b6fc139e8 100644 --- a/LanguageExt.Core/Effects/StreamT/StreamT.Module.cs +++ b/LanguageExt.Core/Effects/StreamT/StreamT.Module.cs @@ -37,6 +37,15 @@ public static StreamT lift(IAsyncEnumerable items) where M : Monad => StreamT.Lift(items); + /// + /// Lift an async-enumerable into the stream + /// + /// Sequence to lift + /// Stream transformer + public static StreamT liftM(IAsyncEnumerable> items) + where M : Monad => + StreamT.LiftM(items); + /// /// Lift an enumerable into the stream /// @@ -46,6 +55,15 @@ public static StreamT lift(IEnumerable items) where M : Monad => StreamT.Lift(items); + /// + /// Lift an enumerable into the stream + /// + /// Sequence to lift + /// Stream transformer + public static StreamT liftM(IEnumerable> items) + where M : Monad => + StreamT.LiftM(items); + /// /// Lift a (possibly lazy) sequence into the stream /// @@ -55,6 +73,15 @@ public static StreamT lift(Seq items) where M : Monad => StreamT.Lift(items); + /// + /// Lift a (possibly lazy) sequence into the stream + /// + /// Sequence to lift + /// Stream transformer + public static StreamT liftM(Seq> items) + where M : Monad => + StreamT.LiftM(items); + /// /// Lift an effect into the stream /// diff --git a/LanguageExt.Core/Effects/StreamT/StreamT.cs b/LanguageExt.Core/Effects/StreamT/StreamT.cs index 6b0852424..75cb7b784 100644 --- a/LanguageExt.Core/Effects/StreamT/StreamT.cs +++ b/LanguageExt.Core/Effects/StreamT/StreamT.cs @@ -69,6 +69,17 @@ public static StreamT LiftF(K foldable) public static StreamT Lift(IAsyncEnumerable stream) => new StreamAsyncEnumerableT(stream); + /// + /// Lift an async-enumerable into the stream + /// + /// Sequence to lift + /// Stream transformer + public static StreamT LiftM(IAsyncEnumerable> stream) => + (from ma in StreamT>.Lift(stream) + from a in Lift(ma) + where true + select a).As(); + /// /// Lift an enumerable into the stream /// @@ -78,18 +89,40 @@ public static StreamT Lift(IEnumerable stream) => StreamT.pure(default) // HACK: forces re-evaluation of the enumerable .Bind(_ => new StreamEnumerableT(stream)); + /// + /// Lift an enumerable into the stream + /// + /// Sequence to lift + /// Stream transformer + public static StreamT LiftM(IEnumerable> stream) => + (from ma in StreamT>.Lift(stream) + from a in Lift(ma) + where true + select a).As(); + /// /// Lift a (possibly lazy) sequence into the stream /// - /// Sequence to lift + /// Sequence to lift /// Stream transformer - public static StreamT Lift(Seq list) => - list switch + public static StreamT Lift(Seq stream) => + stream switch { [] => Empty, var (x, xs) => new StreamMainT(M.Pure>(new MCons(x, Lift(xs).runListT))) }; + /// + /// Lift a (possibly lazy) sequence into the stream + /// + /// Sequence to lift + /// Stream transformer + public static StreamT LiftM(Seq> stream) => + (from ma in StreamT>.Lift(stream) + from a in Lift(ma) + where true + select a).As(); + /// /// Lift an effect into the stream /// diff --git a/Samples/Streams/Menu.cs b/Samples/Streams/Menu.cs index 96f4f47dc..ea698803f 100644 --- a/Samples/Streams/Menu.cs +++ b/Samples/Streams/Menu.cs @@ -19,6 +19,7 @@ from ky in readKey >> green ConsoleKey.D6 => Folding.run, ConsoleKey.D7 => Merging.run, ConsoleKey.D8 => Zipping.run, + ConsoleKey.D9 => OptionalItems.run, ConsoleKey.X => RecursionIO.run, _ => unitIO } @@ -36,5 +37,6 @@ from _1 in run writeLine("6. Folding") >> writeLine("7. Merging") >> writeLine("8. Zipping") >> + writeLine("9. Optional items") >> writeLine("Enter a number for the example you wish to run"); } diff --git a/Samples/Streams/OptionalItems.cs b/Samples/Streams/OptionalItems.cs new file mode 100644 index 000000000..537e055e1 --- /dev/null +++ b/Samples/Streams/OptionalItems.cs @@ -0,0 +1,34 @@ +using LanguageExt; +using static LanguageExt.Prelude; + +namespace Streams; + +public static class OptionalItems +{ + public static IO run => + from _1 in example(100).Iter().Run() + from _2 in Console.writeLine("done") + select unit; + + static StreamT, Unit> example(int n) => + from x in StreamT.liftM(getOptionsAsync(n)) + from _ in Console.write($"{x} ") + where true + select unit; + + static bool isAllowed(int x) => + x != 20; + + static async IAsyncEnumerable> getOptionsAsync(int n) + { + foreach (var x in Range(1, n)) + { + var option = isAllowed(x) + ? OptionT.lift(IO.pure(x)) + : OptionT.None; + + var r = await Task.FromResult(option); + yield return r; + } + } +}