Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pipelines/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ RUN mkdir /pipeline
COPY ./general_libraries/gfm_logger /pipeline/gfm_logger
COPY ./general_libraries/gfm_data_processing /pipeline/gfm_data_processing
COPY ./general_libraries/orchestrate_wrapper /pipeline/
COPY ./general_libraries/terrakit_cache /pipeline/terrakit_cache


########## Adding specific processor scripts
# inference planner
Expand Down
39 changes: 39 additions & 0 deletions pipelines/components/terrakit_data_fetch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,45 @@ docker push quay.io/geospatial-studio/template_process:v0.1.0
rm -r gfm_logger gfm_data_processing orchestrate_wrapper ./*.cwl ./*.job.yaml terrakit_data_fetch.yaml
```

## Caching Feature

The terrakit_data_fetch component includes a persistent-volume cache that significantly improves performance when the same spatial-temporal query is repeated.

### How It Works

1. **Cache Check**: Before fetching from Terrakit, checks if data exists in cache
2. **Cache Hit**: Copies (or hardlinks) files from `/data/cache` to task folder (~2-5 seconds)
3. **Cache Miss**: Fetches from Terrakit, processes, and caches results (~30-60 seconds)
4. **Performance**: 10-30x faster for repeated queries with the same spatial-temporal parameters

### Configuration

Add these environment variables to your `values.yaml`:

```yaml
- name: TERRAKIT_CACHE_ENABLED
value: "true"
- name: TERRAKIT_CACHE_DIR
  value: "/data/cache" # Uses existing shared PVC; if unset, the component falls back to /pipeline/data/terrakit_cache
- name: TERRAKIT_CACHE_TTL_DAYS
value: "30" # Optional
- name: TERRAKIT_CACHE_MAX_SIZE_GB
value: "400" # Optional size limit
```

### Management

```bash
# Monitor cache size
kubectl exec -it <pod-name> -- du -sh /data/cache

# Clear cache
kubectl exec -it <pod-name> -- rm -rf /data/cache/*

# Disable cache temporarily
kubectl set env deployment/terrakit-data-fetch TERRAKIT_CACHE_ENABLED=false
```

## Deploy the process component
To deploy the component to OpenShift you will use the deployment script in the folder. In the deployment script you will need to:

Expand Down
72 changes: 69 additions & 3 deletions pipelines/components/terrakit_data_fetch/terrakit_data_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from gfm_data_processing.metrics import MetricManager
from gfm_data_processing.common import logger, notify_gfmaas_ui, report_exception
from gfm_data_processing.exceptions import GfmDataProcessingException
from terrakit_cache import TerrakitPVCacheManager

# Uncomment next 2 lines for local testing
# import dotenv
Expand All @@ -45,6 +46,14 @@

metric_manager = MetricManager(component_name=process_id)

# Initialize the shared Terrakit cache manager from environment configuration.
# TERRAKIT_CACHE_MAX_SIZE_GB is optional: unset or blank means "no size limit".
# Read it once (instead of twice) and strip it so an empty/whitespace value
# does not raise ValueError in float().
_max_cache_size_env = os.getenv("TERRAKIT_CACHE_MAX_SIZE_GB", "").strip()
cache_manager = TerrakitPVCacheManager(
    cache_dir=os.getenv("TERRAKIT_CACHE_DIR", "/pipeline/data/terrakit_cache"),
    cache_ttl_days=int(os.getenv("TERRAKIT_CACHE_TTL_DAYS", "30")),
    max_cache_size_gb=float(_max_cache_size_env) if _max_cache_size_env else None,
    enabled=os.getenv("TERRAKIT_CACHE_ENABLED", "true").lower() == "true",
)


def to_decibels(linear):
    """Convert a linear power value (scalar or array-like) to decibels.

    Applies the standard power-ratio formula ``dB = 10 * log10(linear)``.
    """
    return np.log10(linear) * 10
Expand Down Expand Up @@ -156,10 +165,45 @@ def terrakit_data_fetch():
output_file_date = data_date

save_filepath = f"{task_folder}/{task_id}_{modality_tag}_{output_file_date}{file_suffix}.tif"
original_input_images += [save_filepath]

imputed_file_path = f"{task_folder}/{task_id}_{modality_tag}_{output_file_date}_imputed{file_suffix}.tif"
band_names = list(band_dict.get("band_name") for band_dict in model_input_data_spec["bands"])

# Generate cache key
cache_key = cache_manager.get_cache_key(
bbox=bbox,
date=data_date,
collection_name=collection_name,
band_names=band_names,
maxcc=maxcc,
modality_tag=modality_tag,
transform=model_input_data_spec.get("transform")
)

cached_data = cache_manager.get_cached_files(cache_key)

if cached_data:
# Cache hit - copy from cache to task folder
logger.info(f"🎯 Using cached data for {modality_tag} on {data_date}")

original_pv_path = cached_data["original_pv_path"]
imputed_pv_path = cached_data["imputed_pv_path"]

# Copy (or hardlink) from cache to task folder
success_original = cache_manager.copy_cached_file(original_pv_path, save_filepath)
success_imputed = cache_manager.copy_cached_file(imputed_pv_path, imputed_file_path)

if success_original and success_imputed:
original_input_images += [save_filepath]
imputed_input_images += [imputed_file_path]
logger.info(f"✅ Successfully retrieved cached files")
continue # Skip to next modality
else:
logger.warning(f"⚠️ Failed to copy cached files, fetching from Terrakit...")

# Cache miss or copy failed - fetch from Terrakit
logger.info(f"🌍 Fetching data from Terrakit for {modality_tag} on {data_date}")

# Use tenacity for automatic retry on network errors
da = fetch_data_with_retry(
dc=dc,
Expand Down Expand Up @@ -189,10 +233,32 @@ def terrakit_data_fetch():
dai = scale_data_xarray(da, model_input_data_spec_scaling_factors)

# Imputing nans if any are found in data
imputed_file_path = f"{task_folder}/{task_id}_{modality_tag}_{output_file_date}_imputed{file_suffix}.tif"
dai = impute_nans_xarray(dai, nodata_value=nodata_value)
save_data_array_to_file(dai, imputed_file_path, imputed=True)

original_input_images += [save_filepath]
imputed_input_images += [imputed_file_path]

# Cache the files to /pipeline/data/terrakit_cache
cache_metadata = {
"date": data_date,
"bbox": bbox,
"collection": collection_name,
"bands": band_names,
"maxcc": maxcc,
"modality": modality_tag,
"nodata_value": float(nodata_value),
"transform": model_input_data_spec.get("transform"),
"inference_id": inference_id,
"task_id": task_id
}

cache_manager.cache_files(
cache_key=cache_key,
original_file_path=save_filepath,
imputed_file_path=imputed_file_path,
metadata=cache_metadata
)

######################################################################################################
### (optional) if you want to pass on information to later stages of the pipelines,
Expand Down
8 changes: 8 additions & 0 deletions pipelines/general_libraries/terrakit_cache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# © Copyright IBM Corporation 2025
# SPDX-License-Identifier: Apache-2.0

"""Public package interface for the terrakit_cache library.

Re-exports :class:`TerrakitPVCacheManager`, the persistent-volume cache
manager used by pipeline components to cache Terrakit fetch results.
"""

from .pv_cache_manager import TerrakitPVCacheManager

__all__ = ["TerrakitPVCacheManager"]

# Made with Bob
Loading
Loading