Skip to content

sunildataengineer/Real-Time-Data-Quality-Streaming-Governance-Platform

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 

Repository files navigation

🛡️ Real-Time Data Quality & Streaming Governance Platform

Platform Banner Status Records Latency Quality

A production-grade platform for real-time data quality validation, streaming governance enforcement, and automated anomaly detection across 50M+ records/day at 50K–200K events/second.

ArchitectureData FlowSystem DesignGetting StartedAPI DocsContributing


📋 Table of Contents

  1. Problem Statement
  2. Architecture Diagram
  3. How It Works
  4. Data Ingestion & Scraping
  5. Data Flow
  6. Data Access Patterns
  7. Data Modelling
  8. System Design
  9. Prerequisites
  10. Running the Project
  11. Testing
  12. API Service
  13. References
  14. Contributing
  15. License

🧩 Problem Statement

Modern data platforms consume events from dozens of upstream producers. A single upstream schema change, null injection, or outlier burst can silently corrupt downstream analytics, ML features, and business dashboards — often going undetected for hours.

This platform solves:

  • 🔍 Real-time schema and constraint validation at 50K–200K events/sec with < 1s detection latency
  • 🚨 Automated anomaly detection on metric distributions (null rates, cardinality, volume)
  • 📋 Governance rule management — define, version, and enforce data contracts centrally
  • 🔁 Quarantine + dead-letter queue routing for invalid records
  • 📊 Full data quality lineage and audit trail for compliance and debugging
  • 🏗️ Reduce invalid data incidents by 70% through proactive enforcement

🏗️ Architecture Diagram

┌───────────────────────────────────────────────────────────────────────────────────────┐
│              REAL-TIME DATA QUALITY & STREAMING GOVERNANCE PLATFORM                   │
└───────────────────────────────────────────────────────────────────────────────────────┘

  ┌──────────────────────────────────────────────────────────────────────────────────┐
  │  DATA PRODUCER LAYER                                                              │
  │                                                                                   │
  │  Microservices ──┐                                                                │
  │  CDC Pipelines  ─┼──► [Kafka Cluster]  ──── Schema Registry (Avro/JSON/Protobuf) │
  │  Batch Exports  ─┘     Partitions: 32                                             │
  │  Third-party APIs       Replication: 3                                            │
  └──────────────────────────────────────────────────────────────────────────────────┘
                                     │
                    ┌────────────────▼──────────────────┐
                    │  GOVERNANCE RULE STORE             │
                    │  (PostgreSQL + Redis cache)        │
                    │  ├── Schema contracts              │
                    │  ├── Field-level quality rules     │
                    │  ├── Business constraint rules     │
                    │  └── Anomaly detection thresholds  │
                    └────────────────┬──────────────────┘
                                     │
  ┌──────────────────────────────────▼───────────────────────────────────────────────┐
  │  STREAM PROCESSING LAYER                                                          │
  │                                                                                   │
  │  [Apache Flink / Spark Structured Streaming]                                      │
  │  ┌─────────────────────┐  ┌──────────────────────┐  ┌────────────────────────┐  │
  │  │ Schema Validation   │  │  Field Quality Check  │  │  Statistical Anomaly   │  │
  │  │ • Avro/JSON schema  │  │  • Null checks        │  │  • Volume anomaly      │  │
  │  │ • Field presence    │  │  • Range constraints  │  │  • Null rate drift     │  │
  │  │ • Type enforcement  │  │  • Regex patterns     │  │  • Cardinality change  │  │
  │  │ • Enum validation   │  │  • Referential rules  │  │  • Value distribution  │  │
  │  └─────────────────────┘  └──────────────────────┘  └────────────────────────┘  │
  └──────────────────────────────────────────────────────────────────────────────────┘
                  │                        │                       │
        ┌─────────▼─────────┐  ┌──────────▼────────┐  ┌──────────▼──────────┐
        │  VALID RECORDS    │  │  INVALID RECORDS   │  │  ANOMALY ALERTS     │
        │  ──────────────   │  │  ─────────────── │  │  ─────────────────  │
        │  S3 (Parquet)     │  │  Dead-Letter Topic │  │  PagerDuty / Slack  │
        │  Downstream sinks │  │  S3 Quarantine     │  │  Grafana Alerts     │
        └───────────────────┘  └───────────────────┘  └─────────────────────┘
                                                                │
  ┌─────────────────────────────────────────────────────────────────────────────────┐
  │  OBSERVABILITY LAYER                                                             │
  │  Prometheus → Grafana  |  ELK Stack  |  Airflow (Batch DQ + Lineage)           │
  └─────────────────────────────────────────────────────────────────────────────────┘

