From d06be56c41ec814ea7eb579a9282ec0b7161adb8 Mon Sep 17 00:00:00 2001 From: mat Date: Tue, 5 Sep 2023 10:50:18 +0800 Subject: [PATCH 1/2] Fix timer --- reactivex/__init__.py | 3 ++ reactivex/observable/timer.py | 10 +++--- tests/test_observable/test_timer.py | 51 +++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/reactivex/__init__.py b/reactivex/__init__.py index a707f978c..704185cb9 100644 --- a/reactivex/__init__.py +++ b/reactivex/__init__.py @@ -1153,6 +1153,9 @@ def timer( [ timer(2) ] --0-| + [ timer(2, 4) ] + --0----1----2-- + Examples: >>> res = reactivex.timer(datetime(...)) >>> res = reactivex.timer(datetime(...), 0.1) diff --git a/reactivex/observable/timer.py b/reactivex/observable/timer.py index 3bde5f406..9865ba413 100644 --- a/reactivex/observable/timer.py +++ b/reactivex/observable/timer.py @@ -35,14 +35,14 @@ def subscribe( observer: abc.ObserverBase[int], scheduler_: Optional[abc.SchedulerBase] = None ) -> abc.DisposableBase: _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() - nonlocal duetime + due_time = duetime - if not isinstance(duetime, datetime): - duetime = _scheduler.now + _scheduler.to_timedelta(duetime) + if not isinstance(due_time, datetime): + due_time = _scheduler.now + _scheduler.to_timedelta(due_time) p = max(0.0, _scheduler.to_seconds(period)) mad = MultipleAssignmentDisposable() - dt = duetime + dt = due_time count = 0 def action(scheduler: abc.SchedulerBase, state: Any) -> None: @@ -107,7 +107,7 @@ def action(count: Optional[int] = None) -> Optional[int]: return None if not isinstance(_scheduler, PeriodicScheduler): - raise ValueError("Sceduler must be PeriodicScheduler") + raise ValueError("Scheduler must be PeriodicScheduler") return _scheduler.schedule_periodic(period, action, state=0) return Observable(subscribe) diff --git a/tests/test_observable/test_timer.py b/tests/test_observable/test_timer.py index 7e0a99718..2386d1157 100644 --- a/tests/test_observable/test_timer.py +++ b/tests/test_observable/test_timer.py @@ -1,6 +1,7 @@ import unittest import reactivex +from reactivex import operators from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -126,3 +127,53 @@ def create(): results = scheduler.start(create) assert results.messages == [on_next(500, 0), on_next(800, 1)] + + def test_periodic_timer_second_subscription(self): + scheduler = TestScheduler() + t = reactivex.timer(duetime=200, period=300, scheduler=scheduler) + + def create(): + return reactivex.merge( + t.pipe(operators.map(lambda x: (x, "first"))), + reactivex.concat(reactivex.timer(100, scheduler=scheduler), t).pipe( + operators.map(lambda x: (x, "second")) + ), + ) + + results = scheduler.start(create) + assert results.messages == [ + on_next(300, (0, "second")), + on_next(400, (0, "first")), + on_next(500, (0, "second")), + on_next(700, (1, "first")), + on_next(800, (1, "second")), + ] + + def test_on_off_timer_repeat(self): + scheduler = TestScheduler() + t = reactivex.timer(duetime=230, scheduler=scheduler) + + def create(): + return t.pipe(operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(430, 0), + on_next(660, 0), + on_next(890, 0), + ] + + def test_periodic_timer_repeat(self): + scheduler = TestScheduler() + t = reactivex.timer(duetime=130, period=200, scheduler=scheduler) + + def create(): + return t.pipe(operators.take(3), operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(330, 0), + on_next(530, 1), + on_next(730, 2), + on_next(860, 0), + ] From d4307061c57345af0dc7fc0ca766c79f05f6638a Mon Sep 17 00:00:00 2001 From: mat Date: Tue, 5 Sep 2023 11:04:00 +0800 Subject: [PATCH 2/2] Add tests for absolute first emit --- tests/test_observable/test_timer.py | 71 ++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 16 deletions(-) diff --git a/tests/test_observable/test_timer.py b/tests/test_observable/test_timer.py index 2386d1157..98af4d855 100644 --- a/tests/test_observable/test_timer.py +++ b/tests/test_observable/test_timer.py @@ -128,6 +128,60 @@ def create(): results = scheduler.start(create) assert results.messages == [on_next(500, 0), on_next(800, 1)] + def test_periodic_timer_repeat(self): + scheduler = TestScheduler() + t = reactivex.timer(duetime=130, period=200, scheduler=scheduler) + + def create(): + return t.pipe(operators.take(3), operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(330, 0), + on_next(530, 1), + on_next(730, 2), + on_next(860, 0), + ] + + def test_periodic_timer_repeat_with_absolute_datetime(self): + scheduler = TestScheduler() + t = reactivex.timer( + duetime=scheduler.to_datetime(360), period=200, scheduler=scheduler + ) # here we have an absolute first value, so on second subscription, the timer should emit immediately + + def create(): + return t.pipe(operators.take(3), operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(360, 0), + on_next(560, 1), + on_next(760, 2), + on_next( + 760, 0 + ), # our duetime is absolute and in the past so new sub emits immediately + on_next(960, 1), + ] + + def test_periodic_timer_repeat_with_relative_timespan(self): + scheduler = TestScheduler() + t = reactivex.timer( + duetime=scheduler.to_timedelta(130), + period=scheduler.to_timedelta(250), + scheduler=scheduler, + ) + + def create(): + return t.pipe(operators.take(3), operators.repeat()) + + results = scheduler.start(create) + assert results.messages == [ + on_next(330, 0), + on_next(580, 1), + on_next(830, 2), + on_next(960, 0), + ] + def test_periodic_timer_second_subscription(self): scheduler = TestScheduler() t = reactivex.timer(duetime=200, period=300, scheduler=scheduler) @@ -149,7 +203,7 @@ def create(): on_next(800, (1, "second")), ] - def test_on_off_timer_repeat(self): + def test_one_off_timer_repeat(self): scheduler = TestScheduler() t = reactivex.timer(duetime=230, scheduler=scheduler) @@ -162,18 +216,3 @@ def create(): on_next(660, 0), on_next(890, 0), ] - - def test_periodic_timer_repeat(self): - scheduler = TestScheduler() - t = reactivex.timer(duetime=130, period=200, scheduler=scheduler) - - def create(): - return t.pipe(operators.take(3), operators.repeat()) - - results = scheduler.start(create) - assert results.messages == [ - on_next(330, 0), - on_next(530, 1), - on_next(730, 2), - on_next(860, 0), - ]