diff --git a/src/CSET/operators/__init__.py b/src/CSET/operators/__init__.py index ef40fdf22..2f9b379aa 100644 --- a/src/CSET/operators/__init__.py +++ b/src/CSET/operators/__init__.py @@ -38,6 +38,7 @@ mesoscale, misc, plot, + power_spectrum, read, regrid, transect, @@ -61,6 +62,7 @@ "mesoscale", "misc", "plot", + "power_spectrum", "read", "regrid", "transect", diff --git a/src/CSET/operators/plot.py b/src/CSET/operators/plot.py index ab0415558..3d689ae93 100644 --- a/src/CSET/operators/plot.py +++ b/src/CSET/operators/plot.py @@ -34,7 +34,6 @@ import matplotlib.colors as mcolors import matplotlib.pyplot as plt import numpy as np -import scipy.fft as fft from iris.cube import Cube from markdown_it import MarkdownIt @@ -747,6 +746,138 @@ def _plot_and_save_line_series( plt.close(fig) +def _plot_and_save_line_power_spectrum( + cubes: iris.cube.CubeList, + coords: list[iris.coords.Coord], + ensemble_coord: str, + filename: str, + title: str, + **kwargs, +): + """Plot and save a 1D line series. + + Parameters + ---------- + cubes: Cube or CubeList + Cube or CubeList containing the cubes to plot on the y-axis. + coords: list[Coord] + Coordinates to plot on the x-axis, one per cube. + ensemble_coord: str + Ensemble coordinate in the cube. + filename: str + Filename of the plot to write. + title: str + Plot title. + """ + fig = plt.figure(figsize=(10, 10), facecolor="w", edgecolor="k") + + model_colors_map = _get_model_colors_map(cubes) + + # Store min/max ranges. + y_levels = [] + + # Check match-up across sequence coords gives consistent sizes + _validate_cubes_coords(cubes, coords) + + if "Spectrum" in title: + line_marker = None + line_width = 1 + else: + line_marker = "o" + line_width = 3 + + for cube, coord in zip(cubes, coords, strict=True): + label = None + color = "black" + if model_colors_map: + label = cube.attributes.get("model_name") + color = model_colors_map.get(label) + for cube_slice in cube.slices_over(ensemble_coord): + # Label with (control) if part of an ensemble or not otherwise. + if cube_slice.coord(ensemble_coord).points == [0]: + iplt.plot( + coord, + cube_slice, + color=color, + marker=line_marker, + ls="-", + lw=line_width, + label=f"{label} (control)" + if len(cube.coord(ensemble_coord).points) > 1 + else label, + ) + # Label with (perturbed) if part of an ensemble and not the control. + else: + iplt.plot( + coord, + cube_slice, + color=color, + ls="-", + lw=1.5, + alpha=0.75, + label=f"{label} (member)", + ) + + # Calculate the global min/max if multiple cubes are given. + _, levels, _ = _colorbar_map_levels(cube, axis="y") + if levels is not None: + y_levels.append(min(levels)) + y_levels.append(max(levels)) + + # Get the current axes. + ax = plt.gca() + + # Add some labels and tweak the style. + # check if cubes[0] works for single cube if not CubeList + if "Spectrum" in title: + title = f"{title}\n [{coord.units.title(coord.points[0])}]" + ax.set_title(title, fontsize=16) + ax.set_xlabel("Wavenumber", fontsize=14) + ax.set_ylabel("Power", fontsize=14) + ax.tick_params(axis="both", labelsize=12) + else: + ax.set_xlabel(f"{coords[0].name()} / {coords[0].units}", fontsize=14) + ax.set_ylabel(f"{cubes[0].name()} / {cubes[0].units}", fontsize=14) + ax.set_title(title, fontsize=16) + + ax.ticklabel_format(axis="y", useOffset=False) + ax.tick_params(axis="x", labelrotation=15) + ax.tick_params(axis="both", labelsize=12) + + # Set y limits to global min and max, autoscale if colorbar doesn't exist. + if "Spectrum" in title: + # Set log-log scale + ax.set_xscale("log") + ax.set_yscale("log") + + elif y_levels: + ax.set_ylim(min(y_levels), max(y_levels)) + # Add zero line. + if min(y_levels) < 0.0 and max(y_levels) > 0.0: + ax.axhline(y=0, xmin=0, xmax=1, ls="-", color="grey", lw=2) + logging.debug( + "Line plot with y-axis limits %s-%s", min(y_levels), max(y_levels) + ) + else: + ax.autoscale() + + # Add gridlines + ax.grid(linestyle="--", color="grey", linewidth=1) + # Ientify unique labels for legend + handles = list( + { + label: handle + for (handle, label) in zip(*ax.get_legend_handles_labels(), strict=True) + }.values() + ) + ax.legend(handles=handles, loc="best", ncol=1, frameon=False, fontsize=16) + + # Save plot. + fig.savefig(filename, bbox_inches="tight", dpi=_get_plot_resolution()) + logging.info("Saved line plot to %s", filename) + plt.close(fig) + + def _plot_and_save_vertical_line_series( cubes: iris.cube.CubeList, coords: list[iris.coords.Coord], @@ -1342,192 +1473,6 @@ def _plot_and_save_scattermap_plot( plt.close(fig) -def _plot_and_save_power_spectrum_series( - cubes: iris.cube.Cube | iris.cube.CubeList, - filename: str, - title: str, - **kwargs, -): - """Plot and save a power spectrum series. - - Parameters - ---------- - cubes: Cube or CubeList - 2 dimensional Cube or CubeList of the data to plot as power spectrum. - filename: str - Filename of the plot to write. - title: str - Plot title. - """ - fig = plt.figure(figsize=(10, 10), facecolor="w", edgecolor="k") - ax = plt.gca() - - model_colors_map = _get_model_colors_map(cubes) - - for cube in iter_maybe(cubes): - # Calculate power spectrum - - # Extract time coordinate and convert to datetime - time_coord = cube.coord("time") - time_points = time_coord.units.num2date(time_coord.points) - - # Choose one time point (e.g., the first one) - target_time = time_points[0] - - # Bind target_time inside the lambda using a default argument - time_constraint = iris.Constraint( - time=lambda cell, target_time=target_time: cell.point == target_time - ) - - cube = cube.extract(time_constraint) - - if cube.ndim == 2: - cube_3d = cube.data[np.newaxis, :, :] - logging.debug("Adding in new axis for a 2 dimensional cube.") - elif cube.ndim == 3: - cube_3d = cube.data - else: - raise ValueError("Cube dimensions unsuitable for power spectra code") - raise ValueError( - f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional." - ) - - # Calculate spectra - ps_array = _DCT_ps(cube_3d) - - ps_cube = iris.cube.Cube( - ps_array, - long_name="power_spectra", - ) - - ps_cube.attributes["model_name"] = cube.attributes.get("model_name") - - # Create a frequency/wavelength array for coordinate - ps_len = ps_cube.data.shape[1] - freqs = np.arange(1, ps_len + 1) - freq_coord = iris.coords.DimCoord(freqs, long_name="frequency", units="1") - - # Convert datetime to numeric time using original units - numeric_time = time_coord.units.date2num(time_points) - # Create a new DimCoord with numeric time - new_time_coord = iris.coords.DimCoord( - numeric_time, standard_name="time", units=time_coord.units - ) - - # Add time and frequency coordinate to spectra cube. - ps_cube.add_dim_coord(new_time_coord.copy(), 0) - ps_cube.add_dim_coord(freq_coord.copy(), 1) - - # Extract data from the cube - frequency = ps_cube.coord("frequency").points - power_spectrum = ps_cube.data - - label = None - color = "black" - if model_colors_map: - label = ps_cube.attributes.get("model_name") - color = model_colors_map[label] - ax.plot(frequency, power_spectrum[0], color=color, label=label) - - # Add some labels and tweak the style. - ax.set_title(title, fontsize=16) - ax.set_xlabel("Wavenumber", fontsize=14) - ax.set_ylabel("Power", fontsize=14) - ax.tick_params(axis="both", labelsize=12) - - # Set log-log scale - ax.set_xscale("log") - ax.set_yscale("log") - - # Overlay grid-lines onto power spectrum plot. - ax.grid(linestyle="--", color="grey", linewidth=1) - if model_colors_map: - ax.legend(loc="best", ncol=1, frameon=False, fontsize=16) - - # Save plot. - fig.savefig(filename, bbox_inches="tight", dpi=_get_plot_resolution()) - logging.info("Saved line plot to %s", filename) - plt.close(fig) - - -def _plot_and_save_postage_stamp_power_spectrum_series( - cube: iris.cube.Cube, - filename: str, - title: str, - stamp_coordinate: str, - **kwargs, -): - """Plot and save postage (ensemble members) stamps for a power spectrum series. - - Parameters - ---------- - cube: Cube - 2 dimensional Cube of the data to plot as power spectrum. - filename: str - Filename of the plot to write. - title: str - Plot title. - stamp_coordinate: str - Coordinate that becomes different plots. - """ - # Use the smallest square grid that will fit the members. - grid_size = int(math.ceil(math.sqrt(len(cube.coord(stamp_coordinate).points)))) - - fig = plt.figure(figsize=(10, 10), facecolor="w", edgecolor="k") - # Make a subplot for each member. - for member, subplot in zip( - cube.slices_over(stamp_coordinate), range(1, grid_size**2 + 1), strict=False - ): - # Implicit interface is much easier here, due to needing to have the - # cartopy GeoAxes generated. - plt.subplot(grid_size, grid_size, subplot) - - frequency = member.coord("frequency").points - power_spectrum = member.data - - ax = plt.gca() - ax.plot(frequency, power_spectrum[0]) - ax.set_title(f"Member #{member.coord(stamp_coordinate).points[0]}") - - # Overall figure title. - fig.suptitle(title, fontsize=16) - - fig.savefig(filename, bbox_inches="tight", dpi=_get_plot_resolution()) - logging.info("Saved power spectra postage stamp plot to %s", filename) - plt.close(fig) - - -def _plot_and_save_postage_stamps_in_single_plot_power_spectrum_series( - cube: iris.cube.Cube, - filename: str, - title: str, - stamp_coordinate: str, - **kwargs, -): - fig, ax = plt.subplots(figsize=(10, 10), facecolor="w", edgecolor="k") - ax.set_title(title, fontsize=16) - ax.set_xlabel(f"{cube.name()} / {cube.units}", fontsize=14) - ax.set_ylabel("Power", fontsize=14) - # Loop over all slices along the stamp_coordinate - for member in cube.slices_over(stamp_coordinate): - frequency = member.coord("frequency").points - power_spectrum = member.data - ax.plot( - frequency, - power_spectrum[0], - label=f"Member #{member.coord(stamp_coordinate).points[0]}", - ) - - # Add a legend - ax.legend(fontsize=16) - - # Save the figure to a file - plt.savefig(filename, bbox_inches="tight", dpi=_get_plot_resolution()) - - # Close the figure - plt.close(fig) - - def _spatial_plot( method: Literal["contourf", "pcolormesh"], cube: iris.cube.Cube, @@ -2114,6 +2059,7 @@ def plot_line_series( cube: iris.cube.Cube | iris.cube.CubeList, filename: str = None, series_coordinate: str = "time", + sequence_coordinate: str = "time", # line_coordinate: str = "realization", **kwargs, ) -> iris.cube.Cube | iris.cube.CubeList: @@ -2148,12 +2094,12 @@ def plot_line_series( If the cube isn't a Cube or CubeList. """ # Ensure we have a name for the plot file. - title = get_recipe_metadata().get("title", "Untitled") + recipe_title = get_recipe_metadata().get("title", "Untitled") if filename is None: - filename = slugify(title) + filename = slugify(recipe_title) - # Add file extension. + # Add file extension. This may be overwritten later on. plot_filename = f"{filename.rsplit('.', 1)[0]}.png" num_models = _get_num_models(cube) @@ -2173,14 +2119,98 @@ def plot_line_series( if cube.ndim > 2 or not cube.coords("realization"): raise ValueError("Cube must be 1D or 2D with a realization coordinate.") - # Do the actual plotting. - _plot_and_save_line_series(cubes, coords, "realization", plot_filename, title) + plot_index = [] + if series_coordinate == "time": + # Do the actual plotting for timeseries. + _plot_and_save_line_series( + cubes, coords, "realization", plot_filename, recipe_title + ) - # Add list of plots to plot metadata. - plot_index = _append_to_plot_index([plot_filename]) + plot_index.append(plot_filename) + else: + # If series coordinate is not time, for example power spectra with series + # coordinate frequency/wavelength. + # If several power spectra are plotted with time as sequence_coordinate for the + # time slider option. + for cube in cubes: + try: + cube.coord(sequence_coordinate) + except iris.exceptions.CoordinateNotFoundError as err: + raise ValueError( + f"Cube must have a {sequence_coordinate} coordinate." + ) from err + + if num_models == 1: + cube_iterables = cubes[0].slices_over(sequence_coordinate) + else: + all_points = sorted( + set( + itertools.chain.from_iterable( + cb.coord(sequence_coordinate).points for cb in cubes + ) + ) + ) + all_slices = list( + itertools.chain.from_iterable( + cb.slices_over(sequence_coordinate) for cb in cubes + ) + ) + # Matched slices (matched by seq coord point; it may happen that + # evaluated models do not cover the same seq coord range, hence matching + # necessary) + cube_iterables = [ + iris.cube.CubeList( + s + for s in all_slices + if s.coord(sequence_coordinate).points[0] == point + ) + for point in all_points + ] + + # plot_index = [] + nplot = np.size(cube.coord(sequence_coordinate).points) + + # Create a plot for each value of the sequence coordinate. Allowing for + # multiple cubes in a CubeList to be plotted in the same plot for similar + # sequence values. Passing a CubeList into the internal plotting function + # for similar values of the sequence coordinate. cube_slice can be an + # iris.cube.Cube or an iris.cube.CubeList. + for cube_slice in cube_iterables: + # Normalize cube_slice to a list of cubes + if isinstance(cube_slice, iris.cube.CubeList): + cubes = list(cube_slice) + elif isinstance(cube_slice, iris.cube.Cube): + cubes = [cube_slice] + else: + raise TypeError(f"Expected Cube or CubeList, got {type(cube_slice)}") + + single_cube = cubes[0] + + # Use sequence value so multiple sequences can merge. + sequence_value = single_cube.coord(sequence_coordinate).points[0] + plot_filename = f"{filename.rsplit('.', 1)[0]}_{sequence_value}.png" + coord = single_cube.coord(sequence_coordinate) + + # Format the coordinate value in a unit appropriate way. + title = f"{recipe_title}\n [{coord.units.title(coord.points[0])}]" + + # Use sequence (e.g. time) bounds if plotting single non-sequence outputs + if nplot == 1 and coord.has_bounds: + if np.size(coord.bounds) > 1: + title = f"{recipe_title}\n [{coord.units.title(coord.bounds[0][0])} to {coord.units.title(coord.bounds[0][1])}]" + + # Do the actual plotting. + _plot_and_save_line_power_spectrum( + cubes, coords, "realization", plot_filename, title + ) + + plot_index.append(plot_filename) + # not sure what the indent should be here (4 or 8 spaces) + # append plot to list of plots + complete_plot_index = _append_to_plot_index(plot_index) # Make a page to display the plots. - _make_plot_html_page(plot_index) + _make_plot_html_page(complete_plot_index) return cube @@ -2822,251 +2852,3 @@ def plot_histogram_series( _make_plot_html_page(complete_plot_index) return cubes - - -def plot_power_spectrum_series( - cubes: iris.cube.Cube | iris.cube.CubeList, - filename: str = None, - sequence_coordinate: str = "time", - stamp_coordinate: str = "realization", - single_plot: bool = False, - **kwargs, -) -> iris.cube.Cube | iris.cube.CubeList: - """Plot a power spectrum plot for each vertical level provided. - - A power spectrum plot can be plotted, but if the sequence_coordinate (i.e. time) - is present then a sequence of plots will be produced using the time slider - functionality to scroll through power spectrum against time. If a - stamp_coordinate is present then postage stamp plots will be produced. If - stamp_coordinate and single_plot is True, all postage stamp plots will be - plotted in a single plot instead of separate postage stamp plots. - - Parameters - ---------- - cubes: Cube | iris.cube.CubeList - Iris cube or CubeList of the data to plot. It should have a single dimension other - than the stamp coordinate. - The cubes should cover the same phenomenon i.e. all cubes contain temperature data. - We do not support different data such as temperature and humidity in the same CubeList for plotting. - filename: str, optional - Name of the plot to write, used as a prefix for plot sequences. Defaults - to the recipe name. - sequence_coordinate: str, optional - Coordinate about which to make a plot sequence. Defaults to ``"time"``. - This coordinate must exist in the cube and will be used for the time - slider. - stamp_coordinate: str, optional - Coordinate about which to plot postage stamp plots. Defaults to - ``"realization"``. - single_plot: bool, optional - If True, all postage stamp plots will be plotted in a single plot. If - False, each postage stamp plot will be plotted separately. Is only valid - if stamp_coordinate exists and has more than a single point. - - Returns - ------- - iris.cube.Cube | iris.cube.CubeList - The original Cube or CubeList (so further operations can be applied). - Plotted data. - - Raises - ------ - ValueError - If the cube doesn't have the right dimensions. - TypeError - If the cube isn't a Cube or CubeList. - """ - recipe_title = get_recipe_metadata().get("title", "Untitled") - - cubes = iter_maybe(cubes) - # Ensure we have a name for the plot file. - if filename is None: - filename = slugify(recipe_title) - - # Internal plotting function. - plotting_func = _plot_and_save_power_spectrum_series - - num_models = _get_num_models(cubes) - - _validate_cube_shape(cubes, num_models) - - # If several power spectra are plotted with time as sequence_coordinate for the - # time slider option. - for cube in cubes: - try: - cube.coord(sequence_coordinate) - except iris.exceptions.CoordinateNotFoundError as err: - raise ValueError( - f"Cube must have a {sequence_coordinate} coordinate." - ) from err - - # Make postage stamp plots if stamp_coordinate exists and has more than a - # single point. If single_plot is True: - # -- all postage stamp plots will be plotted in a single plot instead of - # separate postage stamp plots. - # -- model names (hidden in cube attrs) are ignored, that is stamp plots are - # produced per single model only - if num_models == 1: - if ( - stamp_coordinate in [c.name() for c in cubes[0].coords()] - and cubes[0].coord(stamp_coordinate).shape[0] > 1 - ): - if single_plot: - plotting_func = ( - _plot_and_save_postage_stamps_in_single_plot_power_spectrum_series - ) - else: - plotting_func = _plot_and_save_postage_stamp_power_spectrum_series - cube_iterables = cubes[0].slices_over(sequence_coordinate) - else: - all_points = sorted( - set( - itertools.chain.from_iterable( - cb.coord(sequence_coordinate).points for cb in cubes - ) - ) - ) - all_slices = list( - itertools.chain.from_iterable( - cb.slices_over(sequence_coordinate) for cb in cubes - ) - ) - # Matched slices (matched by seq coord point; it may happen that - # evaluated models do not cover the same seq coord range, hence matching - # necessary) - cube_iterables = [ - iris.cube.CubeList( - s for s in all_slices if s.coord(sequence_coordinate).points[0] == point - ) - for point in all_points - ] - - plot_index = [] - nplot = np.size(cube.coord(sequence_coordinate).points) - # Create a plot for each value of the sequence coordinate. Allowing for - # multiple cubes in a CubeList to be plotted in the same plot for similar - # sequence values. Passing a CubeList into the internal plotting function - # for similar values of the sequence coordinate. cube_slice can be an - # iris.cube.Cube or an iris.cube.CubeList. - for cube_slice in cube_iterables: - single_cube = cube_slice - if isinstance(cube_slice, iris.cube.CubeList): - single_cube = cube_slice[0] - - # Use sequence value so multiple sequences can merge. - sequence_value = single_cube.coord(sequence_coordinate).points[0] - plot_filename = f"{filename.rsplit('.', 1)[0]}_{sequence_value}.png" - coord = single_cube.coord(sequence_coordinate) - # Format the coordinate value in a unit appropriate way. - title = f"{recipe_title}\n [{coord.units.title(coord.points[0])}]" - # Use sequence (e.g. time) bounds if plotting single non-sequence outputs - if nplot == 1 and coord.has_bounds: - if np.size(coord.bounds) > 1: - title = f"{recipe_title}\n [{coord.units.title(coord.bounds[0][0])} to {coord.units.title(coord.bounds[0][1])}]" - # Do the actual plotting. - plotting_func( - cube_slice, - filename=plot_filename, - stamp_coordinate=stamp_coordinate, - title=title, - ) - plot_index.append(plot_filename) - - # Add list of plots to plot metadata. - complete_plot_index = _append_to_plot_index(plot_index) - - # Make a page to display the plots. - _make_plot_html_page(complete_plot_index) - - return cubes - - -def _DCT_ps(y_3d): - """Calculate power spectra for regional domains. - - Parameters - ---------- - y_3d: 3D array - 3 dimensional array to calculate spectrum for. - (2D field data with 3rd dimension of time) - - Returns: ps_array - Array of power spectra values calculated for input field (for each time) - - Method for regional domains: - Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT) - as described in Denis et al 2002 [Denis_etal_2002]_. - - References - ---------- - .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002) - "Spectral Decomposition of Two-Dimensional Atmospheric Fields on - Limited-Area Domains Using the Discrete Cosine Transform (DCT)" - Monthly Weather Review, Vol. 130, 1812-1828 - doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2 - """ - Nt, Ny, Nx = y_3d.shape - - # Max coefficient - Nmin = min(Nx - 1, Ny - 1) - - # Create alpha matrix (of wavenumbers) - alpha_matrix = _create_alpha_matrix(Ny, Nx) - - # Prepare output array - ps_array = np.zeros((Nt, Nmin)) - - # Loop over time to get spectrum for each time. - for t in range(Nt): - y_2d = y_3d[t] - - # Apply 2D DCT to transform y_3d[t] from physical space to spectral space. - # fkk is a 2D array of DCT coefficients, representing the amplitudes of - # cosine basis functions at different spatial frequencies. - - # normalise spectrum to allow comparison between models. - fkk = fft.dctn(y_2d, norm="ortho") - - # Normalise fkk - fkk = fkk / np.sqrt(Ny * Nx) - - # calculate variance of spectral coefficient - sigma_2 = fkk**2 / Nx / Ny - - # Group ellipses of alphas into the same wavenumber k/Nmin - for k in range(1, Nmin + 1): - alpha = k / Nmin - alpha_p1 = (k + 1) / Nmin - - # Sum up elements matching k - mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1)) - ps_array[t, k - 1] = np.sum(sigma_2[mask_k]) - - return ps_array - - -def _create_alpha_matrix(Ny, Nx): - """Construct an array of 2D wavenumbers from 2D wavenumber pair. - - Parameters - ---------- - Ny, Nx: - Dimensions of the 2D field for which the power spectra is calculated. Used to - create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a - single-scale parameter. - - Returns: alpha_matrix - normalisation of 2D wavenumber axes, transforming the spectral domain into - an elliptic coordinate system. - - """ - # Create x_indices: each row is [1, 2, ..., Nx] - x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1)) - - # Create y_indices: each column is [1, 2, ..., Ny] - y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx)) - - # Compute alpha_matrix - alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2) - - return alpha_matrix diff --git a/src/CSET/operators/power_spectrum.py b/src/CSET/operators/power_spectrum.py new file mode 100644 index 000000000..6505761f1 --- /dev/null +++ b/src/CSET/operators/power_spectrum.py @@ -0,0 +1,214 @@ +# © Crown copyright, Met Office (2022-2025) and CSET contributors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Operators for calculating power spectra.""" + +import logging + +import iris +import iris.coords +import iris.cube +import numpy as np +import scipy.fft as fft + + +def calculate_power_spectrum( + cube: iris.cube.Cube, +) -> iris.cube.Cube | iris.cube.CubeList: + """Calculate power spectrum plot for 1 vertical level at 1 time. + + Parameters + ---------- + cubes: Cube + 2 dimensional Cube of the data to plot as power spectrum. + It should have a single dimension other than the stamp coordinate. + The cubes should cover the same phenomenon i.e. all cubes contain temperature data. + We do not support different data such as temperature and humidity in the same CubeList for plotting. + plev: int + The pressure level of which to compute the power spectrum on. The function will search to + see if this exists and if not, will raise an exception. + + Returns + ------- + iris.cube.Cube + The power spectra of the data. + To be plotted and aggregation performed after. + + Raises + ------ + ValueError + If the cube doesn't have the right dimensions. + TypeError + If the cube isn't a Cube. + """ + cube = cube[0] + + ## Get array index for user specified pressure level. + # print('Levels ',cube.coord("pressure").points) + # if plev not in cube.coord("pressure").points: + # raise IndexError(f"Can't find plev {plev} in {cube.coord('pressure').points}") + # + ## Find corresponding pressure level index + # plev_idx = np.where(cube.coord("pressure").points == plev)[0][0] + + # Extract time coordinate and convert to datetime + time_coord = cube.coord("time") + time_points = time_coord.units.num2date(time_coord.points) + + if cube.ndim == 2: + cube_3d = cube.data[np.newaxis, :, :] + logging.debug("Adding in new axis for a 2 dimensional cube.") + elif cube.ndim == 3: + cube_3d = cube.data + else: + raise ValueError("Cube dimensions unsuitable for power spectra code") + raise ValueError( + f"Cube is {cube.ndim} dimensional. Cube should be 2 or 3 dimensional." + ) + + # Calculate spectrum + ps_array = _DCT_ps(cube_3d) + + ps_cube = iris.cube.Cube( + ps_array, + long_name="power_spectra", + ) + + # Create a frequency/wavelength array for new coordinate + ps_len = ps_cube.data.shape[1] + freqs = np.arange(1, ps_len + 1) + + # Create a new DimCoord with frequency + freq_coord = iris.coords.DimCoord(freqs, long_name="frequency", units="1") + + # Convert datetime to numeric time using original units + numeric_time = time_coord.units.date2num(time_points) + + # Create a new DimCoord with numeric time + new_time_coord = iris.coords.DimCoord( + numeric_time, standard_name="time", units=time_coord.units + ) + + # Create a new cube + new_data = ps_cube.data.copy() + new_cube = iris.cube.Cube(new_data, long_name=ps_cube.long_name) + + # Add time and frequency coordinates + new_cube.add_dim_coord(new_time_coord, 0) # time axis + new_cube.add_dim_coord(freq_coord, 1) # frequency axis + + # Power spectrum cube + ps_cube = new_cube + + # Ensure cube has a realisation coordinate by creating and adding to cube + realization_coord = iris.coords.AuxCoord(0, standard_name="realization", units="1") + ps_cube.add_aux_coord(realization_coord) + + # cubes = [ps_cube[0]] # list of cubes + # coords = [ps_cube[0].coord('frequency')] # matching coordinate + + # series_coordinate="frequency" + + return ps_cube + + +def _DCT_ps(y_3d): + """Calculate power spectra for regional domains. + + Parameters + ---------- + y_3d: 3D array + 3 dimensional array to calculate spectrum for. + (2D field data with 3rd dimension of time) + + Returns: ps_array + Array of power spectra values calculated for input field (for each time) + + Method for regional domains: + Calculate power spectra over limited area domain using Discrete Cosine Transform (DCT) + as described in Denis et al 2002 [Denis_etal_2002]_. + + References + ---------- + .. [Denis_etal_2002] Bertrand Denis, Jean Côté and René Laprise (2002) + "Spectral Decomposition of Two-Dimensional Atmospheric Fields on + Limited-Area Domains Using the Discrete Cosine Transform (DCT)" + Monthly Weather Review, Vol. 130, 1812-1828 + doi: https://doi.org/10.1175/1520-0493(2002)130<1812:SDOTDA>2.0.CO;2 + """ + Nt, Ny, Nx = y_3d.shape + + # Max coefficient + Nmin = min(Nx - 1, Ny - 1) + + # Create alpha matrix (of wavenumbers) + alpha_matrix = _create_alpha_matrix(Ny, Nx) + + # Prepare output array + ps_array = np.zeros((Nt, Nmin)) + + # Loop over time to get spectrum for each time. + for t in range(Nt): + y_2d = y_3d[t] + + # Apply 2D DCT to transform y_3d[t] from physical space to spectral space. + # fkk is a 2D array of DCT coefficients, representing the amplitudes of + # cosine basis functions at different spatial frequencies. + + # normalise spectrum to allow comparison between models. + fkk = fft.dctn(y_2d, norm="ortho") + + # Normalise fkk + fkk = fkk / np.sqrt(Ny * Nx) + + # calculate variance of spectral coefficient + sigma_2 = fkk**2 / Nx / Ny + + # Group ellipses of alphas into the same wavenumber k/Nmin + for k in range(1, Nmin + 1): + alpha = k / Nmin + alpha_p1 = (k + 1) / Nmin + + # Sum up elements matching k + mask_k = np.where((alpha_matrix >= alpha) & (alpha_matrix < alpha_p1)) + ps_array[t, k - 1] = np.sum(sigma_2[mask_k]) + + return ps_array + + +def _create_alpha_matrix(Ny, Nx): + """Construct an array of 2D wavenumbers from 2D wavenumber pair. + + Parameters + ---------- + Ny, Nx: + Dimensions of the 2D field for which the power spectra is calculated. Used to + create the array of 2D wavenumbers. Each Ny, Nx pair is associated with a + single-scale parameter. + + Returns: alpha_matrix + normalisation of 2D wavenumber axes, transforming the spectral domain into + an elliptic coordinate system. + + """ + # Create x_indices: each row is [1, 2, ..., Nx] + x_indices = np.tile(np.arange(1, Nx + 1), (Ny, 1)) + + # Create y_indices: each column is [1, 2, ..., Ny] + y_indices = np.tile(np.arange(1, Ny + 1).reshape(Ny, 1), (1, Nx)) + + # Compute alpha_matrix + alpha_matrix = np.sqrt((x_indices**2) / Nx**2 + (y_indices**2) / Ny**2) + + return alpha_matrix diff --git a/src/CSET/recipes/level_fields/generic_level_power_spectrum_series.yaml b/src/CSET/recipes/level_fields/generic_level_power_spectrum_series.yaml index 96a0abd0d..980755215 100644 --- a/src/CSET/recipes/level_fields/generic_level_power_spectrum_series.yaml +++ b/src/CSET/recipes/level_fields/generic_level_power_spectrum_series.yaml @@ -41,11 +41,10 @@ steps: subarea_type: $SUBAREA_TYPE subarea_extent: $SUBAREA_EXTENT - - operator: plot.plot_power_spectrum_series - sequence_coordinate: $SEQUENCE - # stamp_coordinate and single_plot optional and only required for ensemble data - stamp_coordinate: "realization" - single_plot: False + - operator: power_spectrum.calculate_power_spectrum + + - operator: plot.plot_line_series + series_coordinate: "frequency" - operator: write.write_cube_to_nc overwrite: True diff --git a/src/CSET/recipes/surface_fields/generic_surface_power_spectrum_series.yaml b/src/CSET/recipes/surface_fields/generic_surface_power_spectrum_series.yaml index 596c20176..12178db01 100644 --- a/src/CSET/recipes/surface_fields/generic_surface_power_spectrum_series.yaml +++ b/src/CSET/recipes/surface_fields/generic_surface_power_spectrum_series.yaml @@ -38,8 +38,10 @@ steps: subarea_type: $SUBAREA_TYPE subarea_extent: $SUBAREA_EXTENT + - operator: power_spectrum.calculate_power_spectrum + + - operator: plot.plot_line_series + series_coordinate: "frequency" + - operator: write.write_cube_to_nc overwrite: True - - - operator: plot.plot_power_spectrum_series - sequence_coordinate: $SEQUENCE