Skip to content

Commit

Permalink
add pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
jakekaplan committed Jul 25, 2024
1 parent 02c7991 commit f28d4bc
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions src/prefect/server/events/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import List

import pendulum

from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.events.services import event_persister
from prefect.server.services import task_run_recorder
from prefect.server.utilities.messaging.memory import MemoryMessage


class EventsPipeline:
@staticmethod
def events_to_messages(events) -> List[MemoryMessage]:
messages = []
for event in events:
received_event = ReceivedEvent(
received=pendulum.now("UTC"), **event.model_dump()
)
message = MemoryMessage(
data=received_event.model_dump_json().encode(),
attributes={"id": str(event.id), "event": event.event},
)
messages.append(message)
return messages

async def process_messages(self, messages: List[MemoryMessage]):
for message in messages:
await self.process_message(message)

async def process_message(self, message: MemoryMessage):
"""Process a single event message"""

# All event stream consumers should be added here
async with task_run_recorder.consumer() as handler:
await handler(message)

async with event_persister.create_handler(batch_size=1) as handler:
await handler(message)

0 comments on commit f28d4bc

Please sign in to comment.