Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 164 additions & 10 deletions src/CSET/operators/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


def time_aggregate(
cube: iris.cube.Cube,
cubes: iris.cube.Cube | iris.cube.CubeList,
method: str,
interval_iso: str,
**kwargs,
Expand Down Expand Up @@ -77,17 +77,89 @@ def time_aggregate(
# Convert interval format to whole hours.
interval = int(timedelta.total_seconds() / 3600)

# Add time categorisation overwriting hourly increment via lambda coord.
# https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html
iris.coord_categorisation.add_categorised_coord(
cube, "interval", "time", lambda coord, cell: cell // interval * interval
)
new_cubelist = iris.cube.CubeList()
for cube in iter_maybe(cubes):
# Add time categorisation overwriting hourly increment via lambda coord.
# https://scitools-iris.readthedocs.io/en/latest/_modules/iris/coord_categorisation.html
iris.coord_categorisation.add_categorised_coord(
cube, "interval", "time", lambda coord, cell: cell // interval * interval
)

# Aggregate cube using supplied method.
aggregated_cube = cube.aggregated_by("interval", getattr(iris.analysis, method))
aggregated_cube.remove_coord("interval")

new_cubelist.append(aggregated_cube)

if len(new_cubelist) == 1:
return new_cubelist[0]
else:
return new_cubelist

def season_aggregate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    agg_coord: str,
    method: str,
    additional_percent: float | None = None,
    **kwargs,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Aggregate a cube over an existing categorical coordinate.

    Aggregates each cube over the named coordinate (e.g. "season") using
    the supplied iris.analysis aggregator. The "time" coordinate is
    removed from each aggregated result so that downstream operators
    (e.g. misc.difference) can align cubes on the remaining coordinates.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube(s) to aggregate; each must carry the agg_coord coordinate.
    agg_coord: str
        Name of the coordinate to aggregate over, e.g. "season".
    method: str
        Name of an iris.analysis aggregator, e.g. "SUM" or "MEAN";
        resolved via getattr(iris.analysis, method).
    additional_percent: float, optional
        Percentile to compute; required when method is "PERCENTILE".

    Returns
    -------
    iris.cube.Cube | iris.cube.CubeList
        The aggregated cube, or a CubeList when several cubes were given.

    Raises
    ------
    ValueError
        If a cube lacks the agg_coord coordinate, or method is
        "PERCENTILE" but additional_percent was not supplied.
    """
    # Fail fast with a clear message rather than an obscure error from
    # inside iris when the percentile value is missing.
    if method == "PERCENTILE" and additional_percent is None:
        raise ValueError("additional_percent must be set when method is PERCENTILE.")

    new_cubelist = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        coord_names = [coord.name() for coord in cube.coords()]
        if agg_coord not in coord_names:
            raise ValueError("Cube must have a {} coordinate.".format(agg_coord))

        # Aggregate cube using supplied method.
        if method == "PERCENTILE":
            aggregated_cube = cube.aggregated_by(
                agg_coord, getattr(iris.analysis, method), percent=additional_percent
            )
        else:
            aggregated_cube = cube.aggregated_by(
                agg_coord, getattr(iris.analysis, method)
            )

        # Need to remove time coordinate to allow misc.difference to work
        aggregated_cube.remove_coord("time")

        new_cubelist.append(aggregated_cube)

    # Unwrap a single-cube result for caller convenience.
    if len(new_cubelist) == 1:
        return new_cubelist[0]
    else:
        return new_cubelist

def ensure_aggregatable_across_cases(
cubes: iris.cube.Cube | iris.cube.CubeList,
Expand Down Expand Up @@ -214,3 +286,85 @@ def add_hour_coordinate(
return new_cubelist[0]
else:
return new_cubelist

def add_month_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of month to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of month.

    """
    processed = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # Derive a "month" auxiliary coordinate from the time coordinate.
        iris.coord_categorisation.add_month(cube, "time", name="month")
        processed.append(cube)

    # A lone cube is returned bare rather than wrapped in a CubeList.
    return processed[0] if len(processed) == 1 else processed

def add_month_number_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of month_number to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of month number.

    """
    processed = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # Derive a numeric "month_number" auxiliary coordinate (1-12)
        # from the time coordinate.
        iris.coord_categorisation.add_month_number(cube, "time", name="month_number")
        processed.append(cube)

    # A lone cube is returned bare rather than wrapped in a CubeList.
    return processed[0] if len(processed) == 1 else processed

def add_season_coordinate(
    cubes: iris.cube.Cube | iris.cube.CubeList,
    seasons: list[str] = ["djf", "mam", "jja", "son"],
) -> iris.cube.Cube | iris.cube.CubeList:
    """Add a category coordinate of season to a Cube or CubeList.

    Arguments
    ---------
    cubes: iris.cube.Cube | iris.cube.CubeList
        Cube of any variable that has a time coordinate.
    seasons: list[str]
        Season definitions given as groups of month initials, defaulting
        to the standard meteorological seasons. Passed through to
        iris.coord_categorisation.add_season.

    Returns
    -------
    cube: iris.cube.Cube
        A Cube with an additional auxiliary coordinate of season.

    """
    # NOTE: the mutable default for seasons is safe as it is never
    # mutated; it is only read and forwarded to iris.
    new_cubelist = iris.cube.CubeList()
    for cube in iter_maybe(cubes):
        # Add a category coordinate of season into each cube.
        iris.coord_categorisation.add_season(
            cube, "time", name="season", seasons=seasons
        )
        new_cubelist.append(cube)

    # Unwrap a single-cube result for caller convenience.
    if len(new_cubelist) == 1:
        return new_cubelist[0]
    else:
        return new_cubelist
90 changes: 48 additions & 42 deletions src/CSET/operators/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def difference(cubes: CubeList):
--------
>>> model_diff = misc.difference(temperature_model_A, temperature_model_B)

"""
"""
if len(cubes) != 2:
raise ValueError("cubes should contain exactly 2 cubes.")
base: Cube = cubes.extract_cube(iris.AttributeConstraint(cset_comparison_base=1))
Expand All @@ -286,49 +286,55 @@ def difference(cubes: CubeList):
base_lat_name, base_lon_name = get_cube_yxcoordname(base)
other_lat_name, other_lon_name = get_cube_yxcoordname(other)

# Check latitude, longitude shape the same. Not comparing points, as these
# might slightly differ due to rounding errors (especially in future if we
# are regridding cubes to common resolutions).
# An exception has been included here to deal with some variables on different
# grid staggering (cell center vs edge of cell) when comparing UM to LFRic,
# depending on whether they are on a B grid or Arakawa staggering. Note we do not
# generally apply regridding to any variable where dimension sizes do not
# match, to make sure we are using appropriate regridding technique. In this
# case for winds, a Linear regridding is appropriate (smooth variable).
if (
base.coord(base_lat_name).shape != other.coord(other_lat_name).shape
or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape
base.coord(base_lat_name).points.size == 1 and other.coord(other_lat_name).points.size == 1
and base.coord(base_lon_name).points.size == 1 and other.coord(other_lon_name).points.size == 1
):
if base.long_name in [
"eastward_wind_at_10m",
"northward_wind_at_10m",
"northward_wind_at_cell_centres",
"eastward_wind_at_cell_centres",
"zonal_wind_at_pressure_levels",
"meridional_wind_at_pressure_levels",
"potential_vorticity_at_pressure_levels",
"vapour_specific_humidity_at_pressure_levels_for_climate_averaging",
]:
base = regrid_onto_cube(base, other, method="Linear")
else:
raise ValueError(
f"Cubes should have the same shape, got {base.coord(base_lat_name)} {other.coord(other_lat_name)}"
)

def is_increasing(sequence: list) -> bool:
"""Determine the direction of an ordered sequence.

Returns "increasing" or "decreasing" depending on whether the sequence
is going up or down. The sequence should already be monotonic, with no
duplicate values. An iris DimCoord's points fulfills this criteria.
"""
return sequence[0] < sequence[1]

# Figure out if we are comparing between UM and LFRic; flip array if so.
base_lat_direction = is_increasing(base.coord(base_lat_name).points)
other_lat_direction = is_increasing(other.coord(other_lat_name).points)
if base_lat_direction != other_lat_direction:
other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other))
print("Dealing with timeseries data")
else:
# Check latitude, longitude shape the same. Not comparing points, as these
# might slightly differ due to rounding errors (especially in future if we
# are regridding cubes to common resolutions).
# An exception has been included here to deal with some variables on different
# grid staggering (cell center vs edge of cell) when comparing UM to LFRic,
# depending on whether they are on a B grid or Arakawa staggering. Note we do not
# generally apply regridding to any variable where dimension sizes do not
# match, to make sure we are using appropriate regridding technique. In this
# case for winds, a Linear regridding is appropriate (smooth variable).
if (
base.coord(base_lat_name).shape != other.coord(other_lat_name).shape
or base.coord(base_lon_name).shape != other.coord(other_lon_name).shape
):
if base.long_name in [
"eastward_wind_at_10m",
"northward_wind_at_10m",
"northward_wind_at_cell_centres",
"eastward_wind_at_cell_centres",
"zonal_wind_at_pressure_levels",
"meridional_wind_at_pressure_levels",
"potential_vorticity_at_pressure_levels",
"vapour_specific_humidity_at_pressure_levels_for_climate_averaging",
]:
base = regrid_onto_cube(base, other, method="Linear")
else:
raise ValueError(
f"Cubes should have the same shape, got {base.coord(base_lat_name)} {other.coord(other_lat_name)}"
)

def is_increasing(sequence: list) -> bool:
"""Determine the direction of an ordered sequence.

Returns "increasing" or "decreasing" depending on whether the sequence
is going up or down. The sequence should already be monotonic, with no
duplicate values. An iris DimCoord's points fulfills this criteria.
"""
return sequence[0] < sequence[1]

# Figure out if we are comparing between UM and LFRic; flip array if so.
base_lat_direction = is_increasing(base.coord(base_lat_name).points)
other_lat_direction = is_increasing(other.coord(other_lat_name).points)
if base_lat_direction != other_lat_direction:
other.data = np.flip(other.data, other.coord(other_lat_name).cube_dims(other))

# Extract just common time points.
base, other = _extract_common_time_points(base, other)
Expand Down
Loading