Merged
80 changes: 80 additions & 0 deletions DATA_INGESTION.md
@@ -0,0 +1,80 @@
# Data Ingestion & Training on Large Scale Genome Repositories

Dirghayu is designed to scale from single-sample analysis to population-level training on terabytes of genomic data (e.g., GenomeIndia, 1000 Genomes, UK Biobank).

To train the AI models (`LifespanNet-India`, `DiseaseNet-Multi`) on 100GB+ datasets, we cannot load raw VCF files into RAM. Instead, we use a **Streaming + Columnar** approach.

## 🚀 Strategy: VCF → Parquet → PyTorch Stream

1. **Ingest**: Convert raw VCFs (row-based, slow text parsing) into **Parquet** files (columnar, compressed, fast binary reads).
2. **Stream**: Use a custom PyTorch `IterableDataset` to stream batches of data from disk during training.
3. **Train**: Update models incrementally without memory limits.

---

## 🛠 Step 1: Convert VCF Repos to Parquet

Use the conversion script (`scripts/vcf_to_parquet.py`, to be created) to process your 100GB+ VCF repository.

```bash
# Example: Convert a directory of VCFs to partitioned Parquet dataset
python scripts/vcf_to_parquet.py \
--input_dir /path/to/genome_repo/vcfs/ \
--output_dir /path/to/processed_data/ \
--threads 16
```

**Why Parquet?**
- **Size reduction**: a 100GB VCF compresses to roughly 20-30GB of Parquet (Snappy compression).
- **Speed**: reading a batch of genotypes from columnar binary data is orders of magnitude (often ~100x) faster than re-parsing VCF text.
- **Queryable**: you can run SQL directly over the files (via DuckDB) to inspect the data.

---

## 🔗 Step 2: Connect to Data Source

### Option A: Local / High-Performance NAS
Just point the training script to your processed directory.
```bash
python scripts/train_models.py --data_dir /mnt/genomics_data/processed/
```

### Option B: Cloud Buckets (AWS S3 / GCS)
If your repo is on the cloud, mount it using `s3fs` or `gcsfuse` so it appears as a local filesystem to PyTorch.

**AWS S3 Example:**
```bash
# Mount bucket
mkdir -p /mnt/s3_data
s3fs my-genomics-bucket /mnt/s3_data

# Train
python scripts/train_models.py --data_dir /mnt/s3_data/parquet/
```

---

## 🧬 Step 3: Training with the `GenomicBigDataset`

The `GenomicBigDataset` class (in `src/data/dataset.py`) handles the complexity:
1. It finds all `.parquet` files in your data directory.
2. It uses `pyarrow` to read chunks of data efficiently.
3. It approximates global shuffling with an in-memory buffer, since a true shuffle of an on-disk dataset would require loading all of it.

```python
# Code snippet (how it works internally)
dataset = GenomicBigDataset(
data_dir="/path/to/data",
features=["rs123", "rs456", ...], # List of variants to use as features
target_col="lifespan"
)
dataloader = DataLoader(dataset, batch_size=1024)
```
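The buffer-based shuffling mentioned above is worth a closer look. The following is not the actual `GenomicBigDataset` internals, just a self-contained sketch of the idea: hold a small reservoir of items and emit a random one as each new item streams in, so memory use stays bounded at `buffer_size` regardless of dataset size:

```python
import random


def buffered_shuffle(stream, buffer_size=1024, seed=0):
    """Approximate shuffling for data too large to hold in RAM."""
    rng = random.Random(seed)
    buf = []
    for item in stream:
        buf.append(item)
        if len(buf) >= buffer_size:
            # Emit a random buffered item to make room for the next one.
            yield buf.pop(rng.randrange(len(buf)))
    rng.shuffle(buf)  # drain the remainder in random order
    yield from buf


shuffled = list(buffered_shuffle(range(10), buffer_size=4))
print(len(shuffled))  # 10 -- every item appears exactly once
```

A larger `buffer_size` gives a better approximation of a true shuffle at the cost of more RAM; inside a PyTorch `IterableDataset`, `stream` would be the rows read chunk-by-chunk from the Parquet files.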

## 📝 Requirements for Repository Data

Your repository data should eventually be structured as a table (DataFrame) with:
- **Genotype Columns**: e.g., `rs1801133` (values: 0, 1, 2)
- **Phenotype Columns**: e.g., `age`, `has_t2d`, `bmi`

*Note: The `vcf_to_parquet.py` script helps flatten VCFs into this format, merging with a clinical metadata CSV if provided.*
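Concretely, the flattened table the training code expects might look like this (a hypothetical example; the sample IDs and values are invented, and the real table would have thousands of genotype columns):

```python
import pandas as pd

# One row per sample: genotype columns hold alt-allele dosages (0/1/2),
# phenotype columns come from the merged clinical metadata CSV.
df = pd.DataFrame({
    "sample_id": ["IN0001", "IN0002", "IN0003"],
    "rs1801133": [0, 1, 2],
    "rs429358":  [0, 0, 1],
    "age":       [54, 61, 47],
    "has_t2d":   [0, 1, 0],
    "bmi":       [23.4, 27.9, 21.8],
})

# Genotype feature matrix: select the rs* columns only.
features = df.filter(regex=r"^rs")
print(features.shape)  # (3, 2)
```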
105 changes: 52 additions & 53 deletions demo.py
@@ -11,129 +11,128 @@

import sys
from pathlib import Path
from typing import Dict

# Add src to path
sys.path.insert(0, str(Path(__file__).parent / "src"))

from data import parse_vcf_file, VariantAnnotator
from data import VariantAnnotator, parse_vcf_file
from models import NutrientPredictor


def run_demo(vcf_path: Path):
"""Run complete Dirghayu pipeline demo"""

print("=" * 80)
print("DIRGHAYU: India-First Longevity Genomics Platform")
print("=" * 80)

# Step 1: Parse VCF
print("\n[1/4] Parsing VCF file...")
print(f" Input: {vcf_path}")

variants_df = parse_vcf_file(vcf_path)
print(f" [OK] Found {len(variants_df)} variants")

if len(variants_df) == 0:
print(" [!] No variants found!")
return

print("\n Sample variants:")
print(variants_df[['chrom', 'pos', 'rsid', 'ref', 'alt', 'genotype']].head())
print(variants_df[["chrom", "pos", "rsid", "ref", "alt", "genotype"]].head())

# Step 2: Annotate variants
print("\n[2/4] Annotating variants with public databases...")
print(" Sources: Ensembl VEP, gnomAD")
print(" [!] This makes API calls - may take 30-60 seconds")

annotator = VariantAnnotator()
annotated_df = annotator.annotate_dataframe(variants_df)

print("\n [OK] Annotation complete!")
print("\n Annotated variants:")
print(annotated_df[['rsid', 'gene_symbol', 'consequence', 'gnomad_af']].head())
print(annotated_df[["rsid", "gene_symbol", "consequence", "gnomad_af"]].head())

# Step 3: Train model (on synthetic data for demo)
print("\n[3/4] Training nutrient deficiency predictor...")
print(" [!] Using synthetic data for demonstration")

predictor = NutrientPredictor()
predictor.train(
variants_df=annotated_df,
labels_df=None, # Would be real clinical data
epochs=30
epochs=30,
)

# Save model
model_path = Path("models/nutrient_predictor.pth")
predictor.save(model_path)

# Step 4: Generate predictions
print("\n[4/4] Generating personalized health predictions...")

predictions = predictor.predict(annotated_df)

print("\n" + "=" * 80)
print("HEALTH PREDICTION REPORT")
print("=" * 80)

# Display nutrient deficiency risks
print("\n[NUTRIENT DEFICIENCY RISK ASSESSMENT]")
print("-" * 80)

risk_levels = {
(0.0, 0.3): ("LOW", "[LOW]"),
(0.3, 0.6): ("MODERATE", "[MOD]"),
(0.6, 1.0): ("HIGH", "[HIGH]")
(0.6, 1.0): ("HIGH", "[HIGH]"),
}

for nutrient, risk_score in predictions.items():
# Determine risk level
level, icon = "UNKNOWN", "[?]"
for (low, high), (l, i) in risk_levels.items():
for (low, high), (lvl, icn) in risk_levels.items():
if low <= risk_score < high:
level, icon = l, i
level, icon = lvl, icn
break
nutrient_name = nutrient.replace('_', ' ').title()

nutrient_name = nutrient.replace("_", " ").title()
print(f"\n{icon} {nutrient_name}:")
print(f" Risk Score: {risk_score:.2%}")
print(f" Risk Level: {level}")

# Provide recommendations based on risk
if risk_score > 0.6:
recommendations = get_recommendations(nutrient)
print(f" Recommendations:")
print(" Recommendations:")
for rec in recommendations:
print(f" - {rec}")

# Genetic insights from annotated variants
print("\n" + "=" * 80)
print("🧬 GENETIC INSIGHTS")
print("=" * 80)

# Look for key variants
key_variants = {
'rs1801133': 'MTHFR C677T - Affects folate metabolism',
'rs429358': 'APOE e4 - Increased Alzheimer\'s risk',
'rs601338': 'FUT2 - Affects vitamin B12 absorption',
'rs2228570': 'VDR FokI - Affects vitamin D receptor'
"rs1801133": "MTHFR C677T - Affects folate metabolism",
"rs429358": "APOE e4 - Increased Alzheimer's risk",
"rs601338": "FUT2 - Affects vitamin B12 absorption",
"rs2228570": "VDR FokI - Affects vitamin D receptor",
}
found_variants = annotated_df[annotated_df['rsid'].isin(key_variants.keys())]

found_variants = annotated_df[annotated_df["rsid"].isin(key_variants.keys())]

if len(found_variants) > 0:
print("\nKey variants detected:")
for _, var in found_variants.iterrows():
rsid = var['rsid']
rsid = var["rsid"]
if rsid in key_variants:
print(f"\n - {rsid} ({var['genotype']})")
print(f" Gene: {var.get('gene_symbol', 'Unknown')}")
print(f" Impact: {key_variants[rsid]}")
print(f" Population frequency: {var.get('gnomad_af', 'Unknown')}")
else:
print("\n No high-impact variants detected in this sample")

print("\n" + "=" * 80)
print("[OK] Demo complete!")
print("=" * 80)
@@ -148,34 +147,34 @@ def run_demo(vcf_path: Path):

def get_recommendations(nutrient: str) -> list:
"""Get dietary/lifestyle recommendations for nutrient deficiency risk"""

recommendations = {
'vitamin_b12': [
"vitamin_b12": [
"Consider B12 supplementation (methylcobalamin 1000 mcg/day)",
"Increase fortified foods (cereals, plant milk)",
"If vegetarian, consult about B12 injections",
"Monitor serum B12 levels every 6 months"
"Monitor serum B12 levels every 6 months",
],
'vitamin_d': [
"vitamin_d": [
"Vitamin D3 supplementation (2000 IU/day)",
"15 minutes sun exposure daily (10 AM - 12 PM)",
"Include fatty fish, egg yolks, fortified milk",
"Check 25(OH)D levels quarterly"
"Check 25(OH)D levels quarterly",
],
'iron': [
"iron": [
"Iron-rich foods (lentils, spinach, fortified grains)",
"Vitamin C with meals to enhance absorption",
"Avoid tea/coffee with iron-rich meals",
"Consider iron supplementation if confirmed deficient"
"Consider iron supplementation if confirmed deficient",
],
'folate': [
"folate": [
"Methylfolate supplementation (400-800 mcg/day)",
"Leafy greens, legumes, fortified grains",
"Ensure adequate B6 and B12 intake",
"Monitor homocysteine levels"
]
"Monitor homocysteine levels",
],
}

return recommendations.get(nutrient, ["Consult healthcare provider"])


@@ -186,14 +185,14 @@ def get_recommendations(nutrient: str) -> list:
else:
# Use sample VCF
vcf_path = Path("data/sample.vcf")

if not vcf_path.exists():
print(f"Error: VCF file not found: {vcf_path}")
print("\nUsage:")
print(" python demo.py <path_to_vcf_file>")
print("\nOr create sample data first:")
print(" python scripts/download_data.py")
sys.exit(1)

# Run demo
run_demo(vcf_path)
9 changes: 9 additions & 0 deletions pyproject.toml
@@ -42,6 +42,8 @@ dependencies = [
"pandas>=2.0.0",
"numpy>=1.24.0",
"scipy>=1.11.0",
"shap>=0.49.1",
"matplotlib>=3.10.8",

# Genomics-specific
"cyvcf2>=0.30.0",
@@ -57,6 +59,10 @@ dependencies = [
"fastapi>=0.104.0",
"uvicorn>=0.24.0",
"pydantic>=2.4.0",
"python-multipart>=0.0.9",

# Reporting
"fpdf>=1.7.2",

# Utilities
"requests>=2.31.0",
@@ -76,6 +82,9 @@ cloud = [
"google-cloud-bigquery>=3.13.0",
]

[tool.hatch.build.targets.wheel]
packages = ["src/api", "src/data", "src/models", "src/reports"]

[tool.uv]
# uv-specific configuration for faster installs
dev-dependencies = [
6 changes: 6 additions & 0 deletions requirements.txt
@@ -5,12 +5,17 @@ scikit-learn>=1.3.0
pandas>=2.0.0
numpy>=1.24.0
scipy>=1.11.0
shap>=0.49.1 # Explainability
matplotlib>=3.10.8 # Plotting

# Genomics-specific
cyvcf2>=0.30.0 # Fast VCF parsing
pysam>=0.21.0 # BAM/VCF handling
biopython>=1.81 # Sequence analysis

# Reporting
fpdf>=1.7.2 # PDF Generation

# Data storage (local-friendly)
pyarrow>=13.0.0 # Parquet files
duckdb>=0.9.0 # SQL on Parquet, no server
@@ -20,6 +25,7 @@ polars>=0.19.0 # Fast dataframes (Rust-based)
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.4.0
python-multipart>=0.0.9 # Form data support

# Utilities
requests>=2.31.0