A comprehensive data integration and processing platform that orchestrates bi-directional data synchronization between CommCare mobile applications, Salesforce CRM, and PostgreSQL databases for TechnoServe's coffee portfolio programs.
This integration platform serves as the central data processing hub for TechnoServe's coffee portfolio programs, handling complex data workflows between CommCare mobile data collection, Salesforce CRM management, and PostgreSQL analytics databases. The system processes multiple form types including farmer registrations, training sessions, farm visits, wet mill operations, and attendance tracking across multiple countries.
- Bi-directional Sync: CommCare ↔ Salesforce ↔ PostgreSQL data synchronization
- Multi-Program Support: Handles PIMA Agronomy and PIMA Sustainability programs
- Asynchronous Processing: Firestore-based queue system for reliable data processing
- Multi-Country Operations: Supports coffee programs across different regions
- Mobile-First: Optimized for CommCare mobile data collection workflows
- Comprehensive Logging: Structured logging with request tracking and error handling
- Auto-Retry Logic: Intelligent retry mechanisms for failed processing jobs
- Real-time Monitoring: Status tracking and processing analytics
+-----------------+      +----------------------+      +-----------------+
|    CommCare     |<---->|  Flask Web Service   |<---->|   Salesforce    |
|  Mobile Forms   |      |     (Cloud Run)      |      |       CRM       |
|                 |      |                      |      |                 |
| - Registrations |      | +------------------+ |      | - Participants  |
| - Farm Visits   |------| |    Firestore     | |------| - Training Data |
| - Training Obs. |      | |   Queue System   | |      | - Farm Records  |
| - Attendance    |      | +------------------+ |      | - Observations  |
+-----------------+      |                      |      +-----------------+
                         | +------------------+ |
                         | |    PostgreSQL    | |
                         | |    (PostGIS)     | |
                         | |                  | |
                         | | - Wet Mills      | |
                         | | - Survey Data    | |
                         | | - Geospatial     | |
                         | +------------------+ |
                         +----------------------+
- Farmer Registration - New farmer onboarding and profile creation
- Edit Farmer Details - Farmer profile updates and modifications
- Field Day Farmer Registration - Event-specific farmer registration
- Attendance Full/Light - Training session participation tracking
- Field Day Attendance - Event attendance management
- Training Observation - Training quality assessment and feedback
- Demo Plot Observation - Agricultural demonstration tracking
- Farm Visit Full/AA - Comprehensive farm assessment visits
- Wet Mill Registration - Coffee processing facility registration
- Wet Mill Visit - Facility inspection and survey data collection
- Participant Data - Farmer and participant information sync
- Training Groups - Training cohort management
- Training Sessions - Session scheduling and details
- Project Roles - Staff and role assignments
- Household Sampling - Household survey data
app/
├── jobs/
│   ├── commcare_to_postgresql/
│   │   ├── wetmill_registration.py     # Wet mill facility processing
│   │   └── wetmill_visit.py            # Survey data and geospatial processing
│   ├── commcare_to_salesforce/
│   │   ├── registration.py             # Farmer registration workflows
│   │   ├── attendance.py               # Training attendance processing
│   │   ├── training_observation.py     # Training quality assessments
│   │   ├── demoplot_observation.py     # Demo plot data processing
│   │   └── farm_visit.py               # Comprehensive farm visit processing
│   └── salesforce_to_commcare/
│       └── process_commcare_data.py    # Parallel data sync to CommCare
├── utils/
│   ├── models.py                       # SQLAlchemy database models
│   ├── postgres.py                     # PostgreSQL connection management
│   ├── mappings.py                     # Data transformation mappings
│   ├── logging_config.py               # Centralized logging configuration
│   └── [various utility modules]       # Processing utilities
└── main.py                             # Flask application and routing
- Python 3.8+
- PostgreSQL 14+ with PostGIS extension
- Google Cloud SDK
- CommCare API credentials
- Salesforce API credentials
- Google Firestore database
- Clone and set up environment

    git clone https://github.com/TechnoServe/commcare-sf-integration.git
    cd commcare-sf-integration
    python3 -m venv venv
    source venv/bin/activate  # Windows: venv\Scripts\activate
    pip install -r requirements.txt

- Configure environment variables

    cp .env.example .env
    # Edit .env with your credentials

- Set up PostgreSQL with PostGIS

    # Ubuntu/Debian
    sudo apt update
    sudo apt install postgresql-14-postgis-3
    # Create database and extensions
    createdb commcare_integration
    psql -d commcare_integration -c "CREATE EXTENSION IF NOT EXISTS postgis;"
    psql -d commcare_integration -c "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";"

- Initialize database

    python3 -c "from utils.postgres import init_db; init_db()"

- Run locally

    python3 app/main.py
POST /process-data-cc
Content-Type: application/json
# Processes incoming CommCare form submissions
# Routes to appropriate handler based on form type
# Stores in Firestore queue for async processing

POST /process-data-sf
Content-Type: application/json
# Processes incoming Salesforce data
# Handles participant, training, and project data
# Queues for CommCare synchronization

POST /process-firestore-to-sf
# Processes batched CommCare data to Salesforce
# Handles up to 10 records per batch
# Updates Firestore status tracking

POST /process-firestore-to-cc
# Processes Salesforce data to CommCare
# Handles 1 record per batch (due to complexity)
# XML generation and parallel processing

POST /auto-retry-firestore-to-sf
POST /auto-retry-firestore-to-cc
# Automatically retries failed records
# Up to 3 retry attempts per record
# Incremental backoff strategy

GET /retry/sf/{record_id}
GET /retry/cc/{record_id}
# Manual retry for specific failed records
# Updates retry counters and timestamps
# Useful for debugging and manual intervention

GET /record/{collection}/{id}
# Retrieves specific record from Firestore
# Returns processing history and current status
# Useful for debugging and tracking

GET /failed/sf
GET /failed/cc
# Returns all failed jobs with error details
# Includes retry counts and failure reasons
# Essential for monitoring system health

GET /status-count/{collection}
# Returns processing statistics
# Counts: new, processing, completed, failed
# Real-time system health monitoring

# Salesforce Configuration
SALESFORCE_ENV=sandbox # or production
SF_TEST_USERNAME=your_sandbox_username
SF_PROD_USERNAME=your_production_username
SF_PROD_PASSWORD=your_password
SF_PROD_SECURITY_TOKEN=your_security_token
# CommCare Configuration
CC_DOMAIN=your_commcare_domain
CC_API_KEY=your_api_key
CC_USERNAME=your_username
# Database Configuration
DATABASE_URL=postgresql://user:password@host:port/database
# Google Cloud
GOOGLE_APPLICATION_CREDENTIALS=path/to/service-account.json
GOOGLE_CLOUD_PROJECT=your-project-id

The system processes these migrated form types:

migrated_form_types = [
    "Farmer Registration", "Attendance Full - Current Module",
    "Edit Farmer Details", "Training Observation",
    "Attendance Light - Current Module", "Participant",
    "Training Group", "Training Session", "Project Role",
    "Household Sampling", "Demo Plot Observation", "Farm Visit Full",
    "Farm Visit - AA", "Field Day Farmer Registration", "Field Day Attendance Full",
    "Wet Mill Registration Form", "Wet Mill Visit"
]

CREATE TABLE wetmills (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
wet_mill_unique_id VARCHAR NOT NULL,
commcare_case_id VARCHAR UNIQUE,
name VARCHAR,
mill_status VARCHAR,
exporting_status VARCHAR,
vertical_integration VARCHAR,
manager_name VARCHAR,
manager_role VARCHAR,
programme VARCHAR,
country VARCHAR,
comments TEXT,
ba_signature VARCHAR,
manager_signature VARCHAR,
tor_page_picture VARCHAR,
office_entrance_picture VARCHAR,
registration_date DATE,
date_ba_signature DATE,
office_gps geometry(Point,4326),
user_id UUID REFERENCES tbl_users(user_id),
created_at TIMESTAMP DEFAULT now(),
updated_at TIMESTAMP DEFAULT now()
);

CREATE TABLE form_visits (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
submission_id VARCHAR UNIQUE NOT NULL,
wetmill_id UUID REFERENCES wetmills(id) ON DELETE CASCADE,
user_id UUID,
form_name VARCHAR NOT NULL,
visit_date TIMESTAMP WITHOUT TIME ZONE NOT NULL,
entrance_photograph VARCHAR,
geo_location geometry(Point,4326),
created_at TIMESTAMP DEFAULT now()
);

CREATE TABLE survey_responses (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
submission_id VARCHAR UNIQUE NOT NULL,
form_visit_id UUID REFERENCES form_visits(id) ON DELETE CASCADE,
survey_type VARCHAR NOT NULL,
completed_date TIMESTAMP WITHOUT TIME ZONE,
general_feedback TEXT,
created_at TIMESTAMP DEFAULT now()
);

CREATE TABLE survey_question_responses (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
submission_id VARCHAR UNIQUE NOT NULL,
survey_response_id UUID REFERENCES survey_responses(id) ON DELETE CASCADE,
section_name VARCHAR,
question_name VARCHAR NOT NULL,
field_type VARCHAR NOT NULL,
value_text TEXT,
value_number DOUBLE PRECISION,
value_boolean BOOLEAN,
value_date TIMESTAMP WITHOUT TIME ZONE,
value_gps geometry(Point,4326)
);

- Purpose: Queue CommCare form submissions for Salesforce processing
- Fields: data, job_name, status, timestamp, run_retries, error
- Statuses: new, processing, completed, failed

- Purpose: Queue Salesforce data for CommCare synchronization
- Fields: data, job_name, status, timestamp, run_retries, error
- Processing: Generates XML for CommCare case creation/updates
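
To make the record shape above concrete, here is a minimal sketch of a queue document built as a plain dictionary. The helper name `build_queue_record`, the ISO 8601 UTC timestamp, and the default values are assumptions for illustration; the actual field formats in the codebase may differ.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Statuses listed in this README for queued records.
VALID_STATUSES = ("new", "processing", "completed", "failed")


def build_queue_record(data: Dict[str, Any], job_name: str) -> Dict[str, Any]:
    """Return a queue document with the fields named in this README.

    Hypothetical helper: timestamp format and defaults are assumptions.
    """
    return {
        "data": data,
        "job_name": job_name,
        "status": "new",  # records enter the queue as "new"
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "run_retries": 0,  # incremented on each retry attempt
        "error": None,  # filled in when processing fails
    }


record = build_queue_record({"farmer_name": "Example"}, "Farmer Registration")
print(record["job_name"], record["status"], record["run_retries"])
```

Keeping `status`, `run_retries`, and `error` on the same document as the payload means a single read is enough to see both what was submitted and how processing went.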
- Form Submission: CommCare sends webhook to /process-data-cc
- Immediate Storage: Data stored in Firestore with new status
- Batch Processing: Scheduler triggers /process-firestore-to-sf
- Data Transformation: Form data mapped to Salesforce objects
- Multi-Step Processing: Complex forms processed in sequential steps
- Status Update: Firestore status updated to completed or failed
- Error Handling: Failed records eligible for auto-retry
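
The batch, status-update, and retry steps above can be sketched as follows. This is an in-memory illustration only: a plain list stands in for the Firestore collection, and `process_batch`, `eligible_for_retry`, `retry_delay_seconds`, `fake_handler`, and the 60-second base delay are hypothetical. The batch size of 10 and the limit of 3 retries come from this README.

```python
from typing import Any, Callable, Dict, List

BATCH_SIZE = 10  # README: up to 10 records per Salesforce batch
MAX_RETRIES = 3  # README: up to 3 retry attempts per record


def process_batch(queue: List[Dict[str, Any]],
                  handler: Callable[[Dict[str, Any]], None]) -> int:
    """Process up to BATCH_SIZE records whose status is "new".

    `queue` is a list standing in for a Firestore collection (assumption).
    Returns the number of records attempted.
    """
    batch = [r for r in queue if r["status"] == "new"][:BATCH_SIZE]
    for record in batch:
        record["status"] = "processing"
        try:
            handler(record["data"])
            record["status"] = "completed"
            record["error"] = None
        except Exception as exc:  # record the failure, keep the batch going
            record["status"] = "failed"
            record["error"] = str(exc)
    return len(batch)


def eligible_for_retry(record: Dict[str, Any]) -> bool:
    """A failed record may be retried until it has used MAX_RETRIES attempts."""
    return record["status"] == "failed" and record["run_retries"] < MAX_RETRIES


def retry_delay_seconds(run_retries: int, base_seconds: int = 60) -> int:
    """Incremental (linear) backoff; the base delay is an assumed value."""
    return base_seconds * (run_retries + 1)


def fake_handler(data: Dict[str, Any]) -> None:
    # Hypothetical handler: rejects submissions without a farmer name.
    if not data.get("farmer_name"):
        raise ValueError("farmer_name is required")


queue = [
    {"data": {"farmer_name": "A"}, "status": "new", "run_retries": 0, "error": None},
    {"data": {}, "status": "new", "run_retries": 0, "error": None},
]
attempted = process_batch(queue, fake_handler)
print(attempted, [r["status"] for r in queue])
```

Catching the exception per record, and not around the whole loop, keeps one bad submission from blocking the rest of the batch.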
- Data Trigger: Salesforce sends data to /process-data-sf
- Queue Storage: Data stored in Firestore for processing
- XML Generation: Salesforce data converted to CommCare XML format
- Parallel Processing: Multiple records processed concurrently
- Case Management: Creates/updates CommCare cases via API
- Status Tracking: Processing results tracked in Firestore
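
As a rough illustration of the XML generation step, the sketch below builds a case update block with the standard library. The function name `build_case_update_xml` and the property names are hypothetical, and the namespace is the one used by CommCare's case XML 2.0 format as far as we know, so verify it against the CommCare documentation. This is only the case fragment; a real submission wraps it in a form envelope with metadata.

```python
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Dict

# Assumed namespace for CommCare case transactions (case XML 2.0).
CASE_XMLNS = "http://commcarehq.org/case/transaction/v2"


def build_case_update_xml(case_id: str, user_id: str,
                          properties: Dict[str, str]) -> str:
    """Serialize a <case> element with one <update> child per property set.

    Property names must be valid XML element names (assumption: they are
    simple snake_case identifiers).
    """
    case = ET.Element("case", {
        "case_id": case_id,
        "user_id": user_id,
        "date_modified": datetime.now(timezone.utc).isoformat(),
        "xmlns": CASE_XMLNS,
    })
    update = ET.SubElement(case, "update")
    for name, value in properties.items():
        ET.SubElement(update, name).text = value
    return ET.tostring(case, encoding="unicode")


xml_text = build_case_update_xml("case-123", "user-1", {"farmer_name": "Amina"})
print(xml_text)
```

Using `ElementTree` instead of string concatenation means values such as `&` or `<` in Salesforce fields are escaped automatically.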
- Wet Mill Forms: Processed immediately without queuing
- Spatial Data: GPS coordinates stored as PostGIS geometry
- Survey Processing: Dynamic survey responses with flexible schema
- Upsert Logic: Updates existing records or creates new ones
- Data Validation: Type inference and validation for survey responses
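
The type inference and spatial steps above might look like the sketch below. It assumes survey answers arrive as strings and that GPS values use CommCare's space-separated "latitude longitude altitude accuracy" form. The function names and the `field_type` labels are hypothetical; the `value_*` keys match the columns of `survey_question_responses`.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def gps_to_ewkt(raw: str) -> Optional[str]:
    """Convert "lat lon [altitude accuracy]" to an EWKT point, or None.

    PostGIS points are written as (longitude latitude), so the order swaps.
    """
    parts = raw.split()
    if len(parts) < 2:
        return None
    try:
        lat, lon = float(parts[0]), float(parts[1])
    except ValueError:
        return None
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        return None
    return f"SRID=4326;POINT({lon} {lat})"


def infer_value_columns(raw: str) -> Dict[str, Any]:
    """Pick one typed column for a raw answer (heuristic, order matters)."""
    text = raw.strip()
    if text.lower() in ("yes", "no", "true", "false"):
        return {"field_type": "boolean",
                "value_boolean": text.lower() in ("yes", "true")}
    try:
        return {"field_type": "number", "value_number": float(text)}
    except ValueError:
        pass
    try:
        return {"field_type": "date", "value_date": datetime.fromisoformat(text)}
    except ValueError:
        pass
    ewkt = gps_to_ewkt(text)
    if ewkt is not None:
        return {"field_type": "gps", "value_gps": ewkt}
    return {"field_type": "text", "value_text": raw}


print(infer_value_columns("-1.95 30.06 1500 5"))
```

Because the checks are heuristic, an answer made of two plain numbers would be read as a GPS point; a production version would more likely use the question's declared type where the form provides one.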
# Set up Google Cloud project
gcloud config set project your-project-id
# Enable required services
gcloud services enable cloudbuild.googleapis.com
gcloud services enable run.googleapis.com
gcloud services enable firestore.googleapis.com
# Build container
gcloud builds submit --tag gcr.io/your-project/cc-sf-integration-app
# Deploy to Cloud Run
gcloud run deploy cc-sf-integration-app \
--image gcr.io/your-project/cc-sf-integration-app \
--platform managed \
--region us-central1 \
--allow-unauthenticated \
--memory 2Gi \
--cpu 2 \
--timeout 3600 \
--concurrency 10 \
  --max-instances 50

# Set environment variables
gcloud run services update cc-sf-integration-app \
  --set-env-vars "SALESFORCE_ENV=production,DATABASE_URL=your_db_url"

# Create scheduler jobs for batch processing
gcloud scheduler jobs create http sf-batch-processor \
--schedule="*/5 * * * *" \
--uri="https://your-service-url/process-firestore-to-sf" \
--http-method=POST
gcloud scheduler jobs create http cc-batch-processor \
--schedule="*/2 * * * *" \
--uri="https://your-service-url/process-firestore-to-cc" \
--http-method=POST
# Auto-retry failed records
gcloud scheduler jobs create http auto-retry-sf \
--schedule="0 */2 * * *" \
--uri="https://your-service-url/auto-retry-firestore-to-sf" \
  --http-method=POST

The system uses structured logging with request tracking:

logger.info({
"message": "Processing farm visit",
"request_id": request_id,
"job_name": job_name,
"processing_step": "best_practices"
})

- Processing Rate: Records processed per minute
- Queue Depth: Number of pending records in Firestore
- Error Rate: Failed processing percentage
- Retry Success: Auto-retry effectiveness
- Processing Time: Average time per record type
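
Two of these metrics can be derived from per-status counts such as those the `/status-count/{collection}` endpoint reports. The sketch below assumes the counts are available as a dictionary keyed by status; the response shape, the function name `summarize_counts`, and the choice to count both `new` and `processing` records as queue depth are assumptions.

```python
from typing import Dict


def summarize_counts(counts: Dict[str, int]) -> Dict[str, float]:
    """Derive queue depth and error rate from per-status record counts."""
    # Assumption: records still in "processing" count toward queue depth.
    pending = counts.get("new", 0) + counts.get("processing", 0)
    failed = counts.get("failed", 0)
    finished = counts.get("completed", 0) + failed
    # Avoid dividing by zero when nothing has finished yet.
    error_rate = failed / finished * 100 if finished else 0.0
    return {"queue_depth": pending, "error_rate_pct": round(error_rate, 2)}


summary = summarize_counts(
    {"new": 12, "processing": 3, "completed": 180, "failed": 20}
)
print(summary)
```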
# View processing logs
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=cc-sf-integration-app" --limit=100
# Monitor errors
gcloud logging read "resource.type=cloud_run_revision AND severity>=ERROR" --limit=50
# Track specific request
gcloud logging read "jsonPayload.request_id=\"your-request-id\"" --limit=20

- Database Connection Errors

    # Check PostgreSQL connectivity
    psql $DATABASE_URL -c "SELECT version();"
    # Verify PostGIS extension
    psql $DATABASE_URL -c "SELECT PostGIS_version();"

- Salesforce Authentication Issues

    # Test Salesforce connection
    python3 -c "from main import authenticate_salesforce; print(authenticate_salesforce())"

- CommCare API Issues

    # Verify CommCare credentials
    curl -H "Authorization: ApiKey username:apikey" \
      https://www.commcarehq.org/a/domain/api/v0.5/user/

- Firestore Queue Backlog

    # Check queue status
    curl https://your-service-url/status-count/commcare_collection

- Batch Size Tuning: Adjust query limits in process_firestore_records()
- Concurrency Limits: Modify semaphore limits for parallel processing
- Database Indexing: Add indexes for frequently queried fields
- Memory Allocation: Increase Cloud Run memory for large batches
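
To illustrate what a semaphore limit controls, the sketch below caps the number of simulated API calls that run at once. It uses `asyncio` for brevity; whether the project uses `asyncio` or threads is not stated here, and `MAX_CONCURRENT`, `send_record`, and `send_all` are hypothetical names.

```python
import asyncio
from typing import List, Tuple

MAX_CONCURRENT = 5  # hypothetical limit; tune against API rate limits


async def send_record(record_id: str, semaphore: asyncio.Semaphore,
                      active: List[int]) -> str:
    """Stand-in for one API call; `active` is [running now, peak seen]."""
    async with semaphore:  # waits here once MAX_CONCURRENT calls are running
        active[0] += 1
        active[1] = max(active[1], active[0])
        await asyncio.sleep(0.01)  # simulated network latency
        active[0] -= 1
    return record_id


async def send_all(record_ids: List[str]) -> Tuple[List[str], int]:
    """Send every record, returning the results and the peak concurrency."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    active = [0, 0]
    results = await asyncio.gather(
        *(send_record(r, semaphore, active) for r in record_ids)
    )
    return list(results), active[1]


results, peak = asyncio.run(send_all([f"rec-{i}" for i in range(20)]))
print(len(results), peak)
```

Raising the limit increases throughput until the downstream API starts throttling, so it is worth changing in small steps while watching the error rate.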
- Code Style: Follow PEP 8 standards
- Error Handling: Always include try-except blocks with logging
- Documentation: Update README for new form types or endpoints
- Testing: Add unit tests for new processing functions
- Update Form List: Add to migrated_form_types in main.py
- Create Processor: Add handler in appropriate jobs/ directory
- Add Routing: Update processing logic in main Flask routes
- Test Thoroughly: Verify end-to-end processing
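
One way to picture these steps is a lookup table from form name to handler, sketched below. How `main.py` actually routes forms is not shown in this README, so the `HANDLERS` table, `route_form`, and the two placeholder handlers are hypothetical; only the form names are taken from `migrated_form_types`.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], str]


def process_farmer_registration(payload: Dict[str, Any]) -> str:
    # Placeholder for a handler that would live under jobs/commcare_to_salesforce/.
    return "farmer registration queued"


def process_wetmill_visit(payload: Dict[str, Any]) -> str:
    # Placeholder for a handler that would live under jobs/commcare_to_postgresql/.
    return "wet mill visit stored"


# Adding a form type means adding one entry here (hypothetical routing table).
HANDLERS: Dict[str, Handler] = {
    "Farmer Registration": process_farmer_registration,
    "Wet Mill Visit": process_wetmill_visit,
}


def route_form(form_name: str, payload: Dict[str, Any]) -> str:
    """Dispatch a submission to its handler, rejecting unknown form names."""
    handler = HANDLERS.get(form_name)
    if handler is None:
        raise ValueError(f"Unsupported form type: {form_name}")
    return handler(payload)


print(route_form("Wet Mill Visit", {"form": {}}))
```

Rejecting unknown names explicitly makes a missing registration fail loudly during end-to-end testing instead of being silently dropped.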
- Technical Issues: Create GitHub issues with detailed error logs
- TechnoServe PIMA Team: Internal Slack #pima
- Production Issues: Follow incident response procedures
This project is licensed under the MIT License - see the LICENSE file for details.
Built by TechnoServe for creating business solutions to poverty