Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ tests/__pycache__/
tests/data/
tests/*.txt
docs/examples/*/forcings
docs/examples/*/restart
*.txt
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ where the list of `nwm-id`s are the NHD reaches associated with that NextGen hyd
| nwm_file | Path to a text file containing nwm file names. One filename per line. [Tool](#nwm_file) to create this file | :white_check_mark: |
| gpkg_file | Geopackage file to define spatial domain. Use [hfsubset](https://github.com/lynker-spatial/hfsubsetCLI) to generate a geopackage with a `forcing-weights` layer. Accepts local absolute path, s3 URI or URL. Also acceptable is a weights parquet generated with [weights_hf2ds.py](https://github.com/CIROH-UA/forcingprocessor/blob/main/src/forcingprocessor/weights_hf2ds.py), though the plotting option will no longer be available. | :white_check_mark: |
| map_file | Path to a json containing the NWM to NGEN mapping for channel routing data extraction. Absolute path or s3 URI | |
| restart_map_file | Path to a json containing the NWM to NGEN catchment mapping for t-route restart generation. Absolute path or s3 URI | |
| crosswalk_file | Path to a netCDF containing the exact order of the catchments in the t-route restart file. Absolute path or s3 URI | |
| routelink_file | Path to a netCDF containing the NWM channel geometry data, needed for t-route restart generation. Absolute path or s3 URI | |

### 2. Storage

Expand Down
Binary file not shown.
Binary file not shown.
20 changes: 20 additions & 0 deletions docs/examples/troute-restart_example/conf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"forcing" : {
"nwm_file" : "filenamelist.txt",
"restart_map_file" : "hf2.2_subset_cat_map.json",
"crosswalk_file" : "crosswalk_subset.nc",
"routelink_file" : "RouteLink_CONUS_subset.nc"
},

"storage":{
"storage_type" : "local",
"output_path" : "./restart",
"output_file_type" : ["netcdf"]
},

"run" : {
"verbose" : true,
"collect_stats" : true,
"nprocs" : 8
}
}
Binary file not shown.
1 change: 1 addition & 0 deletions docs/examples/troute-restart_example/filenamelist.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
https://noaa-nwm-pds.s3.amazonaws.com/nwm.20250718/analysis_assim/nwm.t00z.analysis_assim.channel_rt.tm00.conus.nc
18 changes: 18 additions & 0 deletions docs/examples/troute-restart_example/hf2.2_subset_cat_map.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"cat-20469": [
166196253.0,
166196257.0,
166196256.0
],
"cat-20479": [
4599061.0,
4599715.0,
166196261.0
],
"cat-19499": [
25068048.0
],
"cat-19630": [
25077314.0
]
}
172 changes: 135 additions & 37 deletions src/forcingprocessor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from forcingprocessor.plot_forcings import plot_ngen_forcings
from forcingprocessor.utils import make_forcing_netcdf, get_window, log_time, convert_url2key, report_usage, nwm_variables, ngen_variables
from forcingprocessor.channel_routing_tools import channelrouting_nwm2ngen, write_netcdf_chrt
from forcingprocessor.troute_restart_tools import create_restart, write_netcdf_restart

B2MB = 1048576

Expand Down Expand Up @@ -790,6 +791,9 @@ def prep_ngen_data(conf):
else: gpkg_files = gpkg_file

map_file_path = conf['forcing'].get("map_file",None)
restart_map_file_path = conf['forcing'].get("restart_map_file", None)
crosswalk_file_path = conf['forcing'].get("crosswalk_file", None)
routelink_file_path = conf['forcing'].get("routelink_file", None)
if map_file_path: # NWM to NGEN channel routing processing requires json map
data_source = "channel_routing"
if "s3://" in map_file_path:
Expand All @@ -799,6 +803,30 @@ def prep_ngen_data(conf):
else:
with open(map_file_path, "r", encoding="utf-8") as map_file:
full_nwm_ngen_map = json.load(map_file)
elif restart_map_file_path:
data_source = "troute_restarts"

if "s3://" in restart_map_file_path:
s3 = s3fs.S3FileSystem(anon=True)
with s3.open(restart_map_file_path, "r") as map_file:
cat_map = json.load(map_file)
else:
with open(restart_map_file_path, "r", encoding="utf-8") as map_file:
cat_map = json.load(map_file)

if "s3://" in crosswalk_file_path:
s3 = s3fs.S3FileSystem(anon=True)
with s3.open(crosswalk_file_path, "rb") as crosswalk_file:
crosswalk_ds = xr.open_dataset(crosswalk_file)
else:
crosswalk_ds = xr.open_dataset(crosswalk_file_path)

if "s3://" in routelink_file_path:
s3 = s3fs.S3FileSystem(anon=True)
with s3.open(routelink_file_path, "rb") as routelink_file:
routelink_ds = xr.open_dataset(routelink_file)
else:
routelink_ds = xr.open_dataset(routelink_file_path)

else:
data_source = "forcings"
Expand All @@ -815,8 +843,8 @@ def prep_ngen_data(conf):
global ii_plot, nts_plot, ngen_vars_plot
ii_plot = conf.get("plot",False)
if ii_plot:
if data_source == "channel_routing":
raise RuntimeError("Plotting not supported for channel routing processing.")
if data_source == "channel_routing" or data_source == "troute_restarts":
raise RuntimeError("Plotting not supported for channel routing or restart processing.")

nts_plot = conf["plot"].get("nts_plot",10)
ngen_vars_plot = conf["plot"].get("ngen_vars",ngen_variables)
Expand Down Expand Up @@ -880,7 +908,7 @@ def prep_ngen_data(conf):
window = [x_max, x_min, y_max, y_min]
weight_time = time.perf_counter() - tw
log_time("CALC_WINDOW_END", log_file)
else:
elif data_source == "channel_routing":
log_time("READMAP_START", log_file)
tw = time.perf_counter()
if ii_verbose:
Expand All @@ -893,6 +921,8 @@ def prep_ngen_data(conf):
nwm_ngen_map[jcatch] = full_nwm_ngen_map[jcatch]
ncatchments = len(nwm_ngen_map)
log_time("READMAP_END", log_file)
else:
ncatchments = 1

log_time("STORE_METADATA_START", log_file)
global forcing_path
Expand All @@ -903,6 +933,8 @@ def prep_ngen_data(conf):
output_path = Path(output_path)
if data_source == "channel_routing":
forcing_path = Path(output_path, 'outputs', 'ngen')
elif data_source == "troute_restarts":
forcing_path = Path(output_path, 'restart')
else:
forcing_path = Path(output_path, 'forcings')
meta_path = Path(output_path, 'metadata')
Expand Down Expand Up @@ -950,26 +982,37 @@ def prep_ngen_data(conf):
# s3://noaa-nwm-pds/nwm.20241029/forcing_short_range/nwm.t00z.short_range.forcing.f001.conus.nc
if data_source == "forcings":
pattern = r"nwm\.(\d{8})/forcing_(\w+)/nwm\.(\w+)(\d{2})z\.\w+\.forcing\.(\w+)(\d{2})\.conus\.nc"
else:
elif data_source == "channel_routing":
pattern = r"nwm\.(\d{8})/(\w+)/nwm\.(\w+)(\d{2})z\.\w+\.channel_rt[^\.]*\.(\w+)(\d{2})\.conus\.nc"
else:
#s3://noaa-nwm-pds/nwm.20241029/analysis_assim/nwm.t16z.analysis_assim.channel_rt.tm00.conus.nc
pattern = r"nwm\.(\d{8})/analysis_assim/nwm\.t(\d{2})z\.analysis_assim\.channel_rt\.tm00\.conus\.nc"
pass

# Extract forecast cycle and lead time from the first and last file names
global URLBASE, FCST_CYCLE, LEAD_START, LEAD_END
match = re.search(pattern, nwm_forcing_files[0])
FCST_CYCLE=None
LEAD_START=None
LEAD_END=None
if match:
URLBASE = match.group(2)
FCST_CYCLE = match.group(3) + match.group(4)
LEAD_START = match.group(5) + match.group(6)
else:
print(f"Could not extract forecast cycle and lead start from the first NWM forcing file: {nwm_forcing_files[0]}")
match = re.search(pattern, nwm_forcing_files[-1])
if match:
LEAD_END = match.group(5) + match.group(6)
if data_source != "troute_restarts":
if match:
URLBASE = match.group(2)
FCST_CYCLE = match.group(3) + match.group(4)
LEAD_START = match.group(5) + match.group(6)
else:
print(f"Could not extract forecast cycle and lead start from the first NWM forcing file: {nwm_forcing_files[0]}")
match = re.search(pattern, nwm_forcing_files[-1])
if match:
LEAD_END = match.group(5) + match.group(6)
else:
print(f"Could not extract lead end from the last NWM forcing file: {nwm_forcing_files[-1]}")
else:
print(f"Could not extract lead end from the last NWM forcing file: {nwm_forcing_files[-1]}")
if match:
restart_date = match.group(1)
restart_hour = match.group(2)
else:
print("Could not extract restart date and time")

# Determine the file system type based on the first NWM forcing file
global fs_type
Expand Down Expand Up @@ -998,47 +1041,88 @@ def prep_ngen_data(conf):
# data_array=data_array[0][None,:]
# t_ax = t_ax
# nwm_data=nwm_data[0][None,:]
if data_source == "forcings":
data_array, t_ax, nwm_data, nwm_file_sizes_MB = multiprocess_data_extract(nwm_forcing_files,nprocs,weights_df,fs)
if data_source == "forcings" or data_source == "channel_routing":
if data_source == "forcings":
data_array, t_ax, nwm_data, nwm_file_sizes_MB = multiprocess_data_extract(nwm_forcing_files,nprocs,weights_df,fs)
else:
data_array, t_ax, nwm_file_sizes_MB = multiprocess_chrt_extract(
nwm_forcing_files,nprocs,nwm_ngen_map,fs)

if datetime.strptime(t_ax[0],'%Y-%m-%d %H:%M:%S') > datetime.strptime(t_ax[-1],'%Y-%m-%d %H:%M:%S'):
# Hack to ensure data is always written out with time moving forward.
t_ax=list(reversed(t_ax))
data_array = np.flip(data_array,axis=0)
tmp = LEAD_START
LEAD_START = LEAD_END
LEAD_END = tmp

t_extract = time.perf_counter() - t0
complexity = (nfiles * ncatchments) / 10000
score = complexity / t_extract
if ii_verbose: print(f'Data extract processs: {nprocs:.2f}\nExtract time: {t_extract:.2f}\nComplexity: {complexity:.2f}\nScore: {score:.2f}\n', end=None,flush=True)

else:
data_array, t_ax, nwm_file_sizes_MB = multiprocess_chrt_extract(
nwm_forcing_files,nprocs,nwm_ngen_map,fs)

if datetime.strptime(t_ax[0],'%Y-%m-%d %H:%M:%S') > datetime.strptime(t_ax[-1],'%Y-%m-%d %H:%M:%S'):
# Hack to ensure data is always written out with time moving forward.
t_ax=list(reversed(t_ax))
data_array = np.flip(data_array,axis=0)
tmp = LEAD_START
LEAD_START = LEAD_END
LEAD_END = tmp

t_extract = time.perf_counter() - t0
complexity = (nfiles * ncatchments) / 10000
score = complexity / t_extract
if ii_verbose: print(f'Data extract processs: {nprocs:.2f}\nExtract time: {t_extract:.2f}\nComplexity: {complexity:.2f}\nScore: {score:.2f}\n', end=None,flush=True)
nwm_file = nwm_forcing_files[0]
nwm_file_sizes_MB = []
if fs_type == 'google':
fs_arg = gcsfs.GCSFileSystem()
elif fs_type == 's3':
fs_arg = s3fs.S3FileSystem(anon=True)
else:
fs_arg = None
if fs_arg:
if nwm_file.find('https://') >= 0:
_, bucket_key = convert_url2key(nwm_file,fs_type)
else:
bucket_key = nwm_file
file_obj = fs_arg.open(bucket_key, mode='rb')
nwm_file_sizes_MB.append(file_obj.details['size'])
elif 'https://' in nwm_file:
response = requests.get(nwm_file, timeout=10)

if response.status_code == 200:
file_obj = BytesIO(response.content)
else:
raise RuntimeError(f"{nwm_file} does not exist")
nwm_file_sizes_MB.append(len(response.content) / B2MB)
else:
file_obj = nwm_file
nwm_file_sizes_MB.append(os.path.getsize(nwm_file) / B2MB)

with xr.open_dataset(file_obj) as nwm_ds:
data_array = create_restart(cat_map, crosswalk_ds, nwm_ds, routelink_ds)

log_time("PROCESSING_END", log_file)

log_time("FILEWRITING_START", log_file)
t0 = time.perf_counter()
if "netcdf" in output_file_type:
if data_source == "forcings":
netcdf_cat_file_sizes_MB = multiprocess_write_netcdf(data_array, jcatchment_dict, t_ax)
else:
elif data_source == "channel_routing":
if FCST_CYCLE is None:
filename = 'qlaterals.nc'
else:
filename = f'ngen.{FCST_CYCLE}z.{URLBASE}.channel_routing.{LEAD_START}_{LEAD_END}.nc'
netcdf_cat_file_sizes_MB = write_netcdf_chrt(
storage_type, forcing_path, data_array, t_ax, filename)
else:
filename = "channel_restart_" + restart_date + "_" + restart_hour + "0000.nc"
netcdf_cat_file_sizes_MB = write_netcdf_restart(
storage_type, forcing_path, data_array, filename
)
# write_netcdf(data_array,"1", t_ax, jcatchment_dict['1'])
if ii_verbose: print(f'Writing catchment forcings to {output_path}!', end=None,flush=True)
if ii_plot or ii_collect_stats or any(x in output_file_type for x in ["csv","parquet","tar"]):
if data_source == "forcings":
forcing_cat_ids, filenames, individual_cat_file_sizes_MB, individual_cat_file_sizes_MB_zipped, tar_buffs = multiprocess_write_df(
data_array,t_ax,list(weights_df.index),nprocs,forcing_path,data_source)
else:
elif data_source == "channel_routing":
forcing_cat_ids, filenames, individual_cat_file_sizes_MB, individual_cat_file_sizes_MB_zipped, tar_buffs = multiprocess_write_df(
data_array,t_ax,list(nwm_ngen_map.keys()),nprocs,forcing_path,data_source)
else:
print("Dataframes don't get written for t-route restarts")

write_time += time.perf_counter() - t0
write_rate = ncatchments / write_time
if ii_verbose: print(f'\n\nWrite processs: {nprocs}\nWrite time: {write_time:.2f}\nWrite rate {write_rate:.2f} files/second\n', end=None,flush=True)
Expand Down Expand Up @@ -1118,7 +1202,7 @@ def prep_ngen_data(conf):
"netcdf_catch_file_size_med_MB" : [netcdf_catch_file_size_med],
"netcdf_catch_file_size_std_MB" : [netcdf_catch_file_size_std]
}
else:
elif data_source == "channel_routing":
metadata = {
"runtime_s" : [round(runtime,2)],
"nvars_intput" : [1],
Expand All @@ -1138,6 +1222,14 @@ def prep_ngen_data(conf):
"netcdf_catch_file_size_med_MB" : [netcdf_catch_file_size_med],
"netcdf_catch_file_size_std_MB" : [netcdf_catch_file_size_std]
}
else:
# metadata for troute restart gen
metadata = {
"runtime_s" : [round(runtime,2)],
"nwmfiles_input" : [len(nwm_forcing_files)],
"nwm_file_size" : [nwm_file_size_avg],
"netcdf_catch_file_size_MB" : [netcdf_catch_file_size_avg],
}

if data_source == "forcings":
data_avg = np.average(data_array,axis=0)
Expand All @@ -1147,14 +1239,18 @@ def prep_ngen_data(conf):
data_med = np.median(data_array,axis=0)
med_df = pd.DataFrame(data_med.T,columns=ngen_variables)
med_df.insert(0,"catchment id",forcing_cat_ids)
else:
elif data_source == "channel_routing":
data_avg = np.average(data_array[:,:,1],axis=0)
avg_df = pd.DataFrame(data_avg.T, columns=['q_lateral'])
avg_df.insert(0,"nexus id",list(nwm_ngen_map.keys()))

data_med = np.median(data_array[:,:,1],axis=0)
med_df = pd.DataFrame(data_med.T,columns=['q_lateral'])
med_df.insert(0,"nexus id",list(nwm_ngen_map.keys()))
else:
# troute restarts won't need stats calculated for them since there's no time axis
avg_df = pd.DataFrame()
med_df = pd.DataFrame()

del data_array

Expand All @@ -1171,8 +1267,10 @@ def prep_ngen_data(conf):
local_metapath = metaf_path

write_df(metadata_df, "metadata.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3)
write_df(avg_df, "catchments_avg.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3)
write_df(med_df, "catchments_median.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3)
if not avg_df.empty:
write_df(avg_df, "catchments_avg.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3)
if not med_df.empty:
write_df(med_df, "catchments_median.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3)

meta_time = time.perf_counter() - t000
log_time("METADATA_END", log_file)
Expand Down
Loading
Loading