Skip to content

Commit 184ef8a

Browse files
committed
fix: coderabbit comments on the pr
1 parent 712749f commit 184ef8a

File tree

4 files changed

+33
-57
lines changed

4 files changed

+33
-57
lines changed

examples/observability/cross_workflow_tracking/cross_workflow_tracking_example.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@ async def create_simple_config() -> str:
2121

2222
config_content = """
2323
llms:
24-
nvidia_llm:
25-
_type: nim
26-
model_name: meta/llama-3.1-8b-instruct
27-
temperature: 0.7
28-
max_tokens: 1024
29-
24+
demo_llm:
25+
_type: nat_test_llm
26+
response_seq:
27+
- "Stubbed workflow reply."
28+
delay_ms: 0
3029
workflow:
3130
_type: chat_completion
32-
llm_name: nvidia_llm
31+
llm_name: demo_llm
3332
system_prompt: "You are a helpful customer support assistant. Provide clear, concise, and helpful responses."
3433
"""
3534

examples/observability/cross_workflow_tracking/example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ async def simulate_workflow_execution(workflow_name: str,
105105
result = await transform_data(input_data)
106106
elif workflow_name == "storage" or "store" in workflow_name:
107107
result = await store_data(input_data)
108-
elif workflow_name == "report" in workflow_name:
108+
elif "report" in workflow_name:
109109
result = await generate_report(input_data)
110110
else:
111111
# Generic processing

src/nat/observability/processor/cross_workflow_processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async def process(self, item: Span) -> Span:
104104

105105
except (AttributeError, KeyError, TypeError, ValueError) as e:
106106
# If there's any error in processing, log it but don't fail the span
107-
logger.warning(f"Error processing cross-workflow observability data: {e}", exc_info=True)
107+
logger.exception("Error processing cross-workflow observability data: %s", e)
108108
item.set_attribute("observability.processing_error", str(e))
109109

110110
return item
@@ -165,7 +165,7 @@ async def process(self, item: Span) -> Span:
165165

166166
except (AttributeError, IndexError, TypeError) as e:
167167
# If there's any error in processing, log it but don't fail the span
168-
logger.warning(f"Error processing workflow relationship data: {e}", exc_info=True)
168+
logger.exception("Error processing workflow relationship data: %s", e)
169169
item.set_attribute("relationship.processing_error", str(e))
170170

171171
return item

src/nat/observability/workflow_utils.py

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -88,27 +88,19 @@ async def invoke_workflow_with_context(workflow: "Workflow[InputT, StreamingOutp
8888
try:
8989
async with workflow.run(message, observability_context=obs_context) as runner:
9090
result = await runner.result(to_type=to_type)
91-
92-
# Update workflow metadata on completion
93-
if obs_context:
94-
current_workflow = obs_context.get_current_workflow()
95-
if current_workflow:
96-
current_workflow.end_time = time.time()
97-
current_workflow.status = "completed"
98-
9991
return result
100-
101-
except Exception as e:
102-
# Update workflow metadata on failure and log error
92+
finally:
93+
exc = sys.exc_info()[1]
10394
if obs_context:
10495
current_workflow = obs_context.get_current_workflow()
10596
if current_workflow:
10697
current_workflow.end_time = time.time()
107-
current_workflow.status = "failed"
108-
current_workflow.tags["error"] = str(e)
109-
110-
logger.error(f"Workflow '{workflow_name}' failed with error: {e}", exc_info=True)
111-
raise
98+
if exc is None:
99+
current_workflow.status = "completed"
100+
else:
101+
current_workflow.status = "failed"
102+
current_workflow.tags["error"] = str(exc)
103+
logger.error("Workflow '%s' failed: %s", workflow_name, exc)
112104

113105
@staticmethod
114106
async def invoke_workflow_stream_with_context(
@@ -158,25 +150,18 @@ async def invoke_workflow_stream_with_context(
158150
async with workflow.run(message, observability_context=obs_context) as runner:
159151
async for item in runner.result_stream(to_type=to_type):
160152
yield item
161-
162-
# Update workflow metadata on completion
163-
if obs_context:
164-
current_workflow = obs_context.get_current_workflow()
165-
if current_workflow:
166-
current_workflow.end_time = time.time()
167-
current_workflow.status = "completed"
168-
169-
except Exception as e:
170-
# Update workflow metadata on failure and log error
153+
finally:
154+
exc = sys.exc_info()[1]
171155
if obs_context:
172156
current_workflow = obs_context.get_current_workflow()
173157
if current_workflow:
174158
current_workflow.end_time = time.time()
175-
current_workflow.status = "failed"
176-
current_workflow.tags["error"] = str(e)
177-
178-
logger.error(f"Streaming workflow '{workflow_name}' failed with error: {e}", exc_info=True)
179-
raise
159+
if exc is None:
160+
current_workflow.status = "completed"
161+
else:
162+
current_workflow.status = "failed"
163+
current_workflow.tags["error"] = str(exc)
164+
logger.error("Streaming workflow '%s' failed: %s", workflow_name, exc)
180165

181166
@staticmethod
182167
def get_current_observability_context() -> Optional[ObservabilityContext]:
@@ -268,24 +253,16 @@ async def invoke_with_steps_and_context(workflow: "Workflow[InputT, StreamingOut
268253
to_type=to_type,
269254
observability_context=obs_context
270255
)
271-
272-
# Update workflow metadata on completion
273-
if obs_context:
274-
current_workflow = obs_context.get_current_workflow()
275-
if current_workflow:
276-
current_workflow.end_time = time.time()
277-
current_workflow.status = "completed"
278-
279256
return result, steps
280-
281-
except Exception as e:
282-
# Update workflow metadata on failure and log error
257+
finally:
258+
exc = sys.exc_info()[1]
283259
if obs_context:
284260
current_workflow = obs_context.get_current_workflow()
285261
if current_workflow:
286262
current_workflow.end_time = time.time()
287-
current_workflow.status = "failed"
288-
current_workflow.tags["error"] = str(e)
289-
290-
logger.error(f"Workflow with steps '{workflow_name}' failed with error: {e}", exc_info=True)
291-
raise
263+
if exc is None:
264+
current_workflow.status = "completed"
265+
else:
266+
current_workflow.status = "failed"
267+
current_workflow.tags["error"] = str(exc)
268+
logger.error("Workflow with steps '%s' failed: %s", workflow_name, exc)

0 commit comments

Comments
 (0)