Skip to content

Commit

Permalink
add Observable#bufferTimed
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Jun 22, 2024
1 parent a0d8382 commit 8ac6997
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
41 changes: 36 additions & 5 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,12 @@ object Observable {

def parMapFuture[B](f: A => Future[B]): Observable[B] = parMapEffect(a => IO.fromFuture(IO(f(a))))

def discard: Observable[Nothing] = Observable.empty.subscribing(source)
def void: Observable[Unit] = map(_ => ())
def as[B](value: B): Observable[B] = map(_ => value)
def asEval[B](value: => B): Observable[B] = map(_ => value)
def discard: Observable[Nothing] = Observable.empty.subscribing(source)
def void: Observable[Unit] = map(_ => ())
def as[B](value: B): Observable[B] = map(_ => value)
def asEval[B](value: => B): Observable[B] = map(_ => value)
def asEffect[F[_]: RunEffect, B](value: => F[B]): Observable[B] = mapEffect(_ => value)
def asFuture[B](value: => Future[B]): Observable[B] = mapFuture(_ => value)
def asFuture[B](value: => Future[B]): Observable[B] = mapFuture(_ => value)

def mapFilter[B](f: A => Option[B]): Observable[B] = new Observable[B] {
def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(sink.contramapFilter(f))
Expand Down Expand Up @@ -1316,6 +1316,37 @@ object Observable {
}
}

@inline def bufferTimed[Col[_]](duration: FiniteDuration)(implicit factory: Factory[A, Col[A]]): Observable[Col[A]] = bufferTimedMillis(
duration.toMillis.toInt,
)

def bufferTimedMillis[Col[_]](duration: Int)(implicit factory: Factory[A, Col[A]]): Observable[Col[A]] = new Observable[Col[A]] {
def unsafeSubscribe(sink: Observer[Col[A]]): Cancelable = {
var isCancel = false
var builder = factory.newBuilder

def send(): Unit = {
sink.unsafeOnNext(builder.result())
builder = factory.newBuilder
}

val intervalId = timers.setInterval(duration.toDouble) { if (!isCancel) send() }

Cancelable.composite(
Cancelable { () =>
isCancel = true
timers.clearInterval(intervalId)
},
source.unsafeSubscribe(
Observer.createUnrecovered(
value => builder += value,
sink.unsafeOnError,
),
),
)
}
}

def evalOn(ec: ExecutionContext): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = {
var isCancel = false
Expand Down
39 changes: 39 additions & 0 deletions colibri/src/test/scala/colibri/ObservableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1931,4 +1931,43 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {

test.unsafeToFuture()
}

it should "bufferTimed" in {
var received = List.empty[Int]
var errors = 0
val subject = Subject.behavior[Int](0)
val stream = subject.bufferTimedMillis[Vector](100)

import scala.concurrent.duration._

val cancelable = stream.unsafeSubscribe(
Observer.create[Vector[Int]](
received ++= _,
_ => errors += 1,
),
)
cancelable.isEmpty() shouldBe false
received shouldBe List()

val test = for {
_ <- subject.onNextIO(1)
_ = received shouldBe List()
_ <- IO.sleep(50.millis)
_ <- subject.onNextIO(2)
_ = received shouldBe List()
_ <- IO.sleep(50.millis)
_ <- subject.onNextIO(3)
_ = received shouldBe List(0, 1, 2)
_ <- subject.onNextIO(4)
_ = received shouldBe List(0, 1, 2)
_ <- IO.sleep(200.millis)
_ <- subject.onNextIO(5)
_ = received shouldBe List(0, 1, 2, 3, 4)
_ <- IO.sleep(100.millis)
_ = received shouldBe List(0, 1, 2, 3, 4, 5)
_ = errors shouldBe 0
} yield succeed

test.unsafeToFuture()
}
}

0 comments on commit 8ac6997

Please sign in to comment.