Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventsPipeline and testing utilities #14765

Merged
merged 6 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ def reset(cls) -> None:
cls.last = None
cls.all = []

@classmethod
def reset_events(self) -> None:
self.events = []
jakekaplan marked this conversation as resolved.
Show resolved Hide resolved

async def _emit(self, event: Event) -> None:
self.events.append(event)

Expand Down
38 changes: 38 additions & 0 deletions src/prefect/server/events/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import List

import pendulum

from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.events.services import event_persister
from prefect.server.services import task_run_recorder
from prefect.server.utilities.messaging.memory import MemoryMessage


class EventsPipeline:
@staticmethod
def events_to_messages(events) -> List[MemoryMessage]:
messages = []
for event in events:
received_event = ReceivedEvent(
received=pendulum.now("UTC"), **event.model_dump()
)
message = MemoryMessage(
data=received_event.model_dump_json().encode(),
attributes={"id": str(event.id), "event": event.event},
)
messages.append(message)
return messages

async def process_messages(self, messages: List[MemoryMessage]):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future when shutting down the ephemeral version we can invoke this method on everything sitting inside of the in-memory events queue

for message in messages:
await self.process_message(message)

async def process_message(self, message: MemoryMessage):
"""Process a single event message"""

# TODO: Investigate if we want to include triggers/actions etc.
async with task_run_recorder.consumer() as handler:
await handler(message)

async with event_persister.create_handler(batch_size=1) as handler:
await handler(message)
21 changes: 20 additions & 1 deletion src/prefect/testing/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import os
import socket
Expand All @@ -19,6 +20,7 @@
from prefect.events.clients import AssertingEventsClient
from prefect.events.filters import EventFilter
from prefect.events.worker import EventsWorker
from prefect.server.events.pipeline import EventsPipeline
from prefect.settings import (
PREFECT_API_URL,
PREFECT_SERVER_CSRF_PROTECTION_ENABLED,
Expand Down Expand Up @@ -335,7 +337,7 @@ def mock_should_emit_events(monkeypatch) -> mock.Mock:
return m


@pytest.fixture
@pytest.fixture(autouse=True)
def asserting_events_worker(monkeypatch) -> Generator[EventsWorker, None, None]:
worker = EventsWorker.instance(AssertingEventsClient)
# Always yield the asserting worker when new instances are retrieved
Expand All @@ -346,6 +348,23 @@ def asserting_events_worker(monkeypatch) -> Generator[EventsWorker, None, None]:
worker.drain()


@pytest.fixture
async def events_pipeline(asserting_events_worker: EventsWorker):
class AssertingEventsPipeline(EventsPipeline):
def sync_process_events(self):
asyncio.run(self.process_events())
cicdw marked this conversation as resolved.
Show resolved Hide resolved

async def process_events(self):
asserting_events_worker.wait_until_empty()
events = asserting_events_worker._client.events

messages = self.events_to_messages(events)
await self.process_messages(messages)
asserting_events_worker._client.reset_events()

yield AssertingEventsPipeline()


@pytest.fixture
def reset_worker_events(asserting_events_worker: EventsWorker):
yield
Expand Down
Loading
Loading