Validated flow pubsub path#1572
Conversation
|
@abishekgiri is attempting to deploy a commit to the motia Team on Vercel. A member of the Team first needs to authorize it. |
📝 WalkthroughWalkthroughAdds test utilities and end-to-end tests that capture OpenTelemetry spans for HTTP-triggered queue, stream, and pub/sub flows, validate span topology and attributes, and produce deterministic flow graph assets for assertion. Changes
Sequence Diagram(s)sequenceDiagram
participant Test as Test Code
participant HTTP as HTTP Worker
participant Engine as Engine
participant Tracing as OTel Tracing
participant Storage as Span Storage
Test->>Tracing: ensure_flow_test_tracing()
Tracing->>Tracing: Initialize in-memory exporter & subscriber
Test->>HTTP: Start worker on ephemeral port
Test->>Engine: Register triggers & handlers
Test->>HTTP: POST to trigger path
HTTP->>Engine: Route to entry function
Engine->>Engine: Execute handler chain
Engine->>Tracing: Emit spans (entry, queue/stream/pubsub, observer)
Tracing->>Storage: Store spans
Test->>Storage: Poll for span topology
Storage-->>Test: Spans found
Test->>Test: Construct ValidatedFlowGraphAsset
Test->>Test: Assert JSON structure & connections
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.Comment |
|
Added the next validated-flow helper slice for pub/sub. This covers a real Validated with:
|
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/tests/common/flow_helpers.rs`:
- Around line 406-411: The helper currently picks the first "call publish" child
and the next "call {subscriber}" by timestamp only; instead, locate the publish
span that has the topic attribute equal to self.topic (inspect
StoredSpan.attributes) and then find the subscriber span whose name equals
format!("call {}", self.subscriber_function_id) and that correlates to that
publish span via link/trace metadata (inspect StoredSpan.links or matching
trace/span id fields on the publish span). Concretely, replace the single
find_child_span(...) with a search for a child span named "call publish" AND
attributes.get("topic") == Some(self.topic), then replace the time-only
find_span_after(...) with a search for a span named format!("call {}",
self.subscriber_function_id) that either has an attribute topic == self.topic or
contains a link referencing the publish span (or shares the same trace id),
using StoredSpan.attributes and StoredSpan.links to validate correlation.
In `@engine/tests/validated_flow_asset_e2e.rs`:
- Around line 55-64: The current wait_for_route exits as soon as the GET request
completes, even on 4xx/5xx responses; change wait_for_route to await the send(),
inspect the Response and only return when resp.status().is_success() is true
(otherwise continue looping), handling Err(_) by continuing the retry loop until
the deadline; keep using the same client and url parameters and the existing
sleep/retry pattern.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 73dd841c-0355-4120-80a7-5fe841aca281
📒 Files selected for processing (3)
engine/tests/common/flow_helpers.rsengine/tests/common/mod.rsengine/tests/validated_flow_asset_e2e.rs
| async fn wait_for_route(client: &reqwest::Client, url: &str) { | ||
| let deadline = std::time::Instant::now() + Duration::from_secs(3); | ||
| while std::time::Instant::now() < deadline { | ||
| if client.get(url).send().await.is_ok() { | ||
| return; | ||
| } | ||
| sleep(Duration::from_millis(10)).await; | ||
| } | ||
|
|
||
| panic!("HTTP route did not become reachable before timeout: {url}"); |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In reqwest 0.12, does RequestBuilder::send().awaitreturnOk(Response)for HTTP 404/405 responses and only returnErr for transport/protocol failures?
💡 Result:
Yes, in reqwest 0.12, RequestBuilder::send.await returns Ok(Response) for HTTP 404 and 405 responses. It only returns Err for transport or protocol failures such as connection errors, TLS handshake failures, timeouts, or redirect loops. HTTP status codes in the 4xx or 5xx range are delivered via the Response object, accessible through methods like response.status, and do not cause send to error. Users must check response.status.is_success or use response.error_for_status if they want to treat non-2xx responses as errors.
Citations:
- 1: https://docs.rs/reqwest/0.11.0/reqwest/struct.RequestBuilder.html
- 2: https://docs.rs/reqwest/latest/reqwest/struct.RequestBuilder.html
- 3: https://docs.rs/reqwest/0.11.4/reqwest/struct.RequestBuilder.html
- 4: https://stackoverflow.com/questions/77289638/how-do-i-handle-raising-an-error-if-reqwest-status-code-is-not-200
- 5: https://docs.rs/reqwest/0.9.9/reqwest/struct.Response.html
- 6: https://docs.rs/reqwest/latest/reqwest/struct.Error.html
- 7: 400 Bad Request after upgrading to 0.12.1 seanmonstar/reqwest#2202
- 8: https://docs.rs/crate/reqwest/0.12.12/source/CHANGELOG.md
- 9: https://docs.rs/reqwest/0.9.13/reqwest/struct.Response.html
wait_for_route does not actually wait for the route.
send().await.is_ok() ignores the HTTP status, so a live server returning 404/405 satisfies this check and exits the loop. The subsequent POST then races actual route availability, introducing test flakiness.
🛠️ Suggested fix
async fn wait_for_route(client: &reqwest::Client, url: &str) {
let deadline = std::time::Instant::now() + Duration::from_secs(3);
while std::time::Instant::now() < deadline {
- if client.get(url).send().await.is_ok() {
- return;
+ if let Ok(response) = client.get(url).send().await {
+ if response.status() != reqwest::StatusCode::NOT_FOUND {
+ return;
+ }
}
sleep(Duration::from_millis(10)).await;
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/tests/validated_flow_asset_e2e.rs` around lines 55 - 64, The current
wait_for_route exits as soon as the GET request completes, even on 4xx/5xx
responses; change wait_for_route to await the send(), inspect the Response and
only return when resp.status().is_success() is true (otherwise continue
looping), handling Err(_) by continuing the retry loop until the deadline; keep
using the same client and url parameters and the existing sleep/retry pattern.
|
Finished the remaining Phase 1 helper cleanup in the latest push. The validated-flow helper now has a reusable Phase 1 now covers:
Validated with:
|
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
engine/tests/common/flow_helpers.rs (1)
577-589:⚠️ Potential issue | 🟠 MajorCorrelate pub/sub spans with topic/link metadata, not timestamp ordering.
The pub/sub spec still identifies
publishonly by span name andsubscriberonly by name plusstarts_after_id, so a request that publishes multiple topics—or an unrelated subscriber firing nearby—can satisfy this helper with the wrong path. Please constrain the publish span bymessaging.destination.name == self.topicand correlate the subscriber viaStoredSpan.linksor same-trace metadata instead of time ordering alone.Also applies to: 759-776
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/tests/common/flow_helpers.rs` around lines 577 - 589, The test helper currently matches the pub/sub ExpectedFlowSpan entries by span name and starts_after_id only, which can mismatch when multiple publishes/subscribers exist; update the matching logic in flow_helpers.rs (where ExpectedFlowSpan with id "publish" and "subscriber" and the use of self.subscriber_function_id is defined) to also assert that the publish span contains attribute messaging.destination.name == self.topic and to correlate the subscriber span by inspecting StoredSpan.links (or same-trace link metadata) to find a link to the publish span (rather than relying on starts_after timestamp ordering); ensure both the publish matcher and the subscriber matcher use these attributes/links so the helper only accepts spans that are actually connected by topic/link metadata.
🧹 Nitpick comments (1)
engine/tests/common/flow_helpers.rs (1)
189-190: Reject non-standard HTTP paths up front.These IDs and span names are built directly from
self.path, so"orders"will silently produce assets that can never match the engine’sGET /ordersspans. A smallstarts_with('/')guard here would fail fast instead of timing out later.As per coding guidelines "Never omit leading slashes on api_path — the engine standard is /path".
Also applies to: 379-380, 509-510
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/tests/common/flow_helpers.rs` around lines 189 - 190, Add a guard that rejects non-standard HTTP paths by checking self.path.starts_with('/'); before constructing http_node_id and entry_node_id (and the same check at the other occurrences around the code that build IDs at the noted spots), if the check fails return an explicit error or panic with a clear message (e.g. "api_path must start with '/'") so the builder fails fast rather than producing unusable IDs built from a path like "orders"; place the check in the function that constructs http_node_id/entry_node_id to enforce the rule consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/tests/common/flow_helpers.rs`:
- Around line 160-185: Call validate_state_event once before starting the retry
loop in wait_for_validated_asset and fail immediately on validation errors
instead of retrying; specifically, invoke validate_state_event(state_event,
&self.state_scope, &self.state_key) before the loop and return Err if it fails,
then inside the loop only call self.declaration().try_build_asset(); also update
the other analogous helpers that poll for assets from fixed inputs (the methods
handling stream_event and pubsub_event referenced in the review) to validate
their event payloads once up-front rather than retrying on validation failures.
---
Duplicate comments:
In `@engine/tests/common/flow_helpers.rs`:
- Around line 577-589: The test helper currently matches the pub/sub
ExpectedFlowSpan entries by span name and starts_after_id only, which can
mismatch when multiple publishes/subscribers exist; update the matching logic in
flow_helpers.rs (where ExpectedFlowSpan with id "publish" and "subscriber" and
the use of self.subscriber_function_id is defined) to also assert that the
publish span contains attribute messaging.destination.name == self.topic and to
correlate the subscriber span by inspecting StoredSpan.links (or same-trace link
metadata) to find a link to the publish span (rather than relying on
starts_after timestamp ordering); ensure both the publish matcher and the
subscriber matcher use these attributes/links so the helper only accepts spans
that are actually connected by topic/link metadata.
---
Nitpick comments:
In `@engine/tests/common/flow_helpers.rs`:
- Around line 189-190: Add a guard that rejects non-standard HTTP paths by
checking self.path.starts_with('/'); before constructing http_node_id and
entry_node_id (and the same check at the other occurrences around the code that
build IDs at the noted spots), if the check fails return an explicit error or
panic with a clear message (e.g. "api_path must start with '/'") so the builder
fails fast rather than producing unusable IDs built from a path like "orders";
place the check in the function that constructs http_node_id/entry_node_id to
enforce the rule consistently.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 961908c0-4c40-4a7d-8830-8dfd3e91e358
📒 Files selected for processing (1)
engine/tests/common/flow_helpers.rs
| pub async fn wait_for_validated_asset( | ||
| &self, | ||
| state_event: &Value, | ||
| timeout: Duration, | ||
| ) -> anyhow::Result<ValidatedFlowGraphAsset> { | ||
| let deadline = Instant::now() + timeout; | ||
| let mut last_error: Option<anyhow::Error> = None; | ||
|
|
||
| loop { | ||
| match self.try_build_asset(state_event) { | ||
| Ok(asset) => return Ok(asset), | ||
| Err(err) if Instant::now() < deadline => { | ||
| last_error = Some(err); | ||
| sleep(Duration::from_millis(25)).await; | ||
| } | ||
| Err(err) => { | ||
| let last_error = last_error.unwrap_or(err); | ||
| return Err(last_error.context("timed out waiting for validated flow asset")); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn try_build_asset(&self, state_event: &Value) -> anyhow::Result<ValidatedFlowGraphAsset> { | ||
| validate_state_event(state_event, &self.state_scope, &self.state_key)?; | ||
| self.declaration().try_build_asset() |
There was a problem hiding this comment.
Fail fast on invalid event payloads.
state_event, stream_event, and pubsub_event are fixed inputs, but these methods keep polling when validation fails inside try_build_asset(...). That turns a deterministic setup error into a slow timeout and hides the real cause. Validate the event once before entering the retry loop, and reserve polling for span-availability misses.
Also applies to: 343-375, 478-505
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/tests/common/flow_helpers.rs` around lines 160 - 185, Call
validate_state_event once before starting the retry loop in
wait_for_validated_asset and fail immediately on validation errors instead of
retrying; specifically, invoke validate_state_event(state_event,
&self.state_scope, &self.state_key) before the loop and return Err if it fails,
then inside the loop only call self.declaration().try_build_asset(); also update
the other analogous helpers that poll for assets from fixed inputs (the methods
handling stream_event and pubsub_event referenced in the review) to validate
their event payloads once up-front rather than retrying on validation failures.
What
HTTP -> publish -> subscribepathValidatedFlowPathSpecdeclaration model for expected nodes, edges, and span evidenceWhy
#1468by adding another real connection typeNotes
this PR is stacked on top of
#1518Phase 1 now covers
HTTP -> queue -> state,HTTP -> stream, andHTTP -> publish -> subscribevalidation used:
cargo fmt --allcargo test -p iii --test validated_flow_asset_e2e -- --nocaptureI am licensing the entirety of this PR under Apache 2 and have all necessary rights to the code I am contributing.
Summary by CodeRabbit