A comprehensive data processing pipeline for oceanographic data from ARGO floats, featuring a FastAPI wrapper, ChromaDB vector storage, and automated data ingestion from NetCDF files.
This project processes oceanographic data from ARGO floats (autonomous robotic platforms that measure ocean temperature, salinity, and pressure), stores it in both PostgreSQL (Supabase) and ChromaDB vector database, and provides a REST API for data access and semantic search capabilities.
- Data Processing: Automated ingestion of NetCDF files from ARGO float data
- Vector Database: ChromaDB integration with semantic embeddings for oceanographic data
- Dual Storage: PostgreSQL (Supabase) for structured data, ChromaDB for vector search
- REST API: FastAPI wrapper with file upload and data retrieval endpoints
- Measurements Extraction: Automatic extraction of temperature, pressure, and salinity statistics
- Comprehensive Testing: Advanced query testing framework with 74 challenging test cases
- Data Management: Utilities for cleanup, export, and database maintenance
```
┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│  NetCDF Files   │─────▶│  FastAPI Server  │─────▶│    ChromaDB     │
│  (ARGO Data)    │      │   (Port 8080)    │      │ (localhost:8000)│
└─────────────────┘      └──────────────────┘      └─────────────────┘
                                   │
                                   ▼
                         ┌──────────────────┐
                         │   Supabase DB    │
                         │   (PostgreSQL)   │
                         └──────────────────┘
```
- Python 3.10+
- PostgreSQL database (Supabase)
- ChromaDB server
1. Clone the repository:

   ```bash
   git clone <repository-url>
   cd FloatChat-PreProcessing-Pipeline
   ```

2. Create a virtual environment:

   ```bash
   python -m venv venv
   venv\Scripts\activate     # Windows
   # or
   source venv/bin/activate  # Linux/Mac
   ```

3. Install dependencies:

   ```bash
   pip install -r requirements.txt
   ```

4. Configure the environment. Create a `.env` file with:

   ```
   SUPABASE_URL=your_supabase_url
   SUPABASE_KEY=your_supabase_key
   CHROMADB_HOST=localhost
   CHROMADB_PORT=8000
   ```

5. Start the ChromaDB server:

   ```bash
   chroma run --host localhost --port 8000
   ```
```bash
python fastapi_app.py
```

The API will be available at http://localhost:8080
```bash
curl -X POST "http://localhost:8080/upload-file/" \
  -H "accept: application/json" \
  -H "Content-Type: multipart/form-data" \
  -F "file=@your_file.nc"
```

Visit http://localhost:8080/docs for interactive API documentation.
- `POST /upload-file/` - Upload NetCDF files for processing
  - Extracts measurements (temperature, pressure, salinity)
  - Stores data in both Supabase and ChromaDB
  - Returns a processing summary with statistics
- `GET /floats/` - Retrieve all float data from Supabase
  - Returns structured data with metadata
- `GET /` - API health check endpoint
  - Returns service status
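For scripted access, the endpoints above can be exercised with a short Python client. This is a minimal sketch using the `requests` library; the function names and response handling are assumptions based on the endpoint list, not the pipeline's own client code.

```python
import requests

BASE_URL = "http://localhost:8080"

def upload_netcdf(path: str) -> dict:
    """POST a NetCDF file to /upload-file/ and return the processing summary."""
    with open(path, "rb") as f:
        resp = requests.post(f"{BASE_URL}/upload-file/", files={"file": f})
    resp.raise_for_status()
    return resp.json()

def list_floats() -> dict:
    """GET all float records stored in Supabase."""
    resp = requests.get(f"{BASE_URL}/floats/")
    resp.raise_for_status()
    return resp.json()

# Usage (with the server running): upload_netcdf("your_file.nc"), list_floats()
```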
Main orchestration script that handles:
- NetCDF file processing
- Data extraction and transformation
- Database storage coordination
- Error handling and logging
```bash
# Example usage
python main.py
```

Manages database connections with:
- Connection pooling and retry logic
- Supabase PostgreSQL integration
- ChromaDB client management
- Robust error recovery
REST API features:
- File upload with validation
- Automatic measurement extraction
- Dual database storage
- Enhanced error handling
- Comprehensive response formatting
Processes measurement data:
- Temperature, pressure, salinity statistics
- JSON parsing and validation
- ChromaDB metadata enhancement
- Progress tracking
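The statistics extraction can be illustrated with a small NumPy sketch. The `summarize` function below is a hypothetical stand-in for the pipeline's own extraction code: it reduces an array of raw readings to the mean/std/min/max summary stored alongside each float.

```python
import numpy as np

def summarize(values):
    """Reduce raw sensor readings to summary statistics, ignoring NaNs."""
    arr = np.asarray(values, dtype=float)
    return {
        "mean": float(np.nanmean(arr)),
        "std": float(np.nanstd(arr)),
        "min": float(np.nanmin(arr)),
        "max": float(np.nanmax(arr)),
    }

# One summary block per measured variable (temperature, pressure, salinity)
temps = [12.5, 13.1, float("nan"), 11.8]
stats = summarize(temps)
print(stats["min"], stats["max"])  # 11.8 13.1
```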
Comprehensive testing with 74 challenging queries across 8 categories:
1. Geographic Queries (9 tests)
   - Regional data searches
   - Coordinate-based filtering
   - Ocean basin analysis
2. Temporal Queries (10 tests)
   - Date range filtering
   - Seasonal analysis
   - Multi-year comparisons
3. Measurement Range Queries (8 tests)
   - Temperature thresholds
   - Pressure ranges
   - Salinity boundaries
4. Statistical Queries (9 tests)
   - Extremes identification
   - Variance analysis
   - Distribution patterns
5. Deep Water Queries (9 tests)
   - Depth-specific analysis
   - Vertical profiling
   - Deep ocean conditions
6. Environmental Condition Queries (9 tests)
   - Multi-parameter filtering
   - Environmental correlations
   - Condition-based searches
7. Data Quality Queries (10 tests)
   - Quality flag analysis
   - Data validation
   - Error detection
8. Complex Multi-Dimensional Queries (10 tests)
   - Cross-parameter analysis
   - Advanced filtering
   - Multi-constraint searches
```bash
python test_advanced_chromadb_queries.py
```

Utility scripts:

- Cleanup ChromaDB: `python cleanup_chromadb.py`
- Export ChromaDB to text: `python export_chromadb_to_text.py`
- Update measurements: `python update_chromadb_measurements.py`
- Sync Supabase to ChromaDB: `python sync_supabase_to_chromadb.py`
- Schema validation: `python check_floats_schema.py`
- Connection testing: `python test_connection.py`
- Environment testing: `python test_env.py`
- FastAPI testing: `python test_fastapi.py`

Pipeline modes:

```bash
# Process a small sample dataset
python main.py --mode sample --sample small_test

# Download data for a date range and region
python main.py --mode download --start-date 2023-01-01 --end-date 2023-01-31 --region indian_ocean --source ifremer

# Process a local NetCDF file
python main.py --mode file --file path/to/your/file.nc
```

- Downloads Argo NetCDF datasets from ERDDAP servers
- Loads and validates NetCDF files using xarray
- Extracts essential oceanographic fields
- Cleans and validates oceanographic data
- Converts to database-ready format
- Handles missing values and outliers
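As a sketch of the cleaning step, the snippet below shows one common approach: replacing NetCDF fill values with NaN and discarding readings outside a plausible physical range. The fill value and bounds here are illustrative assumptions, not the pipeline's actual thresholds.

```python
import numpy as np

FILL_VALUE = 99999.0       # assumed NetCDF fill value
VALID_TEMP = (-2.5, 40.0)  # assumed plausible ocean temperature range, deg C

def clean_temperature(raw):
    """Mask fill values and out-of-range readings as NaN."""
    arr = np.asarray(raw, dtype=float)
    arr[arr == FILL_VALUE] = np.nan
    lo, hi = VALID_TEMP
    arr[(arr < lo) | (arr > hi)] = np.nan
    return arr

raw = [12.5, 99999.0, -7.0, 13.2]
cleaned = clean_temperature(raw)  # fill value and -7.0 become NaN
```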
- Manages Supabase PostgreSQL connections
- Implements the schema defined in `schema.sql`
- Handles bulk data insertions
- Generates natural language summaries of float metadata
- Creates vector embeddings using sentence-transformers
- Stores embeddings for semantic search
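The natural language summaries fed to the embedding model can be sketched as simple template rendering over float metadata. The field names follow the metadata structure shown later in this README; the exact wording the pipeline produces is an assumption.

```python
def float_summary(meta: dict) -> str:
    """Render float metadata as a one-sentence summary for embedding."""
    t = meta["measurements"]["temperature"]
    return (
        f"ARGO float {meta['platform_number']} deployed on {meta['deploy_date']} "
        f"at ({meta['latitude']:.2f}, {meta['longitude']:.2f}); "
        f"temperature ranged {t['min']:.1f}-{t['max']:.1f} C "
        f"(mean {t['mean']:.1f} C)."
    )

meta = {
    "platform_number": "2902745",
    "deploy_date": "2023-01-15",
    "latitude": -12.34,
    "longitude": 67.89,
    "measurements": {
        "temperature": {"mean": 14.2, "std": 3.1, "min": 4.5, "max": 28.9}
    },
}
print(float_summary(meta))
```

Summaries like this one are what `sentence-transformers` encodes into the vectors stored in ChromaDB.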
- Coordinates the entire pipeline
- Provides CLI interface with multiple modes
- Handles error logging and recovery
The pipeline uses the existing schema defined in `ingest/schema.sql`:

- `floats`: Float metadata and deployment information
- `profiles`: Individual profile measurements (temperature, salinity)
- `float_embeddings`: Vector embeddings for semantic search
```bash
# Process a small test dataset
python main.py --mode sample

# Process with custom cache directory
python main.py --mode sample --cache-dir "D:\argo_cache"

# Disable embeddings generation
python main.py --mode sample --no-embeddings

# Download Indian Ocean data for a specific time range
python main.py --mode download --start-date 2023-06-01 --end-date 2023-06-30 --region indian_ocean --source ifremer

# Download from a different source
python main.py --mode download --source ncei --region indian_ocean

# Process a local NetCDF file
python main.py --mode file --file "data/argo_sample.nc"
```

Common issues:

- Import Errors: Ensure all dependencies are installed with `pip install -r requirements.txt`
- Database Connection Errors: Verify your Supabase credentials in the `.env` file
- NetCDF Download Failures: Check internet connection and ERDDAP server availability
- Memory Issues: Use smaller time ranges or batch sizes for large datasets
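For the memory issue above, processing in fixed-size batches is the usual remedy. This generic chunking helper is a sketch, not code from the pipeline, showing how a large set of rows can be split for bulk insertion:

```python
from itertools import islice

def batched(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

rows = range(10)
for batch in batched(rows, 4):
    print(batch)  # [0, 1, 2, 3] then [4, 5, 6, 7] then [8, 9]
```

Each batch can then be passed to a single bulk insert instead of loading everything into memory at once.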
Pipeline logs are written to `pipeline.log` and also displayed in the console.
The pipeline supports downloading from multiple ERDDAP servers:
- IFREMER: https://www.ifremer.fr/erddap
- NCEI: https://data.nodc.noaa.gov/erddap
- INCOIS: https://incois.gov.in/erddap
`floats` table:

```sql
CREATE TABLE floats (
    float_id SERIAL PRIMARY KEY,
    platform_number VARCHAR(20),
    deploy_date DATE,
    properties JSONB
);
```

Metadata structure:
```json
{
  "float_id": "string",
  "platform_number": "string",
  "deploy_date": "string",
  "latitude": "float",
  "longitude": "float",
  "measurements": {
    "temperature": {
      "mean": "float",
      "std": "float",
      "min": "float",
      "max": "float"
    },
    "pressure": {
      "mean": "float",
      "std": "float",
      "min": "float",
      "max": "float"
    },
    "salinity": {
      "mean": "float",
      "std": "float",
      "min": "float",
      "max": "float"
    }
  }
}
```

Environment variables:

- `SUPABASE_URL`: Your Supabase project URL
- `SUPABASE_KEY`: Your Supabase API key
- `CHROMADB_HOST`: ChromaDB server host (default: localhost)
- `CHROMADB_PORT`: ChromaDB server port (default: 8000)
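A minimal sketch of reading these variables with the standard library, applying the documented defaults (the pipeline itself may use a dotenv loader instead; the `load_config` helper and key names in the returned dict are illustrative):

```python
import os

def load_config() -> dict:
    """Read connection settings from the environment, with documented defaults."""
    return {
        "supabase_url": os.environ.get("SUPABASE_URL", ""),
        "supabase_key": os.environ.get("SUPABASE_KEY", ""),
        "chromadb_host": os.environ.get("CHROMADB_HOST", "localhost"),
        "chromadb_port": int(os.environ.get("CHROMADB_PORT", "8000")),
    }

cfg = load_config()
print(cfg["chromadb_host"], cfg["chromadb_port"])
```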
The system uses ChromaDB with:
- Collection name: `floats_collection`
- Embedding model: `all-MiniLM-L6-v2`
- Distance function: cosine similarity
- Metadata indexing: enabled for all fields
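Cosine similarity, the distance function configured above, can be computed directly. This NumPy sketch shows what is measured between a query embedding and stored embeddings; note that ChromaDB reports cosine *distance*, i.e. 1 − similarity:

```python
import numpy as np

def cosine_similarity(a, b):
    """Cosine of the angle between two embedding vectors."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

query = [1.0, 0.0, 1.0]
doc = [0.5, 0.0, 0.5]
sim = cosine_similarity(query, doc)   # ~1.0: same direction, magnitude ignored
dist = 1.0 - sim                      # cosine distance, as ChromaDB reports it
```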
- Connection Pooling: Configured for optimal Supabase connections
- Retry Logic: Exponential backoff for failed connections
- Batch Processing: Efficient bulk data operations
- Index Strategy: Optimized metadata indexing in ChromaDB
- Embedding Caching: Reuse embeddings where possible
- Query Optimization: Structured queries for better performance
- Collection Management: Proper collection organization
- Memory Management: Efficient resource utilization
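The retry logic can be sketched as a small exponential-backoff wrapper. The attempt count and delays below are illustrative, not the pipeline's actual settings:

```python
import time

def with_retry(fn, attempts=4, base_delay=0.5):
    """Call fn(), retrying on exception with exponentially growing delays."""
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(base_delay * (2 ** i))  # 0.5s, 1s, 2s, ...

# Example: a connection that succeeds on the third try
calls = {"n": 0}
def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "connected"

print(with_retry(flaky_connect, base_delay=0.01))  # connected
```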
- ChromaDB Connection Failed:

  ```bash
  # Start ChromaDB server
  chroma run --host localhost --port 8000
  ```

- Supabase Connection Issues:
  - Verify credentials in the `.env` file
  - Check network connectivity
  - Validate API key permissions
- File Upload Errors:
  - Ensure NetCDF file format
  - Check file size limits
  - Verify file path accessibility
- Vector Search Issues:
  - Confirm the ChromaDB collection exists
  - Validate embedding model availability
  - Check query syntax
Enable debug logging:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
```

Project structure:

```
FloatChat-PreProcessing-Pipeline/
├── fastapi_app.py        # Main FastAPI application
├── main.py               # Core processing pipeline
├── requirements.txt      # Python dependencies
├── README.md             # Project documentation
├── ingest/               # Data ingestion modules
│   ├── db_handler.py     # Database connection management
│   ├── ingest.py         # Data ingestion logic
│   ├── load_data.py      # Data loading utilities
│   ├── preprocess.py     # Data preprocessing
│   └── schema.sql        # Database schema
├── test_*.py             # Testing scripts
├── cleanup_*.py          # Cleanup utilities
├── sync_*.py             # Synchronization scripts
├── update_*.py           # Update utilities
├── export_*.py           # Export utilities
├── view_*.py             # Data viewing utilities
├── 2019/                 # Sample NetCDF data (2019)
├── argo_data/            # ARGO data directory
├── argo_data_2020_01/    # Monthly ARGO data
├── data/                 # Processed data
├── docker/               # Docker configuration
└── embeddings/           # Embedding storage
```
- Follow PEP 8 style guidelines
- Add comprehensive tests for new features
- Update documentation for API changes
- Use type hints where applicable
- Maintain backward compatibility
- Type Checking: Use mypy for static type checking
- Code Formatting: Use black for code formatting
- Linting: Use flake8 for code linting
- Testing: Maintain >90% test coverage
- Use bulk operations for large datasets
- Enable caching to avoid re-downloading files
- Consider disabling embeddings for faster processing during development
- Monitor memory usage with large NetCDF files
After successful data ingestion:
- Verify data in your Supabase dashboard
- Test semantic search with embeddings
- Proceed to Stage 2: Chatbot and Dashboard development
[Add your license information here]
For questions or issues:
- Create an issue in the repository
- Contact the development team
- Check the troubleshooting section
- ARGO float data providers
- ChromaDB for vector database capabilities
- FastAPI for the web framework
- Supabase for database hosting