Skip to content

Commit

Permalink
log rendered custom flow_run_name (#16517)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Dec 26, 2024
1 parent 2d9450b commit f1aaad2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
83 changes: 47 additions & 36 deletions src/prefect/flow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,24 +490,13 @@ def create_flow_run(self, client: SyncPrefectClient) -> FlowRun:
):
return subflow_run

flow_run = client.create_flow_run(
return client.create_flow_run(
flow=self.flow,
parameters=self.flow.serialize_parameters(parameters),
state=Pending(),
parent_task_run_id=getattr(parent_task_run, "id", None),
tags=TagsContext.get().current_tags,
)
if flow_run_ctx:
parent_logger = get_run_logger(flow_run_ctx)
parent_logger.info(
f"Created subflow run {flow_run.name!r} for flow {self.flow.name!r}"
)
else:
self.logger.info(
f"Created flow run {flow_run.name!r} for flow {self.flow.name!r}"
)

return flow_run

def call_hooks(self, state: Optional[State] = None):
if state is None:
Expand Down Expand Up @@ -606,6 +595,7 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
stack.enter_context(ConcurrencyContext())

# set the logger to the flow run logger

self.logger = flow_run_logger(flow_run=self.flow_run, flow=self.flow)

# update the flow run name if necessary
Expand All @@ -622,6 +612,23 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
)
self.flow_run.name = flow_run_name
self._flow_run_name_set = True

if self.flow_run.parent_task_run_id:
_logger = get_run_logger(FlowRunContext.get())
run_type = "subflow"
else:
_logger = self.logger
run_type = "flow"

_logger.info(
f"Beginning {run_type} run {self.flow_run.name!r} for flow {self.flow.name!r}"
)

if flow_run_url := url_for(self.flow_run):
self.logger.info(
f"View at {flow_run_url}", extra={"send_to_api": False}
)

yield

@contextmanager
Expand All @@ -635,12 +642,6 @@ def initialize_run(self):

if not self.flow_run:
self.flow_run = self.create_flow_run(self.client)
flow_run_url = url_for(self.flow_run)

if flow_run_url:
self.logger.info(
f"View at {flow_run_url}", extra={"send_to_api": False}
)
else:
# Update the empirical policy to match the flow if it is not set
if self.flow_run.empirical_policy.retry_delay is None:
Expand Down Expand Up @@ -705,9 +706,11 @@ def initialize_run(self):
@contextmanager
def start(self) -> Generator[None, None, None]:
with self.initialize_run():
with trace.use_span(
self._telemetry.span
) if self._telemetry.span else nullcontext():
with (
trace.use_span(self._telemetry.span)
if self._telemetry.span
else nullcontext()
):
self.begin_run()

if self.state.is_running():
Expand Down Expand Up @@ -1052,24 +1055,13 @@ async def create_flow_run(self, client: PrefectClient) -> FlowRun:
):
return subflow_run

flow_run = await client.create_flow_run(
return await client.create_flow_run(
flow=self.flow,
parameters=self.flow.serialize_parameters(parameters),
state=Pending(),
parent_task_run_id=getattr(parent_task_run, "id", None),
tags=TagsContext.get().current_tags,
)
if flow_run_ctx:
parent_logger = get_run_logger(flow_run_ctx)
parent_logger.info(
f"Created subflow run {flow_run.name!r} for flow {self.flow.name!r}"
)
else:
self.logger.info(
f"Created flow run {flow_run.name!r} for flow {self.flow.name!r}"
)

return flow_run

async def call_hooks(self, state: Optional[State] = None):
if state is None:
Expand Down Expand Up @@ -1184,6 +1176,23 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
)
self.flow_run.name = flow_run_name
self._flow_run_name_set = True

if self.flow_run.parent_task_run_id:
_logger = get_run_logger(FlowRunContext.get())
run_type = "subflow"
else:
_logger = self.logger
run_type = "flow"

_logger.info(
f"Beginning {run_type} run {self.flow_run.name!r} for flow {self.flow.name!r}"
)

if flow_run_url := url_for(self.flow_run):
self.logger.info(
f"View at {flow_run_url}", extra={"send_to_api": False}
)

yield

@asynccontextmanager
Expand Down Expand Up @@ -1267,9 +1276,11 @@ async def initialize_run(self):
@asynccontextmanager
async def start(self) -> AsyncGenerator[None, None]:
async with self.initialize_run():
with trace.use_span(
self._telemetry.span
) if self._telemetry.span else nullcontext():
with (
trace.use_span(self._telemetry.span)
if self._telemetry.span
else nullcontext()
):
await self.begin_run()

if self.state.is_running():
Expand Down
19 changes: 19 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,25 @@ def flow_method():
assert mocked_flow_method.call_count == 2
assert generate_flow_run_name.call_count == 2

async def test_both_engines_logs_custom_flow_run_name(
self, caplog: pytest.LogCaptureFixture
):
@flow(flow_run_name="very-bespoke-name")
def test():
pass

test()

assert "Beginning flow run 'very-bespoke-name'" in caplog.text

@flow(flow_run_name="very-bespoke-async-name")
async def test_async():
pass

await test_async()

assert "Beginning flow run 'very-bespoke-async-name'" in caplog.text


def create_hook(mock_obj):
def my_hook(flow, flow_run, state):
Expand Down

0 comments on commit f1aaad2

Please sign in to comment.