⚙️ How It Works

Data Quality Enforcement Pipeline

Upstream Event Arrives
        │
        ▼
[Schema Validation Gate]
   Avro schema check via Schema Registry
   ├── PASS → continue
   └── FAIL → route to dead-letter-topic:schema-violations
        │
        ▼
[Field Quality Checks] (Deequ / Great Expectations rules)
   ├── Null rate check: field not null where required
   ├── Range check: amount BETWEEN 0.01 AND 999999.99
   ├── Pattern check: email matches regex
   ├── Enum check: status IN ('ACTIVE','INACTIVE','PENDING')
   ├── Referential check: user_id exists in user dimension
   └── FAIL → route to dead-letter-topic:field-violations
        │
        ▼
[Statistical Anomaly Detection] (Sliding Window)
   ├── Null rate > baseline + 3σ → ANOMALY ALERT
   ├── Event volume < 50% of rolling 7-day avg → VOLUME DROP ALERT
   ├── New cardinality explosion: distinct values 10x baseline → ALERT
   └── Distribution shift (KL divergence > threshold) → ALERT
        │
        ▼
[Quality Score Calculation]
   completeness × validity × consistency × timeliness → composite_dq_score
        │
        ▼
[Routing Decision]
   dq_score ≥ 0.95 → valid-topic → downstream consumers
   dq_score 0.7–0.95 → flagged-topic → consume with quality flag
   dq_score < 0.70 → quarantine-topic → hold for review

Quality Dimension Matrix

Dimension Definition Measurement Alert Threshold
Completeness Non-null rate for required fields non_null_count / total_count < 98%
Validity Records passing all constraint rules valid_count / total_count < 99%
Consistency Cross-field logical integrity Custom rule pass rate < 99.5%
Timeliness Event time vs. processing time lag P99 of now - event_time > 5 min
Uniqueness Dedup rate on primary keys distinct_pk / total < 99.9%
Accuracy Value range and business rule pass rate Domain-specific rules < 99%

📥 Data Ingestion & Scraping

Source Systems

Source System Format Volume Ingestion Method
Microservice Events Avro 20M records/day Kafka native producer
CDC from PostgreSQL Debezium JSON 10M records/day Debezium + Kafka Connect
Batch File Exports CSV/JSON 10M records/day S3 → Kafka (Lambda trigger)
Third-Party APIs JSON/REST 10M records/day Kafka Connect HTTP Source

Schema Registry Integration

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka import DeserializingConsumer
import logging
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

class GovernedKafkaConsumer:
    """
    Schema-registry-aware Kafka consumer with automatic schema evolution handling.
    Enforces data contracts at the consumer boundary.
    """

    def __init__(
        self,
        bootstrap_servers: str,
        schema_registry_url: str,
        group_id: str,
        topic: str
    ):
        self.topic = topic
        sr_client = SchemaRegistryClient({"url": schema_registry_url})
        avro_deserializer = AvroDeserializer(sr_client)

        self.consumer = DeserializingConsumer({
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,    # Manual commit for exactly-once
            "value.deserializer": avro_deserializer,
            "isolation.level": "read_committed",  # Transactional reads only
        })
        self.consumer.subscribe([topic])

    def consume_with_validation(self) -> Optional[Dict[str, Any]]:
        msg = self.consumer.poll(timeout=1.0)
        if msg is None:
            return None
        if msg.error():
            logger.error("Consumer error: %s", msg.error())
            return None
        try:
            record = msg.value()
            self.consumer.commit(asynchronous=False)
            return record
        except Exception as e:
            logger.error("Deserialization failed (schema violation): %s", e)
            # Route to dead letter — schema violation
            self._publish_to_dead_letter(msg, str(e))
            self.consumer.commit(asynchronous=False)
            return None

🔄 Data Flow

┌──────────────────────────────────────────────────────────────────────────────┐
│                     DETAILED DATA FLOW WITH QUALITY GATES                    │
└──────────────────────────────────────────────────────────────────────────────┘

