diff --git a/tests/reactor/conftest.py b/tests/reactor/conftest.py index 5f752f2a..0d51caa0 100644 --- a/tests/reactor/conftest.py +++ b/tests/reactor/conftest.py @@ -1,4 +1,5 @@ import asyncio +import time import pytest from asynctest import CoroutineMock @@ -6,6 +7,7 @@ from kopf.clients.watching import streaming_watch from kopf.reactor.queueing import watcher from kopf.reactor.queueing import worker as original_worker +from kopf.structs.configuration import OperatorSettings @pytest.fixture(autouse=True) @@ -19,15 +21,19 @@ def processor(): return CoroutineMock() +# Code overhead is not used, but is needed to order the fixtures: first, +# the measurement, which requires the real worker; then, the worker mocking. @pytest.fixture() -def worker_spy(mocker): +def worker_spy(mocker, watcher_code_overhead): """ Spy on the watcher: actually call it, but provide the mock-fields. """ spy = CoroutineMock(spec=original_worker, wraps=original_worker) return mocker.patch('kopf.reactor.queueing.worker', spy) +# Code overhead is not used, but is needed to order the fixtures: first, +# the measurement, which requires the real worker; then, the worker mocking. @pytest.fixture() -def worker_mock(mocker): +def worker_mock(mocker, watcher_code_overhead): """ Prevent the queue consumption, so that the queues could be checked. """ return mocker.patch('kopf.reactor.queueing.worker') @@ -67,3 +73,73 @@ async def do_nothing(*args, **kwargs): event_loop.run_until_complete(task) except asyncio.CancelledError: pass + + +@pytest.fixture() +async def watcher_code_overhead(resource, stream, aresponses, watcher_limited, timer) -> float: + """ + Estimate the overhead of synchronous code in the watching routines. + + The code overhead is caused by Kopf's and tests' own low-level activities: + the code of ``watcher()``/``worker()`` itself, including a job scheduler, + the local ``aresponses`` server, the API communication with that server + in ``aiohttp``, serialization/deserialization in ``kopf.clients``, etc. + + The actual aspect being tested are the ``watcher()``/``worker()`` routines: + their input/output and their timing regarding the blocking queue operations + or explicit sleeps, not the timing of underlying low-level activities. + So, the expected values for the durations of the call are adjusted for + the estimated code overhead before asserting them. + + .. note:: + + The tests are designed with small timeouts to run fast, so that + the whole test-suite with thousands of tests is not delayed much. + Once there is a way to simulate asyncio time like with ``freezegun``, + or ``freezegun`` supports asyncio time, the problem can be solved by + using the lengthy timeouts and ignoring the code overhead._ + + The estimation of the overhead is measured by running a single-event cycle, + which means one worker only, but with batching of events disabled. This + ensures that only the fastest way is executed: no explicit or implicit + sleeps are used (e.g. as in getting from an empty queue with timeouts). + + Extra 10-30% are added to the measured overhead to ensure that the future + code executions would fit into the estimation despite the variations. + + Empirically, the overhead usually remains within the range of 50-150 ms. + It does not depend on the number of events or unique uids in the stream. + It does depend on the hardware used, or containers in the CI systems. + """ + + # We feed the stream and consume the stream before we go into the tests, + # which can feed the stream with their own events. + stream.feed([ + {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}}, + ]) + stream.close() + + # We use our own fixtures -- to not collide with the tests' fixtures. + processor = CoroutineMock() + settings = OperatorSettings() + settings.batching.batch_window = 0 + settings.batching.idle_timeout = 1 + settings.batching.exit_timeout = 1 + + with timer: + await watcher( + namespace=None, + resource=resource, + settings=settings, + processor=processor, + ) + + # Ensure that everything worked as expected, i.e. the worker is not mocked, + # and the whole code is actually executed down to the processor callback. + assert processor.awaited, "The processor is not called for code overhead measurement." + aresponses._responses[:] = [] + + # Uncomment for debugging of the actual timing: visible only with -s pytest option. + # print(f"The estimated code overhead is {timer.seconds:.3f} seconds (unadjusted).") + + return timer.seconds * 1.33 diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index 299c184f..e23cd09a 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -18,10 +18,6 @@ from kopf.reactor.queueing import watcher, EOS -# An overhead for the sync logic in async tests. Guesstimated empirically: -# 10ms is too fast, 200ms is too slow, 50-150ms is good enough (can vary). -CODE_OVERHEAD = 0.130 - @pytest.mark.parametrize('uids, cnts, events', [ @@ -46,7 +42,8 @@ ]) @pytest.mark.usefixtures('watcher_limited') async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor, - settings, stream, events, uids, cnts): + settings, stream, events, uids, cnts, + watcher_code_overhead): """ Verify that every unique uid goes into its own queue+worker, which are never shared. """ # Inject the events of unique objects - to produce few streams/workers. @@ -63,7 +60,7 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor ) # The streams are not cleared by the mocked worker, but the worker exits fast. - assert timer.seconds < CODE_OVERHEAD + assert timer.seconds < watcher_code_overhead # The processor must not be called by the watcher, only by the worker. # But the worker (even if mocked) must be called & awaited by the watcher. @@ -116,7 +113,8 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor ]) @pytest.mark.usefixtures('watcher_limited') async def test_watchevent_batching(settings, resource, processor, timer, - stream, events, uids, vals): + stream, events, uids, vals, + watcher_code_overhead): """ Verify that only the last event per uid is actually handled. """ # Override the default timeouts to make the tests faster. @@ -138,8 +136,8 @@ async def test_watchevent_batching(settings, resource, processor, timer, ) # Significantly less than the queue getting timeout, but sufficient to run. - # 2 <= 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough. - assert timer.seconds < settings.batching.batch_window + CODE_OVERHEAD + # 2x: 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough. + assert timer.seconds < settings.batching.batch_window + watcher_code_overhead # Was the processor called at all? Awaited as needed for async fns? assert processor.awaited @@ -171,7 +169,8 @@ async def test_watchevent_batching(settings, resource, processor, timer, ]) @pytest.mark.usefixtures('watcher_in_background') -async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy): +async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy, + watcher_code_overhead): # Override the default timeouts to make the tests faster. settings.batching.idle_timeout = 0.5 @@ -201,7 +200,7 @@ async def test_garbage_collection_of_streams(settings, stream, events, unique, w # Once the idle timeout, they will exit and gc their individual streams. await asyncio.sleep(settings.batching.batch_window) # depleting the queues. await asyncio.sleep(settings.batching.idle_timeout) # idling on empty queues. - await asyncio.sleep(CODE_OVERHEAD) + await asyncio.sleep(watcher_code_overhead) # The mutable(!) streams dict is now empty, i.e. garbage-collected. assert len(streams) == 0