diff --git a/src/prefect/server/events/pipeline.py b/src/prefect/server/events/pipeline.py new file mode 100644 index 000000000000..ad0c69db192a --- /dev/null +++ b/src/prefect/server/events/pipeline.py @@ -0,0 +1,38 @@ +from typing import List + +import pendulum + +from prefect.server.events.schemas.events import ReceivedEvent +from prefect.server.events.services import event_persister +from prefect.server.services import task_run_recorder +from prefect.server.utilities.messaging.memory import MemoryMessage + + +class EventsPipeline: + @staticmethod + def events_to_messages(events) -> List[MemoryMessage]: + messages = [] + for event in events: + received_event = ReceivedEvent( + received=pendulum.now("UTC"), **event.model_dump() + ) + message = MemoryMessage( + data=received_event.model_dump_json().encode(), + attributes={"id": str(event.id), "event": event.event}, + ) + messages.append(message) + return messages + + async def process_messages(self, messages: List[MemoryMessage]): + for message in messages: + await self.process_message(message) + + async def process_message(self, message: MemoryMessage): + """Process a single event message""" + + # All event stream consumers should be added here + async with task_run_recorder.consumer() as handler: + await handler(message) + + async with event_persister.create_handler(batch_size=1) as handler: + await handler(message)