diff --git a/src/tink/streams/Regrouper.hx b/src/tink/streams/Regrouper.hx new file mode 100644 index 0000000..85d33b5 --- /dev/null +++ b/src/tink/streams/Regrouper.hx @@ -0,0 +1,78 @@ +package tink.streams; + +import haxe.ds.ReadOnlyArray; +using tink.CoreApi; + + + +private typedef RegroupResultObject = { + converted:Stream, + ?leftover:Array, // re-populates the buffer +} + +@:forward +abstract RegroupResult(RegroupResultObject) from RegroupResultObject to RegroupResultObject { + @:from + public static inline function ofStream(stream:Stream):RegroupResult { + return {converted: stream, leftover: null}; + } +} + +enum RegroupStatus { + Flowing; + Final; +} + +private typedef RegrouperFn = (items:ReadOnlyArray, status:RegroupStatus)->Return>, Quality>; + +@:callable +abstract Regrouper(RegrouperFn) from RegrouperFn to RegrouperFn { + public inline function new(f) { + this = f; + } + + @:from + public static inline function ofSyncOutcome(f:(items:ReadOnlyArray, status:RegroupStatus)->Outcome>, Quality>):Regrouper { + return new Regrouper((items, status) -> Future.sync(f(items, status))); + } + + @:from + public static inline function ofSync(f:(items:ReadOnlyArray, status:RegroupStatus)->Option>):Regrouper { + return new Regrouper((items, status) -> Future.sync(f(items, status))); + } + + @:from + public static inline function ofStatusIgnorance(f:(items:ReadOnlyArray)->Future>>):Regrouper { + return new Regrouper((items, status) -> f(items)); + } + + @:from + public static inline function ofSyncOutcomeStatusIgnorance(f:(items:ReadOnlyArray)->Outcome>, Quality>):Regrouper { + return new Regrouper((items, status) -> Future.sync(f(items))); + } + + @:from + public static inline function ofSyncStatusIgnorance(f:(items:ReadOnlyArray)->Option>):Regrouper { + return new Regrouper((items, status) -> Future.sync(f(items))); + } + + @:from + public static inline function ofStreamOnly(f:(items:ReadOnlyArray, status:RegroupStatus)->Future>>):Regrouper { + return new 
Regrouper((items, status) -> f(items, status).map(o -> o.map(RegroupResult.ofStream))); + } + + @:from + public static inline function ofSyncStreamOnly(f:(items:ReadOnlyArray, status:RegroupStatus)->Option>):Regrouper { + return new Regrouper((items, status) -> Future.sync(f(items, status).map(RegroupResult.ofStream))); + } + + @:from + public static inline function ofStatusIgnoranceAndStreamOnly(f:(items:ReadOnlyArray)->Future>>):Regrouper { + return new Regrouper((items, status) -> f(items).map(o -> o.map(RegroupResult.ofStream))); + } + + @:from + public static inline function ofSyncStatusIgnoranceAndStreamOnly(f:(items:ReadOnlyArray)->Option>):Regrouper { + return new Regrouper((items, status) -> Future.sync(f(items).map(RegroupResult.ofStream))); + } +} \ No newline at end of file diff --git a/src/tink/streams/Stream.hx b/src/tink/streams/Stream.hx index a894752..8a6a38d 100644 --- a/src/tink/streams/Stream.hx +++ b/src/tink/streams/Stream.hx @@ -1,5 +1,6 @@ package tink.streams; +import tink.streams.Regrouper; import tink.core.Callback; using tink.CoreApi; @@ -45,6 +46,9 @@ abstract Stream(StreamObject) from StreamObject(f:Regrouper):Stream + return new RegroupStream(this, f); static public inline function empty():Stream return @:privateAccess @@ -218,7 +222,6 @@ private class Compound implements StreamObject { } private typedef Selector = In->Return, Quality>; - private class SelectStream implements StreamObject { final source:Stream; @@ -285,6 +288,80 @@ private class SelectStream implements StreamObject implements StreamObject { + final source:Stream; + final regrouper:Regrouper; + final buffer:Null>; + + public function new(source, regrouper, ?buffer) { + this.source = source; + this.regrouper = regrouper; + this.buffer = buffer; + } + + function continued(unconsumed:Stream, unprocessed:Stream, ?buffer:Array):Stream + return new Compound([unconsumed, new RegroupStream(unprocessed, regrouper, buffer)]); + + public function 
forEach(f:(item:Out)->Future>):Future> { + var buffer = this.buffer == null ? [] : this.buffer.copy(); /* work on a copy: push() below must never mutate the stored leftover buffer, otherwise iterating this stream a second time would start from a polluted buffer */ + + return + source.forEach(i -> { + buffer.push(i); + regrouper(buffer, Flowing).asFuture() + .flatMap(regrouped -> switch regrouped { + case Success(None): // regrouper decided to do nothing + Future.sync(None); // continue source iteration + case Success(Some({converted: converted, leftover: leftover})): // regrouper converted some items + // reset buffer + buffer = leftover ?? []; + + // pass converted stream to consumer + // if consumer is done, continue source iteration + // else capture the unconsumed items + result, then stop source iteration + converted.forEach(f).map(x -> switch x { + case Done: None; + case Stopped(unconsumed, result): Some(new Pair(unconsumed, Success(result))); + case Failed(unconsumed, e): Some(new Pair(cast unconsumed, cast Failure(e))); + }); + case Failure(e): // regrouper failed + buffer = []; + Future.sync(Some(new Pair(Stream.empty(), Failure(e)))); + }); + }).flatMap(function(r):Future> return switch r { + case Done if(buffer.length > 0): + // if everything has finished but there are still items in the buffer, + // give user a final chance to process them before ending the stream + regrouper(buffer, Final).asFuture() + .flatMap(regrouped -> switch regrouped { + case Success(None): // regrouper decided to do nothing + Future.sync(Done); + case Success(Some({converted: converted})): // leftover is ignored at final regroup + converted.forEach(f).map(x -> switch x { + case Done: Done; + case Stopped(unconsumed, result): Stopped(unconsumed, result); + case Failed(unconsumed, e): cast Failed(unconsumed, cast e); + }); + case Failure(e): + Future.sync(cast Failed(Stream.empty(), cast e)); + }); + case Done: + Future.sync(Done); + case Stopped(unprocessed, {a: unconsumed, b: result}): // the source stream can be stopped for 2 reasons: a. regroup failed, b. 
consumer stopped + final rest = continued(unconsumed, unprocessed, buffer); + Future.sync(switch result { + case Success(r): + Stopped(rest, r); + case Failure(e): + cast Failed(cast rest, cast e); + }); + case Failed(unprocessed, e): + Future.sync(cast Failed(cast continued(Stream.empty(), cast unprocessed, buffer), e)); + }); + } + +} private class Grouped implements StreamObject { final source:Stream, Quality>; diff --git a/tests/StreamTest.hx b/tests/StreamTest.hx index 6232e1b..07f1460 100644 --- a/tests/StreamTest.hx +++ b/tests/StreamTest.hx @@ -3,6 +3,8 @@ package; import tink.streams.IdealStream; import tink.streams.RealStream; import tink.streams.Stream; +import tink.streams.Return; +import haxe.ds.ReadOnlyArray; using StringTools; using tink.CoreApi; @@ -78,81 +80,107 @@ class StreamTest { return asserts; } - // public function testRegroup() { - - // var s = Stream.ofIterator(0...100); - - // var sum = 0; - // s.regroup(function (i:Array) return i.length == 5 ? Converted(Stream.single(i[0] + i[4])) : Untouched) - // .idealize(null).forEach(function (v) { - // sum += v; - // return Resume; - // }) - // .handle(function (x) switch x { - // case Depleted: - // asserts.assert(1980 == sum); - // case Halted(_): - // asserts.fail('Expected "Depleted"'); - // }); - - // var sum = 0; - // s.regroup(function (i:Array, s) { - // return if(s == Flowing) - // i.length == 3 ? 
Converted(Stream.single(i[0] + i[2])) : Untouched - // else - // Converted(Stream.single(i[0])); // TODO: test backoff / clog at last step - // }) - // .idealize(null).forEach(function (v) { - // sum += v; - // return Resume; - // }) - // .handle(function (x) switch x { - // case Depleted: - // asserts.assert(3333 == sum); - // case Halted(_): - // asserts.fail('Expected "Depleted"'); - // }); - - // var sum = 0; - // s.regroup(function (i:Array) return Converted([i[0], i[0]].iterator())) - // .idealize(null).forEach(function (v) { - // sum += v; - // return Resume; - // }) - // .handle(function (x) switch x { - // case Depleted: - // asserts.assert(9900 == sum); - // case Halted(_): - // asserts.fail('Expected "Depleted"'); - // }); - - // var sum = 0; - // s.regroup(function (i:Array, status:RegroupStatus) { - // var batch = null; - - // if(status == Ended) - // batch = i; - // else if(i.length > 3) - // batch = i.splice(0, 3); // leave one item in the buf - - // return if(batch != null) - // Converted(batch.iterator(), i) - // else - // Untouched; - // }) - // .idealize(null).forEach(function (v) { - // sum += v; - // return Resume; - // }) - // .handle(function (x) switch x { - // case Depleted: - // asserts.assert(4950 == sum); - // case Halted(_): - // asserts.fail('Expected "Depleted"'); - // }); + public function testRegroup() { - // return asserts.done(); - // } + var s = Stream.ofIterator(0...100); + + + final duplicated = s.regroup((i, _) -> Some(Stream.single(i[0])...Stream.single(i[0]))); + + final leftover = s.regroup((i, s) -> { + final i = i.copy(); + var batch = null; + + if(s == Final) + batch = i; + else if(i.length > 3) { // when there are 4 items in the buffer, consume 3 and leave 1 back into the stream + batch = i.splice(0, 3); + } + + return + if(batch != null) + Some({converted: Stream.ofIterator(batch.iterator()), leftover: i}) + else + None; + }); + + final skipped = s.regroup((i, s) -> + if(s == Flowing) + i.length == 3 ? 
Some(Stream.ofIterator([i[0], i[2]].iterator())) : None + else + Some(Stream.single(i[0])) // TODO: test backoff / clog at last step + ); + + computeSum(s.regroup((i, _) -> i.length == 5 ? Some(Stream.single(i[0] + i[4])) : None)) + .handle(sum -> asserts.assert(1980 == sum)); + + computeSum(skipped).handle(sum -> asserts.assert(3333 == sum)); + computeSum(duplicated).handle(sum -> asserts.assert(9900 == sum)); + computeSum(leftover).handle(sum -> asserts.assert(4950 == sum)); + + + function computeSumUpTo(at:Int) { + var sum = 0; + return v -> { + sum += v; + Future.sync(v == at ? Some({stoppedAt: at, sum: sum}) : None); + } + } + + duplicated + .forEach(computeSumUpTo(10)) + .handle(x -> switch x { + case Stopped(rest, v): + asserts.assert(v.stoppedAt == 10); + asserts.assert(v.sum == 100); + computeSum(rest).handle(restSum -> asserts.assert(restSum == 9800)); + case _: + asserts.fail('Expected "Stopped"'); + }); + + leftover + .forEach(computeSumUpTo(10)) + .handle(x -> switch x { + case Stopped(rest, v): + asserts.assert(v.stoppedAt == 10); + asserts.assert(v.sum == 55); + var restSum = 0; + computeSum(rest).handle(restSum -> asserts.assert(restSum == 4895)); + case _: + asserts.fail('Expected "Stopped"'); + }); + + return asserts.done(); + } + public function testRegroupError() { + + final s1:Stream = Stream.ofIterator(0...10); + final s2:Stream = Stream.ofIterator(0...10)...Stream.ofError(new Error('Foo'))...Stream.ofIterator(10...20); + final duplicated = s2.regroup(i -> Some(Stream.ofIterator([i[0], i[0]].iterator()))); + final invalid = s1.regroup((i, s) -> { + i[0] == 2 + ? 
Failure(new Error('Halted')) + : Success(Some({converted: ([i[0], i[0]].iterator():Stream), leftover: null})); + }); + + duplicated.forEach(i -> None).handle(r -> switch r { + case Failed(rest, e): + asserts.assert(e.message == 'Foo'); + computeSum(rest).handle(sum -> asserts.assert(sum == 290)); + case _: + asserts.fail('Expected "Failed"'); + }); + + invalid.forEach(i -> None).handle(r -> switch r { + case Failed(rest, e): + asserts.assert(e.message == 'Halted'); + computeSum(rest).handle(sum -> asserts.assert(sum == 84)); + case _: + asserts.fail('Expected "Failed"'); + }); + + return asserts.done(); + } public function testNested() { var n = Stream.ofIterator([Stream.ofIterator(0...3), Stream.ofIterator(3...6)].iterator()); @@ -356,4 +384,33 @@ class StreamTest { inline function ofOutcomes(i:Iterator>) { return Stream.ofIterator(i).map(function(v:Outcome) return v); } + + function join(s:Stream):Future> { + final arr = []; + return s.forEach(i -> { + arr.push(i); + None; + }).map(x -> switch x { + case Done: arr; + case v: throw 'Unreachable $v'; + }); + } + + function computeSum(s:Stream):Future { + var v = 0; + return s.forEach(i -> { + v += i; + None; + }).map(x -> switch x { + case Done: v; + case _: throw 'Unreachable ($x)'; + }); + } + + function print(s:Stream) { + return s.forEach(i -> { + trace(i); + None; + }).handle(v -> trace(v)); + } } \ No newline at end of file