Is there an equivalent to GroupedWithin in language-ext? #1155
-
Hi there, This method collects elements from a stream (e.g. configured amount or just the elements from within the configured time). Is there such a functionality in language-ext? Thanks and Regards, |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Streams in language-ext can be achieved with Pipes, which is a part of the Effects system. What you're describing is a fold on a stream. Folds manage an aggregate value until a trigger event that then yields the value downstream. There's a simple example in the EffectsExamples: FoldTest sample. It listens to the stream of characters from the console and aggregates them into words. There's similar examples which chunk 80 characters at a time from any stream Outside of that the new functionality that I'm building into language-ext v5.0 will bring stream-like functionality to all monadic types (along with folding), but that's a ways off yet. Although this isn't exactly what you're asking for, if you're looking for actor-like functionality (like akka actors) then you can build them quite easily with a combination of Pipes and using System;
using LanguageExt;
using LanguageExt.Pipes;
using LanguageExt.Effects.Traits;
using static LanguageExt.Prelude;
using static LanguageExt.Pipes.Proxy;
public record Actor<RT, A>(Func<A, Eff<RT, Unit>> post, Eff<Unit> shutdown)
where RT : struct, HasCancel<RT>;
public static class Actor<RT, S, A>
where RT : struct, HasCancel<RT>
{
public static Eff<RT, Actor<RT, A>> observe(IObservable<A> stream, S initial, Func<S, A, Aff<RT, S>> inbox) =>
observe(stream, initial, map<A, A>(identity), inbox);
public static Eff<RT, Actor<RT, A>> observe<B>(
IObservable<A> stream,
S initial,
Pipe<RT, A, B, Unit> pipe,
Func<S, B, Aff<RT, S>> inbox)
{
var state = Atom(initial);
var queue = Queue<RT, A>();
var items = Producer.merge(queue, Proxy.observe(stream));
return from cancel in fork(items | pipe | message(state, inbox))
select new Actor<RT, A>(queue.EnqueueEff, cancel);
}
public static Eff<RT, Actor<RT, A>> spawn(S initial, Func<S, A, Aff<RT, S>> inbox) =>
spawn(initial, map<A, A>(identity), inbox);
public static Eff<RT, Actor<RT, A>> spawn<B>(
S initial,
Pipe<RT, A, B, Unit> pipe,
Func<S, B, Aff<RT, S>> inbox)
{
var state = Atom(initial);
var queue = Queue<RT, A>();
return from cancel in fork(queue | pipe | message(state, inbox))
select new Actor<RT, A>(queue.EnqueueEff, cancel);
}
static Consumer<RT, MSG, Unit> message<MSG>(Atom<S> state, Func<S, MSG, Aff<RT, S>> inbox) =>
from m in awaiting<MSG>()
from _ in state.SwapAff(s => inbox(s, m))
select unit;
} This allows the spawning of processes that either listen to observable streams and/or support direct posting of values (via |
Beta Was this translation helpful? Give feedback.
Streams in language-ext can be achieved with Pipes, which is a part of the Effects system. What you're describing is a fold on a stream. Folds manage an aggregate value until a trigger event that then yields the value downstream.
There's a simple example in the EffectsExamples: FoldTest sample. It listens to the stream of characters from the console and aggregates them into words.
There's similar examples which chunk 80 characters at a time from any stream
Outside of that the new functionality that I'm building into language-ext v5.0 will bring stream-like functionality to all monadic types (along with folding), but that's a ways off yet.
Although this isn't exactly what you're asking for…