Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

put back regroup() in simplify branch #30

Open
wants to merge 1 commit into
base: simplify
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/tink/streams/Regrouper.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package tink.streams;

import haxe.ds.ReadOnlyArray;
using tink.CoreApi;



private typedef RegroupResultObject<In, Out, Quality> = {
converted:Stream<Out, Quality>,
?leftover:Array<In>, // re-populates the buffer
}

@:forward
abstract RegroupResult<In, Out, Quality>(RegroupResultObject<In, Out, Quality>) from RegroupResultObject<In, Out, Quality> to RegroupResultObject<In, Out, Quality> {
@:from
public static inline function ofStream<In, Out, Quality>(stream:Stream<Out, Quality>):RegroupResult<In, Out, Quality> {
return {converted: stream, leftover: null};
}
}

enum RegroupStatus {
Flowing;
Final;
}

private typedef RegrouperFn<In, Out, Quality> = (items:ReadOnlyArray<In>, status:RegroupStatus)->Return<Option<RegroupResult<In, Out, Quality>>, Quality>;

@:callable
abstract Regrouper<In, Out, Quality>(RegrouperFn<In, Out, Quality>) from RegrouperFn<In, Out, Quality> to RegrouperFn<In, Out, Quality> {
public inline function new(f) {
this = f;
}

@:from
public static inline function ofSyncOutcome<In, Out, Quality>(f:(items:ReadOnlyArray<In>, status:RegroupStatus)->Outcome<Option<RegroupResult<In, Out, Quality>>, Quality>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> Future.sync(f(items, status)));
}

@:from
public static inline function ofSync<In, Out, Quality>(f:(items:ReadOnlyArray<In>, status:RegroupStatus)->Option<RegroupResult<In, Out, Quality>>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> Future.sync(f(items, status)));
}

@:from
public static inline function ofStatusIgnorance<In, Out, Quality>(f:(items:ReadOnlyArray<In>)->Future<Option<RegroupResult<In, Out, Quality>>>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> f(items));
}

@:from
public static inline function ofSyncOutcomeStatusIgnorance<In, Out, Quality>(f:(items:ReadOnlyArray<In>)->Outcome<Option<RegroupResult<In, Out, Quality>>, Quality>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> Future.sync(f(items)));
}

@:from
public static inline function ofSyncStatusIgnorance<In, Out, Quality>(f:(items:ReadOnlyArray<In>)->Option<RegroupResult<In, Out, Quality>>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> Future.sync(f(items)));
}

@:from
public static inline function ofStreamOnly<In, Out, Quality>(f:(items:ReadOnlyArray<In>, status:RegroupStatus)->Future<Option<Stream<Out, Quality>>>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> f(items, status).map(o -> o.map(RegroupResult.ofStream)));
}

@:from
public static inline function ofSyncStreamOnly<In, Out, Quality>(f:(items:ReadOnlyArray<In>, status:RegroupStatus)->Option<Stream<Out, Quality>>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> Future.sync(f(items, status).map(RegroupResult.ofStream)));
}

@:from
public static inline function ofStatusIgnoranceAndStreamOnly<In, Out, Quality>(f:(items:ReadOnlyArray<In>)->Future<Option<Stream<Out, Quality>>>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> f(items).map(o -> o.map(RegroupResult.ofStream)));
}

@:from
public static inline function ofSyncStatusIgnoranceAndStreamOnly<In, Out, Quality>(f:(items:ReadOnlyArray<In>)->Option<Stream<Out, Quality>>):Regrouper<In, Out, Quality> {
return new Regrouper((items, status) -> Future.sync(f(items).map(RegroupResult.ofStream)));
}
}
79 changes: 78 additions & 1 deletion src/tink/streams/Stream.hx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tink.streams;

import tink.streams.Regrouper;
import tink.core.Callback;
using tink.CoreApi;

Expand Down Expand Up @@ -45,6 +46,9 @@ abstract Stream<Item, Quality>(StreamObject<Item, Quality>) from StreamObject<It
case Success(data): Success(Some(data));// BUG: Success(data) compiles
case Failure(failure): Failure(failure);
}));

