Skip to content
9 changes: 9 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,12 @@ if ( oops_FOUND )
add_subdirectory( add_increment )
endif()
endif()

# ----------------------------------------------------------------------
# copy unified restriction script into build/bin/
# ----------------------------------------------------------------------
# Stage the unified restriction script into build/bin/.
# make_directory ensures the destination exists even when no executables
# are built (oops_FOUND false); copy_if_different avoids touching the
# timestamp — and retriggering dependents — when the script is unchanged.
add_custom_target(copy_restriction ALL
  COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_BINARY_DIR}/bin
  COMMAND ${CMAKE_COMMAND} -E copy_if_different
          ${CMAKE_CURRENT_SOURCE_DIR}/ioda-restrict/ioda_restriction_filter.py
          ${CMAKE_BINARY_DIR}/bin/ioda_restriction_filter.py
)
Comment thread
CoryMartin-NOAA marked this conversation as resolved.
341 changes: 341 additions & 0 deletions src/ioda-restrict/ioda_restriction_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
#!/usr/bin/env python3

from netCDF4 import Dataset
import numpy as np
import argparse
import yaml
import os
import glob
from datetime import datetime, timedelta

OBS_DIM = "Location"


# ----------------------------------------------------------------------
# Compress consecutive index ranges (used only in exprsrd mode)
# ----------------------------------------------------------------------
def compress_ranges(idx_list):
    """Compress a sorted list of integer indices into range strings.

    Consecutive runs collapse to "start-end"; isolated indices stay as
    "N". Example: [1, 2, 3, 7] -> ["1-3", "7"].

    Args:
        idx_list: list of integers, expected in ascending order.

    Returns:
        List of strings describing the compressed ranges ([] for empty
        input).
    """
    if not idx_list:
        return []

    ranges = []
    start = prev = idx_list[0]

    def _flush():
        # ASCII hyphen (not an en-dash) so output greps cleanly and
        # renders in any terminal/log encoding.
        ranges.append(f"{start}" if start == prev else f"{start}-{prev}")

    for x in idx_list[1:]:
        if x == prev + 1:
            # Run continues.
            prev = x
        else:
            # Run broken: emit the finished range, start a new one.
            _flush()
            start = prev = x
    _flush()
    return ranges


# ----------------------------------------------------------------------
# Recursively copy groups and variables
# ----------------------------------------------------------------------
def copy_group(in_group, out_group, mask):
    """Recursively copy variables, attributes, and sub-groups.

    Variables dimensioned along OBS_DIM are subset with ``mask`` (a
    boolean index along the leading Location axis — assumed leading per
    IODA convention, TODO confirm for all files); everything else is
    copied whole.

    Args:
        in_group: source netCDF4 Dataset/Group.
        out_group: destination netCDF4 Dataset/Group whose dimensions
            already exist.
        mask: boolean array of locations to keep, or None to copy all.
    """
    for var_name, var_in in in_group.variables.items():
        # _FillValue can only be set at variable-creation time.
        fill_value = getattr(var_in, "_FillValue", None)

        if fill_value is not None:
            var_out = out_group.createVariable(
                var_name, var_in.dtype, var_in.dimensions, fill_value=fill_value
            )
        else:
            var_out = out_group.createVariable(
                var_name, var_in.dtype, var_in.dimensions
            )

        if mask is None:
            # No restriction: copy the entire variable.
            var_out[:] = var_in[:]
        elif OBS_DIM in var_in.dimensions:
            # Slice directly from the netCDF variable so only the kept
            # rows are read into memory, instead of loading the whole
            # array first (important for large IODA files).
            if var_in.ndim == 1:
                var_out[:] = var_in[mask]
            else:
                var_out[:] = var_in[mask, ...]
        else:
            # Variable does not depend on OBS_DIM: copy unchanged.
            var_out[:] = var_in[:]

        # Copy attributes; _FillValue was handled at creation.
        for attr in var_in.ncattrs():
            if attr != "_FillValue":
                setattr(var_out, attr, getattr(var_in, attr))

    # Recurse into sub-groups with the same mask.
    for grp_name, grp_in in in_group.groups.items():
        grp_out = out_group.createGroup(grp_name)
        copy_group(grp_in, grp_out, mask)

Comment thread
CoryMartin-NOAA marked this conversation as resolved.
Comment thread
CoryMartin-NOAA marked this conversation as resolved.

