diff --git a/delme/run_cell_statistics.py b/delme/run_cell_statistics.py new file mode 100644 index 000000000..6e052b14f --- /dev/null +++ b/delme/run_cell_statistics.py @@ -0,0 +1,40 @@ +from pathlib import Path + +from CSET._common import parse_recipe +from CSET.operators import execute_recipe + + +recipe_fpath = ( + Path(__file__).parent.parent / "src" / "CSET" / "recipes" / "cell_statistics.yaml" +) + +model_data = [ + ( + "uk_ctrl_um", + "/data/scratch/byron.blay/RES_RETRIEVAL_CACHE/test_lfric/20240121T0000Z/uk_ctrl_um/20240121T0000Z_UK_672x672_1km_RAL3P3_pdiag*.pp", + ), + ( + "uk_expt_lfric", + "/data/scratch/byron.blay/RES_RETRIEVAL_CACHE/test_lfric/20240121T0000Z/uk_expt_lfric/20240121T0000Z_UK_672x672_1km_RAL3P3_lfric*.nc", + ), +] + +vars = [ + "surface_microphysical_rainfall_rate", # m01s04i203 + "surface_microphysical_rainfall_amount", # m01s04i201 +] + +# todo: it's inefficient to keep reloading the data for each var/cell_attribute/time_grouping +# it would seem to be better to send the list of vars to the operator and plot functions, just once. +for var in vars: + print(f"\nrunning recipe for {var}\n" ) + + recipe_variables = { + "INPUT_PATHS": [i[1] for i in model_data], + "MODEL_NAMES": [i[0] for i in model_data], + "VARNAME": var, + "OUTPUT_FOLDER": "/data/scratch/byron.blay/cset/cell_stats/out4", + } + + recipe = parse_recipe(recipe_fpath, recipe_variables) + execute_recipe(recipe, Path(__file__).parent) diff --git a/src/CSET/operators/__init__.py b/src/CSET/operators/__init__.py index 8f68de590..0dc5ae949 100644 --- a/src/CSET/operators/__init__.py +++ b/src/CSET/operators/__init__.py @@ -28,6 +28,7 @@ from CSET.operators import ( ageofair, aggregate, + cell_statistics, collapse, constraints, convection, @@ -46,6 +47,7 @@ __all__ = [ "ageofair", "aggregate", + "cell_statistics", "collapse", "constraints", "convection", diff --git a/src/CSET/operators/_cell_stats_plots_template.html b/src/CSET/operators/_cell_stats_plots_template.html new file mode 100644 index 000000000..380ff0dfb --- /dev/null +++ b/src/CSET/operators/_cell_stats_plots_template.html @@ -0,0 +1,170 @@ + + + + + + + {{title}} + + + + + + + + +
+ + + \ No newline at end of file diff --git a/src/CSET/operators/cell_statistics.py b/src/CSET/operators/cell_statistics.py new file mode 100644 index 000000000..da610b5c1 --- /dev/null +++ b/src/CSET/operators/cell_statistics.py @@ -0,0 +1,931 @@ +"""OMG YOU CAN'T COMMIT WITHOUT DOCSTRINGS, EVEN IN EXPLORATORY CODE.""" + +import collections +import datetime +import inspect +import itertools +import logging +from copy import deepcopy +from typing import Callable + +import cftime +import iris.analysis +import iris.coords +import iris.cube +import iris.util +import numpy as np +from iris.cube import CubeList +from scipy import ndimage + +M_IN_KM = 1000 +HOUR_IN_SECONDS = 3600 + + +# some things from the res job +# +# DATES="20200801T0600Z 20200801T1800Z 20200802T0600Z" +# LAND_SEA_SPLIT="False" +# NUM_PROC="8" +# INPUT_DIR="/data/scratch/byron.blay/RES_RETRIEVAL_CACHE/test_quick" +# PLOT_DIR="/data/users/byron.blay/RES/test_full/diags" +# RECIPE="generic_basic_qq_plot.yaml" +# SHORT_NAMES="ral2 ral3" +# SITE="metoffice" +# Y_AXIS="frequency" +# PLOT_PARAM="$CYLC_TASK_PARAM_cellplots" + + +def var_filter(cubes, long_name=None, stash=None): + result = [] + for cube in cubes: + if long_name and cube.long_name == long_name: + result.append(cube) + elif stash and "STASH" in cube.attributes and cube.attributes["STASH"] == stash: + result.append(cube) + + return iris.cube.CubeList(result) + + +def get_bin_edges(cell_attribute): + # todo: we probably want to let the user specify the bins + if cell_attribute == "effective_radius_in_km": + bin_edges = 10 ** (np.arange(0.0, 3.12, 0.12)) + bin_edges = np.insert(bin_edges, 0, 0) + elif cell_attribute == "mean_value": + bin_edges = 10 ** (np.arange(-1, 2.7, 0.12)) + bin_edges = np.insert(bin_edges, 0, 0) + else: + raise ValueError(f"Unknown cell attribute '{cell_attribute}'") + + return bin_edges + + +def identify_unique_times(cubelist, time_coord_name): + """ + Given a :class:`iris.cube.CubeList`, this finds the set of unique times \ + which occur across all cubes in the cubelist. + + Arguments: + + * **cubelist** - a :class:`iris.cube.CubeList` of :class:`iris.cube.Cube` \ + objects. + * **time_coord_name** - the name of the time coordinate to select, \ + typically "time", "forecast_period" or "hour". + + Returns + ------- + * **time_coord** - an :class:`iris.coords.Coord` instance containing the \ + unique times that occur across the cubes in the \ + input cubelist. + """ + times = [] + time_unit = None + # Loop over cubes + for cube in cubelist: + # Extract the desired time coordinate from the cube + time_coord = cube.coord(time_coord_name) + + # Get the units for the specifed time coordinate + if time_unit is None: + time_unit = time_coord.units + + # Store the time coordinate points + times.extend(time_coord.points) + + # Construct a list of unique times... + times = sorted(list(set(times))) + # ...and store them in a new time coordinate + time_coord = iris.coords.DimCoord(times, units=time_unit) + time_coord.rename(time_coord_name) + + return time_coord + + +def add_categorised_coord( + cube: iris.cube.Cube, + name: str, + from_coord: iris.coords.DimCoord | iris.coords.AuxCoord | str, + category_function: Callable, + units: str = "1", +) -> None: + """Add a new coordinate to a cube, by categorising an existing one. + + Make a new :class:`iris.coords.AuxCoord` from mapped values, and add + it to the cube. + + Parameters + ---------- + cube : + The cube containing 'from_coord'. The new coord will be added into it. 
+ name : + Name of the created coordinate. + from_coord : + Coordinate in 'cube', or the name of one. + category_function : + Function(coordinate, value), returning a category value for a coordinate + point-value. If ``value`` has a type hint :obj:`cftime.datetime`, the + coordinate points are translated to :obj:`cftime.datetime` s before + calling ``category_function``. + units : + Units of the category value, typically 'no_unit' or '1'. + """ + # Interpret coord, if given as a name + coord = cube.coord(from_coord) if isinstance(from_coord, str) else from_coord + + if len(cube.coords(name)) > 0: + msg = 'A coordinate "%s" already exists in the cube.' % name + raise ValueError(msg) + + # Translate the coordinate points to cftime datetimes if requested. + value_param = list(inspect.signature(category_function).parameters.values())[1] + if issubclass(value_param.annotation, cftime.datetime): + points = coord.units.num2date(coord.points, only_use_cftime_datetimes=True) + else: + points = coord.points + + # Construct new coordinate by mapping values, using numpy.vectorize to + # support multi-dimensional coords. + # Test whether the result contains strings. If it does we must manually + # force the dtype because of a numpy bug (see numpy #3270 on GitHub). + result = category_function(coord, points.ravel()[0]) + if isinstance(result, str): + str_vectorised_fn = np.vectorize(category_function, otypes=[object]) + + def vectorised_fn(*args): + # Use a common type for string arrays (N.B. limited to 64 chars). + return str_vectorised_fn(*args).astype("|U64") + + else: + vectorised_fn = np.vectorize(category_function) + new_coord = iris.coords.AuxCoord( + vectorised_fn(coord, points), + units=units, + attributes=coord.attributes.copy(), + ) + new_coord.rename(name) + + # Add into the cube + cube.add_aux_coord(new_coord, cube.coord_dims(coord)) + + +def hour_from_time(coord, point): + """ + Category function to calculate the hour given a time, for use in \ + :func:`iris.coord_categorisation.add_categorised_coord`. + """ + time = coord.units.num2date(point) + day_start = datetime.datetime(time.year, time.month, time.day) + seconds_since_day_start = (time - day_start).total_seconds() + hours_since_day_start = seconds_since_day_start / float(HOUR_IN_SECONDS) + return hours_since_day_start + + +def remove_duplicates(cubelist): + """ + Removes any duplicate :class:`iris.cube.Cube` objects from an \ + :class:`iris.cube.CubeList`. 
+ """ + # Nothing to do if the cubelist is empty + if not cubelist: + return cubelist + # Build up a list of indices of the cubes to remove because they are + # duplicated + indices_to_remove = [] + for i in range(len(cubelist) - 1): + cube_i = cubelist[i] + for j in range(i + 1, len(cubelist)): + cube_j = cubelist[j] + if cube_i == cube_j: + if j not in indices_to_remove: + indices_to_remove.append(j) + # Only keep unique cubes + cubelist = iris.cube.CubeList( + [cube for index, cube in enumerate(cubelist) if index not in indices_to_remove] + ) + return cubelist + + +def extract_unique(cubes, group): + # Preparation of data for averaging and plotting + if group == "time": + # Identify a unique set of times + times = identify_unique_times(cubes, group) + + # Now extract data at these times + time_constraint = iris.Constraint( + coord_values={group: lambda cell: cell.point in times.points} + ) + cubes = cubes.extract(time_constraint) + + # Remove other time coordinates to allow a cube merge + # later + for cube in cubes: + cube.remove_coord("forecast_reference_time") + cube.remove_coord("forecast_period") + elif group == "forecast_period": + # Identify a unique set of lead times + times = identify_unique_times(cubes, group) + + # Remove other time coordinates to allow a cube + # merge later + for cube in cubes: + cube.remove_coord("forecast_reference_time") + cube.remove_coord("time") + elif group == "hour": + # Categorise the time coordinate of each cube into + # hours + for cube in cubes: + add_categorised_coord( + cube, "hour", cube.coord("time"), hour_from_time, units="hour" + ) + + # Identify a unique set of times of day + times = identify_unique_times(cubes, group) + + # Now extract data at these times + time_constraint = iris.Constraint( + coord_values={group: lambda cell: cell.point in times.points} + ) + cubes = cubes.extract(time_constraint) + + # Remove other time coordinates to allow a cube + # merge later + for cube in cubes: + cube.remove_coord("forecast_reference_time") + cube.remove_coord("time") + cube.remove_coord("forecast_period") + + # Remove any duplicate cubes to allow a successful merge + # Note this is typcially because we have the same set of + # observations associated with more than one model + cubes = remove_duplicates(cubes) + + return cubes, times + + +def remove_cell_method(cube, cell_method): + """ + Removes the supplied :class:`iris.coords.CellMethod` from an input + :class:`iris.cube.Cube`, then returns the cube. + """ + cell_methods = [cm for cm in cube.cell_methods if cm != cell_method] + cube.cell_methods = () + for cm in cell_methods: + cube.add_cell_method(cm) + return cube + + +def aggregate_at_time(input_params): + """ + Extracts data valid at a given time from each cube in a list of cubes, \ + then performs an aggregation operation (e.g. mean) across this data. + + Arguments (passed in as a tuple to allow parallelisation): + + * **input_params** - a four-element tuple consisting of: + + * **cubes** - a :class:`iris.cube.CubeList` holding the \ + :class:`iris.cube.Cube` objects to process. + * **time_coord** - the time at which the aggregation should be performed, \ + supplied as an :class:`iris.coords.Coord` object. + * **aggregator** - the aggregator to use, which can be any from \ + :mod:`iris.analysis`. + * **percentile** - the value of the percentile rank at which to extract \ + values, if the chosen aggregator is \ + :class:`iris.analysis.PERCENTILE`. For other \ + aggregators this not used. 
+ + Returns + ------- + * **aggregated_cubes** - an :class:`iris.cube.CubeList` of \ + :class:`iris.cube.Cube` objects holding the \ + aggregated data. + """ + # Unpack input parameters tuple + cubes = input_params[0] + time_coord = input_params[1] + aggregator = input_params[2] + # TODO: Can we improve the handling of this with keyword arguments? + percentile = input_params[3] + + # Check the supplied time coordinate to make sure it corresponds to a + # single time only + if len(time_coord.points) != 1: + raise ValueError("Time coordinate should specify a single time only") + + # Remove any duplicate cubes in the input cubelist otherwise this + # will break the aggregation + cubes = remove_duplicates(cubes) + + # Name of the supplied time coordinate + time_coord_name = time_coord.name() + + # Extract cubes matching the time specified by the supplied time coordinate. + # Even though the source coord might have floats for its points, + # the cell here will have cftime objects, such as DatetimeGregorian, + # so we can't just compare against the time coord's points. + time_constraint = iris.Constraint( + coord_values={time_coord_name: lambda cell: cell.point in time_coord.cells()} + ) + cubes_at_time = cubes.extract(time_constraint) + + # Add a temporary "number" coordinate to uniquely label the different + # data points at this time. + # An example of when there can be multiple data points at the time of + # interest is if the time coordinate represents the hour of day. + number = 0 + numbered_cubes = iris.cube.CubeList() + for cube in cubes_at_time: + for slc in cube.slices_over(time_coord_name): + number_coord = iris.coords.AuxCoord(number, long_name="number") + slc.add_aux_coord(number_coord) + numbered_cubes.append(slc) + number += 1 + cubes_at_time = numbered_cubes + + # Merge + cubes_at_time = cubes_at_time.merge() + + # For each cube in the cubelist, aggregate over all cases at this time + # using the supplied aggregator + aggregated_cubes = iris.cube.CubeList() + for cube in cubes_at_time: + # If there was only a single data point at this time, then "number" + # will be a scalar coordinate. If so, make it a dimension coordinate + # to allow collapsing below + if not cube.coord_dims("number"): + cube = iris.util.new_axis(cube, scalar_coord="number") + + # Store the total number of data points found at this time + num_cases = cube.coord("number").points.size + num_cases_coord = iris.coords.AuxCoord(num_cases, long_name="num_cases") + cube.add_aux_coord(num_cases_coord) + + # Do aggregation across the temporary "number" coordinate + if isinstance(aggregator, type(iris.analysis.PERCENTILE)): + cube = cube.collapsed("number", aggregator, percent=percentile) + else: + cube = cube.collapsed("number", aggregator) + + # Now remove the "number" coordinate... + cube.remove_coord("number") + # ...and associated cell method + cell_method = iris.coords.CellMethod(aggregator.name(), coords="number") + cube = remove_cell_method(cube, cell_method) + + aggregated_cubes.append(cube) + + return aggregated_cubes + + +def repeat_scalar_coord_along_dim_coord(cubelist, scalar_coord_name, dim_coord_name): + """ + For each :class:`iris.cube.Cube` in a given :class:`iris.cube.CubeList`, \ + this extends (by repetition) a specified scalar coordinate along the \ + dimension corresponding to a specified dimension coordinate. + """ + for cube in cubelist: + scalar_coord = cube.coord(scalar_coord_name) + # Check the coordinate referenced by scalar_coord_name is indeed + # a scalar coordinate. 
Otherwise nothing to do.
+        if scalar_coord.points.size == 1:
+            # Get the data value held by the scalar coordinate...
+            scalar_coord_val = scalar_coord.points[0]
+            # ...then remove it from the cube
+            cube.remove_coord(scalar_coord)
+            # Extract the dimension coordinate matching dim_coord_name
+            dim_coord = cube.coord(dim_coord_name)
+            # Get the dimension spanned by this dimension coordinate...
+            dim = cube.coord_dims(dim_coord_name)[0]
+            # ...and its length
+            dim_size = dim_coord.points.size
+            # Construct an auxiliary coordinate by replicating the data
+            # value from the scalar coordinate to match the size of the
+            # specified dimension coordinate
+            scalar_coord = iris.coords.AuxCoord(
+                np.repeat(scalar_coord_val, dim_size), long_name=scalar_coord_name
+            )
+
+            # Add the new auxiliary coordinate to the cube
+            cube.add_aux_coord(scalar_coord, dim)
+
+    return cubelist
+
+
+def ensure_bounds(cubes, coord_name):
+    """Guess bounds for the named coordinate on each cube, if not already present."""
+    for cube in cubes:
+        coord = cube.coord(coord_name)
+        if coord.bounds is None:
+            coord.guess_bounds()
+
+
+def caller_thing(cubes: CubeList):
+    """
+    Create a histogram from each given cube, for every threshold and cell attribute.
+
+    The cube will be "1-hourly mean precipitation rate" or "Large-scale rainfall rate".
+
+    Arguments
+    ---------
+    cubes: iris.cube.CubeList
+        Cube(s) to process. Will be "1-hourly mean precipitation rate" or "Large-scale rainfall rate".
+        Assumed to be one cube per model, with the first being the control.
+
+    The cell attributes ("effective_radius_in_km", "mean_value"), the time groupings
+    ("forecast_period", "hour", "time") and the thresholds are currently hard-coded below.
+
+    Returns a nested dictionary:
+        result[cell_attribute][time_grouping][threshold][time_point][model_name] = cube
+
+    """
+    # We loop through these, producing a result per model for each combo.
+    # todo: we might not want to produce data for all cell attributes or time groupings, so we could parameterise.
+    cell_attributes = ["effective_radius_in_km", "mean_value"]
+    time_groupings = ["forecast_period", "hour", "time"]
+    thresholds = [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0]  # todo: parameterise this?
+
+    # todo: parameterise y_axis?
+    # Set to "frequency" for cell statistic histograms that are plain number
+    # counts in bins, or "relative_frequency" for histograms showing the
+    # overall proportion of data values in each bin (normalised to 100%).
+    # The default is "relative_frequency".
+    y_axis = "relative_frequency"
+
+    if cubes in (None, [], CubeList([])):
+        logging.warning("no cubes received")
+        return None
+
+    if not isinstance(cubes, CubeList):
+        cubes = CubeList([cubes])
+
+    # todo: is this correct? it caused more empty cube lists...
+    # todo: put this cell method filtering into the yaml
+    # we saw lfric data with "point" or "mean".
+    def check_cell_methods(cube):
+        """Discard "mean" cubes; we want "point" or empty cell methods."""
+        for cm in cube.cell_methods:
+            if cm.method == "mean":
+                return False
+        return True
+
+    cubes = CubeList([c for c in cubes if check_cell_methods(c)])
+
+    if len(cubes) == 0:
+        logging.warning("no cubes after cell method filtering")
+        return None
+
+    # Get a list of the common forecast periods and hours (i.e. time groupings).
+    # For forecast time, this must contain "all", plus all the T+0.5 .. T+71.5
+    # For hourly, this must contain "all", plus all the 0.5Z .. 23.5Z
+    # todo: What about time?
+    forecast_periods = set(cubes[0].coord("forecast_period").points)
+    hours = set(cubes[0].coord("time").points)  # todo: check it's actually hourly?
+ for cube in cubes[1:]: + forecast_periods.intersection_update(set(cube.coord("forecast_period").points)) + hours.intersection_update(set(cube.coord("time").points)) + + # todo: we need a land-sea split input? + # for now, hard-code as false (this is what the res output had it set to) + land_sea_split = False + + # what about obs? GPM & UK_RADAR_2KM? ignore for now. + + # this will be a nested dict: + # result[cell_attribute][time_grouping][threshold][time_point][model_name] = cube + result = {} + + combos = itertools.product(cell_attributes, time_groupings, thresholds) + for cell_attribute, time_grouping, threshold in combos: + print(f'cell_attribute: {cell_attribute}, time_grouping: {time_grouping}, threshold: {threshold}') + + # create the top three levels of nested dictionaries + result.setdefault(cell_attribute, {}) + result[cell_attribute].setdefault(time_grouping, {}) + result[cell_attribute][time_grouping].setdefault(str(threshold), {}) + time_data = result[cell_attribute][time_grouping][str(threshold)] # helper var + + # todo: check var_name has been removed by this point, as RES removed it to help with merge + + # todo: can't we just let mpl make the histograms at plot time? + hist_cubes = CubeList([]) + for cube in cubes: + hist_cube = something_like_cell_attribute_histogram( + cube, attribute=cell_attribute, bin_edges=get_bin_edges(cell_attribute), threshold=threshold) + hist_cubes.append(hist_cube) + + # RES used it's own version because Iris' was slow at the time. todo: is it fast now? + # hist_cubes = RES_extract_overlapping(hist_cubes, "forecast_period") + ensure_bounds(hist_cubes, "forecast_period") # iris fails if only one has bounds + hist_cubes = hist_cubes.extract_overlapping("forecast_period") + + # Now we take the cell statistics and aggregate by "time_grouping". + + # We should deep copy the cubes here becuase there is cube wrangling below, + # and it's possible that we'll be using the same cubes across iterations in this loop. + # (as extract doesn't always create a new cube). + hist_cubes = deepcopy(hist_cubes) + + # Res comment for this bit is: preparation of data for averaging and plotting + # It removes the other time coords, other than that named by time_grouping. + hist_cubes, times = extract_unique(hist_cubes, time_grouping) + + # Sum cell statistic histograms at each time. todo: parallelise? + summed_cubes = [] + for time in times: + input_param = (hist_cubes, time, iris.analysis.SUM, None) + summed_cube = aggregate_at_time(input_param) + summed_cubes.append(summed_cube) + summed_cubes = iris.cube.CubeList(itertools.chain.from_iterable(summed_cubes)) + summed_cubes = summed_cubes.merge() + + # If the number of cases at each time is the same, the + # above merge results in a scalar coordinate representing + # the number of cases. Replace this scalar coordinate with + # an auxillary coordinate that has the same length as the + # time coordinate + summed_cubes = repeat_scalar_coord_along_dim_coord( + summed_cubes, "num_cases", time_grouping + ) + + # At this point, RES extracts every time point into a separate cube list and plots it. 
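+        # The nested dict filled in below is what plot.plot_cell_stats_histograms consumes, e.g.
+        #   result["effective_radius_in_km"]["forecast_period"]["0.5"]["T+0.5"]["uk_ctrl_um"] -> 1-D histogram cube
+        # (model names are those passed in via MODEL_NAMES, e.g. from the example driver script).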
+ for time in times: + assert len(time.points) == 1 + time_point = time.points[0] + + if time_grouping == "forecast_period": + time_title = "T+{0:.1f}".format(time_point) + elif time_grouping == "hour": + time_title = "{0:.1f}Z".format(time_point) + elif time_grouping == "time": + time_unit = time.units + datetime = time_unit.num2date(time_point) + time_title = "{0:%Y/%m/%d} {1:%H%M}Z".format(datetime, datetime) + else: + raise ValueError(f"Unknown time grouping '{time_grouping}'") + + time_data[time_title] = {} + + # Extract histogram at this time. + time_constraint = iris.Constraint( + coord_values={time_grouping: lambda cell: cell.point in time.points} + ) + cubes_at_time = summed_cubes.extract(time_constraint) + + # todo: RES has some analysis of the number of cases used to construct the histogram at this point. + + # todo: RES plots each cube here. + for cube in cubes_at_time: + # todo: Normalise histogram? Again, should this be in the plotting? + if y_axis == "relative_frequency": + cube.data = (100.0 * cube.data) / np.sum(cube.data, dtype=np.float64) + + time_data[time_title][cube.attributes["model_name"]] = cube + + # Sum all histograms. This creates the data for the "all" time point. + time_data["all"] = {} + for cube in summed_cubes: + cube = cube.collapsed(time_grouping, iris.analysis.SUM) + + # todo: Normalise histogram? + if y_axis == "relative_frequency": + cube.data = (100.0 * cube.data) / np.sum(cube.data, dtype=np.float64) + + time_data["all"][cube.attributes["model_name"]] = cube + + # return cubes to plot. todo: in what exact arrangement? + return result + + +def something_like_cell_attribute_histogram(cube, attribute, bin_edges, threshold=0.0): + # Express histogram bins as an Iris coordinate + bin_centres = 0.5 * (bin_edges[1:] + bin_edges[:-1]) + bins_as_coord = iris.coords.DimCoord( + bin_centres, + long_name=attribute, + units=cube.units, + coord_system=None, + bounds=np.column_stack((bin_edges[0:-1], bin_edges[1:])), + ) + + data_min, data_max = None, None + hist_cubes = iris.cube.CubeList() + coords = get_non_spatial_coords(cube) + + for slc in cube.slices_over(coords): + # Identify connected cells in this spatial slice + cells = find_cells(slc, threshold=threshold) + + if cells: + # Extract values of the desired cell attribute + cell_attributes = [cell.attributes[attribute] for cell in cells] + + # Store the minimum/maximum values of the cell attribute + if data_min is None or np.min(cell_attributes) < data_min: + data_min = np.min(cell_attributes) + if data_max is None or np.max(cell_attributes) > data_max: + data_max = np.max(cell_attributes) + + # Construct a histogram of the desired cell attribute + hist, _ = np.histogram(cell_attributes, bin_edges) + else: + # Assign zeros to all bins + hist = np.zeros(bin_centres.size).astype(np.int64) + + # Get a list of the non-spatial coordinates for this slice + aux_coords = [(coord, []) for coord in get_non_spatial_coords(slc)] + + # Construct a cube to hold the cell statistic histogram for this slice + hist_slc = iris.cube.Cube( + hist, + long_name=("{0:s} cell {1:s} histogram".format(slc.name(), attribute)), + units="no_unit", + attributes=slc.attributes, + cell_methods=slc.cell_methods, + dim_coords_and_dims=[(bins_as_coord, 0)], + aux_coords_and_dims=aux_coords, + ) + + hist_cubes.append(hist_slc) + + hist_cube = hist_cubes.merge_cube() + return hist_cube + + +def get_spatial_coords(cube): + """Returns the x, y coordinates of an input :class:`iris.cube.Cube`.""" + # Usual names for spatial coordinates + 
X_COORD_NAMES = ["longitude", "grid_longitude", "projection_x_coordinate", "x"] + Y_COORD_NAMES = ["latitude", "grid_latitude", "projection_y_coordinate", "y"] + + # Get a list of coordinate names for the cube + coord_names = [coord.name() for coord in cube.coords()] + + # Check which x-coordinate we have, if any + x_coords = [coord for coord in coord_names if coord in X_COORD_NAMES] + if len(x_coords) != 1: + raise ValueError("Could not identify a unique x-coordinate in cube") + x_coord = cube.coord(x_coords[0]) + + # Check which y-coordinate we have, if any + y_coords = [coord for coord in coord_names if coord in Y_COORD_NAMES] + if len(y_coords) != 1: + raise ValueError("Could not identify a unique y-coordinate in cube") + y_coord = cube.coord(y_coords[0]) + + return [x_coord, y_coord] + + +def get_non_spatial_coords(cube): + """ + Returns a list of the non-spatial coordinates of an input \ + :class:`iris.cube.Cube`. + """ + # Get a list of the cube coordinates + coords = cube.coords() + # Get the spatial coordinates of the cube + x_coord, y_coord = get_spatial_coords(cube) + # Remove the spatial coordinates from the list of coordinates + coords.remove(x_coord) + coords.remove(y_coord) + return coords + + +def guess_bounds(cube): + """ + Takes an input :class:`iris.cube.Cube`, guesses bounds on the x, y \ + coordinates, then returns the cube. Such bounds are often required by \ + regridding algorithms. + """ + # Loop over spatial coordinates + for axis in ["x", "y"]: + coord = cube.coord(axis=axis) + # Check that this is not a variable resolution grid + # TODO: Does bounds really not work with variable resolution? AVD + # seem to think it won't... + try: + _ = iris.util.regular_step(coord) + except: + raise ValueError("Cannot guess bounds for a variable resolution grid") + # Guess bounds if there aren't any + if coord.bounds is None: + coord.guess_bounds() + return cube + + +def connected_object_labelling(data, threshold=0.0, min_size=1, connectivity=1): + """ + Finds connected objects in an input array and assigns them unique labels. + + Arguments: + + * **data** - a :class:`numpy.ndarray` array in which to label objects. + + Keyword arguments: + + * **threshold** - if supplied, only regions where the input data exceeds \ + the threshold will be considered when searching for \ + connected objects. + * **min_size** - minimum size in grids points for connected objects. Must \ + be an integer >= 1. + * **connectivity** - given a particular grid point, all grid points up to \ + a squared distance of connectivity away are considered \ + neighbours. Connectivity may range from 1 (only direct \ + neighbours are considered) to :attr:`data.ndim`. + + Returns + ------- + * **label_array** - an integer array where each unique object in the input \ + array has a unique label in the returned array. + * **num_objects** - the number of objects found. 
+ """ + # Apply the supplied threshold to the data to generate a binary array + binary_data = data.copy() + if np.ma.is_masked(binary_data): + # If the data is masked, replace any masked values with (threshold - 1) + # thus guaranteeing they will be below the threshold and thus set to 0 + # below + binary_data = np.ma.filled(binary_data, fill_value=(threshold - 1)) + # Set values above and below the threshold to 1 and 0, respectively + indices_above = binary_data > threshold + indices_below = binary_data <= threshold + binary_data[indices_above] = 1 + binary_data[indices_below] = 0 + + # Construct a structuring element that defines how the neighbours of + # a grid point are assigned + structure_element = ndimage.morphology.generate_binary_structure( + data.ndim, connectivity + ) + + # Label distinct (connected) objects in the binary array + label_array, num_objects = ndimage.measurements.label( + binary_data, structure=structure_element + ) + + # Throw away any objects smaller than min_size + if min_size < 1: + raise ValueError('"min_size" must be 1 or greater') + elif min_size > 1: + labels = np.unique(label_array) + # Discard the background (which will be labelled as 0) + labels = labels[(labels > 0)] + # Loop over distinct objects + for label in labels: + # Find the indices of the grid points comprising this object + indices = np.where(label_array == label) + # If this object is smaller than min_size, set it as background + if indices[0].size < min_size: + label_array[indices] = 0 + num_objects -= 1 + + return label_array, num_objects + + +def find_cells(cube, threshold=0.0, area_threshold=0.0, connectivity=1): + """ + Finds connected objects (i.e. cells) in spatial slices of a given \ + :class:`iris.cube.Cube`. + + Arguments: + + * **cube** - an input :class:`iris.cube.Cube` object. + + Keyword arguments: + + * **threshold** - if supplied, only regions where the input data exceeds \ + the threshold will be considered when identifying cells. + * **area_threshold** - minimum area in km^2 that cells must have. + * **connectivity** - given a particular grid point, all grid points up to a \ + squared distance of connectivity away are considered \ + neighbours. Connectivity may range from 1 (only \ + direct neighbours are considered) to \ + :attr:`cube.data.ndim`. + + Returns + ------- + * **cells** - a :class:`iris.cube.CubeList` of \ + :class:`iris.cube.Cube` objects, each one corresponding to \ + an identified cell. 
+ """ + # Convert input area threshold from km^2 to m^2 + area_threshold = (float(M_IN_KM) ** 2) * area_threshold + + # Get x, y coordinates of input cube + x_coord, y_coord = get_spatial_coords(cube) + x, y = iris.analysis.cartography.get_xy_grids(cube) + + # Guess x, y coordinate bounds + cube = guess_bounds(cube) + + # Loop over 2D spatial slices of the input cube and find cells in each + # slice + grid_areas = None + cells = iris.cube.CubeList() + coords = get_non_spatial_coords(cube) + for slc in cube.slices_over(coords): + if grid_areas is None: + # Area of grid cells, in m^2 + grid_areas = iris.analysis.cartography.area_weights(slc) + + # Store a list of the non-spatial coordinates for this slice + aux_coords = [(coord, []) for coord in get_non_spatial_coords(slc)] + + # Find and label cells + # Call connected object labelling function based on + # scipy.ndimage.measurements.label + cell_label_array, _ = connected_object_labelling( + slc.data, threshold=threshold, min_size=1, connectivity=connectivity + ) + + # Get a list of unique cell labels + cell_labels = np.unique(cell_label_array) + # Discard background (which has a label of 0) + cell_labels = cell_labels[(cell_labels > 0)] + # Loop over cell and store their properties + for cell_label in cell_labels: + # Find the indices of the grid points comprising this cell + cell_indices = np.where(cell_label_array == cell_label) + cell_x = x[cell_indices] + cell_y = y[cell_indices] + cell_values = slc.data[cell_indices] + cell_grid_areas = grid_areas[cell_indices] + + # There should not be any masked data present in cells! + if np.ma.is_masked(cell_values): + raise ValueError("Masked data found in cell {0:d}".format(cell_label)) + + # If cell area is less than area_threshold, discard it + # (by setting its label to the background value) + cell_area = np.sum(cell_grid_areas, dtype=np.float64) + if cell_area < area_threshold: + cell_label_array[cell_indices] = 0 + continue + + # Estimate cell centre position + # TODO Is there a better way of doing this? C.O.M? + cell_centre = ( + np.mean(cell_x, dtype=np.float64), + np.mean(cell_y, dtype=np.float64), + ) + # Area-weighted mean value in cell + cell_mean = ( + np.sum((cell_grid_areas * cell_values), dtype=np.float64) / cell_area + ) + # Convert cell area from m^2 to km^2... + cell_area /= float(M_IN_KM) ** 2 + # ...and then cell effective radius in km + cell_radius = np.sqrt(cell_area / np.pi) + + # Create an Iris cube to store this cell + cell_cube = iris.cube.Cube( + cell_values, + long_name="{:s} cell".format(cube.name()), + units=cube.units, + attributes=cube.attributes, + cell_methods=cube.cell_methods, + aux_coords_and_dims=aux_coords, + ) + + # Set up x, y coordinates describing the grid points in the cell... + cell_x_coord = iris.coords.AuxCoord( + cell_x, + standard_name=x_coord.standard_name, + long_name=x_coord.long_name, + units=x_coord.units, + bounds=None, + attributes=x_coord.attributes, + coord_system=x_coord.coord_system, + ) + cell_y_coord = iris.coords.AuxCoord( + cell_y, + standard_name=y_coord.standard_name, + long_name=y_coord.long_name, + units=y_coord.units, + bounds=None, + attributes=y_coord.attributes, + coord_system=y_coord.coord_system, + ) + # ...and add them to the cell cube + cell_cube.add_aux_coord(cell_x_coord, 0) + cell_cube.add_aux_coord(cell_y_coord, 0) + + # Set up a coordinate describing the areas of grid cells in + # the cell object... 
+            cell_grid_area_coord = iris.coords.AuxCoord(
+                cell_grid_areas, long_name="grid_areas", units="m2"
+            )
+            # ...and add it to the cell cube
+            cell_cube.add_aux_coord(cell_grid_area_coord, 0)
+
+            # Finally add some attributes to the cube that describe some
+            # useful information about the cell
+            # cell_cube.attributes["label"] = int(cell_label)
+            cell_cube.attributes["centre"] = cell_centre
+            cell_cube.attributes["area_in_km2"] = cell_area
+            cell_cube.attributes["effective_radius_in_km"] = cell_radius
+            cell_cube.attributes["mean_value"] = cell_mean
+            # cell_cube.attributes["indices"] = cell_indices
+            cells.append(cell_cube)
+
+    return cells
diff --git a/src/CSET/operators/plot.py b/src/CSET/operators/plot.py
index 1eef7ff00..1ad482b84 100644
--- a/src/CSET/operators/plot.py
+++ b/src/CSET/operators/plot.py
@@ -23,6 +23,7 @@
 import math
 import os
 import sys
+from pathlib import Path
 from typing import Literal
 
 import cartopy.crs as ccrs
@@ -2208,3 +2209,80 @@ def plot_histogram_series(
     _make_plot_html_page(complete_plot_index)
 
     return cubes
+
+
+def check_for_nan(cubes):
+    """Check for NaNs; the plot function crashes when cube.data has NaN in its min/max."""
+    for cube in cubes:
+        if np.isnan(cube.data.min()) or np.isnan(cube.data.max()):
+            return True
+    return False
+
+
+def make_cell_stats_page(html_fpath, title, plot_urls):
+    """Render the cell statistics HTML page from the bundled template."""
+    template_file = Path(__file__).parent / "_cell_stats_plots_template.html"
+    variables = {
+        'title': title,
+        'plot_urls': plot_urls,
+    }
+    html = render_file(str(template_file), **variables)
+    with open(html_fpath, "wt", encoding="UTF-8") as fp:
+        fp.write(html)
+
+
+def plot_cell_stats_histograms(data: dict, varname: str, output_folder: str):
+    """
+    Plot cell statistics histograms and write an HTML index page for them.
+
+    Parameters
+    ----------
+    data:
+        A dict mapping thresholds to histogram data for each time point:
+        data[cell_attribute][time_grouping][threshold][time_point][model_name] = cube
+
+        NOTE: this should probably be a cube/cubelist, but that would seem to require
+        considerable inefficiency/repetition in the cell stats, so we need to think
+        about this a bit more.
+
+        E.g. data['effective_radius_in_km']['forecast_period']['0.5']['T+0.5']['uk_ctrl_um']
+
+        The cube is one dimensional, with a dim coord of the given cell_attribute.
+ + Returns + ------- + + """ + if data is None: + return None + + plot_urls = {} + + # todo: parameterise this + root_folder = Path(output_folder) / varname + + for cell_attribute, time_groupings in data.items(): + plot_urls[cell_attribute] = {} + for time_grouping, thresholds in time_groupings.items(): + plot_urls[cell_attribute][time_grouping] = {} + for threshold, time_points in thresholds.items(): + plot_urls[cell_attribute][time_grouping][threshold] = {} + (root_folder / varname / cell_attribute / time_grouping / threshold).mkdir(parents=True, exist_ok=True) + for time_point, models in time_points.items(): + filename = root_folder / varname / cell_attribute / time_grouping / threshold / f'{time_point}.png' + plot_urls[cell_attribute][time_grouping][threshold][time_point] = str(filename) + + # make a new plot for this combination of cell_attribute, time_grouping, threshold & time_point + plt.figure(figsize=(10, 10)) + plt.title(f'{varname}, {cell_attribute}, {time_grouping}, threshold {threshold}, at {time_point}') + for model_name, cube in models.items(): + plt.plot(cube.coord(cell_attribute).points, cube.data) + + filename.parent.mkdir(parents=True, exist_ok=True) + plt.savefig(filename) + plt.close() + + make_cell_stats_page(root_folder / 'index.html', title=f'{varname}', plot_urls=json.dumps(plot_urls)) + + # todo: by convention we should return the cubes, but it's not cubes + # the cells stats produces a lot of stuff and it's currently passed as a dict + return None diff --git a/src/CSET/operators/read.py b/src/CSET/operators/read.py index 862396d68..cad4e8e22 100644 --- a/src/CSET/operators/read.py +++ b/src/CSET/operators/read.py @@ -996,3 +996,12 @@ def _lfric_forecast_period_standard_name_callback(cube: iris.cube.Cube): coord.standard_name = "forecast_period" except iris.exceptions.CoordinateNotFoundError: pass + + +def read_pickle(filename): + """Helper to read the cell stats output, until it returns cubes.""" + import pickle + + with open(filename, "rb") as f: + data = pickle.load(f) + return data diff --git a/src/CSET/operators/write.py b/src/CSET/operators/write.py index e67afd2c0..9a9cdc00c 100644 --- a/src/CSET/operators/write.py +++ b/src/CSET/operators/write.py @@ -66,3 +66,16 @@ def write_cube_to_nc( # Save the file as nc compliant (iris should handle this) iris.save(cube, filename, zlib=True) return cube + + +def write_pickle(data, filename): + """Helper to save the cell stats output, until it returns cubes.""" + import pickle + if not data: + return None + + Path(filename).parent.mkdir(exist_ok=True, parents=True) + with open(filename, "wb") as f: + pickle.dump(data, f) + + return data diff --git a/src/CSET/recipes/cell_statistics.yaml b/src/CSET/recipes/cell_statistics.yaml new file mode 100644 index 000000000..8c927a788 --- /dev/null +++ b/src/CSET/recipes/cell_statistics.yaml @@ -0,0 +1,38 @@ +category: Diagnostics +title: cell statistics plot +description: | + TBD. 
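+
+# The recipe expects these variables at parse time (see delme/run_cell_statistics.py
+# for an example of supplying them): INPUT_PATHS, MODEL_NAMES, VARNAME, OUTPUT_FOLDER.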
+ +steps: + + # todo: ideally we'd load just once and have a list of variable names to extract + - operator: read.read_cubes + file_paths: $INPUT_PATHS + model_names: $MODEL_NAMES + constraint: + operator: constraints.generate_var_constraint + varname: $VARNAME + + # todo: check what the grid resolution should be + - operator: regrid.regrid_onto_xyspacing + xspacing: 0.5 + yspacing: 0.5 + method: Linear + +# dev speedup + - operator: write.write_pickle + filename: $OUTPUT_FOLDER/$VARNAME.pkl +# - operator: read.read_pickle +# filename: $OUTPUT_FOLDER/$VARNAME.pkl + + - operator: cell_statistics.caller_thing + +# dev speedup + - operator: write.write_pickle + filename: $OUTPUT_FOLDER/$VARNAME_stats.pkl +# - operator: read.read_pickle +# filename: $OUTPUT_FOLDER/$VARNAME_stats.pkl + + - operator: plot.plot_cell_stats_histograms + varname: $VARNAME + output_folder: $OUTPUT_FOLDER
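A minimal sketch of reloading the pickled cell-statistics output with read.read_pickle and inspecting one histogram outside the recipe. Assumptions: the recipe has already been run, the $VARNAME_stats.pkl substitution resolves to <varname>_stats.pkl, and the output folder and model name match the example driver script above; the nested keys follow the structure documented in caller_thing and plot_cell_stats_histograms.

from pathlib import Path

import matplotlib.pyplot as plt

from CSET.operators.read import read_pickle

# Hypothetical output file from the write.write_pickle step; adjust to your OUTPUT_FOLDER/VARNAME.
stats_file = Path(
    "/data/scratch/byron.blay/cset/cell_stats/out4/surface_microphysical_rainfall_rate_stats.pkl"
)
stats = read_pickle(stats_file)

# stats[cell_attribute][time_grouping][threshold][time_point][model_name] -> 1-D histogram cube.
cube = stats["effective_radius_in_km"]["forecast_period"]["0.5"]["all"]["uk_ctrl_um"]

# Bin centres are the points of the cell-attribute coordinate; values are
# relative frequencies (%) when y_axis is "relative_frequency".
plt.plot(cube.coord("effective_radius_in_km").points, cube.data)
plt.xlabel("effective radius (km)")
plt.ylabel("relative frequency (%)")
plt.show()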