diff --git a/fre/pp/split_netcdf_script.py b/fre/pp/split_netcdf_script.py index ac70815f6..ed5626784 100644 --- a/fre/pp/split_netcdf_script.py +++ b/fre/pp/split_netcdf_script.py @@ -1,326 +1,368 @@ - #!/bin/python +''' +Split NetCDF files by variable -# Split NetCDF files by variable -# -# Can be tiled or not. Component is optional, defaults to all. -# -# Input format: date.component(.tileX).nc -# Output format: date.component.var(.tileX).nc +Can be tiled or not. Component is optional, defaults to all. +Input format: date.component(.tileX).nc +Output format: date.component.var(.tileX).nc +''' + + +import logging import os -from os import path -import subprocess +from pathlib import Path import re import sys + import xarray as xr -from pathlib import Path import yaml -from itertools import chain -import logging from fre.app.helpers import get_variables - fre_logger = logging.getLogger(__name__) -#These are patterns used to match known kinds of metadata-like variables -#in netcdf files -#*_bnds, *_bounds: bounds variables. Defines the edges of a coordinate var -#*_offset: i and j offsets. Constants added to a coordinate var to get -# actual coordinate values, used to compress data -#*_average: calculated averages for a variable. -#These vars may also be covered by the var_shortvars query, but it doesn't -#hurt to double-check. +# These are patterns used to match known kinds of metadata-like variables +# in netcdf files +# *_bnds, *_bounds: bounds variables. Defines the edges of a coordinate var +# *_offset: i and j offsets. Constants added to a coordinate var to get +# actual coordinate values, used to compress data +# *_average: calculated averages for a variable. +# These vars may also be covered by the var_shortvars query, but it doesn't +# hurt to double-check. VAR_PATTERNS = ["_bnds", "_bounds", "_offset", "average_"] -def split_netcdf(inputDir, outputDir, component, history_source, use_subdirs, - yamlfile, split_all_vars=False): - ''' - Given a directory of netcdf files, splits those netcdf files into separate - files for each data variable and copies the data variable files of interest - to the output directory - - Intended to work with data structured for fre-workflows and fre-workflows file naming conventions - - Sample infile name convention: "19790101.atmos_tracer.tile6.nc" - - :param inputDir: directory containing netcdf files - :type inputDir: string - :param outputDir: directory to which to write netcdf files - :type outputDir: string - :param component: the 'component' element we are currently working with in the yaml - :type component: string - :param history_source: a history_file under a 'source' under the 'component' that we are working with. Is used to identify the files in inputDir. - :type history_source: string - :param use_subdirs: whether to recursively search through inputDir under the subdirectories. Used when regridding. - :type use_subdirs: boolean - :param yamlfile: - a .yml config file for fre postprocessing - :type yamlfile: string - :param split_all_vars: Whether to skip parsing the yamlfile and split all available vars in the file. Defaults to False. 
-    :type split_all_vars: boolean
-    '''
-
-    #Verify input/output dirs exist and are dirs
-    if not (os.path.isdir(inputDir)):
-        fre_logger.error(f"error: input dir {inputDir} does not exist or is not a directory")
-        raise OSError(f"error: input dir {inputDir} does not exist or is not a directory")
-    if not (os.path.isdir(outputDir)):
-        if os.path.isfile(outputDir):
-            fre_logger.error(f"error: output dir {outputDir} is a file. Please specify a directory.")
-        else:
-            if not os.access(outputDir, os.W_OK):
-                fre_logger.error(f"error: cannot write to output dir {outputDir}")
-
-    curr_dir = os.getcwd()
-    workdir = os.path.abspath(inputDir)
-
-    #note to self: if CYLC_TASK_PARAM_component isn't doing what we think it's
-    #doing, we can also use history_source to get the component but it's
-    #going to be a bit of a pain
-    if split_all_vars:
-        varlist = "all"
-    else:
-        ydict = yaml.safe_load(Path(yamlfile).read_text())
-        vardict = get_variables(ydict, component)
-        if vardict is None or history_source not in vardict.keys():
-            fre_logger.error(f"error: either component {component} not defined or source {history_source} not defined under component {component} in yamlfile {yamlfile}.")
-            raise ValueError(f"error: either component {component} not defined or source {history_source} not defined under component {component} in yamlfile {yamlfile}.")
+def get_file_regex( history_source: str = None ):
+    '''
+    Give back a file regex for a given history source: the regex matches all files that
+    contain the current source:history_file name, 0-1 instances of "tile", and end in .nc.
+    Under most circumstances, this should match 1 file.
+    Older regexes for this would be f'*.{history_source}?(.tile?).nc' and
+    f'*.{history_source}*.*.nc' separately; the returned regex accounts for both.
+    glob.glob is NOT sufficient for this, since the pattern needs to match both
+    '00020101.atmos_level_cmip.tile4.nc' and '00020101.ocean_cobalt_omip_2d.nc'.
+
+    :param history_source: usually a component string found in history filenames
+    :type history_source: string
+    '''
+    if history_source is None:
+        raise ValueError('history_source cannot be None')
+    return f'.*{history_source}(\\.tile.*)?.nc'
+
+def split_netcdf( input_dir,
+                  output_dir,
+                  component,
+                  history_source,
+                  use_subdirs,
+                  yamlfile,
+                  split_all_vars = False ):
+    '''
+    Given a directory of netcdf files, splits those netcdf files into separate
+    files for each data variable and copies the data variable files of interest
+    to the output directory.
+
+    Intended to work with data structured for fre-workflows and fre-workflows file naming conventions
+    - Sample infile name convention: "19790101.atmos_tracer.tile6.nc"
+
+    :param input_dir: directory containing netcdf files
+    :type input_dir: string
+    :param output_dir: directory to which to write netcdf files
+    :type output_dir: string
+    :param component: the 'component' element we are currently working with in the yaml
+    :type component: string
+    :param history_source: a history_file under a 'source' under the 'component' that we are working with. Is used to
+                           identify the files in input_dir.
+    :type history_source: string
+    :param use_subdirs: whether to recursively search through input_dir under the subdirectories. Used when regridding.
+    :type use_subdirs: boolean
+    :param yamlfile: a .yml config file for fre postprocessing
+    :type yamlfile: string
+    :param split_all_vars: Whether to skip parsing the yamlfile and split all available vars in the file. Defaults to
+                           False.
+ :type split_all_vars: boolean + ''' + fre_logger.debug('using re version %s', re.__version__) + fre_logger.debug('using re spec %s', re.__spec__) + + # Verify input/output dirs exist and are dirs + if not os.path.isdir(input_dir): + raise OSError(f"error: input dir {input_dir} does not exist or is not a directory") + + if not os.path.isdir(output_dir): + if os.path.isfile(output_dir): + fre_logger.error("error: output dir %s is a file. Please specify a directory.", output_dir) else: - varlist = vardict[history_source] - - #extend globbing used to find both tiled and non-tiled files - #all files that contain the current source:history_file name, - #0-1 instances of "tile" and end in .nc - #under most circumstances, this should match 1 file - #older regex - not currently working - #file_regex = f'*.{history_source}?(.tile?).nc' - #file_regex = f'*.{history_source}*.*.nc' - #glob.glob is NOT sufficient for this. It needs to match: - # '00020101.atmos_level_cmip.tile4.nc' - # '00020101.ocean_cobalt_omip_2d.nc' - file_regex = f'.*{history_source}(\\.tile.*)?.nc' - - #If in sub-dir mode, process the sub-directories instead of the main one - # and write to $outputdir/$subdir - if use_subdirs: - subdirs = [el for el in os.listdir(workdir) if os.path.isdir(os.path.join(workdir,el))] - num_subdirs = len(subdirs) - fre_logger.info(f"checking {num_subdirs} under {workdir}") - files_split = 0 - sd_string = ",".join(subdirs) - for sd in subdirs: - sdw = os.path.join(workdir,sd) - files=[os.path.join(sdw,el) for el in os.listdir(sdw) if re.match(file_regex, el) is not None] - if len(files) == 0: - fre_logger.info(f"No input files found; skipping subdir {subdir}") - else: - output_subdir = os.path.join(os.path.abspath(outputDir), sd) - if not os.path.isdir(output_subdir): - os.mkdir(output_subdir) + if not os.access(output_dir, os.W_OK): + fre_logger.error("error: cannot write to output dir %s", output_dir) + + workdir = os.path.abspath(input_dir) + + # note to self: if CYLC_TASK_PARAM_component isn't doing what we think it's + # doing, we can also use history_source to get the component but it's + # going to be a bit of a pain + if split_all_vars: + varlist = "all" + else: + ydict = yaml.safe_load( Path( + yamlfile ).read_text( encoding = 'UTF-8') ) + vardict = get_variables(ydict, component) + if vardict is None or history_source not in vardict.keys(): + raise ValueError( f"error: either component {component} not defined or source {history_source} not" + f" defined under component {component} in yamlfile {yamlfile}." ) + varlist = vardict[history_source] + + # extend globbing used to find both tiled and non-tiled files + file_regex = get_file_regex(history_source) # f'.*{history_source}(\\.tile.*)?.nc' + fre_logger.debug('file_regex = %s', file_regex) + + # If in sub-dir mode, process the sub-directories instead of the main one + # and write to $outputdir/$subdir + if use_subdirs: + subdirs = [ el for el in os.listdir(workdir) if os.path.isdir(os.path.join(workdir,el)) ] + num_subdirs = len(subdirs) + + fre_logger.info("checking %s under %s", num_subdirs, workdir) + files_split = 0 + sd_string = ",".join(subdirs) + for sd in subdirs: + sdw = os.path.join(workdir,sd) + + #files=[os.path.join(sdw,el) for el in os.listdir(sdw) if re.match(file_regex, el) is not None] + files = [] + for el in os.listdir(sdw): + fre_logger.debug('el = %s', el) + if any( [ re.match(file_regex, el) is not None, + f".{history_source}." 
in el ] ):
+                    fre_logger.debug('appending file %s', os.path.join(sdw,el) )
+                    files.append( os.path.join(sdw,el) )
+
+            fre_logger.debug('full contents of %s are %s', sdw, os.listdir(sdw))
+            if len(files) == 0:
+                fre_logger.info("No input files found; skipping subdir %s", sd)
+                continue
+
+            output_subdir = os.path.join(os.path.abspath(output_dir), sd)
+            if not os.path.isdir(output_subdir):
+                os.mkdir(output_subdir)
+            for infile in files:
+                split_file_xarray(infile, output_subdir, varlist)
+                files_split += 1
+
+        fre_logger.info("%s files split", files_split)
+        if files_split == 0:
+            raise FileNotFoundError(f"error: no files found in dirs {sd_string} under {workdir}"
+                                    f" that match pattern {file_regex}; no splitting took place."
+                                    f" contents of workdir={workdir} are: {os.listdir(workdir)}" )
+    else:
+        #files=[ os.path.join(workdir, el) for el in os.listdir(workdir) if re.match(file_regex, el) is not None]
+        files = []
+        for el in os.listdir(workdir):
+            fre_logger.debug('el = %s', el)
+            #if re.match(file_regex, el) is not None:
+            if any( [ re.match(file_regex, el) is not None,
+                      f".{history_source}." in el ] ):
+                fre_logger.debug('appending file %s', os.path.join(workdir, el) )
+                files.append( os.path.join(workdir, el) )
+
+        if len(files) == 0:
+            raise FileNotFoundError(f"no files found in {workdir} matching {file_regex}; no splitting took place."
+                                    f" contents of workdir={workdir} are: {os.listdir(workdir)}")
+
+        # Split the files by variable
         for infile in files:
-            split_file_xarray(infile, output_subdir, varlist)
-            files_split += 1
-        fre_logger.info(f"{files_split} files split")
-        if files_split == 0:
-            fre_logger.error(f"error: no files found in dirs {sd_string} under {workdir} that match pattern {file_regex}; no splitting took place")
-            raise OSError
-    else:
-        files=[os.path.join(workdir, el) for el in os.listdir(workdir) if re.match(file_regex, el) is not None]
-        # Split the files by variable
-        for infile in files:
-            split_file_xarray(infile, os.path.abspath(outputDir), varlist)
-        if len(files) == 0:
-            fre_logger.error(f"error: no files found in {workdir} that match pattern {file_regex}; no splitting took place")
-            raise OSError
-
-    fre_logger.info("split-netcdf-wrapper call complete")
-    sys.exit(0) #check this
-
-def split_file_xarray(infile, outfiledir, var_list='all'):
-    '''
-    Given a netcdf infile containing one or more data variables,
-    writes out a separate file for each data variable in the file, including the
-    variable name in the filename.
-    if var_list if specified, only the vars in var_list are written to file;
-    if no vars in the file match the vars in var_list, no files are written.
-
-    :param infile: input netcdf file
-    :type infile: string
-    :param outfiledir: writeable directory to which to write netcdf files
-    :type outfiledir: string
-    :param var_list: python list of string variable names or a string "all"
-    :type var_list: list of strings
-    '''
-    if not os.path.isdir(outfiledir):
-        fre_logger.info("creating output directory")
-        os.makedirs(outfiledir)
-
-    if not os.path.isfile(infile):
-        fre_logger.error(f"error: input file {infile} not found. Please check the path.")
-        raise OSError(f"error: input file {infile} not found. Please check the path.")
-
-    dataset = xr.load_dataset(infile, decode_cf=False, decode_times=False, decode_coords="all")
-    allvars = dataset.data_vars.keys()
-
-    #If you have a file of 3 or more dim vars, 2d-or-fewer vars are likely to be
-    #metadata vars; if your file is 2d vars, 1d vars are likely to be metadata.
- max_ndims = get_max_ndims(dataset) - if max_ndims >= 3: - varsize = 2 - else: - varsize = 1 - #note: netcdf dimensions and xarray coords are NOT ALWAYS THE SAME THING. - #If they were, I could get away with the following: - #var_zerovars = [v for v in datavars if not len(dataset[v].coords) > 0]) - #instead of this: - var_shortvars = [v for v in allvars if (len(dataset[v].shape) <= varsize) and v not in dataset._coord_names] - #having a variable listed as both a metadata var and a coordinate var seems to - #lead to the weird adding a _FillValue behavior - fre_logger.info(f"var patterns: {VAR_PATTERNS}") - fre_logger.info(f"1 or 2-d vars: {var_shortvars}") - #both combined gets you a decent list of non-diagnostic variables - var_exclude = list(set(VAR_PATTERNS + [str(el) for el in var_shortvars] )) - def matchlist(xstr): - ''' checks a string for matches in a list of patterns - - xstr: string to search for matches - var_exclude: list of patterns defined in VAR_EXCLUDE''' - allmatch = [re.search(el, xstr)for el in var_exclude] - #If there's at least one match in the var_exclude list (average_bnds is OK) - return len(list(set(allmatch))) > 1 - metavars = [el for el in allvars if matchlist(el)] - datavars = [el for el in allvars if not matchlist(el)] - fre_logger.debug(f"metavars: {metavars}") - fre_logger.debug(f"datavars: {datavars}") - fre_logger.debug(f"var filter list: {var_list}") - - #datavars does 2 things: keep track of which vars to write, and tell xarray - #which vars to drop. we need to separate those things for the variable filtering. - if var_list == "all": - write_vars = datavars - else: - if isinstance(var_list, str): - var_list = var_list.split(",") - var_list = list(set(var_list)) - write_vars = [el for el in datavars if el in var_list] - fre_logger.debug(f"intersection of datavars and var_list: {write_vars}") - - if len(write_vars) < 0: - fre_logger.info(f"No data variables found in {infile}; no writes take place.") - else: - vc_encode = set_coord_encoding(dataset, dataset._coord_names) - for variable in write_vars: - fre_logger.info(f"splitting var {variable}") - #drop all data vars (diagnostics) that are not the current var of interest - #but KEEP the metadata vars - #(seriously, we need the time_bnds) - data2 = dataset.drop_vars([el for el in datavars if el is not variable]) - v_encode= set_var_encoding(dataset, metavars) - #combine 2 dicts into 1 dict - should be no shared keys, - #so the merge is straightforward - var_encode = {**vc_encode, **v_encode} - fre_logger.debug(f"var_encode settings: {var_encode}") - #Encoding principles for xarray: - # - no coords have a _FillValue - # - Everything is written out with THE SAME precision it was read in - # - Everything has THE SAME UNITS as it did when it was read in - var_outfile = fre_outfile_name(os.path.basename(infile), variable) - var_out = os.path.join(outfiledir, os.path.basename(var_outfile)) - data2.to_netcdf(var_out, encoding = var_encode) - -def get_max_ndims(dataset): - ''' - Gets the maximum number of dimensions of a single var in an xarray Dataset object. Excludes coord vars, which should be single-dim anyway. 
- - :param dataset: xarray Dataset you want to query - :type dataset: xarray Dataset - :return: The max dimensions that a single var possesses in the Dataset - :rtype: int - ''' - allvars = dataset.data_vars.keys() - ndims = [len(dataset[v].shape) for v in allvars] - return max(ndims) - -def set_coord_encoding(dset, vcoords): - ''' - Gets the encoding settings needed for xarray to write out the coordinates - as expected - we need the list of all vars (varnames) because that's how you get coords - for the metadata vars (i.e. nv or bnds for time_bnds) - - :param dset: xarray Dataset object to query for info - :type dset: xarray Dataset object - :param vcoords: list of coordinate variables to write to file - :type vcoords: list of strings - :return: A dictionary where each key is a coordinate in the xarray Dataset and - each value is a dictionary where the keys are the encoding information from - the coordinate variable in the Dataset plus the units (if present) - :rtype: dict - - .. note:: - This code removes _FillValue from coordinates. CF-compliant files do not - have _FillValue on coordinates, and xarray does not have a good way to get - _FillValue from coordinates. Letting xarray set _FillValue for coordinates - when coordinates *have* a _FillValue gets you wrong metadata, and bad metadata - is worse than no metadata. Dropping the attribute if it's present seems to be - the lesser of two evils. - ''' - fre_logger.debug(f"getting coord encode settings") - encode_dict = {} - for vc in vcoords: - vc_encoding = dset[vc].encoding #dict - encode_dict[vc] = {'_FillValue': None, - 'dtype': dset[vc].encoding['dtype']} - if "units" in vc_encoding.keys(): - encode_dict[vc]['units'] = dset[vc].encoding['units'] - return(encode_dict) - -def set_var_encoding(dset, varnames): - ''' - Gets the encoding settings needed for xarray to write out the variables - as expected - - mostly addressed to time_bnds, because xarray can drop the units attribute - - - https://github.com/pydata/xarray/issues/8368 - - :param dset: xarray dataset object to query for info - :type dset: xarray dataset object - :param varnames: list of variables that will be written to file - :type varnames: list of strings - :return: dict {var1: {encodekey1 : encodeval1, encodekey2:encodeval2...}} - :rtype: dict - ''' - fre_logger.debug(f"getting var encode settings") - encode_dict = {} - for v in varnames: - v_encoding = dset[v].encoding #dict - if not '_FillValue' in v_encoding.keys(): - encode_dict[v] = {'_FillValue': None, - 'dtype': dset[v].encoding['dtype']} - if "units" in v_encoding.keys(): - encode_dict[v]['units'] = dset[v].encoding['units'] - return(encode_dict) - -def fre_outfile_name(infile, varname): - ''' - Builds split var filenames the way that fre expects them - (and in a way that should work for any .nc file) - - This is expected to work with files formed the following way - - - Fre Input format: date.component(.tileX).nc - - Fre Output format: date.component.var(.tileX).nc - - but it should also work on any file filename.nc - - :param infile: name of a file with a . 
somewhere in the filename
-    :type infile: string
-    :param varname: string to add to the infile
-    :type varname: string
-    :return: new filename
-    :rtype: string
-    '''
-    var_outfile = re.sub(".nc", f".{varname}.nc", infile)
-    return(var_outfile)
-
-#Main method invocation
+            split_file_xarray(infile, os.path.abspath(output_dir), varlist)
+
+    fre_logger.info("split-netcdf-wrapper call complete")
+    sys.exit(0) # check this
+
+def split_file_xarray( infile,
+                       outfiledir,
+                       var_list = 'all' ):
+    '''
+    Given a netcdf infile containing one or more data variables,
+    writes out a separate file for each data variable in the file, including the
+    variable name in the filename.
+    If var_list is specified, only the vars in var_list are written to file;
+    if no vars in the file match the vars in var_list, no files are written.
+
+    :param infile: input netcdf file
+    :type infile: string
+    :param outfiledir: writeable directory to which to write netcdf files
+    :type outfiledir: string
+    :param var_list: python list of string variable names or a string "all"
+    :type var_list: list of strings
+    '''
+    if not os.path.isfile(infile):
+        raise FileNotFoundError(f"error: input file {infile} not found. Please check the path.")
+
+    if not os.path.isdir(outfiledir):
+        fre_logger.info("creating output directory")
+        os.makedirs(outfiledir)
+
+    dataset = xr.load_dataset(infile,
+                              decode_cf = False, decode_times = False, decode_coords = "all")
+    allvars = dataset.data_vars.keys()
+
+    # If you have a file of 3 or more dim vars, 2d-or-fewer vars are likely to be
+    # metadata vars; if your file is 2d vars, 1d vars are likely to be metadata.
+    max_ndims = get_max_ndims(dataset)
+    if max_ndims >= 3:
+        varsize = 2
+    else:
+        varsize = 1
+
+    # note: netcdf dimensions and xarray coords are NOT ALWAYS THE SAME THING.
+    # If they were, I could get away with the following:
+    # var_zerovars = [v for v in datavars if not len(dataset[v].coords) > 0])
+    # instead of this:
+    var_shortvars = [v for v in allvars if (len(dataset[v].shape) <= varsize) and v not in dataset._coord_names]
+
+    # having a variable listed as both a metadata var and a coordinate var seems to
+    # lead to the weird adding a _FillValue behavior
+    fre_logger.info( "var patterns: %s", VAR_PATTERNS)
+    fre_logger.info("1 or 2-d vars: %s", var_shortvars)
+
+    # both combined gets you a decent list of non-diagnostic variables
+    var_exclude = list( set(
+        VAR_PATTERNS + [str(el) for el in var_shortvars] ))
+    def matchlist(xstr):
+        '''
+        checks a string for matches in a list of patterns
+        xstr: string to search for matches
+        var_exclude: list of patterns built from VAR_PATTERNS plus the short vars above
+        '''
+        allmatch = [ re.search(el, xstr) for el in var_exclude ]
+
+        # allmatch holds None for every pattern that does not match, so the set has
+        # more than one distinct element only if at least one pattern matched
+        return len( list( set( allmatch ))) > 1
+
+    metavars = [el for el in allvars if matchlist(el)]
+    datavars = [el for el in allvars if not matchlist(el)]
+    fre_logger.debug("metavars: %s", metavars)
+    fre_logger.debug("datavars: %s", datavars)
+    fre_logger.debug("var filter list: %s", var_list)
+
+    # datavars does 2 things: keep track of which vars to write, and tell xarray
+    # which vars to drop. we need to separate those things for the variable filtering.
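+    # Illustrative example (hypothetical names): with datavars = ['tas', 'ps', 'salt']
+    # and var_list = 'tas,ps', the filtering below leaves write_vars = ['tas', 'ps'];
+    # with var_list = 'all', every var in datavars is written to its own file.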
+    if var_list == "all":
+        write_vars = datavars
+    else:
+        if isinstance(var_list, str):
+            var_list = var_list.split(",")
+        var_list = list(set(var_list))
+        write_vars = [el for el in datavars if el in var_list]
+        fre_logger.debug("intersection of datavars and var_list: %s", write_vars)
+
+    if len(write_vars) == 0:
+        fre_logger.info("No data variables found in %s; no writes take place.", infile)
+    else:
+        vc_encode = set_coord_encoding(dataset, dataset._coord_names)
+        for variable in write_vars:
+            fre_logger.info("splitting var %s", variable)
+
+            # drop all data vars (diagnostics) that are not the current var of interest
+            # but KEEP the metadata vars (seriously, we need the time_bnds)
+            data2 = dataset.drop_vars(
+                [ el for el in datavars if el != variable ] )
+            v_encode = set_var_encoding(dataset, metavars)
+
+            # combine 2 dicts into 1 dict - should be no shared keys,
+            # so the merge is straightforward
+            var_encode = {**vc_encode, **v_encode}
+            fre_logger.debug("var_encode settings: %s", var_encode)
+
+            # Encoding principles for xarray:
+            #  - no coords have a _FillValue
+            #  - Everything is written out with THE SAME precision it was read in
+            #  - Everything has THE SAME UNITS as it did when it was read in
+            var_outfile = fre_outfile_name(os.path.basename(infile), variable)
+            var_out = os.path.join(outfiledir, os.path.basename(var_outfile))
+            data2.to_netcdf(var_out, encoding = var_encode)
+
+def get_max_ndims( dataset ):
+    '''
+    Gets the maximum number of dimensions of a single var in an xarray Dataset object.
+    Excludes coord vars, which should be single-dim anyway.
+
+    :param dataset: xarray Dataset you want to query
+    :type dataset: xarray Dataset
+    :return: The max dimensions that a single var possesses in the Dataset
+    :rtype: int
+    '''
+    allvars = dataset.data_vars.keys()
+    ndims = [ len(dataset[v].shape) for v in allvars ]
+    return max(ndims)
+
+def set_coord_encoding( dset,
+                        vcoords ):
+    '''
+    Gets the encoding settings needed for xarray to write out the coordinates as expected.
+    We need the list of all vars (varnames) because that's how you get coords
+    for the metadata vars (e.g. nv or bnds for time_bnds)
+
+    :param dset: xarray Dataset object to query for info
+    :type dset: xarray Dataset object
+    :param vcoords: list of coordinate variables to write to file
+    :type vcoords: list of strings
+    :return: A dictionary where each key is a coordinate in the xarray Dataset and
+             each value is a dictionary where the keys are the encoding information from
+             the coordinate variable in the Dataset plus the units (if present)
+    :rtype: dict
+
+    .. note::
+       This code removes _FillValue from coordinates. CF-compliant files do not
+       have _FillValue on coordinates, and xarray does not have a good way to get
+       _FillValue from coordinates. Letting xarray set _FillValue for coordinates
+       when coordinates *have* a _FillValue gets you wrong metadata, and bad metadata
+       is worse than no metadata. Dropping the attribute if it's present seems to be
+       the lesser of two evils.
+    '''
+    fre_logger.debug("getting coord encode settings")
+    encode_dict = {}
+    for vc in vcoords:
+        vc_encoding = dset[vc].encoding #dict
+        encode_dict[vc] = {'_FillValue': None,
+                           'dtype': dset[vc].encoding['dtype']}
+        if "units" in vc_encoding.keys():
+            encode_dict[vc]['units'] = dset[vc].encoding['units']
+    return encode_dict
+
+def set_var_encoding( dset,
+                      varnames ):
+    '''
+    Gets the encoding settings needed for xarray to write out the variables
+    as expected.
Mostly addressed to time_bnds, because xarray can drop the units attribute
+    - https://github.com/pydata/xarray/issues/8368
+
+    :param dset: xarray dataset object to query for info
+    :type dset: xarray dataset object
+    :param varnames: list of variables that will be written to file
+    :type varnames: list of strings
+    :return: dict {var1: {encodekey1 : encodeval1, encodekey2: encodeval2...}}
+    :rtype: dict
+    '''
+    fre_logger.debug("getting var encode settings")
+    encode_dict = {}
+    for v in varnames:
+        v_encoding = dset[v].encoding #dict
+        if not '_FillValue' in v_encoding.keys():
+            encode_dict[v] = {'_FillValue': None,
+                              'dtype': dset[v].encoding['dtype']}
+            if "units" in v_encoding.keys():
+                encode_dict[v]['units'] = dset[v].encoding['units']
+    return encode_dict
+
+def fre_outfile_name( infile,
+                      varname ):
+    '''
+    Builds split var filenames the way that fre expects them (and in a way that
+    should work for any .nc file).
+    This is expected to work with files formed the following way, but it should
+    also work on any filename ending in .nc:
+    - Fre Input format: date.component(.tileX).nc
+    - Fre Output format: date.component.var(.tileX).nc
+
+    :param infile: name of a file ending in .nc
+    :type infile: string
+    :param varname: string to add to the infile
+    :type varname: string
+    :return: new filename
+    :rtype: string
+    '''
+    # anchor on the trailing .nc so an 'nc' elsewhere in the name is left untouched
+    var_outfile = re.sub(r'\.nc$', f'.{varname}.nc', infile)
+    return var_outfile
diff --git a/fre/pp/tests/test_split_netcdf.py b/fre/pp/tests/test_split_netcdf.py
index 8603d0a89..803cab09f 100644
--- a/fre/pp/tests/test_split_netcdf.py
+++ b/fre/pp/tests/test_split_netcdf.py
@@ -2,32 +2,33 @@
 Tests split-netcdf, parse_yaml from split_netcdf_script.py
 '''
-import pytest
-import re
-from fre.pp import split_netcdf_script
-from fre.pp.split_netcdf_script import split_file_xarray
-import subprocess
 import os
 from os import path as osp
 import pathlib
 from pathlib import Path
-from fre import fre
+import re
+import subprocess
 import click
 from click.testing import CliRunner
-runner=CliRunner()
+import pytest
+
+from fre import fre
+from fre.pp import split_netcdf_script
+from fre.pp.split_netcdf_script import split_file_xarray
+
+runner = CliRunner()
 
-#rootdir = Path(__file__).parents[3] #get to root directory
 test_dir = osp.realpath("fre/tests/test_files/ascii_files/split_netcdf")
-cases = {"ts": {"dir":"atmos_daily.tile3",
+cases = {"ts": { "dir":"atmos_daily.tile3",
                 "nc": "00010101.atmos_daily.tile3.nc",
-                "cdl": "00010101.atmos_daily.tile3.cdl"},
-         "static": {"dir": "ocean_static",
+                "cdl": "00010101.atmos_daily.tile3.cdl" },
+         "static": { "dir": "ocean_static",
                     "nc": "00010101.ocean_static.nc",
-                    "cdl": "00010101.ocean_static.cdl"}}
+                    "cdl": "00010101.ocean_static.cdl" } }
 
-casedirs = [osp.join(test_dir, el) for el in [cases["ts"]["dir"], cases["static"]["dir"]]]
+casedirs = [ osp.join(test_dir, el) for el in [ cases["ts"]["dir"], cases["static"]["dir"] ] ]
 
 all_ts_varlist = "all"
 some_ts_varlist = ["tasmax", "tasmin", "ps", "tasmin", "tas", "temp", "zsurf", "pv350K"]
@@ -37,61 +38,58 @@
 some_static_varlist = ["wet", "wet_c", "wet_v"] #should drop xh dim
 
-#Set up splitting files
 def test_split_file_setup():
-    ''' Sets up the files we need in order to test variable splitting. Mostly ncgen3 commands.'''
-    #ncgen the test file for test_dir1
+    '''
+    Sets up the files we need in order to test variable splitting. Mostly ncgen3 commands.
+    '''
+    ncgen_commands = []
     nc_files = []
     sp_stat = []
+
     for testcase in cases.keys():
-        cds = osp.join(test_dir,cases[testcase]["dir"])
-        subdirs = [f.path for f in os.scandir(cds) if f.is_dir()]
-        for sd in subdirs:
-            #for each directory in the current dir, make a new dir with "new_" prepended
-            newdir = osp.join(cds, "new_" + os.path.basename(sd))
-            if not osp.exists(newdir):
-                os.makedirs(newdir)
-            print(newdir)
-            cdl_files = [f.path for f in os.scandir(sd) if f.is_file]
-            cdl_files = [el for el in cdl_files if re.search("cdl", el) is not None]
-            for cdlf in cdl_files:
-                cdl_out = re.sub(".cdl", ".nc", cdlf)
-                cdlf_cmd = ["ncgen3", "-k", "netCDF-4", "-o", cdl_out, cdlf]
-                nc_files.append(cdl_out)
-                ncgen_commands.append(cdlf_cmd)
-        ncgen_commands.append(["ncgen3", "-k", "netCDF-4", "-o",
-                               osp.join(cds, cases[testcase]["nc"]),
-                               osp.join(cds, cases[testcase]["cdl"])])
-    for ncg in ncgen_commands:
-        print(ncg)
-        sp = subprocess.run(ncg, check = True, capture_output=True)
-        sp_stat.append(sp.returncode)
-    sp_success = [el == 0 for el in sp_stat]
-    nc_files_exist = [osp.isfile(el) for el in nc_files]
+        cds = osp.join(test_dir, cases[testcase]["dir"])
+        subdirs = [ f.path for f in os.scandir(cds) if f.is_dir() ]
+
+        for sd in subdirs:
+            #for each directory in the current dir, make a new dir with "new_" prepended
+            newdir = osp.join(cds, "new_" + os.path.basename(sd))
+
+            if not osp.exists(newdir):
+                os.makedirs(newdir)
+            print( newdir )
+
+            cdl_files = [ f.path for f in os.scandir(sd) if f.is_file() ]
+            cdl_files = [ el for el in cdl_files if re.search("cdl", el) is not None ]
+
+            for cdlf in cdl_files:
+                cdl_out = re.sub(r'\.cdl$', ".nc", cdlf)
+                cdlf_cmd = ["ncgen3", "-k", "netCDF-4", "-o", cdl_out, cdlf]
+                nc_files.append(cdl_out)
+                ncgen_commands.append(cdlf_cmd)
+
+        ncgen_commands.append( [ "ncgen3", "-k", "netCDF-4", "-o",
+                                 osp.join(cds, cases[testcase]["nc"]),
+                                 osp.join(cds, cases[testcase]["cdl"]) ] )
+    for ncg in ncgen_commands:
+        print(ncg)
+        sp = subprocess.run(ncg, check = True, capture_output=True)
+        sp_stat.append(sp.returncode)
+
+    sp_success = [ el == 0 for el in sp_stat ]
+    nc_files_exist = [ osp.isfile(el) for el in nc_files ]
     assert all( sp_success + nc_files_exist )
 
 #test splitting files
 @pytest.mark.parametrize("workdir,infile,outfiledir,varlist",
-                         [pytest.param(casedirs[0], cases["ts"]["nc"],
-                                       "new_all_ts_varlist", "all",
-                                       id="ts_all"),
-                          pytest.param(casedirs[0], cases["ts"]["nc"],
-                                       "new_some_ts_varlist",
-                                       ",".join(some_ts_varlist),
-                                       id="ts_some"),
-                          pytest.param(casedirs[0], cases["ts"]["nc"],
-                                       "new_none_ts_varlist",
-                                       ",".join(none_ts_varlist), id='none'),
-                          pytest.param(casedirs[1], cases["static"]["nc"],
-                                       "new_all_static_varlist", "all",
-                                       id="static_all"),
-                          pytest.param(casedirs[1], cases["static"]["nc"],
-                                       "new_some_static_varlist",
-                                       ",".join(some_static_varlist),
-                                       id="static_some")])
-def test_split_file_run(workdir,infile, outfiledir, varlist):
-    ''' Checks that split-netcdf will run when called from the command line
+                         [ pytest.param( casedirs[0], cases["ts"]["nc"], "new_all_ts_varlist", "all", id = "ts_all" ),
+                           pytest.param( casedirs[0], cases["ts"]["nc"], "new_some_ts_varlist", ",".join(some_ts_varlist), id = "ts_some" ),
+                           pytest.param( casedirs[0], cases["ts"]["nc"], "new_none_ts_varlist", ",".join(none_ts_varlist), id = 'none' ),
+                           pytest.param( casedirs[1], cases["static"]["nc"], "new_all_static_varlist", "all", id = "static_all" ),
+                           pytest.param( casedirs[1], cases["static"]["nc"], "new_some_static_varlist", ",".join(some_static_varlist),
id = "static_some" ) ] ) +def test_split_file_run(workdir, infile, outfiledir, varlist): + ''' + Checks that split-netcdf will run when called from the command line :param workdir: subdir all operations are relative to :type workdir: string @@ -113,22 +111,24 @@ def test_split_file_run(workdir,infile, outfiledir, varlist): ''' infile = osp.join(workdir, infile) outfiledir = osp.join(workdir, outfiledir) - split_netcdf_args = ["pp", "split-netcdf", - "--file", infile, - "--outputdir", outfiledir, - "--variables", varlist] + split_netcdf_args = [ "pp", "split-netcdf", + "--file", infile, + "--outputdir", outfiledir, + "--variables", varlist ] print(split_netcdf_args) - result = runner.invoke(fre.fre, args=split_netcdf_args) + + result = runner.invoke(fre.fre, args = split_netcdf_args) print(result) assert result.exit_code == 0 @pytest.mark.parametrize("workdir,newdir,origdir", - [pytest.param(casedirs[0],"new_all_ts_varlist", "all_ts_varlist", id='ts_all'), - pytest.param(casedirs[0],"new_some_ts_varlist", "some_ts_varlist", id='ts_some'), - pytest.param(casedirs[1],"new_all_static_varlist", "all_static_varlist", id='static_all'), - pytest.param(casedirs[1],"new_some_static_varlist", "some_static_varlist", id='static_some')]) + [ pytest.param(casedirs[0], "new_all_ts_varlist", "all_ts_varlist", id='ts_all'), + pytest.param(casedirs[0], "new_some_ts_varlist", "some_ts_varlist", id='ts_some'), + pytest.param(casedirs[1], "new_all_static_varlist", "all_static_varlist", id='static_all'), + pytest.param(casedirs[1], "new_some_static_varlist", "some_static_varlist", id='static_some') ]) def test_split_file_data(workdir,newdir, origdir): - ''' Checks that the data in the new files match the data in the old files + ''' + Checks that the data in the new files match the data in the old files :param workdir: dir that all operations are relative to :type workdir: string :param newdir: the directory containing the newly-written files (new_all_ts_varlist, new_some_ts_varlist) @@ -170,12 +170,13 @@ def test_split_file_data(workdir,newdir, origdir): #everything else seems to be matching; discussing this at the code review. 
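+# nccmp's -m flag compares variable metadata and -g compares global attributes;
+# --force keeps comparing past the first difference so all mismatches get reported.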
@pytest.mark.parametrize("workdir,newdir,origdir",
-                         [pytest.param(casedirs[0],"new_all_ts_varlist", "all_ts_varlist", id='all'),
-                          pytest.param(casedirs[0],"new_some_ts_varlist", "some_ts_varlist", id='some'),
-                          pytest.param(casedirs[1],"new_all_static_varlist", "all_static_varlist", id='static_all'),
-                          pytest.param(casedirs[1],"new_some_static_varlist", "some_static_varlist", id='static_some')])
+                         [ pytest.param(casedirs[0], "new_all_ts_varlist", "all_ts_varlist", id='all'),
+                           pytest.param(casedirs[0], "new_some_ts_varlist", "some_ts_varlist", id='some'),
+                           pytest.param(casedirs[1], "new_all_static_varlist", "all_static_varlist", id='static_all'),
+                           pytest.param(casedirs[1], "new_some_static_varlist", "some_static_varlist", id='static_some') ] )
 def test_split_file_metadata(workdir, newdir, origdir):
-    ''' Checks that the metadata in the new files match the metadata in the old files
+    '''
+    Checks that the metadata in the new files match the metadata in the old files
     :param workdir: dir that all operations are relative to
     :type workdir: string
    :param newdir: the directory containing the newly-written files (new_all_ts_varlist, new_some_ts_varlist)
@@ -192,40 +193,44 @@
     '''
     newdir = osp.join(workdir, newdir)
     origdir = osp.join(workdir, origdir)
-    orig_count = len([el for el in os.listdir(origdir) if el.endswith(".nc")])
-    split_files = [el for el in os.listdir(newdir) if el.endswith(".nc")]
+    orig_count = len( [ el for el in os.listdir(origdir) if el.endswith(".nc") ] )
+    split_files = [ el for el in os.listdir(newdir) if el.endswith(".nc") ]
     new_count = len(split_files)
-    same_count_files = (new_count == orig_count)
-    all_files_equal=True
+    same_count_files = new_count == orig_count
+    all_files_equal = True
     for sf in split_files:
         nccmp_cmd = [ 'nccmp', '-mg', '--force',
                       osp.join(origdir, sf),
                       osp.join(newdir, sf) ]
-        sp = subprocess.run( nccmp_cmd)
+        sp = subprocess.run( nccmp_cmd, capture_output = True, text = True )
         if sp.returncode != 0:
-            print(" ".join(nccmp_cmd))
+            print( " ".join(nccmp_cmd) )
             all_files_equal = False
-            print("comparison of " + nccmp_cmd[-1] + " and " + nccmp_cmd[-2] + " did not match")
-            print(sp.stdout, sp.stderr)
+            print( "comparison of " + nccmp_cmd[-1] + " and " + nccmp_cmd[-2] + " did not match" )
+            print( sp.stdout, sp.stderr )
     assert all_files_equal and same_count_files
 
 #clean up splitting files
 def test_split_file_cleanup():
-    ''' Cleaning up files and dirs created for this set of tests.
-    Deletes all netcdf files (*.nc) and all dirs created for this test (new_*)
-    '''
+    '''
+    Cleaning up files and dirs created for this set of tests.
+    Deletes all netcdf files (*.nc) and all dirs created for this test (new_*)
+    '''
     el_list = []
     dir_list = []
     for path, subdirs, files in os.walk(test_dir):
-        for name in files:
-            el_list.append(osp.join(path, name))
-        for name in subdirs:
-            dir_list.append(osp.join(path,name))
-    netcdf_files = [el for el in el_list if el.endswith(".nc")]
+        for name in files:
+            el_list.append(osp.join(path, name))
+        for name in subdirs:
+            dir_list.append(osp.join(path,name))
+
+    netcdf_files = [ el for el in el_list if el.endswith(".nc") ]
     for nc in netcdf_files:
-        pathlib.Path.unlink(Path(nc))
-    newdir = [el for el in dir_list if osp.basename(el).startswith("new_")]
+        pathlib.Path.unlink(Path(nc))
+
+    newdir = [ el for el in dir_list if osp.basename(el).startswith("new_") ]
     for nd in newdir:
-        pathlib.Path.rmdir(Path(nd))
-    dir_deleted = [not osp.isdir(el) for el in newdir]
-    el_deleted = [not osp.isdir(el) for el in netcdf_files]
+        pathlib.Path.rmdir(Path(nd))
+
+    dir_deleted = [ not osp.isdir(el) for el in newdir ]
+    el_deleted = [ not osp.isfile(el) for el in netcdf_files ]
     assert all(el_deleted + dir_deleted)
diff --git a/fre/pp/tests/test_split_netcdf_regex.py b/fre/pp/tests/test_split_netcdf_regex.py
index 6abe27ee3..6e619639e 100644
--- a/fre/pp/tests/test_split_netcdf_regex.py
+++ b/fre/pp/tests/test_split_netcdf_regex.py
@@ -1,14 +1,15 @@
 """
-Test fre.pp.split_netcdf_script file regex pattern
-Tests the FILE_REGEX at the start of the file
+Test that fre.pp.split_netcdf_script's get_file_regex finds the right filenames
 """
+
 import os
-import tempfile
-import re
-from unittest.mock import patch, MagicMock
 import pathlib
 from pathlib import Path
-from fre.pp.split_netcdf_script import split_netcdf
+import re
+
+from fre.pp.split_netcdf_script import get_file_regex
 
 def test_split_netcdf_file_regex_pattern():
     """
@@ -21,12 +22,17 @@
         'atmos_level_cmip' : '00020101.atmos_level_cmip.tile4.nc',
         'ocean_cobalt_omip_2d' : '00020101.ocean_cobalt_omip_2d.nc'
     }
+    history_sources = matching_files.keys()
+    print(f'history_sources = {history_sources}')
+    for history_source in history_sources:
+        print(f'history_source = {history_source}')
 
-    for history_source in matching_files.keys():
-        file_regex = generate_regex(history_source)
+        file_regex = get_file_regex(history_source)
         print(file_regex)
+
         match = re.search(file_regex, matching_files[history_source])
         assert match is not None, f"File '{matching_files[history_source]}' should match regex pattern '{file_regex}'"
+
     non_matching_files = {
         'atmos_level_cmip_tile4' : '00020101.atmos_level_cmip.tile4.nc',
         'ocean_cobalt' : '00020101.ocean_cobalt_omip_2d.nc',
         'atmos_daily': "atmos_daily_something.nc" # This should not match as it has extra chars after
     }
-    for history_source in non_matching_files.keys():
-        file_regex = generate_regex(history_source)
-        match = re.search(file_regex, non_matching_files[history_source])
-        assert match is None, f"File '{non_matching_files[history_source]}' should NOT match regex pattern {file_regex}'"
 
-def generate_regex(history_source):
-    '''
-    Pull the regex from split_netcdf through a bizarre use of side effects
-    :param history_source: history_source for the regex; used to build regex
-    :type history_source: string
-    '''
-    #temporary directories for testing
-    with tempfile.TemporaryDirectory() as temp_input, \
-         tempfile.TemporaryDirectory() as 
temp_output:
 
-        # Create some test files that should match the regex pattern
-        test_files = [
-            "00020101.atmos_level_cmip.tile4.nc",
-            "00020101.ocean_cobalt_omip_2d.nc",
-            "test_atmos_daily.nc",
-            "test_atmos_daily.tile1.nc",
-            "other_file.nc"
-        ]
-
-        for filename in test_files:
-            filepath = os.path.join(temp_input, filename)
-            # Create empty files for testing
-            with open(filepath, 'w') as f:
-                f.write("")
-
-        # Mock the parse_yaml_for_varlist function to avoid yaml dependency
-        with patch('fre.app.helpers.get_variables') as mock_get_variables, \
-             patch('fre.pp.split_netcdf_script.split_file_xarray') as mock_split_file, \
-             patch('fre.pp.split_netcdf_script.os.listdir') as mock_listdir, \
-             patch('fre.pp.split_netcdf_script.re.match') as mock_re_match:
-
-            # Setup mocks
-            mock_get_variables.return_value = ["var1", "var2"]
-            mock_listdir.return_value = test_files
-
-            # Mock re.match to capture the regex pattern that gets created
-            captured_patterns = []
-
-            def mock_match_side_effect(pattern, string):
-                captured_patterns.append((pattern, string))
-                # Return match objects for files that should match
-                if "atmos_daily" in string and ("atmos_daily" in pattern):
-                    mock_match = MagicMock()
-                    return mock_match
-                return None
-
-            mock_re_match.side_effect = mock_match_side_effect
-
-            # Call the function with test parameters
-            component = "atmos"
-
-            try:
-                split_netcdf(
-                    inputDir=temp_input,
-                    outputDir=temp_output,
-                    component=component,
-                    history_source=history_source,
-                    use_subdirs=False,
-                    yamlfile="/fake/yaml/file.yml",
-                    split_all_vars=True
-                )
-            except:
-                # Function calls sys.exit(0) at the end, which is expected
-                pass
-
-    return captured_patterns[0][0]
+    non_matching_history_sources = non_matching_files.keys()
+    print(f'non_matching_history_sources = {non_matching_history_sources}')
+    for history_source in non_matching_history_sources:
+        file_regex = get_file_regex(history_source)
+        print(file_regex)
 
+        match = re.search(file_regex, non_matching_files[history_source])
+        assert match is None, f"File '{non_matching_files[history_source]}' should NOT match regex pattern '{file_regex}'"
diff --git a/pylintrc b/pylintrc
index 2466dccf5..9ed735ce7 100644
--- a/pylintrc
+++ b/pylintrc
@@ -39,7 +39,7 @@ extension-pkg-whitelist=
 fail-on=
 
 # Specify a score threshold under which the program will exit with error.
-fail-under=7.80
+fail-under=8.00
 
 # Interpret the stdin as a python script, whose filename needs to be passed as
 # the module_or_package argument.
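Reviewer note: a minimal usage sketch of the two entry points this diff touches,
assuming an installed fre environment; the component name, variable names, and
paths below are hypothetical placeholders.

    import re
    from fre.pp.split_netcdf_script import get_file_regex, split_file_xarray

    # the regex matches tiled and untiled history files for one source
    pattern = get_file_regex('atmos_level_cmip')
    assert re.search(pattern, '00020101.atmos_level_cmip.tile4.nc') is not None
    assert re.search(pattern, '00020101.ocean_cobalt_omip_2d.nc') is None

    # write one netcdf file per requested variable into /tmp/split_out
    split_file_xarray('00020101.atmos_level_cmip.tile4.nc', '/tmp/split_out',
                      var_list=['tas', 'ps'])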