An end-to-end data pipeline using Apache Spark, Apache Airflow, and PostgreSQL for processing and analyzing the Online Retail dataset.
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Raw Data     │     │  Apache Spark   │     │   PostgreSQL    │
│  (retails.csv)  │────▶│   (Cleaning &   │────▶│   (Storage &    │
│                 │     │ Transformation) │     │    Analysis)    │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                 ▲
                                 │
                        ┌────────┴────────┐
                        │ Apache Airflow  │
                        │ (Orchestration) │
                        └─────────────────┘
```
```
retails.csv → data_cleaning.py → retail_transactions (PostgreSQL)
                                          ↓
                              data_transformation.py
                                          ↓
        ┌─────────────────────────────────┼─────────────────────────────────┐
        ↓                                 ↓                                 ↓
  total_revenue          top_10_most_popular_products             monthly_revenue
  (PostgreSQL)                    (PostgreSQL)                     (PostgreSQL)
```
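To make the orchestration concrete, here is a minimal sketch of what a DAG like `src/dags/retail_pipeline_dag.py` could look like. It assumes the `SparkSubmitOperator` from the Apache Spark provider and a `spark_default` connection pointing at the master; the actual DAG may use a different operator or connection setup.

```python
# Hypothetical sketch; the real DAG lives in src/dags/retail_pipeline_dag.py.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="retail_data_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule=None,   # triggered on demand via the UI or CLI
    catchup=False,
) as dag:
    clean = SparkSubmitOperator(
        task_id="data_cleaning",
        application="/opt/spark/jobs/data_cleaning.py",
        jars="/opt/spark/jars/postgresql-42.7.1.jar",
        conn_id="spark_default",  # assumed to point at spark://spark-master:7077
    )
    transform = SparkSubmitOperator(
        task_id="data_transformation",
        application="/opt/spark/jobs/data_transformation.py",
        jars="/opt/spark/jars/postgresql-42.7.1.jar",
        conn_id="spark_default",
    )
    # Cleaning must finish before the analysis job reads from PostgreSQL.
    clean >> transform
```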
- Docker (version 20.10 or higher)
- Docker Compose (version 2.0 or higher)
- 8GB+ RAM recommended (configured for 16GB system)
- At least 10GB free disk space
```
retail-data-pipeline/
├── docker-compose.yml                 # Docker services configuration
├── .env.example                       # Environment variables template
├── .env                               # Environment variables (not in git)
├── .gitignore                         # Git ignore rules
├── README.md                          # This file
├── data/
│   └── retails.csv                    # Input dataset
├── docker/                            # Docker configurations
│   ├── airflow/
│   │   └── Dockerfile                 # Airflow container
│   └── spark/
│       └── Dockerfile                 # Spark container
└── src/                               # Source code
    ├── dags/
    │   └── retail_pipeline_dag.py     # Airflow DAG
    ├── jobs/
    │   ├── data_cleaning.py           # Data cleaning → PostgreSQL
    │   └── data_transformation.py     # Analysis → PostgreSQL
    └── sql/
        ├── init_postgres.sql          # Database initialization
        └── analysis_queries.sql       # SQL analysis queries
```
```bash
cd retail-data-pipeline

# Copy the example environment file (defaults work out of the box)
cp .env.example .env

# Build all Docker images
docker-compose build

# Start all services (includes automatic Airflow initialization)
docker-compose up -d

# Wait ~30 seconds for all services to be ready
docker-compose ps
```

| Service | URL | Credentials |
|---|---|---|
| Airflow UI | http://localhost:8081 | (see .env file) |
| Spark Master UI | http://localhost:8080 | N/A |
| PostgreSQL | localhost:5432 | (see .env file) |
Option A: Via Airflow UI
- Open http://localhost:8081
- Log in with admin / admin (the defaults; see .env)
- Find the `retail_data_pipeline` DAG
- Click "Trigger DAG" to run it manually
Option B: Via Command Line

```bash
# Unpause and trigger the DAG
docker exec airflow-scheduler airflow dags unpause retail_data_pipeline
docker exec airflow-scheduler airflow dags trigger retail_data_pipeline

# Check DAG status
docker exec airflow-scheduler airflow dags list-runs -d retail_data_pipeline
```

Option C: Run Spark Jobs Manually
```bash
# Data Cleaning (cleans data and loads to PostgreSQL)
docker exec spark-master spark-submit \
  --master spark://spark-master:7077 \
  --jars /opt/spark/jars/postgresql-42.7.1.jar \
  /opt/spark/jobs/data_cleaning.py

# Data Transformation (reads from PostgreSQL, performs analysis, saves results)
docker exec spark-master spark-submit \
  --master spark://spark-master:7077 \
  --jars /opt/spark/jars/postgresql-42.7.1.jar \
  /opt/spark/jobs/data_transformation.py
```

data_cleaning.py cleans the raw retail data and stores it in PostgreSQL:
- Filters out cancelled transactions (InvoiceNo starting with 'C')
- Validates InvoiceNo (6-digit integer)
- Validates StockCode (5-digit integer)
- Validates Quantity (positive integer)
- Validates UnitPrice (positive number)
- Validates CustomerID (5-digit integer)
- Validates InvoiceDate (valid datetime)
- Cleans Description and Country fields
- Calculates Revenue (Quantity × UnitPrice)
- Removes duplicate records
- Anonymizes CustomerID using SHA-256 hashing (PII protection)
- Output: `retail_transactions` table in PostgreSQL (see the sketch below)
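Condensed, the cleaning stage amounts to a chain of DataFrame filters plus two derived columns. The sketch below assumes the raw CSV column names, a US-style date format, and `postgres` as the database hostname; `data_cleaning.py` is the authoritative implementation.

```python
# Condensed sketch of the cleaning steps; data_cleaning.py is authoritative.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("data_cleaning_sketch").getOrCreate()

df = spark.read.csv("/data/retails.csv", header=True, inferSchema=True)

cleaned = (
    df
    # Drop cancellations, then enforce the format rules from the validation table.
    .filter(~F.col("InvoiceNo").startswith("C"))
    .filter(F.col("InvoiceNo").rlike(r"^\d{6}$"))
    .filter(F.col("StockCode").rlike(r"^\d{5}$"))
    .filter((F.col("Quantity") > 0) & (F.col("UnitPrice") > 0))
    # CustomerID often parses as a float (e.g. 17850.0), hence the int cast.
    .filter(F.col("CustomerID").cast("int").cast("string").rlike(r"^\d{5}$"))
    # Timestamp format is an assumption about the raw file.
    .withColumn("InvoiceDate", F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))
    .filter(F.col("InvoiceDate").isNotNull())
    # Clean text fields, derive revenue, anonymize the customer identifier.
    .withColumn("Description", F.trim(F.col("Description")))
    .withColumn("Country", F.trim(F.col("Country")))
    .withColumn("Revenue", F.col("Quantity") * F.col("UnitPrice"))
    .withColumn("CustomerID", F.sha2(F.col("CustomerID").cast("int").cast("string"), 256))
    .dropDuplicates()
)

# Write to PostgreSQL over JDBC (credentials are the .env defaults).
(cleaned.write.format("jdbc")
    .option("url", "jdbc:postgresql://postgres:5432/retail_db")
    .option("dbtable", "retail_transactions")
    .option("user", "airflow").option("password", "airflow")
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save())
```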
data_transformation.py reads the cleaned data from PostgreSQL and performs the analysis:
- Calculates total revenue generated
- Identifies top 10 most popular products by quantity sold
- Analyzes monthly revenue trends (per-month totals, transaction counts, and averages)
- Output: `total_revenue`, `top_10_most_popular_products`, and `monthly_revenue` tables in PostgreSQL (sketched below)
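The three result tables boil down to standard aggregations. Here is a sketch with column names assumed from the verification queries shown later; the actual job may differ in details.

```python
# Sketch of the three aggregations in data_transformation.py (column names assumed).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("data_transformation_sketch").getOrCreate()

# Read the cleaned data back from PostgreSQL ("postgres" is the assumed hostname).
tx = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://postgres:5432/retail_db")
      .option("dbtable", "retail_transactions")
      .option("user", "airflow").option("password", "airflow")
      .option("driver", "org.postgresql.Driver")
      .load())

# 1. Total revenue, transaction count, average transaction value.
total_revenue = tx.agg(
    F.sum("revenue").alias("total_revenue"),
    F.count(F.lit(1)).alias("transaction_count"),
    F.avg("revenue").alias("avg_transaction_value"),
)

# 2. Top 10 products by quantity sold.
top_10 = (tx.groupBy("stock_code", "description")
            .agg(F.sum("quantity").alias("total_quantity_sold"),
                 F.sum("revenue").alias("total_revenue"))
            .orderBy(F.desc("total_quantity_sold"))
            .limit(10))

# 3. Monthly revenue trends.
monthly = (tx.withColumn("year_month", F.date_format("invoice_date", "yyyy-MM"))
             .groupBy("year_month")
             .agg(F.sum("revenue").alias("total_revenue"),
                  F.count(F.lit(1)).alias("transaction_count"),
                  F.avg("revenue").alias("avg_transaction_value"))
             .orderBy("year_month"))
```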
| Table | Description |
|---|---|
| `retail_transactions` | Cleaned transaction data |
| `total_revenue` | Total revenue, transaction count, avg transaction value |
| `top_10_most_popular_products` | Top 10 products by quantity sold |
| `monthly_revenue` | Monthly revenue trends |
```bash
# Check table row counts
docker exec retail_postgres psql -U airflow -d retail_db -c "
SELECT 'retail_transactions' AS table_name, COUNT(*) AS rows FROM retail_transactions
UNION ALL
SELECT 'total_revenue', COUNT(*) FROM total_revenue
UNION ALL
SELECT 'top_10_most_popular_products', COUNT(*) FROM top_10_most_popular_products
UNION ALL
SELECT 'monthly_revenue', COUNT(*) FROM monthly_revenue;
"

# View total revenue
docker exec retail_postgres psql -U airflow -d retail_db -c "
SELECT total_revenue, transaction_count, avg_transaction_value FROM total_revenue;
"

# View top products
docker exec retail_postgres psql -U airflow -d retail_db -c "
SELECT stock_code, description, total_quantity_sold, total_revenue
FROM top_10_most_popular_products
ORDER BY total_quantity_sold DESC;
"

# View monthly revenue
docker exec retail_postgres psql -U airflow -d retail_db -c "
SELECT year_month, total_revenue, transaction_count, avg_transaction_value
FROM monthly_revenue
ORDER BY year_month;
"

# customer_id shows a SHA-256 hash (PII protected)
docker exec retail_postgres psql -U airflow -d retail_db -c "
SELECT invoice_no, customer_id, country, revenue
FROM retail_transactions
LIMIT 3;
"
```
Connect to PostgreSQL to run analysis queries:
```bash
# Connect to PostgreSQL
docker exec -it retail_postgres psql -U airflow -d retail_db
```

The `src/sql/analysis_queries.sql` file contains advanced analytical queries:
- Top 3 Products by Revenue per Month (last 6 months of data)
- Rolling 3-Month Average Revenue for Australia (see the PySpark sketch below)
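The SQL lives in `analysis_queries.sql`; since rolling windows are easy to misread, here is the same idea for the second query expressed as a PySpark window function. This is a sketch with assumed column names and hostname, not the query itself.

```python
# Rolling 3-month average revenue for Australia, sketched in PySpark;
# analysis_queries.sql implements the equivalent logic in SQL.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("rolling_avg_sketch").getOrCreate()

tx = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://postgres:5432/retail_db")  # assumed hostname
      .option("dbtable", "retail_transactions")
      .option("user", "airflow").option("password", "airflow")
      .option("driver", "org.postgresql.Driver")
      .load())

monthly_au = (tx.filter(F.col("country") == "Australia")
                .withColumn("year_month", F.date_format("invoice_date", "yyyy-MM"))
                .groupBy("year_month")
                .agg(F.sum("revenue").alias("revenue")))

# Each month averaged with the two months before it (a 3-month window).
w = Window.orderBy("year_month").rowsBetween(-2, 0)
monthly_au.withColumn("rolling_3mo_avg", F.avg("revenue").over(w)) \
          .orderBy("year_month") \
          .show()
```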
```bash
# Run all analysis queries from the file
docker exec -i retail_postgres psql -U airflow -d retail_db < src/sql/analysis_queries.sql

# Or run from inside the container
docker exec -it retail_postgres psql -U airflow -d retail_db -f /docker-entrypoint-initdb.d/analysis_queries.sql
```

```bash
# Stop all services
docker-compose down

# Stop and remove volumes (WARNING: deletes all data)
docker-compose down -v
```

```bash
# Check service status
docker-compose ps

# View logs for a specific service
docker-compose logs spark-master
docker-compose logs airflow-webserver
docker-compose logs postgres

# Restart the Airflow scheduler
docker-compose restart airflow-scheduler

# Check for DAG errors
docker exec airflow-scheduler airflow dags list

# Check Spark logs
docker-compose logs spark-master
docker-compose logs spark-worker

# Verify the data file exists
docker exec spark-master ls -la /data/

# Verify PostgreSQL is running
docker exec retail_postgres pg_isready -U airflow -d retail_db

# Check PostgreSQL logs
docker-compose logs postgres
```

| Component | Setting | Value |
|---|---|---|
| PostgreSQL | Memory limit | 1GB |
| PostgreSQL | Shared buffers | 256MB |
| Spark Master | Memory limit | 1.5GB |
| Spark Worker | Memory | 2GB |
| Spark Worker | Cores | 2 |
| Spark Jobs | Driver memory | 1GB |
| Spark Jobs | Executor memory | 1GB |
| Spark Jobs | Shuffle partitions | 8 |
| Airflow Webserver | Memory limit | 2GB |
| Airflow Scheduler | Memory limit | 2GB |
| Airflow | Parallelism | 8 |
Total estimated memory usage: ~10GB (leaves headroom for your OS)
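For reference, here is how the Spark job settings from the table would map onto a `SparkSession` builder. This is purely illustrative: the jobs may equally receive these values through `spark-submit` flags.

```python
# Illustrative mapping of the Spark job settings above onto a SparkSession.
# Note: driver memory normally must be set before the JVM starts
# (e.g. spark-submit --driver-memory 1g); shown here only for completeness.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("retail_pipeline")
         .master("spark://spark-master:7077")
         .config("spark.driver.memory", "1g")
         .config("spark.executor.memory", "1g")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())
```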
Configure via the `.env` file (copy it from `.env.example`). The defaults work out of the box for local development.
| Variable | Default | Description |
|---|---|---|
| POSTGRES_USER | airflow | Database username |
| POSTGRES_PASSWORD | airflow | Database password |
| POSTGRES_DB | retail_db | Database name |
| AIRFLOW_ADMIN_USERNAME | admin | Airflow web UI username |
| AIRFLOW_ADMIN_PASSWORD | admin | Airflow web UI password |
| AIRFLOW_FERNET_KEY | (pre-generated) | Encryption key for Airflow |
| AIRFLOW_SECRET_KEY | (pre-generated) | Secret key for Airflow webserver |
Never commit `.env` to version control.
The Online Retail dataset contains:
| Column | Description |
|---|---|
| InvoiceNo | Invoice number (starts with 'C' for cancellations) |
| StockCode | Product code (5-digit integer) |
| Description | Product name |
| Quantity | Quantity per transaction |
| InvoiceDate | Transaction date/time |
| UnitPrice | Price per unit (GBP) |
| CustomerID | Customer identifier (anonymized with SHA-256) |
| Country | Customer's country |
| Revenue | Transaction revenue (calculated) |
| Field | Validation |
|---|---|
| InvoiceNo | 6-digit integer, no cancellations (C prefix) |
| StockCode | 5-digit integer |
| Quantity | Positive integer |
| UnitPrice | Positive number |
| CustomerID | 5-digit integer (hashed for PII protection) |
| InvoiceDate | Valid datetime |
| Revenue | Quantity × UnitPrice > 0 |
This project is for educational/evaluation purposes.