From 02d85d4ad7d628e58de47bee600a4b3ca5588cfd Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Thu, 5 Dec 2024 12:42:00 -0600 Subject: [PATCH 1/7] DM-47764: App metrics unit testing helpers Add a `MockEventManager` that creates `MockPublishers` which are no-op publishers that record all published payloads. This is returned from the config factory function when `enabled` is `False` and a new config key, `mock`, is `True`. Payloads are recorded to a list with mixed-in assertion helpers. --- ...anfuchs_metrics_events_unittest_support.md | 3 + docs/documenteer.toml | 1 + docs/user-guide/metrics/index.rst | 176 +++++++++++++ safir/src/safir/metrics/__init__.py | 24 ++ safir/src/safir/metrics/_config.py | 58 ++++- safir/src/safir/metrics/_event_manager.py | 87 +++++++ safir/src/safir/metrics/_testing.py | 235 ++++++++++++++++++ safir/tests/metrics/config_test.py | 13 + safir/tests/metrics/mock_publisher_test.py | 228 +++++++++++++++++ 9 files changed, 821 insertions(+), 4 deletions(-) create mode 100644 changelog.d/20241205_130111_danfuchs_metrics_events_unittest_support.md create mode 100644 safir/src/safir/metrics/_testing.py create mode 100644 safir/tests/metrics/mock_publisher_test.py diff --git a/changelog.d/20241205_130111_danfuchs_metrics_events_unittest_support.md b/changelog.d/20241205_130111_danfuchs_metrics_events_unittest_support.md new file mode 100644 index 00000000..fb566d1d --- /dev/null +++ b/changelog.d/20241205_130111_danfuchs_metrics_events_unittest_support.md @@ -0,0 +1,3 @@ +### New features + +- Unit testing helpers for application metrics diff --git a/docs/documenteer.toml b/docs/documenteer.toml index 001e6508..e2592b58 100644 --- a/docs/documenteer.toml +++ b/docs/documenteer.toml @@ -53,6 +53,7 @@ nitpick_ignore = [ ["py:obj", "safir.database._pagination.E"], ["py:obj", "safir.redis._storage.S"], ["py:obj", "safir.metrics._event_manager.P"], + ["py:obj", "safir.metrics._testing.P"], # SQLAlchemy DeclarativeBase documentation has references that Sphinx # can't resolve properly. ["py:class", "sqlalchemy.inspection.Inspectable"], diff --git a/docs/user-guide/metrics/index.rst b/docs/user-guide/metrics/index.rst index bacbd320..975347b7 100644 --- a/docs/user-guide/metrics/index.rst +++ b/docs/user-guide/metrics/index.rst @@ -58,6 +58,7 @@ The Kafka and schema manager values come from the Sasquatch configuration that y .. code-block:: shell METRICS_APPLICATION=myapp + METRICS_EVENTS_TOPIC_PREFIX=what.ever KAFKA_SECURITY_PROTOCOL=SSL KAFKA_BOOSTRAP_SERVERS=sasquatch.kafka-1:9092,sasquatcy.kafka2-9092 KAFKA_CLUSTER_CA_PATH=/some/path/ca.crt @@ -232,6 +233,181 @@ But the principle remains the same: .. _RequestContext: https://sqr-072.lsst.io/#request-context .. _Service: https://sqr-072.lsst.io/#services + +Unit testing +============ + +Setting ``enabled`` to ``False`` and ``mock`` to ``True`` in your metrics configuration will give you a `safir.metrics.MockEventManager`. +This is a no-op event manager that produces publishers that record all of the events that they publish. +You can make assertions about these published events in your unit tests. + +.. warning:: + + Do not use the `safir.metrics.MockEventManager` in any deployed instance of your application. + Recorded events are never cleaned up, and memory usage will grow unbounded. + +.. code-block:: shell + + METRICS_APPLICATION=myapp + METRICS_ENABLED=false + METRICS_MOCK=true + METRICS_EVENTS_TOPIC_PREFIX=what.ever + +.. code-block:: python + + from safir.metrics import metrics_configuration_factory + + + config = metrics_configuration_factory() + manager = config.make_manager() + + + class SomeEvent(EventPayload): + model_config = ConfigDict(ser_json_timedelta="float") + + foo: str + count: int + duration: float | None + + + await manager.initialize() + pub = await manager.create_publisher("someevent", SomeEvent) + + await pub.publish(SomeEvent(foo="foo1", count=1, duration=1.234)) + await pub.publish(SomeEvent(foo="foo2", count=2, duration=2.345)) + await pub.publish(SomeEvent(foo="foo3", count=3, duration=3.456)) + await pub.publish(SomeEvent(foo="foo4", count=4, duration=None)) + await pub.publish(SomeEvent(foo="foo5", count=5, duration=5.678)) + + await manager.aclose() + + published = pub.published + +A mock publisher has an `safir.metrics.MockEventPublisher.published` attribute which is a `safir.metrics.PublishedList` containing of all of the `safir.metrics.EventPayload`'s published by that publisher. +A `safir.metrics.PublishedList` is a regular Python list with some mixed-in assertion methods. +All of these assertion methods take a list of dicts and compare them to the ``model_dump(mode="json")`` serialization of the published ``EventPayloads``. + +``assert_published`` +-------------------- + +Use `safir.metrics.PublishedList.assert_published` to assert that some set of payloads is an ordered subset of all of the payloads that were published, with no events in between. +If not, an exception (a subclass of `AssertionError`) will be raised. +Other events could have been published before or after the expected payloads. + +.. code-block:: python + + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + ] + ) + +You can also assert that the all of the expected payloads were published in any order, and possibly with events in between: + +.. code-block:: python + + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + ], + any_order=True, + ) + +``assert_published_all`` +------------------------ + +Use `safir.metrics.PublishedList.assert_published_all` to assert that the expected payloads, and only the expected payloads, were published: + +.. code-block:: python + + pub.assert_published_all( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo4", "count": 4, "duration": None}, + {"foo": "foo5", "count": 5, "duration": 5.678}, + ], + ) + +This would raise an exception because it is missing the ``foo5`` event: + +.. code-block:: python + + pub.assert_published_all( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo4", "count": 4, "duration": None}, + ], + ) + +You can use ``any_order`` here too: + +.. code-block:: python + + pub.assert_published_all( + [ + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo5", "count": 5, "duration": 5.678}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo4", "count": 4, "duration": None}, + ], + any_order=True, + ) + +``ANY`` and ``NOT_NONE`` +------------------------ + +You can use `safir.metrics.ANY` to indicate that any value, event `None` is OK. +This is just a re-export of `unittest.mock.ANY`. + +.. code-block:: python + + from safir.metrics import ANY + + + pub.assert_published_all( + [ + {"foo": "foo3", "count": 3, "duration": ANY}, + {"foo": "foo4", "count": 4, "duration": ANY}, + ], + ) + +You can use `safir.metrics.NOT_NONE` to indicate that any value except `None` is OK: + +.. code-block:: python + + from safir.metrics import ANY + + + pub.assert_published_all( + [ + {"foo": "foo3", "count": 3, "duration": NOT_NONE}, + {"foo": "foo4", "count": 4, "duration": ANY}, + ], + ) + +This would raise an exception, because ``duration`` for the ``foo4`` payload is `None`: + +.. code-block:: python + + from safir.metrics import ANY + + + pub.assert_published_all( + [ + {"foo": "foo3", "count": 3, "duration": NOT_NONE}, + {"foo": "foo4", "count": 4, "duration": NOT_NONE}, + ], + ) + .. _configuration-details: Configuration details diff --git a/safir/src/safir/metrics/__init__.py b/safir/src/safir/metrics/__init__.py index fe690b15..6e73555e 100644 --- a/safir/src/safir/metrics/__init__.py +++ b/safir/src/safir/metrics/__init__.py @@ -4,6 +4,7 @@ EventsConfiguration, KafkaMetricsConfiguration, MetricsConfiguration, + MockMetricsConfiguration, metrics_configuration_factory, ) from ._event_manager import ( @@ -11,6 +12,8 @@ EventPublisher, KafkaEventManager, KafkaEventPublisher, + MockEventManager, + MockEventPublisher, NoopEventManager, NoopEventPublisher, ) @@ -20,8 +23,21 @@ KafkaTopicError, ) from ._models import EventMetadata, EventPayload +from ._testing import ( + ANY, + NOT_NONE, + BaseAssertionError, + NotPublishedConsecutivelyError, + NotPublishedError, + PublishedCountError, + PublishedList, + PublishedTooFewError, +) __all__ = [ + "ANY", + "NOT_NONE", + "BaseAssertionError", "BaseMetricsConfiguration", "DisabledMetricsConfiguration", "DuplicateEventError", @@ -37,7 +53,15 @@ "KafkaTopicError", "KafkaTopicError", "MetricsConfiguration", + "MockEventManager", + "MockEventPublisher", + "MockMetricsConfiguration", "NoopEventManager", "NoopEventPublisher", + "NotPublishedConsecutivelyError", + "NotPublishedError", + "PublishedCountError", + "PublishedList", + "PublishedTooFewError", "metrics_configuration_factory", ] diff --git a/safir/src/safir/metrics/_config.py b/safir/src/safir/metrics/_config.py index 2d7b8dbb..096bdf3e 100644 --- a/safir/src/safir/metrics/_config.py +++ b/safir/src/safir/metrics/_config.py @@ -15,7 +15,12 @@ from ..kafka import KafkaConnectionSettings, SchemaManagerSettings from ._constants import ADMIN_CLIENT_PREFIX, BROKER_PREFIX -from ._event_manager import EventManager, KafkaEventManager, NoopEventManager +from ._event_manager import ( + EventManager, + KafkaEventManager, + MockEventManager, + NoopEventManager, +) __all__ = [ "BaseMetricsConfiguration", @@ -23,6 +28,7 @@ "EventsConfiguration", "KafkaMetricsConfiguration", "MetricsConfiguration", + "MockMetricsConfiguration", "metrics_configuration_factory", ] @@ -131,6 +137,45 @@ def make_manager( ) +class MockMetricsConfiguration(BaseMetricsConfiguration): + """Metrics configuration when metrics publishing is mocked.""" + + enabled: Annotated[ + bool, AfterValidator(lambda x: _require_bool(x, False)) + ] = Field( + ..., + title="Whether to send events", + description=( + "If set to false, no events will be sent and all calls to publish" + " events will be no-ops." + ), + validation_alias=AliasChoices("enabled", "METRICS_ENABLED"), + ) + + mock: Annotated[bool, AfterValidator(lambda x: _require_bool(x, True))] = ( + Field( + title="Mock publishers", + description=( + "If set to true, all event publishers will be" + " unittest.mock.MagicMock instances which will record all" + " calls to their publish methods." + ), + validation_alias=AliasChoices("mock", "METRICS_MOCK"), + ) + ) + + model_config = SettingsConfigDict(extra="ignore", populate_by_name=True) + + def make_manager( + self, logger: BoundLogger | None = None + ) -> MockEventManager: + if not logger: + logger = structlog.get_logger("safir.metrics") + return MockEventManager( + self.application, self.events.topic_prefix, logger + ) + + class KafkaMetricsConfiguration(BaseMetricsConfiguration): """Metrics configuration when enabled, including Kafka configuration.""" @@ -204,7 +249,9 @@ def make_manager( MetricsConfiguration: TypeAlias = ( - DisabledMetricsConfiguration | KafkaMetricsConfiguration + MockMetricsConfiguration + | DisabledMetricsConfiguration + | KafkaMetricsConfiguration ) """Type to use for metrics configuration in the application config. @@ -262,6 +309,9 @@ class Config(BaseSettings): # environment variable settings to enable, and then finally # unconditionally try to return the default. try: - return DisabledMetricsConfiguration() + return MockMetricsConfiguration() except ValidationError: - return KafkaMetricsConfiguration() + try: + return DisabledMetricsConfiguration() + except ValidationError: + return KafkaMetricsConfiguration() diff --git a/safir/src/safir/metrics/_event_manager.py b/safir/src/safir/metrics/_event_manager.py index 95242d86..f54b50b1 100644 --- a/safir/src/safir/metrics/_event_manager.py +++ b/safir/src/safir/metrics/_event_manager.py @@ -23,6 +23,7 @@ KafkaTopicError, ) from ._models import EventMetadata, EventPayload +from ._testing import PublishedList P = TypeVar("P", bound=EventPayload) """Generic event payload type.""" @@ -32,6 +33,8 @@ "EventPublisher", "KafkaEventManager", "KafkaEventPublisher", + "MockEventManager", + "MockEventPublisher", "NoopEventManager", "NoopEventPublisher", ] @@ -100,6 +103,11 @@ async def publish(self, payload: P) -> EventMetadata: accessed. """ + @property + def published(self) -> PublishedList[P]: + """A list of published event payloads with some test helpers.""" + raise NotImplementedError("Only available on a MockEventPublisher.") + @staticmethod def _ns_to_datetime(ns: int) -> datetime: """Convert an `int` number of nanoseconds to a `~datetime.datetime`.""" @@ -176,6 +184,34 @@ async def publish(self, payload: P) -> EventMetadata: return event +class MockEventPublisher(NoopEventPublisher, Generic[P]): + """Event publisher that quietly does nothing and records all payloads. + + This is meant to be used in unit tests to enable assertions on published + payloads. It should NOT be used in any deployed application instances + because memory usage will grow unbounded with each published event. + """ + + def __init__( + self, + application: str, + event_class: type[AvroBaseModel], + logger: BoundLogger, + ) -> None: + super().__init__(application, event_class, logger) + self._published: PublishedList[P] = PublishedList() + + async def publish(self, payload: P) -> EventMetadata: + event = await super().publish(payload) + self._published.append(payload) + return event + + @property + def published(self) -> PublishedList[P]: + """A list of published event payloads with some test helpers.""" + return self._published + + class EventManager(metaclass=ABCMeta): """Interface for a client for publishing application metrics events. @@ -556,3 +592,54 @@ async def build_publisher_for_model( An appropriate event publisher implementation instance. """ return NoopEventPublisher[P](self._application, model, self.logger) + + +class MockEventManager(EventManager): + """An event manager that creates mock publishers that record all publishes. + + This is used as the implementation of `~safir.metrics.EventManager` when + event publication is disabled and mocking is enabled. Like a + `~safir.metrics.NoopEventManager`, the event type registrations are still + verified to catch errors, but any calls to + `~safir.metrics.MockEventPublisher` are recorded for later assertion. + + This is for use only in unit testing. Don't use it in any deployed + environment because memory usage will grow unbounded. + + + Parameters + ---------- + application + Name of the application that is generating events. + topic_prefix + Kafka topic prefix for the metrics events topic for this application. + logger + Logger to use for internal logging. + """ + + def __init__( + self, + application: str, + topic_prefix: str, + logger: BoundLogger | None = None, + ) -> None: + super().__init__(f"{topic_prefix}.{application}", logger) + self._application = application + + async def build_publisher_for_model( + self, model: type[P] + ) -> EventPublisher[P]: + """Build a no-op recording publisher for a specific enriched model. + + Parameters + ---------- + model + Enriched and configured model representing the event that will be + published. + + Returns + ------- + EventPublisher + An appropriate event publisher implementation instance. + """ + return MockEventPublisher[P](self._application, model, self.logger) diff --git a/safir/src/safir/metrics/_testing.py b/safir/src/safir/metrics/_testing.py new file mode 100644 index 00000000..3d172ae8 --- /dev/null +++ b/safir/src/safir/metrics/_testing.py @@ -0,0 +1,235 @@ +"""Helpers working with metrics events in unit tests.""" + +from abc import ABC, abstractmethod +from pprint import pformat +from typing import Any, Generic, TypeVar +from unittest.mock import ANY as MOCK_ANY + +from pydantic import BaseModel + +__all__ = [ + "ANY", + "NOT_NONE", + "BaseAssertionError", + "NotPublishedConsecutivelyError", + "NotPublishedError", + "PublishedCountError", + "PublishedList", + "PublishedTooFewError", +] + +P = TypeVar("P", bound=BaseModel) +"""Generic event payload type.""" + + +ANY = MOCK_ANY +"""An object that compares equal to anything, reexported from unittest.mock.""" + + +class _NotNone: + """A helper object that compares equal to everything except None.""" + + def __eq__(self, other: object) -> bool: + return other is not None + + def __repr__(self) -> str: + return "" + + +NOT_NONE = _NotNone() +"""An object to indicate that a value can be anything except None.""" + + +ModelDumpList = list[dict[str, Any]] +"""Type alias for a list of dumped Pydantic models.""" + + +class BaseAssertionError(Generic[P], ABC, AssertionError): + """Base assertion error with common attributes and messaging.""" + + def __init__( + self, + expected: ModelDumpList, + actual: ModelDumpList, + actual_models: list[P], + ) -> None: + self.expected = expected + self.actual = actual + self.actual_models = actual_models + message = ( + f"{self.errormsg()}\n" + f"Expected:\n" + f"{pformat(self.expected)}\n" + f"Actual:\n" + f"{pformat(self.actual)}\n" + f"Actual models: {pformat(self.actual_models)}" + ) + super().__init__(message) + + @abstractmethod + def errormsg(self) -> str: + """Return a string to be added to the exception message.""" + + +class PublishedTooFewError(BaseAssertionError): + """Expected more events than have actually been published.""" + + def errormsg(self) -> str: + return "Expected more events than have actually been published" + + +class PublishedCountError(BaseAssertionError): + """Expected has a different number of items than were published.""" + + def errormsg(self) -> str: + return "Expected has a different number of items than were published" + + +class NotPublishedConsecutivelyError(BaseAssertionError): + """Expected events were not published consecutively.""" + + def errormsg(self) -> str: + return "Expected events were not published consecutively" + + +class NotPublishedError(BaseAssertionError): + """Some expected items were not published.""" + + def __init__( + self, + expected: ModelDumpList, + actual: ModelDumpList, + actual_models: list[P], + not_found: ModelDumpList, + ) -> None: + self.not_found = not_found + super().__init__(expected, actual, actual_models) + + def errormsg(self) -> str: + return ( + f"Some expected items not published\n" + f"Not published:" + f"{pformat(self.not_found)}" + ) + + +class PublishedList(Generic[P], list[P]): + """A list of event payload models with assertion helpers. + + All assertion helpers take lists of dicts as expected items and use the + ``model_dump(mode="json")`` serialization of the models for comparison. + + """ + + def assert_published( + self, + expected: ModelDumpList, + *, + any_order: bool = False, + ) -> None: + """Assert that all of the expected payloads were published. + + + Parameters + ---------- + expected + A list of expected event payload dicts, to be compared with the + ``model_dump(format="json")`` serialization of the actual published + payloads. + any_order + If true, then the expected payload list must be an ordered subset + of the actual published payloads. This is like the ``any_order`` + parameter in `unittest.mock.Mock` methods. + + Raises + ------ + PublishedTooFewError + Expected more events than have actually been published + NotPublishedError + Some expected items were not published + NotPublishedConsecutivelyError + Expected events were not published consecutively + """ + if len(expected) > len(self): + actual = [model.model_dump(mode="json") for model in self] + raise PublishedTooFewError(expected, actual, self) + + if any_order: + self._unordered(expected) + else: + # Try unordered first to differentiate between the cases where some + # expected events were not published at all, vs all of the expected + # events were published, just not in the expected order. + self._unordered(expected) + self._ordered(expected) + + def assert_published_all( + self, + expected: ModelDumpList, + *, + any_order: bool = False, + ) -> None: + """Assert that all of the expected payloads were published, and only + the expected payloads, were published. + + + Parameters + ---------- + expected + A list of expected event payload dicts, to be compared with the + ``model_dump(format="json")`` serialization of the actual published + payloads. + any_order + If true, then the expected payloads must be in the same order as + the actual published payloads. This is like the ``any_order`` + parameter in `unittest.mock.Mock` methods. + + Raises + ------ + NotPublishedError + Some expected items were not published + NotPublishedConsecutivelyError + Expected events were not published consecutively + PublishedCountError + A different number of events were published than were expected + """ + if len(expected) != len(self): + actual = [model.model_dump(mode="json") for model in self] + raise PublishedCountError(expected, actual, self) + self.assert_published(expected, any_order=any_order) + + def _unordered( + self, + expected: ModelDumpList, + ) -> None: + """Assert that each expected event was published. + + Each published event is matched only once. Taken from the + ``unittest.mock`` ``assert_has_calls`` method. + + """ + actual = [model.model_dump(mode="json") for model in self] + not_found = [] + tmp_actual = actual.copy() + for item in expected: + try: + tmp_actual.remove(item) + except ValueError: + not_found.append(item) + if not_found: + raise NotPublishedError(expected, actual, self, not_found) + + def _ordered(self, expected: ModelDumpList) -> None: + """Assert that the list of expected events is an ordered subset of the + list of published events. + + Taken from the ``unittest.mock`` ``assert_has_calls`` method. + """ + len_expected = len(expected) + len_self = len(self) + actual = [model.model_dump(mode="json") for model in self] + for i in range(len_self - len_expected + 1): + sub_list = actual[i : i + len_expected] + if sub_list == expected: + return + raise NotPublishedConsecutivelyError(expected, actual, self) diff --git a/safir/tests/metrics/config_test.py b/safir/tests/metrics/config_test.py index 77f6ccd5..295aebef 100644 --- a/safir/tests/metrics/config_test.py +++ b/safir/tests/metrics/config_test.py @@ -11,6 +11,8 @@ KafkaEventManager, KafkaMetricsConfiguration, MetricsConfiguration, + MockEventManager, + MockMetricsConfiguration, NoopEventManager, metrics_configuration_factory, ) @@ -49,6 +51,17 @@ def test_disabled_extra() -> None: assert isinstance(config.metrics, DisabledMetricsConfiguration) +def test_mock(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("METRICS_APPLICATION", "test") + monkeypatch.setenv("METRICS_ENABLED", "false") + monkeypatch.setenv("METRICS_MOCK", "true") + + config = Config() + assert isinstance(config.metrics, MockMetricsConfiguration) + manager = config.metrics.make_manager() + assert isinstance(manager, MockEventManager) + + @pytest.mark.asyncio async def test_kafka(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("METRICS_APPLICATION", "test") diff --git a/safir/tests/metrics/mock_publisher_test.py b/safir/tests/metrics/mock_publisher_test.py new file mode 100644 index 00000000..6acb92b9 --- /dev/null +++ b/safir/tests/metrics/mock_publisher_test.py @@ -0,0 +1,228 @@ +"""Tests for mock publishers.""" + +import pytest +from pydantic import ConfigDict + +from safir.metrics import ( + ANY, + NOT_NONE, + EventPayload, + EventsConfiguration, + MockMetricsConfiguration, + NotPublishedConsecutivelyError, + NotPublishedError, + PublishedCountError, + PublishedList, +) + + +async def publish() -> PublishedList: + """Mock publish some events and return the PublishedList.""" + config = MockMetricsConfiguration( + enabled=False, + mock=True, + application="testapp", + events=EventsConfiguration(topic_prefix="what.ever"), + ) + manager = config.make_manager() + + class SomeEvent(EventPayload): + model_config = ConfigDict(ser_json_timedelta="float") + + foo: str + count: int + duration: float | None + + await manager.initialize() + pub = await manager.create_publisher("someevent", SomeEvent) + + await pub.publish(SomeEvent(foo="foo1", count=1, duration=1.234)) + await pub.publish(SomeEvent(foo="foo2", count=2, duration=2.345)) + await pub.publish(SomeEvent(foo="foo3", count=3, duration=3.456)) + await pub.publish(SomeEvent(foo="foo4", count=4, duration=None)) + await pub.publish(SomeEvent(foo="foo5", count=5, duration=5.678)) + + await manager.aclose() + + return pub.published + + +@pytest.mark.asyncio +async def test_assert_one() -> None: + pub = await publish() + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + ] + ) + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + ], + any_order=False, + ) + + with pytest.raises(NotPublishedError): + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1}, + ] + ) + + +@pytest.mark.asyncio +async def test_assert_unordered() -> None: + pub = await publish() + pub.assert_published( + [ + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + ], + any_order=True, + ) + + with pytest.raises(NotPublishedConsecutivelyError): + pub.assert_published( + [ + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + ], + ) + + +@pytest.mark.asyncio +async def test_only_counted_once() -> None: + pub = await publish() + + with pytest.raises(NotPublishedError): + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo1", "count": 1, "duration": 1.234}, + ], + any_order=True, + ) + + +@pytest.mark.asyncio +async def test_assert_ordered() -> None: + pub = await publish() + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + ] + ) + + with pytest.raises(NotPublishedConsecutivelyError): + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + ], + ) + + +@pytest.mark.asyncio +async def test_assert_extra() -> None: + pub = await publish() + with pytest.raises(NotPublishedError): + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "nope", "count": 3, "duration": 3.456}, + ] + ) + with pytest.raises(NotPublishedError): + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "nope", "count": 3, "duration": 3.456}, + ], + any_order=False, + ) + + +@pytest.mark.asyncio +async def test_assert_all() -> None: + pub = await publish() + pub.assert_published_all( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo4", "count": 4, "duration": None}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo5", "count": 5, "duration": 5.678}, + ], + any_order=True, + ) + + pub.assert_published_all( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo4", "count": 4, "duration": None}, + {"foo": "foo5", "count": 5, "duration": 5.678}, + ], + ) + + with pytest.raises(PublishedCountError): + pub.assert_published_all( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo4", "count": 4, "duration": None}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + ] + ) + + with pytest.raises(NotPublishedConsecutivelyError): + pub.assert_published_all( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": 2.345}, + {"foo": "foo4", "count": 4, "duration": None}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo5", "count": 5, "duration": 5.678}, + ], + ) + + +@pytest.mark.asyncio +async def test_any_and_not_none() -> None: + pub = await publish() + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": ANY}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo4", "count": 4, "duration": ANY}, + ] + ) + + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": NOT_NONE}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo4", "count": 4, "duration": ANY}, + ] + ) + + with pytest.raises(NotPublishedError): + pub.assert_published( + [ + {"foo": "foo1", "count": 1, "duration": 1.234}, + {"foo": "foo2", "count": 2, "duration": NOT_NONE}, + {"foo": "foo3", "count": 3, "duration": 3.456}, + {"foo": "foo4", "count": 4, "duration": NOT_NONE}, + ] + ) From 5962fb800e039a16dee4a27a0ded77b222b44393 Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Thu, 5 Dec 2024 13:28:00 -0600 Subject: [PATCH 2/7] Fix metrics unit test docs example --- docs/user-guide/metrics/index.rst | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/user-guide/metrics/index.rst b/docs/user-guide/metrics/index.rst index 975347b7..3306bb79 100644 --- a/docs/user-guide/metrics/index.rst +++ b/docs/user-guide/metrics/index.rst @@ -271,17 +271,17 @@ You can make assertions about these published events in your unit tests. await manager.initialize() - pub = await manager.create_publisher("someevent", SomeEvent) + publisher = await manager.create_publisher("someevent", SomeEvent) - await pub.publish(SomeEvent(foo="foo1", count=1, duration=1.234)) - await pub.publish(SomeEvent(foo="foo2", count=2, duration=2.345)) - await pub.publish(SomeEvent(foo="foo3", count=3, duration=3.456)) - await pub.publish(SomeEvent(foo="foo4", count=4, duration=None)) - await pub.publish(SomeEvent(foo="foo5", count=5, duration=5.678)) + await publisher.publish(SomeEvent(foo="foo1", count=1, duration=1.234)) + await publisher.publish(SomeEvent(foo="foo2", count=2, duration=2.345)) + await publisher.publish(SomeEvent(foo="foo3", count=3, duration=3.456)) + await publisher.publish(SomeEvent(foo="foo4", count=4, duration=None)) + await publisher.publish(SomeEvent(foo="foo5", count=5, duration=5.678)) await manager.aclose() - published = pub.published + pub = pub.published A mock publisher has an `safir.metrics.MockEventPublisher.published` attribute which is a `safir.metrics.PublishedList` containing of all of the `safir.metrics.EventPayload`'s published by that publisher. A `safir.metrics.PublishedList` is a regular Python list with some mixed-in assertion methods. From 5e1c4f5b6798f99a8a66ed70afea487c885fc609 Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Sat, 7 Dec 2024 14:19:24 -0600 Subject: [PATCH 3/7] Review feedback --- docs/user-guide/metrics/index.rst | 10 ++++- safir/src/safir/metrics/_event_manager.py | 8 +--- safir/src/safir/metrics/_testing.py | 43 +++++++++++----------- safir/tests/metrics/mock_publisher_test.py | 5 ++- 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/docs/user-guide/metrics/index.rst b/docs/user-guide/metrics/index.rst index 3306bb79..e853fc2f 100644 --- a/docs/user-guide/metrics/index.rst +++ b/docs/user-guide/metrics/index.rst @@ -255,7 +255,13 @@ You can make assertions about these published events in your unit tests. .. code-block:: python - from safir.metrics import metrics_configuration_factory + from pydantic import ConfigDict + + from safir.metrics import ( + EventPayload, + MockEventPublisher, + metrics_configuration_factory, + ) config = metrics_configuration_factory() @@ -281,7 +287,7 @@ You can make assertions about these published events in your unit tests. await manager.aclose() - pub = pub.published + pub = cast(MockEventPublisher, pub).published A mock publisher has an `safir.metrics.MockEventPublisher.published` attribute which is a `safir.metrics.PublishedList` containing of all of the `safir.metrics.EventPayload`'s published by that publisher. A `safir.metrics.PublishedList` is a regular Python list with some mixed-in assertion methods. diff --git a/safir/src/safir/metrics/_event_manager.py b/safir/src/safir/metrics/_event_manager.py index f54b50b1..04d67be8 100644 --- a/safir/src/safir/metrics/_event_manager.py +++ b/safir/src/safir/metrics/_event_manager.py @@ -103,11 +103,6 @@ async def publish(self, payload: P) -> EventMetadata: accessed. """ - @property - def published(self) -> PublishedList[P]: - """A list of published event payloads with some test helpers.""" - raise NotImplementedError("Only available on a MockEventPublisher.") - @staticmethod def _ns_to_datetime(ns: int) -> datetime: """Convert an `int` number of nanoseconds to a `~datetime.datetime`.""" @@ -606,7 +601,6 @@ class MockEventManager(EventManager): This is for use only in unit testing. Don't use it in any deployed environment because memory usage will grow unbounded. - Parameters ---------- application @@ -642,4 +636,4 @@ async def build_publisher_for_model( EventPublisher An appropriate event publisher implementation instance. """ - return MockEventPublisher[P](self._application, model, self.logger) + return MockEventPublisher(self._application, model, self.logger) diff --git a/safir/src/safir/metrics/_testing.py b/safir/src/safir/metrics/_testing.py index 3d172ae8..df573cdd 100644 --- a/safir/src/safir/metrics/_testing.py +++ b/safir/src/safir/metrics/_testing.py @@ -2,10 +2,10 @@ from abc import ABC, abstractmethod from pprint import pformat -from typing import Any, Generic, TypeVar +from typing import Any, Generic, TypeAlias, TypeVar from unittest.mock import ANY as MOCK_ANY -from pydantic import BaseModel +from ._models import EventPayload __all__ = [ "ANY", @@ -18,13 +18,15 @@ "PublishedTooFewError", ] -P = TypeVar("P", bound=BaseModel) +P = TypeVar("P", bound=EventPayload) """Generic event payload type.""" - ANY = MOCK_ANY """An object that compares equal to anything, reexported from unittest.mock.""" +ModelDumpList: TypeAlias = list[dict[str, Any]] +"""Type alias for a list of dumped Pydantic models.""" + class _NotNone: """A helper object that compares equal to everything except None.""" @@ -40,10 +42,6 @@ def __repr__(self) -> str: """An object to indicate that a value can be anything except None.""" -ModelDumpList = list[dict[str, Any]] -"""Type alias for a list of dumped Pydantic models.""" - - class BaseAssertionError(Generic[P], ABC, AssertionError): """Base assertion error with common attributes and messaging.""" @@ -97,6 +95,7 @@ class NotPublishedError(BaseAssertionError): def __init__( self, + *, expected: ModelDumpList, actual: ModelDumpList, actual_models: list[P], @@ -113,12 +112,11 @@ def errormsg(self) -> str: ) -class PublishedList(Generic[P], list[P]): +class PublishedList(list[P]): """A list of event payload models with assertion helpers. All assertion helpers take lists of dicts as expected items and use the ``model_dump(mode="json")`` serialization of the models for comparison. - """ def assert_published( @@ -155,13 +153,13 @@ def assert_published( raise PublishedTooFewError(expected, actual, self) if any_order: - self._unordered(expected) + self._check_unordered(expected) else: # Try unordered first to differentiate between the cases where some # expected events were not published at all, vs all of the expected # events were published, just not in the expected order. - self._unordered(expected) - self._ordered(expected) + self._check_unordered(expected) + self._check_ordered(expected) def assert_published_all( self, @@ -169,9 +167,8 @@ def assert_published_all( *, any_order: bool = False, ) -> None: - """Assert that all of the expected payloads were published, and only - the expected payloads, were published. - + """Assert that all of the expected payloads, and only the expected + payloads, were published. Parameters ---------- @@ -198,7 +195,7 @@ def assert_published_all( raise PublishedCountError(expected, actual, self) self.assert_published(expected, any_order=any_order) - def _unordered( + def _check_unordered( self, expected: ModelDumpList, ) -> None: @@ -206,7 +203,6 @@ def _unordered( Each published event is matched only once. Taken from the ``unittest.mock`` ``assert_has_calls`` method. - """ actual = [model.model_dump(mode="json") for model in self] not_found = [] @@ -217,9 +213,14 @@ def _unordered( except ValueError: not_found.append(item) if not_found: - raise NotPublishedError(expected, actual, self, not_found) - - def _ordered(self, expected: ModelDumpList) -> None: + raise NotPublishedError( + expected=expected, + actual=actual, + actual_models=self, + not_found=not_found, + ) + + def _check_ordered(self, expected: ModelDumpList) -> None: """Assert that the list of expected events is an ordered subset of the list of published events. diff --git a/safir/tests/metrics/mock_publisher_test.py b/safir/tests/metrics/mock_publisher_test.py index 6acb92b9..6e41ba69 100644 --- a/safir/tests/metrics/mock_publisher_test.py +++ b/safir/tests/metrics/mock_publisher_test.py @@ -1,5 +1,7 @@ """Tests for mock publishers.""" +from typing import cast + import pytest from pydantic import ConfigDict @@ -8,6 +10,7 @@ NOT_NONE, EventPayload, EventsConfiguration, + MockEventPublisher, MockMetricsConfiguration, NotPublishedConsecutivelyError, NotPublishedError, @@ -44,7 +47,7 @@ class SomeEvent(EventPayload): await manager.aclose() - return pub.published + return cast(MockEventPublisher, pub).published @pytest.mark.asyncio From 06ce821c1bc00d5f7673d142e81572e222a2b702 Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Mon, 9 Dec 2024 09:24:36 -0600 Subject: [PATCH 4/7] Consistent explicit references to enriched event type --- safir/src/safir/metrics/_event_manager.py | 28 +++++++++++++++++------ safir/tests/metrics/event_manager_test.py | 4 +--- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/safir/src/safir/metrics/_event_manager.py b/safir/src/safir/metrics/_event_manager.py index 04d67be8..2e3b85b1 100644 --- a/safir/src/safir/metrics/_event_manager.py +++ b/safir/src/safir/metrics/_event_manager.py @@ -5,7 +5,7 @@ import time from abc import ABCMeta, abstractmethod from datetime import UTC, datetime -from typing import Generic, TypeVar, cast +from typing import Generic, TypeAlias, TypeVar, cast from uuid import uuid4 import structlog @@ -28,6 +28,18 @@ P = TypeVar("P", bound=EventPayload) """Generic event payload type.""" +EnrichedEvent: TypeAlias = P | EventMetadata +"""Alias to refer to the dynamically created payload+metadata event type. + +Since this type is dynamically created, there is no way to name it or make it a +TypeVar. We could declare it as ``P`` everywhere we need to declare it, but +that makes it difficult to see when we are using the payload vs the +payload+metadata types. + +This lets us differentiate it from a payload class in the code, and it can be +narrowed further if we have to access either metadata or payload attributes. +""" + __all__ = [ "EventManager", "EventPublisher", @@ -54,11 +66,13 @@ class EventPublisher(Generic[P], metaclass=ABCMeta): publication. """ - def __init__(self, application: str, event_class: type[P]) -> None: + def __init__( + self, application: str, event_class: type[EnrichedEvent] + ) -> None: self._application = application self._event_class = event_class - def construct_event(self, payload: P) -> P: + def construct_event(self, payload: P) -> EnrichedEvent: """Construct the full event as it will be published. Parameters @@ -139,7 +153,7 @@ def __init__( *, application: str, manager: KafkaEventManager, - event_class: type[AvroBaseModel], + event_class: type[EnrichedEvent], publisher: AsyncAPIDefaultPublisher, schema_info: SchemaInfo, ) -> None: @@ -165,7 +179,7 @@ class NoopEventPublisher(EventPublisher, Generic[P]): def __init__( self, application: str, - event_class: type[AvroBaseModel], + event_class: type[EnrichedEvent], logger: BoundLogger, ) -> None: super().__init__(application, event_class) @@ -190,7 +204,7 @@ class MockEventPublisher(NoopEventPublisher, Generic[P]): def __init__( self, application: str, - event_class: type[AvroBaseModel], + event_class: type[EnrichedEvent], logger: BoundLogger, ) -> None: super().__init__(application, event_class, logger) @@ -486,7 +500,7 @@ async def initialize(self) -> None: async def publish( self, - event: AvroBaseModel, + event: EnrichedEvent, publisher: AsyncAPIDefaultPublisher, schema_info: SchemaInfo | None, ) -> None: diff --git a/safir/tests/metrics/event_manager_test.py b/safir/tests/metrics/event_manager_test.py index 4eb730ec..dad720ab 100644 --- a/safir/tests/metrics/event_manager_test.py +++ b/safir/tests/metrics/event_manager_test.py @@ -6,13 +6,11 @@ import math from datetime import timedelta from enum import Enum -from typing import cast from uuid import UUID import pytest from aiokafka import AIOKafkaConsumer from aiokafka.admin.client import AIOKafkaAdminClient, NewTopic -from dataclasses_avroschema.pydantic import AvroBaseModel from faststream.kafka import KafkaBroker from pydantic import Field from schema_registry.client.client import AsyncSchemaRegistryClient @@ -81,7 +79,7 @@ async def assert_from_kafka( serializer = AsyncAvroMessageSerializer(schema_registry) deserialized_dict = await serializer.decode_message(message.value) assert deserialized_dict is not None - event_class = cast(type[AvroBaseModel], event._event_class) + event_class = event._event_class deserialized = event_class(**deserialized_dict) assert isinstance(deserialized, EventMetadata) From dd2d30146341bd0abf62570acd8a718a6701ab70 Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Mon, 9 Dec 2024 10:22:43 -0600 Subject: [PATCH 5/7] No need for typevar specification --- safir/src/safir/metrics/_event_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/safir/src/safir/metrics/_event_manager.py b/safir/src/safir/metrics/_event_manager.py index 2e3b85b1..9366b3fc 100644 --- a/safir/src/safir/metrics/_event_manager.py +++ b/safir/src/safir/metrics/_event_manager.py @@ -483,7 +483,7 @@ async def build_publisher_for_model( schema_info = await self._schema_manager.register_model(model) # Return the corresponding event publisher. - return KafkaEventPublisher[P]( + return KafkaEventPublisher( application=self._application, event_class=model, publisher=async_publisher, @@ -600,7 +600,7 @@ async def build_publisher_for_model( EventPublisher An appropriate event publisher implementation instance. """ - return NoopEventPublisher[P](self._application, model, self.logger) + return NoopEventPublisher(self._application, model, self.logger) class MockEventManager(EventManager): From 44eae9aa397368165e94804a5c576b11d07004f3 Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Mon, 9 Dec 2024 21:33:55 -0600 Subject: [PATCH 6/7] Just call everything P and be done with it --- safir/src/safir/metrics/_event_manager.py | 28 ++++++----------------- safir/tests/metrics/event_manager_test.py | 4 +++- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/safir/src/safir/metrics/_event_manager.py b/safir/src/safir/metrics/_event_manager.py index 9366b3fc..44fdc2d9 100644 --- a/safir/src/safir/metrics/_event_manager.py +++ b/safir/src/safir/metrics/_event_manager.py @@ -5,7 +5,7 @@ import time from abc import ABCMeta, abstractmethod from datetime import UTC, datetime -from typing import Generic, TypeAlias, TypeVar, cast +from typing import Generic, TypeVar, cast from uuid import uuid4 import structlog @@ -28,18 +28,6 @@ P = TypeVar("P", bound=EventPayload) """Generic event payload type.""" -EnrichedEvent: TypeAlias = P | EventMetadata -"""Alias to refer to the dynamically created payload+metadata event type. - -Since this type is dynamically created, there is no way to name it or make it a -TypeVar. We could declare it as ``P`` everywhere we need to declare it, but -that makes it difficult to see when we are using the payload vs the -payload+metadata types. - -This lets us differentiate it from a payload class in the code, and it can be -narrowed further if we have to access either metadata or payload attributes. -""" - __all__ = [ "EventManager", "EventPublisher", @@ -66,13 +54,11 @@ class EventPublisher(Generic[P], metaclass=ABCMeta): publication. """ - def __init__( - self, application: str, event_class: type[EnrichedEvent] - ) -> None: + def __init__(self, application: str, event_class: type[P]) -> None: self._application = application self._event_class = event_class - def construct_event(self, payload: P) -> EnrichedEvent: + def construct_event(self, payload: P) -> P: """Construct the full event as it will be published. Parameters @@ -153,7 +139,7 @@ def __init__( *, application: str, manager: KafkaEventManager, - event_class: type[EnrichedEvent], + event_class: type[P], publisher: AsyncAPIDefaultPublisher, schema_info: SchemaInfo, ) -> None: @@ -179,7 +165,7 @@ class NoopEventPublisher(EventPublisher, Generic[P]): def __init__( self, application: str, - event_class: type[EnrichedEvent], + event_class: type[P], logger: BoundLogger, ) -> None: super().__init__(application, event_class) @@ -204,7 +190,7 @@ class MockEventPublisher(NoopEventPublisher, Generic[P]): def __init__( self, application: str, - event_class: type[EnrichedEvent], + event_class: type[P], logger: BoundLogger, ) -> None: super().__init__(application, event_class, logger) @@ -500,7 +486,7 @@ async def initialize(self) -> None: async def publish( self, - event: EnrichedEvent, + event: P, publisher: AsyncAPIDefaultPublisher, schema_info: SchemaInfo | None, ) -> None: diff --git a/safir/tests/metrics/event_manager_test.py b/safir/tests/metrics/event_manager_test.py index dad720ab..4eb730ec 100644 --- a/safir/tests/metrics/event_manager_test.py +++ b/safir/tests/metrics/event_manager_test.py @@ -6,11 +6,13 @@ import math from datetime import timedelta from enum import Enum +from typing import cast from uuid import UUID import pytest from aiokafka import AIOKafkaConsumer from aiokafka.admin.client import AIOKafkaAdminClient, NewTopic +from dataclasses_avroschema.pydantic import AvroBaseModel from faststream.kafka import KafkaBroker from pydantic import Field from schema_registry.client.client import AsyncSchemaRegistryClient @@ -79,7 +81,7 @@ async def assert_from_kafka( serializer = AsyncAvroMessageSerializer(schema_registry) deserialized_dict = await serializer.decode_message(message.value) assert deserialized_dict is not None - event_class = event._event_class + event_class = cast(type[AvroBaseModel], event._event_class) deserialized = event_class(**deserialized_dict) assert isinstance(deserialized, EventMetadata) From 9efbb16f58ae84300ce664dc22190bd8605c3a89 Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Mon, 9 Dec 2024 21:40:31 -0600 Subject: [PATCH 7/7] Put publisher typevar specification back in --- safir/src/safir/metrics/_event_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/safir/src/safir/metrics/_event_manager.py b/safir/src/safir/metrics/_event_manager.py index 44fdc2d9..9474d425 100644 --- a/safir/src/safir/metrics/_event_manager.py +++ b/safir/src/safir/metrics/_event_manager.py @@ -469,7 +469,7 @@ async def build_publisher_for_model( schema_info = await self._schema_manager.register_model(model) # Return the corresponding event publisher. - return KafkaEventPublisher( + return KafkaEventPublisher[P]( application=self._application, event_class=model, publisher=async_publisher, @@ -586,7 +586,7 @@ async def build_publisher_for_model( EventPublisher An appropriate event publisher implementation instance. """ - return NoopEventPublisher(self._application, model, self.logger) + return NoopEventPublisher[P](self._application, model, self.logger) class MockEventManager(EventManager): @@ -636,4 +636,4 @@ async def build_publisher_for_model( EventPublisher An appropriate event publisher implementation instance. """ - return MockEventPublisher(self._application, model, self.logger) + return MockEventPublisher[P](self._application, model, self.logger)