Skip to content

Commit

Permalink
RunTelemetry start span uses run name instead of flow/task name (#16389)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano authored Dec 31, 2024
1 parent 9924f8f commit 5f5e4a6
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 70 deletions.
7 changes: 5 additions & 2 deletions src/prefect/flow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,16 @@ def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
self.client.set_flow_run_name(
flow_run_id=self.flow_run.id, name=flow_run_name
)

self.logger.extra["flow_run_name"] = flow_run_name
self.logger.debug(
f"Renamed flow run {self.flow_run.name!r} to {flow_run_name!r}"
)
self.flow_run.name = flow_run_name
self._flow_run_name_set = True

self._telemetry.update_run_name(name=flow_run_name)

if self.flow_run.parent_task_run_id:
_logger = get_run_logger(FlowRunContext.get())
run_type = "subflow"
Expand Down Expand Up @@ -655,7 +658,6 @@ def initialize_run(self):
)

self._telemetry.start_span(
name=self.flow.name,
run=self.flow_run,
client=self.client,
parameters=self.parameters,
Expand Down Expand Up @@ -1156,6 +1158,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
self.logger = flow_run_logger(flow_run=self.flow_run, flow=self.flow)

# update the flow run name if necessary

if not self._flow_run_name_set and self.flow.flow_run_name:
flow_run_name = resolve_custom_flow_run_name(
flow=self.flow, parameters=self.parameters
Expand All @@ -1170,6 +1173,7 @@ async def setup_run_context(self, client: Optional[PrefectClient] = None):
self.flow_run.name = flow_run_name
self._flow_run_name_set = True

self._telemetry.update_run_name(name=flow_run_name)
if self.flow_run.parent_task_run_id:
_logger = get_run_logger(FlowRunContext.get())
run_type = "subflow"
Expand Down Expand Up @@ -1222,7 +1226,6 @@ async def initialize_run(self):
)

await self._telemetry.async_start_span(
name=self.flow.name,
run=self.flow_run,
client=self.client,
parameters=self.parameters,
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def _set_custom_task_run_name(self):
)
self.task_run.name = task_run_name
self._task_name_set = True
self._telemetry.update_run_name(name=task_run_name)

def _wait_for_dependencies(self):
if not self.wait_for:
Expand Down Expand Up @@ -686,7 +687,6 @@ def initialize_run(

self._telemetry.start_span(
run=self.task_run,
name=self.task.name,
client=self.client,
parameters=self.parameters,
)
Expand Down Expand Up @@ -1215,7 +1215,6 @@ async def initialize_run(

await self._telemetry.async_start_span(
run=self.task_run,
name=self.task.name,
client=self.client,
parameters=self.parameters,
)
Expand Down
19 changes: 12 additions & 7 deletions src/prefect/telemetry/run_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ async def async_start_span(
self,
run: FlowOrTaskRun,
client: PrefectClient,
name: Optional[str] = None,
parameters: Optional[dict[str, Any]] = None,
):
traceparent, span = self._start_span(run, name, parameters)
traceparent, span = self._start_span(run, parameters)

if self._run_type(run) == "flow" and traceparent:
# Only explicitly update labels if the run is a flow as task runs
Expand All @@ -71,10 +70,9 @@ def start_span(
self,
run: FlowOrTaskRun,
client: SyncPrefectClient,
name: Optional[str] = None,
parameters: Optional[dict[str, Any]] = None,
):
traceparent, span = self._start_span(run, name, parameters)
traceparent, span = self._start_span(run, parameters)

if self._run_type(run) == "flow" and traceparent:
# Only explicitly update labels if the run is a flow as task runs
Expand All @@ -86,7 +84,6 @@ def start_span(
def _start_span(
self,
run: FlowOrTaskRun,
name: Optional[str] = None,
parameters: Optional[dict[str, Any]] = None,
) -> tuple[Optional[str], Span]:
"""
Expand Down Expand Up @@ -117,10 +114,10 @@ def _start_span(
run_type = self._run_type(run)

self.span = self._tracer.start_span(
name=name or run.name,
name=run.name,
context=context,
attributes={
"prefect.run.name": name or run.name,
"prefect.run.name": run.name,
"prefect.run.type": run_type,
"prefect.run.id": str(run.id),
"prefect.tags": run.tags,
Expand Down Expand Up @@ -198,6 +195,14 @@ def update_state(self, new_state: State) -> None:
},
)

def update_run_name(self, name: str) -> None:
"""
Update the name of the run.
"""
if self.span:
self.span.update_name(name=name)
self.span.set_attribute("prefect.run.name", name)

def _parent_run(self) -> Union[FlowOrTaskRun, None]:
"""
Identify the "parent run" for the current execution context.
Expand Down
4 changes: 2 additions & 2 deletions tests/deployment/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ async def test_propagates_otel_trace_to_deployment_flow_run(
"""Test that OTEL trace context gets propagated from parent flow to deployment flow run"""
deployment = test_deployment

@flow(name="child-flow")
@flow(flow_run_name="child-flow")
async def child_flow() -> None:
pass

Expand All @@ -457,7 +457,7 @@ async def child_flow() -> None:
)
deployment = await prefect_client.read_deployment(deployment_id)

@flow(name="parent-flow")
@flow(flow_run_name="parent-flow")
async def parent_flow():
return await run_deployment(
f"foo/{deployment.name}",
Expand Down
Loading

0 comments on commit 5f5e4a6

Please sign in to comment.