diff --git a/engine/tests/common/flow_helpers.rs b/engine/tests/common/flow_helpers.rs new file mode 100644 index 0000000000..14afd76bf1 --- /dev/null +++ b/engine/tests/common/flow_helpers.rs @@ -0,0 +1,489 @@ +#![allow(dead_code)] + +use std::sync::Once; +use std::time::{Duration, Instant}; + +use anyhow::{anyhow, bail}; +use iii::telemetry::{ExporterType, OtelConfig, get_span_storage, init_otel}; +use iii::workers::observability::otel::StoredSpan; +use serde::Serialize; +use serde_json::Value; +use tokio::sync::{Mutex, MutexGuard}; +use tokio::time::sleep; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +static FLOW_TRACING_INIT: Once = Once::new(); +static FLOW_TRACING_LOCK: Mutex<()> = Mutex::const_new(()); + +pub struct FlowTracingGuard { + _guard: MutexGuard<'static, ()>, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct FlowGraphNode { + pub id: String, + pub kind: String, + pub label: String, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct FlowGraphEdge { + pub id: String, + pub kind: String, + pub source: String, + pub target: String, + pub label: String, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct ValidatedFlowGraphAsset { + pub id: String, + pub name: String, + pub nodes: Vec, + pub edges: Vec, +} + +#[derive(Debug, Clone)] +pub struct HttpQueueStatePathSpec { + pub id: String, + pub name: String, + pub method: String, + pub path: String, + pub entry_function_id: String, + pub queue_topic: String, + pub queue_consumer_function_id: String, + pub state_scope: String, + pub state_key: String, + pub state_observer_function_id: String, +} + +#[derive(Debug, Clone)] +pub struct HttpStreamPathSpec { + pub id: String, + pub name: String, + pub method: String, + pub path: String, + pub entry_function_id: String, + pub stream_name: String, + pub group_id: String, + pub item_id: String, + pub stream_observer_function_id: String, +} + +impl HttpQueueStatePathSpec { + pub async fn wait_for_validated_asset( + &self, + state_event: &Value, + timeout: Duration, + ) -> anyhow::Result { + let deadline = Instant::now() + timeout; + let mut last_error: Option = None; + + loop { + match self.try_build_asset(state_event) { + Ok(asset) => return Ok(asset), + Err(err) if Instant::now() < deadline => { + last_error = Some(err); + sleep(Duration::from_millis(25)).await; + } + Err(err) => { + let last_error = last_error.unwrap_or(err); + return Err(last_error.context("timed out waiting for validated flow asset")); + } + } + } + } + + fn try_build_asset(&self, state_event: &Value) -> anyhow::Result { + validate_state_event(state_event, &self.state_scope, &self.state_key)?; + + let storage = + get_span_storage().ok_or_else(|| anyhow!("span storage is not initialized"))?; + let spans = storage.get_spans(); + + let http_root = spans + .iter() + .find(|span| span.name == format!("{} {}", self.method, self.path)) + .ok_or_else(|| anyhow!("missing HTTP span for {} {}", self.method, self.path))?; + + let entry_call = find_child_span( + &spans, + http_root, + &format!("call {}", self.entry_function_id), + None, + )?; + let enqueue_call = find_child_span( + &spans, + entry_call, + "call iii::durable::publish", + Some(("messaging.destination.name", self.queue_topic.as_str())), + )?; + let queue_job = find_child_span( + &spans, + enqueue_call, + &format!( + "queue {}::{}", + self.queue_topic, self.queue_consumer_function_id + ), + Some(( + "queue", + &format!("{}::{}", self.queue_topic, self.queue_consumer_function_id), + )), + )?; + let consumer_call = find_child_span( + &spans, + queue_job, + &format!("call {}", self.queue_consumer_function_id), + None, + )?; + let state_set = find_child_span(&spans, consumer_call, "call state::set", None)?; + let state_triggers = find_child_span(&spans, state_set, "state_triggers", None)?; + let _observer_call = find_child_span( + &spans, + state_triggers, + &format!("call {}", self.state_observer_function_id), + None, + )?; + + let http_node_id = format!("http:{}:{}", self.method, self.path); + let entry_node_id = self.entry_function_id.clone(); + let queue_node_id = format!("queue:{}", self.queue_topic); + let consumer_node_id = self.queue_consumer_function_id.clone(); + let state_node_id = format!("state:{}:{}", self.state_scope, self.state_key); + let observer_node_id = self.state_observer_function_id.clone(); + + Ok(ValidatedFlowGraphAsset { + id: self.id.clone(), + name: self.name.clone(), + nodes: vec![ + FlowGraphNode { + id: http_node_id.clone(), + kind: "http_trigger".to_string(), + label: format!("{} {}", self.method, self.path), + }, + FlowGraphNode { + id: entry_node_id.clone(), + kind: "function".to_string(), + label: self.entry_function_id.clone(), + }, + FlowGraphNode { + id: queue_node_id.clone(), + kind: "queue_topic".to_string(), + label: self.queue_topic.clone(), + }, + FlowGraphNode { + id: consumer_node_id.clone(), + kind: "function".to_string(), + label: self.queue_consumer_function_id.clone(), + }, + FlowGraphNode { + id: state_node_id.clone(), + kind: "state_slot".to_string(), + label: format!("{}/{}", self.state_scope, self.state_key), + }, + FlowGraphNode { + id: observer_node_id.clone(), + kind: "function".to_string(), + label: self.state_observer_function_id.clone(), + }, + ], + edges: vec![ + FlowGraphEdge { + id: format!("{http_node_id}->{entry_node_id}"), + kind: "http_trigger".to_string(), + source: http_node_id, + target: entry_node_id.clone(), + label: format!("{} {}", self.method, self.path), + }, + FlowGraphEdge { + id: format!("{entry_node_id}->{queue_node_id}"), + kind: "enqueue".to_string(), + source: entry_node_id, + target: queue_node_id.clone(), + label: self.queue_topic.clone(), + }, + FlowGraphEdge { + id: format!("{queue_node_id}->{consumer_node_id}"), + kind: "queue_trigger".to_string(), + source: queue_node_id, + target: consumer_node_id.clone(), + label: self.queue_topic.clone(), + }, + FlowGraphEdge { + id: format!("{consumer_node_id}->{state_node_id}"), + kind: "state_write".to_string(), + source: consumer_node_id, + target: state_node_id.clone(), + label: format!("{}/{}", self.state_scope, self.state_key), + }, + FlowGraphEdge { + id: format!("{state_node_id}->{observer_node_id}"), + kind: "state_trigger".to_string(), + source: state_node_id, + target: observer_node_id, + label: self.state_scope.clone(), + }, + ], + }) + } +} + +impl HttpStreamPathSpec { + pub async fn wait_for_validated_asset( + &self, + stream_event: &Value, + timeout: Duration, + ) -> anyhow::Result { + let deadline = Instant::now() + timeout; + let mut last_error: Option = None; + + loop { + match self.try_build_asset(stream_event) { + Ok(asset) => return Ok(asset), + Err(err) if Instant::now() < deadline => { + last_error = Some(err); + sleep(Duration::from_millis(25)).await; + } + Err(err) => { + let last_error = last_error.unwrap_or(err); + return Err( + last_error.context("timed out waiting for validated stream flow asset") + ); + } + } + } + } + + fn try_build_asset(&self, stream_event: &Value) -> anyhow::Result { + validate_stream_event( + stream_event, + &self.stream_name, + &self.group_id, + &self.item_id, + )?; + + let storage = + get_span_storage().ok_or_else(|| anyhow!("span storage is not initialized"))?; + let spans = storage.get_spans(); + + let http_root = spans + .iter() + .find(|span| span.name == format!("{} {}", self.method, self.path)) + .ok_or_else(|| anyhow!("missing HTTP span for {} {}", self.method, self.path))?; + + let entry_call = find_child_span( + &spans, + http_root, + &format!("call {}", self.entry_function_id), + None, + )?; + let stream_set_call = find_child_span(&spans, entry_call, "call stream::set", None)?; + let stream_triggers = find_child_span(&spans, stream_set_call, "stream_triggers", None)?; + let _observer_call = find_child_span( + &spans, + stream_triggers, + &format!("call {}", self.stream_observer_function_id), + None, + )?; + + let http_node_id = format!("http:{}:{}", self.method, self.path); + let entry_node_id = self.entry_function_id.clone(); + let stream_node_id = format!( + "stream:{}:{}:{}", + self.stream_name, self.group_id, self.item_id + ); + let observer_node_id = self.stream_observer_function_id.clone(); + let stream_label = format!("{}/{}/{}", self.stream_name, self.group_id, self.item_id); + + Ok(ValidatedFlowGraphAsset { + id: self.id.clone(), + name: self.name.clone(), + nodes: vec![ + FlowGraphNode { + id: http_node_id.clone(), + kind: "http_trigger".to_string(), + label: format!("{} {}", self.method, self.path), + }, + FlowGraphNode { + id: entry_node_id.clone(), + kind: "function".to_string(), + label: self.entry_function_id.clone(), + }, + FlowGraphNode { + id: stream_node_id.clone(), + kind: "stream_item".to_string(), + label: stream_label.clone(), + }, + FlowGraphNode { + id: observer_node_id.clone(), + kind: "function".to_string(), + label: self.stream_observer_function_id.clone(), + }, + ], + edges: vec![ + FlowGraphEdge { + id: format!("{http_node_id}->{entry_node_id}"), + kind: "http_trigger".to_string(), + source: http_node_id, + target: entry_node_id.clone(), + label: format!("{} {}", self.method, self.path), + }, + FlowGraphEdge { + id: format!("{entry_node_id}->{stream_node_id}"), + kind: "stream_write".to_string(), + source: entry_node_id, + target: stream_node_id.clone(), + label: stream_label.clone(), + }, + FlowGraphEdge { + id: format!("{stream_node_id}->{observer_node_id}"), + kind: "stream_trigger".to_string(), + source: stream_node_id, + target: observer_node_id, + label: self.stream_name.clone(), + }, + ], + }) + } +} + +pub async fn ensure_flow_test_tracing() -> FlowTracingGuard { + let guard = FLOW_TRACING_LOCK.lock().await; + + iii::workers::observability::metrics::ensure_default_meter(); + + FLOW_TRACING_INIT.call_once(|| { + let config = OtelConfig { + enabled: true, + service_name: "iii-flow-test".to_string(), + service_version: "0.1.0".to_string(), + service_namespace: None, + exporter: ExporterType::Memory, + endpoint: "http://127.0.0.1:4317".to_string(), + sampling_ratio: 1.0, + memory_max_spans: 2048, + }; + + tracing_subscriber::registry() + .with(init_otel(&config)) + .try_init() + .expect("flow test tracing subscriber should initialize"); + }); + + if let Some(storage) = get_span_storage() { + storage.clear(); + } else { + panic!("missing span storage: tracing subscriber not installed for flow tests"); + } + + FlowTracingGuard { _guard: guard } +} + +fn validate_state_event( + state_event: &Value, + expected_scope: &str, + expected_key: &str, +) -> anyhow::Result<()> { + let actual_scope = state_event + .get("scope") + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("state event is missing scope"))?; + let actual_key = state_event + .get("key") + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("state event is missing key"))?; + + if actual_scope != expected_scope { + bail!( + "state event scope mismatch: expected {}, got {}", + expected_scope, + actual_scope + ); + } + + if actual_key != expected_key { + bail!( + "state event key mismatch: expected {}, got {}", + expected_key, + actual_key + ); + } + + Ok(()) +} + +fn validate_stream_event( + stream_event: &Value, + expected_stream_name: &str, + expected_group_id: &str, + expected_item_id: &str, +) -> anyhow::Result<()> { + let actual_stream_name = stream_event + .get("streamName") + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("stream event is missing streamName"))?; + let actual_group_id = stream_event + .get("groupId") + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("stream event is missing groupId"))?; + let actual_item_id = stream_event + .get("id") + .and_then(Value::as_str) + .ok_or_else(|| anyhow!("stream event is missing id"))?; + + if actual_stream_name != expected_stream_name { + bail!( + "stream event streamName mismatch: expected {}, got {}", + expected_stream_name, + actual_stream_name + ); + } + + if actual_group_id != expected_group_id { + bail!( + "stream event groupId mismatch: expected {}, got {}", + expected_group_id, + actual_group_id + ); + } + + if actual_item_id != expected_item_id { + bail!( + "stream event id mismatch: expected {}, got {}", + expected_item_id, + actual_item_id + ); + } + + Ok(()) +} + +fn find_child_span<'a>( + spans: &'a [StoredSpan], + parent: &StoredSpan, + expected_name: &str, + attribute: Option<(&str, &str)>, +) -> anyhow::Result<&'a StoredSpan> { + spans + .iter() + .find(|span| { + span.parent_span_id.as_deref() == Some(parent.span_id.as_str()) + && span.name == expected_name + && attribute + .map(|(key, value)| has_attribute(span, key, value)) + .unwrap_or(true) + }) + .ok_or_else(|| { + anyhow!( + "missing child span '{}' under parent '{}'", + expected_name, + parent.name + ) + }) +} + +fn has_attribute(span: &StoredSpan, expected_key: &str, expected_value: &str) -> bool { + span.attributes + .iter() + .any(|(key, value)| key == expected_key && value == expected_value) +} diff --git a/engine/tests/common/mod.rs b/engine/tests/common/mod.rs index b89215e792..7fddaeaf3f 100644 --- a/engine/tests/common/mod.rs +++ b/engine/tests/common/mod.rs @@ -1,3 +1,4 @@ +pub mod flow_helpers; pub mod http_helpers; pub mod queue_helpers; diff --git a/engine/tests/validated_flow_asset_e2e.rs b/engine/tests/validated_flow_asset_e2e.rs new file mode 100644 index 0000000000..899ef0fe8c --- /dev/null +++ b/engine/tests/validated_flow_asset_e2e.rs @@ -0,0 +1,601 @@ +mod common; + +use std::net::TcpListener as StdTcpListener; +use std::sync::Arc; +use std::time::Duration; + +use serde_json::{Value, json}; +use serial_test::serial; +use tokio::sync::mpsc; +use tokio::time::{sleep, timeout}; + +use iii::{ + engine::{Engine, EngineTrait, Handler, RegisterFunctionRequest}, + function::FunctionResult, + trigger::Trigger, + workers::{ + queue::QueueWorker, rest_api::HttpWorker, state::StateWorker, stream::StreamWorker, + traits::Worker, + }, +}; + +use common::flow_helpers::{HttpQueueStatePathSpec, HttpStreamPathSpec, ensure_flow_test_tracing}; +use common::queue_helpers::builtin_queue_config; + +fn reserve_local_port() -> u16 { + let listener = StdTcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let port = listener.local_addr().expect("listener addr").port(); + drop(listener); + port +} + +async fn start_http_worker(engine: Arc, port: u16) -> String { + let module = HttpWorker::create( + engine.clone(), + Some(json!({ + "host": "127.0.0.1", + "port": port, + "default_timeout": 5000, + })), + ) + .await + .expect("HttpWorker::create should succeed"); + + module.register_functions(engine); + module + .initialize() + .await + .expect("HttpWorker::initialize should succeed"); + + format!("http://127.0.0.1:{port}") +} + +async fn wait_for_route(client: &reqwest::Client, url: &str) { + let deadline = std::time::Instant::now() + Duration::from_secs(3); + while std::time::Instant::now() < deadline { + if client.get(url).send().await.is_ok() { + return; + } + sleep(Duration::from_millis(10)).await; + } + + panic!("HTTP route did not become reachable before timeout: {url}"); +} + +#[tokio::test] +#[serial] +async fn validated_flow_helper_builds_graph_asset_from_real_http_queue_state_path() { + let _flow_tracing = ensure_flow_test_tracing().await; + + let engine = Arc::new(Engine::new()); + + let queue_module = QueueWorker::create(engine.clone(), Some(builtin_queue_config())) + .await + .expect("QueueWorker::create should succeed"); + queue_module.register_functions(engine.clone()); + queue_module + .initialize() + .await + .expect("QueueWorker::initialize should succeed"); + + let state_module = StateWorker::create(engine.clone(), None) + .await + .expect("StateWorker::create should succeed"); + state_module.register_functions(engine.clone()); + state_module + .initialize() + .await + .expect("StateWorker::initialize should succeed"); + + let port = reserve_local_port(); + let base_url = start_http_worker(engine.clone(), port).await; + + let queue_topic = "validated-flow.events"; + let state_scope = "validated-flow.orders"; + let expected_order_id = "order-123"; + + let (state_event_tx, mut state_event_rx) = mpsc::unbounded_channel::(); + + let engine_for_http = engine.clone(); + engine.register_function_handler( + RegisterFunctionRequest { + function_id: "flow_poc::http_entry".to_string(), + description: Some("Entry function for validated flow POC".to_string()), + request_format: None, + response_format: None, + metadata: None, + }, + Handler::new(move |input: Value| { + let engine = engine_for_http.clone(); + async move { + let order_id = input + .get("body") + .and_then(|body| body.get("order_id")) + .and_then(Value::as_str) + .unwrap_or("missing-order-id") + .to_string(); + + match engine + .call( + "iii::durable::publish", + json!({ + "topic": queue_topic, + "data": { + "order_id": order_id, + "status": "processed" + } + }), + ) + .await + { + Ok(_) => FunctionResult::Success(Some(json!({ + "status_code": 202, + "body": { + "accepted": true, + "queue_topic": queue_topic + } + }))), + Err(err) => FunctionResult::Failure(err), + } + } + }), + ); + + let engine_for_consumer = engine.clone(); + engine.register_function_handler( + RegisterFunctionRequest { + function_id: "flow_poc::queue_consumer".to_string(), + description: Some("Queue consumer for validated flow POC".to_string()), + request_format: None, + response_format: None, + metadata: None, + }, + Handler::new(move |input: Value| { + let engine = engine_for_consumer.clone(); + async move { + let order_id = input + .get("order_id") + .and_then(Value::as_str) + .unwrap_or("missing-order-id") + .to_string(); + + match engine + .call( + "state::set", + json!({ + "scope": state_scope, + "key": order_id, + "value": { + "status": input + .get("status") + .and_then(Value::as_str) + .unwrap_or("unknown") + } + }), + ) + .await + { + Ok(_) => FunctionResult::Success(Some(json!({ "ok": true }))), + Err(err) => FunctionResult::Failure(err), + } + } + }), + ); + + engine.register_function_handler( + RegisterFunctionRequest { + function_id: "flow_poc::state_observer".to_string(), + description: Some("State observer for validated flow POC".to_string()), + request_format: None, + response_format: None, + metadata: None, + }, + Handler::new(move |input: Value| { + let tx = state_event_tx.clone(); + async move { + tx.send(input) + .expect("state event receiver should remain available"); + FunctionResult::Success(Some(json!({ "observed": true }))) + } + }), + ); + + engine + .trigger_registry + .register_trigger(Trigger { + id: "validated-flow-http".to_string(), + trigger_type: "http".to_string(), + function_id: "flow_poc::http_entry".to_string(), + config: json!({ + "api_path": "/validated-flow", + "http_method": "POST" + }), + worker_id: None, + metadata: None, + }) + .await + .expect("HTTP trigger should register"); + + engine + .trigger_registry + .register_trigger(Trigger { + id: "validated-flow-queue".to_string(), + trigger_type: "durable:subscriber".to_string(), + function_id: "flow_poc::queue_consumer".to_string(), + config: json!({ + "topic": queue_topic, + "queue_config": { + "max_retries": 2, + "backoff_ms": 50, + "poll_interval_ms": 25 + } + }), + worker_id: None, + metadata: None, + }) + .await + .expect("queue trigger should register"); + + engine + .trigger_registry + .register_trigger(Trigger { + id: "validated-flow-state".to_string(), + trigger_type: "state".to_string(), + function_id: "flow_poc::state_observer".to_string(), + config: json!({ + "scope": state_scope, + }), + worker_id: None, + metadata: None, + }) + .await + .expect("state trigger should register"); + + let client = reqwest::Client::new(); + let url = format!("{base_url}/validated-flow"); + wait_for_route(&client, &url).await; + + let response = client + .post(&url) + .json(&json!({ "order_id": expected_order_id })) + .send() + .await + .expect("HTTP request should succeed"); + + assert_eq!(response.status().as_u16(), 202); + + let response_body: Value = response + .json() + .await + .expect("response should be valid JSON"); + assert_eq!(response_body["accepted"], true); + assert_eq!(response_body["queue_topic"], queue_topic); + + let state_event = timeout(Duration::from_secs(5), state_event_rx.recv()) + .await + .expect("timed out waiting for state observer") + .expect("state observer channel should remain open"); + + assert_eq!(state_event["scope"], state_scope); + assert_eq!(state_event["key"], expected_order_id); + assert_eq!(state_event["new_value"]["status"], "processed"); + + let spec = HttpQueueStatePathSpec { + id: "validated-http-queue-state".to_string(), + name: "Validated HTTP Queue State Path".to_string(), + method: "POST".to_string(), + path: "/validated-flow".to_string(), + entry_function_id: "flow_poc::http_entry".to_string(), + queue_topic: queue_topic.to_string(), + queue_consumer_function_id: "flow_poc::queue_consumer".to_string(), + state_scope: state_scope.to_string(), + state_key: expected_order_id.to_string(), + state_observer_function_id: "flow_poc::state_observer".to_string(), + }; + + let asset = spec + .wait_for_validated_asset(&state_event, Duration::from_secs(5)) + .await + .expect("validated flow asset should be generated"); + + assert_eq!( + serde_json::to_value(&asset).expect("asset should serialize"), + json!({ + "id": "validated-http-queue-state", + "name": "Validated HTTP Queue State Path", + "nodes": [ + { + "id": "http:POST:/validated-flow", + "kind": "http_trigger", + "label": "POST /validated-flow" + }, + { + "id": "flow_poc::http_entry", + "kind": "function", + "label": "flow_poc::http_entry" + }, + { + "id": "queue:validated-flow.events", + "kind": "queue_topic", + "label": "validated-flow.events" + }, + { + "id": "flow_poc::queue_consumer", + "kind": "function", + "label": "flow_poc::queue_consumer" + }, + { + "id": "state:validated-flow.orders:order-123", + "kind": "state_slot", + "label": "validated-flow.orders/order-123" + }, + { + "id": "flow_poc::state_observer", + "kind": "function", + "label": "flow_poc::state_observer" + } + ], + "edges": [ + { + "id": "http:POST:/validated-flow->flow_poc::http_entry", + "kind": "http_trigger", + "source": "http:POST:/validated-flow", + "target": "flow_poc::http_entry", + "label": "POST /validated-flow" + }, + { + "id": "flow_poc::http_entry->queue:validated-flow.events", + "kind": "enqueue", + "source": "flow_poc::http_entry", + "target": "queue:validated-flow.events", + "label": "validated-flow.events" + }, + { + "id": "queue:validated-flow.events->flow_poc::queue_consumer", + "kind": "queue_trigger", + "source": "queue:validated-flow.events", + "target": "flow_poc::queue_consumer", + "label": "validated-flow.events" + }, + { + "id": "flow_poc::queue_consumer->state:validated-flow.orders:order-123", + "kind": "state_write", + "source": "flow_poc::queue_consumer", + "target": "state:validated-flow.orders:order-123", + "label": "validated-flow.orders/order-123" + }, + { + "id": "state:validated-flow.orders:order-123->flow_poc::state_observer", + "kind": "state_trigger", + "source": "state:validated-flow.orders:order-123", + "target": "flow_poc::state_observer", + "label": "validated-flow.orders" + } + ] + }) + ); +} + +#[tokio::test] +#[serial] +async fn validated_flow_helper_builds_graph_asset_from_real_http_stream_path() { + let _flow_tracing = ensure_flow_test_tracing().await; + + let engine = Arc::new(Engine::new()); + + let stream_module = StreamWorker::create(engine.clone(), Some(json!({ "port": 0 }))) + .await + .expect("StreamWorker::create should succeed"); + stream_module.register_functions(engine.clone()); + stream_module + .initialize() + .await + .expect("StreamWorker::initialize should succeed"); + + let port = reserve_local_port(); + let base_url = start_http_worker(engine.clone(), port).await; + + let stream_name = "validated-flow.stream"; + let group_id = "orders"; + let expected_item_id = "item-123"; + + let (stream_event_tx, mut stream_event_rx) = mpsc::unbounded_channel::(); + + let engine_for_http = engine.clone(); + engine.register_function_handler( + RegisterFunctionRequest { + function_id: "flow_poc::stream_http_entry".to_string(), + description: Some("Entry function for validated stream flow POC".to_string()), + request_format: None, + response_format: None, + metadata: None, + }, + Handler::new(move |input: Value| { + let engine = engine_for_http.clone(); + async move { + let item_id = input + .get("body") + .and_then(|body| body.get("item_id")) + .and_then(Value::as_str) + .unwrap_or("missing-item-id") + .to_string(); + + match engine + .call( + "stream::set", + json!({ + "stream_name": stream_name, + "group_id": group_id, + "item_id": item_id, + "data": { + "status": "processed" + } + }), + ) + .await + { + Ok(_) => FunctionResult::Success(Some(json!({ + "status_code": 202, + "body": { + "accepted": true, + "stream_name": stream_name + } + }))), + Err(err) => FunctionResult::Failure(err), + } + } + }), + ); + + engine.register_function_handler( + RegisterFunctionRequest { + function_id: "flow_poc::stream_observer".to_string(), + description: Some("Stream observer for validated flow POC".to_string()), + request_format: None, + response_format: None, + metadata: None, + }, + Handler::new(move |input: Value| { + let tx = stream_event_tx.clone(); + async move { + tx.send(input) + .expect("stream event receiver should remain available"); + FunctionResult::Success(Some(json!({ "observed": true }))) + } + }), + ); + + engine + .trigger_registry + .register_trigger(Trigger { + id: "validated-stream-http".to_string(), + trigger_type: "http".to_string(), + function_id: "flow_poc::stream_http_entry".to_string(), + config: json!({ + "api_path": "/validated-stream-flow", + "http_method": "POST" + }), + worker_id: None, + metadata: None, + }) + .await + .expect("HTTP trigger should register"); + + engine + .trigger_registry + .register_trigger(Trigger { + id: "validated-stream-trigger".to_string(), + trigger_type: "stream".to_string(), + function_id: "flow_poc::stream_observer".to_string(), + config: json!({ + "stream_name": stream_name, + "group_id": group_id, + "item_id": expected_item_id, + }), + worker_id: None, + metadata: None, + }) + .await + .expect("stream trigger should register"); + + let client = reqwest::Client::new(); + let url = format!("{base_url}/validated-stream-flow"); + wait_for_route(&client, &url).await; + + let response = client + .post(&url) + .json(&json!({ "item_id": expected_item_id })) + .send() + .await + .expect("HTTP request should succeed"); + + assert_eq!(response.status().as_u16(), 202); + + let response_body: Value = response + .json() + .await + .expect("response should be valid JSON"); + assert_eq!(response_body["accepted"], true); + assert_eq!(response_body["stream_name"], stream_name); + + let stream_event = timeout(Duration::from_secs(5), stream_event_rx.recv()) + .await + .expect("timed out waiting for stream observer") + .expect("stream observer channel should remain open"); + + assert_eq!(stream_event["streamName"], stream_name); + assert_eq!(stream_event["groupId"], group_id); + assert_eq!(stream_event["id"], expected_item_id); + assert_eq!(stream_event["event"]["type"], "create"); + assert_eq!(stream_event["event"]["data"]["status"], "processed"); + + let spec = HttpStreamPathSpec { + id: "validated-http-stream".to_string(), + name: "Validated HTTP Stream Path".to_string(), + method: "POST".to_string(), + path: "/validated-stream-flow".to_string(), + entry_function_id: "flow_poc::stream_http_entry".to_string(), + stream_name: stream_name.to_string(), + group_id: group_id.to_string(), + item_id: expected_item_id.to_string(), + stream_observer_function_id: "flow_poc::stream_observer".to_string(), + }; + + let asset = spec + .wait_for_validated_asset(&stream_event, Duration::from_secs(5)) + .await + .expect("validated stream flow asset should be generated"); + + assert_eq!( + serde_json::to_value(&asset).expect("asset should serialize"), + json!({ + "id": "validated-http-stream", + "name": "Validated HTTP Stream Path", + "nodes": [ + { + "id": "http:POST:/validated-stream-flow", + "kind": "http_trigger", + "label": "POST /validated-stream-flow" + }, + { + "id": "flow_poc::stream_http_entry", + "kind": "function", + "label": "flow_poc::stream_http_entry" + }, + { + "id": "stream:validated-flow.stream:orders:item-123", + "kind": "stream_item", + "label": "validated-flow.stream/orders/item-123" + }, + { + "id": "flow_poc::stream_observer", + "kind": "function", + "label": "flow_poc::stream_observer" + } + ], + "edges": [ + { + "id": "http:POST:/validated-stream-flow->flow_poc::stream_http_entry", + "kind": "http_trigger", + "source": "http:POST:/validated-stream-flow", + "target": "flow_poc::stream_http_entry", + "label": "POST /validated-stream-flow" + }, + { + "id": "flow_poc::stream_http_entry->stream:validated-flow.stream:orders:item-123", + "kind": "stream_write", + "source": "flow_poc::stream_http_entry", + "target": "stream:validated-flow.stream:orders:item-123", + "label": "validated-flow.stream/orders/item-123" + }, + { + "id": "stream:validated-flow.stream:orders:item-123->flow_poc::stream_observer", + "kind": "stream_trigger", + "source": "stream:validated-flow.stream:orders:item-123", + "target": "flow_poc::stream_observer", + "label": "validated-flow.stream" + } + ] + }) + ); +}