INPUT LAYER:
  Raw Event Sources
    ──► Kafka: [raw-events]  (retention: 3 days)
    ──► Schema Registry: contract enforcement at produce time

QUALITY GATE 1 — SCHEMA VALIDATION (Flink/Spark):
  [raw-events]
    ──► Avro schema check against Schema Registry
    ├── VALID   ──► [schema-valid-events]
    └── INVALID ──► [dlq-schema-violations] + Incident Metrics

QUALITY GATE 2 — FIELD QUALITY (Deequ/Great Expectations):
  [schema-valid-events]
    ──► Field constraint evaluation (null, range, pattern, enum)
    ──► Computed per-record quality score
    ├── score ≥ 0.95  ──► [validated-events]
    ├── score 0.7–0.95 ──► [flagged-events]  (with dq_score attached)
    └── score < 0.70  ──► [quarantine-events] + Alert

QUALITY GATE 3 — ANOMALY DETECTION (Flink Sliding Window):
  [validated-events] + [flagged-events]
    ──► 5-min tumbling window aggregations per topic+field
    ──► Statistical comparison vs. 7-day rolling baseline
    ├── NORMAL  ──► downstream sinks
    └── ANOMALY ──► [anomaly-alerts] + PagerDuty/Slack

SINK LAYER:
  [validated-events]   ──► S3 (Parquet, partitioned by date/source/topic)
  [flagged-events]     ──► S3 flagged partition + downstream w/ quality flag
  [quarantine-events]  ──► S3 quarantine bucket + PostgreSQL DQ incident log
  Aggregated metrics   ──► Prometheus → Grafana dashboards
  All events           ──► ELK (quality metadata for audit trail)

BATCH RECONCILIATION LAYER (Airflow - Hourly):
  S3 raw vs. validated ──► Row count reconciliation
  DQ metrics report    ──► Posted to Data Quality Dashboard
  Rule drift detection ──► Trigger rule threshold updates

🗃️ Data Access Patterns

Storage Strategy

Data Type Storage Access Pattern Retention
Validated events (hot) S3 + Parquet Batch + Athena queries 90 days
DQ metrics (hot) Prometheus Grafana dashboard reads 30 days
DQ incident log PostgreSQL API reads, analyst queries 1 year
Governance rules PostgreSQL + Redis Cached rule lookups Persistent
Quarantined events S3 (separate bucket) Analyst review, replay 30 days
Audit trail ELK Stack Full-text search 2 years

Rule Store Access (Redis-Cached PostgreSQL)

import redis
import json
import logging
from typing import List, Optional
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class QualityRule:
    rule_id: str
    topic: str
    field_name: str
    rule_type: str        # NULL_CHECK | RANGE | REGEX | ENUM | REFERENTIAL
    parameters: dict
    severity: str         # CRITICAL | WARNING | INFO
    is_active: bool

class RuleStoreClient:
    """
    Hot-path rule store: Redis cache backed by PostgreSQL.
    Cache TTL: 60 seconds (rules can be updated centrally in near-real-time).
    """

    CACHE_TTL_SECONDS = 60

    def __init__(self, redis_url: str, pg_conn_string: str):
        self.redis = redis.Redis.from_url(redis_url, decode_responses=True)
        self.pg_conn_string = pg_conn_string

    def get_rules_for_topic(self, topic: str) -> List[QualityRule]:
        cache_key = f"dq_rules:{topic}"
        cached = self.redis.get(cache_key)
        if cached:
            logger.debug("Cache HIT for topic rules: %s", topic)
            return [QualityRule(**r) for r in json.loads(cached)]

        logger.debug("Cache MISS — loading rules from PostgreSQL for topic: %s", topic)
        rules = self._load_from_postgres(topic)
        self.redis.setex(cache_key, self.CACHE_TTL_SECONDS, json.dumps([r.__dict__ for r in rules]))
        return rules

    def _load_from_postgres(self, topic: str) -> List[QualityRule]:
        # Load active rules for topic from PostgreSQL
        raise NotImplementedError("Implement PostgreSQL rule loading")

📐 Data Modelling

Data Quality Governance Schema (PostgreSQL)

