An intelligent CLI framework for building, orchestrating, and monitoring complex data processing pipelines with real-time visualization

psenger/pipeline-forge

Pipeline Forge

Build, orchestrate, and monitor data pipelines with confidence.

Python 3.8+ · License: MIT · Status: Alpha

What is it? · Features · Quick Start · Architecture · Use Cases · Docs


What is Pipeline Forge?

"My ETL job failed at 3 AM. Again. Which step? No idea."

Pipeline Forge is a CLI framework for data pipeline orchestration. Define your workflows in plain YAML and get automatic dependency resolution, built-in retry logic, circuit breakers, and a real-time terminal dashboard, all without the operational overhead of a heavyweight orchestrator.

Pipeline Forge is for teams who want Airflow-level visibility without Airflow-level complexity.


Features

Core Pipeline Engine

  • YAML-based configuration — Human-readable pipeline definitions
  • Dependency resolution — Automatic topological sorting with cycle detection
  • Parallel execution — Configurable concurrent task limits
  • Shell & Python tasks — Run any command or Python function
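
The dependency resolution described above can be sketched with Kahn's algorithm. This is a minimal illustration, not Pipeline Forge's actual implementation; the function name `resolve_order` and the plain-dict input format are assumptions made for the example.

```python
from collections import deque
from typing import Dict, List


def resolve_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return task names in an order that respects dependencies.

    `dependencies` maps each task name to the names it depends on.
    Raises ValueError for unknown dependencies or dependency cycles.
    (Hypothetical helper, not part of the Pipeline Forge API.)
    """
    for task, deps in dependencies.items():
        for dep in deps:
            if dep not in dependencies:
                raise ValueError(f"task {task!r} depends on unknown task {dep!r}")

    # Count unmet dependencies per task and build the reverse edges.
    pending = {task: len(set(deps)) for task, deps in dependencies.items()}
    dependents: Dict[str, List[str]] = {task: [] for task in dependencies}
    for task, deps in dependencies.items():
        for dep in set(deps):
            dependents[dep].append(task)

    ready = deque(task for task, count in pending.items() if count == 0)
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in dependents[task]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    # Any task never scheduled is part of (or blocked by) a cycle.
    if len(order) != len(dependencies):
        stuck = sorted(t for t in dependencies if t not in order)
        raise ValueError(f"dependency cycle involving: {', '.join(stuck)}")
    return order


print(resolve_order({"extract": [], "transform": ["extract"], "load": ["transform"]}))
# ['extract', 'transform', 'load']
```

Tasks that become ready at the same time have no ordering constraint between them, which is what makes parallel execution possible.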

Reliability & Fault Tolerance

  • Retry with exponential backoff — Configurable attempts, delays, and jitter
  • Circuit breaker pattern — Prevent cascading failures automatically
  • State snapshots — Recover from corruption with checkpoint restoration
  • Timeout protection — No more hung tasks
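
To illustrate retry with exponential backoff and jitter, here is a small sketch. The names `backoff_delay` and `run_with_retry`, and the parameters `base`, `cap`, and `jitter`, are assumptions for this example and may not match Pipeline Forge's real settings.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: float = 0.1) -> float:
    """Delay before the retry that follows failed attempt `attempt` (1-based).

    The delay doubles each attempt, is capped at `cap`, and gets up to
    `jitter` * delay of random extra time so retries do not synchronize.
    """
    delay = min(cap, base * 2 ** (attempt - 1))
    return delay + random.uniform(0, jitter * delay)


def run_with_retry(func: Callable[[], T], max_attempts: int = 3, base: float = 1.0,
                   sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `func` until it succeeds or `max_attempts` is exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(backoff_delay(attempt, base=base))
    raise AssertionError("unreachable")


# Example: a task that fails twice, then succeeds on the third attempt.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(run_with_retry(flaky, max_attempts=3, base=0.01))  # ok
```

The `sleep` parameter is injectable so the waiting behavior can be tested without real delays.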

Distributed Execution (in progress)

  • Work stealing scheduler — Automatic load balancing across nodes
  • Node health monitoring — Detect and route around failures
  • Atomic task queues — Thread-safe distributed coordination
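
The work-stealing idea can be shown with an in-process sketch: each worker takes tasks from its own queue and, when idle, steals the oldest task from the busiest peer. The class `WorkStealingQueues` is hypothetical, and a single lock stands in for whatever coordination a real multi-node scheduler would need.

```python
import threading
from collections import deque
from typing import Deque, Dict, Iterable, Optional


class WorkStealingQueues:
    """Per-worker task queues with stealing (illustrative, single process)."""

    def __init__(self, workers: Iterable[str]) -> None:
        self._queues: Dict[str, Deque[str]] = {w: deque() for w in workers}
        self._lock = threading.Lock()  # one coarse lock keeps operations atomic

    def submit(self, worker: str, task: str) -> None:
        with self._lock:
            self._queues[worker].append(task)

    def next_task(self, worker: str) -> Optional[str]:
        """Return the next task for `worker`, stealing if its queue is empty."""
        with self._lock:
            own = self._queues[worker]
            if own:
                return own.pop()  # newest local task first
            victim = max(
                (w for w in self._queues if w != worker),
                key=lambda w: len(self._queues[w]),
                default=None,
            )
            if victim is None or not self._queues[victim]:
                return None  # nothing to steal anywhere
            return self._queues[victim].popleft()  # steal the oldest task


queues = WorkStealingQueues(["node-a", "node-b"])
for task in ("t1", "t2", "t3"):
    queues.submit("node-a", task)

print(queues.next_task("node-b"))  # t1 (stolen from node-a)
print(queues.next_task("node-a"))  # t3 (node-a's newest task)
```

Taking local work from one end and stolen work from the other keeps the owner and the thief from competing for the same task.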

Observability

  • Real-time terminal dashboard — Watch execution as it happens
  • Task metrics — Duration, retry count, CPU/memory usage
  • Execution history — Full audit trail of every run
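
As a rough picture of per-task metrics such as duration and retry count, consider the sketch below. `TaskMetrics` and `timed_attempt` are hypothetical names, and CPU/memory sampling is left out because it would need platform-specific tooling.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, TypeVar

T = TypeVar("T")


@dataclass
class TaskMetrics:
    """Timing and retry bookkeeping for one task (illustrative only)."""
    name: str
    attempts: int = 0
    durations: List[float] = field(default_factory=list)
    status: str = "pending"

    @property
    def retry_count(self) -> int:
        return max(0, self.attempts - 1)

    @property
    def total_duration(self) -> float:
        return sum(self.durations)


def timed_attempt(metrics: TaskMetrics, func: Callable[[], T],
                  clock: Callable[[], float] = time.perf_counter) -> T:
    """Run one attempt of `func`, recording its duration and outcome."""
    metrics.attempts += 1
    start = clock()
    try:
        result = func()
        metrics.status = "success"
        return result
    except Exception:
        metrics.status = "failed"
        raise
    finally:
        # Runs on success and failure, so every attempt is timed.
        metrics.durations.append(clock() - start)


metrics = TaskMetrics(name="extract")
timed_attempt(metrics, lambda: "done")
print(metrics.status, metrics.attempts, metrics.retry_count)  # success 1 0
```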

Quick Start

1. Install

# Run from the root of a cloned pipeline-forge checkout
pip install -r requirements.txt

2. Create a Pipeline

# my_pipeline.yaml
name: hello_pipeline
version: "1.0"

tasks:
  - name: extract
    command: "echo 'Extracting data...'"
    timeout: 60

  - name: transform
    command: "echo 'Transforming data...'"
    dependencies: ["extract"]
    timeout: 120

  - name: load
    command: "echo 'Loading data...'"
    dependencies: ["transform"]
    timeout: 60

settings:
  parallel_tasks: 2
  log_level: INFO
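
Before running a definition like the one above, it helps to check that task names are unique and that every dependency points at a real task. The sketch below works on the dictionary a YAML loader such as PyYAML's `yaml.safe_load` would produce for that file. The function `validate_pipeline` and its error messages are assumptions for illustration, not the checks Pipeline Forge's `validate` command actually performs.

```python
from typing import Any, Dict, List


def validate_pipeline(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    errors: List[str] = []
    if not config.get("name"):
        errors.append("pipeline is missing 'name'")

    tasks = config.get("tasks") or []
    if not tasks:
        errors.append("pipeline defines no tasks")

    names = [task.get("name") for task in tasks]
    for name in sorted({n for n in names if n and names.count(n) > 1}):
        errors.append(f"duplicate task name {name!r}")

    for task in tasks:
        name = task.get("name") or "<unnamed>"
        if not task.get("name"):
            errors.append("a task is missing 'name'")
        timeout = task.get("timeout")
        if timeout is not None and (not isinstance(timeout, (int, float)) or timeout <= 0):
            errors.append(f"task {name!r} has a non-positive timeout")
        for dep in task.get("dependencies", []):
            if dep not in names:
                errors.append(f"task {name!r} depends on unknown task {dep!r}")
    return errors


# The same structure as my_pipeline.yaml, written as a Python dict.
config = {
    "name": "hello_pipeline",
    "version": "1.0",
    "tasks": [
        {"name": "extract", "command": "echo 'Extracting data...'", "timeout": 60},
        {"name": "transform", "command": "echo 'Transforming data...'",
         "dependencies": ["extract"], "timeout": 120},
        {"name": "load", "command": "echo 'Loading data...'",
         "dependencies": ["transform"], "timeout": 60},
    ],
    "settings": {"parallel_tasks": 2, "log_level": "INFO"},
}

print(validate_pipeline(config))  # []
```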

3. Run

python -m pipeline_forge.cli.main run my_pipeline.yaml --dashboard

[Screenshot: Pipeline Forge dashboard]

→ See the Getting Started Guide for a complete walkthrough.


Architecture

┌──────────────────────────────────────────────────────────────────┐
│                        Pipeline Forge CLI                        │
│                    python -m pipeline_forge.cli.main             │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐    ┌──────────────┐    ┌───────────────────┐   │
│  │    Config    │    │   Pipeline   │    │     Terminal      │   │
│  │    Parser    │───▶│    Engine    │───▶│    Dashboard      │   │
│  │  (YAML/JSON) │    │              │    │   (Rich TUI)      │   │
│  └──────────────┘    └──────┬───────┘    └───────────────────┘   │
│                             │                                    │
│              ┌──────────────┼──────────────┐                     │
│              ▼              ▼              ▼                     │
│       ┌──────────┐   ┌──────────┐   ┌──────────┐                 │
│       │ Executor │   │  Retry   │   │ Circuit  │                 │
│       │          │   │ Manager  │   │ Breaker  │                 │
│       └────┬─────┘   └──────────┘   └──────────┘                 │
│            │                                                     │
│            ▼                                                     │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                    Task Execution                         │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐       │   │
│  │  │ Task 1  │  │ Task 2  │  │ Task 3  │  │ Task N  │       │   │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘       │   │
│  └───────────────────────────────────────────────────────────┘   │
│                                                                  │
├──────────────────────────────────────────────────────────────────┤
│                     Distributed Layer (Optional)                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐        │
│  │  Scheduler   │───▶│  Work Queue  │───▶│    Nodes     │        │
│  │ (Work Steal) │    │   (Atomic)   │    │  (Workers)   │        │
│  └──────────────┘    └──────────────┘    └──────────────┘        │
└──────────────────────────────────────────────────────────────────┘

| Component   | Technology         | Purpose                           |
|-------------|--------------------|-----------------------------------|
| CLI         | Click, argparse    | Command-line interface            |
| Config      | PyYAML, jsonschema | Pipeline definition parsing       |
| Dashboard   | Rich               | Real-time terminal UI             |
| Execution   | asyncio            | Concurrent task running           |
| Reliability | Custom             | Retry, circuit breaker, recovery  |
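
To make the asyncio execution layer more concrete, here is a minimal sketch of running tasks concurrently under a limit (like the `parallel_tasks` setting) with a per-task timeout. The function `run_limited` and the status strings are assumptions for this example; dependency ordering is deliberately left out, and the real engine may be structured differently.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_limited(tasks: Dict[str, Callable[[], Awaitable[object]]],
                      parallel_tasks: int, timeout: float) -> Dict[str, str]:
    """Run all tasks, at most `parallel_tasks` at a time, each with a timeout."""
    limit = asyncio.Semaphore(parallel_tasks)
    status: Dict[str, str] = {}

    async def run_one(name: str, factory: Callable[[], Awaitable[object]]) -> None:
        async with limit:  # wait for a free execution slot
            try:
                await asyncio.wait_for(factory(), timeout=timeout)
                status[name] = "success"
            except asyncio.TimeoutError:
                status[name] = "timeout"  # wait_for cancels the hung task
            except Exception:
                status[name] = "failed"

    await asyncio.gather(*(run_one(name, factory) for name, factory in tasks.items()))
    return status


async def quick() -> None:
    await asyncio.sleep(0.01)


async def hang() -> None:
    await asyncio.sleep(10)


result = asyncio.run(run_limited({"extract": quick, "stuck": hang},
                                 parallel_tasks=2, timeout=0.2))
print(result)  # {'extract': 'success', 'stuck': 'timeout'}
```

The semaphore bounds concurrency without a dedicated worker pool, and `asyncio.wait_for` gives each task its own deadline.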

Use Cases

Data Engineers

"I need to orchestrate ETL jobs with proper error handling and monitoring."

Define extract-transform-load workflows with automatic retries, dependency management, and real-time visibility into execution status.

DevOps Engineers

"I want to automate deployment pipelines without standing up Airflow."

Run build, test, and deploy tasks with circuit breakers to prevent cascading failures and detailed logs for debugging.
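
The circuit breaker pattern mentioned here can be sketched as a small state machine: closed while calls succeed, open after repeated failures, and half-open once a cool-down has passed. The class below is a generic, single-threaded illustration; `CircuitBreaker`, `failure_threshold`, and `reset_timeout` are assumed names, not Pipeline Forge's API.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half-open"  # cool-down elapsed: allow a trial call
        return "open"

    def call(self, func: Callable[[], T]) -> T:
        if self.state == "open":
            raise CircuitOpenError("circuit is open; call rejected")
        try:
            result = func()
        except Exception:
            self._failures += 1
            # Trip on reaching the threshold, or re-trip after a failed trial.
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result


breaker = CircuitBreaker(failure_threshold=2, reset_timeout=30.0)
print(breaker.call(lambda: "deployed"), breaker.state)  # deployed closed
```

Rejecting calls while the circuit is open is what stops one failing dependency from dragging down every task that relies on it.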

ML Engineers

"I need reproducible training pipelines with checkpointing."

Chain data prep, training, evaluation, and deployment tasks with state snapshots for recovery and metrics collection.
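
One way to picture state snapshots for recovery: persist each task's status after it finishes, then skip completed tasks on restart. The sketch below assumes a JSON snapshot file and hypothetical helper names (`save_snapshot`, `load_snapshot`, `tasks_to_resume`); Pipeline Forge's actual snapshot format is not described here.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Dict, List


def save_snapshot(path: Path, task_states: Dict[str, str]) -> None:
    """Write the snapshot atomically: temp file first, then rename over the target."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(task_states, handle)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, path)  # readers never see a half-written file
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_snapshot(path: Path) -> Dict[str, str]:
    """Return the saved states, or an empty dict if missing or unreadable."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def tasks_to_resume(order: List[str], snapshot: Dict[str, str]) -> List[str]:
    """Tasks that still need to run, in their original order."""
    return [task for task in order if snapshot.get(task) != "success"]


with tempfile.TemporaryDirectory() as workdir:
    snapshot_file = Path(workdir) / "state" / "snapshot.json"
    save_snapshot(snapshot_file, {"prep": "success", "train": "failed"})
    remaining = tasks_to_resume(["prep", "train", "evaluate"], load_snapshot(snapshot_file))
    print(remaining)  # ['train', 'evaluate']
```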

Platform Teams

"We need self-service pipeline infrastructure for internal teams."

Provide a lightweight, YAML-based pipeline framework that teams can adopt without complex infrastructure setup.


Documentation

| Guide              | Description                          |
|--------------------|--------------------------------------|
| Getting Started    | Installation and first pipeline      |
| API Reference      | Python API documentation             |
| Examples           | Sample pipelines for common patterns |
| Performance Tuning | Optimization guide                   |
| Troubleshooting    | Common issues and solutions          |

Project Status

Pipeline Forge is currently in Alpha. Core functionality works, but APIs may change.

What Works

  • YAML pipeline definition and validation
  • Dependency resolution with cycle detection
  • Task execution with parallel workers
  • Retry with exponential backoff
  • Circuit breaker pattern
  • Terminal dashboard (basic)
  • State snapshots and recovery

In Progress

  • Plugin system (base classes defined)
  • Distributed execution (architecture complete)
  • Metrics collection and export
  • Enhanced CLI commands

Planned

  • Web dashboard
  • REST API
  • Kubernetes operator
  • Pre-built connectors (S3, databases, etc.)

→ See ROADMAP.md for detailed plans.


CLI Commands

# Run a pipeline
python -m pipeline_forge.cli.main run <pipeline.yaml>

# Run with dashboard
python -m pipeline_forge.cli.main run <pipeline.yaml> --dashboard

# Validate configuration
python -m pipeline_forge.cli.main validate <pipeline.yaml>

# Initialize new project
python -m pipeline_forge.cli.main init <project_name>

# Start standalone dashboard
python -m pipeline_forge.cli.main dashboard --port 8080
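
The commands above can also be driven from a script, for example to validate a pipeline before running it. This sketch only assembles the invocations listed above; the helper names are hypothetical, and it assumes the CLI signals a failed validation with a non-zero exit status, which this README does not confirm.

```python
import subprocess
import sys
from typing import Callable, List

CLI_MODULE = "pipeline_forge.cli.main"


def build_command(subcommand: str, *args: str) -> List[str]:
    """Build `python -m pipeline_forge.cli.main <subcommand> <args...>`."""
    return [sys.executable, "-m", CLI_MODULE, subcommand, *args]


def validate_then_run(pipeline_path: str, dashboard: bool = False,
                      runner: Callable = subprocess.run) -> int:
    """Run `validate`, then `run` only if validation exited with status 0.

    Assumption: a non-zero exit status means validation failed.
    """
    check = runner(build_command("validate", pipeline_path))
    if check.returncode != 0:
        return check.returncode
    run_args = [pipeline_path] + (["--dashboard"] if dashboard else [])
    return runner(build_command("run", *run_args)).returncode


print(build_command("validate", "my_pipeline.yaml")[1:])
# ['-m', 'pipeline_forge.cli.main', 'validate', 'my_pipeline.yaml']
```

Calling `validate_then_run("my_pipeline.yaml", dashboard=True)` would then run both steps in sequence, provided Pipeline Forge is installed in the current environment.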

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.

# Clone the repository
git clone https://github.com/psenger/pipeline-forge.git
cd pipeline-forge

# Install dependencies
pip install -r requirements.txt

# Run tests (coming soon)
pytest

License

This project is licensed under the MIT License — see the LICENSE file for details.


Acknowledgments

  • Built with Click and Rich
  • Inspired by Apache Airflow, Prefect, and Luigi
  • Circuit breaker pattern from Michael Nygard's Release It!

Built with precision for data teams who value reliability.
