An automated ETL (Extract, Transform, Load) data pipeline orchestrated by Apache Airflow. This project fetches the latest market data for a defined list of cryptocurrencies from the CoinGecko API every 10 minutes, transforms the data into a clean format, and loads it into a PostgreSQL database for long-term storage and analysis. The entire system is containerized using Docker and Docker Compose for easy deployment and scalability.
- Automated Data Ingestion: Schedules data fetching from the CoinGecko API to run every 10 minutes.
- Dynamic Configuration: Easily add or remove target cryptocurrencies by modifying a central configuration file.
- Modular & Maintainable Code: The project is structured with a clear separation of concerns (API client, transformations, database loading).
- Persistent Storage: Data is reliably stored in a PostgreSQL database, ready for analysis.
- Orchestration & Monitoring: Full workflow management and monitoring via the Apache Airflow Web UI.
- Containerized Environment: All services (Airflow, PostgreSQL, Redis) are containerized, ensuring a consistent and reproducible setup.
- Orchestrator: Apache Airflow 2.8.2
- Database: PostgreSQL 13
- Containerization: Docker & Docker Compose
- Language: Python 3.8
- Core Libraries: requests, pandas, psycopg2 (via the Airflow Provider)
- Data Source: CoinGecko API
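To illustrate how those libraries come together, here is a minimal sketch of what `src/api/coingecko_client.py` might look like. The endpoint and query parameters follow CoinGecko's public `/coins/markets` API; the function names and structure are assumptions, not the project's actual code.

```python
import requests

COINGECKO_BASE_URL = "https://api.coingecko.com/api/v3"


def build_markets_request(coin_ids, vs_currency="usd"):
    """Build the URL and query params for CoinGecko's /coins/markets endpoint."""
    url = f"{COINGECKO_BASE_URL}/coins/markets"
    params = {"vs_currency": vs_currency, "ids": ",".join(coin_ids)}
    return url, params


def fetch_market_data(coin_ids, headers=None):
    """Fetch the latest market data for the given CoinGecko coin ids."""
    url, params = build_markets_request(coin_ids)
    response = requests.get(url, params=params, headers=headers, timeout=30)
    response.raise_for_status()  # fail loudly so Airflow marks the task as failed
    return response.json()
```

Keeping the request-building logic in a separate function makes the client easy to unit-test without touching the network.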
The project is structured to separate the orchestration logic (DAGs) from the core application logic (src), promoting reusability and maintainability.
```
coingecko-airflow-pipeline/
├── dags/
│   └── coingecko_pipeline_dag.py
├── src/
│   ├── __init__.py
│   ├── api/
│   │   └── coingecko_client.py
│   ├── core/
│   │   └── transformations.py
│   ├── db/
│   │   └── postgres_loader.py
│   └── config/
│       └── settings.py
├── logs/
├── plugins/
├── .env
├── docker-compose.yml
└── requirements.txt
```
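As an example of this separation of concerns, `src/core/transformations.py` could expose a single pure function that turns the raw CoinGecko JSON into a tidy DataFrame. This is a sketch only; the exact column names kept here are assumptions and should match whatever your load step expects.

```python
from datetime import datetime, timezone

import pandas as pd


def transform_market_data(raw_records):
    """Normalize raw /coins/markets records into a flat DataFrame.

    Keeps only the columns the pipeline stores (an assumed subset) and stamps
    each row with the fetch time so repeated runs are distinguishable downstream.
    """
    columns = ["id", "symbol", "name", "current_price", "market_cap", "total_volume"]
    df = pd.DataFrame(raw_records)[columns]
    df = df.rename(columns={"id": "coin_id"})
    df["fetch_timestamp"] = datetime.now(timezone.utc)
    return df
```

Because the function takes plain records and returns a DataFrame, it can be tested without Airflow, the API, or the database running.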
This guide assumes you have a host machine (e.g., a GCP VM) with Docker and Docker Compose already installed.
For a complete, step-by-step guide on setting up the entire infrastructure from scratch (including GCP VM, Docker installation, and Firewall rules), please refer to the Detailed Infrastructure Setup Guide.
```bash
git clone https://github.com/[your-github-username]/[your-repo-name].git
cd [your-repo-name]
```

To avoid file permission issues between the host and the Docker containers, you need to set your local user and group ID.
Find your User ID and Group ID, and append them to `.env`:

```bash
echo "AIRFLOW_UID=$(id -u)" >> .env
echo "AIRFLOW_GID=$(id -g)" >> .env
```

This will create a `.env` file in the project root with the correct values.
- Open `src/config/settings.py`.
- Update `API_HEADERS` with your valid CoinGecko API key.
- Modify the `TARGET_COINS` list to include the cryptocurrencies you want to track.
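Since `settings.py` itself isn't shown here, the following is a minimal sketch of what it might contain. The coin list is a placeholder, and `x-cg-demo-api-key` is the header CoinGecko uses for demo-tier keys; check which header your key tier requires.

```python
# Central configuration for the pipeline.
# NOTE: keeping the API key in this file matches the project's current setup;
# see the Secret Management improvement idea for moving it to a secret backend.

# CoinGecko coin ids to track -- add or remove entries here. (Placeholder list.)
TARGET_COINS = ["bitcoin", "ethereum", "solana"]

# Headers sent with every CoinGecko request. Replace the value with your key.
API_HEADERS = {"x-cg-demo-api-key": "YOUR_API_KEY_HERE"}
```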
Before running the pipeline for the first time, you need to create the target table in your PostgreSQL database. Connect to your PostgreSQL instance and run the schema definition script (you can place this in sql/schema.sql for reference).
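The exact DDL depends on the columns your transform step produces. As a starting point, here is an assumed schema kept as a Python string: only the `bronze.data_coin_list` name and the `fetch_timestamp` column come from this README; the remaining columns are illustrative and should be adjusted to match your pipeline's output.

```python
# Assumed schema for the target table -- adjust columns to your transform step.
SCHEMA_SQL = """
CREATE SCHEMA IF NOT EXISTS bronze;

CREATE TABLE IF NOT EXISTS bronze.data_coin_list (
    coin_id         TEXT        NOT NULL,
    symbol          TEXT,
    name            TEXT,
    current_price   NUMERIC,
    market_cap      NUMERIC,
    total_volume    NUMERIC,
    fetch_timestamp TIMESTAMPTZ NOT NULL
);
"""
```

You can execute this string once against your PostgreSQL instance, or save it to `sql/schema.sql` as the README suggests.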
Run all services using Docker Compose from the project root directory:

```bash
docker-compose up -d
```

Wait 3-5 minutes for all services to initialize, especially on the first run. Check the status with `docker ps`; all services should show a `(healthy)` status.
1. Access the Airflow UI:
   - Open your web browser and navigate to `http://<your_vm_ip>:8080`.
   - Log in with the default credentials: `airflow` / `airflow`.
2. Enable and Trigger the DAG:
   - On the DAGs page, find `coingecko_production_pipeline`.
   - Click the toggle button on the left to un-pause and activate the DAG.
   - The DAG will start running automatically based on its schedule (`*/10 * * * *`). You can also trigger it manually by clicking the "Play" button on the right.
3. Verify Data in PostgreSQL:
   - Connect to your PostgreSQL database using your preferred client (e.g., DBeaver, pgAdmin) or via the command line.
   - Run a query to see the data being inserted:

     ```sql
     SELECT * FROM bronze.data_coin_list ORDER BY fetch_timestamp DESC LIMIT 10;
     ```
- Secret Management: Migrate API keys and other sensitive data to Airflow's secret backends (like HashiCorp Vault or GCP Secret Manager) instead of using the config file.
- Data Quality Checks: Implement a dedicated task to validate data after the transform step (e.g., check for null prices, ensure data types are correct) using a library like Great Expectations.
- Idempotent Loads: Refactor the `load_task` to be idempotent (using an UPSERT or DELETE-then-INSERT pattern) to handle re-runs and backfills safely.
- Alerting: Set up `on_failure_callback` to send notifications to Slack or email when a task fails.
- Dashboarding: Connect a BI tool like Apache Superset or Metabase to the PostgreSQL database to visualize the collected data.
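The idempotent-load idea could be sketched with PostgreSQL's `INSERT ... ON CONFLICT`. This assumes a unique constraint on `(coin_id, fetch_timestamp)`, which is an assumption about the table, not something this README specifies; adjust the conflict target to whatever key your table enforces.

```python
# Sketch of an idempotent load statement (psycopg2 named-parameter style).
# Assumes a UNIQUE constraint on (coin_id, fetch_timestamp).
UPSERT_SQL = """
INSERT INTO bronze.data_coin_list
    (coin_id, symbol, name, current_price, market_cap, total_volume, fetch_timestamp)
VALUES
    (%(coin_id)s, %(symbol)s, %(name)s, %(current_price)s,
     %(market_cap)s, %(total_volume)s, %(fetch_timestamp)s)
ON CONFLICT (coin_id, fetch_timestamp) DO UPDATE SET
    current_price = EXCLUDED.current_price,
    market_cap    = EXCLUDED.market_cap,
    total_volume  = EXCLUDED.total_volume;
"""
```

With a statement like this, re-running a task for the same interval overwrites matching rows instead of duplicating them, making retries and backfills safe.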