diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml new file mode 100644 index 0000000..1547352 --- /dev/null +++ b/.github/workflows/docker-build.yml @@ -0,0 +1,64 @@ +name: Build and Push Docker Image + +on: + push: + branches: + - master + - dockerise + tags: + - 'v*' + pull_request: + branches: + - master + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + build: + runs-on: ubuntu-latest + + permissions: + contents: read + packages: write + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=sha + type=raw,value=latest,enable={{is_default_branch}} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Image digest + run: echo "Image pushed successfully to ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}" diff --git a/Dockerfile b/Dockerfile index f1d50e9..97ebae4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,19 @@ -FROM python:3.10 +FROM python:3.11-slim WORKDIR /app -ENV PYTHONBUFFERED 1 +ENV PYTHONUNBUFFERED=1 + +# Install system dependencies for SSL certificates +RUN apt-get update && \ + apt-get install -y --no-install-recommends ca-certificates && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Create symlink for RedHat-style certificate path (required by librdkafka OIDC) +# librdkafka's OIDC token retrieval looks for /etc/pki/tls/certs/ca-bundle.crt +RUN mkdir -p /etc/pki/tls/certs && \ + ln -s /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt RUN pip install --upgrade pip COPY ./requirements.txt /app @@ -10,4 +21,7 @@ RUN pip install -r requirements.txt COPY . /app +# Install the gwtm_cron package +RUN pip install -e . + CMD ["python", "src/gwtm_cron/gwtm_listener/listener.py"] diff --git a/README.md b/README.md index 4e28383..dad7557 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,273 @@ # gwtm_cron -Gravitational Wave Treasure Map cron functions -* GCN Listener -* Others coming soon -### Build/Deploy with docker -```bash +Gravitational Wave Treasure Map cron functions - a Kafka-based listener system for processing: +* LIGO/Virgo/KAGRA gravitational wave alerts +* IceCube neutrino coincidence notices + +**Requirements**: Python 3.11+ + +## How It Works + +The listeners are **real-time streaming processors** that: + +1. **Subscribe to NASA GCN Kafka streams** + - LIGO Listener: `igwn.gwalert` topic for gravitational wave detections + - IceCube Listener: `gcn.notices.icecube.lvk_nu_track_search` for neutrino coincidences + +2. 
**Process each alert** as it arrives (typically within seconds of detection): + - Parse alert JSON and extract metadata (event ID, classification, instruments) + - Decode and analyze probability skymaps (FITS format) + - Calculate sky localization statistics (90%/50% credible areas, average position) + - Generate derived products: + - Sky contours (GeoJSON for visualization) + - MOC (Multi-Order Coverage) files + - Satellite visibility maps (Fermi, LAT) + - Query galaxy catalogs to identify potential host galaxies + +3. **Store products** to cloud storage (S3/Azure/OpenStack Swift): + - Raw alert JSON + - Processed FITS skymaps + - Visualization-ready contours and maps + - One event can produce 5-10 files as it evolves (Early Warning → Preliminary → Update) + +4. **POST to GWTM API** for public consumption by astronomers worldwide +**Important**: Listeners only process **new alerts** that arrive after they start. Historical alerts are not backfilled. If you start with empty storage, it will remain empty until the next gravitational wave or neutrino detection is announced. + +## Quick Start + +### Docker Compose (Local Development) + +```bash +# Build and run both listeners docker compose up -docker build -t gwtm_cron . +# Run specific listener +docker compose up ligo-listener +docker compose up icecube-listener +``` -docker tag gwtm_cron:latest 929887798640.dkr.ecr.us-east-2.amazonaws.com/gwtm_cron_listener:latest +### Manual Docker Build +```bash +docker build -t gwtm_cron . +docker tag gwtm_cron:latest 929887798640.dkr.ecr.us-east-2.amazonaws.com/gwtm_cron_listener:latest ./ecrlogin.sh - docker push 929887798640.dkr.ecr.us-east-2.amazonaws.com/gwtm_cron_listener:latest ``` +**Note**: GitHub Actions automatically builds and pushes to ECR on commits to master. 
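+
+For a quick smoke test that consumes live alerts without posting to the API or writing to cloud storage, the Docker entrypoints honour `DRY_RUN` (see Listener Control Variables below). A minimal sketch, assuming your Kafka and API credentials are already exported in the shell:
+
+```bash
+# One-off container with dry-run enabled: alerts are parsed and logged,
+# but no API calls or storage writes are made
+docker compose run --rm -e DRY_RUN=true ligo-listener
+```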
+ +## Environment Variables + +The following environment variables are required for Docker/Kubernetes/Helm deployments: + +### Required Variables + +#### Kafka Configuration +- `KAFKA_CLIENT_ID` - GCN Kafka client ID for authentication +- `KAFKA_CLIENT_SECRET` - GCN Kafka client secret + +#### GWTM API Configuration +- `API_TOKEN` - Authentication token for GWTM API +- `API_BASE` - Base URL for GWTM API (e.g., `https://treasuremap.space/api/v0/`) + +#### Cloud Storage Configuration + +**Option 1: AWS S3** +- `AWS_ACCESS_KEY_ID` - AWS access key +- `AWS_SECRET_ACCESS_KEY` - AWS secret key +- `AWS_DEFAULT_REGION` - AWS region (default: `us-east-2`) +- `AWS_BUCKET` - S3 bucket name (default: `gwtreasuremap`) +- `STORAGE_BUCKET_SOURCE=s3` - Set to `s3` for AWS storage + +**Option 2: Azure Blob Storage** +- `AZURE_ACCOUNT_NAME` - Azure storage account name +- `AZURE_ACCOUNT_KEY` - Azure storage account key +- `STORAGE_BUCKET_SOURCE=abfs` - Set to `abfs` for Azure storage + +**Option 3: OpenStack Swift** +- `OS_AUTH_URL` - OpenStack authentication endpoint (e.g., `https://openstack.example.com:5000/v3`) +- `OS_USERNAME` - OpenStack username +- `OS_PASSWORD` - OpenStack password +- `OS_PROJECT_NAME` - OpenStack project/tenant name +- `OS_USER_DOMAIN_NAME` - User domain name (default: `Default`) +- `OS_PROJECT_DOMAIN_NAME` - Project domain name (default: `Default`) +- `OS_CONTAINER_NAME` - Swift container name (default: `gwtreasuremap`) +- `STORAGE_BUCKET_SOURCE=swift` - Set to `swift` for OpenStack storage + +### Optional Variables + +- `OBSERVING_RUN` - Observing run identifier (default: `O4`) +- `PATH_TO_GALAXY_CATALOG_CONFIG` - Path to galaxy catalog config file (only needed for LIGO listener if generating galaxy lists) + +### Listener Control Variables + +- `DRY_RUN` - Set to `true` or `1` to run in dry-run mode (no API calls, no storage writes) +- `WRITE_TO_STORAGE` - Set to `false` or `0` to disable storage writes (default: `true`) +- `VERBOSE` - Set to `false` or `0` to disable verbose logging (default: `true`) + +**Note**: Storage type (S3, Azure, Swift) is controlled by `STORAGE_BUCKET_SOURCE`, not by `WRITE_TO_STORAGE`. 
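+
+For local Docker Compose runs the same variables are passed straight through from the shell (see `docker-compose.yml`). A minimal sketch for the AWS S3 backend; the `changeme` values are placeholders and must be replaced with real credentials:
+
+```bash
+# Kafka and GWTM API (placeholders)
+export KAFKA_CLIENT_ID=changeme
+export KAFKA_CLIENT_SECRET=changeme
+export API_TOKEN=changeme
+export API_BASE=https://treasuremap.space/api/v0/
+# AWS S3 storage backend
+export STORAGE_BUCKET_SOURCE=s3
+export AWS_ACCESS_KEY_ID=changeme
+export AWS_SECRET_ACCESS_KEY=changeme
+export AWS_DEFAULT_REGION=us-east-2
+export AWS_BUCKET=gwtreasuremap
+
+docker compose up ligo-listener
+```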
+ +### Logging Configuration + +- `LOG_FORMAT` - Set to `json` to enable structured JSON logging for Kubernetes (default: print statements) +- `LOG_LEVEL` - Set log level: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` (default: `INFO`) + +### Example Kubernetes/Helm Values + +**AWS S3 Example:** +```yaml +env: + - name: KAFKA_CLIENT_ID + valueFrom: + secretKeyRef: + name: gwtm-secrets + key: kafka-client-id + - name: KAFKA_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: gwtm-secrets + key: kafka-client-secret + - name: API_TOKEN + valueFrom: + secretKeyRef: + name: gwtm-secrets + key: api-token + - name: API_BASE + value: "https://treasuremap.space/api/v0/" + - name: STORAGE_BUCKET_SOURCE + value: "s3" + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-credentials + key: access-key-id + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-credentials + key: secret-access-key + - name: AWS_DEFAULT_REGION + value: "us-east-2" + - name: AWS_BUCKET + value: "gwtreasuremap" + - name: OBSERVING_RUN + value: "O4" +``` + +**OpenStack Swift Example:** +```yaml +env: + - name: KAFKA_CLIENT_ID + valueFrom: + secretKeyRef: + name: gwtm-secrets + key: kafka-client-id + - name: KAFKA_CLIENT_SECRET + valueFrom: + secretKeyRef: + name: gwtm-secrets + key: kafka-client-secret + - name: API_TOKEN + valueFrom: + secretKeyRef: + name: gwtm-secrets + key: api-token + - name: API_BASE + value: "https://treasuremap.space/api/v0/" + - name: STORAGE_BUCKET_SOURCE + value: "swift" + - name: OS_AUTH_URL + value: "https://openstack.example.com:5000/v3" + - name: OS_USERNAME + valueFrom: + secretKeyRef: + name: openstack-credentials + key: username + - name: OS_PASSWORD + valueFrom: + secretKeyRef: + name: openstack-credentials + key: password + - name: OS_PROJECT_NAME + value: "gwtm-project" + - name: OS_USER_DOMAIN_NAME + value: "Default" + - name: OS_PROJECT_DOMAIN_NAME + value: "Default" + - name: OS_CONTAINER_NAME + value: "gwtreasuremap" + - name: OBSERVING_RUN + value: "O4" +``` + +## Deployment Architecture + +The system runs two independent listener processes: + +1. **LIGO Listener** (`docker/run_ligo_listener.py`) + - Subscribes to `igwn.gwalert` Kafka topic + - Processes gravitational wave alerts + - Generates skymaps, contours, and galaxy lists + - Posts to GWTM API + +2. **IceCube Listener** (`docker/run_icecube_listener.py`) + - Subscribes to `gcn.notices.icecube.lvk_nu_track_search` Kafka topic + - Processes neutrino coincidence notices + - Posts to GWTM API + +Both listeners run continuously and process alerts in real-time as they arrive on the Kafka stream. 
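+
+Both entrypoints are thin wrappers around the `Listener` class. The sketch below mirrors `docker/run_ligo_listener.py` but forces dry-run mode, which is a convenient way to exercise the pipeline from a Python shell (configuration is still read from the environment variables above):
+
+```python
+from gwtm_cron import gwtm_listener
+
+# Same construction as the Docker entrypoint, with dry-run forced on
+listener = gwtm_listener.listener.Listener(listener_type="LIGO_ALERT")
+listener.run(write_to_storage=False, verbose=True, dry_run=True)
+```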
+ +## Data Migration + +When migrating between storage backends (e.g., moving from AWS to OpenStack): + +```bash +# Dry run to see what would be migrated (includes size estimation) +python scripts/migrate_storage.py --source s3 --dest swift --container fit --dry-run + +# Actual migration with progress tracking +python scripts/migrate_storage.py --source s3 --dest swift --container fit + +# Migrate test data +python scripts/migrate_storage.py --source s3 --dest swift --container test +``` + +**Migration Features**: +- **Size Estimation**: Calculates total data size before transfer (samples files if >100) +- **Time Estimation**: Shows ETA and transfer rate during migration +- **Progress Tracking**: Real-time progress with percentage complete +- **Transfer Statistics**: Reports total size, time, and average transfer rate +- **Error Handling**: Continues on errors and reports failed files at end +- **Dry-run Mode**: Preview migration without transferring data + +**Example Output**: +``` +Scanning source (s3)... +Found 1523 files. Calculating total size... +Total size: ~4.23 GB (1523 files) +-------------------------------------------------------------------------------- + +[1/1523 - 0.1%] fit/S230518h-Preliminary.fits.gz + Size: 2.45 MB, Time: 1.2s, Rate: 2.04 MB/s, ETA: 30.5m +[2/1523 - 0.1%] fit/S230518h-contours-smooth.json (145.23 KB) ✓ +... +[1523/1523 - 100.0%] test/MS181101ab-retraction.json (8.12 KB) ✓ + +================================================================================ +Migration complete! + Successful: 1523/1523 + Total size: 4.23 GB + Total time: 28.3m + Avg rate: 2.56 MB/s +``` + +## Testing + +```bash +# Run local ingestion tests with sample alerts +python tests/listener_tests/test_local_ingest.py +python tests/icecube_tests/test_local_ingest.py +``` diff --git a/docker-compose.yml b/docker-compose.yml index 2c1a881..e88429b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,17 +1,72 @@ version: '3.8' services: - worker: + ligo-listener: build: . - command: python src/gwtm_cron/gwtm_listener/listener.py + command: python docker/run_ligo_listener.py + restart: unless-stopped environment: + # AWS S3 Configuration - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY - AWS_DEFAULT_REGION - AWS_BUCKET + # Azure Blob Storage Configuration (alternative to S3) + - AZURE_ACCOUNT_NAME + - AZURE_ACCOUNT_KEY + # OpenStack Swift Configuration (alternative to S3/Azure) + - OS_AUTH_URL + - OS_USERNAME + - OS_PASSWORD + - OS_PROJECT_NAME + - OS_USER_DOMAIN_NAME + - OS_PROJECT_DOMAIN_NAME + - OS_CONTAINER_NAME + # Storage backend: 's3' for AWS, 'abfs' for Azure, 'swift' for OpenStack + - STORAGE_BUCKET_SOURCE + # GWTM API Configuration - API_TOKEN - API_BASE - - ALERT_DOMAIN + # Kafka Configuration + - KAFKA_CLIENT_ID + - KAFKA_CLIENT_SECRET + - KAFKA_OFFSET_RESET # 'latest' (default) or 'earliest' to replay buffered messages + - KAFKA_GROUP_ID # Optional: set group ID for persistent offset tracking + # Observing Run Configuration - OBSERVING_RUN + # Galaxy Catalog Configuration (optional, for galaxy list generation) + - PATH_TO_GALAXY_CATALOG_CONFIG + + icecube-listener: + build: . 
+ command: python docker/run_icecube_listener.py + restart: unless-stopped + environment: + # AWS S3 Configuration + - AWS_ACCESS_KEY_ID + - AWS_SECRET_ACCESS_KEY + - AWS_DEFAULT_REGION + - AWS_BUCKET + # Azure Blob Storage Configuration (alternative to S3) + - AZURE_ACCOUNT_NAME + - AZURE_ACCOUNT_KEY + # OpenStack Swift Configuration (alternative to S3/Azure) + - OS_AUTH_URL + - OS_USERNAME + - OS_PASSWORD + - OS_PROJECT_NAME + - OS_USER_DOMAIN_NAME + - OS_PROJECT_DOMAIN_NAME + - OS_CONTAINER_NAME + # Storage backend: 's3' for AWS, 'abfs' for Azure, 'swift' for OpenStack + - STORAGE_BUCKET_SOURCE + # GWTM API Configuration + - API_TOKEN + - API_BASE + # Kafka Configuration - KAFKA_CLIENT_ID - KAFKA_CLIENT_SECRET + - KAFKA_OFFSET_RESET # 'latest' (default) or 'earliest' to replay buffered messages + - KAFKA_GROUP_ID # Optional: set group ID for persistent offset tracking + # Observing Run Configuration + - OBSERVING_RUN diff --git a/docker/run_icecube_listener.py b/docker/run_icecube_listener.py new file mode 100644 index 0000000..733ce46 --- /dev/null +++ b/docker/run_icecube_listener.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +""" +Docker entrypoint for IceCube notice listener. +Runs in production mode using environment variables for configuration. + +Environment variables: + DRY_RUN: Set to 'true' or '1' to run in dry-run mode (no API calls, no storage writes) + WRITE_TO_STORAGE: Set to 'false' or '0' to disable storage writes (default: true) + VERBOSE: Set to 'false' or '0' to disable verbose logging (default: true) + KAFKA_OFFSET_RESET: Set to 'earliest' to replay buffered messages from past few days (default: 'latest') + KAFKA_GROUP_ID: Optional group ID for persistent offset tracking + +Note: Storage type (S3, Azure, Swift) is controlled by STORAGE_BUCKET_SOURCE env var. + +Replay Example: + To ingest historical events from the Kafka buffer (past ~3 days): + export KAFKA_OFFSET_RESET=earliest + docker compose up icecube-listener +""" + +import os +from gwtm_cron import gwtm_listener + + +def str_to_bool(value: str, default: bool = False) -> bool: + """Convert string environment variable to boolean""" + if not value: + return default + return value.lower() in ('true', '1', 'yes', 'on') + + +if __name__ == "__main__": + # Read configuration from environment + dry_run = str_to_bool(os.environ.get('DRY_RUN', ''), default=False) + write_to_storage = str_to_bool(os.environ.get('WRITE_TO_STORAGE', ''), default=True) + verbose = str_to_bool(os.environ.get('VERBOSE', ''), default=True) + + listener = gwtm_listener.listener.Listener( + listener_type="ICECUBE_NOTICE" + ) + listener.run(write_to_storage=write_to_storage, verbose=verbose, dry_run=dry_run) diff --git a/docker/run_ligo_listener.py b/docker/run_ligo_listener.py new file mode 100644 index 0000000..e263c1a --- /dev/null +++ b/docker/run_ligo_listener.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +""" +Docker entrypoint for LIGO alert listener. +Runs in production mode using environment variables for configuration. 
+ +Environment variables: + DRY_RUN: Set to 'true' or '1' to run in dry-run mode (no API calls, no storage writes) + WRITE_TO_STORAGE: Set to 'false' or '0' to disable storage writes (default: true) + VERBOSE: Set to 'false' or '0' to disable verbose logging (default: true) + KAFKA_OFFSET_RESET: Set to 'earliest' to replay buffered messages from past few days (default: 'latest') + KAFKA_GROUP_ID: Optional group ID for persistent offset tracking + +Note: Storage type (S3, Azure, Swift) is controlled by STORAGE_BUCKET_SOURCE env var. + +Replay Example: + To ingest historical events from the Kafka buffer (past ~3 days): + export KAFKA_OFFSET_RESET=earliest + docker compose up ligo-listener +""" + +import os +from gwtm_cron import gwtm_listener + + +def str_to_bool(value: str, default: bool = False) -> bool: + """Convert string environment variable to boolean""" + if not value: + return default + return value.lower() in ('true', '1', 'yes', 'on') + + +if __name__ == "__main__": + # Read configuration from environment + dry_run = str_to_bool(os.environ.get('DRY_RUN', ''), default=False) + write_to_storage = str_to_bool(os.environ.get('WRITE_TO_STORAGE', ''), default=True) + verbose = str_to_bool(os.environ.get('VERBOSE', ''), default=True) + + listener = gwtm_listener.listener.Listener( + listener_type="LIGO_ALERT" + ) + listener.run(write_to_storage=write_to_storage, verbose=verbose, dry_run=dry_run) diff --git a/pyproject.toml b/pyproject.toml index 03d2584..f599743 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,19 +4,33 @@ build-backend = "hatchling.build" [project] name = "gwtm_cron" -version = "0.0.1" +version = "0.1.0" authors = [ { name="Samuel Wyatt", email="swyatt@arizona.edu" }, + { name="Steve Foale", email="sfoale@lco.global" }, ] -description = "A python gcn-kafka alert listener for the Gravitational Wave Treasure Map" +description = "Kafka-based listener system for LIGO/Virgo/KAGRA gravitational wave alerts and IceCube neutrino coincidence notices" readme = "README.md" -requires-python = ">=3.10" +requires-python = ">=3.11" +license = { text = "MIT" } +keywords = ["gravitational-waves", "astronomy", "kafka", "ligo", "icecube", "alerts"] classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Science/Research", + "Topic :: Scientific/Engineering :: Astronomy", + "Topic :: Scientific/Engineering :: Physics", "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.11", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", + "Environment :: Console", ] [project.urls] -"Homepage" = "https://github.com/pypa/sampleproject" -"Bug Tracker" = "https://github.com/pypa/sampleproject/issues" +"Homepage" = "https://github.com/TheTreasureMap/gwtm_cron" +"Repository" = "https://github.com/TheTreasureMap/gwtm_cron" +"Bug Tracker" = "https://github.com/TheTreasureMap/gwtm_cron/issues" +"Documentation" = "https://github.com/TheTreasureMap/gwtm_cron/blob/master/README.md" + +[tool.hatch.build.targets.wheel] +packages = ["src/gwtm_cron"] diff --git a/requirements.txt b/requirements.txt index 5eb3fd8..3571993 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,36 @@ -numpy -astropy_healpix -gcn_kafka -ephem -bs4 -shapely -ligo.skymap -boto3 -mocpy -lxml -werkzeug -fsspec -adlfs +# Core scientific computing +numpy>=1.24.0,<2.0.0 +astropy-healpix>=1.0.0 + +# GCN Kafka alert streaming +gcn-kafka>=0.3.0 + +# Astronomical calculations +ephem>=4.1.0 + +# Web scraping and parsing +beautifulsoup4>=4.11.0 +lxml>=4.9.0 + +# 
Geospatial +shapely>=2.0.0 + +# LIGO/Virgo/KAGRA skymap processing +ligo.skymap>=1.0.0 + +# Cloud storage backends +boto3>=1.26.0 # AWS S3 +s3fs>=2023.1.0 # S3 backend for fsspec +adlfs>=2023.1.0 # Azure Blob Storage +python-swiftclient>=4.0.0 # OpenStack Swift +python-keystoneclient>=5.0.0 # Required for Swift Keystone v3 auth +aiohttp>=3.8.0 # Required by s3fs and other async operations + +# Multi-cloud filesystem abstraction +fsspec>=2023.1.0 + +# MOC (Multi-Order Coverage) support +mocpy>=0.12.0 + +# Utilities +werkzeug>=2.3.0 diff --git a/scripts/migrate_storage.py b/scripts/migrate_storage.py new file mode 100755 index 0000000..2cc4d6f --- /dev/null +++ b/scripts/migrate_storage.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 +""" +Storage Migration Script for GWTM Cron + +Migrates data between different storage backends (S3, Azure, OpenStack Swift). +Useful for migrating existing data when moving to a new deployment environment. + +Usage: + python scripts/migrate_storage.py --source s3 --dest swift --container fit + python scripts/migrate_storage.py --source azure --dest swift --container test --dry-run + +Requirements: + - Source and destination credentials must be configured via environment variables or config file + - Large transfers may take significant time +""" + +import argparse +import sys +import os +import time +from pathlib import Path +from datetime import timedelta + +# Add parent directory to path to import gwtm_cron modules +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +from gwtm_cron.gwtm_listener import gwstorage, gw_config + + +def format_size(bytes_size): + """Convert bytes to human-readable format.""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if bytes_size < 1024.0: + return f"{bytes_size:.2f} {unit}" + bytes_size /= 1024.0 + return f"{bytes_size:.2f} PB" + + +def format_time(seconds): + """Convert seconds to human-readable format.""" + if seconds < 60: + return f"{seconds:.1f}s" + elif seconds < 3600: + minutes = seconds / 60 + return f"{minutes:.1f}m" + else: + return str(timedelta(seconds=int(seconds))) + + +def migrate_container(source, dest, container, config, dry_run=False, verbose=True, skip_existing=True): + """ + Migrate all files from one storage backend to another. 
+ + Args: + source: Source storage type ('s3', 'abfs', 'swift') + dest: Destination storage type ('s3', 'abfs', 'swift') + container: Container/path to migrate (e.g., 'fit', 'test') + config: Config object with credentials + dry_run: If True, only list files without transferring + verbose: Print progress messages + skip_existing: If True, skip files that already exist in destination + """ + if verbose: + print(f"{'DRY RUN: ' if dry_run else ''}Migrating container '{container}' from {source} to {dest}") + print("-" * 80) + + start_time = time.time() + + try: + # List all files in source container + if verbose: + print(f"Scanning source ({source})...") + files = gwstorage.list_gwtm_bucket(container, source=source, config=config) + + # List existing files in destination if skip_existing is enabled + existing_files = set() + if skip_existing: + if verbose: + print(f"Scanning destination ({dest})...") + try: + existing_files = set(gwstorage.list_gwtm_bucket(container, source=dest, config=config)) + if verbose: + print(f"Found {len(existing_files)} existing files in destination") + except Exception as e: + if verbose: + print(f"Warning: Could not list destination files ({e}), will transfer all files") + existing_files = set() + + # Filter out files that already exist + if skip_existing and existing_files: + files_to_migrate = [f for f in files if f not in existing_files] + skipped_count = len(files) - len(files_to_migrate) + if verbose: + print(f"Skipping {skipped_count} files that already exist in destination") + files = files_to_migrate + + # Calculate total size by sampling files + if verbose: + print(f"Found {len(files)} files. Calculating total size...") + + total_size = 0 + sample_count = 0 + + # Get sizes for all files (or sample if too many) + files_to_sample = files if len(files) <= 100 else files[::max(1, len(files) // 100)] + + for filename in files_to_sample: + try: + content = gwstorage.download_gwtm_file( + filename, + source=source, + config=config, + decode=False + ) + total_size += len(content) + sample_count += 1 + except Exception: + # Skip files that can't be accessed + pass + + # Estimate total size if we sampled + if len(files) > sample_count: + avg_size = total_size / sample_count if sample_count > 0 else 0 + estimated_total = avg_size * len(files) + is_estimate = True + else: + estimated_total = total_size + is_estimate = False + + if verbose: + size_str = format_size(estimated_total) + print(f"Total size: {'~' if is_estimate else ''}{size_str} ({len(files)} files)") + print("-" * 80) + + if dry_run: + print("\nFiles to be migrated:") + for f in files: + print(f" - {f}") + print(f"\n{'~' if is_estimate else ''}{size_str} would be transferred") + return len(files), estimated_total + + # Transfer each file + migrated = 0 + failed = [] + bytes_transferred = 0 + last_update_time = start_time + + for i, filename in enumerate(files, 1): + try: + file_start = time.time() + + if not dry_run: + # Download from source + content = gwstorage.download_gwtm_file( + filename, + source=source, + config=config, + decode=False + ) + + file_size = len(content) + bytes_transferred += file_size + + # Upload to destination + gwstorage.upload_gwtm_file( + content, + filename, + source=dest, + config=config + ) + + file_time = time.time() - file_start + + if verbose: + # Calculate progress + elapsed = time.time() - start_time + progress_pct = (i / len(files)) * 100 + avg_time_per_file = elapsed / i + remaining_files = len(files) - i + eta_seconds = remaining_files * avg_time_per_file + + # 
Calculate transfer rate + transfer_rate = bytes_transferred / elapsed if elapsed > 0 else 0 + + # Print progress every file, but update ETA every 5 seconds + current_time = time.time() + if current_time - last_update_time >= 5 or i == 1 or i == len(files): + print(f"[{i}/{len(files)} - {progress_pct:.1f}%] {filename}") + print(f" Size: {format_size(file_size)}, Time: {format_time(file_time)}, " + f"Rate: {format_size(transfer_rate)}/s, ETA: {format_time(eta_seconds)}") + last_update_time = current_time + else: + print(f"[{i}/{len(files)} - {progress_pct:.1f}%] {filename} ({format_size(file_size)}) ✓") + else: + if verbose: + print(f"[{i}/{len(files)}] {filename}") + + migrated += 1 + + except Exception as e: + if verbose: + print(f"[{i}/{len(files)}] {filename} ✗ Error: {e}") + failed.append((filename, str(e))) + + # Summary + elapsed_total = time.time() - start_time + + if verbose: + print("\n" + "=" * 80) + print(f"Migration complete!") + print(f" Successful: {migrated}/{len(files)}") + print(f" Total size: {format_size(bytes_transferred)}") + print(f" Total time: {format_time(elapsed_total)}") + if elapsed_total > 0: + print(f" Avg rate: {format_size(bytes_transferred / elapsed_total)}/s") + if failed: + print(f" Failed: {len(failed)}") + print("\nFailed files:") + for fname, error in failed: + print(f" - {fname}: {error}") + + return migrated, bytes_transferred + + except Exception as e: + print(f"Error during migration: {e}") + return 0, 0 + + +def main(): + parser = argparse.ArgumentParser( + description="Migrate GWTM storage data between backends", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Migrate from S3 to OpenStack Swift (dry run) + python scripts/migrate_storage.py --source s3 --dest swift --container fit --dry-run + + # Migrate from Azure to OpenStack Swift (actual migration) + python scripts/migrate_storage.py --source abfs --dest swift --container fit + + # Migrate specific container with config file + python scripts/migrate_storage.py --source s3 --dest swift --container test --config /path/to/config.json + +Storage Backend Options: + s3 - AWS S3 + abfs - Azure Blob Storage + swift - OpenStack Swift + +Environment Variables Required: + Source S3: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_BUCKET + Source Azure: AZURE_ACCOUNT_NAME, AZURE_ACCOUNT_KEY + Source Swift: OS_AUTH_URL, OS_USERNAME, OS_PASSWORD, OS_PROJECT_NAME, OS_CONTAINER_NAME + + Dest S3: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_BUCKET + Dest Azure: AZURE_ACCOUNT_NAME, AZURE_ACCOUNT_KEY + Dest Swift: OS_AUTH_URL, OS_USERNAME, OS_PASSWORD, OS_PROJECT_NAME, OS_CONTAINER_NAME + """ + ) + + parser.add_argument("--source", required=True, choices=["s3", "abfs", "swift"], + help="Source storage backend") + parser.add_argument("--dest", required=True, choices=["s3", "abfs", "swift"], + help="Destination storage backend") + parser.add_argument("--container", required=True, + help="Container/directory to migrate (e.g., 'fit', 'test')") + parser.add_argument("--config", type=str, + help="Path to JSON config file (optional, uses env vars if not provided)") + parser.add_argument("--dry-run", action="store_true", + help="List files without actually migrating them") + parser.add_argument("--quiet", action="store_true", + help="Suppress progress output") + parser.add_argument("--force", action="store_true", + help="Re-transfer all files, even if they already exist in destination") + + args = parser.parse_args() + + # Validation + if args.source == args.dest: + print("Error: Source 
and destination must be different") + sys.exit(1) + + # Load configuration + config = gw_config.Config(path_to_config=args.config) + + # Validate credentials are available + if args.source == "s3" or args.dest == "s3": + if not config.AWS_ACCESS_KEY_ID or not config.AWS_BUCKET: + print("Error: AWS credentials not configured") + sys.exit(1) + + if args.source == "abfs" or args.dest == "abfs": + if not config.AZURE_ACCOUNT_NAME: + print("Error: Azure credentials not configured") + sys.exit(1) + + if args.source == "swift" or args.dest == "swift": + if not config.OS_AUTH_URL or not config.OS_USERNAME: + print("Error: OpenStack Swift credentials not configured") + sys.exit(1) + + # Perform migration + try: + migrated, bytes_transferred = migrate_container( + source=args.source, + dest=args.dest, + container=args.container, + config=config, + dry_run=args.dry_run, + verbose=not args.quiet, + skip_existing=not args.force + ) + + if args.dry_run and not args.quiet: + print(f"\nDry run complete. {migrated} files ({format_size(bytes_transferred)}) would be migrated.") + print("Run without --dry-run to perform actual migration.") + + sys.exit(0) + + except KeyboardInterrupt: + print("\n\nMigration interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\nMigration failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/selective_ingest.py b/selective_ingest.py new file mode 100755 index 0000000..f406782 --- /dev/null +++ b/selective_ingest.py @@ -0,0 +1,567 @@ +#!/usr/bin/env python3 +""" +Selective Alert Ingestion Tool + +This script helps you catch up on alerts selectively by: +1. Listing all available alerts in the Kafka stream (without ingesting) +2. Ingesting only alerts matching a specific filter string from Kafka +3. Downloading and ingesting specific alerts directly from GraceDB URLs + +Usage: + # List mode: Show all available alerts without ingestion + python selective_ingest.py --list --max-alerts 50 + + # Ingest mode: Process only alerts matching a filter string from Kafka + python selective_ingest.py --ingest --filter "S230" --write --no-dry-run + + # GraceDB mode: Download specific alerts directly from GraceDB + python selective_ingest.py --gracedb --ids S251010al S251011x --write --no-dry-run + + # Test mode: Show what would be ingested (dry run) + python selective_ingest.py --ingest --filter "S230" +""" + +import sys +import os +import json +import argparse +import requests +from datetime import datetime + +# Add src to path +sys.path.insert(0, 'src/') + +from gcn_kafka import Consumer +from gwtm_cron.gwtm_listener import listener, gw_config + +def list_alerts(config_path, listener_type, max_alerts=50, timeout_seconds=60, debug=False): + """ + List available alerts without ingesting them. 
+ + Args: + config_path: Path to listener config JSON + listener_type: Type of listener (LIGO_ALERT or ICECUBE_NOTICE) + max_alerts: Maximum number of alerts to list + timeout_seconds: How long to wait for alerts + """ + print("="*80) + print(f"LISTING MODE - {listener_type}") + print("="*80) + print(f"Connecting to Kafka stream: {listener.LISTENER_TYPES[listener_type]['domain']}") + print(f"Will list up to {max_alerts} alerts (timeout: {timeout_seconds}s)") + print(f"Press Ctrl+C to stop early\n") + + config = gw_config.Config(path_to_config=config_path) + + # Build consumer config with offset reset + consumer_config = {'broker.address.family': 'v4'} + offset_reset = os.environ.get('KAFKA_OFFSET_RESET', 'latest') + consumer_config['auto.offset.reset'] = offset_reset + + print(f"Kafka offset reset: {offset_reset}") + + consumer = Consumer( + config=consumer_config, + client_id=config.KAFKA_CLIENT_ID, + client_secret=config.KAFKA_CLIENT_SECRET + ) + + consumer.subscribe([listener.LISTENER_TYPES[listener_type]["domain"]]) + + alerts_found = [] + last_activity_time = datetime.now() + + try: + while len(alerts_found) < max_alerts: + # Timeout based on inactivity (no alerts received), not total runtime + elapsed_since_activity = (datetime.now() - last_activity_time).total_seconds() + if elapsed_since_activity > timeout_seconds: + print(f"\nInactivity timeout reached ({timeout_seconds}s with no new alerts)") + break + + for message in consumer.consume(timeout=1): + # Reset activity timer when we receive an alert + last_activity_time = datetime.now() + try: + alert_data = json.loads(message.value()) + + # Debug: print raw JSON structure + if debug and len(alerts_found) < 3: + print(f"\n{'='*60}") + print("DEBUG: Raw alert data:") + print(json.dumps(alert_data, indent=2)) + print(f"{'='*60}\n") + + # Extract key fields based on listener type + if listener_type == "LIGO_ALERT": + alert_id = alert_data.get('superevent_id', 'UNKNOWN') + alert_type = alert_data.get('alert_type', 'UNKNOWN') + time_created = alert_data.get('time_created', 'UNKNOWN') + event_time = alert_data.get('event', {}).get('time', 'UNKNOWN') + elif listener_type == "ICECUBE_NOTICE": + # IceCube notices have different structure + alert_id = alert_data.get('ref_ID', 'UNKNOWN') + # Try to get LVK reference alert ID + if alert_id == 'UNKNOWN' and 'reference' in alert_data: + ref = alert_data.get('reference', {}) + alert_id = ref.get('gcn.notices.LVK.alert', 'UNKNOWN') + alert_type = f"pval_gen={alert_data.get('pval_generic', 'N/A')}" + time_created = alert_data.get('alert_datetime', 'UNKNOWN') + event_time = time_created + + alert_info = { + 'id': alert_id, + 'type': alert_type, + 'created': time_created, + 'event_time': event_time + } + + alerts_found.append(alert_info) + + # Print alert info + print(f"{len(alerts_found):3d}. 
{alert_id:20s} | {alert_type:15s} | {time_created}") + + if len(alerts_found) >= max_alerts: + break + + except json.JSONDecodeError: + print(f"Warning: Could not parse alert as JSON") + except Exception as e: + print(f"Warning: Error processing alert: {e}") + + except KeyboardInterrupt: + print("\n\nStopped by user") + + print("\n" + "="*80) + print(f"Found {len(alerts_found)} alerts") + print("="*80) + + if alerts_found: + print("\nUnique alert IDs:") + unique_ids = sorted(set(a['id'] for a in alerts_found)) + for alert_id in unique_ids: + count = sum(1 for a in alerts_found if a['id'] == alert_id) + print(f" {alert_id} (x{count})") + + return alerts_found + + +def ingest_filtered(config_path, listener_type, filter_string, write_to_s3=False, + dry_run=True, max_alerts=100, timeout_seconds=60, exclude_test=False): + """ + Ingest only alerts matching the filter string. + + Args: + config_path: Path to listener config JSON + listener_type: Type of listener (LIGO_ALERT or ICECUBE_NOTICE) + filter_string: Only ingest alerts containing this string in their ID + write_to_s3: Whether to write to S3 storage + dry_run: If True, don't post to API + exclude_test: If True, exclude test alerts (MS alerts) + max_alerts: Maximum number of alerts to process + timeout_seconds: How long to wait for alerts + """ + print("="*80) + print(f"INGESTION MODE - {listener_type}") + print("="*80) + print(f"Filter: Only alerts containing '{filter_string}'") + if exclude_test: + print(f"Excluding test alerts (MS*)") + print(f"Write to S3: {write_to_s3}") + print(f"Dry run: {dry_run} (API posting {'DISABLED' if dry_run else 'ENABLED'})") + print(f"Max alerts: {max_alerts} (timeout: {timeout_seconds}s)") + print(f"Press Ctrl+C to stop\n") + + # Create listener + l = listener.Listener(listener_type=listener_type, config_path=config_path) + + alerts_processed = 0 + alerts_skipped = 0 + alerts_examined = 0 + last_activity_time = datetime.now() + + try: + while alerts_processed < max_alerts: + # Timeout based on inactivity (no alerts received), not total runtime + elapsed_since_activity = (datetime.now() - last_activity_time).total_seconds() + if elapsed_since_activity > timeout_seconds: + print(f"\nInactivity timeout reached ({timeout_seconds}s with no new alerts)") + break + + for message in l.consumer.consume(timeout=1): + # Reset activity timer when we receive an alert + last_activity_time = datetime.now() + alerts_examined += 1 + try: + alert_data = json.loads(message.value()) + + # Get alert ID based on listener type + if listener_type == "LIGO_ALERT": + alert_id = alert_data.get('superevent_id', '') + alert_type = alert_data.get('alert_type', 'UNKNOWN') + elif listener_type == "ICECUBE_NOTICE": + # IceCube notices have different structure + alert_id = alert_data.get('ref_ID', '') + # Try to get LVK reference alert ID + if not alert_id and 'reference' in alert_data: + ref = alert_data.get('reference', {}) + alert_id = ref.get('gcn.notices.LVK.alert', '') + alert_type = f"pval_gen={alert_data.get('pval_generic', 'N/A')}" + + # Check if alert should be excluded (test alerts) + is_test_alert = alert_id.startswith('MS') + if exclude_test and is_test_alert: + alerts_skipped += 1 + continue + + # Check if alert matches filter + if filter_string.lower() in alert_id.lower(): + print(f"\n{'='*60}") + print(f"PROCESSING: {alert_id} ({alert_type})") + print(f"{'='*60}") + + # Process the alert + alert, ext_alert = l._listen( + alert=message.value(), + write_to_s3=write_to_s3, + verbose=True, + dry_run=dry_run + ) + + 
print(f"\nResult:") + print(alert) + if ext_alert: + print("\nExternal Alert:") + print(ext_alert) + + alerts_processed += 1 + print(f"\n✓ Processed {alerts_processed}/{max_alerts}") + + if alerts_processed >= max_alerts: + break + else: + alerts_skipped += 1 + # Show progress every 10 skipped alerts + if alerts_skipped % 10 == 0: + print(f"Examined {alerts_examined} alerts, processed {alerts_processed}, skipped {alerts_skipped}...", end='\r') + + except json.JSONDecodeError: + print(f"Warning: Could not parse alert as JSON") + except Exception as e: + print(f"\n❌ ERROR processing alert: {e}") + import traceback + traceback.print_exc() + + except KeyboardInterrupt: + print("\n\nStopped by user") + + print("\n" + "="*80) + print(f"SUMMARY") + print("="*80) + print(f"Total alerts examined: {alerts_examined}") + print(f"Alerts matching filter: {alerts_processed}") + print(f"Alerts skipped: {alerts_skipped}") + print(f"Mode: {'DRY RUN (no API calls)' if dry_run else 'LIVE (API calls made)'}") + print("="*80) + + +def discover_alert_files(superevent_id): + """ + Discover all available alert JSON files for a superevent from GraceDB. + + Args: + superevent_id: The superevent ID (e.g., 'S251010al') + + Returns: + List of alert filenames (e.g., ['S251017at-preliminary.json', 'S251017at-update.json']) + """ + files_url = f"https://gracedb.ligo.org/api/superevents/{superevent_id}/files/" + + try: + response = requests.get(files_url) + if response.status_code != 200: + print(f"⚠️ Could not list files for {superevent_id} (HTTP {response.status_code})") + return [] + + # Parse the JSON response - GraceDB returns a dict where keys are filenames + files_data = response.json() + + # Filter for alert JSON files (common alert type names) + alert_types = ['earlywarning', 'preliminary', 'initial', 'update', 'retraction'] + alert_files = [] + + for filename in files_data.keys(): + # Check if it's a JSON file + if not filename.endswith('.json'): + continue + + # Extract the alert type from filename (format: S251017at-preliminary.json) + # Remove superevent ID prefix and .json suffix + basename = filename.lower() + for alert_type in alert_types: + if f"-{alert_type}.json" in basename: + alert_files.append(filename) + break + + return sorted(alert_files) + + except requests.RequestException as e: + print(f"⚠️ Network error listing files for {superevent_id}: {e}") + return [] + except (json.JSONDecodeError, KeyError, TypeError) as e: + print(f"⚠️ Could not parse file list for {superevent_id}: {e}") + return [] + + +def ingest_from_gracedb(config_path, superevent_ids, alert_types=None, + write_to_s3=False, dry_run=True, listener_type="LIGO_ALERT"): + """ + Download and ingest alerts directly from GraceDB URLs. 
+ + Args: + config_path: Path to listener config JSON + superevent_ids: List of superevent IDs (e.g., ['S251010al', 'S251011x']) + alert_types: List of alert types to download (e.g., ['preliminary', 'update']) + If None, automatically discovers all available alert files + write_to_s3: Whether to write to S3 storage + dry_run: If True, don't post to API + listener_type: Type of listener (LIGO_ALERT or ICECUBE_NOTICE) + """ + print("="*80) + print(f"GRACEDB DOWNLOAD MODE - {listener_type}") + print("="*80) + print(f"Superevent IDs: {', '.join(superevent_ids)}") + if alert_types: + print(f"Alert types (specified): {', '.join(alert_types)}") + else: + print(f"Alert types: Auto-discover all available") + print(f"Write to S3: {write_to_s3}") + print(f"Dry run: {dry_run} (API posting {'DISABLED' if dry_run else 'ENABLED'})") + print() + + # Create listener + l = listener.Listener(listener_type=listener_type, config_path=config_path) + + alerts_processed = 0 + alerts_failed = 0 + alerts_skipped = 0 + + for superevent_id in superevent_ids: + print(f"\n{'='*60}") + print(f"Processing superevent: {superevent_id}") + print(f"{'='*60}") + + # If alert_types not specified, discover what's available + if alert_types is None: + print("Discovering available alert files...") + alert_files = discover_alert_files(superevent_id) + if not alert_files: + print(f"⚠️ No alert files found for {superevent_id}") + continue + print(f"Found {len(alert_files)} alert file(s): {', '.join(alert_files)}") + else: + # Use specified alert types - construct full filenames + alert_files = [f"{superevent_id}-{atype}.json" for atype in alert_types] + + # Download and process each alert file + for filename in alert_files: + url = f"https://gracedb.ligo.org/api/superevents/{superevent_id}/files/{filename}" + + print(f"\n → Downloading: {filename}") + print(f" URL: {url}") + + try: + # Download the alert JSON + response = requests.get(url) + + if response.status_code == 404: + print(f" ⚠️ Not found - skipping") + alerts_skipped += 1 + continue + elif response.status_code != 200: + print(f" ❌ HTTP {response.status_code} - skipping") + alerts_failed += 1 + continue + + # Parse to verify it's valid JSON + alert_json = response.text + alert_data = json.loads(alert_json) + + print(f" ✓ Downloaded {len(alert_json)} bytes") + print(f" Alert type: {alert_data.get('alert_type', 'UNKNOWN')}") + print(f" Time created: {alert_data.get('time_created', 'UNKNOWN')}") + + # Process the alert through the listener + alert, ext_alert = l._listen( + alert=alert_json, + write_to_s3=write_to_s3, + verbose=True, + dry_run=dry_run + ) + + print(f"\n Result:") + print(f" {alert}") + if ext_alert: + print(f"\n External Alert:") + print(f" {ext_alert}") + + alerts_processed += 1 + print(f"\n ✓ Processed successfully") + + except requests.RequestException as e: + print(f" ❌ Network error: {e}") + alerts_failed += 1 + except json.JSONDecodeError as e: + print(f" ❌ Invalid JSON: {e}") + alerts_failed += 1 + except Exception as e: + print(f" ❌ Processing error: {e}") + import traceback + traceback.print_exc() + alerts_failed += 1 + + print("\n" + "="*80) + print(f"SUMMARY") + print("="*80) + print(f"Alerts processed: {alerts_processed}") + print(f"Alerts skipped: {alerts_skipped}") + print(f"Alerts failed: {alerts_failed}") + print(f"Mode: {'DRY RUN (no API calls)' if dry_run else 'LIVE (API calls made)'}") + print("="*80) + + +def main(): + parser = argparse.ArgumentParser( + description='Selective alert ingestion tool for GWTM', + 
formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # List available alerts (first 50) + %(prog)s --list --max 50 + + # List IceCube alerts + %(prog)s --list --type ICECUBE_NOTICE + + # Test what would be ingested (dry run) + %(prog)s --ingest --filter "S230" + + # Actually ingest alerts containing "S230", excluding test alerts + %(prog)s --ingest --filter "S230" --exclude-test --no-dry-run + + # Ingest and write to S3 + %(prog)s --ingest --filter "S230" --write --no-dry-run + + # Ingest all S25* alerts except test alerts (MS*) + %(prog)s --ingest --filter "S25" --exclude-test --write --no-dry-run + + # Download specific alerts from GraceDB (dry run) + %(prog)s --gracedb --ids S251010al S251011x + + # Download and ingest from GraceDB with specific alert types + %(prog)s --gracedb --ids S251010al --alert-types preliminary update --no-dry-run --write + + # Download all alert types for multiple events + %(prog)s --gracedb --ids S251010al S251011x --alert-types preliminary initial update retraction --write --no-dry-run + """ + ) + + # Mode selection + mode_group = parser.add_mutually_exclusive_group(required=True) + mode_group.add_argument('--list', action='store_true', + help='List mode: show available alerts without ingesting') + mode_group.add_argument('--ingest', action='store_true', + help='Ingest mode: process alerts matching filter') + mode_group.add_argument('--gracedb', action='store_true', + help='GraceDB mode: download alerts directly from GraceDB URLs') + + # Common options + parser.add_argument('--config', type=str, default='./listener_config.json', + help='Path to listener config file (default: ./listener_config.json)') + parser.add_argument('--type', type=str, default='LIGO_ALERT', + choices=['LIGO_ALERT', 'ICECUBE_NOTICE'], + help='Listener type (default: LIGO_ALERT)') + parser.add_argument('--max', type=int, default=50, + help='Maximum number of alerts to process (default: 50)') + parser.add_argument('--timeout', type=int, default=60, + help='Timeout in seconds (default: 60)') + parser.add_argument('--debug', action='store_true', + help='Show raw JSON for first few alerts (debug mode)') + parser.add_argument('--exclude-test', action='store_true', + help='Exclude test alerts (MS* alerts)') + + # Ingest mode options + parser.add_argument('--filter', type=str, + help='[INGEST] Only process alerts containing this string') + parser.add_argument('--write', action='store_true', + help='[INGEST/GRACEDB] Write to S3 storage') + parser.add_argument('--no-dry-run', action='store_true', + help='[INGEST/GRACEDB] Actually post to API (default is dry run)') + + # GraceDB mode options + parser.add_argument('--ids', type=str, nargs='+', + help='[GRACEDB] Superevent IDs to download (e.g., S251010al S251011x)') + parser.add_argument('--alert-types', type=str, nargs='+', + help='[GRACEDB] Alert types to download (e.g., preliminary update). 
Default: preliminary initial update') + + args = parser.parse_args() + + # Validate config file exists + if not os.path.exists(args.config): + print(f"❌ ERROR: Config file not found: {args.config}") + print(f"\nPlease ensure your listener config exists at:") + print(f" {os.path.abspath(args.config)}") + return 1 + + try: + if args.list: + # List mode + list_alerts( + config_path=args.config, + listener_type=args.type, + max_alerts=args.max, + timeout_seconds=args.timeout, + debug=args.debug + ) + elif args.ingest: + # Ingest mode + if not args.filter: + print("❌ ERROR: --filter is required for ingest mode") + return 1 + + ingest_filtered( + config_path=args.config, + listener_type=args.type, + filter_string=args.filter, + write_to_s3=args.write, + dry_run=not args.no_dry_run, + max_alerts=args.max, + timeout_seconds=args.timeout, + exclude_test=args.exclude_test + ) + elif args.gracedb: + # GraceDB mode + if not args.ids: + print("❌ ERROR: --ids is required for gracedb mode") + print("Example: --gracedb --ids S251010al S251011x") + return 1 + + ingest_from_gracedb( + config_path=args.config, + superevent_ids=args.ids, + alert_types=args.alert_types, + write_to_s3=args.write, + dry_run=not args.no_dry_run, + listener_type=args.type + ) + + return 0 + + except Exception as e: + print(f"\n❌ ERROR: {e}") + import traceback + traceback.print_exc() + return 1 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/gwtm_cron/gwtm_listener/gw_config.py b/src/gwtm_cron/gwtm_listener/gw_config.py index 384a517..c3fce23 100644 --- a/src/gwtm_cron/gwtm_listener/gw_config.py +++ b/src/gwtm_cron/gwtm_listener/gw_config.py @@ -4,7 +4,6 @@ class Config(object): def __init__(self, path_to_config=None): - print(path_to_config) if path_to_config and os.path.exists(path_to_config): fi = open(path_to_config) @@ -15,10 +14,20 @@ def __init__(self, path_to_config=None): self.AWS_DEFAULT_REGION = data["AWS_DEFAULT_REGION"] if "AWS_DEFAULT_REGION" in data.keys() else "us-east-2" self.AWS_BUCKET = data["AWS_BUCKET"] if "AWS_BUCKET" in data.keys() else "gwtreasuremap" self.AZURE_ACCOUNT_NAME = data["AZURE_ACCOUNT_NAME"] if "AZURE_ACCOUNT_NAME" in data.keys() else "" - self.AZURE_ACCOUNT_KEY = data["AZURE_ACCOUNT_KEY"] if "AZURE_ACCOUNT_KEY" in data.keys() else "" + self.AZURE_ACCOUNT_KEY = data["AZURE_ACCOUNT_KEY"] if "AZURE_ACCOUNT_KEY" in data.keys() else "" + self.OS_AUTH_URL = data["OS_AUTH_URL"] if "OS_AUTH_URL" in data.keys() else "" + self.OS_USERNAME = data["OS_USERNAME"] if "OS_USERNAME" in data.keys() else "" + self.OS_PASSWORD = data["OS_PASSWORD"] if "OS_PASSWORD" in data.keys() else "" + self.OS_PROJECT_NAME = data["OS_PROJECT_NAME"] if "OS_PROJECT_NAME" in data.keys() else "" + self.OS_USER_DOMAIN_NAME = data["OS_USER_DOMAIN_NAME"] if "OS_USER_DOMAIN_NAME" in data.keys() else "Default" + self.OS_PROJECT_DOMAIN_NAME = data["OS_PROJECT_DOMAIN_NAME"] if "OS_PROJECT_DOMAIN_NAME" in data.keys() else "Default" + self.OS_CONTAINER_NAME = data["OS_CONTAINER_NAME"] if "OS_CONTAINER_NAME" in data.keys() else "gwtreasuremap" + self.OS_STORAGE_URL = data["OS_STORAGE_URL"] if "OS_STORAGE_URL" in data.keys() else "" self.STORAGE_BUCKET_SOURCE = data["STORAGE_BUCKET_SOURCE"] if "STORAGE_BUCKET_SOURCE" in data.keys() else "abfs" self.API_TOKEN = data["API_TOKEN"] if "API_TOKEN" in data.keys() else "" - self.API_BASE = data["API_BASE"] if "API_BASE" in data.keys() else "http://127.0.0.1:5000/api/v0/" + # Default to Kubernetes service if KUBERNETES_SERVICE_HOST is set, otherwise localhost + default_api_base 
= "http://gwtm-api:5000/api/v0/" if os.environ.get('KUBERNETES_SERVICE_HOST') else "http://127.0.0.1:5000/api/v0/" + self.API_BASE = data["API_BASE"] if "API_BASE" in data.keys() else default_api_base self.ALERT_DOMAIN = data["ALERT_DOMAIN"] if "ALERT_DOMAIN" in data.keys() else "igwn.gwalert" self.OBSERVING_RUN = data["OBSERVING_RUN"] if "OBSERVING_RUN" in data.keys() else "O4" self.KAFKA_CLIENT_ID = data["KAFKA_CLIENT_ID"] if "KAFKA_CLIENT_ID" in data.keys() else "" @@ -31,9 +40,19 @@ def __init__(self, path_to_config=None): self.AWS_BUCKET = os.environ.get('AWS_BUCKET', 'gwtreasuremap') self.AZURE_ACCOUNT_NAME = os.environ.get('AZURE_ACCOUNT_NAME', '') self.AZURE_ACCOUNT_KEY = os.environ.get('AZURE_ACCOUNT_KEY', '') + self.OS_AUTH_URL = os.environ.get('OS_AUTH_URL', '') + self.OS_USERNAME = os.environ.get('OS_USERNAME', '') + self.OS_PASSWORD = os.environ.get('OS_PASSWORD', '') + self.OS_PROJECT_NAME = os.environ.get('OS_PROJECT_NAME', '') + self.OS_USER_DOMAIN_NAME = os.environ.get('OS_USER_DOMAIN_NAME', 'Default') + self.OS_PROJECT_DOMAIN_NAME = os.environ.get('OS_PROJECT_DOMAIN_NAME', 'Default') + self.OS_CONTAINER_NAME = os.environ.get('OS_CONTAINER_NAME', 'gwtreasuremap') + self.OS_STORAGE_URL = os.environ.get('OS_STORAGE_URL', '') self.STORAGE_BUCKET_SOURCE = os.environ.get('STORAGE_BUCKET_SOURCE', 'abfs') self.API_TOKEN = os.environ.get('API_TOKEN', '') - self.API_BASE = os.environ.get('API_BASE', 'http://127.0.0.1:5000/api/v0/') + # Default to Kubernetes service if KUBERNETES_SERVICE_HOST is set, otherwise localhost + default_api_base = "http://gwtm-api:5000/api/v0/" if os.environ.get('KUBERNETES_SERVICE_HOST') else "http://127.0.0.1:5000/api/v0/" + self.API_BASE = os.environ.get('API_BASE', default_api_base) self.ALERT_DOMAIN = os.environ.get('ALERT_DOMAIN', 'igwn.gwalert') self.OBSERVING_RUN = os.environ.get('OBSERVING_RUN', 'O4') self.KAFKA_CLIENT_ID = os.environ.get('KAFKA_CLIENT_ID', '') diff --git a/src/gwtm_cron/gwtm_listener/gw_function.py b/src/gwtm_cron/gwtm_listener/gw_function.py index 46c8841..15fa1a1 100644 --- a/src/gwtm_cron/gwtm_listener/gw_function.py +++ b/src/gwtm_cron/gwtm_listener/gw_function.py @@ -28,12 +28,14 @@ def query_gwtm_alerts(graceid, alert_type, config: config.Config): base = config.API_BASE target = "query_alerts" + headers = { + "api_token": config.API_TOKEN + } params = { "graceid" : graceid, - "alert_type" : alert_type, - "api_token" : config.API_TOKEN + "alert_type" : alert_type } - r = requests.get(f"{base}{target}", json=params) + r = requests.get(f"{base}{target}", headers=headers, params=params) if r.status_code == 200: return json.loads(r.text) else: @@ -43,12 +45,13 @@ def query_gwtm_alerts(graceid, alert_type, config: config.Config): def post_gwtm_alert(gwa, config: config.Config): base = config.API_BASE target = "post_alert" + headers = { + "Content-Type": "application/json", + "api_token": config.API_TOKEN + } params = gwa - params.update({ - 'api_token' : config.API_TOKEN - }) - - r = requests.post(f"{base}{target}", json=params) + + r = requests.post(f"{base}{target}", headers=headers, json=params) if r.status_code == 200: return json.loads(r.text) else: @@ -58,12 +61,13 @@ def post_gwtm_alert(gwa, config: config.Config): def post_galaxy_list(galaxies,config: config.Config): base = config.API_BASE target = "event_galaxies" + headers = { + "Content-Type": "application/json", + "api_token": config.API_TOKEN + } params = galaxies - params.update({ - 'api_token' : config.API_TOKEN - }) - - r = requests.post(f"{base}{target}", 
json=params) + + r = requests.post(f"{base}{target}", headers=headers, json=params) print("INFO: Successfully posted galaxy list") if r.status_code == 200: return @@ -74,15 +78,15 @@ def post_galaxy_list(galaxies,config: config.Config): def delete_galaxy_list(galaxies,config: config.Config): base = config.API_BASE target = "event_galaxies" + headers = { + "api_token": config.API_TOKEN + } params = { 'groupname' : galaxies['groupname'], 'graceid' : galaxies['graceid'], } - params.update({ - 'api_token' : config.API_TOKEN - }) - r_get = requests.get(f"{base}{target}", json=params) + r_get = requests.get(f"{base}{target}", headers=headers, params=params) if r_get.status_code == 200: gal_list = json.loads(r_get.text) @@ -91,11 +95,10 @@ def delete_galaxy_list(galaxies,config: config.Config): else: target_remove = 'remove_event_galaxies' del_params = { - 'listid':gal_list[0]['listid'], - 'api_token':config.API_TOKEN + 'listid':gal_list[0]['listid'] } - r_post = requests.post(f"{base}{target_remove}", json=del_params) + r_post = requests.post(f"{base}{target_remove}", headers=headers, json=del_params) print("INFO: Successfully deleted galaxy list") if r_post.status_code == 200: @@ -110,13 +113,21 @@ def delete_galaxy_list(galaxies,config: config.Config): def post_icecube_notice(notice, events, config: config.Config): base = config.API_BASE target = "post_icecube_notice" + headers = { + "Content-Type": "application/json", + "api_token": config.API_TOKEN + } params = { - "api_token" : config.API_TOKEN, - "icecube_notice" : notice, - "icecube_notice_coinc_events" : events + "notice_data": notice, + "events_data": events } - r = requests.post(f"{base}{target}", json=params) + print(f"DEBUG: Posting to {base}{target}") + print(f"DEBUG: notice_data keys: {list(notice.keys())}") + print(f"DEBUG: events_data length: {len(events)}") + print(f"DEBUG: Full params: {json.dumps(params, indent=2)}") + + r = requests.post(f"{base}{target}", headers=headers, json=params) if r.status_code == 200: return json.loads(r.text) else: @@ -126,10 +137,12 @@ def post_icecube_notice(notice, events, config: config.Config): def del_test_alerts(config: config.Config): base = config.API_BASE target = "del_test_alerts" - params = { - 'api_token':config.API_TOKEN + headers = { + "Content-Type": "application/json", + "api_token": config.API_TOKEN } - r = requests.post(f"{base}{target}", json=params) + params = {} + r = requests.post(f"{base}{target}", headers=headers, json=params) if r.status_code == 200: return r else: diff --git a/src/gwtm_cron/gwtm_listener/gw_io.py b/src/gwtm_cron/gwtm_listener/gw_io.py index b98b5ca..7bcec67 100644 --- a/src/gwtm_cron/gwtm_listener/gw_io.py +++ b/src/gwtm_cron/gwtm_listener/gw_io.py @@ -24,15 +24,15 @@ import gwstorage # type: ignore class Writer(): - + def __init__( self, alert, s3path: str, - write_to_s3 = True + write_to_storage = True ): - self.write_to_s3 = write_to_s3 + self.write_to_storage = write_to_storage self.s3path = s3path if isinstance(alert, bytes): @@ -43,7 +43,7 @@ def __init__( self.path_info = None self.gwalert_dict: dict = {} - if not write_to_s3: + if not write_to_storage: paths = [ 'saved_alerts', 'skymaps', @@ -91,9 +91,9 @@ def _write_skymap(self, config: config.Config, verbose=False): print(f"Bad skymap URL! 
{self.gwalert_dict['skymap_fits_url']} Gracedb might be bogged") return - if self.write_to_s3: + if self.write_to_storage: if verbose: - print('Writing skymap.fits.gz to s3') + print('Writing skymap.fits.gz to storage') downloadpath = '{}/{}.fits.gz'.format(self.s3path, self.path_info) gwstorage.upload_gwtm_file(r.content, downloadpath, source=config.STORAGE_BUCKET_SOURCE, config=config) else: @@ -106,9 +106,9 @@ def _write_skymap(self, config: config.Config, verbose=False): def _write_skymap_moc(self, config: config.Config, verbose=False): - if self.write_to_s3: + if self.write_to_storage: if verbose: - print('Writing skymap_moc.fits.gz to s3') + print('Writing skymap_moc.fits.gz to storage') downloadpath = '{}/{}_moc.fits.gz'.format(self.s3path, self.path_info) gwstorage.upload_gwtm_file(self.skymap, downloadpath, source=config.STORAGE_BUCKET_SOURCE, config=config) else: @@ -153,9 +153,9 @@ def _write_contours(self, config: config.Config, verbose=False): ] }) - if self.write_to_s3: + if self.write_to_storage: if verbose: - print('Writing contours to s3') + print('Writing contours to storage') contour_download_path = '{}/{}-contours-smooth.json'.format(self.s3path, self.path_info) gwstorage.upload_gwtm_file(contours_json.encode(), contour_download_path, config.STORAGE_BUCKET_SOURCE, config) @@ -186,9 +186,9 @@ def _write_fermi(self, config: config.Config, verbose=False): print("Error in Fermi MOC creation") return - if self.write_to_s3: + if self.write_to_storage: if verbose: - print('Writing Fermi contour to s3') + print('Writing Fermi contour to storage') fermi_moc_upload_path = '{}/{}-Fermi.json'.format(self.s3path, self.gwalert_dict["graceid"]) dir_contents = gwstorage.list_gwtm_bucket(self.s3path, config.STORAGE_BUCKET_SOURCE, config) if fermi_moc_upload_path not in dir_contents: @@ -225,15 +225,15 @@ def _write_LAT(self, config: config.Config, verbose=False): print("Error in LAT creation") return - if self.write_to_s3: + if self.write_to_storage: if verbose: - print('Writing LAT contour to s3') + print('Writing LAT contour to storage') lat_moc_upload_path = '{}/{}-LAT.json'.format(self.s3path, self.gwalert_dict["graceid"]) dir_contents = gwstorage.list_gwtm_bucket(self.s3path, config.STORAGE_BUCKET_SOURCE, config) if lat_moc_upload_path not in dir_contents: gwstorage.upload_gwtm_file(moc_string.encode(), lat_moc_upload_path, config.STORAGE_BUCKET_SOURCE, config) - + else: local_write_file = os.path.join(os.getcwd(), "contours", f"{self.gwalert_dict['graceid']}-LAT.json") if os.path.exists(local_write_file): @@ -250,13 +250,13 @@ def _write_LAT(self, config: config.Config, verbose=False): def write_alert_json(self, config: config.Config, verbose=False): ''' - function that writes the alert json to s3 or local directory + function that writes the alert json to storage or local directory ''' - if self.write_to_s3: + if self.write_to_storage: if verbose: - print("Writing alert json to s3") + print("Writing alert json to storage") alert_upload_path = os.path.join(self.s3path, f"{self.path_info}_alert.json") dir_contents = gwstorage.list_gwtm_bucket(self.s3path, config.STORAGE_BUCKET_SOURCE, config) @@ -272,16 +272,16 @@ def write_alert_json(self, config: config.Config, verbose=False): class Reader(): - def __init__(self, read_from_s3=True): - self.read_from_s3 = read_from_s3 + def __init__(self, read_from_storage=True): + self.read_from_storage = read_from_storage def read_alert_json(self, alert_path_name, config: config.Config, verbose=False): ''' - function that reads the alert json or 
local directory + function that reads the alert json from storage or local directory ''' - if self.read_from_s3: + if self.read_from_storage: if verbose: - print('Reading alert json from s3') + print('Reading alert json from storage') try: f = gwstorage.download_gwtm_file(alert_path_name, config.STORAGE_BUCKET_SOURCE, config) data = json.loads(f) diff --git a/src/gwtm_cron/gwtm_listener/gwstorage.py b/src/gwtm_cron/gwtm_listener/gwstorage.py index bded8ab..91da352 100644 --- a/src/gwtm_cron/gwtm_listener/gwstorage.py +++ b/src/gwtm_cron/gwtm_listener/gwstorage.py @@ -1,18 +1,82 @@ import fsspec # type: ignore +from swiftclient import Connection as SwiftConnection # type: ignore +from swiftclient.exceptions import ClientException as SwiftClientException # type: ignore +from keystoneauth1.identity import v3 # type: ignore +from keystoneauth1 import session # type: ignore def _get_fs(source, config): + """Get filesystem for S3 or Azure. Swift uses _get_swift_conn instead.""" try: if source == 's3': return fsspec.filesystem("s3", key=config.AWS_ACCESS_KEY_ID, secret=config.AWS_SECRET_ACCESS_KEY) if source == 'abfs': return fsspec.filesystem("abfs", account_name=config.AZURE_ACCOUNT_NAME, account_key=config.AZURE_ACCOUNT_KEY) - except Exception: - raise Exception(f"Error in creating {source} filesystem") + if source == 'swift': + # Swift uses python-swiftclient directly, not fsspec + raise NotImplementedError("Swift should use _get_swift_conn, not _get_fs") + except Exception as e: + raise Exception(f"Error in creating {source} filesystem: {str(e)}") + + +def _get_swift_conn(config): + """Get Swift connection using python-swiftclient. + + Supports both application credentials and username/password authentication. + If OS_USERNAME looks like a UUID (application credential ID), uses + application credential authentication via keystoneauth1. Otherwise uses + username/password authentication. 
+ """ + try: + # Check if we're using application credentials (UUID format) + # Application credential IDs are 32 character hex strings + import re + is_app_cred = bool(re.match(r'^[a-f0-9]{32}$', config.OS_USERNAME or '')) + + if is_app_cred: + # Use application credential authentication with keystoneauth1 + auth = v3.ApplicationCredential( + auth_url=config.OS_AUTH_URL, + application_credential_id=config.OS_USERNAME, + application_credential_secret=config.OS_PASSWORD + ) + sess = session.Session(auth=auth) + + return SwiftConnection( + session=sess, + os_options={ + 'object_storage_url': config.OS_STORAGE_URL + } + ) + else: + # Use username/password authentication + return SwiftConnection( + authurl=config.OS_AUTH_URL, + user=config.OS_USERNAME, + key=config.OS_PASSWORD, + os_options={ + 'user_domain_name': config.OS_USER_DOMAIN_NAME, + 'project_domain_name': config.OS_PROJECT_DOMAIN_NAME, + 'project_name': config.OS_PROJECT_NAME, + }, + auth_version='3' + ) + except Exception as e: + raise Exception(f"Error creating Swift connection: {str(e)}") def download_gwtm_file(filename, source='s3', config=None, decode=True): try: + if source == 'swift': + # Use python-swiftclient directly for Swift + conn = _get_swift_conn(config) + headers, content = conn.get_object(config.OS_CONTAINER_NAME, filename) + if decode: + return content.decode('utf-8') + else: + return content + + # Use fsspec for S3 and Azure if source == 's3': s3file = fsspec.open(f"s3://{config.AWS_BUCKET}/{filename}") @@ -31,12 +95,19 @@ def download_gwtm_file(filename, source='s3', config=None, decode=True): def upload_gwtm_file(content, filename, source="s3", config=None): + if source == 'swift': + # Use python-swiftclient directly for Swift + conn = _get_swift_conn(config) + conn.put_object(config.OS_CONTAINER_NAME, filename, content) + return True + + # Use fsspec for S3 and Azure fs = _get_fs(source=source, config=config) if source=="s3" and f"{config.AWS_BUCKET}/" not in filename: filename = f"{config.AWS_BUCKET}/{filename}" - open_file = fs.open(filename, "wb") + open_file = fs.open(filename, "wb") with open_file as of: of.write(content) of.close() @@ -44,7 +115,21 @@ def upload_gwtm_file(content, filename, source="s3", config=None): def list_gwtm_bucket(container, source="s3", config=None): + if source == 'swift': + # Use python-swiftclient directly for Swift + conn = _get_swift_conn(config) + headers, objects = conn.get_container(config.OS_CONTAINER_NAME, prefix=f"{container}/") + ret = [] + for obj in objects: + name = obj['name'] + # Skip the directory marker itself + if name != f"{container}/": + ret.append(name) + return sorted(ret) + + # Use fsspec for S3 and Azure fs = _get_fs(source=source, config=config) + if source == 's3': bucket_content = fs.ls(f"{config.AWS_BUCKET}/{container}") ret = [] @@ -53,12 +138,22 @@ def list_gwtm_bucket(container, source="s3", config=None): if split_b != f"{container}/": ret.append(split_b) return sorted(ret) - + ret = fs.ls(container) return sorted(ret) def delete_gwtm_files(keys, source="s3", config=None): + if source == 'swift': + # Use python-swiftclient directly for Swift + conn = _get_swift_conn(config) + if isinstance(keys, str): + keys = [keys] + for key in keys: + conn.delete_object(config.OS_CONTAINER_NAME, key) + return True + + # Use fsspec for S3 and Azure if source=="s3": if isinstance(keys, list): for i,k in enumerate(keys): @@ -66,7 +161,7 @@ def delete_gwtm_files(keys, source="s3", config=None): keys[i] = f"{config.AWS_BUCKET}/{k}" if isinstance(keys, str) and 
f"{config.AWS_BUCKET}/" not in keys: keys = f"{config.AWS_BUCKET}/{keys}" - + fs = _get_fs(source=source, config=config) fs.rm(keys) return True @@ -93,7 +188,7 @@ class test_config(object): bucket_sources = ["s3", "abfs"] for source in bucket_sources: - content = "line1\line2" + content = "line1\\nline2" filename = "test/vv.text" test1 = upload_gwtm_file(content, filename, source=source, config=config) assert test1, "error upload" @@ -106,4 +201,4 @@ class test_config(object): s3_content = list_gwtm_bucket("fit", "s3", config)[0:10] abfs_content = list_gwtm_bucket("fit", "abfs", config)[0:10] - assert s3_content==abfs_content \ No newline at end of file + assert s3_content==abfs_content diff --git a/src/gwtm_cron/gwtm_listener/icecube_notice.py b/src/gwtm_cron/gwtm_listener/icecube_notice.py index da2d21e..34bbff8 100644 --- a/src/gwtm_cron/gwtm_listener/icecube_notice.py +++ b/src/gwtm_cron/gwtm_listener/icecube_notice.py @@ -11,14 +11,14 @@ import gw_function as function # type: ignore -def listen(config : config.Config, alert, write_to_s3=True, verbose=False, dry_run=False, alertname=None): +def listen(config : config.Config, alert, write_to_storage=True, verbose=False, dry_run=False, alertname=None): record = json.loads(alert) rkeys = record.keys() print(record) icecube_notice = { - "ref_ID" : record["reference"]["gcn.notices.LVK.alert"] if "reference" in rkeys else "error", + "ref_id" : record["reference"]["gcn.notices.LVK.alert"] if "reference" in rkeys else "error", "alert_datetime" : record["alert_datetime"] if "alert_datetime" in rkeys else '1991-12-23T19:15:00', "observation_start" : record["observation_start"] if "observation_start" in rkeys else '1991-12-23T19:15:00', "observation_stop" : record["observation_stop"] if "observation_stop" in rkeys else '1991-12-23T19:15:00', @@ -26,7 +26,7 @@ def listen(config : config.Config, alert, write_to_s3=True, verbose=False, dry_r "pval_bayesian" : record["pval_bayesian"] if "pval_bayesian" in rkeys else '0.0', } - # ref_id = icecube_notice["ref_ID"] + # ref_id = icecube_notice["ref_id"] # if len(ref_id.split("-")): # graceid = ref_id.split("-")[0] # icecube_notice["graceid"] = graceid @@ -105,4 +105,4 @@ def listen(config : config.Config, alert, write_to_s3=True, verbose=False, dry_r if __name__ == '__main__': l = listener.Listener(listener_type="ICECUBE_NOTICE") # noqa: E741 - l.run(write_to_s3=True, verbose=True, dry_run=False) + l.run(write_to_storage=True, verbose=True, dry_run=False) diff --git a/src/gwtm_cron/gwtm_listener/ligo_alert.py b/src/gwtm_cron/gwtm_listener/ligo_alert.py index e30bc53..329b519 100644 --- a/src/gwtm_cron/gwtm_listener/ligo_alert.py +++ b/src/gwtm_cron/gwtm_listener/ligo_alert.py @@ -25,8 +25,8 @@ # from find_galaxies import EventLocalization,generate_galaxy_list -def listen(config : config.Config, alert, write_to_s3=True, verbose=False, dry_run=False, alertname=None): - +def listen(config : config.Config, alert, write_to_storage=True, verbose=False, dry_run=False, alertname=None): + record = json.loads(alert) run_test = True @@ -39,11 +39,12 @@ def listen(config : config.Config, alert, write_to_s3=True, verbose=False, dry_r writer = io.Writer( alert=alert, s3path=s3path, - write_to_s3=write_to_s3 + write_to_storage=write_to_storage ) gwa = {} ext_gwa = None + post_galaxies_json = None alert_keys = record.keys() gwa.update({ @@ -150,7 +151,6 @@ def listen(config : config.Config, alert, write_to_s3=True, verbose=False, dry_r writer.set_skymap(skymap_bytes) writer.process(config=config, verbose=verbose) - 
post_galaxies_json = None try: # create EventLocatlization object to be passed into the galaxies list gwa_obj = fg.EventLocalization(gwa) @@ -223,15 +223,15 @@ def listen(config : config.Config, alert, write_to_s3=True, verbose=False, dry_r #call delete before function.delete_galaxy_list(post_galaxies_json, config=config) function.post_galaxy_list(post_galaxies_json, config=config) - + if ext_gwa is not None: ext_gwa = function.post_gwtm_alert(ext_gwa, config=config) - - if run_test: - function.del_test_alerts(config=config) + + if run_test: + function.del_test_alerts(config=config) return gwa, ext_gwa if __name__ == '__main__': l = listener.Listener(listener_type="LIGO_ALERT") - l.run(write_to_s3=True, verbose=True, dry_run=False) + l.run(write_to_storage=True, verbose=True, dry_run=False) diff --git a/src/gwtm_cron/gwtm_listener/listener.py b/src/gwtm_cron/gwtm_listener/listener.py index d3da831..caf585c 100644 --- a/src/gwtm_cron/gwtm_listener/listener.py +++ b/src/gwtm_cron/gwtm_listener/listener.py @@ -1,4 +1,6 @@ - +import signal +import sys +import os from gcn_kafka import Consumer # type: ignore @@ -6,11 +8,13 @@ from . import gw_config as config from . import ligo_alert from . import icecube_notice + from . import logger as log_module except ImportError: # If running as a script, import from the parent directory import gw_config as config # type: ignore import ligo_alert # type: ignore import icecube_notice # type: ignore + import logger as log_module # type: ignore LISTENER_TYPES = { "LIGO_ALERT" : { @@ -26,52 +30,136 @@ class Listener(): - def __init__(self, listener_type, config_path: str = "home/azureuser/cron/listener_config.json"): + def __init__(self, listener_type, config_path: str = None): assert listener_type in LISTENER_TYPES.keys(), "Invalid Listener Type" self.listener_type = listener_type + self.running = True + + # Initialize logger + # Use JSON logging if LOG_FORMAT=json, otherwise use print statements + self.use_json_logging = os.environ.get('LOG_FORMAT', '').lower() == 'json' + if self.use_json_logging: + self.logger = log_module.get_logger(__name__) + + # Backwards compatibility: try production path first, then local + if config_path is None: + production_path = "/home/azureuser/cron/listener_config.json" + local_path = "./listener_config.json" + + if os.path.exists(production_path): + config_path = production_path + elif os.path.exists(local_path): + config_path = local_path + # else: config_path stays None, will use environment variables self.config = config.Config(path_to_config=config_path) + # Build consumer config - let gcn_kafka handle security, just specify IPv4 and offset + consumer_config = {'broker.address.family': 'v4'} + + # KAFKA_OFFSET_RESET controls where to start reading from: + # - 'latest' (default): Only new messages arriving after consumer starts + # - 'earliest': Read from the oldest available message in the buffer (past few days) + offset_reset = os.environ.get('KAFKA_OFFSET_RESET', 'latest') + consumer_config['auto.offset.reset'] = offset_reset + + # Optional: Consumer group ID for persistent offset tracking + kafka_group_id = os.environ.get('KAFKA_GROUP_ID', '') + if kafka_group_id: + consumer_config['group.id'] = kafka_group_id + if self.use_json_logging: + self.logger.info(f"Kafka group ID: {kafka_group_id}") + else: + print(f"Kafka group ID: {kafka_group_id}") + + if self.use_json_logging: + self.logger.info(f"Kafka consumer configured (IPv4-only, offset: {offset_reset})") + else: + print(f"Kafka consumer configured (IPv4-only, 
offset: {offset_reset})") + self.consumer = Consumer( + config=consumer_config, client_id=self.config.KAFKA_CLIENT_ID, client_secret=self.config.KAFKA_CLIENT_SECRET ) - + self.consumer.subscribe([ LISTENER_TYPES[self.listener_type]["domain"] ]) + # Set up signal handlers for graceful shutdown + signal.signal(signal.SIGTERM, self._shutdown_handler) + signal.signal(signal.SIGINT, self._shutdown_handler) + + def _log(self, message: str, level: str = 'info', **kwargs): + """Log message using JSON or print depending on configuration""" + if self.use_json_logging: + log_func = getattr(self.logger, level) + log_func(message, extra={'listener_type': self.listener_type, **kwargs}) + else: + print(message) - def _listen(self, alert, write_to_s3, verbose, dry_run, alertname=None): + def _shutdown_handler(self, signum, frame): + """Handle shutdown signals gracefully""" + self._log(f"Received signal {signum}, initiating graceful shutdown...", level='warning') + self.running = False + + + def _listen(self, alert, write_to_storage, verbose, dry_run, alertname=None): listener_function = LISTENER_TYPES[self.listener_type]["func"] - return listener_function(self.config, alert, write_to_s3, verbose, dry_run, alertname) - + return listener_function(self.config, alert, write_to_storage, verbose, dry_run, alertname) - def run(self, write_to_s3=True, verbose=False, dry_run=False): + + def run(self, write_to_storage=True, verbose=False, dry_run=False): if verbose: - print(f'Listening for alerts from {LISTENER_TYPES[self.listener_type]["domain"]}') + self._log(f'Listening for alerts from {LISTENER_TYPES[self.listener_type]["domain"]}', + domain=LISTENER_TYPES[self.listener_type]["domain"]) + + consume_count = 0 + heartbeat_interval = 300 # Log heartbeat every 5 minutes - while True: + while self.running: for message in self.consumer.consume(timeout=1): + if not self.running: + break + alert, ext_alert = self._listen( - alert=message.value(), - write_to_s3=write_to_s3, + alert=message.value(), + write_to_storage=write_to_storage, verbose=verbose, dry_run=dry_run ) if verbose: - print(alert) - if ext_alert: - print() - print(ext_alert) + if self.use_json_logging: + self._log("Alert processed", alert_summary=str(alert)[:200]) + if ext_alert: + self._log("Extended alert processed", ext_alert_summary=str(ext_alert)[:200]) + else: + print(alert) + if ext_alert: + print() + print(ext_alert) + + # Heartbeat logging to show the listener is active + consume_count += 1 + if verbose and consume_count % heartbeat_interval == 0: + self._log(f"Listener active, no messages received in last {heartbeat_interval} seconds", + consume_cycles=consume_count) + + # Cleanup on shutdown + if verbose: + self._log("Closing Kafka consumer...") + self.consumer.close() + if verbose: + self._log("Listener shutdown complete") - def local_run(self, alert_json_path: str, write_to_s3=False, verbose=True, dry_run=True, alertname=None): + def local_run(self, alert_json_path: str, write_to_storage=False, verbose=True, dry_run=True, alertname=None): with open(alert_json_path, 'r') as f: record = f.read() - alert, ext_alert = self._listen(alert=record, write_to_s3=write_to_s3, verbose=verbose, dry_run=dry_run, alertname=alertname) + alert, ext_alert = self._listen(alert=record, write_to_storage=write_to_storage, verbose=verbose, dry_run=dry_run, alertname=alertname) if verbose: print(alert) if ext_alert: @@ -82,4 +170,4 @@ def local_run(self, alert_json_path: str, write_to_s3=False, verbose=True, dry_r if __name__ == '__main__': atype = "LIGO_ALERT" l = 
Listener(listener_type=atype) # noqa: E741 - l.run(write_to_s3=False, verbose=True, dry_run=True) + l.run(write_to_storage=False, verbose=True, dry_run=True) diff --git a/src/gwtm_cron/gwtm_listener/logger.py b/src/gwtm_cron/gwtm_listener/logger.py new file mode 100644 index 0000000..21a61c5 --- /dev/null +++ b/src/gwtm_cron/gwtm_listener/logger.py @@ -0,0 +1,100 @@ +""" +Structured JSON logging for Kubernetes deployment. + +This module provides JSON-formatted logging that integrates well with +Kubernetes log aggregation systems (ELK, Loki, etc.). + +Usage: + from gwtm_listener import logger + + log = logger.get_logger(__name__) + log.info("Processing alert", extra={"alert_id": "S230518h", "type": "LIGO"}) +""" + +import logging +import json +import sys +import os +from datetime import datetime + + +class JsonFormatter(logging.Formatter): + """Format log records as JSON for structured logging""" + + def format(self, record): + log_data = { + 'timestamp': datetime.utcnow().isoformat() + 'Z', + 'level': record.levelname, + 'message': record.getMessage(), + 'logger': record.name, + 'module': record.module, + 'function': record.funcName, + 'line': record.lineno, + } + + # Add extra fields if provided + if hasattr(record, 'alert_id'): + log_data['alert_id'] = record.alert_id + if hasattr(record, 'listener_type'): + log_data['listener_type'] = record.listener_type + if hasattr(record, 'event_type'): + log_data['event_type'] = record.event_type + + # Add exception info if present + if record.exc_info: + log_data['exception'] = self.formatException(record.exc_info) + + return json.dumps(log_data) + + +def get_logger(name: str, level: str = None) -> logging.Logger: + """ + Get or create a structured logger. + + Args: + name: Logger name (typically __name__) + level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + If not provided, reads from LOG_LEVEL environment variable, + defaults to INFO + + Returns: + Configured logger instance + """ + logger = logging.getLogger(name) + + # Only configure if not already configured + if not logger.handlers: + # Determine log level + if level is None: + level = os.environ.get('LOG_LEVEL', 'INFO').upper() + + logger.setLevel(getattr(logging, level)) + + # Create handler + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(JsonFormatter()) + logger.addHandler(handler) + + # Prevent propagation to avoid duplicate logs + logger.propagate = False + + return logger + + +# Convenience function for backward compatibility with print statements +def log_info(message: str, **kwargs): + """Quick info log (for migration from print statements)""" + logger = get_logger('gwtm_cron') + logger.info(message, extra=kwargs) + + +def log_error(message: str, **kwargs): + """Quick error log""" + logger = get_logger('gwtm_cron') + logger.error(message, extra=kwargs) + + +def log_warning(message: str, **kwargs): + """Quick warning log""" + logger = get_logger('gwtm_cron') + logger.warning(message, extra=kwargs)
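
**API authentication pattern.** Across `gw_function.py` the API token now travels in an `api_token` header (with `Content-Type: application/json` on POSTs) instead of inside the JSON body, and GETs pass their filters as query parameters. A minimal sketch of that calling convention, assuming the default local API base from `gw_config.Config`; the token value and the POST payload fields are placeholders for illustration (`query_alerts` and `post_alert` are the endpoint names used in this diff):

```python
import requests

API_BASE = "http://127.0.0.1:5000/api/v0/"  # default from gw_config.Config outside Kubernetes
API_TOKEN = "..."                           # placeholder; normally supplied via the API_TOKEN env var

# GET: token in the header, filters as query parameters (no JSON body)
r = requests.get(
    f"{API_BASE}query_alerts",
    headers={"api_token": API_TOKEN},
    params={"graceid": "S230518h", "alert_type": "Preliminary"},
)
r.raise_for_status()
alerts = r.json()

# POST: token and content type in the headers, payload as JSON (illustrative fields)
r = requests.post(
    f"{API_BASE}post_alert",
    headers={"Content-Type": "application/json", "api_token": API_TOKEN},
    json={"graceid": "S230518h", "alert_type": "Preliminary"},
)
```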
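
**Swift storage backend.** The new Swift branch in `gwstorage.py` is selected by `STORAGE_BUCKET_SOURCE=swift` and authenticates from the `OS_*` variables added to `gw_config.Config` (application-credential auth is chosen automatically when `OS_USERNAME` is a 32-character hex ID). A hedged sketch of exercising the storage helpers directly, assuming the package import path from the `src/` layout and that the `OS_*` variables are already exported; the `test/hello.txt` object name is illustrative:

```python
import os
from gwtm_cron.gwtm_listener import gw_config, gwstorage  # import path assumed from src/ layout

# Assumes OS_AUTH_URL, OS_USERNAME, OS_PASSWORD, OS_PROJECT_NAME, OS_CONTAINER_NAME
# (and OS_STORAGE_URL when using application credentials) are set in the environment.
os.environ["STORAGE_BUCKET_SOURCE"] = "swift"

cfg = gw_config.Config()  # no config file: values come from environment variables

# Round-trip a small object through the configured Swift container
gwstorage.upload_gwtm_file(b"hello", "test/hello.txt", source=cfg.STORAGE_BUCKET_SOURCE, config=cfg)
print(gwstorage.list_gwtm_bucket("test", source=cfg.STORAGE_BUCKET_SOURCE, config=cfg))
print(gwstorage.download_gwtm_file("test/hello.txt", source=cfg.STORAGE_BUCKET_SOURCE, config=cfg))
gwstorage.delete_gwtm_files("test/hello.txt", source=cfg.STORAGE_BUCKET_SOURCE, config=cfg)
```

The same helper calls work unchanged with `source="s3"` or `source="abfs"`, which go through the existing fsspec path.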
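
**Listener runtime.** `listener.py` is now driven almost entirely by environment variables (Kafka offset reset, optional consumer group, JSON logging) and exits cleanly on SIGTERM/SIGINT, which is what makes it suitable for a long-running container. A sketch of a local dry run against a saved alert plus the streaming entry point; the import path and the sample file name are assumptions, and GCN Kafka credentials are expected in `KAFKA_CLIENT_ID` / `KAFKA_CLIENT_SECRET`:

```python
import os
from gwtm_cron.gwtm_listener.listener import Listener  # import path assumed from src/ layout

# Assumes KAFKA_CLIENT_ID / KAFKA_CLIENT_SECRET (GCN credentials) are already exported.
os.environ.setdefault("LOG_FORMAT", "json")             # structured JSON logs for k8s aggregation
os.environ.setdefault("KAFKA_OFFSET_RESET", "latest")   # or "earliest" to replay the retained buffer
# os.environ["KAFKA_GROUP_ID"] = "gwtm-ligo-listener"   # optional: persistent offset tracking

listener = Listener(listener_type="LIGO_ALERT")

# Replay a saved alert without writing to storage or posting to the API
listener.local_run(
    "saved_alerts/example_alert.json",  # hypothetical sample file
    write_to_storage=False, verbose=True, dry_run=True,
)

# Or consume the live Kafka stream until SIGTERM/SIGINT triggers the graceful shutdown
# listener.run(write_to_storage=True, verbose=True, dry_run=False)
```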