From fb93cb35fa07fe2be0b9eb85083bdd7ed01d6902 Mon Sep 17 00:00:00 2001 From: antoine-le-calloch Date: Mon, 11 May 2026 23:12:34 -0500 Subject: [PATCH 1/3] Add repair_fp_hists_ordering binary for fixing timeseries field ordering. --- Cargo.lock | 49 ++++ Cargo.toml | 1 + Dockerfile | 1 + Dockerfile.gpu | 3 +- src/bin/repair_fp_hists_ordering.rs | 363 ++++++++++++++++++++++++++++ src/utils/data.rs | 16 +- src/utils/mod.rs | 1 + src/utils/parser.rs | 7 + 8 files changed, 439 insertions(+), 2 deletions(-) create mode 100644 src/bin/repair_fp_hists_ordering.rs create mode 100644 src/utils/parser.rs diff --git a/Cargo.lock b/Cargo.lock index d1125d57..36705606 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,6 +396,18 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -619,6 +631,7 @@ dependencies = [ "apache-avro", "apache-avro-derive", "apache-avro-macros", + "async-channel", "async-trait", "base64 0.22.1", "bcrypt", @@ -931,6 +944,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.15.22" @@ -1468,6 +1490,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.4.1" @@ -3157,6 +3200,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" diff --git a/Cargo.toml b/Cargo.toml index bace41ce..d9bdb42d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ aes-gcm = "0.10.3" apache-avro = { version = "0.21.0", features = ["snappy", "derive"] } apache-avro-derive = "0.21.0" apache-avro-macros = { version = "1", path = "apache-avro-macros" } +async-channel = "2.5.0" async-trait = "0.1.89" base64 = "0.22.1" bcrypt = "0.17.1" diff --git a/Dockerfile b/Dockerfile index 8776bf3d..36473991 100644 --- a/Dockerfile +++ b/Dockerfile @@ -75,6 +75,7 @@ COPY --from=builder /app/target/release/kafka_producer /app/kafka_producer COPY --from=builder /app/target/release/api /app/boom-api COPY --from=builder /app/target/release/migrate_fp_flux /app/migrate_fp_flux COPY --from=builder /app/target/release/migrate_snr /app/migrate_snr +COPY --from=builder /app/target/release/repair_fp_hists_ordering /app/repair_fp_hists_ordering COPY --from=builder /opt/ort /opt/ort CMD ["/app/scheduler"] diff --git a/Dockerfile.gpu b/Dockerfile.gpu index 1029259d..4b0ba5d5 100644 --- a/Dockerfile.gpu +++ b/Dockerfile.gpu @@ -37,7 +37,7 @@ RUN mkdir -p /app/src && \ cargo build --release && rm -rf /app/src COPY ./src /app/src -RUN cargo build --release --bin 
scheduler --bin migrate_fp_flux --bin migrate_snr +RUN cargo build --release --bin scheduler --bin migrate_fp_flux --bin migrate_snr --bin repair_fp_hists_ordering FROM nvidia/cuda:12.8.1-cudnn-runtime-ubuntu24.04 AS app @@ -56,6 +56,7 @@ WORKDIR /app COPY --from=builder /app/target/release/scheduler /app/scheduler COPY --from=builder /app/target/release/migrate_fp_flux /app/migrate_fp_flux COPY --from=builder /app/target/release/migrate_snr /app/migrate_snr +COPY --from=builder /app/target/release/repair_fp_hists_ordering /app/repair_fp_hists_ordering COPY --from=builder /opt/ort /opt/ort CMD ["/app/scheduler"] diff --git a/src/bin/repair_fp_hists_ordering.rs b/src/bin/repair_fp_hists_ordering.rs new file mode 100644 index 00000000..9fd3fbb5 --- /dev/null +++ b/src/bin/repair_fp_hists_ordering.rs @@ -0,0 +1,363 @@ +use boom::{ + conf::{load_dotenv, AppConfig}, + utils::{data::make_progress_bar, enums::Survey, parser::parse_positive_usize}, +}; +use clap::Parser; +use futures::TryStreamExt; +use mongodb::{ + bson::{doc, Document}, + options::{UpdateModifications, UpdateOneModel, WriteModel}, + Namespace, +}; +use tracing::{error, info, warn, Level}; +use tracing_subscriber::FmtSubscriber; + +const QUEUE_MULTIPLIER: usize = 2; + +/// Binary for repairing the ordering of timeseries fields +/// (`fp_hists`, `prv_candidates`, and `prv_nondetections` for ZTF) in the +/// `_alerts_aux` collection. +/// +/// The boom pipeline require each series to be strictly increasing by +/// `jd`. Records inserted before that invariant was enforced can contain +/// out-of-order points, which causes `prepare_timeseries_update` +/// to log the error "is not strictly increasing". 
+/// +/// This binary streams `_alerts_aux` projecting only each series' +/// `jd` arrays, detects which records violate the invariant, and dispatches a +/// server-side aggregation update that filters out non-finite `jd`s, sorts by +/// `jd` ascending, and drops duplicate-`jd` neighbours - the exact same +/// sanitization `TimeSeries::sanitize_timeseries` performs in Rust. +#[derive(Parser)] +struct Cli { + #[arg(long, value_enum)] + survey: Survey, + + #[arg(long, value_name = "FILE", default_value = "config.yaml")] + config: String, + + #[arg(long, default_value_t = 5000, value_parser = parse_positive_usize)] + batch_size: usize, + + /// Number of parallel worker tasks. Each worker holds its own DB connection. + #[arg(long, default_value_t = 1, value_parser = parse_positive_usize)] + processes: usize, + + /// Scan and report broken records without writing anything. + #[arg(long, default_value_t = false)] + dry_run: bool, +} + +/// Timeseries fields stored in `_alerts_aux` that must be strictly +/// increasing by `jd`. Source of truth: the `AlertAuxForUpdate` structs in +/// `src/alert/.rs`. +fn timeseries_fields(survey: &Survey) -> &'static [&'static str] { + match survey { + Survey::Ztf => &["prv_candidates", "prv_nondetections", "fp_hists"], + Survey::Lsst => &["prv_candidates", "fp_hists"], + Survey::Decam => &["prv_candidates", "fp_hists"], + } +} + +/// Mirrors `TimeSeries::validate_monotonic_increasing` in `src/alert/base.rs`: +/// any non-finite `jd` or `jd <= prev_jd` makes the series invalid. A missing +/// or non-array field is treated as valid (nothing to repair). 
+fn is_strictly_increasing(doc: &Document, field: &str) -> bool { + let arr = match doc.get_array(field) { + Ok(a) => a, + Err(_) => return true, + }; + let mut prev: Option = None; + for item in arr { + let jd = match item.as_document().and_then(|d| d.get_f64("jd").ok()) { + Some(v) => v, + None => return false, + }; + if !jd.is_finite() { + return false; + } + if let Some(p) = prev { + if jd <= p { + return false; + } + } + prev = Some(jd); + } + true +} + +/// Aggregation expression that returns a strictly-increasing-by-`jd` version +/// of `$`: +/// 1. `$filter` out elements whose `jd` is missing, non-numeric, NaN, or +/// infinite (matches `sanitize_timeseries`'s `is_finite()` retain). +/// 2. `$sortArray` by `jd` ascending. +/// 3. `$reduce` to drop any element whose `jd` equals its left neighbour +/// (keeping the first occurrence, matching `dedup_by`). +/// A non-array field is returned unchanged. +fn sort_dedup_expr(field: &str) -> Document { + let field_ref = format!("${}", field); + + let filter_finite = doc! { + "$filter": { + "input": &field_ref, + "as": "x", + "cond": { + "$and": [ + { "$isNumber": "$$x.jd" }, + { "$eq": ["$$x.jd", "$$x.jd"] }, + { "$lt": [{ "$abs": "$$x.jd" }, 1e308_f64] } + ] + } + } + }; + + let sorted = doc! { + "$sortArray": { + "input": filter_finite, + "sortBy": { "jd": 1 } + } + }; + + let keep_this = doc! { + "$or": [ + { "$eq": [{ "$size": "$$value" }, 0] }, + { + "$let": { + "vars": { "last": { "$arrayElemAt": ["$$value", -1] } }, + "in": { "$ne": ["$$last.jd", "$$this.jd"] } + } + } + ] + }; + + let dedup = doc! { + "$reduce": { + "input": sorted, + "initialValue": [], + "in": { + "$cond": [ + keep_this, + { "$concatArrays": ["$$value", ["$$this"]] }, + "$$value" + ] + } + } + }; + + doc! 
{ + "$cond": [ + { "$isArray": &field_ref }, + dedup, + &field_ref + ] + } +} + +struct RepairWork { + id: mongodb::bson::Bson, + broken_fields: Vec<&'static str>, +} + +async fn run_repair( + survey: &Survey, + db: mongodb::Database, + batch_size: usize, + processes: usize, + dry_run: bool, +) -> Result<(), mongodb::error::Error> { + let aux_collection: mongodb::Collection = + db.collection(&format!("{}_alerts_aux", survey)); + let aux_ns = aux_collection.namespace(); + let fields = timeseries_fields(survey); + + // Only fetch what we need to validate ordering: `_id` and each series' `jd`s. + let mut projection = doc! { "_id": 1 }; + for f in fields { + projection.insert(format!("{}.jd", f), 1); + } + + let estimated = aux_collection.estimated_document_count().await.unwrap_or(0); + let pb = make_progress_bar(estimated, format!("scan→{}", survey)); + + let queue_capacity = processes * batch_size * QUEUE_MULTIPLIER; + let (tx, rx) = async_channel::bounded::(queue_capacity); + + let mut workers = Vec::with_capacity(processes); + for _ in 0..processes { + let rx = rx.clone(); + let db = db.clone(); + let aux_ns = aux_ns.clone(); + workers.push(tokio::spawn(async move { + repair_worker(db, aux_ns, rx, batch_size, dry_run).await + })); + } + drop(rx); + + let mut cursor = aux_collection + .find(doc! {}) + .projection(projection) + .no_cursor_timeout(true) + .await?; + + let mut scanned: u64 = 0; + let mut broken_total: u64 = 0; + while let Some(d) = cursor.try_next().await? 
{ + scanned += 1; + pb.inc(1); + + let broken: Vec<&'static str> = fields + .iter() + .copied() + .filter(|f| !is_strictly_increasing(&d, f)) + .collect(); + if broken.is_empty() { + continue; + } + broken_total += 1; + let id = match d.get("_id") { + Some(v) => v.clone(), + None => continue, + }; + if tx + .send(RepairWork { + id, + broken_fields: broken, + }) + .await + .is_err() + { + break; + } + } + drop(tx); + + let mut first_err: Option = None; + let mut modified_total: u64 = 0; + for h in workers { + match h.await { + Ok(Ok(n)) => modified_total += n, + Ok(Err(e)) => { + error!("worker failed: {}", e); + first_err.get_or_insert(e); + } + Err(e) => { + error!("worker join failed: {}", e); + } + } + } + pb.finish(); + + info!( + survey = %survey, + scanned, + broken = broken_total, + modified = modified_total, + dry_run, + "repair_fp_hists_ordering done" + ); + + if let Some(e) = first_err { + return Err(e); + } + Ok(()) +} + +async fn repair_worker( + db: mongodb::Database, + aux_ns: Namespace, + rx: async_channel::Receiver, + batch_size: usize, + dry_run: bool, +) -> Result { + let client = db.client().clone(); + let mut batch: Vec = Vec::with_capacity(batch_size); + let mut modified: u64 = 0; + + while let Ok(work) = rx.recv().await { + if dry_run { + warn!( + id = ?work.id, + fields = ?work.broken_fields, + "would repair" + ); + continue; + } + let mut set_doc = Document::new(); + for f in &work.broken_fields { + set_doc.insert(*f, sort_dedup_expr(f)); + } + let pipeline = vec![doc! { "$set": set_doc }]; + batch.push(WriteModel::UpdateOne( + UpdateOneModel::builder() + .namespace(aux_ns.clone()) + .filter(doc! 
{ "_id": work.id }) + .update(UpdateModifications::Pipeline(pipeline)) + .build(), + )); + if batch.len() >= batch_size { + modified += flush_batch(&client, &mut batch).await?; + } + } + if !batch.is_empty() { + modified += flush_batch(&client, &mut batch).await?; + } + Ok(modified) +} + +async fn flush_batch( + client: &mongodb::Client, + batch: &mut Vec, +) -> Result { + if batch.is_empty() { + return Ok(0); + } + let drained: Vec = std::mem::take(batch); + let result = client.bulk_write(drained).await?; + Ok(result.modified_count as u64) +} + +#[tokio::main] +async fn main() { + load_dotenv(); + + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::INFO) + .finish(); + tracing::subscriber::set_global_default(subscriber).expect("setting subscriber failed"); + + let args = Cli::parse(); + + let config = match AppConfig::from_path(&args.config) { + Ok(c) => c, + Err(e) => { + error!("failed to load config from {}: {}", args.config, e); + std::process::exit(1); + } + }; + + let db = match config.build_db().await { + Ok(db) => db, + Err(e) => { + error!("failed to build mongo client: {}", e); + std::process::exit(1); + } + }; + + info!( + "starting repair_fp_hists_ordering: survey={} processes={} batch_size={} dry_run={}", + args.survey, args.processes, args.batch_size, args.dry_run, + ); + + if let Err(e) = run_repair( + &args.survey, + db, + args.batch_size, + args.processes, + args.dry_run, + ) + .await + { + error!("repair run failed: {}", e); + std::process::exit(1); + } +} diff --git a/src/utils/data.rs b/src/utils/data.rs index 080d3edc..3912f3a8 100644 --- a/src/utils/data.rs +++ b/src/utils/data.rs @@ -1,7 +1,21 @@ use futures::StreamExt; -use indicatif::ProgressBar; +use indicatif::{ProgressBar, ProgressStyle}; use std::io::Write; +/// Standard progress bar used by long-running maintenance binaries +/// (`repair_fp_hists_ordering`, `migrate_*`). 
+pub fn make_progress_bar(total: u64, label: String) -> ProgressBar { + let pb = ProgressBar::new(total); + pb.set_style( + ProgressStyle::with_template( + "{msg} {bar:40} {pos}/{len} [{elapsed_precise} < {eta_precise}]", + ) + .unwrap(), + ); + pb.set_message(label); + pb +} + // let's make this more generic so we can take any file type, not just a NamedTempFile pub async fn download_to_file( file: &mut impl Write, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index bb9f1dda..b77ebcba 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -5,6 +5,7 @@ pub mod enums; pub mod fits; pub mod lightcurves; pub mod o11y; +pub mod parser; pub mod spatial; pub mod testing; pub mod worker; diff --git a/src/utils/parser.rs b/src/utils/parser.rs new file mode 100644 index 00000000..ef58fc25 --- /dev/null +++ b/src/utils/parser.rs @@ -0,0 +1,7 @@ +pub fn parse_positive_usize(s: &str) -> Result { + let n: usize = s.parse().map_err(|e| format!("not a usize: {e}"))?; + if n == 0 { + return Err("must be > 0".to_string()); + } + Ok(n) +} From 52ee5fb60a936f99b872247e4b652519dc765a27 Mon Sep 17 00:00:00 2001 From: antoine-le-calloch Date: Tue, 12 May 2026 00:40:29 -0500 Subject: [PATCH 2/3] Rename repair_fp_hists_ordering to repair_photometry_ordering and update references. 
--- Dockerfile | 2 +- Dockerfile.gpu | 4 ++-- ...r_fp_hists_ordering.rs => repair_photometry_ordering.rs} | 6 +++--- src/utils/data.rs | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) rename src/bin/{repair_fp_hists_ordering.rs => repair_photometry_ordering.rs} (97%) diff --git a/Dockerfile b/Dockerfile index 36473991..894b12f6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -75,7 +75,7 @@ COPY --from=builder /app/target/release/kafka_producer /app/kafka_producer COPY --from=builder /app/target/release/api /app/boom-api COPY --from=builder /app/target/release/migrate_fp_flux /app/migrate_fp_flux COPY --from=builder /app/target/release/migrate_snr /app/migrate_snr -COPY --from=builder /app/target/release/repair_fp_hists_ordering /app/repair_fp_hists_ordering +COPY --from=builder /app/target/release/repair_photometry_ordering /app/repair_photometry_ordering COPY --from=builder /opt/ort /opt/ort CMD ["/app/scheduler"] diff --git a/Dockerfile.gpu b/Dockerfile.gpu index 4b0ba5d5..b89c6184 100644 --- a/Dockerfile.gpu +++ b/Dockerfile.gpu @@ -37,7 +37,7 @@ RUN mkdir -p /app/src && \ cargo build --release && rm -rf /app/src COPY ./src /app/src -RUN cargo build --release --bin scheduler --bin migrate_fp_flux --bin migrate_snr --bin repair_fp_hists_ordering +RUN cargo build --release --bin scheduler --bin migrate_fp_flux --bin migrate_snr --bin repair_photometry_ordering FROM nvidia/cuda:12.8.1-cudnn-runtime-ubuntu24.04 AS app @@ -56,7 +56,7 @@ WORKDIR /app COPY --from=builder /app/target/release/scheduler /app/scheduler COPY --from=builder /app/target/release/migrate_fp_flux /app/migrate_fp_flux COPY --from=builder /app/target/release/migrate_snr /app/migrate_snr -COPY --from=builder /app/target/release/repair_fp_hists_ordering /app/repair_fp_hists_ordering +COPY --from=builder /app/target/release/repair_photometry_ordering /app/repair_photometry_ordering COPY --from=builder /opt/ort /opt/ort CMD ["/app/scheduler"] diff --git a/src/bin/repair_fp_hists_ordering.rs 
b/src/bin/repair_photometry_ordering.rs similarity index 97% rename from src/bin/repair_fp_hists_ordering.rs rename to src/bin/repair_photometry_ordering.rs index 9fd3fbb5..f39cc8d1 100644 --- a/src/bin/repair_fp_hists_ordering.rs +++ b/src/bin/repair_photometry_ordering.rs @@ -39,7 +39,7 @@ struct Cli { #[arg(long, default_value_t = 5000, value_parser = parse_positive_usize)] batch_size: usize, - /// Number of parallel worker tasks. Each worker holds its own DB connection. + /// Number of parallel worker tasks. #[arg(long, default_value_t = 1, value_parser = parse_positive_usize)] processes: usize, @@ -253,7 +253,7 @@ async fn run_repair( broken = broken_total, modified = modified_total, dry_run, - "repair_fp_hists_ordering done" + "repair_photometry_ordering done" ); if let Some(e) = first_err { @@ -344,7 +344,7 @@ async fn main() { }; info!( - "starting repair_fp_hists_ordering: survey={} processes={} batch_size={} dry_run={}", + "starting repair_photometry_ordering: survey={} processes={} batch_size={} dry_run={}", args.survey, args.processes, args.batch_size, args.dry_run, ); diff --git a/src/utils/data.rs b/src/utils/data.rs index 3912f3a8..22383427 100644 --- a/src/utils/data.rs +++ b/src/utils/data.rs @@ -3,7 +3,7 @@ use indicatif::{ProgressBar, ProgressStyle}; use std::io::Write; /// Standard progress bar used by long-running maintenance binaries -/// (`repair_fp_hists_ordering`, `migrate_*`). +/// (`repair_photometry_ordering`, `migrate_*`). pub fn make_progress_bar(total: u64, label: String) -> ProgressBar { let pb = ProgressBar::new(total); pb.set_style( From c116ce96ab6d5df04946b41ff22cb549d2500fcc Mon Sep 17 00:00:00 2001 From: antoine-le-calloch Date: Mon, 18 May 2026 08:34:43 -0500 Subject: [PATCH 3/3] Optimize repair_photometry_ordering binary to use created_at index if available to batch the scan. 
--- src/bin/repair_photometry_ordering.rs | 365 +++++++++++++++++--------- 1 file changed, 237 insertions(+), 128 deletions(-) diff --git a/src/bin/repair_photometry_ordering.rs b/src/bin/repair_photometry_ordering.rs index f39cc8d1..6271a84d 100644 --- a/src/bin/repair_photometry_ordering.rs +++ b/src/bin/repair_photometry_ordering.rs @@ -3,31 +3,41 @@ use boom::{ utils::{data::make_progress_bar, enums::Survey, parser::parse_positive_usize}, }; use clap::Parser; -use futures::TryStreamExt; +use futures::{StreamExt, TryStreamExt}; +use indicatif::ProgressBar; use mongodb::{ - bson::{doc, Document}, - options::{UpdateModifications, UpdateOneModel, WriteModel}, - Namespace, + bson::{doc, Bson, Document}, + options::{Hint, UpdateModifications, UpdateOneModel, WriteModel}, + Collection, Namespace, }; use tracing::{error, info, warn, Level}; use tracing_subscriber::FmtSubscriber; -const QUEUE_MULTIPLIER: usize = 2; - -/// Binary for repairing the ordering of timeseries fields -/// (`fp_hists`, `prv_candidates`, and `prv_nondetections` for ZTF) in the -/// `_alerts_aux` collection. +/// Repair the photometry timeseries arrays in `_alerts_aux`. +/// +/// Each aux document holds timeseries fields (e.g. `prv_candidates`, +/// `prv_nondetections`, `fp_hists`) that are expected to be strictly increasing +/// by `jd`. Bugs in the alert ingestion path could leave these arrays out of +/// order, containing duplicate `jd` values, or carrying entries with a +/// non-finite/non-numeric `jd`. This is a one-shot maintenance tool that finds +/// and fixes those documents. /// -/// The boom pipeline require each series to be strictly increasing by -/// `jd`. Records inserted before that invariant was enforced can contain -/// out-of-order points, which causes `prepare_timeseries_update` -/// to log the error "is not strictly increasing". +/// Pipeline: +/// 1. Resolve the survey-specific set of timeseries fields and project only +/// `_id` and each field's `jd` so the scan stays cheap. 
+/// 2. If an index on `created_at` exists, split the collection into +/// `--processes` half-open partitions (via `$sample`) scanned concurrently; +/// otherwise fall back to a single unhinted full scan. +/// 3. For each document, flag fields that violate the strictly-increasing +/// invariant (`is_strictly_increasing`). +/// 4. For broken fields, issue a `$set` update whose value is an aggregation +/// expression (`sort_dedup_expr`) that filters, sorts, and dedups the array +/// in place. Updates are batched into bulk writes. /// -/// This binary streams `_alerts_aux` projecting only each series' -/// `jd` arrays, detects which records violate the invariant, and dispatches a -/// server-side aggregation update that filters out non-finite `jd`s, sorts by -/// `jd` ascending, and drops duplicate-`jd` neighbours - the exact same -/// sanitization `TimeSeries::sanitize_timeseries` performs in Rust. +/// `--dry-run` performs steps 1-3 and reports counts without writing anything. +/// The repair logic mirrors the validation/normalization rules used at +/// ingestion time so a repaired document matches what a fresh write would +/// produce. #[derive(Parser)] struct Cli { #[arg(long, value_enum)] @@ -39,7 +49,7 @@ struct Cli { #[arg(long, default_value_t = 5000, value_parser = parse_positive_usize)] batch_size: usize, - /// Number of parallel worker tasks. + /// Number of parallel scan+repair partitions. #[arg(long, default_value_t = 1, value_parser = parse_positive_usize)] processes: usize, @@ -59,7 +69,7 @@ fn timeseries_fields(survey: &Survey) -> &'static [&'static str] { } } -/// Mirrors `TimeSeries::validate_monotonic_increasing` in `src/alert/base.rs`: +/// Mirrors `TimeSeries::validate_monotonic_increasing`: /// any non-finite `jd` or `jd <= prev_jd` makes the series invalid. A missing /// or non-array field is treated as valid (nothing to repair). 
fn is_strictly_increasing(doc: &Document, field: &str) -> bool { @@ -86,14 +96,13 @@ fn is_strictly_increasing(doc: &Document, field: &str) -> bool { true } -/// Aggregation expression that returns a strictly-increasing-by-`jd` version -/// of `$`: -/// 1. `$filter` out elements whose `jd` is missing, non-numeric, NaN, or -/// infinite (matches `sanitize_timeseries`'s `is_finite()` retain). -/// 2. `$sortArray` by `jd` ascending. -/// 3. `$reduce` to drop any element whose `jd` equals its left neighbour -/// (keeping the first occurrence, matching `dedup_by`). -/// A non-array field is returned unchanged. +/// Builds the MongoDB aggregation expression used in a `$set` pipeline stage to +/// repair `field` in place. If the field is not an array it is left untouched; +/// otherwise it is rebuilt by: filtering out elements with a non-numeric or +/// non-finite `jd`, sorting the survivors ascending by `jd`, then deduplicating +/// so that for any group of equal `jd` only the first one (after sorting) is +/// kept. The result is a strictly-increasing-by-`jd` array, matching the +/// invariant checked by `is_strictly_increasing`. fn sort_dedup_expr(field: &str) -> Document { let field_ref = format!("${}", field); @@ -153,54 +162,108 @@ fn sort_dedup_expr(field: &str) -> Document { } } -struct RepairWork { - id: mongodb::bson::Bson, - broken_fields: Vec<&'static str>, -} - -async fn run_repair( - survey: &Survey, - db: mongodb::Database, - batch_size: usize, +/// Sample `processes - 1` `created_at` values and return `processes` half-open +/// partitions covering the full collection. `None` on a side means +/// open-ended (no lower / no upper bound). 
+async fn compute_partitions( + aux_collection: &Collection, processes: usize, - dry_run: bool, -) -> Result<(), mongodb::error::Error> { - let aux_collection: mongodb::Collection = - db.collection(&format!("{}_alerts_aux", survey)); - let aux_ns = aux_collection.namespace(); - let fields = timeseries_fields(survey); - - // Only fetch what we need to validate ordering: `_id` and each series' `jd`s. - let mut projection = doc! { "_id": 1 }; - for f in fields { - projection.insert(format!("{}.jd", f), 1); +) -> Result, Option)>, mongodb::error::Error> { + if processes <= 1 { + return Ok(vec![(None, None)]); } + let pipeline = vec![ + doc! { "$sample": { "size": (processes - 1) as i64 } }, + doc! { "$project": { "created_at": 1 } }, + doc! { "$sort": { "created_at": 1 } }, + ]; + let mut cursor = aux_collection.aggregate(pipeline).await?; + let mut boundaries: Vec = Vec::with_capacity(processes - 1); + while let Some(d) = cursor.try_next().await? { + if let Some(v) = d.get("created_at") { + boundaries.push(v.clone()); + } + } + boundaries.dedup(); - let estimated = aux_collection.estimated_document_count().await.unwrap_or(0); - let pb = make_progress_bar(estimated, format!("scan→{}", survey)); + let mut parts = Vec::with_capacity(boundaries.len() + 1); + let mut prev: Option = None; + for b in &boundaries { + parts.push((prev.clone(), Some(b.clone()))); + prev = Some(b.clone()); + } + parts.push((prev, None)); + Ok(parts) +} - let queue_capacity = processes * batch_size * QUEUE_MULTIPLIER; - let (tx, rx) = async_channel::bounded::(queue_capacity); +/// Returns the direction (`1` or `-1`) of an index whose first key is +/// `created_at`, or `None` if no such index exists. The direction is needed so +/// the scan hint matches the actual index spec (a hint must correspond exactly +/// to an existing index). 
+async fn created_at_index_direction( + aux_collection: &Collection, +) -> Result, mongodb::error::Error> { + let mut cursor = aux_collection.list_indexes().await?; + while let Some(idx) = cursor.next().await { + let idx = idx?; + if let Some((k, v)) = idx.keys.iter().next() { + if k == "created_at" { + let direction = v.as_i32().or_else(|| v.as_i64().map(|n| n as i32)); + return Ok(direction); + } + } + } + Ok(None) +} - let mut workers = Vec::with_capacity(processes); - for _ in 0..processes { - let rx = rx.clone(); - let db = db.clone(); - let aux_ns = aux_ns.clone(); - workers.push(tokio::spawn(async move { - repair_worker(db, aux_ns, rx, batch_size, dry_run).await - })); +fn partition_filter(lower: &Option, upper: &Option) -> Document { + let mut cond = Document::new(); + if let Some(l) = lower { + cond.insert("$gte", l.clone()); } - drop(rx); + if let Some(u) = upper { + cond.insert("$lt", u.clone()); + } + let mut f = Document::new(); + if !cond.is_empty() { + f.insert("created_at", cond); + } + f +} + +struct PartitionStats { + scanned: u64, + broken: u64, + modified: u64, +} - let mut cursor = aux_collection - .find(doc! {}) +async fn scan_and_repair_partition( + aux_collection: Collection, + aux_ns: Namespace, + fields: &'static [&'static str], + filter: Document, + projection: Document, + batch_size: usize, + dry_run: bool, + created_at_direction: Option, + pb: ProgressBar, +) -> Result { + let client = aux_collection.client().clone(); + let mut find = aux_collection + .find(filter) .projection(projection) .no_cursor_timeout(true) - .await?; + .batch_size(batch_size as u32); + if let Some(dir) = created_at_direction { + find = find.hint(Hint::Keys(doc! { "created_at": dir })); + } + let mut cursor = find.await?; let mut scanned: u64 = 0; let mut broken_total: u64 = 0; + let mut modified: u64 = 0; + let mut batch: Vec = Vec::with_capacity(batch_size); + while let Some(d) = cursor.try_next().await? 
{ scanned += 1; pb.inc(1); @@ -218,79 +281,20 @@ async fn run_repair( Some(v) => v.clone(), None => continue, }; - if tx - .send(RepairWork { - id, - broken_fields: broken, - }) - .await - .is_err() - { - break; - } - } - drop(tx); - - let mut first_err: Option = None; - let mut modified_total: u64 = 0; - for h in workers { - match h.await { - Ok(Ok(n)) => modified_total += n, - Ok(Err(e)) => { - error!("worker failed: {}", e); - first_err.get_or_insert(e); - } - Err(e) => { - error!("worker join failed: {}", e); - } - } - } - pb.finish(); - - info!( - survey = %survey, - scanned, - broken = broken_total, - modified = modified_total, - dry_run, - "repair_photometry_ordering done" - ); - - if let Some(e) = first_err { - return Err(e); - } - Ok(()) -} -async fn repair_worker( - db: mongodb::Database, - aux_ns: Namespace, - rx: async_channel::Receiver, - batch_size: usize, - dry_run: bool, -) -> Result { - let client = db.client().clone(); - let mut batch: Vec = Vec::with_capacity(batch_size); - let mut modified: u64 = 0; - - while let Ok(work) = rx.recv().await { if dry_run { - warn!( - id = ?work.id, - fields = ?work.broken_fields, - "would repair" - ); continue; } + let mut set_doc = Document::new(); - for f in &work.broken_fields { + for f in &broken { set_doc.insert(*f, sort_dedup_expr(f)); } let pipeline = vec![doc! { "$set": set_doc }]; batch.push(WriteModel::UpdateOne( UpdateOneModel::builder() .namespace(aux_ns.clone()) - .filter(doc! { "_id": work.id }) + .filter(doc! 
{ "_id": id }) .update(UpdateModifications::Pipeline(pipeline)) .build(), )); @@ -301,7 +305,11 @@ async fn repair_worker( if !batch.is_empty() { modified += flush_batch(&client, &mut batch).await?; } - Ok(modified) + Ok(PartitionStats { + scanned, + broken: broken_total, + modified, + }) } async fn flush_batch( @@ -316,6 +324,107 @@ async fn flush_batch( Ok(result.modified_count as u64) } +async fn run_repair( + survey: &Survey, + db: mongodb::Database, + batch_size: usize, + processes: usize, + dry_run: bool, +) -> Result<(), mongodb::error::Error> { + let aux_collection: Collection = db.collection(&format!("{}_alerts_aux", survey)); + let aux_ns = aux_collection.namespace(); + let fields = timeseries_fields(survey); + + let mut projection = doc! { "_id": 1 }; + for f in fields { + projection.insert(format!("{}.jd", f), 1); + } + + let created_at_direction = created_at_index_direction(&aux_collection).await?; + let partitions = if created_at_direction.is_some() { + info!( + "computing {} partitions via $sample on created_at", + processes + ); + let parts = compute_partitions(&aux_collection, processes).await?; + info!( + "computed {} partition(s) (requested {})", + parts.len(), + processes + ); + parts + } else { + warn!( + "no index on `created_at` for {}; falling back to a single unhinted full scan", + aux_ns + ); + vec![(None, None)] + }; + + let estimated = aux_collection.estimated_document_count().await.unwrap_or(0); + let pb = make_progress_bar(estimated, format!("scan→{}", survey)); + pb.enable_steady_tick(std::time::Duration::from_millis(200)); + + let mut handles = Vec::with_capacity(partitions.len()); + for (lower, upper) in partitions { + let aux = aux_collection.clone(); + let aux_ns = aux_ns.clone(); + let proj = projection.clone(); + let pb = pb.clone(); + let filter = partition_filter(&lower, &upper); + handles.push(tokio::spawn(async move { + scan_and_repair_partition( + aux, + aux_ns, + fields, + filter, + proj, + batch_size, + dry_run, + 
created_at_direction, + pb, + ) + .await + })); + } + + let mut first_err: Option = None; + let mut scanned_total: u64 = 0; + let mut broken_total: u64 = 0; + let mut modified_total: u64 = 0; + for h in handles { + match h.await { + Ok(Ok(s)) => { + scanned_total += s.scanned; + broken_total += s.broken; + modified_total += s.modified; + } + Ok(Err(e)) => { + error!("partition failed: {}", e); + first_err.get_or_insert(e); + } + Err(e) => { + error!("partition join failed: {}", e); + } + } + } + pb.finish(); + + info!( + survey = %survey, + scanned = scanned_total, + broken = broken_total, + modified = modified_total, + dry_run, + "repair_photometry_ordering_optimized done" + ); + + if let Some(e) = first_err { + return Err(e); + } + Ok(()) +} + #[tokio::main] async fn main() { load_dotenv(); @@ -344,7 +453,7 @@ async fn main() { }; info!( - "starting repair_photometry_ordering: survey={} processes={} batch_size={} dry_run={}", + "starting repair_photometry_ordering_optimized: survey={} processes={} batch_size={} dry_run={}", args.survey, args.processes, args.batch_size, args.dry_run, );