Measure code overhead depending on the environment the tests run in
nolar committed Aug 29, 2020
1 parent 342ee78 commit bbffbcf
Showing 2 changed files with 88 additions and 13 deletions.
80 changes: 78 additions & 2 deletions tests/reactor/conftest.py
@@ -1,11 +1,13 @@
import asyncio
+import time

import pytest
from asynctest import CoroutineMock

from kopf.clients.watching import streaming_watch
from kopf.reactor.queueing import watcher
from kopf.reactor.queueing import worker as original_worker
+from kopf.structs.configuration import OperatorSettings


@pytest.fixture(autouse=True)
@@ -19,15 +21,19 @@ def processor():
    return CoroutineMock()


+# Code overhead is not used directly, but it orders the fixtures: first,
+# the measurement (which requires the real worker); then, the worker mocking.
@pytest.fixture()
-def worker_spy(mocker):
+def worker_spy(mocker, watcher_code_overhead):
    """ Spy on the worker: actually call it, but provide the mock-fields. """
    spy = CoroutineMock(spec=original_worker, wraps=original_worker)
    return mocker.patch('kopf.reactor.queueing.worker', spy)


+# Code overhead is not used directly, but it orders the fixtures: first,
+# the measurement (which requires the real worker); then, the worker mocking.
@pytest.fixture()
-def worker_mock(mocker):
+def worker_mock(mocker, watcher_code_overhead):
    """ Prevent the queue consumption, so that the queues could be checked. """
    return mocker.patch('kopf.reactor.queueing.worker')

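An aside on the ordering trick above: pytest sets up a fixture's dependencies
before the fixture itself, which is what guarantees that the overhead is
measured with the real worker before the worker gets mocked. A minimal
standalone sketch of the same mechanism (illustrative names, not from Kopf's
test-suite; requires pytest and pytest-mock):

import pytest

@pytest.fixture()
def measured_overhead():
    # Set up first: measures with the real, unpatched implementation.
    return 0.1  # Seconds; a stand-in for an actual measurement.

@pytest.fixture()
def patched_worker(mocker, measured_overhead):
    # Set up second: by this point, the measurement is already done,
    # so patching can no longer corrupt it.
    return mocker.patch('kopf.reactor.queueing.worker')
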
@@ -67,3 +73,73 @@ async def do_nothing(*args, **kwargs):
        event_loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass


+@pytest.fixture()
+async def watcher_code_overhead(resource, stream, aresponses, watcher_limited, timer) -> float:
+    """
+    Estimate the overhead of synchronous code in the watching routines.
+
+    The code overhead is caused by Kopf's and the tests' own low-level
+    activities: the code of ``watcher()``/``worker()`` itself, including
+    a job scheduler, the local ``aresponses`` server, the API communication
+    with that server in ``aiohttp``, serialization/deserialization in
+    ``kopf.clients``, etc.
+
+    The actual aspects under test are the ``watcher()``/``worker()`` routines:
+    their input/output and their timing regarding the blocking queue operations
+    or explicit sleeps, not the timing of the underlying low-level activities.
+    So, the expected durations of the calls are adjusted for the estimated
+    code overhead before asserting them.
+
+    .. note::
+
+        The tests are designed with small timeouts to run fast, so that
+        the whole test-suite with thousands of tests is not delayed much.
+        Once there is a way to simulate asyncio time like with ``freezegun``,
+        or once ``freezegun`` supports asyncio time, the problem can be solved
+        by using lengthy timeouts and ignoring the code overhead.
+
+    The overhead is estimated by running a single-event cycle, which means
+    one worker only, but with batching of events disabled. This ensures that
+    only the fastest path is executed: no explicit or implicit sleeps occur
+    (e.g. as in getting from an empty queue with a timeout).
+
+    An extra ~33% is added to the measured time, so that future code
+    executions fit into the estimation despite natural variations.
+
+    Empirically, the overhead usually remains within the range of 50-150 ms.
+    It does not depend on the number of events or unique uids in the stream.
+    It does depend on the hardware used, or containers in the CI systems.
+    """
+
+    # Feed and consume the stream before the tests do, so that the tests
+    # can then feed the stream with their own events.
+    stream.feed([
+        {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid'}}},
+    ])
+    stream.close()
+
+    # Use our own mocks & settings -- to not collide with the tests' fixtures.
+    processor = CoroutineMock()
+    settings = OperatorSettings()
+    settings.batching.batch_window = 0
+    settings.batching.idle_timeout = 1
+    settings.batching.exit_timeout = 1
+
+    with timer:
+        await watcher(
+            namespace=None,
+            resource=resource,
+            settings=settings,
+            processor=processor,
+        )
+
+    # Ensure that everything worked as expected: the worker is not mocked,
+    # and the code is actually executed down to the processor callback.
+    assert processor.awaited, "The processor was not called for the code-overhead measurement."
+    aresponses._responses[:] = []
+
+    # Uncomment to debug the actual timing (visible only with the `-s` pytest option):
+    # print(f"The estimated code overhead is {timer.seconds:.3f} seconds (unadjusted).")
+
+    return timer.seconds * 1.33
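
A side note: the fixture above relies on a ``timer`` fixture that is not part
of this diff. A minimal sketch of what such a context-manager timer could look
like (an assumption for illustration; the real fixture is defined elsewhere in
the test-suite):

import time

import pytest

class Timer:
    """ A context manager that records the wall-clock duration of its block. """
    seconds: float = 0.0

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc_info):
        self.seconds = time.perf_counter() - self._start
        return False  # Never suppress exceptions.

@pytest.fixture()
def timer():
    return Timer()
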
21 changes: 10 additions & 11 deletions tests/reactor/test_queueing.py
@@ -18,10 +18,6 @@

from kopf.reactor.queueing import watcher, EOS

-# An overhead for the sync logic in async tests. Guesstimated empirically:
-# 10ms is too fast, 200ms is too slow, 50-150ms is good enough (can vary).
-CODE_OVERHEAD = 0.130
-

@pytest.mark.parametrize('uids, cnts, events', [
@@ -46,7 +42,8 @@
])
@pytest.mark.usefixtures('watcher_limited')
async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
-                                         settings, stream, events, uids, cnts):
+                                         settings, stream, events, uids, cnts,
+                                         watcher_code_overhead):
    """ Verify that every unique uid goes into its own queue+worker, which are never shared. """

    # Inject the events of unique objects - to produce few streams/workers.
@@ -63,7 +60,7 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
    )

    # The streams are not cleared by the mocked worker, but the worker exits fast.
-    assert timer.seconds < CODE_OVERHEAD
+    assert timer.seconds < watcher_code_overhead

    # The processor must not be called by the watcher, only by the worker.
    # But the worker (even if mocked) must be called & awaited by the watcher.
@@ -116,7 +113,8 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor,
])
@pytest.mark.usefixtures('watcher_limited')
async def test_watchevent_batching(settings, resource, processor, timer,
-                                   stream, events, uids, vals):
+                                   stream, events, uids, vals,
+                                   watcher_code_overhead):
    """ Verify that only the last event per uid is actually handled. """

    # Override the default timeouts to make the tests faster.
@@ -138,8 +136,8 @@ async def test_watchevent_batching(settings, resource, processor, timer,
    )

    # Significantly less than the queue getting timeout, but sufficient to run.
-    # 2 <= 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough.
-    assert timer.seconds < settings.batching.batch_window + CODE_OVERHEAD
+    # 2x: 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough.
+    assert timer.seconds < settings.batching.batch_window + watcher_code_overhead

    # Was the processor called at all? Awaited as needed for async fns?
    assert processor.awaited
@@ -171,7 +169,8 @@ async def test_watchevent_batching(settings, resource, processor, timer,
])
@pytest.mark.usefixtures('watcher_in_background')
-async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy):
+async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy,
+                                             watcher_code_overhead):

    # Override the default timeouts to make the tests faster.
    settings.batching.idle_timeout = 0.5
@@ -201,7 +200,7 @@ async def test_garbage_collection_of_streams(settings, stream, events, unique, worker_spy,
    # Once the idle timeout elapses, they will exit and gc their individual streams.
    await asyncio.sleep(settings.batching.batch_window)  # depleting the queues.
    await asyncio.sleep(settings.batching.idle_timeout)  # idling on empty queues.
-    await asyncio.sleep(CODE_OVERHEAD)
+    await asyncio.sleep(watcher_code_overhead)

    # The mutable(!) streams dict is now empty, i.e. garbage-collected.
    assert len(streams) == 0
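
Stripped of the Kopf specifics, the calibration pattern used throughout these
tests boils down to: time the fastest possible run once, pad it for variation,
and use the padded value as the tolerance in later timing assertions. A generic
sketch of the idea (our naming, not Kopf's):

import time

def measure_overhead(fastest_run, multiplier: float = 1.33) -> float:
    """ Time one run of the fastest code path, padded for future variation. """
    start = time.perf_counter()
    fastest_run()
    return (time.perf_counter() - start) * multiplier

# Usage: a later run with no logical delays (sleeps, timeouts) is expected
# to fit within the calibrated overhead alone; runs with logical delays are
# asserted against those delays plus the overhead.
overhead = measure_overhead(lambda: sum(range(100_000)))  # A toy workload.
start = time.perf_counter()
sum(range(100_000))  # The same workload, now under measurement.
assert time.perf_counter() - start < overhead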