
[RFC] Architecture Design for Cognitive Workflow Engine MVP (Idea 4) #1583

@hritikkumarpradhan

Description


1. Context & Objective

Hi team, as part of my Pre-GSoC preparation for the Cognitive Workflow Engine (Idea 4), I am aiming to deliver the core foundational MVP: a declarative DSL parser, a DAG executor using Kahn's algorithm, and a PostgreSQL state ledger built on sqlx. Before creating the mofa-workflow crate, I want to align with the core maintainers on the foundational data structures and execution flow.

Relationship to Existing Code

I have studied the current workflow infrastructure in mofa-foundation::workflow and the DSL parser in mofa-foundation::workflow::dsl. The existing NodeDefinition enum (tagged with #[serde(tag = "type")]) and WorkflowDslParser provide an excellent foundation. The new mofa-workflow crate will:

  • Extend, not replace, the existing DSL schema with three new cognitive node types
  • Follow the same serde tagging strategy (#[serde(tag = "type", rename_all = "snake_case")])
  • Integrate with the existing error_stack::Report<KernelError> pattern from mofa-kernel::error
  • Leverage the token_budget module from mofa-foundation::llm::token_budget

2. The Declarative DSL & UI Separation

To ensure the core orchestration logic remains version-control friendly and mathematically pure, I propose strictly separating execution logic from visual metadata.

  • workflow.yaml - Contains only the strict AST (Nodes, Edges, Parameters)
  • workflow.ui.yaml - Contains the 2D spatial coordinates (X/Y axis for React Flow)

Rationale

This separation means git diff on workflow.yaml shows only semantic changes to orchestration logic, never noise from users dragging nodes around in a visual editor. The .ui.yaml sidecar is optional - headless CI/CD environments can ignore it entirely.
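To make the "optional sidecar" contract concrete, a loader could resolve the companion file like this (std-only sketch; `sidecar_path` is a hypothetical helper, not existing MoFA API):

```rust
use std::path::{Path, PathBuf};

/// Resolve the optional `.ui.yaml` sidecar next to a workflow file.
/// Returns Some(path) only if the sidecar actually exists on disk;
/// headless CI/CD loaders simply continue with None.
fn sidecar_path(workflow: &Path) -> Option<PathBuf> {
    let stem = workflow.file_stem()?.to_str()?;
    let candidate = workflow.with_file_name(format!("{stem}.ui.yaml"));
    candidate.exists().then_some(candidate)
}
```

Because the sidecar lookup is purely additive, the executor never needs to know whether a visual editor ever touched the workflow.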


2.1 AST Node Type Schema (YAML -> Rust serde Enum)

The following three core node types are designed to deserialize directly into a Rust #[serde(tag = "type")] enum, consistent with MoFA's existing NodeDefinition pattern.

AgentInvoke Node

# Invokes a registered MoFA agent by name.
# Maps to: CognitiveNode::AgentInvoke { .. }
- type: agent_invoke
  node_id: "node_summarize_report"
  agent_name: "report_summarizer_v2"
  inputs:
    document_url: "{{outputs.fetch_doc.url}}"
    max_length: 500
  config:                              # optional per-node overrides
    timeout_ms: 30000
    retry_policy:
      max_retries: 2
      retry_delay_ms: 1000
      exponential_backoff: true

LlmInference Node

# Direct LLM inference with token budget enforcement.
# The max_token_budget field integrates with mofa-foundation's
# ContextWindowManager for cost control at the DAG level.
# Maps to: CognitiveNode::LlmInference { .. }
- type: llm_inference
  node_id: "node_generate_analysis"
  model: "gpt-4o"
  prompt_template: |
    Analyze the following data and provide insights:
    Data: {{outputs.node_summarize_report.summary}}

    Respond in structured JSON with keys: findings, confidence, recommendations.
  max_token_budget: 4096             # enforced via ContextWindowManager
  config:
    temperature: 0.3
    timeout_ms: 60000

ConditionalBranch Node

# Evaluates a condition expression to route execution.
# Supports JMESPath expressions or simple string matching.
# Maps to: CognitiveNode::ConditionalBranch { .. }
- type: conditional_branch
  node_id: "node_quality_gate"
  condition_expression: "outputs.node_generate_analysis.confidence > 0.85"
  on_true_edge: "node_publish_results"
  on_false_edge: "node_human_review"
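
For the simple string-matching path mentioned in the comments above, a naive evaluator over a flattened output context might look like this (illustrative only; `eval_condition` is a hypothetical helper, and a real implementation would likely delegate JMESPath expressions to the jmespath crate):

```rust
use std::collections::HashMap;

/// Evaluate a simple `path OP number` comparison such as
/// `outputs.node_generate_analysis.confidence > 0.85` against a
/// context keyed by dotted output path. Returns None on malformed
/// expressions or unknown paths so the caller can surface an error.
fn eval_condition(expr: &str, outputs: &HashMap<String, f64>) -> Option<bool> {
    let mut parts = expr.split_whitespace();
    let (lhs, op, rhs) = (parts.next()?, parts.next()?, parts.next()?);
    let left = *outputs.get(lhs)?;
    let right: f64 = rhs.parse().ok()?;
    match op {
        ">" => Some(left > right),
        ">=" => Some(left >= right),
        "<" => Some(left < right),
        "<=" => Some(left <= right),
        "==" => Some((left - right).abs() < f64::EPSILON),
        _ => None,
    }
}
```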

Corresponding Rust Type

/// Cognitive workflow node types for the `mofa-workflow` crate.
///
/// These extend the existing `NodeDefinition` pattern in `mofa-foundation`
/// with three new workflow-engine-specific node types.
///
/// Deserialized from YAML/TOML via `serde` with internally tagged representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CognitiveNode {
    /// Invoke a registered MoFA agent by name.
    AgentInvoke {
        node_id: String,
        agent_name: String,
        /// Key-value input bindings supporting `{{outputs.node_id.field}}` interpolation.
        #[serde(default)]
        inputs: HashMap<String, serde_json::Value>,
        #[serde(default)]
        config: Option<NodeConfigDef>,
    },

    /// Direct LLM inference with token budget enforcement.
    LlmInference {
        node_id: String,
        model: String,
        /// Jinja-style prompt template with `{{outputs.*}}` interpolation.
        prompt_template: String,
        /// Hard ceiling on token usage, enforced via `ContextWindowManager`.
        /// Aligns with `mofa-foundation::llm::token_budget`.
        max_token_budget: u32,
        #[serde(default)]
        config: Option<LlmInferenceConfig>,
    },

    /// Conditional branch routing based on expression evaluation.
    ConditionalBranch {
        node_id: String,
        /// JMESPath expression or simple comparison string.
        /// Evaluated against the DAG's accumulated output context.
        condition_expression: String,
        /// Edge taken when condition evaluates to `true`.
        on_true_edge: String,
        /// Edge taken when condition evaluates to `false`.
        on_false_edge: String,
    },
}

/// LLM-specific node configuration (extends base NodeConfigDef).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LlmInferenceConfig {
    #[serde(default)]
    pub temperature: Option<f32>,
    #[serde(default)]
    pub timeout_ms: Option<u64>,
    #[serde(default)]
    pub retry_policy: Option<RetryPolicy>,
}
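
The `{{outputs.node_id.field}}` interpolation used by `inputs` and `prompt_template` can be sketched with std only (hypothetical `interpolate` helper; the real resolver would walk `serde_json::Value` output payloads rather than a flat string map):

```rust
use std::collections::HashMap;

/// Replace each `{{key}}` placeholder in the template with its value
/// from the context. Unknown keys are left intact so that a missing
/// binding surfaces visibly in the rendered output.
fn interpolate(template: &str, ctx: &HashMap<&str, String>) -> String {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(start) = rest.find("{{") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find("}}") {
            Some(end) => {
                let key = after[..end].trim();
                match ctx.get(key) {
                    Some(v) => out.push_str(v),
                    None => {
                        // Preserve the placeholder verbatim.
                        out.push_str("{{");
                        out.push_str(&after[..end]);
                        out.push_str("}}");
                    }
                }
                rest = &after[end + 2..];
            }
            // Unterminated placeholder: emit the remainder as-is.
            None => {
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```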

2.2 UI vs Core - YAML Separation Example

File 1: example_workflow.yaml (Execution Logic Only)

# example_workflow.yaml
# This file contains ONLY orchestration logic - no visual metadata.
# Safe to diff, review, and version-control.

metadata:
  id: "customer_feedback_pipeline"
  name: "Customer Feedback Analysis Pipeline"
  version: "1.0.0"
  description: "Summarizes customer feedback, then conditionally routes to human review."

nodes:
  - type: agent_invoke
    node_id: "node_ingest_feedback"
    agent_name: "feedback_ingestion_agent"
    inputs:
      source: "kafka://feedback-topic"
      batch_size: 50

  - type: llm_inference
    node_id: "node_sentiment_analysis"
    model: "gpt-4o-mini"
    prompt_template: |
      Classify the sentiment of each feedback item as positive, neutral, or negative.
      Provide a confidence score (0.0-1.0) for each classification.

      Feedback batch:
      {{outputs.node_ingest_feedback.batch}}
    max_token_budget: 8192

edges:
  - from: "node_ingest_feedback"
    to: "node_sentiment_analysis"

File 2: example_workflow.ui.yaml (Visual Metadata Only)

# example_workflow.ui.yaml
# Sidecar file for React Flow visual editor coordinates.
# This file is:
#   - Auto-generated by the visual editor
#   - OPTIONAL for headless execution
#   - Safe to .gitignore in CI/CD environments

schema_version: 1
workflow_ref: "customer_feedback_pipeline"  # must match metadata.id in workflow.yaml

node_positions:
  node_ingest_feedback:
    x: 120
    y: 200
    width: 280
    height: 80
    collapsed: false
    style:
      background: "#1a1a2e"
      border_color: "#e94560"

  node_sentiment_analysis:
    x: 500
    y: 200
    width: 320
    height: 100
    collapsed: false
    style:
      background: "#1a1a2e"
      border_color: "#0f3460"

viewport:
  x: 0
  y: 0
  zoom: 1.0

edge_styles:
  node_ingest_feedback->node_sentiment_analysis:
    animated: true
    stroke_color: "#16213e"
    label_position: "center"

3. Execution Engine & Error Handling

The MVP will utilize Kahn's algorithm for pre-execution cycle detection on the parsed DAG. For parallel branch execution, I plan to use tokio::task::JoinSet.

Execution Flow

+-----------------+
|  Parse YAML DSL |  <- WorkflowDslParser::from_yaml()
+--------+--------+
         |
         v
+-----------------+
|  Build DAG      |  <- Construct adjacency list + in-degree map
|  (petgraph)     |
+--------+--------+
         |
         v
+-----------------+
|  Kahn's Sort    |  <- Detect cycles, produce topological order
|  + Cycle Check  |
+--------+--------+
         |
         v
+-----------------+
|  Execute Nodes  |  <- tokio::task::JoinSet for parallel branches
|  (async)        |     sqlx flush to Postgres on each state change
+--------+--------+
         |
         v
+-----------------+
|  State Ledger   |  <- node_executions rows updated per-node
|  (PostgreSQL)   |     WAITING_HITL pauses execution for human input
+-----------------+
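
The Kahn's-sort stage above can be sketched with std collections only (illustrative; the actual engine would build the graph with petgraph from the parsed AST):

```rust
use std::collections::{HashMap, VecDeque};

/// Kahn's algorithm over string node ids. Returns a topological order,
/// or Err with the ids still carrying positive in-degree (i.e., the
/// nodes participating in or downstream of a cycle).
fn topo_sort(edges: &[(&str, &str)], nodes: &[&str]) -> Result<Vec<String>, Vec<String>> {
    let mut in_degree: HashMap<&str, usize> = nodes.iter().map(|&n| (n, 0)).collect();
    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        adj.entry(from).or_default().push(to);
        *in_degree.entry(to).or_insert(0) += 1;
    }
    // Seed the queue with all roots (in-degree zero).
    let mut queue: VecDeque<&str> =
        nodes.iter().copied().filter(|n| in_degree[n] == 0).collect();
    let mut order = Vec::new();
    while let Some(n) = queue.pop_front() {
        order.push(n.to_string());
        for &next in adj.get(n).into_iter().flatten() {
            let d = in_degree.get_mut(next).unwrap();
            *d -= 1;
            if *d == 0 {
                queue.push_back(next);
            }
        }
    }
    if order.len() == nodes.len() {
        Ok(order)
    } else {
        Err(nodes
            .iter()
            .filter(|n| in_degree[*n] > 0)
            .map(|n| n.to_string())
            .collect())
    }
}
```

Running this before execution guarantees the executor only ever sees an acyclic graph, so the JoinSet scheduling loop never deadlocks on a circular dependency.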

Error Handling Strategy

Alignment Question: I notice the repository is standardizing on the error_stack crate (v0.6.0) for unified error handling - see KernelResult<T> and CliResult<T>. Should the new mofa-workflow executor adopt error_stack from Day 1 to natively map execution failures into the node_executions.error_trace ledger?

Proposed error type:

use error_stack::Report;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum WorkflowError {
    #[error("DAG contains a cycle involving node: {node_id}")]
    CycleDetected { node_id: String },

    #[error("Node execution failed: {node_id}")]
    NodeExecutionFailed { node_id: String },

    #[error("Token budget exceeded for node {node_id}: used {used}, limit {limit}")]
    TokenBudgetExceeded { node_id: String, used: u32, limit: u32 },

    #[error("Condition expression evaluation failed: {expression}")]
    ConditionEvalError { expression: String },

    #[error("State ledger error: {0}")]
    LedgerError(#[from] sqlx::Error),

    #[error("DSL parse error: {0}")]
    ParseError(String),
}

/// Workflow result type using error_stack for rich context propagation.
/// Matches the KernelResult<T> pattern.
pub type WorkflowResult<T> = Result<T, Report<WorkflowError>>;

4. PostgreSQL State Ledger (sqlx)

To support pausing, Human-in-the-Loop (HITL), and crash recovery, the in-memory execution state will be flushed to Postgres via sqlx. The schema is designed for sqlx::query_as! compile-time checked queries.

Custom Enum Type

-- Execution state machine.
-- Maps to a Rust enum via sqlx::Type derive.
-- Includes WAITING_HITL for Human-in-the-Loop pause/resume.
CREATE TYPE execution_status AS ENUM (
    'PENDING',          -- Scheduled but not yet started
    'RUNNING',          -- Currently executing
    'PAUSED',           -- Manually paused (e.g., by operator)
    'FAILED',           -- Terminal failure (after retries exhausted)
    'COMPLETED',        -- Successful completion
    'WAITING_HITL'      -- Blocked on human-in-the-loop approval
);

COMMENT ON TYPE execution_status IS
    'State machine for workflow and node execution lifecycle. '
    'Transitions: PENDING -> RUNNING -> {COMPLETED, FAILED, PAUSED, WAITING_HITL}. '
    'WAITING_HITL -> RUNNING (on human approval). '
    'PAUSED -> RUNNING (on manual resume).';
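
The documented transitions can also be mirrored as an in-process guard before each ledger write (sketch; `is_valid_transition` is a hypothetical helper, and the enum names match the sqlx::Type mapping below):

```rust
/// Execution status, mirroring the `execution_status` Postgres enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionStatus {
    Pending,
    Running,
    Paused,
    Failed,
    Completed,
    WaitingHitl,
}

/// Guard enforcing the documented state machine:
/// PENDING -> RUNNING -> {COMPLETED, FAILED, PAUSED, WAITING_HITL},
/// WAITING_HITL -> RUNNING (on approval), PAUSED -> RUNNING (on resume).
fn is_valid_transition(from: ExecutionStatus, to: ExecutionStatus) -> bool {
    use ExecutionStatus::*;
    matches!(
        (from, to),
        (Pending, Running)
            | (Running, Completed)
            | (Running, Failed)
            | (Running, Paused)
            | (Running, WaitingHitl)
            | (WaitingHitl, Running)
            | (Paused, Running)
    )
}
```

Checking transitions in Rust keeps illegal writes out of the ledger even before any database-side constraint fires.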

Table: workflows

-- Workflow definitions registry.
-- Stores metadata for each registered workflow template.
CREATE TABLE workflows (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name        TEXT NOT NULL UNIQUE,
    description TEXT NOT NULL DEFAULT '',
    -- Raw YAML AST stored for re-parsing and versioning.
    -- Stored as JSONB (parsed from YAML) for indexed querying.
    definition  JSONB NOT NULL DEFAULT '{}',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_workflows_name ON workflows (name);
CREATE INDEX idx_workflows_created_at ON workflows (created_at DESC);

COMMENT ON TABLE workflows IS
    'Registry of workflow definitions. Each row is an immutable snapshot '
    'of a workflow AST. Versioning is handled by inserting new rows.';

Table: workflow_executions

-- Execution instances of a workflow.
-- Each row represents one run of a workflow definition.
CREATE TABLE workflow_executions (
    id                UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id       UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    status            execution_status NOT NULL DEFAULT 'PENDING',
    -- Aggregate token usage across all LlmInference nodes in this execution.
    -- Updated atomically via: UPDATE ... SET token_usage_total = token_usage_total + $delta
    token_usage_total BIGINT NOT NULL DEFAULT 0,
    -- Execution context (input parameters, environment variables, etc.)
    context           JSONB NOT NULL DEFAULT '{}',
    -- Error summary if status = FAILED (high-level; details in node_executions)
    error_summary     TEXT,
    started_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at      TIMESTAMPTZ
);

CREATE INDEX idx_wf_exec_workflow_id ON workflow_executions (workflow_id);
CREATE INDEX idx_wf_exec_status ON workflow_executions (status);
CREATE INDEX idx_wf_exec_started_at ON workflow_executions (started_at DESC);

COMMENT ON TABLE workflow_executions IS
    'Execution instances. One row per workflow run. '
    'token_usage_total aggregates LLM token consumption for cost tracking.';

Table: node_executions

-- Per-node execution records within a workflow run.
-- Provides granular observability and crash-recovery checkpointing.
CREATE TABLE node_executions (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    execution_id    UUID NOT NULL REFERENCES workflow_executions(id) ON DELETE CASCADE,
    -- Matches the node_id from the YAML AST.
    node_id         TEXT NOT NULL,
    status          execution_status NOT NULL DEFAULT 'PENDING',
    -- Structured output payload from node execution.
    -- For LlmInference nodes, includes the raw LLM response.
    output_payload  JSONB,
    -- Full error_stack trace serialized as TEXT.
    -- Populated when status = FAILED.
    -- Format: error_stack::Report<WorkflowError> debug output.
    error_trace     TEXT,
    -- Token usage for this specific node (LlmInference nodes only).
    token_usage     INT DEFAULT 0,
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,

    -- Prevent duplicate node executions within the same workflow run.
    UNIQUE (execution_id, node_id)
);

CREATE INDEX idx_node_exec_execution_id ON node_executions (execution_id);
CREATE INDEX idx_node_exec_status ON node_executions (status);
CREATE INDEX idx_node_exec_node_id ON node_executions (node_id);

COMMENT ON TABLE node_executions IS
    'Per-node execution log. error_trace stores serialized error_stack::Report '
    'output for post-mortem debugging. UNIQUE(execution_id, node_id) prevents '
    'duplicate entries during crash recovery replays.';

Corresponding Rust sqlx::Type Mapping

/// Execution status enum, mapped to PostgreSQL `execution_status` type.
/// Compatible with sqlx::query_as! compile-time checked queries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "execution_status", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ExecutionStatus {
    Pending,
    Running,
    Paused,
    Failed,
    Completed,
    WaitingHitl,
}

5. Token Budgeting & Resource Management

I see recent PRs introducing token budget tracking helpers to the microkernel - specifically the ContextWindowManager with its TokenEstimator trait and ContextWindowPolicy enum.

As seen in the AST design above, I plan to integrate a max_token_budget directly into the MVP's LlmInference node to ensure runaway LLM costs are restricted at the DAG orchestration level, not just at the individual agent level. The workflow executor will:

  1. Instantiate a ContextWindowManager per LlmInference node with the declared max_token_budget
  2. Enforce ContextWindowPolicy::Strict mode - any prompt exceeding the budget halts the node and sets status to FAILED
  3. Accumulate per-node token_usage into workflow_executions.token_usage_total via an atomic SQL UPDATE

// Pseudocode for token budget enforcement in the executor
let manager = ContextWindowManager::new(node.max_token_budget as usize)
    .with_policy(ContextWindowPolicy::Strict)
    .with_estimator(Box::new(CharBasedEstimator::default()));

let trim_result = manager.apply(&assembled_messages);
if trim_result.estimated_tokens > node.max_token_budget as usize {
    return Err(Report::new(WorkflowError::TokenBudgetExceeded {
        node_id: node.node_id.clone(),
        used: trim_result.estimated_tokens as u32,
        limit: node.max_token_budget,
    }));
}

6. Proposed Crate Structure

Following MoFA's architecture layering, the new crate will sit alongside existing crates:

crates/
|-- mofa-kernel/         # Trait definitions (existing)
|-- mofa-foundation/     # Concrete impls (existing)
|-- mofa-workflow/       # NEW - Cognitive Workflow Engine
|   |-- Cargo.toml
|   +-- src/
|       |-- lib.rs
|       |-- error.rs         # WorkflowError + WorkflowResult<T>
|       |-- ast/
|       |   |-- mod.rs
|       |   |-- nodes.rs     # CognitiveNode enum
|       |   +-- parser.rs    # YAML/TOML -> CognitiveNode deserialization
|       |-- engine/
|       |   |-- mod.rs
|       |   |-- dag.rs       # petgraph DAG construction + Kahn's algorithm
|       |   +-- executor.rs  # tokio::task::JoinSet parallel execution
|       |-- ledger/
|       |   |-- mod.rs
|       |   |-- models.rs    # sqlx row types + ExecutionStatus enum
|       |   |-- queries.rs   # Compile-time checked sqlx queries
|       |   +-- migrations/  # SQL migration files
|       +-- budget/
|           +-- mod.rs       # ContextWindowManager integration
+-- ...

Key Dependencies

[dependencies]
# Workspace deps
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
error-stack = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }

# Crate-specific
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "chrono", "json"] }
petgraph = "0.6"
jmespath = "0.3"       # For condition_expression evaluation

# Internal
mofa-kernel = { path = "../mofa-kernel" }
mofa-foundation = { path = "../mofa-foundation" }

7. Questions for Maintainers

@lijingrs @yangrudan - I would highly appreciate your architectural sign-off or feedback on this approach, specifically:

  1. UI Sidecar Pattern: Does the .ui.yaml sidecar approach align with MoFA's vision for the visual editor? Should the format be JSON instead of YAML for easier React Flow interop?
  2. error_stack Enforcement: Should mofa-workflow enforce error_stack from Day 1 to natively serialize execution failures into the node_executions.error_trace column?
  3. Crate Boundary: Should CognitiveNode types live in mofa-kernel (as trait/type definitions) with implementations in mofa-workflow? Or is a self-contained mofa-workflow crate acceptable for the MVP?
  4. petgraph vs Custom: The existing WorkflowGraph in mofa-foundation uses a custom adjacency list. Should I adopt petgraph for the new engine, or extend the existing infrastructure?
  5. Condition Expression Language: JMESPath is lightweight but limited. Would you prefer a more expressive embedded language (e.g., Rhai, which is already a workspace dependency)?

    Labels

    area/models (Model selection, behavior, responses) · dependencies (Pull requests that update a dependency file) · duplicate (This issue or pull request already exists) · error-handling · kind/feature (New capability) · priority/p1 (High impact) · question (Further information is requested) · rust (Pull requests that update rust code)