# ----------------------------------------------------------------------
# Copy entire file unchanged
# ----------------------------------------------------------------------
def copy_entire_file(nc_in, outfile):
    """Write an exact, unfiltered copy of an open netCDF dataset.

    Args:
        nc_in: open netCDF4 Dataset to duplicate.
        outfile: path of the file to create.
    """
    with Dataset(outfile, "w") as nc_out:
        # Global attributes first.
        for name in nc_in.ncattrs():
            setattr(nc_out, name, getattr(nc_in, name))

        # Recreate every dimension, preserving unlimited-ness.
        for name, dim in nc_in.dimensions.items():
            size = None if dim.isunlimited() else len(dim)
            nc_out.createDimension(name, size)

        # mask=None tells copy_group to copy everything verbatim.
        copy_group(nc_in, nc_out, mask=None)

    print(f" Wrote (unchanged): {outfile}")


# ----------------------------------------------------------------------
# Extract date from path (exprsrd mode)
# ----------------------------------------------------------------------
def extract_date_from_path(input_dir):
    """Locate the cycle date embedded in a gfs./gdas./gcdas. path.

    Args:
        input_dir: path containing a component like ``gdas.20240101``.

    Returns:
        Tuple (matched prefix, index where the date begins, 8-character
        YYYYMMDD date string).

    Raises:
        ValueError: if no known prefix occurs in the path, or the eight
            characters after the prefix are not all digits.
    """
    prefixes = ["gfs.", "gdas.", "gcdas."]
    for p in prefixes:
        idx = input_dir.find(p)
        if idx != -1:
            date_start = idx + len(p)
            date_str = input_dir[date_start:date_start + 8]
            # Validate here so a truncated or non-numeric date fails with
            # a clear message instead of a downstream strptime error.
            if len(date_str) != 8 or not date_str.isdigit():
                raise ValueError(
                    f"Expected YYYYMMDD after '{p}' in path: {input_dir}"
                )
            return p, date_start, date_str
    raise ValueError(f"Could not find gfs., gdas., or gcdas. in path: {input_dir}")


def get_prev_48h_dir(input_dir):
    """Return ``input_dir`` with its embedded cycle date moved back 48 hours."""
    prefix, date_start, date_str = extract_date_from_path(input_dir)

    # Shift the parsed YYYYMMDD date two days into the past.
    prev_date = datetime.strptime(date_str, "%Y%m%d") - timedelta(hours=48)

    # Splice the shifted date back into the original path string.
    head = input_dir[:date_start]
    tail = input_dir[date_start + 8:]
    return head + prev_date.strftime("%Y%m%d") + tail


# ----------------------------------------------------------------------
# Compute non-restricted mask (exprsrd mode)
# ----------------------------------------------------------------------
def compute_nonrestricted_mask(flag, exp):
    """Build a boolean mask of non-restricted locations.

    A location is non-restricted when:
      * either restriction field is missing, EXCEPT when the flag is
        present but the expiration is missing (still restricted), or
      * both fields are present and the expiration equals 48 (expired).

    Args:
        flag: restrictionFlag values (ndarray or masked array).
        exp: restrictionExpiration values (ndarray or masked array).

    Returns:
        numpy bool array, True where the location is non-restricted.
    """
    flag = np.ma.array(flag)
    exp = np.ma.array(exp)

    # getmaskarray always yields a full-length boolean array, even when
    # the masked array carries a scalar mask (no missing values) — plain
    # .mask would be np.False_ and break elementwise logic.
    rsrd_missing = np.ma.getmaskarray(flag)
    expr_missing = np.ma.getmaskarray(exp)
    rsrd_present = ~rsrd_missing
    expr_present = ~expr_missing

    non_restricted = np.zeros(len(flag), dtype=bool)

    # Missing either field => non-restricted, except the case "flag
    # present, expiration missing", which stays restricted.
    missing_either = rsrd_missing | expr_missing
    exception_restricted = rsrd_present & expr_missing
    non_restricted |= missing_either & ~exception_restricted

    # Both fields present and expiration == 48h => restriction expired.
    # Compare on filled data so masked entries cannot produce masked
    # results (they are excluded by expr_present anyway).
    expired = rsrd_present & expr_present & (np.ma.filled(exp, -1) == 48)
    non_restricted |= expired

    return non_restricted


