A Flask-based ETL service that receives webhooks from CommCare, stores payloads in Firestore, and processes them into a PostgreSQL database. Deployed on Google Cloud Run.

- CommCare submits form data via webhook to `/save-payload/<source>`.
- The payload is stored in Firestore with status `new`.
- `/process-jobs/<source>` picks up new jobs and routes them to the correct orchestrator.
- Processed records are written to PostgreSQL, and the job status is updated in Firestore.
- Failed jobs can be retried via `/retry-job/<source>`.
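
The job lifecycle described above can be modelled as a small state machine. This is a minimal sketch, assuming the four statuses reported by `/status-count` (`new`, `processing`, `failed`, `completed`) and the 3-retry cap mentioned for auto-retry. The `Job` class and the transition rules are hypothetical illustrations, not the service's actual Firestore code.

```python
from dataclasses import dataclass

# Status values as reported by /status-count
NEW, PROCESSING, FAILED, COMPLETED = "new", "processing", "failed", "completed"
MAX_RETRIES = 3  # assumed cap, taken from the "up to 3 retries" auto-retry note

# Hypothetical transition rules: which status may follow which
TRANSITIONS = {
    NEW: {PROCESSING},
    PROCESSING: {COMPLETED, FAILED},
    FAILED: {NEW},  # a retry puts the job back in the queue
    COMPLETED: set(),
}


@dataclass
class Job:
    """Hypothetical in-memory stand-in for a Firestore job document."""

    job_id: str
    status: str = NEW
    run_retries: int = 0

    def move_to(self, new_status: str) -> None:
        """Change status, rejecting transitions not listed in TRANSITIONS."""
        if new_status not in TRANSITIONS[self.status]:
            raise ValueError(f"cannot move job from {self.status!r} to {new_status!r}")
        self.status = new_status

    def retry(self) -> bool:
        """Re-queue a failed job; return False once MAX_RETRIES is used up."""
        if self.status != FAILED or self.run_retries >= MAX_RETRIES:
            return False
        self.run_retries += 1
        self.move_to(NEW)
        return True


# Example: one failed attempt followed by a retry
job = Job("abc123")
job.move_to(PROCESSING)
job.move_to(FAILED)
job.retry()  # True; job.status is now "new" and job.run_retries is 1
```

Counting retries on the job itself means the cap survives across separate `/retry-job` calls, which is presumably why `run_retries` appears in the `/update-payloads` body.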

Each form type is routed to an orchestrator:

| Form Name | Orchestrator |
|---|---|
| Farmer Registration / Edit Farmer Details / Field Day Farmer Registration | Participant Registration & Update |
| Attendance Full / Field Day Attendance Full | Attendance Full |
| Attendance Light | Attendance Light |
| Training Observation / Demo Plot Observation | Observation |
| Farm Visit Full / Farm Visit - AA | Farm Visit |
| Wet Mill Registration Form | Wetmill Registration |
| Wet Mill Visit | Wetmill Visit |
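
The routing in the table can be expressed as a plain lookup. This is a minimal sketch, assuming the form name is read from the payload's `form["@name"]` field (as in the `/save-payload` example). Orchestrators are plain strings here; in the service they are classes registered in `job_mapping` in `main.py`, whose exact shape this sketch does not reproduce. `FORM_ROUTES` and `route_payload` are hypothetical names.

```python
from typing import Optional

# Form name -> orchestrator, transcribed from the table above
FORM_ROUTES = {
    "Farmer Registration": "Participant Registration & Update",
    "Edit Farmer Details": "Participant Registration & Update",
    "Field Day Farmer Registration": "Participant Registration & Update",
    "Attendance Full": "Attendance Full",
    "Field Day Attendance Full": "Attendance Full",
    "Attendance Light": "Attendance Light",
    "Training Observation": "Observation",
    "Demo Plot Observation": "Observation",
    "Farm Visit Full": "Farm Visit",
    "Farm Visit - AA": "Farm Visit",
    "Wet Mill Registration Form": "Wetmill Registration",
    "Wet Mill Visit": "Wetmill Visit",
}


def route_payload(payload: dict) -> Optional[str]:
    """Return the orchestrator for a CommCare payload, or None if unmapped."""
    form_name = payload.get("form", {}).get("@name")
    return FORM_ROUTES.get(form_name)


print(route_payload({"id": "abc123", "form": {"@name": "Farm Visit - AA"}}))  # Farm Visit
```

Several form names share one orchestrator, so the mapping is many-to-one; unknown forms fall through as `None` rather than raising.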

Prerequisites:

- Python 3.10+
- A PostgreSQL instance
- A Google Cloud project with Firestore and Cloud Run enabled
- Google Cloud SDK

Local setup:

- Clone the repository:
  ```bash
  git clone https://github.com/TechnoServe/pima-integration.git
  cd pima-integration
  ```
- Create and activate a virtual environment:
  ```bash
  python -m venv venv
  source venv/bin/activate
  ```
- Install dependencies:
  ```bash
  pip install -r requirements.txt
  ```
- Set up environment variables by creating a `.env` file in the project root:
  ```
  SYSTEM_USER_ID_TEST=<your-system-user-id>
  DATABASE_URL=postgresql://user:password@localhost:5432/pima
  GOOGLE_APPLICATION_CREDENTIALS=<path-to-your-service-account.json>
  ```
- Run database migrations:
  ```bash
  alembic upgrade head
  ```
- Start the app:
  ```bash
  python app/main.py
  ```
  The app runs on `http://localhost:8080`.
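
Before starting the app it can help to fail fast on a missing or malformed variable. This is a minimal sketch using only the standard library; `check_env` is a hypothetical helper, not part of the repository, and it assumes all three variables from the `.env` example are required. It does not load `.env` itself, so export the variables (or load them with your usual tool) first.

```python
import os
from collections.abc import Mapping
from urllib.parse import urlparse

# Variables from the .env example; assumed to be required
REQUIRED_VARS = ("SYSTEM_USER_ID_TEST", "DATABASE_URL", "GOOGLE_APPLICATION_CREDENTIALS")


def check_env(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return human-readable problems with the environment (empty if none)."""
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]

    db_url = env.get("DATABASE_URL")
    if db_url:
        parsed = urlparse(db_url)
        # Accept "postgresql://" and driver-qualified forms like "postgresql+psycopg2://"
        if not parsed.scheme.startswith("postgresql"):
            problems.append(f"DATABASE_URL has unexpected scheme {parsed.scheme!r}")
        if not parsed.hostname or not parsed.path.strip("/"):
            problems.append("DATABASE_URL should include a host and a database name")

    creds = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if creds and not os.path.isfile(creds):
        problems.append(f"service-account file not found: {creds}")

    return problems


if __name__ == "__main__":
    for problem in check_env():
        print("config problem:", problem)
```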

API endpoints:

`POST /save-payload/<source>` stores an incoming payload in Firestore. `source` can be `commcare` or `cc`.

```bash
curl -X POST http://localhost:8080/save-payload/commcare \
  -H "Content-Type: application/json" \
  -d '{"id": "abc123", "form": {"@name": "Farmer Registration", ...}}'
```

`GET /process-jobs/<source>` picks up to 10 new jobs from Firestore and processes them into PostgreSQL.

```bash
curl http://localhost:8080/process-jobs/commcare
```

`/retry-job/<source>` retries failed jobs:

```
# Auto-retry all failed jobs (up to 3 retries)
GET /retry-job/<source>

# Retry a single job
GET /retry-job/<source>/<job_id>

# Bulk retry by list of IDs
POST /retry-job/<source>
Content-Type: application/json

{"ids": ["id1", "id2"]}
```
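
For scripting against these endpoints from Python, the requests can be built with the standard library. This is a minimal sketch: `BASE_URL` and the helper names are hypothetical, and the bodies simply mirror the curl and JSON examples above. The helpers only build `urllib.request.Request` objects; nothing is sent until you pass one to `urlopen`.

```python
import json
from typing import Iterable, Optional, Union
from urllib.request import Request

BASE_URL = "http://localhost:8080"  # assumed: the local dev server from the setup steps


def _post_json(url: str, body: dict) -> Request:
    """Build a POST request with a JSON body, like the curl -d example."""
    return Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def save_payload(payload: dict, source: str = "commcare") -> Request:
    return _post_json(f"{BASE_URL}/save-payload/{source}", payload)


def process_jobs(source: str = "commcare") -> Request:
    return Request(f"{BASE_URL}/process-jobs/{source}", method="GET")


def retry_jobs(ids: Optional[Union[str, Iterable[str]]] = None,
               source: str = "commcare") -> Request:
    """No ids: auto-retry all failed jobs; one id: retry it; several: bulk retry."""
    if ids is None:
        return Request(f"{BASE_URL}/retry-job/{source}", method="GET")
    if isinstance(ids, str):
        return Request(f"{BASE_URL}/retry-job/{source}/{ids}", method="GET")
    return _post_json(f"{BASE_URL}/retry-job/{source}", {"ids": list(ids)})
```

To send one, call `urllib.request.urlopen(process_jobs())`. The response body format is not documented here, so the sketch leaves parsing to the caller.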

`GET /status-count/<source>` returns counts of `new`, `processing`, `failed`, and `completed` jobs.
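
`GET /failed-jobs/<source>` lists failed jobs, filtered by date range and job name (the space in the job name is URL-encoded):

```
GET /failed-jobs/<source>?start_date=2025-01-01&end_date=2025-12-31&job_name=Farmer%20Registration
```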
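
A `job_name` such as `Farmer Registration` contains a space, so a client should URL-encode the query string rather than paste it literally. This is a minimal sketch using `urllib.parse.urlencode`; `failed_jobs_url` and its default base URL are hypothetical, and treating each filter as omittable is an assumption about the endpoint.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def failed_jobs_url(source: str = "commcare",
                    start_date: Optional[str] = None,
                    end_date: Optional[str] = None,
                    job_name: Optional[str] = None,
                    base_url: str = "http://localhost:8080") -> str:
    """Build a /failed-jobs URL, leaving out filters that are None."""
    params = {"start_date": start_date, "end_date": end_date, "job_name": job_name}
    # quote_via=quote encodes spaces as %20 rather than "+"
    query = urlencode({k: v for k, v in params.items() if v is not None}, quote_via=quote)
    url = f"{base_url}/failed-jobs/{source}"
    return f"{url}?{query}" if query else url


print(failed_jobs_url(start_date="2025-01-01", end_date="2025-12-31",
                      job_name="Farmer Registration"))
# http://localhost:8080/failed-jobs/commcare?start_date=2025-01-01&end_date=2025-12-31&job_name=Farmer%20Registration
```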

`/get-payload/<source>` returns stored payloads:

```
# All payloads (optional ?limit=20)
GET /get-payload/<source>

# Single payload
GET /get-payload/<source>/<job_id>

# Bulk fetch
POST /get-payload/<source>
Content-Type: application/json

{"ids": ["id1", "id2"]}
```
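
`POST /update-payloads/<source>` updates fields such as `status` and `run_retries` on the listed jobs, for example to reset them to `new`:

```
POST /update-payloads/<source>
Content-Type: application/json

{"status": "new", "run_retries": 0, "job_ids": ["id1", "id2"]}
```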

To deploy to Cloud Run, build the container image:

```bash
gcloud builds submit --tag gcr.io/pima-gcp/pima-integration-app
```

Then deploy:

```bash
gcloud run deploy pima-integration-app \
  --image gcr.io/pima-gcp/pima-integration-app \
  --platform managed \
  --allow-unauthenticated \
  --region europe-west1 \
  --network=default \
  --subnet=default
```

Database migrations use Alembic:

```bash
# Create a new migration
alembic revision --autogenerate -m "describe your change"

# Apply migrations
alembic upgrade head

# Roll back one step
alembic downgrade -1
```

Project structure:

```
pima-integration/
├── app/
│   ├── core/                         # DB init, Firestore utils, logging, mapping
│   ├── jobs/
│   │   └── commcare_to_postgresql/   # Orchestrators per form type
│   ├── models/                       # SQLAlchemy models
│   ├── schemas/                      # Pydantic schemas
│   └── main.py                       # Flask app & routes
├── alembic/                          # Migration environment
├── migrations/                       # Migration scripts
├── requirements.txt
└── pyproject.toml
```

Contributing:

- Fork the repo and create a branch from `main`:
  ```bash
  git checkout -b feature/your-feature-name
  ```
- Make your changes, e.g. add an orchestrator in `app/jobs/commcare_to_postgresql/` and register it in `job_mapping` in `main.py`.
- Write tests using `pytest` and run them:
  ```bash
  pytest
  ```
- Commit with a clear message:
  ```bash
  git commit -m "feat: add support for new form type"
  ```
- Push and open a Pull Request against `main`.

To add a new form type:

- Create a new orchestrator file in `app/jobs/commcare_to_postgresql/`.
- Implement a class with a `process_data(payload, system_id)` method.
- Export it from `app/jobs/commcare_to_postgresql/__init__.py`.
- Add the mapping to `job_mapping` in `main.py`.
- Add it to `MIGRATED_FORM_TYPES` in `core` if it should be processed.
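
The steps for a new form type can be sketched as follows. This is a minimal sketch under stated assumptions: `ExampleFormOrchestrator`, the form name `"Example Form"`, the dict-shaped `job_mapping`, the set-shaped `MIGRATED_FORM_TYPES` and the `dispatch` helper are all hypothetical stand-ins, so check `main.py` and `core` for the real shapes. Only the `process_data(payload, system_id)` signature comes from this README.

```python
from typing import Any, Dict, Optional


class ExampleFormOrchestrator:
    """Hypothetical orchestrator for a new form type.

    In the repo it would live in app/jobs/commcare_to_postgresql/ and be
    exported from that package's __init__.py.
    """

    def process_data(self, payload: Dict[str, Any], system_id: str) -> Dict[str, Any]:
        form = payload.get("form", {})
        # A real orchestrator would validate the form and write rows to
        # PostgreSQL; this stub only maps a few fields to show the call shape.
        return {
            "source_id": payload.get("id"),
            "form_name": form.get("@name"),
            "created_by": system_id,
        }


# Hypothetical stand-ins for job_mapping (main.py) and MIGRATED_FORM_TYPES (core)
job_mapping = {"Example Form": ExampleFormOrchestrator}
MIGRATED_FORM_TYPES = {"Example Form"}


def dispatch(payload: Dict[str, Any], system_id: str) -> Optional[Dict[str, Any]]:
    """Run the registered orchestrator, or return None if the form is not migrated."""
    form_name = payload.get("form", {}).get("@name")
    if form_name not in MIGRATED_FORM_TYPES or form_name not in job_mapping:
        return None
    return job_mapping[form_name]().process_data(payload, system_id)
```

Keeping the `MIGRATED_FORM_TYPES` check separate from the mapping matches the last step above: a form can be registered but left unprocessed until it is added to the set.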

Tech stack:

- Flask: web framework
- SQLAlchemy + Alembic: ORM and migrations
- Google Cloud Firestore: job queue and status tracking
- Google Cloud Run: deployment
- psycopg2: PostgreSQL driver
- GeoAlchemy2 + Shapely: geospatial support

Maintained by TechnoServe