-- Central governance rule registry
CREATE TABLE dq_governance.quality_rules (
    rule_id         UUID            PRIMARY KEY DEFAULT gen_random_uuid(),
    rule_name       VARCHAR(255)    NOT NULL,
    topic           VARCHAR(255)    NOT NULL,
    field_name      VARCHAR(255),
    rule_type       VARCHAR(50)     NOT NULL
                    CHECK (rule_type IN ('NULL_CHECK','RANGE','REGEX','ENUM','REFERENTIAL','STATISTICAL')),
    parameters      JSONB           NOT NULL,       -- {"min": 0, "max": 999999}
    severity        VARCHAR(20)     NOT NULL
                    CHECK (severity IN ('CRITICAL','WARNING','INFO')),
    is_active       BOOLEAN         NOT NULL DEFAULT TRUE,
    version         INT             NOT NULL DEFAULT 1,
    created_by      VARCHAR(100)    NOT NULL,
    created_at      TIMESTAMP       NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMP       NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_rules_topic_active ON dq_governance.quality_rules(topic, is_active);

-- Per-event quality audit record
CREATE TABLE dq_governance.quality_audit (
    audit_id            UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    event_id            VARCHAR(255) NOT NULL,
    topic               VARCHAR(255) NOT NULL,
    source_system       VARCHAR(100) NOT NULL,
    completeness_score  NUMERIC(5,4) NOT NULL,
    validity_score      NUMERIC(5,4) NOT NULL,
    consistency_score   NUMERIC(5,4) NOT NULL,
    composite_dq_score  NUMERIC(5,4) NOT NULL,
    failed_rules        JSONB,                     -- [{rule_id, field, reason}]
    dq_tier             VARCHAR(20)  NOT NULL,      -- VALID | FLAGGED | QUARANTINED
    processed_at        TIMESTAMP    NOT NULL DEFAULT NOW(),
    event_time          TIMESTAMP    NOT NULL
) PARTITION BY RANGE (processed_at);

-- Monthly partitions for audit table
CREATE TABLE dq_governance.quality_audit_2024_01
    PARTITION OF dq_governance.quality_audit
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- DQ incident tracking
CREATE TABLE dq_governance.dq_incidents (
    incident_id         UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    incident_type       VARCHAR(50) NOT NULL,   -- SCHEMA_VIOLATION | ANOMALY | VOLUME_DROP
    topic               VARCHAR(255) NOT NULL,
    affected_field      VARCHAR(255),
    severity            VARCHAR(20) NOT NULL,
    detected_at         TIMESTAMP   NOT NULL DEFAULT NOW(),
    resolved_at         TIMESTAMP,
    incident_detail     JSONB       NOT NULL,
    auto_resolved       BOOLEAN     NOT NULL DEFAULT FALSE,
    notified_channels   TEXT[]
);

Dimensional Model (Redshift — Analytics)

FACT_DQ_EVENTS
├── event_key (PK)
├── topic_key (FK → DIM_TOPIC)
├── source_key (FK → DIM_SOURCE)
├── date_key  (FK → DIM_DATE)
├── composite_dq_score
├── completeness_score
├── validity_score
├── consistency_score
├── timeliness_ms
├── dq_tier
└── failed_rule_count

DIM_TOPIC: topic_key, topic_name, domain, owner_team, criticality, sla_dq_threshold
DIM_SOURCE: source_key, source_system, source_type, data_classification, pii_contains
DIM_RULE: rule_key, rule_name, rule_type, severity, topic

🏛️ System Design

High-Level Architecture

┌─────────────────────────────────────────────────────────────────────┐
│  GOVERNANCE CONTROL PLANE                                           │
│  ├── Rule Management API (FastAPI)                                  │
│  ├── Rule Store (PostgreSQL + Redis)                                │
│  ├── Schema Registry (Confluent)                                    │
│  └── Governance Dashboard (Grafana)                                 │
└────────────────────────┬────────────────────────────────────────────┘
                          │ Rule Hot-Reload (60s TTL)
┌────────────────────────▼────────────────────────────────────────────┐
│  STREAM PROCESSING PLANE                                            │
│  ├── Flink Quality Validation Job     (parallelism: 32)             │
│  ├── Flink Anomaly Detection Job      (parallelism: 16)             │
│  ├── Spark Structured Streaming       (batch-aligned aggregations)  │
│  └── Kafka Streams (lightweight routing)                            │
└─────────────────────────────────────────────────────────────────────┘

Scalability Decisions:
┌──────────────────────────────────────┬─────────────────────────────────────┐
│ Concern                              │ Solution                            │
├──────────────────────────────────────┼─────────────────────────────────────┤
│ Rule evaluation hot path             │ Redis cache, 60s TTL, lazy refresh  │
│ High cardinality anomaly detection   │ HyperLogLog for distinct counts      │
│ Late arriving events                 │ Flink watermark + allowed lateness  │
│ Rule changes w/o job restart         │ Dynamic config reload (Redis watch)  │
│ DLQ replay for fixed events          │ S3 quarantine → replay Kafka topic  │
│ Schema evolution backward compat     │ Schema Registry + BACKWARD compat   │
│ DQ score computation at scale        │ Vectorized PySpark UDFs             │
└──────────────────────────────────────┴─────────────────────────────────────┘

Flink Data Quality Job (PySpark equivalent)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import logging
from typing import Dict, Any

logger = logging.getLogger(__name__)

def compute_completeness_score(df, required_fields: list) -> "DataFrame":
    """
    Vectorized completeness scoring across all required fields.
    Avoids UDF overhead by using native Spark functions.
    """
    total_fields = len(required_fields)
    null_check_exprs = [
        F.when(F.col(field).isNull(), 0).otherwise(1)
        for field in required_fields
    ]
    return df.withColumn(
        "completeness_score",
        sum(null_check_exprs) / F.lit(total_fields)
    )

def compute_validity_score(df, rule_config: Dict[str, Any]) -> "DataFrame":
    """
    Applies field-level validity rules and computes aggregate validity score.
    Rules loaded from Redis-backed rule store for dynamic updates.
    """
    validity_checks = []

    for field, config in rule_config.items():
        if config["type"] == "RANGE":
            check = (
                F.col(field).between(config["min"], config["max"])
                .cast("integer")
            )
        elif config["type"] == "NOT_NULL":
            check = F.col(field).isNotNull().cast("integer")
        elif config["type"] == "REGEX":
            check = F.col(field).rlike(config["pattern"]).cast("integer")
        else:
            check = F.lit(1)

        validity_checks.append(check)

    total_rules = len(validity_checks)
    return df.withColumn(
        "validity_score",
        sum(validity_checks) / F.lit(total_rules)
    )

def classify_dq_tier(df) -> "DataFrame":
    """Assign DQ tier based on composite score."""
    return df.withColumn(
        "dq_tier",
        F.when(F.col("composite_dq_score") >= 0.95, "VALID")
         .when(F.col("composite_dq_score") >= 0.70, "FLAGGED")
         .otherwise("QUARANTINED")
    )

def run_quality_pipeline(
    kafka_bootstrap: str,
    topic: str,
    checkpoint_path: str,
    rule_config: Dict[str, Any],
    required_fields: list
) -> None:
    spark = (
        SparkSession.builder
        .appName("DataQualityGovernancePipeline")
        .config("spark.sql.shuffle.partitions", "64")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")

    # Read from Kafka
    raw_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
    )

    # Deserialize and apply quality pipeline
    enriched = (
        raw_stream
        .select(F.from_json(F.col("value").cast("string"), schema=get_event_schema()).alias("data"))
        .select("data.*")
        .transform(lambda df: compute_completeness_score(df, required_fields))
        .transform(lambda df: compute_validity_score(df, rule_config))
        .withColumn("composite_dq_score",
                    (F.col("completeness_score") * 0.4 + F.col("validity_score") * 0.6))
        .transform(classify_dq_tier)
    )

    # Write valid records downstream
    query = (
        enriched
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("path", "s3://your-bucket/validated-events/")
        .partitionBy("dq_tier", "event_date")
        .outputMode("append")
        .trigger(processingTime="10 seconds")
        .start()
    )

    query.awaitTermination()

🔧 Prerequisites

Infrastructure Requirements

Component Version Notes
Docker ≥ 24.0 Container runtime
Docker Compose ≥ 2.20 Local orchestration
Kubernetes ≥ 1.27 Production deployment
Helm ≥ 3.12 K8s package management
Terraform ≥ 1.5 AWS infra provisioning
Python ≥ 3.10 Application runtime
Java ≥ 11 Flink runtime
Apache Kafka ≥ 3.5 Message streaming
Apache Flink ≥ 1.17 Stream processing
Apache Spark ≥ 3.4 Batch + structured streaming
Great Expectations ≥ 0.18 Data quality framework
Apache Deequ ≥ 2.0 Spark-native DQ
PostgreSQL ≥ 15 Rule store + audit log
Redis ≥ 7.0 Rule cache hot path

AWS Services Required

├── Amazon MSK          — Managed Kafka (schema registry, DLQ topics)
├── Amazon EKS          — Kubernetes cluster for Flink/Spark
├── Amazon S3           — Validated/quarantined data lake
├── Amazon RDS          — PostgreSQL for rule store and audit log
├── Amazon ElastiCache  — Redis for rule caching
├── Amazon CloudWatch   — Metrics, alarms, log groups
├── AWS Glue Catalog    — Data catalog for validated S3 data
└── AWS Secrets Manager — Connection secrets management

Python Dependencies

# Core dependencies
confluent-kafka==2.3.0
pyspark==3.4.1
apache-flink==1.17.0
great-expectations==0.18.0
fastapi==0.104.0
uvicorn==0.24.0
psycopg2-binary==2.9.9
redis==5.0.1
boto3==1.34.0
prometheus-client==0.19.0

🚀 Running the Project

Local Development

# Clone the repository
git clone https://github.com/sunildataengineer/Real-Time-Data-Quality-Streaming-Governance-Platform.git
cd Real-Time-Data-Quality-Streaming-Governance-Platform

# Set up environment
python3.10 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Start local infrastructure
docker-compose -f docker/docker-compose.dev.yml up -d
# Services: Kafka, Zookeeper, Schema Registry, PostgreSQL, Redis, Prometheus, Grafana

# Run database migrations
python scripts/setup/run_migrations.py

# Seed governance rules
python scripts/setup/seed_quality_rules.py

# Start synthetic data generator
python scripts/data_gen/event_simulator.py --topic user-events --rate 5000 --bad-record-rate 0.05

# Start Spark quality pipeline
python src/pipelines/quality_pipeline.py \
  --kafka-bootstrap localhost:9092 \
  --topic user-events \
  --checkpoint-path /tmp/checkpoints/quality-pipeline

# Access Grafana dashboard
open http://localhost:3000  # admin/admin

Production Deployment

# Provision AWS infrastructure
cd terraform/
terraform init && terraform apply -var-file=environments/prod.tfvars

# Build and push images
make docker-build ENV=prod && make docker-push ENV=prod

# Deploy to Kubernetes
helm upgrade --install dq-governance ./helm/dq-governance \
  --namespace data-quality \
  --values helm/dq-governance/values.prod.yaml \
  --set image.tag=$(git rev-parse --short HEAD)

# Deploy CI/CD pipeline (GitHub Actions)
# .github/workflows/deploy.yml handles: test → build → deploy → smoke test

CI/CD Pipeline (GitHub Actions)

# .github/workflows/deploy.yml (summary)
name: Deploy Data Quality Platform
on:
  push:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install -r requirements.txt && pytest tests/ --cov=src --cov-fail-under=80
  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - run: make docker-build && make docker-push
  deploy:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
      - run: helm upgrade --install dq-governance ./helm/dq-governance ...
      - run: python tests/smoke/test_end_to_end.py --env prod

🧪 Testing

Test Categories

# Unit tests — quality rules, scoring logic, schema validation
pytest tests/unit/ -v --cov=src --cov-report=html --cov-fail-under=80

# Integration tests — Kafka → pipeline → PostgreSQL
pytest tests/integration/ -v -m integration

# Data quality rule tests
pytest tests/unit/test_quality_rules.py -v

# Anomaly detection tests
pytest tests/unit/test_anomaly_detection.py -v

# End-to-end: inject bad records, verify DLQ routing
pytest tests/e2e/test_quality_pipeline.py -v --timeout=180

# Great Expectations suite validation
great_expectations checkpoint run quality_checkpoint

# Load test
python tests/load/load_test.py --rate 50000 --duration 300 --bad-rate 0.05

Sample Rule Test

import pytest
from src.quality.rules import NullCheckRule, RangeRule, RegexRule

class TestQualityRules:

    def test_null_check_passes_non_null(self):
        rule = NullCheckRule(field="user_id", severity="CRITICAL")
        assert rule.evaluate({"user_id": "u_001"}).passed is True

    def test_null_check_fails_null_field(self):
        rule = NullCheckRule(field="user_id", severity="CRITICAL")
        result = rule.evaluate({"user_id": None})
        assert result.passed is False
        assert result.severity == "CRITICAL"

    def test_range_rule_passes_in_bounds(self):
        rule = RangeRule(field="amount", min_val=0.01, max_val=999999.99)
        assert rule.evaluate({"amount": 100.50}).passed is True

    def test_range_rule_fails_negative_amount(self):
        rule = RangeRule(field="amount", min_val=0.01, max_val=999999.99)
        result = rule.evaluate({"amount": -5.00})
        assert result.passed is False

    def test_composite_score_quarantine_threshold(self):
        from src.quality.scorer import CompositeScorer
        scorer = CompositeScorer()
        score = scorer.compute(completeness=0.50, validity=0.60)
        assert score < 0.70  # Should be QUARANTINED

🌐 API Service

Governance REST API (FastAPI)

Base URL: https://dq-governance-api.yourdomain.com/v1

Endpoint Method Description
/health GET Health check
/rules GET List all active quality rules
/rules POST Create a new quality rule
/rules/{rule_id} PUT Update rule parameters
/rules/{rule_id} DELETE Deactivate a rule
/topics/{topic}/quality GET Current DQ metrics for a topic
/topics/{topic}/incidents GET Recent DQ incidents
/quarantine GET List quarantined records
/quarantine/{event_id}/replay POST Replay a quarantined record
/metrics/summary GET Platform-wide DQ scorecard

Sample Requests

# Get DQ scorecard for a topic
curl -X GET "https://dq-governance-api.yourdomain.com/v1/topics/user-events/quality" \
  -H "Authorization: Bearer $API_TOKEN"
{
  "topic": "user-events",
  "window": "last_1_hour",
  "total_records": 180432,
  "valid_records": 175987,
  "flagged_records": 3241,
  "quarantined_records": 1204,
  "composite_dq_score": 0.9751,
  "completeness_score": 0.9923,
  "validity_score": 0.9871,
  "consistency_score": 0.9914,
  "active_incidents": 0,
  "evaluated_at": "2024-01-15T11:00:00Z"
}
# Create a new quality rule
curl -X POST "https://dq-governance-api.yourdomain.com/v1/rules" \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "rule_name": "amount_positive",
    "topic": "payment-events",
    "field_name": "amount",
    "rule_type": "RANGE",
    "parameters": {"min": 0.01, "max": 999999.99},
    "severity": "CRITICAL"
  }'

