Skip to content

Commit

Permalink
add eventspipelien and test utility
Browse files Browse the repository at this point in the history
  • Loading branch information
jakekaplan committed Jul 25, 2024
1 parent 09b7a48 commit 02c7991
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 259 deletions.
4 changes: 4 additions & 0 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ def reset(cls) -> None:
cls.last = None
cls.all = []

@classmethod
def reset_events(self) -> None:
self.events = []

async def _emit(self, event: Event) -> None:
self.events.append(event)

Expand Down
21 changes: 20 additions & 1 deletion src/prefect/testing/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import os
import socket
Expand All @@ -19,6 +20,7 @@
from prefect.events.clients import AssertingEventsClient
from prefect.events.filters import EventFilter
from prefect.events.worker import EventsWorker
from prefect.server.events.pipeline import EventsPipeline
from prefect.settings import (
PREFECT_API_URL,
PREFECT_SERVER_CSRF_PROTECTION_ENABLED,
Expand Down Expand Up @@ -335,7 +337,7 @@ def mock_should_emit_events(monkeypatch) -> mock.Mock:
return m


@pytest.fixture
@pytest.fixture(autouse=True)
def asserting_events_worker(monkeypatch) -> Generator[EventsWorker, None, None]:
worker = EventsWorker.instance(AssertingEventsClient)
# Always yield the asserting worker when new instances are retrieved
Expand All @@ -346,6 +348,23 @@ def asserting_events_worker(monkeypatch) -> Generator[EventsWorker, None, None]:
worker.drain()


@pytest.fixture
async def events_pipeline(asserting_events_worker: EventsWorker):
class AssertingEventsPipeline(EventsPipeline):
def sync_process_events(self):
asyncio.run(self.process_events())

async def process_events(self):
asserting_events_worker.wait_until_empty()
events = asserting_events_worker._client.events

messages = self.events_to_messages(events)
await self.process_messages(messages)
asserting_events_worker._client.reset_events()

yield AssertingEventsPipeline()


@pytest.fixture
def reset_worker_events(asserting_events_worker: EventsWorker):
yield
Expand Down
Loading

0 comments on commit 02c7991

Please sign in to comment.