From 7f010925006c0b674154138b2c43638082280438 Mon Sep 17 00:00:00 2001 From: Felix Dietze Date: Thu, 15 Dec 2022 16:39:00 +0100 Subject: [PATCH 1/3] Var.sequence: only trigger outer Rx if collection size changed --- .../scala/colibri/reactive/Reactive.scala | 61 ++++++++++--------- .../src/test/scala/colibri/ReactiveSpec.scala | 38 ++++++++++++ 2 files changed, 71 insertions(+), 28 deletions(-) diff --git a/reactive/src/main/scala/colibri/reactive/Reactive.scala b/reactive/src/main/scala/colibri/reactive/Reactive.scala index 5723518c..0bb7e348 100644 --- a/reactive/src/main/scala/colibri/reactive/Reactive.scala +++ b/reactive/src/main/scala/colibri/reactive/Reactive.scala @@ -7,6 +7,7 @@ import monocle.{Iso, Lens, Prism} import scala.concurrent.Future import scala.reflect.ClassTag +import collection.mutable trait Rx[+A] { def observable: Observable[A] @@ -158,39 +159,43 @@ object Var { def combine[A](read: Rx[A], write: RxWriter[A]): Var[A] = new VarCombine(read, write) @inline implicit class SeqVarOperations[A](rxvar: Var[Seq[A]]) { - def sequence(implicit owner: Owner): Rx[Seq[Var[A]]] = Rx.observableSync(new Observable[Seq[Var[A]]] { - - def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = { - rxvar.observable.unsafeSubscribe( - Observer.create( - { seq => - sink.unsafeOnNext(seq.zipWithIndex.map { case (a, idx) => - val observer = new Observer[A] { - def unsafeOnNext(value: A): Unit = { - rxvar.set(seq.updated(idx, value)) - } - def unsafeOnError(error: Throwable): Unit = { - sink.unsafeOnError(error) + def sequence(implicit owner: Owner): Rx[Seq[Var[A]]] = { + val observable = new Observable[Seq[Var[A]]] { + def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = { + // keep a var for every index of the original sequence + val vars = mutable.ArrayBuffer.empty[Var[A]] + + def createIndexVar(idx:Int, seed: A):Var[A] = { + Var[A](seed).transformVarRxWriter{ _.contramap{ newValue => + rxvar.update(_.updated(idx, newValue)) + newValue + }} + } + + rxvar.observable.unsafeSubscribe( + Observer.create( + consume = { seq => + if(seq.size < vars.size) { + vars.remove(seq.size, vars.size - seq.size) + } else if(seq.size > vars.size) { + for((elem,idx) <- seq.zipWithIndex.takeRight(seq.size - vars.size)) { + vars += createIndexVar(idx, seed= elem) } } - val observable = new Observable.Value[A] { - def now(): A = a - - def unsafeSubscribe(sink: Observer[A]): Cancelable = { - sink.unsafeOnNext(a) - Cancelable.empty - } - + assert(seq.size == vars.size) + for ((newValue, elemVar) <- seq.zip(vars)) { + elemVar.set(newValue) } - Var.combine(Rx.observable(observable)(seed = a), RxWriter.observer(observer)) - }) - }, - sink.unsafeOnError, - ), - ) + sink.unsafeOnNext(vars.toSeq) + }, + failure = sink.unsafeOnError, + ), + ) + } } - }) + Rx.observableSync(observable) + } } @inline implicit class OptionVarOperations[A](rxvar: Var[Option[A]]) { diff --git a/reactive/src/test/scala/colibri/ReactiveSpec.scala b/reactive/src/test/scala/colibri/ReactiveSpec.scala index 9428b545..6064b6fe 100644 --- a/reactive/src/test/scala/colibri/ReactiveSpec.scala +++ b/reactive/src/test/scala/colibri/ReactiveSpec.scala @@ -655,6 +655,44 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { sequence.now()(0).set(2) variable.now() shouldBe Seq(2) } + + { + // only trigger outer Rx if collection size changed + val variable = Var[Seq[Int]](Seq(1,2)) + val sequence: Rx[Seq[Var[Int]]] = variable.sequence + var sequenceTriggered = 0 + sequence.foreach(_ => sequenceTriggered += 1) + + sequenceTriggered shouldBe 1 + sequence.now().size shouldBe 2 + + sequence.now()(0).set(3) + sequence.now().size shouldBe 2 + sequenceTriggered shouldBe 1 + + variable.set(Seq(3,4)) + sequence.now().size shouldBe 2 + sequence.now()(0).now() shouldBe 3 + sequence.now()(1).now() shouldBe 4 + sequenceTriggered shouldBe 1 + + variable.set(Seq(3,4,5,6)) + sequence.now().size shouldBe 4 + sequence.now()(0).now() shouldBe 3 + sequence.now()(1).now() shouldBe 4 + sequence.now()(2).now() shouldBe 5 + sequenceTriggered shouldBe 2 + + variable.set(Seq(7)) + sequence.now().size shouldBe 1 + sequence.now()(0).now() shouldBe 7 + sequenceTriggered shouldBe 3 + + variable.set(Seq(8)) + sequence.now().size shouldBe 1 + sequence.now()(0).now() shouldBe 8 + sequenceTriggered shouldBe 3 + } }).unsafeRunSync() it should "sequence on Var[Option[T]]" in Owned(SyncIO { From e7dd4fb26d0f335ce1cf93d6d7775ee630d869cc Mon Sep 17 00:00:00 2001 From: Felix Dietze Date: Thu, 15 Dec 2022 19:21:57 +0100 Subject: [PATCH 2/3] trigger build From 0f271a81a58e43cb06b9136fec3a29bcb825aeef Mon Sep 17 00:00:00 2001 From: johannes karoff Date: Fri, 17 Nov 2023 18:43:12 +0100 Subject: [PATCH 3/3] less triggers in sequence --- .../scala/colibri/reactive/Reactive.scala | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/reactive/src/main/scala/colibri/reactive/Reactive.scala b/reactive/src/main/scala/colibri/reactive/Reactive.scala index 464d0aed..33f6bc9d 100644 --- a/reactive/src/main/scala/colibri/reactive/Reactive.scala +++ b/reactive/src/main/scala/colibri/reactive/Reactive.scala @@ -343,28 +343,39 @@ object Var { // keep a var for every index of the original sequence val vars = mutable.ArrayBuffer.empty[Var[A]] - def createIndexVar(idx:Int, seed: A):Var[A] = { - Var[A](seed).transformVarWrite{ _.contramap{ newValue => - rxvar.update(_.updated(idx, newValue)) - newValue - }} + def createIndexVar(idx: Int, seed: A): Var[A] = { + Var[A](seed).transformVarWrite { + _.contramap { newValue => + rxvar.update(_.updated(idx, newValue)) + newValue + } + } } - rxvar.observable.unsafeSubscribe( + rxvar.observable.zipWithIndex.unsafeSubscribe( Observer.create( - consume = { seq => - if(seq.size < vars.size) { + consume = { case (seq, idx) => + val needsRetrigger = if (seq.size < vars.size) { vars.remove(seq.size, vars.size - seq.size) - } else if(seq.size > vars.size) { - for((elem,idx) <- seq.zipWithIndex.takeRight(seq.size - vars.size)) { - vars += createIndexVar(idx, seed= elem) + true + } else if (seq.size > vars.size) { + for ((elem, idx) <- seq.zipWithIndex.takeRight(seq.size - vars.size)) { + vars += createIndexVar(idx, seed = elem) } + true + } else { + idx == 0 } + assert(seq.size == vars.size) + for ((newValue, elemVar) <- seq.zip(vars)) { elemVar.set(newValue) } - sink.unsafeOnNext(vars.toSeq) + + if (needsRetrigger) { + sink.unsafeOnNext(vars.toSeq) + } }, failure = sink.unsafeOnError, ),