Skip to content

Commit

Permalink
Add EventsPipeline and testing utilities (#14765)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakekaplan authored Jul 26, 2024
1 parent b7cca3d commit 334859f
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 258 deletions.
3 changes: 3 additions & 0 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ def reset(cls) -> None:
cls.last = None
cls.all = []

def reset_events(self) -> None:
self.events = []

async def _emit(self, event: Event) -> None:
self.events.append(event)

Expand Down
38 changes: 38 additions & 0 deletions src/prefect/server/events/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import List

import pendulum

from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.events.services import event_persister
from prefect.server.services import task_run_recorder
from prefect.server.utilities.messaging.memory import MemoryMessage


class EventsPipeline:
@staticmethod
def events_to_messages(events) -> List[MemoryMessage]:
messages = []
for event in events:
received_event = ReceivedEvent(
received=pendulum.now("UTC"), **event.model_dump()
)
message = MemoryMessage(
data=received_event.model_dump_json().encode(),
attributes={"id": str(event.id), "event": event.event},
)
messages.append(message)
return messages

async def process_messages(self, messages: List[MemoryMessage]):
for message in messages:
await self.process_message(message)

async def process_message(self, message: MemoryMessage):
"""Process a single event message"""

# TODO: Investigate if we want to include triggers/actions etc.
async with task_run_recorder.consumer() as handler:
await handler(message)

async with event_persister.create_handler(batch_size=1) as handler:
await handler(message)
16 changes: 16 additions & 0 deletions src/prefect/testing/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
from prefect.events.clients import AssertingEventsClient
from prefect.events.filters import EventFilter
from prefect.events.worker import EventsWorker
from prefect.server.events.pipeline import EventsPipeline
from prefect.settings import (
PREFECT_API_URL,
PREFECT_SERVER_CSRF_PROTECTION_ENABLED,
get_current_settings,
temporary_settings,
)
from prefect.testing.utilities import AsyncMock
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.processutils import open_process


Expand Down Expand Up @@ -346,6 +348,20 @@ def asserting_events_worker(monkeypatch) -> Generator[EventsWorker, None, None]:
worker.drain()


@pytest.fixture
async def events_pipeline(asserting_events_worker: EventsWorker):
class AssertingEventsPipeline(EventsPipeline):
@sync_compatible
async def process_events(self):
asserting_events_worker.wait_until_empty()
events = asserting_events_worker._client.events
messages = self.events_to_messages(events)
await self.process_messages(messages)
asserting_events_worker._client.reset_events()

yield AssertingEventsPipeline()


@pytest.fixture
def reset_worker_events(asserting_events_worker: EventsWorker):
yield
Expand Down
Loading

0 comments on commit 334859f

Please sign in to comment.