
Proposal: Support Configurable Orchestration of Inference Pipeline #827


title: Support Configurable Orchestrated Inference Pipeline
authors:
  • PadawanZH
reviewers:
  • TBD
approvers:
  • TBD
creation-date: 2026-03-19


Support Configurable Orchestrated Inference Pipeline

Summary

This proposal introduces a configurable and orchestratable inference pipeline framework for Kthena Router that transforms the current hardcoded inference flow processing into a flexible, DAG-based execution model.

By decomposing inference flows into atomic Phases (such as Schedule, Execute, Encode, Prefill, Decode) and defining execution dependencies through configuration files, this framework enables support for diverse deployment models including PD separation/hybrid, EPD (Encode-Prefill-Decode), and future EPDG patterns without code modifications. The solution supports parallel execution of independent phases, implements comprehensive tracing and observability features, and provides a plugin mechanism for custom processing steps.

This approach reduces maintenance overhead while maximizing code reuse across different inference deployment scenarios, addressing the growing complexity of multimodal inference, where Encode, Prefill, Decode, and potentially Generator steps may be deployed separately or executed in parallel.

Motivation

Currently, Kthena Router supports inference request processing flows for PD separation mode and PD hybrid mode in a hardcoded manner. The vLLM and SGLang communities are trending toward multimodal disaggregated deployment scenarios. Multimodal workloads have given rise to the EPD deployment form, with three separated roles: Encode, Prefill, and Decode. In the future, the Generator step might also be separated, forming an EPDG inference flow. Hardcoded inference flows require maintaining a complete flow implementation for each deployment form, making code and functionality reuse impossible; maintenance overhead will dramatically increase as inference steps and roles diversify.

Additionally, in scenarios like SGLang and asynchronous transmission, the execution of inference steps such as E, P, D, G is no longer serial but may involve parallel execution of inference steps (for example, SGLang needs to schedule Prefill and Decode instances simultaneously and send inference requests to both P and D at the same time).

In summary, we can add more flexible inference flow orchestration capabilities. By configuring atomic capabilities and an execution-step DAG, the router can provide different inference processing pipelines for different deployment forms while maximizing code reuse and reducing maintenance overhead.

Furthermore, supporting plugins to implement custom processing steps can significantly enhance the extensibility of inference flows.

Goals

  1. Functional Goals:
    1. Provide a configuration-file-based framework for generating and executing orchestratable inference pipelines, with parallel execution based on phase dependencies
    2. Based on this execution framework, support vLLM's PD hybrid and PD separation modes
    3. Support an experimental EPD separation mode, including skipping the Encode phase when no images are present
  2. Observability Goals:
    1. Support step trace logging: when enabled via a switch, print execution progress and step execution results for each request in the logs

Non-Goals

Proposal

Currently, Kthena-router implements PD hybrid and PD separation processing flows in a hardcoded manner (see proxyModelEndpoint() and proxyToPDDisaggregated() in pkg/kthena-router/router/router.go).

This proposal suggests implementing an orchestratable inference processing flow. Common Phase interface implementations provide the scheduling, request construction, request sending, and response handling needed by each inference role (such as Encode, Prefill, and Decode); configuration files then orchestrate these Phases into a DAG execution Pipeline. This makes it possible to assemble inference flows for different deployment forms purely through configuration, improving code reuse and reducing the maintenance complexity of adapting to many types of inference role nodes and combined flows. It also provides the foundation for complex inference flows, such as parallel PD execution for asynchronous transmission.

As shown in the figure below, the pipeline is generated through a configuration-driven approach:

┌─────────────────────────────────────────────────────────────────┐
│                Configuration-Driven Pipeline Generation         │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────┐
│  pipeline.yaml  │
│ ┌─────────────┐ │
│ │phases:      │ │          ┌──────────────────┐
│ │ - name: pre │ │   ◄─────>│   PhaseRegistry  │
│ │   type:     │ │   │      │                  │
│ │   Schedule  │ │   │      │ ┌────────────┐   │
│ │             │ │   │      │ │Schedule    │   │
│ │ - name: pre │ │   │      │ │Phase       │   │
│ │   type:     │ │   │      │ └─────┬──────┘   │
│ │   Execute   │ │   │      │       │          │
│ │   depends:  │ │   │      │ ┌─────▼──────┐   │
│ │   pre       │ │   │      │ │Execute     │   │
│ │             │ │   │      │ │Phase       │   │
│ │ - name: dec │ │   │      │ └────────────┘   │
│ │   depends:  │ │   │      │                  │
│ │   pre       │ │   │      └──────────────────┘
│ └─────────────┘ │   │
                      │
                      │      ┌──────────────────┐
                      +────► │   Pipeline       │
                             │                  │
                             │ ┌────────────┐   │
                             │ │ Phase DAG  │   │
                             │ │            │   │
                             │ │ pre──────► │   │
                             │ │ Schedule   │   │
                             │ │     │      │   │
                             │ │     ▼      │   │
                             │ │ pre──────► │   │
                             │ │ Execute    │   │
                             │ │     │      │   │
                             │ │     ▼      │   │
                             │ │ dec──────► │   │
                             │ │ Execute    │   │
                             │ └────────────┘   │
                             └──────────────────┘
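The registration-and-creation flow in the figure can be sketched in Go as follows. The type names (PhaseRegistry, PhaseFactoryFunc, CreatePhase) follow this proposal's class diagram, but the method signatures and the SchedulePhase stub are illustrative assumptions, not the final API:

```go
package main

import "fmt"

// Phase is the common interface every pipeline step implements (sketch).
type Phase interface {
	PhaseTypeName() string
	ExecutePhase(ctx *PipelineContext) error
}

// PipelineContext carries intermediate results between phases (simplified).
type PipelineContext struct {
	Values map[string]any
}

// PhaseFactoryFunc builds a fresh Phase instance.
type PhaseFactoryFunc func() Phase

// PhaseRegistry maps phase type names to factories, mirroring the
// "register at startup, create from config" flow in the figure.
type PhaseRegistry struct {
	factories map[string]PhaseFactoryFunc
}

func NewPhaseRegistry() *PhaseRegistry {
	return &PhaseRegistry{factories: map[string]PhaseFactoryFunc{}}
}

func (r *PhaseRegistry) RegisterPhase(typeName string, f PhaseFactoryFunc) error {
	if _, dup := r.factories[typeName]; dup {
		return fmt.Errorf("phase type %q already registered", typeName)
	}
	r.factories[typeName] = f
	return nil
}

func (r *PhaseRegistry) CreatePhase(typeName string) (Phase, error) {
	f, ok := r.factories[typeName]
	if !ok {
		return nil, fmt.Errorf("unknown phase type %q", typeName)
	}
	return f(), nil
}

// SchedulePhase is a stub standing in for the prebuilt scheduling phase.
type SchedulePhase struct{}

func (p *SchedulePhase) PhaseTypeName() string { return "Schedule" }
func (p *SchedulePhase) ExecutePhase(ctx *PipelineContext) error {
	ctx.Values["schedule_result"] = "prefill-instance-0" // placeholder result
	return nil
}

func main() {
	reg := NewPhaseRegistry()
	_ = reg.RegisterPhase("Schedule", func() Phase { return &SchedulePhase{} })
	phase, err := reg.CreatePhase("Schedule")
	if err != nil {
		panic(err)
	}
	ctx := &PipelineContext{Values: map[string]any{}}
	_ = phase.ExecutePhase(ctx)
	fmt.Println(ctx.Values["schedule_result"]) // prefill-instance-0
}
```

A Pipeline would walk the `phases` list from pipeline.yaml, call CreatePhase for each entry's `type`, and wire the resulting instances into the DAG.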

As shown in the figure below, intermediate results of each stage are passed through the PipelineContext when executing the DAG:

  1. The scheduling stage schedules Prefill and Decode instances, finds low-load instances, and writes the results to the context
  2. The Prefill execution stage reads the prefill scheduling result from the context, constructs and sends the request, then reads kv_transfer_params from the response and stores it in the context (the prompt token count from usage in the prefill response can also be stored for more accurate Decode scheduling)
  3. The Decode execution stage reads the decode scheduling result from the context, constructs and sends the request, and writes the response back to the client
┌─────────────────────────────────────────────────────────────────┐
│                     PipelineContext                            │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ - ClientGinCtx: gin.Context                                 │ │
│ │ - ParsedRequest: interface{}                                │ │
│ │ - PreScheduleResult: map[string]interface{}                 │ │
│ │ - kv_transfer_params: map[string]interface{}                │ │
│ │ - DecodeScheduleResult: map[string]interface{}              │ │
│ │ - SubRequests: sync.Map                                     │ │
│ │ - ...                                                       │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Write schedule results                            Write response to client
( skip_info,              (kv_transfer_params,      ( final_tokens,
  selected_instances )     prompt token count )          statistics )
        ▲                     ▲                         ▲
        │                     │                         │
        │                     │                         │
        │                     │                         │

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  PreSchedule    │    │  PreExecute     │    │  DecExecute     │
│                 │    │                 │    │                 │
│ Schedule Prefill│    │ 1. Read Pre     │    │ 1. Read Decode  │
│ Schedule Decode │    │    Schedule     │    │ Schedule Res    │
│                 │    │    Result       │    │ 2. Read kv_     │
│                 │    │ 2. Send Prefill │    │    transfer_    │
│                 │    │    Req          │    │    params       │
│                 │    │ 3. Save transfer│    │ 3. Send Decode  │
│                 │    │    parameters   │    │    Req          │
│                 │    │                 │    │ 4. Write client │
│                 │    │                 │    │    response     │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Execution flow:────────────────────────────────────────────────────→
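The context hand-off between PreExecute and DecExecute can be sketched as follows, assuming a sync.Map-backed PipelineContext as in the figure above; the Set/Get accessors and the kv_transfer_params payload shape are illustrative assumptions:

```go
package main

import (
	"fmt"
	"sync"
)

// PipelineContext sketch: phases exchange intermediate results through a
// concurrency-safe map, so independent phases can read and write in parallel.
type PipelineContext struct {
	values sync.Map
}

// Set stores an intermediate result under a well-known key.
func (c *PipelineContext) Set(key string, v any) { c.values.Store(key, v) }

// Get retrieves a result written by an earlier phase.
func (c *PipelineContext) Get(key string) (any, bool) { return c.values.Load(key) }

func main() {
	ctx := &PipelineContext{}
	// PreExecute: store kv_transfer_params extracted from the prefill response.
	ctx.Set("kv_transfer_params", map[string]any{"remote_host": "prefill-0"})
	// DecExecute: read them back when constructing the decode request.
	if v, ok := ctx.Get("kv_transfer_params"); ok {
		fmt.Println(v.(map[string]any)["remote_host"]) // prefill-0
	}
}
```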

User Stories (Optional)

Story 1

A serial PD separation inference flow is orchestrated as follows, which can be generated through configuration:

  1. Preprocess: Parse user request body, set initial prompt length
  2. Schedule_Prefill: Schedule Prefill instance, select appropriate prefill instance address based on scheduling algorithm
  3. Execute_prefill: (1) Construct Prefill request (2) Send request based on Schedule_Prefill scheduling result (3) Process Prefill response, extract kv_transfer_params from body and calibrate promptLen based on usage
  4. Schedule_Decode: Schedule Decode instance, select appropriate Decode instance address based on scheduling algorithm
  5. Execute_Decode: (1) Construct Decode request (2) Send request based on Schedule_Decode scheduling result (3) Process Decode response, write response body content to client
  6. PostProcess: Request post-processing, calculate statistics, print logs, update metrics. Execute error handling logic if previous processing flow encountered errors
graph TB
    A[Preprocess] --> D[Schedule_Prefill]
    D --> E[Execute_Prefill]
    E --> F[Schedule_Decode]
    F --> G["Execute_Decode"]
    G --> End["PostProcess"]
Story 2

Similarly, an SGLang PD separation inference flow can be orchestrated and assembled through configuration:

  1. SGLang needs to schedule both Prefill and Decode instance addresses simultaneously
  2. SGLang's Prefill and Decode requests can be sent simultaneously. Configure SGLang request construction method in the request construction step of Execute_Prefill/Execute_Decode Phase, i.e., add bootstrap-related parameters to request body based on scheduling results
graph TB
    A[Preprocess] --> B[Schedule_Prefill]
    A --> C[Schedule_Decode]
    B --> dummy
    C --> dummy
    dummy --> D[Execute_Prefill]
    dummy --> E["Execute_Decode"]
    D --> End["PostProcess"]
    E --> End
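The simultaneous Prefill/Decode send in step 2 can be sketched with goroutines; sendBoth and the bootstrap_host field are hypothetical stand-ins for the real request construction and HTTP send:

```go
package main

import (
	"fmt"
	"sync"
)

// sendBoth dispatches the Prefill and Decode requests concurrently, as the
// SGLang flow requires. send is a stub standing in for the real HTTP call;
// bootstrap_host is an assumed bootstrap-related parameter added from the
// scheduling results.
func sendBoth(send func(role string, body map[string]any) string) []string {
	body := map[string]any{
		"prompt":         "hello",
		"bootstrap_host": "prefill-0", // assumed, derived from Schedule_Prefill
	}
	var wg sync.WaitGroup
	results := make([]string, 2)
	for i, role := range []string{"prefill", "decode"} {
		wg.Add(1)
		go func(i int, role string) {
			defer wg.Done()
			results[i] = send(role, body) // each goroutine writes its own slot
		}(i, role)
	}
	wg.Wait()
	return results
}

func main() {
	res := sendBoth(func(role string, body map[string]any) string {
		return role + ": ok" // stub response
	})
	fmt.Println(res) // [prefill: ok decode: ok]
}
```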

Notes/Constraints/Caveats (Optional)

Risks and Mitigations

Design Details

Class Diagram:

classDiagram
    class Pipeline {
        -servingMode: enum
        -phases: PhaseDAG
        -config: PipelineConfig
        +ExecutePipeline(ctx: PipelineContext): void
    }

    class PipelineContext {
        +ClientGinCtx: *gin.Context
        +PipelineErr: *InferError
        +Tracer: *Tracer
        +SubRequests: sync.Map
    }

    class Phase {
        <<interface>>
        +PhaseName(): string
        +PhaseTypeName(): string
        +Init(phaseCfg: PhaseConfig): error
        +ExecutePhase(ctx: PipelineContext): *InferError
    }

    class SchedulePhase {
        -scheduleConfig: ScheduleConfig
        +ExecutePhase(ctx: PipelineContext): RSError
        +PhaseTypeName(): string
        +RelateInstanceType(): string
    }

    class ExecuteConfig {
        +string instance_type
        +string pre_process_request_func
        +string[] post_process_funcs
        +string[] post_phase_funcs
    }

    class ScheduleConfig {
        +string instance_type
        +string skip_policy
        +string[] request_load_provider
    }

    class ExecutePhase {
        -executeConfig: ExecuteConfig
        +ExecutePhase(ctx: PipelineContext): RSError
        +PhaseTypeName(): string
        +RelateInstanceType(): string
    }

    class PhaseRegistry {
        -phaseRegistry: map[string]PhaseFactoryFunc
        +RegisterPhase(phaseFunc: PhaseFactoryFunc): error
        +CreatePhase(typeName: string): Phase
    }

    Phase <|-- SchedulePhase
    Phase <|-- ExecutePhase
    Pipeline "1" *-- "n" Phase : contains
    ExecutePhase "1" *-- "1" ExecuteConfig : contains
    SchedulePhase "1" *-- "1" ScheduleConfig : contains
    Pipeline --> PipelineContext : uses
    PhaseRegistry "1" *-- "1" Phase : creates

    note for PhaseRegistry "All implemented Phases are registered at startup so Pipeline can be generated based on configuration files"
    note for Phase "All Phases must implement this interface"
    note for Pipeline "Responsible for organizing and executing the entire inference flow"
    note for SchedulePhase "Prebuilt Phase, responsible for scheduling inference instances"
    note for ExecutePhase "Prebuilt Phase, responsible for executing inference requests and processing inference responses"

Example of PD disaggregated inference running serially (vLLM):

A PD separation flow is processed as follows:

sequenceDiagram
    participant Client
    participant Pipeline
    participant PhaseRegistry
    participant PipelineContext
    participant PreSchedule
    participant PreExecute
    participant DecSchedule
    participant DecExecute

    Note over Client, DecExecute: Initialization phase
    Pipeline->>PhaseRegistry: Get Phase types
    PhaseRegistry-->>Pipeline: Return Phase factory functions
    Pipeline->>Pipeline: Create Phase instances and configure DAG

    Note over Client, DecExecute: Inference request processing
    Client->>Pipeline: ExecutePipeline(request)
    Pipeline->>PipelineContext: Create execution context

    Note over Client, DecExecute: Prefill phase
    activate PreSchedule
    Pipeline->>PreSchedule: ExecutePhase(ctx)
    PreSchedule->>PipelineContext: Schedule Prefill instance
    PreSchedule-->>Pipeline: Execution completed
    deactivate PreSchedule

    activate PreExecute
    Pipeline->>PreExecute: ExecutePhase(ctx) [depends on PreSchedule]
    PreExecute->>PipelineContext: Prepare request(prefill_prepare_request)
    PreExecute->>PipelineContext: Execute Prefill inference
    PreExecute->>PipelineContext: Process response(prefill_post_response)
    PreExecute-->>Pipeline: Execution completed
    deactivate PreExecute

    Note over Client, DecExecute: Decode phase
    activate DecSchedule
    Pipeline->>DecSchedule: ExecutePhase(ctx) [depends on PreExecute]
    DecSchedule->>PipelineContext: Schedule Decode instance
    DecSchedule-->>Pipeline: Scheduling completed
    deactivate DecSchedule

    activate DecExecute
    Pipeline->>DecExecute: ExecutePhase(ctx) [depends on DecSchedule]
    DecExecute->>PipelineContext: Prepare request(decode_prepare_request)
    DecExecute->>PipelineContext: Execute Decode inference
    DecExecute->>PipelineContext: Process response(decode_post_response)
    DecExecute-->>Pipeline: Execution completed
    deactivate DecExecute

    Note over Client, DecExecute: Completion processing
    Pipeline->>PipelineContext: Execute success Phase
    Pipeline-->>Client: Return inference result

Example of PD disaggregated inference running in parallel (SGLang):

sequenceDiagram
    participant Pipeline
    participant PipelineContext
    participant PreSchedule
    participant DecSchedule
    participant PreExecute
    participant DecExecute

    Note over Pipeline, DecExecute: Parallel scheduling phase
    Pipeline->>PipelineContext: Create execution context

    par Parallel execution
        activate PreSchedule
        Pipeline->>PreSchedule: ExecutePhase(ctx)
        PreSchedule-->>Pipeline: Scheduling completed
        deactivate PreSchedule
    and Simultaneous execution
        activate DecSchedule
        Pipeline->>DecSchedule: ExecutePhase(ctx)
        DecSchedule-->>Pipeline: Scheduling completed
        deactivate DecSchedule
    end

    Note over Pipeline, DecExecute: Wait and execute inference
    activate PreExecute
    Pipeline->>PreExecute: ExecutePhase(ctx)
    PreExecute->>PipelineContext: Execute Prefill inference
    PreExecute-->>Pipeline: Prefill completed
    deactivate PreExecute

    activate DecExecute
    Pipeline->>DecExecute: ExecutePhase(ctx)
    DecExecute->>PipelineContext: Execute Decode inference
    DecExecute-->>Pipeline: Decode completed
    deactivate DecExecute

    Pipeline-->>PipelineContext: Return result

Execution Flow

1. Initialization Phase
Load configuration file → Validate configuration → Create Phase instances → Build DAG → Topological sort
2. Execution Phase
Prepare context → Execute ready Phases (in-degree = 0) in parallel → Handle Phase completion → Trigger subsequent Phases → Loop until complete
3. Completion Phase
Wait for all Phases to complete → Execute success/failure Phase → Clean up resources

Phase Extension Examples

1. Phase Plugin Extension
// 1. Implement Phase interface
type CustomPhase struct {
    *BasePhase
    // custom fields
}

func (p *CustomPhase) ExecutePhase(ctx *PipelineContext) *errs.RSError {
    // custom logic
    return nil
}

// 2. Register plugin
func init() {
    RegisterPhase(func() Phase {
        return NewCustomPhase()
    })
}

// 3. Use in configuration
phases:
  - name: my_custom_phase
    type: CustomPhase
    dependsOn:
      - previous_phase
2. Processing Function Plugins
  • Request preprocessing functions: Custom request transformation logic
  • Response post-processing functions: Custom response processing logic
  • Phase post-processing functions: Custom logic after Phase execution
3. Skip Policies
  • Conditional skip: Decide whether to execute based on request characteristics
  • Example: Skip Encode phase when no images
schedule_params:
  skip_policy: skip_on_zero_image
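A minimal sketch of how such a skip policy might be looked up and evaluated, assuming the policy receives the parsed request; the request shape, the SkipPolicy signature, and the registry are illustrative assumptions:

```go
package main

import "fmt"

// SkipPolicy decides whether a phase should be skipped for a given request.
type SkipPolicy func(req map[string]any) bool

// skipOnZeroImage skips the Encode phase when the request carries no images.
func skipOnZeroImage(req map[string]any) bool {
	imgs, ok := req["images"].([]string)
	return !ok || len(imgs) == 0
}

// skipPolicies lets the Schedule phase resolve skip_policy names from config.
var skipPolicies = map[string]SkipPolicy{
	"skip_on_zero_image": skipOnZeroImage,
}

func main() {
	textOnly := map[string]any{"prompt": "hello"}
	withImage := map[string]any{"prompt": "hi", "images": []string{"a.png"}}
	fmt.Println(skipPolicies["skip_on_zero_image"](textOnly))  // true: skip Encode
	fmt.Println(skipPolicies["skip_on_zero_image"](withImage)) // false: run Encode
}
```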

Config example

infer_modes:
  - EPD
pipeline_cfg:
  phases:
    - name: encode_schedule
      type: Schedule
      span_name: encode_stage
      stage_name: Encode
      schedule_params:
        instance_type: encode
        skip_policy: skip_on_zero_image
        request_load: [image, text, bytes]

    - name: encode_execute
      type: Execute
      span_name: encode_stage
      stage_name: Encode
      execute_params:
        instance_type: encode
        pre_process_request_func: encode_prepare_request
        post_process_funcs: [encode_post_response]
      dependsOn: [encode_schedule]

    - name: prefill_schedule
      type: Schedule
      span_name: prefill_stage
      stage_name: Prefill
      schedule_params:
        instance_type: prefill
        request_load: [usage, text, bytes]
      dependsOn: [encode_execute]

    - name: prefill_execute
      type: Execute
      span_name: prefill_stage
      stage_name: Prefill
      execute_params:
        instance_type: prefill
        pre_process_request_func: prefill_prepare_request
      dependsOn: [prefill_schedule]

    - name: decode_schedule
      type: Schedule
      span_name: decode_stage
      stage_name: Decode
      schedule_params:
        instance_type: decode
        request_load: [usage, text, bytes]
      dependsOn: [prefill_execute]

    - name: decode_execute
      type: Execute
      span_name: decode_stage
      stage_name: Decode
      execute_params:
        instance_type: decode
        pre_process_request_func: decode_prepare_request
        post_process_funcs: [decode_post_response]
      dependsOn: [decode_schedule]
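The phase entries above could map to Go structs roughly like the following; the struct fields mirror the YAML keys (tags omitted for brevity), and validatePhases is an illustrative pre-DAG sanity check, not part of the proposal:

```go
package main

import "fmt"

// PhaseConfig mirrors one entry under pipeline_cfg.phases in the config
// example above (field names follow the YAML keys; this is a sketch).
type PhaseConfig struct {
	Name      string
	Type      string
	SpanName  string
	StageName string
	DependsOn []string
}

// validatePhases checks that phase names are unique and that every
// dependsOn target refers to an existing phase, before the DAG is built.
func validatePhases(phases []PhaseConfig) error {
	seen := map[string]bool{}
	for _, p := range phases {
		if seen[p.Name] {
			return fmt.Errorf("duplicate phase name %q", p.Name)
		}
		seen[p.Name] = true
	}
	for _, p := range phases {
		for _, d := range p.DependsOn {
			if !seen[d] {
				return fmt.Errorf("phase %q depends on unknown phase %q", p.Name, d)
			}
		}
	}
	return nil
}

func main() {
	phases := []PhaseConfig{
		{Name: "encode_schedule", Type: "Schedule", StageName: "Encode"},
		{Name: "encode_execute", Type: "Execute", StageName: "Encode", DependsOn: []string{"encode_schedule"}},
		{Name: "prefill_schedule", Type: "Schedule", StageName: "Prefill", DependsOn: []string{"encode_execute"}},
	}
	fmt.Println(validatePhases(phases)) // <nil>

	bad := append(phases, PhaseConfig{Name: "decode_execute", Type: "Execute", DependsOn: []string{"decode_schedule"}})
	fmt.Println(validatePhases(bad) != nil) // true: decode_schedule is missing
}
```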

Test Plan

Alternatives

N/A
