From a571cf6266aca14e68210ae9fb322307a8781685 Mon Sep 17 00:00:00 2001 From: jakekaplan <40362401+jakekaplan@users.noreply.github.com> Date: Thu, 5 Sep 2024 21:46:54 -0400 Subject: [PATCH] Add debug logging for server side event handling (#15245) --- src/prefect/server/events/messaging.py | 7 +++++++ src/prefect/server/events/services/event_persister.py | 8 ++++++++ src/prefect/server/services/task_run_recorder.py | 9 ++++++--- tests/server/services/test_task_run_recorder.py | 4 ++-- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/prefect/server/events/messaging.py b/src/prefect/server/events/messaging.py index d40f0eec3abb..1e09fb275043 100644 --- a/src/prefect/server/events/messaging.py +++ b/src/prefect/server/events/messaging.py @@ -49,6 +49,13 @@ async def publish_event(self, event: ReceivedEvent): }, ) return + + logger.debug( + "Publishing event: %s with id: %s for resource: %s", + event.event, + event.id, + event.resource.get("prefect.resource.id"), + ) await self.publish_data( encoded, { diff --git a/src/prefect/server/events/services/event_persister.py b/src/prefect/server/events/services/event_persister.py index 3b425ba7230d..b3f0fc188175 100644 --- a/src/prefect/server/events/services/event_persister.py +++ b/src/prefect/server/events/services/event_persister.py @@ -149,6 +149,14 @@ async def message_handler(message: Message): return event = ReceivedEvent.model_validate_json(message.data) + + logger.debug( + "Received event: %s with id: %s for resource: %s", + event.event, + event.id, + event.resource.get("prefect.resource.id"), + ) + await queue.put(event) if queue.qsize() >= batch_size: diff --git a/src/prefect/server/services/task_run_recorder.py b/src/prefect/server/services/task_run_recorder.py index f8e91e97b834..ab19dcfa8c41 100644 --- a/src/prefect/server/services/task_run_recorder.py +++ b/src/prefect/server/services/task_run_recorder.py @@ -149,7 +149,7 @@ async def record_task_run_event(event: ReceivedEvent): session, task_run, denormalized_state_attributes ) - logger.info( + logger.debug( "Recorded task run state change", extra={ "task_run_id": task_run.id, @@ -175,8 +175,11 @@ async def message_handler(message: Message): if not event.resource.get("prefect.orchestration") == "client": return - logger.info( - f"Received event: {event.event} with id: {event.id} for resource: {event.resource.get('prefect.resource.id')}" + logger.debug( + "Received event: %s with id: %s for resource: %s", + event.event, + event.id, + event.resource.get("prefect.resource.id"), ) try: diff --git a/tests/server/services/test_task_run_recorder.py b/tests/server/services/test_task_run_recorder.py index 08846aa392b2..e373a76c972e 100644 --- a/tests/server/services/test_task_run_recorder.py +++ b/tests/server/services/test_task_run_recorder.py @@ -163,7 +163,7 @@ async def test_handle_client_orchestrated_task_run_event( client_orchestrated_task_run_event: ReceivedEvent, caplog: pytest.LogCaptureFixture, ): - with caplog.at_level("INFO"): + with caplog.at_level("DEBUG"): await task_run_recorder_handler(message(client_orchestrated_task_run_event)) assert "Recorded task run state change" in caplog.text @@ -175,7 +175,7 @@ async def test_skip_non_task_run_event( hello_event: ReceivedEvent, caplog: pytest.LogCaptureFixture, ): - with caplog.at_level("INFO"): + with caplog.at_level("DEBUG"): await task_run_recorder_handler(message(hello_event)) assert "Received event" not in caplog.text