# ----------------------------------------------------------------------
# Mode 1: RSRD filtering (atmos.nr)
# ----------------------------------------------------------------------
def process_rsrd_directory(input_dir, output_dir):
    """Mode 1 (RSRD, atmos.nr): keep obs with no restriction metadata.

    For each ``*.nc`` file in ``input_dir``, retain the locations where
    BOTH restrictionFlag and restrictionExpiration are missing and write
    the filtered file to ``output_dir``. Files without a usable Location
    dimension or without the restriction variables are copied unchanged
    (NOTE(review): passing such files through unscreened is the current
    policy — confirm this is acceptable for restricted-data handling).
    """
    print(f"[RSRD] Input directory: {input_dir}")
    print(f"[RSRD] Output directory: {output_dir}")

    os.makedirs(output_dir, exist_ok=True)
    nc_files = sorted(glob.glob(os.path.join(input_dir, "*.nc")))

    if not nc_files:
        print("[RSRD] No .nc files found.")
        return

    for infile in nc_files:
        fname = os.path.basename(infile)
        outfile = os.path.join(output_dir, fname)

        print(f"\n[RSRD] Processing {fname}")

        with Dataset(infile, "r") as nc_in:
            loc_dim = nc_in.dimensions.get(OBS_DIM)

            if loc_dim is None or (loc_dim.isunlimited() and len(loc_dim) == 0):
                print(" No valid Location dimension — copying unchanged.")
                copy_entire_file(nc_in, outfile)
                continue

            md = nc_in.groups.get("MetaData", None)
            if md is None or "restrictionFlag" not in md.variables or "restrictionExpiration" not in md.variables:
                print(" Missing restriction variables — copying unchanged.")
                copy_entire_file(nc_in, outfile)
                continue

            flag = md["restrictionFlag"][:]
            exp = md["restrictionExpiration"][:]

            if flag.size == 0 or exp.size == 0:
                print(" Restriction arrays zero length — copying unchanged.")
                copy_entire_file(nc_in, outfile)
                continue

            # getmaskarray guards against scalar masks: when a variable
            # has no missing values, .mask is a scalar bool and len()
            # would raise. True == both fields missing == keep the ob.
            mask = np.ma.getmaskarray(flag) & np.ma.getmaskarray(exp)

            total = mask.size
            kept = int(mask.sum())
            dropped = total - kept

            print(f" Total obs: {total}")
            print(f" Kept obs: {kept}")
            print(f" Dropped obs: {dropped}")

            with Dataset(outfile, "w") as nc_out:
                for attr in nc_in.ncattrs():
                    setattr(nc_out, attr, getattr(nc_in, attr))

                # Location shrinks to the kept count; all other
                # dimensions are recreated unchanged.
                for dim_name, dim in nc_in.dimensions.items():
                    if dim_name == OBS_DIM:
                        nc_out.createDimension(dim_name, kept)
                    else:
                        nc_out.createDimension(
                            dim_name,
                            None if dim.isunlimited() else len(dim)
                        )

                copy_group(nc_in, nc_out, mask)

            print(f" Wrote: {outfile}")


