Add validated flow helper proof of concept #1518
📝 Walkthrough
Adds a public test helper module that installs in-memory OpenTelemetry tracing, builds deterministic flow graph assets from captured spans, and provides path specs for HTTP→queue→state and HTTP→stream flows; also adds end-to-end Tokio tests that exercise both flows and assert exact serialized graph assets.
Sequence Diagram
```mermaid
sequenceDiagram
    actor Client
    participant HTTP as HTTP Worker
    participant Queue as Queue Worker
    participant State as State Worker
    participant Tracing as OTEL Tracer
    participant Observer as State/Stream Observer
    Client->>HTTP: POST /path {payload}
    HTTP->>Tracing: create "METHOD /path" span
    HTTP->>Queue: publish/enqueue (topic or stream write)
    Queue->>Tracing: record producer/enqueue span
    Queue->>Queue: poll & consume (durable) / trigger stream handlers
    Queue->>Tracing: record consumer/trigger span
    Queue->>State: state::set(scope,key,value) / write stream item
    State->>Tracing: record state write / stream set span
    State->>Observer: invoke state/stream observer
    Observer->>Tracing: record observer span
    Tracing->>Tracing: store spans in memory exporter for verification
```
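The walkthrough and diagram describe checking that an expected runtime path shows up in the captured spans before a graph asset is built. A minimal sketch of that check follows, assuming a much simpler span model than the real one: `CapturedSpan`, `match_path`, `walk`, and the span names in the usage note are hypothetical and are not taken from the PR's helper. Real OpenTelemetry spans may also relate producer and consumer through span links rather than direct parent ids, which this sketch does not model.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a span captured in memory.
#[derive(Debug, Clone)]
struct CapturedSpan {
    id: u64,
    parent_id: Option<u64>,
    name: String,
}

/// Returns the span ids along `expected` if each expected name is found as a
/// direct child of the previous match, starting from a root span.
/// Returns `None` when the path is not present (or `expected` is empty).
fn match_path(spans: &[CapturedSpan], expected: &[&str]) -> Option<Vec<u64>> {
    let (first, rest) = expected.split_first()?;

    // Index children by parent id so each step is a lookup, not a scan.
    let mut children: HashMap<u64, Vec<&CapturedSpan>> = HashMap::new();
    for span in spans {
        if let Some(parent) = span.parent_id {
            children.entry(parent).or_default().push(span);
        }
    }

    // Try every root span whose name matches the first expected step.
    spans
        .iter()
        .filter(|span| span.parent_id.is_none() && span.name == *first)
        .find_map(|root| walk(root, rest, &children))
}

/// Depth-first search for the remaining names below `current`.
fn walk(
    current: &CapturedSpan,
    remaining: &[&str],
    children: &HashMap<u64, Vec<&CapturedSpan>>,
) -> Option<Vec<u64>> {
    let (next, rest) = match remaining.split_first() {
        Some(parts) => parts,
        // Nothing left to match: the path ends at `current`.
        None => return Some(vec![current.id]),
    };
    children
        .get(&current.id)?
        .iter()
        .copied()
        .filter(|child| child.name == *next)
        .find_map(|child| walk(child, rest, children))
        .map(|mut tail| {
            tail.insert(0, current.id);
            tail
        })
}
```

With spans named, say, `POST /orders` → `enqueue` → `state::set`, `match_path` returns the three ids in order, and returns `None` if a step is skipped. Returning ids in path order is one way to keep the derived graph deterministic.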
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/tests/common/flow_helpers.rs`:
- Around line 233-237: The current code creates a detached InMemorySpanExporter
when get_span_storage() returns None, which hides tracing setup failures and
causes wait_for_validated_asset to poll an empty store; instead, fail fast:
replace the fallback InMemorySpanExporter::new(…) branch with an explicit panic
or test failure (e.g., panic!("missing span storage: tracing subscriber not
installed for tests")) so the test immediately reports the misconfigured tracing
subscriber; locate the check using get_span_storage() in flow_helpers.rs and
update it to abort rather than instantiate InMemorySpanExporter::new, and ensure
any tests that rely on wait_for_validated_asset are adjusted to set up the
subscriber properly.
In `@engine/tests/validated_flow_asset_e2e.rs`:
- Around line 50-58: The readiness helper wait_for_route currently silently
returns if the route never becomes reachable after 3 seconds; change
wait_for_route to return a Result<(), anyhow::Error> (or TestError) instead of
unit, await client.get(url).send().await inside the loop as before, and on
timeout return Err(anyhow::anyhow!("timeout waiting for route {}", url)) (or use
context/bail) so the test fails immediately with a clear message; update callers
to .await? and propagate the error or unwrap in the test.
- Around line 22-27: The test currently reserves then drops an ephemeral port in
reserve_local_port(), which is a TOCTOU; instead have the worker bind the port
itself (use port 0) or pass an already-bound listener into the worker to avoid
releasing the socket before HttpWorker::initialize is called. Change the test to
stop calling reserve_local_port(), either (a) call HttpWorker::initialize so it
binds to "127.0.0.1:0" and then query the worker for its bound address/port, or
(b) create and keep a StdTcpListener bound to "127.0.0.1:0" and pass that
listener/socket into HttpWorker::initialize (or add an overload that accepts a
listener) so the socket remains owned until the worker takes it. Ensure
references to reserve_local_port are removed and update test assertions to read
the real bound port from the worker or listener.
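The two findings for `validated_flow_asset_e2e.rs` can be illustrated with a standard-library-only sketch: keep the listener bound instead of reserving and dropping a port, and make the readiness check return an error on timeout. This is a synchronous analogue, not the PR's code: `bind_ephemeral` and `wait_for_listener` are hypothetical names, the real helper is async and uses an HTTP client, and how `HttpWorker::initialize` would accept a listener is not shown in the source.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Binds an OS-assigned port and returns the listener while it is still
/// bound, so no other process can claim the port before the server takes it.
fn bind_ephemeral() -> io::Result<(TcpListener, SocketAddr)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // Read the real port from the live socket rather than guessing it.
    let addr = listener.local_addr()?;
    Ok((listener, addr))
}

/// Polls until a TCP connection to `addr` succeeds. Returns an error once
/// `timeout` has elapsed instead of returning silently.
fn wait_for_listener(addr: SocketAddr, timeout: Duration) -> io::Result<()> {
    let deadline = Instant::now() + timeout;
    loop {
        match TcpStream::connect_timeout(&addr, Duration::from_millis(100)) {
            Ok(_) => return Ok(()),
            Err(err) if Instant::now() >= deadline => {
                // Keep the last connection error in the message for debugging.
                return Err(io::Error::new(
                    io::ErrorKind::TimedOut,
                    format!("timeout waiting for {addr}: {err}"),
                ));
            }
            Err(_) => sleep(Duration::from_millis(25)),
        }
    }
}
```

A test would call `bind_ephemeral()`, hand the listener (or its address) to the server under test, and then call `wait_for_listener(addr, Duration::from_secs(3))?` so that an unreachable route fails the test with a clear message.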
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 1b18f5a0-face-421a-9c71-1a3cd6a5817e
📒 Files selected for processing (3)
- engine/tests/common/flow_helpers.rs
- engine/tests/common/mod.rs
- engine/tests/validated_flow_asset_e2e.rs
|
Following the plan from #1468, the next step after this POC is to keep the scope narrow and extend the validated-helper pattern to a small number of additional connection types while continuing to derive flow assets from real runtime behavior. I do not plan to broaden this into console integration or a public engine flows API yet. The goal is to prove the helper shape across a few real execution paths first, then build outward from there once the validated model feels solid. |
Actionable comments posted: 1
🧹 Nitpick comments (1)
engine/tests/common/flow_helpers.rs (1)
60-76: Optional: simplify the timeout/error-capture loop.
The `Err(err) if Instant::now() < deadline` arm plus the shadowing `let last_error = last_error.unwrap_or(err);` on line 71 works, but the control flow is a bit hard to follow and the successful-path `last_error` is written but never read. A `tokio::time::timeout` wrapper around a simple retry loop reads more linearly and preserves the most recent error automatically.
Sketch
```diff
-        let deadline = Instant::now() + timeout;
-        let mut last_error: Option<anyhow::Error> = None;
-
-        loop {
-            match self.try_build_asset(state_event) {
-                Ok(asset) => return Ok(asset),
-                Err(err) if Instant::now() < deadline => {
-                    last_error = Some(err);
-                    sleep(Duration::from_millis(25)).await;
-                }
-                Err(err) => {
-                    let last_error = last_error.unwrap_or(err);
-                    return Err(last_error.context("timed out waiting for validated flow asset"));
-                }
-            }
-        }
+        let deadline = Instant::now() + timeout;
+        let mut last_error = anyhow!("validated flow asset was never observed");
+        while Instant::now() < deadline {
+            match self.try_build_asset(state_event) {
+                Ok(asset) => return Ok(asset),
+                Err(err) => {
+                    last_error = err;
+                    sleep(Duration::from_millis(25)).await;
+                }
+            }
+        }
+        Err(last_error.context("timed out waiting for validated flow asset"))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/tests/common/flow_helpers.rs` around lines 60 - 76, The retry loop around try_build_asset uses manual Instant checks and last_error handling; replace it with tokio::time::timeout(timeout, async { ... }) to simplify control flow: inside the timeout future loop, repeatedly call self.try_build_asset(state_event).await, returning Ok(asset) on success and otherwise sleep(Duration::from_millis(25)).await and retry; if the timeout elapses, propagate the most recent error with .context("timed out waiting for validated flow asset") so you no longer need manual Instant::now/last_error bookkeeping or the shadowed let last_error variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/tests/common/flow_helpers.rs`:
- Around line 212-238: ensure_flow_test_tracing uses a process-global span
storage (get_span_storage) and currently clears it unguarded, which will race
when multiple flow tests run in the same test binary; fix by serializing access
inside ensure_flow_test_tracing: introduce a shared global sync primitive (e.g.,
a static Mutex or tokio::sync::Mutex) acquired at the start of
ensure_flow_test_tracing before initializing the tracer or calling
storage.clear(), and released after the test-specific setup/teardown so only one
test manipulates the global span storage at a time; alternatively, scope spans
to a test by tagging root spans with a per-test id and filtering
storage.get_spans()/try_build_asset by that id instead—apply the chosen approach
to ensure_flow_test_tracing, FLOW_TRACING_INIT, get_span_storage, storage.clear,
and the places that call storage.get_spans()/try_build_asset.
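The serialization approach from this finding, together with the earlier fail-fast suggestion for a missing span storage, can be sketched with the standard library alone. Everything here is a hypothetical stand-in: `SPANS` plays the role of the process-global span storage, and `FLOW_TEST_LOCK`, `begin_flow_test`, `record_span`, `recorded_spans`, and `require_span_storage` are illustrative names, not the helper's actual API.

```rust
use std::sync::{Mutex, MutexGuard, PoisonError};

/// Hypothetical stand-in for the process-global in-memory span storage.
static SPANS: Mutex<Vec<String>> = Mutex::new(Vec::new());

/// Serializes tests that clear or read the shared storage.
static FLOW_TEST_LOCK: Mutex<()> = Mutex::new(());

/// Held by a test for its whole duration; dropping it lets the next test run.
struct FlowTestGuard {
    _lock: MutexGuard<'static, ()>,
}

/// Acquires the global lock first, then clears the storage, so only one test
/// at a time manipulates the shared spans.
fn begin_flow_test() -> FlowTestGuard {
    // A test that panicked while holding the lock poisons it. The protected
    // value is `()`, so recovering the guard is harmless.
    let lock = FLOW_TEST_LOCK
        .lock()
        .unwrap_or_else(PoisonError::into_inner);
    SPANS.lock().unwrap_or_else(PoisonError::into_inner).clear();
    FlowTestGuard { _lock: lock }
}

fn record_span(name: &str) {
    SPANS
        .lock()
        .unwrap_or_else(PoisonError::into_inner)
        .push(name.to_string());
}

fn recorded_spans() -> Vec<String> {
    SPANS.lock().unwrap_or_else(PoisonError::into_inner).clone()
}

/// Fails fast when the storage is missing instead of substituting an empty,
/// detached store that a polling helper would wait on forever.
fn require_span_storage<T>(storage: Option<T>) -> T {
    storage.expect("missing span storage: tracing subscriber not installed for tests")
}
```

Each test would start with `let _guard = begin_flow_test();` and keep the guard alive until its assertions finish. In async tests that hold the guard across `.await` points, the `tokio::sync::Mutex` mentioned in the finding is likely the better fit; the `std` mutex is used here only to keep the sketch dependency-free.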
---
Nitpick comments:
In `@engine/tests/common/flow_helpers.rs`:
- Around line 60-76: The retry loop around try_build_asset uses manual Instant
checks and last_error handling; replace it with tokio::time::timeout(timeout,
async { ... }) to simplify control flow: inside the timeout future loop,
repeatedly call self.try_build_asset(state_event).await, returning Ok(asset) on
success and otherwise sleep(Duration::from_millis(25)).await and retry; if the
timeout elapses, propagate the most recent error with .context("timed out
waiting for validated flow asset") so you no longer need manual
Instant::now/last_error bookkeeping or the shadowed let last_error variable.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 54d6aa35-2b8b-48f6-a222-5f5bbce4f8ab
📒 Files selected for processing (2)
- engine/tests/common/flow_helpers.rs
- engine/tests/validated_flow_asset_e2e.rs
✅ Files skipped from review due to trivial changes (1)
- engine/tests/validated_flow_asset_e2e.rs
|
@abishekgiri Thanks for opening this PR! Can you address the one remaining comment from CodeRabbit? @sergiofilhowz Take a look at this one and let us know what you think about it. I know we're all on board for flows but I haven't reviewed this code myself. |
|
@anthonyiscoding Addressed the remaining CodeRabbit comment in the latest push. The flow tracing helper now serializes access to the shared in-memory span storage so future validated-flow tests do not race on
|
|
Expanded the validated-flow proof of concept with the next What changed:
Validated with:
|
|
@abishekgiri hey, how's it going? Is this PR up for review for @sergiofilhowz and @andersonleal? |
|
@rohitg00 Hey, going well, thanks for checking in. Yes, this is ready for review from the code/design side. This PR came out of the direction discussed in This first PR is intentionally a proof of concept. It adds a trace-backed test helper that waits for real in-memory spans from an executed iii path, validates that the expected runtime path actually happened, and only then generates a renderable flow graph asset. Current scope in this PR:
The follow-up work is already split out separately: Most code checks are passing on this PR. I noticed the license-agreement check is still red, so I can take a look at that separately, but from the implementation side this is ready for review by @sergiofilhowz and @andersonleal. |
What
- `iii::durable::publish`
- `state::set`
- `stream::set`
Why
- #1468 to validate flows from real runtime behavior instead of relying on hand-maintained metadata
Notes
- `cargo fmt --all`
- `cargo test -p iii --test validated_flow_asset_e2e -- --nocapture`