Introduce Restriction Screening Scripts: Add RSRD/EXPRSRD Extraction and Update CMake (#73)
Conversation
Pull request overview
Adds two Python utilities under src/ioda-restrict/ to post-process IODA NetCDF observation files based on restriction metadata, and wires them into the build so they are copied into the build bin/ directory.
Changes:
- Add `check_rsrd.py` to filter/copy NetCDF files based on `restrictionFlag`/`restrictionExpiration` metadata.
- Add `check_exprsrd.py` to extract non-restricted observations from a "previous 48h" directory and write filtered outputs.
- Update `src/CMakeLists.txt` to copy both scripts into `${CMAKE_BINARY_DIR}/bin`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| `src/ioda-restrict/check_rsrd.py` | New NetCDF filtering/copy tool based on restriction metadata. |
| `src/ioda-restrict/check_exprsrd.py` | New tool to compute the non-restricted mask (incl. EXPRSRD logic) and write filtered NetCDF outputs. |
| `src/CMakeLists.txt` | Adds a custom target to copy the new scripts into the build `bin/` directory. |
Code excerpts referenced by the review comments:

```python
for i in range(len(flag)):
    fval = None if flag.mask[i] else int(flag[i])
    eval = None if exp.mask[i] else int(exp[i])
```

```python
# Recursively copy subgroups
for grp_name, grp_in in in_group.groups.items():
    grp_out = out_group.createGroup(grp_name)
    copy_group(grp_in, grp_out, mask)
```

```python
print(" Unique RSRD / EXPRSRD patterns:")

unique_groups = {}

for i in range(len(flag)):
```

```python
mask = flag.mask & exp.mask

total = len(mask)
kept = np.sum(mask)
dropped = total - kept
```

```python
if fill_value is not None:
    var_out = out_group.createVariable(
        var_name, var_in.dtype, var_in.dimensions, fill_value=fill_value
    )
else:
    var_out = out_group.createVariable(
        var_name, var_in.dtype, var_in.dimensions
    )
```

```cmake
add_custom_target(copy_restriction ALL
  COMMAND ${CMAKE_COMMAND} -E copy
    ${CMAKE_CURRENT_SOURCE_DIR}/ioda-restrict/check_rsrd.py
    ${CMAKE_BINARY_DIR}/bin/check_rsrd.py
  COMMAND ${CMAKE_COMMAND} -E copy
    ${CMAKE_CURRENT_SOURCE_DIR}/ioda-restrict/check_exprsrd.py
    ${CMAKE_BINARY_DIR}/bin/check_exprsrd.py
```
CoryMartin-NOAA
left a comment
The two check scripts seem to be similar. Could they be combined into one?
```python
if not idx_list:
    return []
ranges = []
start = prev = idx_list[0]
```

I think python is a bit finicky with this sort of thing, and it might change `prev` when you change `start`, unless you do a deep copy.

Good point. I will dig into both scripts and see how they can be merged into one.
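For what it's worth, a quick check suggests the chained assignment is safe here: `start = prev = idx_list[0]` binds both names to the same immutable int, and rebinding one name afterwards does not affect the other. A small illustrative snippet (not code from the PR):

```python
# Chained assignment binds two names to the same immutable int.
idx_list = [3, 4, 5, 9]
start = prev = idx_list[0]

# Rebinding `start` only changes what the name `start` points to;
# `prev` still refers to the original value. No deep copy is needed.
start = 99
print(prev)  # -> 3
```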
```python
# ----------------------------------------------------------------------
# Copy entire file unchanged
# ----------------------------------------------------------------------
def copy_entire_file(nc_in, outfile):
```

I think this might be overkill. If you are just copying an entire file, use `shutil.copy` and just copy it; no need to use the netCDF library.
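A minimal sketch of the `shutil`-based alternative suggested above (the function name follows the PR; the body is an assumption, not the PR's implementation):

```python
import shutil

def copy_entire_file(nc_in, outfile):
    # Byte-for-byte copy (preserving timestamps/permissions) instead of
    # re-reading and re-writing the file through the netCDF library.
    shutil.copy2(nc_in, outfile)
```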
Pull request overview
This PR introduces a new operational workflow for extracting and screening observation-level restriction metadata from IODA NetCDF files, and makes the script available from the build tree.
Changes:
- Add `ioda_restriction_filter.py` to run both RSRD (current cycle) and EXPRSRD (previous 48h cycle) filtering on directories of `.nc` files.
- Add a CMake custom target to copy the restriction filtering script into `build/bin/`.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `src/ioda-restrict/ioda_restriction_filter.py` | New Python script implementing directory-level filtering and NetCDF group/variable copying for restriction screening. |
| `src/CMakeLists.txt` | Adds an always-built custom target to copy the restriction filtering script into the build's `bin/` directory. |
```python
mask = flag.mask & exp.mask

total = len(mask)
kept = np.sum(mask)
```

`mask = flag.mask & exp.mask` has the same scalar-mask problem as elsewhere: if either variable has no masked values, `.mask` can be a scalar bool and `len(mask)`/`np.sum(mask)` will not behave as intended. Normalize to boolean arrays (e.g., via `np.ma.getmaskarray`) before computing `mask` and counting kept/dropped.

Suggested change:

```diff
-mask = flag.mask & exp.mask
-total = len(mask)
-kept = np.sum(mask)
+flag_mask = np.ma.getmaskarray(flag)
+exp_mask = np.ma.getmaskarray(exp)
+mask = flag_mask & exp_mask
+total = mask.size
+kept = int(mask.sum())
```
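The scalar-mask pitfall can be demonstrated in isolation (an illustrative snippet, not PR code):

```python
import numpy as np

# A masked array with no masked values exposes a *scalar* mask,
# so len(a.mask) would raise TypeError.
a = np.ma.masked_array([1, 2, 3])
print(a.mask)  # False (a 0-d scalar, not an array)

# np.ma.getmaskarray always returns a boolean array of the full shape,
# which makes len()/sum()/& behave consistently.
m = np.ma.getmaskarray(a)
print(m)  # [False False False]
```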
```python
data = var_in[:]

if mask is None:
    var_out[:] = data
elif OBS_DIM in var_in.dimensions:
    if data.ndim == 1:
        var_out[:] = data[mask]
    else:
        var_out[:] = data[mask, ...]
else:
    var_out[:] = data
```

`copy_group` reads every variable fully into memory (`data = var_in[:]`) even when only a subset of locations is needed. For large IODA files this can cause very high memory use and slow runtimes. Prefer slicing directly from the netCDF variable when `OBS_DIM` is present (read only the kept indices) instead of loading the full array first.

Suggested change:

```diff
-data = var_in[:]
 if mask is None:
-    var_out[:] = data
+    # No restriction: copy entire variable
+    var_out[:] = var_in[:]
 elif OBS_DIM in var_in.dimensions:
-    if data.ndim == 1:
-        var_out[:] = data[mask]
+    # Restricted copy along OBS_DIM; assume OBS_DIM is the leading dimension
+    if var_in.ndim == 1:
+        var_out[:] = var_in[mask]
     else:
-        var_out[:] = data[mask, ...]
+        var_out[:] = var_in[mask, ...]
 else:
-    var_out[:] = data
+    # Variable does not depend on OBS_DIM: copy entire variable
+    var_out[:] = var_in[:]
```
```python
restricted_mask = ~non_restricted_mask
restricted_idx = np.where(restricted_mask)[0]
```

`restricted_idx` is computed but never used, which adds noise and can confuse future maintenance. Remove it or use it for the intended logging/reporting.

Suggested change:

```diff
 restricted_mask = ~non_restricted_mask
-restricted_idx = np.where(restricted_mask)[0]
```
```python
ranges.append(f"{start}–{prev}")
start = prev = x
if start == prev:
    ranges.append(f"{start}")
else:
    ranges.append(f"{start}–{prev}")
```

`compress_ranges` formats ranges using a Unicode en-dash (`–`). This can cause encoding/display issues in some logs/terminals and makes grepping harder. Prefer an ASCII hyphen (`-`) for portability.

Suggested change:

```diff
-ranges.append(f"{start}–{prev}")
+ranges.append(f"{start}-{prev}")
 start = prev = x
 if start == prev:
     ranges.append(f"{start}")
 else:
-    ranges.append(f"{start}–{prev}")
+    ranges.append(f"{start}-{prev}")
```

I agree with this suggestion.
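Putting the pieces together, a self-contained sketch of the range compressor might look like this. The excerpts only show fragments of the function, so the loop body here (consecutive-run detection over a sorted index list) is an assumption:

```python
def compress_ranges(idx_list):
    """Collapse a sorted list of indices into range strings, e.g.
    [0, 1, 2, 5, 7, 8] -> ['0-2', '5', '7-8'] (ASCII hyphen for portability)."""
    if not idx_list:
        return []
    ranges = []
    start = prev = idx_list[0]
    for x in idx_list[1:]:
        if x == prev + 1:
            # Still inside a consecutive run; extend it.
            prev = x
            continue
        # Run ended: emit either a single index or a start-end range.
        ranges.append(f"{start}" if start == prev else f"{start}-{prev}")
        start = prev = x
    ranges.append(f"{start}" if start == prev else f"{start}-{prev}")
    return ranges

print(compress_ranges([0, 1, 2, 5, 7, 8]))  # -> ['0-2', '5', '7-8']
```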
```cmake
# copy unified restriction script into build/bin/
# ----------------------------------------------------------------------
add_custom_target(copy_restriction ALL
  COMMAND ${CMAKE_COMMAND} -E copy
```

The `copy_restriction` target copies into `${CMAKE_BINARY_DIR}/bin`, but it doesn't ensure that directory exists. If `oops_FOUND` is false (no executables built) or the bin dir hasn't been created yet, the build can fail at this step. Add a preceding `cmake -E make_directory ${CMAKE_BINARY_DIR}/bin` (and consider `copy_if_different` to avoid unnecessary rebuilds).

Suggested change:

```diff
-  COMMAND ${CMAKE_COMMAND} -E copy
+  COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_BINARY_DIR}/bin
+  COMMAND ${CMAKE_COMMAND} -E copy_if_different
```
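Combining the review suggestions, the target could be written as follows (a sketch; the script name matches the PR, the exact layout is an assumption):

```cmake
# Ensure bin/ exists even when no executables are built, and avoid
# re-copying when the script is unchanged.
add_custom_target(copy_restriction ALL
  COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_BINARY_DIR}/bin
  COMMAND ${CMAKE_COMMAND} -E copy_if_different
    ${CMAKE_CURRENT_SOURCE_DIR}/ioda-restrict/ioda_restriction_filter.py
    ${CMAKE_BINARY_DIR}/bin/ioda_restriction_filter.py
)
```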
To enable `ioda_restriction_filter.py` from `da-utils/src/ioda-restriction` within obsForge, the task scripts would need updating. Would it be appropriate to include these updates in an obsForge pull request?
@HyundeokChoi-NOAA yes, absolutely. But one thing to think about: @ilianagenkova mentioned that in operations, we don't run the unrestriction code, so it may need to be a separate job rather than part of the existing jobs. Does that make sense? Happy to discuss further if needed.
The two scripts (one to create `*nr` files and one to open up the files after EXPRSRD) could stay in one job, but check which machine you are on (dev or prod) to trigger the second script. That's how we have it in legacy obsproc; NCO has not complained (not yet).
… filter either in prod or dev machines
@CoryMartin-NOAA @ilianagenkova I added this block ahead of the EXPRSRD filter step and confirmed that it behaves as expected. This filter does not run on the production machine.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
```python
ranges.append(f"{start}–{prev}")
start = x
prev = x
if start == prev:
    ranges.append(f"{start}")
else:
    ranges.append(f"{start}–{prev}")
```

`compress_ranges` uses an en-dash character (`–`) in the range strings. This can render oddly or break downstream parsing/log collection that assumes ASCII. Prefer a plain hyphen (`-`) unless Unicode output is explicitly required.

Suggested change:

```diff
-ranges.append(f"{start}–{prev}")
+ranges.append(f"{start}-{prev}")
 start = x
 prev = x
 if start == prev:
     ranges.append(f"{start}")
 else:
-    ranges.append(f"{start}–{prev}")
+    ranges.append(f"{start}-{prev}")
```
```python
    print(" Missing restriction variables — copying unchanged.")
    copy_entire_file(infile, outfile)
    continue

flag = md["restrictionFlag"][:]
exp = md["restrictionExpiration"][:]

if flag.size == 0 or exp.size == 0:
    print(" Restriction arrays zero length — copying unchanged.")
    copy_entire_file(infile, outfile)
    continue

flag_mask = np.ma.getmaskarray(flag)
exp_mask = np.ma.getmaskarray(exp)
mask = flag_mask & exp_mask
```

When restriction variables are missing, the script copies the input file unchanged into the supposedly filtered output directory. If this tool is used as a restriction screen, passing data through unscreened can defeat the purpose and may leak restricted observations. Consider failing fast (non-zero exit), or at least skipping output / writing an empty filtered file when the screening fields are unavailable.

Suggested change:

```diff
-    print(" Missing restriction variables — copying unchanged.")
-    copy_entire_file(infile, outfile)
-    continue
-flag = md["restrictionFlag"][:]
-exp = md["restrictionExpiration"][:]
-if flag.size == 0 or exp.size == 0:
-    print(" Restriction arrays zero length — copying unchanged.")
-    copy_entire_file(infile, outfile)
-    continue
-flag_mask = np.ma.getmaskarray(flag)
-exp_mask = np.ma.getmaskarray(exp)
-mask = flag_mask & exp_mask
+    print(" Missing restriction variables — writing empty restricted file.")
+    mask = np.zeros(len(loc_dim), dtype=bool)
+else:
+    flag = md["restrictionFlag"][:]
+    exp = md["restrictionExpiration"][:]
+    if flag.size == 0 or exp.size == 0:
+        print(" Restriction arrays zero length — writing empty restricted file.")
+        mask = np.zeros(len(loc_dim), dtype=bool)
+    else:
+        flag_mask = np.ma.getmaskarray(flag)
+        exp_mask = np.ma.getmaskarray(exp)
+        mask = flag_mask & exp_mask
```
```python
with open("/lfs/h1/ops/prod/config/prodmachinefile") as f:
    for line in f:
        if "backup" in line:
            parts = line.strip().split(":")
            if len(parts) >= 2:
                dev_m = parts[1]
            break

# this_m = dev machine
with open("/etc/cluster_name") as f:
    this_m = f.read().strip()
```

`main()` unconditionally reads `/lfs/h1/ops/prod/config/prodmachinefile` and `/etc/cluster_name` without handling missing/unreadable files. Since this script is copied into the general build `bin/`, this will crash on non-OPS systems. Consider making this an optional CLI flag/env override, and catch `FileNotFoundError`/`OSError` to default to skipping EXPRSRD mode with a clear message.

This is a really good point.

The copilot solution is acceptable, i.e. if the machine can't be identified, don't run the filter.
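A hedged sketch of the guarded lookup suggested above, using the paths from the PR (the function name, return convention, and fallback behavior are assumptions):

```python
def detect_machines(prodfile="/lfs/h1/ops/prod/config/prodmachinefile",
                    clusterfile="/etc/cluster_name"):
    """Return (this_machine, dev_machine), or (None, None) if the OPS
    config files are unavailable, so callers can skip EXPRSRD mode."""
    try:
        dev_m = None
        with open(prodfile) as f:
            for line in f:
                if "backup" in line:
                    parts = line.strip().split(":")
                    if len(parts) >= 2:
                        dev_m = parts[1]
                    break
        with open(clusterfile) as f:
            this_m = f.read().strip()
        return this_m, dev_m
    except OSError:
        # Non-OPS system (or unreadable files): identify nothing and
        # let the caller skip the EXPRSRD filter with a clear message.
        print("Cannot identify machine; skipping EXPRSRD filter.")
        return None, None
```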
```python
print(" Missing restriction variables — copying unchanged.")
copy_entire_file(infile, outfile)
```

Same issue as above for EXPRSRD mode: if restriction variables are missing, the script copies the file unchanged into `atmos.us`. For a screening/filtering script this is unsafe; prefer skipping output or failing so restricted data can't silently pass through.

Suggested change:

```diff
-print(" Missing restriction variables — copying unchanged.")
-copy_entire_file(infile, outfile)
+print(" Missing restriction variables — no output will be written for this file.")
```
This PR is required for obsForge PR #205, and I believe it is ready for merge. Please review when you have a moment. Thank you.
CoryMartin-NOAA
left a comment
Thanks @HyundeokChoi-NOAA
@CoryMartin-NOAA @ilianagenkova Thank you all!!!!
This PR adds full support for extracting and screening observation-level restriction metadata, providing operational scripts for extracting and filtering restricted data.