# ----------------------------------------------------------------------
# Mode 2: EXPRSRD / Non-restricted filtering (atmos.us)
# ----------------------------------------------------------------------
def process_exprsrd_directory(prev_dir, output_dir):
    """Mode 2 (EXPRSRD, atmos.us): extract now non-restricted obs.

    For each ``*.nc`` file in the previous 48h cycle directory, keep the
    locations whose restriction has expired (or never existed, per
    compute_nonrestricted_mask) and write them to ``output_dir``. Files
    without a usable Location dimension or without the restriction
    variables are copied unchanged.
    """
    print(f"[NonRestrict] Previous directory: {prev_dir}")
    print(f"[NonRestrict] Output directory: {output_dir}")

    os.makedirs(output_dir, exist_ok=True)
    nc_files = sorted(glob.glob(os.path.join(prev_dir, "*.nc")))

    if not nc_files:
        print("[NonRestrict] No .nc files found.")
        return

    for infile in nc_files:
        fname = os.path.basename(infile)
        outfile = os.path.join(output_dir, fname)

        print(f"\n[NonRestrict] Processing {fname}")

        with Dataset(infile, "r") as nc_in:
            loc_dim = nc_in.dimensions.get(OBS_DIM)
            if loc_dim is None or (loc_dim.isunlimited() and len(loc_dim) == 0):
                print(" No valid Location dimension — copying unchanged.")
                copy_entire_file(nc_in, outfile)
                continue

            md = nc_in.groups.get("MetaData", None)
            if md is None or "restrictionFlag" not in md.variables or "restrictionExpiration" not in md.variables:
                print(" Missing restriction variables — copying unchanged.")
                copy_entire_file(nc_in, outfile)
                continue

            flag = md["restrictionFlag"][:]
            exp = md["restrictionExpiration"][:]

            non_restricted_mask = compute_nonrestricted_mask(flag, exp)

            total = len(flag)
            kept = int(np.sum(non_restricted_mask))
            dropped = total - kept

            print(f" Total obs: {total}")
            print(f" Non-restricted obs: {kept}")
            print(f" Restricted obs: {dropped}")

            # Report every distinct (RSRD, EXPRSRD) combination and the
            # compressed index ranges it occupies.
            print(" Unique RSRD / EXPRSRD patterns:")
            # getmaskarray: per-element indexing on .mask fails when the
            # masked array holds a scalar mask (no missing values).
            flag_missing = np.ma.getmaskarray(flag)
            exp_missing = np.ma.getmaskarray(exp)
            unique_groups = {}

            for i in range(total):
                fval = None if flag_missing[i] else int(flag[i])
                # Named eval_ so we don't shadow the builtin eval().
                eval_ = None if exp_missing[i] else int(exp[i])
                unique_groups.setdefault((fval, eval_), []).append(i)

            for (fval, eval_), idx_list in unique_groups.items():
                idx_list_sorted = sorted(idx_list)
                compressed = compress_ranges(idx_list_sorted)
                count = len(idx_list_sorted)

                ftxt = fval if fval is not None else "--"
                etxt = eval_ if eval_ is not None else "--"

                print(f" RSRD = {ftxt}, EXPRSRD = {etxt}")
                print(f" idx ({count}) = {compressed}")
                print()

            if kept == 0:
                print(" No non-restricted obs — skipping output.")
                continue

            with Dataset(outfile, "w") as nc_out:
                for attr in nc_in.ncattrs():
                    setattr(nc_out, attr, getattr(nc_in, attr))

                # Location shrinks to the kept count; other dims copied.
                for dim_name, dim in nc_in.dimensions.items():
                    if dim_name == OBS_DIM:
                        nc_out.createDimension(dim_name, kept)
                    else:
                        nc_out.createDimension(
                            dim_name,
                            None if dim.isunlimited() else len(dim)
                        )

                copy_group(nc_in, nc_out, non_restricted_mask)

            print(f" Wrote (non-restricted only): {outfile}")


# ----------------------------------------------------------------------
# Main driver — ALWAYS RUN BOTH FILTERS
# ----------------------------------------------------------------------
def main(stats_yaml):
    """Run both filters from the stats YAML produced upstream.

    Applies the RSRD filter to the current cycle's input directory and
    the EXPRSRD filter to the directory from 48 hours earlier.

    Args:
        stats_yaml: path to the YAML file containing "input directory".
    """
    with open(stats_yaml, "r") as f:
        stats = yaml.safe_load(f)

    input_dir = stats["input directory"]

    # --- 1. RSRD filter on current cycle ---
    nr_dir = os.path.join(os.path.dirname(input_dir), "atmos.nr")
    print("\n=== Running RSRD filter (atmos.nr) ===")
    process_rsrd_directory(input_dir, nr_dir)

    # --- 2. EXPRSRD filter on previous 48h cycle ---
    prev_dir = get_prev_48h_dir(input_dir)
    us_dir = os.path.join(os.path.dirname(prev_dir), "atmos.us")
    print("\n=== Running EXPRSRD filter (atmos.us) ===")
    process_exprsrd_directory(prev_dir, us_dir)


# ----------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # CLI entry point: one required argument pointing at the stats YAML.
    parser = argparse.ArgumentParser(
        description="Run both RSRD and EXPRSRD filtering for IODA NetCDF files"
    )
    parser.add_argument(
        "-s",
        "--stats",
        required=True,
        help="stats.yaml file created by atmos_bufr_prepobs",
    )
    cli_args = parser.parse_args()
    main(cli_args.stats)

Loading