public function regroup<R>(f:Regrouper<Item, R, Quality>):Stream<R, Quality>
return new RegroupStream(this, f);

static public inline function empty<Item, Quality>():Stream<Item, Quality>
return @:privateAccess
Expand Down Expand Up @@ -218,7 +222,6 @@ private class Compound<Item, Quality> implements StreamObject<Item, Quality> {
}

private typedef Selector<In, Out, Quality> = In->Return<Option<Out>, Quality>;

private class SelectStream<In, Out, Quality> implements StreamObject<Out, Quality> {

final source:Stream<In, Quality>;
Expand Down Expand Up @@ -285,6 +288,80 @@ private class SelectStream<In, Out, Quality> implements StreamObject<Out, Qualit
);
}


private class RegroupStream<In, Out, Quality> implements StreamObject<Out, Quality> {
final source:Stream<In, Quality>;
final regrouper:Regrouper<In, Out, Quality>;
final buffer:Null<Array<In>>;

public function new(source, regrouper, ?buffer) {
this.source = source;
this.regrouper = regrouper;
this.buffer = buffer;
}

function continued(unconsumed:Stream<Out, Quality>, unprocessed:Stream<In, Quality>, ?buffer:Array<In>):Stream<Out, Quality>
return new Compound([unconsumed, new RegroupStream(unprocessed, regrouper, buffer)]);

public function forEach<Result>(f:(item:Out)->Future<Option<Result>>):Future<IterationResult<Out, Result, Quality>> {
var buffer = this.buffer ?? [];

return
source.forEach(i -> {
buffer.push(i);
regrouper(buffer, Flowing).asFuture()
.flatMap(regrouped -> switch regrouped {
case Success(None): // regrouper decided to do nothing
Future.sync(None); // continue source iteration
case Success(Some({converted: converted, leftover: leftover})): // regrouper converted some items
// reset buffer
buffer = leftover ?? [];

// pass converted stream to consumer
// if consumer is done, continue source iteration
// else capture the unconsumed items + reuslt, then stop source iteration
converted.forEach(f).map(x -> switch x {
case Done: None;
case Stopped(unconsumed, result): Some(new Pair(unconsumed, Success(result)));
case Failed(unconsumed, e): Some(new Pair(cast unconsumed, cast Failure(e)));
});
case Failure(e): // regrouper failed
buffer = [];
Future.sync(Some(new Pair(Stream.empty(), Failure(e))));
});
}).flatMap(function(r):Future<IterationResult<Out, Result, Quality>> return switch r {
case Done if(buffer.length > 0):
// if everything has finished but there are still items in the buffer,
// give user a final chance to process them before ending the stream
regrouper(buffer, Final).asFuture()
.flatMap(regrouped -> switch regrouped {
case Success(None): // regrouper decided to do nothing
Future.sync(Done);
case Success(Some({converted: converted})): // leftover is ignored at final regroup
converted.forEach(f).map(x -> switch x {
case Done: Done;
case Stopped(unconsumed, result): Stopped(unconsumed, result);
case Failed(unconsumed, e): cast Failed(unconsumed, cast e);
});
case Failure(e):
Future.sync(cast Failed(Stream.empty(), cast e));
});
case Done:
Future.sync(Done);
case Stopped(unprocessed, {a: unconsumed, b: result}): // the source stream can be stopped for 2 reasons: a. regroup failed, b. consumer stopped
final rest = continued(unconsumed, unprocessed, buffer);
Future.sync(switch result {
case Success(r):
Stopped(rest, r);
case Failure(e):
cast Failed(cast rest, cast e);
});
case Failed(unprocessed, e):
Future.sync(cast Failed(cast continued(Stream.empty(), cast unprocessed, buffer), e));
});
}

}
private class Grouped<Item, Quality> implements StreamObject<Item, Quality> {
final source:Stream<Array<Item>, Quality>;

Expand Down
Loading