📚 References

Resource Link
Apache Flink Documentation https://nightlies.apache.org/flink/flink-docs-stable/
Apache Deequ (AWS) https://github.com/awslabs/deequ
Great Expectations Docs https://docs.greatexpectations.io/
Confluent Schema Registry https://docs.confluent.io/platform/current/schema-registry/
Spark Structured Streaming https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
Data Quality Dimensions (DAMA) https://www.dama.org/
The Data Engineering Cookbook https://github.com/andkret/Cookbook
AWS MSK Best Practices https://docs.aws.amazon.com/msk/latest/developerguide/bestpractices.html

🤝 Contributing

# Standard contribution workflow
git checkout -b feat/your-quality-rule-name

# Run full test suite
make lint && make test

# Commit with conventional commits
git commit -m "feat(rules): add referential integrity check for user_id"

# Open pull request — requires 2 reviewer approvals + CI green

Key standards:

  • All new quality rules must have unit tests with pass/fail/edge-case coverage
  • Rule changes require performance benchmarking (< 5% throughput impact)
  • Any new DLQ route must include a replay mechanism
  • Governance changes require team lead approval

📄 License

This project is licensed under the Apache License 2.0 — see the LICENSE file for details.


Built for production-grade data trust and governance at scale

⭐ Star this repo if it helped • 🐛 Report Issues • 💡 Request Features

About

A streaming data governance system that enforces schema and data quality rules in real time using Spark and Delta Lake.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors