Skip to content
9 changes: 9 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,12 @@ if ( oops_FOUND )
add_subdirectory( add_increment )
endif()
endif()

# ----------------------------------------------------------------------
# copy unified restriction script into build/bin/
# ----------------------------------------------------------------------
# Ensure the destination directory exists before copying: build/bin may
# not have been created yet (e.g. when oops_FOUND is false and no
# executables are built).  copy_if_different avoids rewriting the file
# (and triggering downstream rebuilds) when the script is unchanged.
add_custom_target(copy_restriction ALL
  COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_BINARY_DIR}/bin
  COMMAND ${CMAKE_COMMAND} -E copy_if_different
          ${CMAKE_CURRENT_SOURCE_DIR}/ioda-restrict/ioda_restriction_filter.py
          ${CMAKE_BINARY_DIR}/bin/ioda_restriction_filter.py
)
Comment thread
CoryMartin-NOAA marked this conversation as resolved.
364 changes: 364 additions & 0 deletions src/ioda-restrict/ioda_restriction_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,364 @@
#!/usr/bin/env python3

from netCDF4 import Dataset
import numpy as np
import argparse
import yaml
import os
import glob
import shutil
from datetime import datetime, timedelta

OBS_DIM = "Location"


# ----------------------------------------------------------------------
# Compress consecutive index ranges (used only in exprsrd mode)
# ----------------------------------------------------------------------
def compress_ranges(idx_list):
    """Compress a sorted list of integer indices into range strings.

    Consecutive runs are rendered as "start-end" (plain ASCII hyphen, so
    downstream log parsing never sees non-ASCII characters) and isolated
    values as "value", e.g. [1, 2, 3, 7] -> ["1-3", "7"].

    Args:
        idx_list: sorted list of ints (may be empty).

    Returns:
        List of range-description strings.
    """
    if not idx_list:
        return []

    ranges = []
    start = prev = idx_list[0]

    def flush():
        # Emit the completed run [start, prev] as one string.
        ranges.append(f"{start}" if start == prev else f"{start}-{prev}")

    for x in idx_list[1:]:
        if x == prev + 1:
            # Still inside a consecutive run.
            prev = x
        else:
            flush()
            start = prev = x
    flush()

    return ranges


# ----------------------------------------------------------------------
# Recursively copy groups and variables
# ----------------------------------------------------------------------
def copy_group(in_group, out_group, mask):
    """Recursively copy a netCDF group: attributes, variables, subgroups.

    Variables that have OBS_DIM ("Location") among their dimensions are
    subset along that dimension (assumed to be the leading one) using
    ``mask``; all other variables are copied whole.

    Args:
        in_group:  source netCDF group (or Dataset root).
        out_group: destination netCDF group (or Dataset root), whose
            dimensions must already be defined.
        mask: boolean array over Location selecting the rows to keep,
            or None for an unrestricted full copy.
    """
    # Copy group-level attributes
    for attr in in_group.ncattrs():
        setattr(out_group, attr, getattr(in_group, attr))

    for var_name, var_in in in_group.variables.items():
        # _FillValue can only be set at variable-creation time.
        fill_value = getattr(var_in, "_FillValue", None)

        if fill_value is not None:
            var_out = out_group.createVariable(
                var_name, var_in.dtype, var_in.dimensions, fill_value=fill_value
            )
        else:
            var_out = out_group.createVariable(
                var_name, var_in.dtype, var_in.dimensions
            )

        if mask is None:
            # No restriction: copy the entire variable.
            var_out[:] = var_in[:]
        elif OBS_DIM in var_in.dimensions:
            # Restricted copy along OBS_DIM.  Index the netCDF variable
            # directly so only the kept rows are read from disk, instead
            # of loading the full array into memory first.
            if var_in.ndim == 1:
                var_out[:] = var_in[mask]
            else:
                var_out[:] = var_in[mask, ...]
        else:
            # Variable does not depend on OBS_DIM: copy unchanged.
            var_out[:] = var_in[:]

        # Copy remaining attributes (skip _FillValue, set at creation).
        for attr in var_in.ncattrs():
            if attr != "_FillValue":
                setattr(var_out, attr, getattr(var_in, attr))

    # Recurse into subgroups with the same mask.
    for grp_name, grp_in in in_group.groups.items():
        grp_out = out_group.createGroup(grp_name)
        copy_group(grp_in, grp_out, mask)

Comment thread
CoryMartin-NOAA marked this conversation as resolved.
Comment thread
CoryMartin-NOAA marked this conversation as resolved.

# ----------------------------------------------------------------------
# Copy entire file unchanged
# ----------------------------------------------------------------------
def copy_entire_file(infile, outfile):
    """Pass a file through to the output location without any filtering."""
    shutil.copy(infile, outfile)
    print(f" Wrote (unchanged): {outfile}")

# ----------------------------------------------------------------------
# Extract date from path (exprsrd mode)
# ----------------------------------------------------------------------
def extract_date_from_path(input_dir):
    """Locate the 8-digit cycle date embedded in a gfs./gdas./gcdas. path.

    Returns:
        (prefix, date_start, date_str): the matched prefix, the index of
        the first date character in *input_dir*, and the YYYYMMDD string.

    Raises:
        ValueError: if none of the known prefixes occur in *input_dir*.
    """
    for prefix in ("gfs.", "gdas.", "gcdas."):
        pos = input_dir.find(prefix)
        if pos == -1:
            continue
        date_start = pos + len(prefix)
        return prefix, date_start, input_dir[date_start:date_start + 8]
    raise ValueError(f"Could not find gfs., gdas., or gcdas. in path: {input_dir}")


def get_prev_48h_dir(input_dir):
    """Return *input_dir* with its embedded cycle date rewound by 48 hours."""
    _, date_start, date_str = extract_date_from_path(input_dir)
    rewound = datetime.strptime(date_str, "%Y%m%d") - timedelta(hours=48)
    return (
        input_dir[:date_start]
        + rewound.strftime("%Y%m%d")
        + input_dir[date_start + 8:]
    )


# ----------------------------------------------------------------------
# Compute non-restricted mask (exprsrd mode)
# ----------------------------------------------------------------------
def compute_nonrestricted_mask(flag, exp):
    """Build a boolean mask of non-restricted observations (exprsrd mode).

    An observation is considered non-restricted when:
      * either restriction field is missing (masked), EXCEPT the
        "flag present / expiration missing" combination, which is
        treated as restricted; or
      * both fields are present and the expiration equals 48 (hours).

    Args:
        flag: restrictionFlag values (array-like, possibly masked).
        exp:  restrictionExpiration values (array-like, possibly masked).

    Returns:
        numpy bool array, True where the observation is non-restricted.
    """
    flag = np.ma.array(flag)
    exp = np.ma.array(exp)

    rsrd_missing = np.ma.getmaskarray(flag)
    expr_missing = np.ma.getmaskarray(exp)
    rsrd_present = ~rsrd_missing
    expr_present = ~expr_missing

    # Missing either field => non-restricted, unless only the expiration
    # is missing while the flag is present (that case stays restricted).
    missing_either = rsrd_missing | expr_missing
    exception_restricted = rsrd_present & expr_missing
    non_restricted = missing_either & ~exception_restricted

    # Both fields present and the 48-hour restriction has expired.
    expired = rsrd_present & expr_present & (exp == 48)
    non_restricted |= expired

    return non_restricted


# ----------------------------------------------------------------------
# Mode 1: RSRD filtering (atmos.nr)
# ----------------------------------------------------------------------
def process_rsrd_directory(input_dir, output_dir):
    """Mode 1: RSRD filtering (atmos.nr).

    For each .nc file in *input_dir*, keep only observations where BOTH
    restrictionFlag and restrictionExpiration are missing (masked); all
    other observations are dropped.  Files without a usable Location
    dimension, without the restriction variables, or with zero-length
    restriction arrays are copied unchanged.

    NOTE(review): copying files unchanged when restriction variables are
    absent passes data through unscreened — confirm this is the intended
    policy for the atmos.nr output.
    """
    print(f"[RSRD] Input directory: {input_dir}")
    print(f"[RSRD] Output directory: {output_dir}")

    os.makedirs(output_dir, exist_ok=True)
    nc_files = sorted(glob.glob(os.path.join(input_dir, "*.nc")))

    if not nc_files:
        print("[RSRD] No .nc files found.")
        return

    for infile in nc_files:
        fname = os.path.basename(infile)
        outfile = os.path.join(output_dir, fname)

        print(f"\n[RSRD] Processing {fname}")

        with Dataset(infile, "r") as nc_in:
            loc_dim = nc_in.dimensions.get(OBS_DIM)

            if loc_dim is None or (loc_dim.isunlimited() and len(loc_dim) == 0):
                print(" No valid Location dimension — copying unchanged.")
                copy_entire_file(infile, outfile)
                continue

            md = nc_in.groups.get("MetaData", None)
            if md is None or "restrictionFlag" not in md.variables or "restrictionExpiration" not in md.variables:
                print(" Missing restriction variables — copying unchanged.")
                copy_entire_file(infile, outfile)
                continue

            flag = md["restrictionFlag"][:]
            exp = md["restrictionExpiration"][:]

            if flag.size == 0 or exp.size == 0:
                print(" Restriction arrays zero length — copying unchanged.")
                copy_entire_file(infile, outfile)
                continue

            # Keep only obs where BOTH restriction fields are missing.
            flag_mask = np.ma.getmaskarray(flag)
            exp_mask = np.ma.getmaskarray(exp)
            mask = flag_mask & exp_mask

            total = mask.size
            kept = int(mask.sum())
            dropped = total - kept

            print(f" Total obs: {total}")
            print(f" Kept obs: {kept}")
            print(f" Dropped obs: {dropped}")

            with Dataset(outfile, "w") as nc_out:
                # Global attributes.
                for attr in nc_in.ncattrs():
                    setattr(nc_out, attr, getattr(nc_in, attr))

                # Dimensions: Location shrinks to the kept count; other
                # dimensions keep their size (or stay unlimited).
                for dim_name, dim in nc_in.dimensions.items():
                    if dim_name == OBS_DIM:
                        nc_out.createDimension(dim_name, kept)
                    else:
                        nc_out.createDimension(
                            dim_name,
                            None if dim.isunlimited() else len(dim)
                        )

                copy_group(nc_in, nc_out, mask)

            print(f" Wrote: {outfile}")


# ----------------------------------------------------------------------
# Mode 2: EXPRSRD / Non-restricted filtering (atmos.us)
# ----------------------------------------------------------------------
def process_exprsrd_directory(prev_dir, output_dir):
    """Mode 2: EXPRSRD / non-restricted filtering (atmos.us).

    For each .nc file in *prev_dir* (the 48-hour-old cycle), write only
    the non-restricted observations (see compute_nonrestricted_mask) to
    *output_dir*, printing a per-file report of (RSRD, EXPRSRD) value
    patterns.  Files without a usable Location dimension or without the
    restriction variables are copied unchanged.

    NOTE(review): copying files unchanged when restriction variables are
    absent passes data into atmos.us unscreened — confirm this policy.
    """
    print(f"[NonRestrict] Previous directory: {prev_dir}")
    print(f"[NonRestrict] Output directory: {output_dir}")

    os.makedirs(output_dir, exist_ok=True)
    nc_files = sorted(glob.glob(os.path.join(prev_dir, "*.nc")))

    if not nc_files:
        print("[NonRestrict] No .nc files found.")
        return

    for infile in nc_files:
        fname = os.path.basename(infile)
        outfile = os.path.join(output_dir, fname)

        print(f"\n[NonRestrict] Processing {fname}")

        with Dataset(infile, "r") as nc_in:
            loc_dim = nc_in.dimensions.get(OBS_DIM)
            if loc_dim is None or (loc_dim.isunlimited() and len(loc_dim) == 0):
                print(" No valid Location dimension — copying unchanged.")
                copy_entire_file(infile, outfile)
                continue

            md = nc_in.groups.get("MetaData", None)
            if md is None or "restrictionFlag" not in md.variables or "restrictionExpiration" not in md.variables:
                print(" Missing restriction variables — copying unchanged.")
                copy_entire_file(infile, outfile)
                continue

            flag = md["restrictionFlag"][:]
            exp = md["restrictionExpiration"][:]

            non_restricted_mask = compute_nonrestricted_mask(flag, exp)

            total = len(flag)
            kept = int(np.sum(non_restricted_mask))
            dropped = total - kept

            print(f" Total obs: {total}")
            print(f" Non-restricted obs: {kept}")
            print(f" Restricted obs: {dropped}")

            # Diagnostic report: group indices by their (RSRD, EXPRSRD)
            # value pair, None standing in for a masked/missing value.
            print(" Unique RSRD / EXPRSRD patterns:")
            unique_groups = {}

            flag_mask = np.ma.getmaskarray(flag)
            exp_mask = np.ma.getmaskarray(exp)

            for i in range(len(flag)):
                fval = None if flag_mask[i] else int(flag[i])
                # "eval_" rather than "eval" to avoid shadowing the builtin.
                eval_ = None if exp_mask[i] else int(exp[i])
                unique_groups.setdefault((fval, eval_), []).append(i)

            for (fval, eval_), idx_list in unique_groups.items():
                idx_list_sorted = sorted(idx_list)
                compressed = compress_ranges(idx_list_sorted)
                count = len(idx_list_sorted)

                ftxt = fval if fval is not None else "--"
                etxt = eval_ if eval_ is not None else "--"

                print(f" RSRD = {ftxt}, EXPRSRD = {etxt}")
                print(f" idx ({count}) = {compressed}")
                print()

            if kept == 0:
                print(" No non-restricted obs — skipping output.")
                continue

            with Dataset(outfile, "w") as nc_out:
                for attr in nc_in.ncattrs():
                    setattr(nc_out, attr, getattr(nc_in, attr))

                for dim_name, dim in nc_in.dimensions.items():
                    if dim_name == OBS_DIM:
                        nc_out.createDimension(dim_name, kept)
                    else:
                        nc_out.createDimension(
                            dim_name,
                            None if dim.isunlimited() else len(dim)
                        )

                copy_group(nc_in, nc_out, non_restricted_mask)

            print(f" Wrote (non-restricted only): {outfile}")


# ----------------------------------------------------------------------
# Main driver — ALWAYS RUN BOTH FILTERS
# ----------------------------------------------------------------------
def main(stats_yaml):
    """Main driver: always run the RSRD filter, then conditionally EXPRSRD.

    Args:
        stats_yaml: path to the stats.yaml produced by atmos_bufr_prepobs;
            must contain an "input directory" key.
    """
    with open(stats_yaml, "r") as f:
        stats = yaml.safe_load(f)

    input_dir = stats["input directory"]

    # --- 1. RSRD filter on current cycle ---
    output_nr = os.path.join(os.path.dirname(input_dir), "atmos.nr")
    print("\n=== Running RSRD filter (atmos.nr) ===")
    process_rsrd_directory(input_dir, output_nr)

    # --- Determine whether to run EXPRSRD filter ---
    # These machine-identification files only exist on WCOSS2 OPS
    # systems.  If either cannot be read, we cannot identify the
    # machine, so skip the EXPRSRD filter rather than crash (per review:
    # "if the machine can't be identified, don't run the filter").
    dev_m = None
    this_m = None
    try:
        # dev_m = the current backup ("dev") machine.
        with open("/lfs/h1/ops/prod/config/prodmachinefile") as f:
            for line in f:
                if "backup" in line:
                    parts = line.strip().split(":")
                    if len(parts) >= 2:
                        dev_m = parts[1]
                    break
        # this_m = the machine we are running on.
        with open("/etc/cluster_name") as f:
            this_m = f.read().strip()
    except OSError as err:
        print(f"\nCould not identify cluster ({err}); skipping EXPRSRD filter.")

    print(f"\nCluster check: dev_m={dev_m}, this_m={this_m}")

    # Only run EXPRSRD on the identified dev machine.
    run_exprsrd = dev_m is not None and this_m is not None and dev_m == this_m

    # --- 2. EXPRSRD filter on previous 48h cycle ---
    if run_exprsrd:
        prev_dir = get_prev_48h_dir(input_dir)
        output_us = os.path.join(os.path.dirname(prev_dir), "atmos.us")
        print("\n=== Running EXPRSRD filter (atmos.us) ===")
        process_exprsrd_directory(prev_dir, output_us)
    else:
        print("\n=== Skipping EXPRSRD filter (cluster mismatch) ===")

# ----------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Parse the single required -s/--stats argument and hand off to main().
    arg_parser = argparse.ArgumentParser(
        description="Run both RSRD and EXPRSRD filtering for IODA NetCDF files"
    )
    arg_parser.add_argument(
        "-s", "--stats",
        required=True,
        help="stats.yaml file created by atmos_bufr_prepobs",
    )
    cli_args = arg_parser.parse_args()
    main(cli_args.stats)

Loading