diff --git a/model/common/src/icon4py/model/common/decomposition/definitions.py b/model/common/src/icon4py/model/common/decomposition/definitions.py index 0c7179988d..52dc70fc30 100644 --- a/model/common/src/icon4py/model/common/decomposition/definitions.py +++ b/model/common/src/icon4py/model/common/decomposition/definitions.py @@ -12,10 +12,11 @@ import dataclasses import functools import logging +import warnings from collections.abc import Sequence from enum import Enum from types import ModuleType -from typing import Any, Literal, Protocol, TypeAlias, overload, runtime_checkable +from typing import Any, ClassVar, Literal, Protocol, TypeAlias, overload, runtime_checkable import dace # type: ignore[import-untyped] import gt4py.next as gtx @@ -408,12 +409,25 @@ def __str__(self) -> str: @dataclasses.dataclass class SingleNodeExchange(ExchangeRuntime): + _warning_emitted: ClassVar[bool] = False + + @classmethod + def _warn_if_used(cls, *, stacklevel: int = 3) -> None: + if not cls._warning_emitted: + warnings.warn( + "***** SingleNodeExchange is in use; HALO EXCHANGE IS RUNNING IN SINGLE-NODE *****", + RuntimeWarning, + stacklevel=stacklevel, + ) + cls._warning_emitted = True + def start( self, dim: gtx.Dimension, *fields: gtx.Field | data_alloc.NDArray, stream: StreamLike = DEFAULT_STREAM, ) -> ExchangeResult: + self._warn_if_used() return SingleNodeResult() def my_rank(self) -> int: diff --git a/model/common/src/icon4py/model/common/grid/grid_manager.py b/model/common/src/icon4py/model/common/grid/grid_manager.py index 1fde396bf5..46be5fcee6 100644 --- a/model/common/src/icon4py/model/common/grid/grid_manager.py +++ b/model/common/src/icon4py/model/common/grid/grid_manager.py @@ -106,8 +106,8 @@ def __call__( allocator: gtx_typing.Allocator | None, keep_skip_values: bool, decomposer: decomp.Decomposer = _single_node_decomposer, - run_properties=_single_process_props, - ): + run_properties: decomposition.ProcessProperties = _single_process_props, + ) -> None: if 
not run_properties.is_single_rank() and isinstance( decomposer, decomp.SingleNodeDecomposer ): diff --git a/model/common/src/icon4py/model/common/grid/utils.py b/model/common/src/icon4py/model/common/grid/utils.py index 39b48c9dd5..dbb3d69449 100644 --- a/model/common/src/icon4py/model/common/grid/utils.py +++ b/model/common/src/icon4py/model/common/grid/utils.py @@ -5,21 +5,20 @@ # # Please, refer to the LICENSE file in the root directory. # SPDX-License-Identifier: BSD-3-Clause -from types import ModuleType import numpy as np from icon4py.model.common.grid import gridfile -def revert_repeated_index_to_invalid(offset: np.ndarray, array_ns: ModuleType): +def revert_repeated_index_to_invalid(offset: np.ndarray): num_elements = offset.shape[0] for i in range(num_elements): # convert repeated indices back into -1 - for val in array_ns.flip(offset[i, :]): - if array_ns.count_nonzero(val == offset[i, :]) > 1: - unique_values, counts = array_ns.unique(offset[i, :], return_counts=True) + for val in np.flip(offset[i, :]): + if np.count_nonzero(val == offset[i, :]) > 1: + unique_values, counts = np.unique(offset[i, :], return_counts=True) rep_values = unique_values[counts > 1] - rep_indices = array_ns.where(array_ns.isin(offset[i, :], rep_values))[0] + rep_indices = np.where(np.isin(offset[i, :], rep_values))[0] offset[i, rep_indices[1:]] = gridfile.GridFile.INVALID_INDEX return offset diff --git a/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py b/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py index a4324fefc5..094fecd132 100644 --- a/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py +++ b/model/common/tests/common/decomposition/mpi_tests/test_mpi_decomposition.py @@ -325,6 +325,7 @@ def test_exchange_on_dummy_data( @pytest.mark.mpi @pytest.mark.datatest +@pytest.mark.embedded_only @pytest.mark.parametrize("processor_props", [False], indirect=True) def test_halo_exchange_for_sparse_field( 
interpolation_savepoint: serialbox.InterpolationSavepoint, diff --git a/model/common/tests/common/decomposition/unit_tests/test_definitions.py b/model/common/tests/common/decomposition/unit_tests/test_definitions.py index d99514a3a6..2a7df1db6b 100644 --- a/model/common/tests/common/decomposition/unit_tests/test_definitions.py +++ b/model/common/tests/common/decomposition/unit_tests/test_definitions.py @@ -6,6 +6,9 @@ # Please, refer to the LICENSE file in the root directory. # SPDX-License-Identifier: BSD-3-Clause +import sys +import warnings + import gt4py.next as gtx import numpy as np import pytest @@ -13,6 +16,7 @@ import icon4py.model.common.dimension as dims import icon4py.model.common.utils.data_allocation as data_alloc +from icon4py.model.common import dimension as dims from icon4py.model.common.decomposition import definitions from icon4py.model.common.decomposition.definitions import ( DecompositionInfo, @@ -94,3 +98,57 @@ def test_decomposition_info_is_distributed(flag, expected) -> None: np.ones((mesh.num_cells,)) * flag, ) assert decomp.is_distributed() == expected + + +def test_single_node_exchange_warns_on_first_use(monkeypatch): + monkeypatch.setattr(SingleNodeExchange, "_warning_emitted", False) + + exchange = SingleNodeExchange() + + with pytest.warns(RuntimeWarning, match="SingleNodeExchange"): + exchange.start(dims.CellDim) + + +def test_single_node_exchange_does_not_warn_on_construction_or_repeat_use(monkeypatch): + monkeypatch.setattr(SingleNodeExchange, "_warning_emitted", False) + + with warnings.catch_warnings(record=True) as recorded_warnings: + warnings.simplefilter("always") + exchange = SingleNodeExchange() + + assert len(recorded_warnings) == 0 + + with pytest.warns(RuntimeWarning, match="SingleNodeExchange"): + exchange.exchange(dims.CellDim) + + with warnings.catch_warnings(record=True) as repeated_warnings: + warnings.simplefilter("always") + exchange.start(dims.CellDim) + + assert len(repeated_warnings) == 0 + + +def 
_assert_warning_points_to_call_site(monkeypatch, func, expected_line): + monkeypatch.setattr(SingleNodeExchange, "_warning_emitted", False) + + with warnings.catch_warnings(record=True) as caught_warnings: + warnings.simplefilter("always") + func() + + assert len(caught_warnings) == 1 + assert caught_warnings[0].filename == __file__ + assert caught_warnings[0].lineno == expected_line + + +def test_single_node_exchange_warning_points_to_call_site(monkeypatch): + exchange = SingleNodeExchange() + + exchange_line = sys._getframe().f_lineno + 1 + _assert_warning_points_to_call_site( + monkeypatch, lambda: exchange.start(dims.CellDim), exchange_line + ) + + wait_line = sys._getframe().f_lineno + 1 + _assert_warning_points_to_call_site( + monkeypatch, lambda: exchange.exchange(dims.CellDim), wait_line + ) diff --git a/model/common/tests/common/grid/mpi_tests/test_parallel_grid_manager.py b/model/common/tests/common/grid/mpi_tests/test_parallel_grid_manager.py index a59dec248d..2e5692b611 100644 --- a/model/common/tests/common/grid/mpi_tests/test_parallel_grid_manager.py +++ b/model/common/tests/common/grid/mpi_tests/test_parallel_grid_manager.py @@ -5,9 +5,7 @@ # # Please, refer to the LICENSE file in the root directory. 
# SPDX-License-Identifier: BSD-3-Clause -import functools import logging -import operator import numpy as np import pytest @@ -33,7 +31,7 @@ from icon4py.model.common.metrics import metrics_attributes, metrics_factory from icon4py.model.common.states import utils as state_utils from icon4py.model.common.utils import data_allocation as data_alloc -from icon4py.model.testing import definitions as test_defs, grid_utils, test_utils +from icon4py.model.testing import definitions as test_defs, grid_utils, parallel_helpers, test_utils from icon4py.model.testing.fixtures.datatest import ( backend, experiment, @@ -57,7 +55,7 @@ def test_grid_manager_validate_decomposer( processor_props: decomp_defs.ProcessProperties, experiment: test_defs.Experiment, ) -> None: - if experiment == test_defs.Experiments.MCH_CH_R04B09: + if experiment.grid.params.limited_area: pytest.xfail("Limited-area grids not yet supported") file = grid_utils.resolve_full_grid_file_name(experiment.grid) @@ -85,104 +83,6 @@ def _get_neighbor_tables(grid: base.Grid) -> dict: } -def gather_field(field: np.ndarray, props: decomp_defs.ProcessProperties) -> tuple: - constant_dims = tuple(field.shape[1:]) - _log.info(f"gather_field on rank={props.rank} - gathering field of local shape {field.shape}") - # Because of sparse indexing the field may have a non-contigous layout, - # which Gatherv doesn't support. Make sure the field is contiguous. 
- field = np.ascontiguousarray(field) - constant_length = functools.reduce(operator.mul, constant_dims, 1) - local_sizes = np.array(props.comm.gather(field.size, root=0)) - if props.rank == 0: - recv_buffer = np.empty(np.sum(local_sizes), dtype=field.dtype) - _log.info( - f"gather_field on rank = {props.rank} - setup receive buffer with size {sum(local_sizes)} on rank 0" - ) - else: - recv_buffer = None - - props.comm.Gatherv(sendbuf=field, recvbuf=(recv_buffer, local_sizes), root=0) - if props.rank == 0: - local_first_dim = tuple(sz // constant_length for sz in local_sizes) - _log.info( - f" gather_field on rank = 0: computed local dims {local_first_dim} - constant dims {constant_dims}" - ) - gathered_field = recv_buffer.reshape((-1, *constant_dims)) # type: ignore [union-attr] - else: - gathered_field = None - local_first_dim = field.shape - return local_first_dim, gathered_field - - -def check_local_global_field( - decomposition_info: decomp_defs.DecompositionInfo, - processor_props: decomp_defs.ProcessProperties, # F811 # fixture - dim: gtx.Dimension, - global_reference_field: np.ndarray, - local_field: np.ndarray, - check_halos: bool, -) -> None: - if dim == dims.KDim: - test_utils.assert_dallclose(global_reference_field, local_field) - return - - _log.info( - f" rank= {processor_props.rank}/{processor_props.comm_size}----exchanging field of main dim {dim}" - ) - assert ( - local_field.shape[0] - == decomposition_info.global_index(dim, decomp_defs.DecompositionInfo.EntryType.ALL).shape[ - 0 - ] - ) - - # Compare halo against global reference field - if check_halos: - test_utils.assert_dallclose( - global_reference_field[ - decomposition_info.global_index(dim, decomp_defs.DecompositionInfo.EntryType.HALO) - ], - local_field[ - decomposition_info.local_index(dim, decomp_defs.DecompositionInfo.EntryType.HALO) - ], - atol=1e-9, - verbose=True, - ) - - # Compare owned local field, excluding halos, against global reference - # field, by gathering owned entries to 
the first rank. This ensures that in - # total we have the full global field distributed on all ranks. - owned_entries = local_field[ - decomposition_info.local_index(dim, decomp_defs.DecompositionInfo.EntryType.OWNED) - ] - gathered_sizes, gathered_field = gather_field(owned_entries, processor_props) - - global_index_sizes, gathered_global_indices = gather_field( - decomposition_info.global_index(dim, decomp_defs.DecompositionInfo.EntryType.OWNED), - processor_props, - ) - - if processor_props.rank == 0: - _log.info(f"rank = {processor_props.rank}: asserting gathered fields: ") - - assert np.all( - gathered_sizes == global_index_sizes - ), f"gathered field sizes do not match: {dim} {gathered_sizes} - {global_index_sizes}" - _log.info( - f"rank = {processor_props.rank}: Checking field size on dim ={dim}: --- gathered sizes {gathered_sizes} = {sum(gathered_sizes)}" - ) - _log.info( - f"rank = {processor_props.rank}: --- gathered field has size {gathered_sizes}" - ) - sorted_ = np.zeros(global_reference_field.shape, dtype=gtx.float64) # type: ignore [attr-defined] - sorted_[gathered_global_indices] = gathered_field - _log.info( - f" rank = {processor_props.rank}: SHAPES: global reference field {global_reference_field.shape}, gathered = {gathered_field.shape}" - ) - - test_utils.assert_dallclose(sorted_, global_reference_field, atol=1e-9, verbose=True) - - # These fields can't be computed with the embedded backend for one reason or # another, so we declare them here for xfailing. 
embedded_broken_fields = { @@ -316,7 +216,7 @@ def test_geometry_fields_compare_single_multi_rank( field = multi_rank_geometry.get(attrs_name) dim = field_ref.domain.dims[0] - check_local_global_field( + parallel_helpers.check_local_global_field( decomposition_info=multi_rank_grid_manager.decomposition_info, processor_props=processor_props, dim=dim, @@ -361,7 +261,7 @@ def test_interpolation_fields_compare_single_multi_rank( experiment: test_defs.Experiment, attrs_name: str, ) -> None: - if experiment == test_defs.Experiments.MCH_CH_R04B09: + if experiment.grid.params.limited_area: pytest.xfail("Limited-area grids not yet supported") if attrs_name in embedded_broken_fields and test_utils.is_embedded(backend): @@ -430,7 +330,7 @@ def test_interpolation_fields_compare_single_multi_rank( field = multi_rank_interpolation.get(attrs_name) dim = field_ref.domain.dims[0] - check_local_global_field( + parallel_helpers.check_local_global_field( decomposition_info=multi_rank_grid_manager.decomposition_info, processor_props=processor_props, dim=dim, @@ -503,7 +403,7 @@ def test_metrics_fields_compare_single_multi_rank( experiment: test_defs.Experiment, attrs_name: str, ) -> None: - if experiment == test_defs.Experiments.MCH_CH_R04B09: + if experiment.grid.params.limited_area: pytest.xfail("Limited-area grids not yet supported") if attrs_name in embedded_broken_fields and test_utils.is_embedded(backend): @@ -660,7 +560,7 @@ def test_metrics_fields_compare_single_multi_rank( assert isinstance(field, state_utils.ScalarType) assert pytest.approx(field) == field_ref else: - check_local_global_field( + parallel_helpers.check_local_global_field( decomposition_info=multi_rank_grid_manager.decomposition_info, processor_props=processor_props, dim=field_ref.domain.dims[0], @@ -682,7 +582,7 @@ def test_metrics_mask_prog_halo_c( backend: gtx_typing.Backend | None, experiment: test_defs.Experiment, ) -> None: - if experiment == test_defs.Experiments.MCH_CH_R04B09: + if 
experiment.grid.params.limited_area: pytest.xfail("Limited-area grids not yet supported") file = grid_utils.resolve_full_grid_file_name(experiment.grid) @@ -813,7 +713,7 @@ def test_validate_skip_values_in_distributed_connectivities( processor_props: decomp_defs.ProcessProperties, experiment: test_defs.Experiment, ) -> None: - if experiment == test_defs.Experiments.MCH_CH_R04B09: + if experiment.grid.params.limited_area: pytest.xfail("Limited-area grids not yet supported") file = grid_utils.resolve_full_grid_file_name(experiment.grid) diff --git a/model/standalone_driver/src/icon4py/model/standalone_driver/driver_utils.py b/model/standalone_driver/src/icon4py/model/standalone_driver/driver_utils.py index edcdd6b032..b1c13d97be 100644 --- a/model/standalone_driver/src/icon4py/model/standalone_driver/driver_utils.py +++ b/model/standalone_driver/src/icon4py/model/standalone_driver/driver_utils.py @@ -23,12 +23,12 @@ from icon4py.model.atmosphere.dycore import dycore_states, solve_nonhydro as solve_nh from icon4py.model.common import ( constants, - dimension as dims, field_type_aliases as fa, model_backends, type_alias as ta, ) from icon4py.model.common.decomposition import ( + decomposer as decomp, definitions as decomposition_defs, mpi_decomposition as mpi_decomp, ) @@ -44,7 +44,6 @@ from icon4py.model.common.interpolation import interpolation_attributes, interpolation_factory from icon4py.model.common.metrics import metrics_attributes, metrics_factory from icon4py.model.common.states import factory as states_factory -from icon4py.model.common.utils import data_allocation as data_alloc from icon4py.model.standalone_driver import config as driver_config, driver_states @@ -64,38 +63,30 @@ def create_grid_manager( grid_file_path: pathlib.Path, vertical_grid_config: v_grid.VerticalGridConfig, allocator: gtx_typing.Allocator, + parallel_props: decomposition_defs.ProcessProperties, global_reductions: decomposition_defs.Reductions = 
decomposition_defs.single_node_reductions, ) -> gm.GridManager: + decomposer = ( + decomp.MetisDecomposer() + if not parallel_props.is_single_rank() + else decomp.SingleNodeDecomposer() + ) grid_manager = gm.GridManager( grid_file=grid_file_path, config=vertical_grid_config, offset_transformation=gridfile.ToZeroBasedIndexTransformation(), global_reductions=global_reductions, ) - grid_manager(allocator=allocator, keep_skip_values=True) + grid_manager( + allocator=allocator, + keep_skip_values=True, + run_properties=parallel_props, + decomposer=decomposer, + ) return grid_manager -def create_decomposition_info( - grid_manager: gm.GridManager, - allocator: gtx_typing.Allocator, -) -> decomposition_defs.DecompositionInfo: - decomposition_info = decomposition_defs.DecompositionInfo() - xp = data_alloc.import_array_ns(allocator) - - def _add_dimension(dim: gtx.Dimension) -> None: - indices = data_alloc.index_field(grid_manager.grid, dim, allocator=allocator) - owner_mask = xp.ones((grid_manager.grid.size[dim],), dtype=bool) - decomposition_info.set_dimension(dim, indices.ndarray, owner_mask, None) - - _add_dimension(dims.EdgeDim) - _add_dimension(dims.VertexDim) - _add_dimension(dims.CellDim) - - return decomposition_info - - def create_vertical_grid( vertical_grid_config: v_grid.VerticalGridConfig, allocator: gtx_typing.Allocator, @@ -118,6 +109,8 @@ def create_static_field_factories( vertical_grid: v_grid.VerticalGrid, cell_topography: fa.CellField[ta.wpfloat], backend: gtx_typing.Backend | None, + exchange: decomposition_defs.ExchangeRuntime, + global_reductions: decomposition_defs.Reductions, ) -> driver_states.StaticFieldFactories: geometry_field_source = grid_geometry.GridGeometry( grid=grid_manager.grid, @@ -126,6 +119,8 @@ def create_static_field_factories( coordinates=grid_manager.coordinates, extra_fields=grid_manager.geometry_fields, metadata=geometry_meta.attrs, + exchange=exchange, + global_reductions=global_reductions, ) interpolation_field_source = 
interpolation_factory.InterpolationFieldsFactory( @@ -134,6 +129,7 @@ def create_static_field_factories( geometry_source=geometry_field_source, backend=backend, metadata=interpolation_attributes.attrs, + exchange=exchange, ) metrics_field_source = metrics_factory.MetricsFieldsFactory( @@ -151,6 +147,8 @@ def create_static_field_factories( vwind_offctr=0.15, thslp_zdiffu=0.025, thhgtd_zdiffu=200.0, + exchange=exchange, + global_reductions=global_reductions, ) return driver_states.StaticFieldFactories( @@ -344,6 +342,7 @@ def initialize_granules( edge_geometry=edge_geometry, cell_geometry=cell_geometry, owner_mask=owner_mask, + exchange=exchange, ) advection_granule = advection.convert_config_to_advection( @@ -579,12 +578,15 @@ def configure_logging( display_icon4py_logo_in_log_file() -def get_backend_from_name(backend_name: str) -> model_backends.BackendLike: +def get_backend_from_name( + backend_name: str | model_backends.BackendLike | None, +) -> model_backends.BackendLike: if backend_name not in model_backends.BACKENDS: raise ValueError( f"Invalid driver backend: {backend_name}. 
\n" f"Available backends are {', '.join([*model_backends.BACKENDS.keys()])}" ) + assert isinstance(backend_name, str) backend = model_backends.BACKENDS[backend_name] log.info(f"Backend name used for the model: {backend_name}") log.info(f"BackendLike derived from the backend name: {backend}") diff --git a/model/standalone_driver/src/icon4py/model/standalone_driver/main.py b/model/standalone_driver/src/icon4py/model/standalone_driver/main.py index 034cfabc70..6204635149 100644 --- a/model/standalone_driver/src/icon4py/model/standalone_driver/main.py +++ b/model/standalone_driver/src/icon4py/model/standalone_driver/main.py @@ -12,6 +12,7 @@ import typer from icon4py.model.common import model_backends +from icon4py.model.common.decomposition import definitions as decomp_defs from icon4py.model.standalone_driver import driver_states, driver_utils, standalone_driver from icon4py.model.standalone_driver.testcases import initial_condition @@ -39,7 +40,13 @@ def main( help=f"Logging level of the model. Possible options are {' / '.join([*driver_utils._LOGGING_LEVELS.keys()])}", ), ] = next(iter(driver_utils._LOGGING_LEVELS.keys())), -) -> driver_states.DriverStates: + force_serial_run: Annotated[ + bool, + typer.Option( + help="Force a single-node run even if MPI is available. 
Useful to build serial reference output within MPI test sessions.", + ), + ] = False, +) -> tuple[driver_states.DriverStates, decomp_defs.DecompositionInfo]: """ This is a function that runs the icon4py driver from a grid file with the initial condition from the Jablonowski Williamson test case @@ -55,6 +62,7 @@ def main( grid_file_path=grid_file_path, log_level=log_level, backend_name=icon4py_backend, + force_serial_run=force_serial_run, ) log.info("Generating the initial condition") @@ -68,6 +76,7 @@ def main( model_top_height=icon4py_driver.vertical_grid_config.model_top_height, stretch_factor=icon4py_driver.vertical_grid_config.stretch_factor, damping_height=icon4py_driver.vertical_grid_config.rayleigh_damping_height, + exchange=icon4py_driver.exchange, ) log.info("driver setup: DONE") @@ -79,7 +88,7 @@ def main( ) log.info("time loop: DONE") - return ds + return ds, icon4py_driver.decomposition_info if __name__ == "__main__": diff --git a/model/standalone_driver/src/icon4py/model/standalone_driver/standalone_driver.py b/model/standalone_driver/src/icon4py/model/standalone_driver/standalone_driver.py index 276be59f83..ad7f93513d 100644 --- a/model/standalone_driver/src/icon4py/model/standalone_driver/standalone_driver.py +++ b/model/standalone_driver/src/icon4py/model/standalone_driver/standalone_driver.py @@ -22,7 +22,10 @@ from icon4py.model.atmosphere.diffusion import diffusion, diffusion_states from icon4py.model.atmosphere.dycore import dycore_states, solve_nonhydro as solve_nh from icon4py.model.common import dimension as dims, model_backends, model_options, type_alias as ta -from icon4py.model.common.decomposition import definitions as decomposition_defs +from icon4py.model.common.decomposition import ( + definitions as decomposition_defs, + mpi_decomposition as mpi_decomp, +) from icon4py.model.common.grid import geometry_attributes as geom_attr, vertical as v_grid from icon4py.model.common.grid.icon import IconGrid from 
icon4py.model.common.initialization import topography @@ -46,15 +49,18 @@ def __init__( config: driver_config.DriverConfig, backend: gtx.typing.Backend | None, grid: IconGrid, + decomposition_info: decomposition_defs.DecompositionInfo, static_field_factories: driver_states.StaticFieldFactories, diffusion_granule: diffusion.Diffusion, solve_nonhydro_granule: solve_nh.SolveNonhydro, vertical_grid_config: v_grid.VerticalGridConfig, tracer_advection_granule: advection.Advection, + exchange: decomposition_defs.ExchangeRuntime, ): self.config = config self.backend = backend self.grid = grid + self.decomposition_info = decomposition_info self.static_field_factories = static_field_factories self.diffusion = diffusion_granule self.solve_nonhydro = solve_nonhydro_granule @@ -64,6 +70,7 @@ def __init__( self.timer_collection = driver_states.TimerCollection( [timer.value for timer in driver_states.DriverTimers] ) + self.exchange = exchange driver_utils.display_driver_setup_in_log_file( self.model_time_variables.n_time_steps, @@ -553,7 +560,8 @@ def initialize_driver( output_path: pathlib.Path, grid_file_path: pathlib.Path, log_level: str, - backend_name: str, + backend_name: str | model_backends.BackendLike | None, + force_serial_run: bool = False, ) -> Icon4pyDriver: """ Initialize the driver: @@ -573,8 +581,9 @@ def initialize_driver( Driver: driver object """ + with_mpi = (mpi_decomp.mpi4py is not None) and not force_serial_run parallel_props = decomposition_defs.get_processor_properties( - decomposition_defs.get_runtype(with_mpi=False) + decomposition_defs.get_runtype(with_mpi=with_mpi) ) driver_utils.configure_logging( logging_level=log_level, @@ -611,15 +620,12 @@ def initialize_driver( grid_file_path=grid_file_path, vertical_grid_config=vertical_grid_config, allocator=allocator, + parallel_props=parallel_props, global_reductions=global_reductions, ) log.info("creating the decomposition info") - - decomposition_info = driver_utils.create_decomposition_info( - 
grid_manager=grid_manager, - allocator=allocator, - ) + decomposition_info = grid_manager.decomposition_info exchange = decomposition_defs.create_exchange(parallel_props, decomposition_info) log.info("initializing the vertical grid") @@ -642,6 +648,8 @@ def initialize_driver( vertical_grid=vertical_grid, cell_topography=gtx.as_field((dims.CellDim,), data=cell_topography, allocator=allocator), # type: ignore[arg-type] # due to array_ns opacity backend=backend, + exchange=exchange, + global_reductions=global_reductions, ) log.info("initializing granules") @@ -668,11 +676,13 @@ def initialize_driver( config=driver_config, backend=backend, grid=grid_manager.grid, + decomposition_info=decomposition_info, static_field_factories=static_field_factories, diffusion_granule=diffusion_granule, solve_nonhydro_granule=solve_nonhydro_granule, vertical_grid_config=vertical_grid_config, tracer_advection_granule=tracer_advection_granule, + exchange=exchange, ) return icon4py_driver diff --git a/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/initial_condition.py b/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/initial_condition.py index 02899c8b9a..c398ee3314 100644 --- a/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/initial_condition.py +++ b/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/initial_condition.py @@ -5,7 +5,6 @@ # # Please, refer to the LICENSE file in the root directory. 
# SPDX-License-Identifier: BSD-3-Clause -import functools import logging import math @@ -21,6 +20,7 @@ model_backends, type_alias as ta, ) +from icon4py.model.common.decomposition import definitions as decomposition_defs from icon4py.model.common.grid import ( geometry as grid_geometry, geometry_attributes as geometry_meta, @@ -57,6 +57,7 @@ def jablonowski_williamson( # noqa: PLR0915 [too-many-statements] model_top_height: float, stretch_factor: float, damping_height: float, + exchange: decomposition_defs.ExchangeRuntime = decomposition_defs.SingleNodeExchange, ) -> driver_states.DriverStates: """ Initial condition of Jablonowski-Williamson test. Set jw_baroclinic_amplitude to values larger than 0.01 if @@ -240,11 +241,10 @@ def jablonowski_williamson( # noqa: PLR0915 [too-many-statements] vertical_end=num_levels, offset_provider=grid.connectivities, ) + exchange(eta_v_at_edge, dim=dims.EdgeDim) log.info("Cell-to-edge eta_v computation completed.") - prognostic_state_now.vn.ndarray[:, :] = functools.partial( - testcases_utils.zonalwind_2_normalwind_ndarray, array_ns=xp - )( + prognostic_state_now.vn.ndarray[:, :] = testcases_utils.zonalwind_2_normalwind_ndarray( grid=grid, jw_u0=jw_u0, jw_baroclinic_amplitude=jw_baroclinic_amplitude, @@ -254,7 +254,9 @@ def jablonowski_williamson( # noqa: PLR0915 [too-many-statements] edge_lon=edge_lon, primal_normal_x=primal_normal_x, eta_v_at_edge=eta_v_at_edge.ndarray, + array_ns=xp, ) + vertical_config = v_grid.VerticalGridConfig( grid.num_levels, lowest_layer_thickness=lowest_layer_thickness, @@ -283,7 +285,7 @@ def jablonowski_williamson( # noqa: PLR0915 [too-many-statements] ) log.info("U2vn computation completed.") - functools.partial(testcases_utils.apply_hydrostatic_adjustment_ndarray, array_ns=xp)( + testcases_utils.apply_hydrostatic_adjustment_ndarray( rho=rho_ndarray, exner=exner_ndarray, theta_v=theta_v_ndarray, @@ -294,6 +296,7 @@ def jablonowski_williamson( # noqa: PLR0915 [too-many-statements] wgtfac_c=wgtfac_c, 
ddqz_z_half=ddqz_z_half, num_levels=num_levels, + array_ns=xp, ) log.info("Hydrostatic adjustment computation completed.") prognostic_state_next = prognostics.PrognosticState( @@ -317,6 +320,8 @@ def jablonowski_williamson( # noqa: PLR0915 [too-many-statements] vertical_end=num_levels, offset_provider=grid.connectivities, ) + exchange(diagnostic_state.u, dim=dims.CellDim) + exchange(diagnostic_state.v, dim=dims.CellDim) log.info("U, V computation completed.") diff --git a/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/utils.py b/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/utils.py index 86bf7390f5..09698a8ce3 100644 --- a/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/utils.py +++ b/model/standalone_driver/src/icon4py/model/standalone_driver/testcases/utils.py @@ -190,14 +190,14 @@ def init_w( lb_c = grid.start_index(h_grid.domain(dims.CellDim)(h_grid.Zone.LATERAL_BOUNDARY_LEVEL_2)) ub_c = grid.end_index(h_grid.domain(dims.CellDim)(h_grid.Zone.INTERIOR)) - z_wsfc_e = array_ns.zeros((ub_e,)) + z_wsfc_e = array_ns.zeros((grid.num_edges,)) for je in range(lb_e, ub_e): z_wsfc_e[je] = ( vn[je, nlev - 1] * ((z_ifc[e2c[:, 1]] - z_ifc[e2c[:, 0]])[je, :] * inv_dual_edge_length[je])[nlev] ) - e_inn_c = array_ns.zeros((ub_c, 3)) # or 1 + e_inn_c = array_ns.zeros((grid.num_cells, 3)) # or 1 for jc in range(ub_c): for je in range(3): idx_ce = 0 if e2c[c2e][jc, je, 0] == jc else 1 @@ -208,8 +208,8 @@ def init_w( ) z_wsfc_c = array_ns.sum(z_wsfc_e[c2e] * e_inn_c, axis=1) - w = array_ns.zeros((ub_c, nlev + 1)) - w[lb_c:, nlev] = z_wsfc_c[lb_c:ub_c] - w[lb_c:, 1:] = z_wsfc_c[lb_c:ub_c, array_ns.newaxis] * vct_b[array_ns.newaxis, 1:] + w = array_ns.zeros((grid.num_cells, nlev + 1)) + w[lb_c:ub_c, nlev] = z_wsfc_c[lb_c:ub_c] + w[lb_c:ub_c, 1:] = z_wsfc_c[lb_c:ub_c, array_ns.newaxis] * vct_b[array_ns.newaxis, 1:] return w diff --git 
a/model/standalone_driver/tests/standalone_driver/integration_tests/test_initial_condition.py b/model/standalone_driver/tests/standalone_driver/integration_tests/test_initial_condition.py index aba7b35f9a..6cde0088ea 100644 --- a/model/standalone_driver/tests/standalone_driver/integration_tests/test_initial_condition.py +++ b/model/standalone_driver/tests/standalone_driver/integration_tests/test_initial_condition.py @@ -9,11 +9,10 @@ import pytest -from icon4py.model.common import dimension as dims, model_backends, model_options -from icon4py.model.common.utils import data_allocation as data_alloc -from icon4py.model.standalone_driver import driver_states, driver_utils, standalone_driver +from icon4py.model.common import model_backends +from icon4py.model.standalone_driver import driver_utils, standalone_driver from icon4py.model.standalone_driver.testcases import initial_condition -from icon4py.model.testing import definitions, grid_utils, serialbox, serialbox as sb, test_utils +from icon4py.model.testing import definitions, grid_utils, serialbox as sb, test_utils from icon4py.model.testing.fixtures.datatest import ( backend, backend_like, @@ -31,7 +30,7 @@ def test_standalone_driver_initial_condition( backend_like: model_backends.BackendLike, tmp_path: pathlib.Path, experiment: definitions.Experiment, - data_provider: serialbox.IconSerialDataProvider, + data_provider: sb.IconSerialDataProvider, ) -> None: backend_name = next( (k for k, v in model_backends.BACKENDS.items() if backend_like == v), "embedded" diff --git a/model/standalone_driver/tests/standalone_driver/integration_tests/test_standalone_driver.py b/model/standalone_driver/tests/standalone_driver/integration_tests/test_standalone_driver.py index e3617291a2..2822f88395 100644 --- a/model/standalone_driver/tests/standalone_driver/integration_tests/test_standalone_driver.py +++ b/model/standalone_driver/tests/standalone_driver/integration_tests/test_standalone_driver.py @@ -9,11 +9,10 @@ import pytest 
-from icon4py.model.common import model_backends, model_options -from icon4py.model.common.utils import data_allocation as data_alloc -from icon4py.model.standalone_driver import driver_utils, main -from icon4py.model.testing import definitions, grid_utils, serialbox as sb, test_utils -from icon4py.model.testing.fixtures.datatest import backend, backend_like +from icon4py.model.common import model_backends +from icon4py.model.standalone_driver import main +from icon4py.model.testing import definitions as test_defs, grid_utils, serialbox as sb, test_utils +from icon4py.model.testing.fixtures.datatest import backend_like from ..fixtures import * # noqa: F403 @@ -24,7 +23,7 @@ "experiment, istep_exit, substep_exit, timeloop_date_init, timeloop_date_exit, step_date_exit, timeloop_diffusion_linit_init, timeloop_diffusion_linit_exit", [ ( - definitions.Experiments.JW, + test_defs.Experiments.JW, 2, 5, "2008-09-01T00:00:00.000", @@ -36,7 +35,7 @@ ], ) def test_standalone_driver( - experiment: definitions.Experiment, + experiment: test_defs.Experiment, timeloop_date_init: str, timeloop_date_exit: str, timeloop_diffusion_linit_init: bool, @@ -52,7 +51,7 @@ def test_standalone_driver( ) grid_file_path = grid_utils._download_grid_file(experiment.grid) output_path = tmp_path / f"ci_driver_output_for_backend_{backend_name}" - ds = main.main( + ds, _ = main.main( grid_file_path=grid_file_path, icon4py_backend=backend_name, output_path=output_path, diff --git a/model/standalone_driver/tests/standalone_driver/mpi_tests/__init__.py b/model/standalone_driver/tests/standalone_driver/mpi_tests/__init__.py new file mode 100644 index 0000000000..de9850de36 --- /dev/null +++ b/model/standalone_driver/tests/standalone_driver/mpi_tests/__init__.py @@ -0,0 +1,7 @@ +# ICON4Py - ICON inspired code in Python and GT4Py +# +# Copyright (c) 2022-2024, ETH Zurich and MeteoSwiss +# All rights reserved. +# +# Please, refer to the LICENSE file in the root directory. 
+# SPDX-License-Identifier: BSD-3-Clause diff --git a/model/standalone_driver/tests/standalone_driver/mpi_tests/test_parallel_standalone_driver.py b/model/standalone_driver/tests/standalone_driver/mpi_tests/test_parallel_standalone_driver.py new file mode 100644 index 0000000000..8c96274918 --- /dev/null +++ b/model/standalone_driver/tests/standalone_driver/mpi_tests/test_parallel_standalone_driver.py @@ -0,0 +1,325 @@ +# ICON4Py - ICON inspired code in Python and GT4Py +# +# Copyright (c) 2022-2024, ETH Zurich and MeteoSwiss +# All rights reserved. +# +# Please, refer to the LICENSE file in the root directory. +# SPDX-License-Identifier: BSD-3-Clause + +import logging +import pathlib + +import pytest + +from icon4py.model.common import dimension as dims, model_backends, model_options +from icon4py.model.common.decomposition import definitions as decomp_defs, mpi_decomposition +from icon4py.model.common.utils import data_allocation as data_alloc +from icon4py.model.standalone_driver import driver_states, driver_utils, main, standalone_driver +from icon4py.model.standalone_driver.testcases import initial_condition +from icon4py.model.testing import ( + data_handling, + datatest_utils as dt_utils, + definitions as test_defs, + grid_utils, + parallel_helpers, +) +from icon4py.model.testing.fixtures.datatest import backend_like, experiment, processor_props + + +if mpi_decomposition.mpi4py is None: + pytest.skip("Skipping parallel tests on single node installation", allow_module_level=True) + +_log = logging.getLogger(__file__) + + +@pytest.mark.datatest +@pytest.mark.embedded_remap_error +@pytest.mark.parametrize( + "experiment", + [ + test_defs.Experiments.JW, + ], +) +@pytest.mark.mpi +@pytest.mark.parametrize("processor_props", [True], indirect=True) +def test_initial_condition_jablonowski_williamson_compare_single_multi_rank( + experiment: test_defs.Experiment, + tmp_path: pathlib.Path, + processor_props: decomp_defs.ProcessProperties, + backend_like: 
model_backends.BackendLike, +) -> None: + if experiment.grid.params.limited_area: + pytest.xfail("Limited-area grids not yet supported") + + _log.info(f"running on {processor_props.comm} with {processor_props.comm_size} ranks") + + backend_name = "embedded" # shut up pyright/mypy + for k, v in model_backends.BACKENDS.items(): + if backend_like == v: + backend_name = k + + grid_file_path = grid_utils._download_grid_file(experiment.grid) + + single_rank_icon4py_driver: standalone_driver.Icon4pyDriver = ( + standalone_driver.initialize_driver( + output_path=tmp_path / f"ci_driver_output_for_backend_{backend_name}_serial_rank0", + grid_file_path=grid_file_path, + log_level="info", + backend_name=backend_name, + force_serial_run=True, + ) + ) + + single_rank_ds: driver_states.DriverStates = initial_condition.jablonowski_williamson( + grid=single_rank_icon4py_driver.grid, + geometry_field_source=single_rank_icon4py_driver.static_field_factories.geometry_field_source, + interpolation_field_source=single_rank_icon4py_driver.static_field_factories.interpolation_field_source, + metrics_field_source=single_rank_icon4py_driver.static_field_factories.metrics_field_source, + backend=single_rank_icon4py_driver.backend, + lowest_layer_thickness=single_rank_icon4py_driver.vertical_grid_config.lowest_layer_thickness, + model_top_height=single_rank_icon4py_driver.vertical_grid_config.model_top_height, + stretch_factor=single_rank_icon4py_driver.vertical_grid_config.stretch_factor, + damping_height=single_rank_icon4py_driver.vertical_grid_config.rayleigh_damping_height, + exchange=single_rank_icon4py_driver.exchange, + ) + + multi_rank_icon4py_driver: standalone_driver.Icon4pyDriver = ( + standalone_driver.initialize_driver( + output_path=tmp_path + / f"ci_driver_output_for_backend_{backend_name}_mpi_rank_{processor_props.rank}", + grid_file_path=grid_file_path, + log_level="info", + backend_name=backend_name, + ) + ) + + multi_rank_ds: driver_states.DriverStates = 
initial_condition.jablonowski_williamson( + grid=multi_rank_icon4py_driver.grid, + geometry_field_source=multi_rank_icon4py_driver.static_field_factories.geometry_field_source, + interpolation_field_source=multi_rank_icon4py_driver.static_field_factories.interpolation_field_source, + metrics_field_source=multi_rank_icon4py_driver.static_field_factories.metrics_field_source, + backend=multi_rank_icon4py_driver.backend, + lowest_layer_thickness=multi_rank_icon4py_driver.vertical_grid_config.lowest_layer_thickness, + model_top_height=multi_rank_icon4py_driver.vertical_grid_config.model_top_height, + stretch_factor=multi_rank_icon4py_driver.vertical_grid_config.stretch_factor, + damping_height=multi_rank_icon4py_driver.vertical_grid_config.rayleigh_damping_height, + exchange=multi_rank_icon4py_driver.exchange, + ) + + # TODO (jcanton/msimberg): unify the two checks below and remove code duplication + fields = ["vn", "w", "exner", "theta_v", "rho"] + serial_reference_fields: dict[str, object] = { + field_name: getattr(single_rank_ds.prognostics.current, field_name).asnumpy() + for field_name in fields + } + + for field_name in fields: + print(f"verifying field {field_name}") + global_reference_field = processor_props.comm.bcast( + serial_reference_fields.get(field_name), + root=0, + ) + local_field = getattr(multi_rank_ds.prognostics.current, field_name) + dim = local_field.domain.dims[0] + parallel_helpers.check_local_global_field( + decomposition_info=multi_rank_icon4py_driver.decomposition_info, + processor_props=processor_props, + dim=dim, + global_reference_field=global_reference_field, + local_field=local_field.asnumpy(), + check_halos=True, + atol=0.0, + ) + + fields = ["u", "v"] + serial_reference_fields: dict[str, object] = { + field_name: getattr(single_rank_ds.diagnostic, field_name).asnumpy() + for field_name in fields + } + for field_name in fields: + print(f"verifying diagnostic field {field_name}") + global_reference_field = processor_props.comm.bcast( + 
serial_reference_fields.get(field_name), + root=0, + ) + local_field = getattr(multi_rank_ds.diagnostic, field_name) + dim = local_field.domain.dims[0] + parallel_helpers.check_local_global_field( + decomposition_info=multi_rank_icon4py_driver.decomposition_info, + processor_props=processor_props, + dim=dim, + global_reference_field=global_reference_field, + local_field=local_field.asnumpy(), + check_halos=True, + atol=0.0, + ) + + +@pytest.mark.datatest +@pytest.mark.embedded_remap_error +@pytest.mark.parametrize( + "experiment", + [ + test_defs.Experiments.JW, + ], +) +@pytest.mark.mpi +@pytest.mark.parametrize("processor_props", [True], indirect=True) +def test_standalone_driver_compare_single_multi_rank( + experiment: test_defs.Experiment, + tmp_path: pathlib.Path, + processor_props: decomp_defs.ProcessProperties, + backend_like: model_backends.BackendLike, +) -> None: + if experiment.grid.params.limited_area: + pytest.xfail("Limited-area grids not yet supported") + + _log.info(f"running on {processor_props.comm} with {processor_props.comm_size} ranks") + + backend_name = "embedded" # shut up pyright/mypy + for k, v in model_backends.BACKENDS.items(): + if backend_like == v: + backend_name = k + + grid_file_path = grid_utils._download_grid_file(experiment.grid) + + single_rank_ds, _ = main.main( + grid_file_path=grid_file_path, + icon4py_backend=backend_name, + output_path=tmp_path / f"ci_driver_output_for_backend_{backend_name}_serial_rank0", + force_serial_run=True, + ) + + multi_rank_ds, decomposition_info = main.main( + grid_file_path=grid_file_path, + icon4py_backend=backend_name, + output_path=tmp_path + / f"ci_driver_output_for_backend_{backend_name}_mpi_rank_{processor_props.rank}", + ) + + fields = ["vn", "w", "exner", "theta_v", "rho"] + serial_reference_fields: dict[str, object] = { + field_name: getattr(single_rank_ds.prognostics.current, field_name).asnumpy() + for field_name in fields + } + + for field_name in fields: + print(f"verifying field 
{field_name}") + global_reference_field = processor_props.comm.bcast( + serial_reference_fields.get(field_name), + root=0, + ) + local_field = getattr(multi_rank_ds.prognostics.current, field_name) + dim = local_field.domain.dims[0] + parallel_helpers.check_local_global_field( + decomposition_info=decomposition_info, + processor_props=processor_props, + dim=dim, + global_reference_field=global_reference_field, + local_field=local_field.asnumpy(), + check_halos=True, + atol=1e-6, + ) + + +@pytest.mark.datatest +@pytest.mark.embedded_remap_error +@pytest.mark.parametrize( + "experiment, istep_exit, substep_exit, step_date_exit, timeloop_diffusion_linit_exit", + [ + ( + test_defs.Experiments.JW, + 2, + 5, + "2008-09-01T00:05:00.000", + False, + ), + ], +) +@pytest.mark.mpi +@pytest.mark.parametrize("processor_props", [True], indirect=True) +def test_run_single_step_serialized_data( + experiment: test_defs.Experiment, + istep_exit: int, + substep_exit: int, + step_date_exit: str, + timeloop_diffusion_linit_exit: bool, + tmp_path: pathlib.Path, + processor_props: decomp_defs.ProcessProperties, + backend_like: model_backends.BackendLike, +) -> None: + if experiment.grid.params.limited_area: + pytest.xfail("Limited-area grids not yet supported") + + _log.info(f"running on {processor_props.comm} with {processor_props.comm_size} ranks") + + backend_name = "embedded" # shut up pyright/mypy + for k, v in model_backends.BACKENDS.items(): + if backend_like == v: + backend_name = k + + grid_file_path = grid_utils._download_grid_file(experiment.grid) + + multi_rank_ds, decomposition_info = main.main( + grid_file_path=grid_file_path, + icon4py_backend=backend_name, + output_path=tmp_path + / f"ci_driver_output_for_backend_{backend_name}_mpi_rank_{processor_props.rank}", + ) + + serial_reference_fields = None + if processor_props.rank == 0: + single_rank_processor_props = decomp_defs.get_processor_properties( + decomp_defs.get_runtype(with_mpi=False) + ) + root_url = 
test_defs.SERIALIZED_DATA_ROOT_URLS[single_rank_processor_props.comm_size] + archive_filename = dt_utils.get_experiment_archive_filename( + experiment, single_rank_processor_props.comm_size + ) + archive_path = f"{test_defs.SERIALIZED_DATA_DIR}/{archive_filename}" + uri = dt_utils.get_serialized_data_url(root_url, archive_path) + data_path = dt_utils.get_datapath_for_experiment(experiment, single_rank_processor_props) + data_handling.download_test_data(data_path.parent, uri) + + backend = model_options.customize_backend( + program=None, backend=driver_utils.get_backend_from_name(backend_name) + ) + data_provider = dt_utils.create_icon_serial_data_provider( + data_path, single_rank_processor_props.rank, backend + ) + savepoint_nonhydro_exit = data_provider.from_savepoint_nonhydro_exit( + istep=istep_exit, + date=step_date_exit, + substep=substep_exit, + ) + savepoint_diffusion_exit = data_provider.from_savepoint_diffusion_exit( + linit=timeloop_diffusion_linit_exit, + date=step_date_exit, + ) + serial_reference_fields = { + "vn": savepoint_diffusion_exit.vn().asnumpy(), + "w": savepoint_diffusion_exit.w().asnumpy(), + "exner": savepoint_diffusion_exit.exner().asnumpy(), + "theta_v": savepoint_diffusion_exit.theta_v().asnumpy(), + "rho": savepoint_nonhydro_exit.rho_new().asnumpy(), + } + + fields = ["vn", "w", "exner", "theta_v", "rho"] + for field_name in fields: + print(f"verifying field {field_name}") + global_reference_field = processor_props.comm.bcast( + serial_reference_fields.get(field_name) + if serial_reference_fields is not None + else None, + root=0, + ) + local_field = getattr(multi_rank_ds.prognostics.current, field_name) + dim = local_field.domain.dims[0] + parallel_helpers.check_local_global_field( + decomposition_info=decomposition_info, + processor_props=processor_props, + dim=dim, + global_reference_field=global_reference_field, + local_field=local_field.asnumpy(), + check_halos=True, + atol=1e-6, + ) diff --git 
a/model/testing/src/icon4py/model/testing/parallel_helpers.py b/model/testing/src/icon4py/model/testing/parallel_helpers.py index b0ad1b0465..1c6c930252 100644 --- a/model/testing/src/icon4py/model/testing/parallel_helpers.py +++ b/model/testing/src/icon4py/model/testing/parallel_helpers.py @@ -5,15 +5,21 @@ # # Please, refer to the LICENSE file in the root directory. # SPDX-License-Identifier: BSD-3-Clause +import functools import logging +import operator +import numpy as np import pytest +from gt4py import next as gtx from icon4py.model.common import dimension as dims -from icon4py.model.common.decomposition import definitions +from icon4py.model.common.decomposition import definitions, definitions as decomp_defs +from icon4py.model.common.utils import data_allocation as data_alloc log = logging.getLogger(__file__) +_log = log def check_comm_size( @@ -33,3 +39,120 @@ def log_local_field_size(decomposition_info: definitions.DecompositionInfo) -> N f"edges={decomposition_info.global_index(dims.EdgeDim).size}, " f"vertices={decomposition_info.global_index(dims.VertexDim).size}" ) + + +def gather_field(field: np.ndarray, props: decomp_defs.ProcessProperties) -> tuple: + constant_dims = tuple(field.shape[1:]) + _log.info(f"gather_field on rank={props.rank} - gathering field of local shape {field.shape}") + # Because of sparse indexing the field may have a non-contigous layout, + # which Gatherv doesn't support. Make sure the field is contiguous. 
+ field = np.ascontiguousarray(field) + constant_length = functools.reduce(operator.mul, constant_dims, 1) + local_sizes = np.array(props.comm.gather(field.size, root=0)) + if props.rank == 0: + recv_buffer = np.empty(np.sum(local_sizes), dtype=field.dtype) + _log.info( + f"gather_field on rank = {props.rank} - setup receive buffer with size {sum(local_sizes)} on rank 0" + ) + else: + recv_buffer = None + + props.comm.Gatherv(sendbuf=field, recvbuf=(recv_buffer, local_sizes), root=0) + if props.rank == 0: + local_first_dim = tuple(sz // constant_length for sz in local_sizes) + _log.info( + f" gather_field on rank = 0: computed local dims {local_first_dim} - constant dims {constant_dims}" + ) + gathered_field = recv_buffer.reshape((-1, *constant_dims)) # type: ignore [union-attr] + else: + gathered_field = None + local_first_dim = field.shape + return local_first_dim, gathered_field + + +def check_local_global_field( + decomposition_info: decomp_defs.DecompositionInfo, + processor_props: decomp_defs.ProcessProperties, # F811 # fixture + dim: gtx.Dimension, + global_reference_field: np.ndarray, + local_field: np.ndarray, + check_halos: bool, + atol: float, +) -> None: + if dim == dims.KDim: + np.testing.assert_allclose(global_reference_field, local_field) + return + + _log.info( + f" rank= {processor_props.rank}/{processor_props.comm_size}----exchanging field of main dim {dim}" + ) + assert ( + local_field.shape[0] + == decomposition_info.global_index(dim, decomp_defs.DecompositionInfo.EntryType.ALL).shape[ + 0 + ] + ) + + def _non_blocking_allclose(a: np.ndarray, b: np.ndarray, atol: float, verbose: bool) -> None: + print("max diff", np.max(np.abs(a - b))) + + # Compare halo against global reference field + if check_halos: + print("checking halos") + _non_blocking_allclose( + # np.testing.assert_allclose( + global_reference_field[ + data_alloc.as_numpy( + decomposition_info.global_index( + dim, decomp_defs.DecompositionInfo.EntryType.HALO + ) + ) + ], + local_field[ 
+ data_alloc.as_numpy( + decomposition_info.local_index( + dim, decomp_defs.DecompositionInfo.EntryType.HALO + ) + ) + ], + atol=atol, + verbose=True, + ) + + # Compare owned local field, excluding halos, against global reference + # field, by gathering owned entries to the first rank. This ensures that in + # total we have the full global field distributed on all ranks. + owned_entries = local_field[ + data_alloc.as_numpy( + decomposition_info.local_index(dim, decomp_defs.DecompositionInfo.EntryType.OWNED) + ) + ] + gathered_sizes, gathered_field = gather_field(owned_entries, processor_props) + + global_index_sizes, gathered_global_indices = gather_field( + data_alloc.as_numpy( + decomposition_info.global_index(dim, decomp_defs.DecompositionInfo.EntryType.OWNED) + ), + processor_props, + ) + + if processor_props.rank == 0: + _log.info(f"rank = {processor_props.rank}: asserting gathered fields: ") + + assert np.all( + gathered_sizes == global_index_sizes + ), f"gathered field sizes do not match: {dim} {gathered_sizes} - {global_index_sizes}" + _log.info( + f"rank = {processor_props.rank}: Checking field size on dim ={dim}: --- gathered sizes {gathered_sizes} = {sum(gathered_sizes)}" + ) + _log.info( + f"rank = {processor_props.rank}: --- gathered field has size {gathered_sizes}" + ) + sorted_ = np.zeros(global_reference_field.shape, dtype=gtx.float64) # type: ignore [attr-defined] + sorted_[gathered_global_indices] = gathered_field + _log.info( + f" rank = {processor_props.rank}: SHAPES: global reference field {global_reference_field.shape}, gathered = {gathered_field.shape}" + ) + + # np.testing.assert_allclose(sorted_, global_reference_field, atol=atol, verbose=True) + _non_blocking_allclose(sorted_, global_reference_field, atol=atol, verbose=True) diff --git a/model/testing/src/icon4py/model/testing/serialbox.py b/model/testing/src/icon4py/model/testing/serialbox.py index 15e86d11e3..6498c930c1 100644 --- a/model/testing/src/icon4py/model/testing/serialbox.py 
+++ b/model/testing/src/icon4py/model/testing/serialbox.py @@ -73,7 +73,7 @@ def wrapper(self, *args, **kwargs): # as a workaround for the lack of support for optional fields in gt4py. shp = (1,) * len(dims) return gtx.as_field( - dims, np.zeros(shp, dtype=dtype), allocator=self.backend + dims, self.xp.zeros(shp, dtype=dtype), allocator=self.backend ) else: return None @@ -559,9 +559,8 @@ def construct_icon_grid( def potentially_revert_icon_index_transformation(ar): return ar else: - potentially_revert_icon_index_transformation = functools.partial( - grid_utils.revert_repeated_index_to_invalid, - array_ns=data_alloc.import_array_ns(backend), + potentially_revert_icon_index_transformation = ( + grid_utils.revert_repeated_index_to_invalid ) c2e2c = self.c2e2c() diff --git a/pyproject.toml b/pyproject.toml index 88713cddf1..eafb0fb033 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -407,6 +407,7 @@ url = 'https://gridtools.github.io/pypi/' [tool.uv.sources] dace = {index = "gridtools"} +gt4py = {git = "https://github.com/GridTools/gt4py", branch = "workaround_caching_issue_with_embedded_inverse_image_caching"} # gt4py = {git = "https://github.com/GridTools/gt4py", branch = "main"} # gt4py = {index = "test.pypi"} icon4py-atmosphere-advection = {workspace = true} diff --git a/uv.lock b/uv.lock index 8bae6ba865..b9154a49a2 100644 --- a/uv.lock +++ b/uv.lock @@ -1428,8 +1428,8 @@ wheels = [ [[package]] name = "gt4py" -version = "1.1.7" -source = { registry = "https://pypi.org/simple" } +version = "1.1.6.post10+8f3567da" +source = { git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching#8f3567da4fba75ad9ec68c29409f785d9ea95333" } dependencies = [ { name = "array-api-compat" }, { name = "attrs" }, @@ -1460,10 +1460,6 @@ dependencies = [ { name = "versioningit" }, { name = "xxhash" }, ] -sdist = { url = 
"https://files.pythonhosted.org/packages/85/8e/8520d068a4fd51d9a52b022895b5118a2a4aca0e596fe50dda54cc49e87b/gt4py-1.1.7.tar.gz", hash = "sha256:76ce71747bea7bf2345b73bdc24ace177547958b5d60bfd973b5e1d9e528ba1b", size = 814320, upload-time = "2026-03-11T16:53:54.715Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/29/4c/bbf846ed799ebc4125b9cc12e4ce7aaa105d39829d00d5f57e8303d43d2c/gt4py-1.1.7-py3-none-any.whl", hash = "sha256:b408f1bea1ef29a4c8b6d4e53ec5f69773d3ebe3f5f3f751277734ad7b40ff95", size = 1025469, upload-time = "2026-03-11T16:53:53.191Z" }, -] [package.optional-dependencies] cuda11 = [ @@ -1794,7 +1790,7 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-common", editable = "model/common" }, { name = "packaging", specifier = ">=20.0" }, ] @@ -1811,7 +1807,7 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-common", editable = "model/common" }, { name = "packaging", specifier = ">=20.0" }, ] @@ -1828,7 +1824,7 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-common", editable = "model/common" }, { name = "packaging", specifier = ">=20.0" }, ] @@ -1845,7 +1841,7 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-common", editable = 
"model/common" }, { name = "packaging", specifier = ">=20.0" }, ] @@ -1863,7 +1859,7 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-common", extras = ["io"], editable = "model/common" }, { name = "numpy", specifier = ">=1.23.3" }, { name = "packaging", specifier = ">=20.0" }, @@ -1933,9 +1929,9 @@ requires-dist = [ { name = "dace", specifier = "==43!2026.2.12", index = "https://gridtools.github.io/pypi/" }, { name = "datashader", marker = "extra == 'io'", specifier = ">=0.16.1" }, { name = "ghex", marker = "extra == 'distributed'", specifier = ">=0.5.0" }, - { name = "gt4py", specifier = "==1.1.7" }, - { name = "gt4py", extras = ["cuda11"], marker = "extra == 'cuda11'" }, - { name = "gt4py", extras = ["cuda12"], marker = "extra == 'cuda12'" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, + { name = "gt4py", extras = ["cuda11"], marker = "extra == 'cuda11'", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, + { name = "gt4py", extras = ["cuda12"], marker = "extra == 'cuda12'", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "holoviews", marker = "extra == 'io'", specifier = ">=1.16.0" }, { name = "icon4py-common", extras = ["distributed", "io"], marker = "extra == 'all'", editable = "model/common" }, { name = "mpi4py", marker = "extra == 'distributed'", specifier = ">=3.1.5" }, @@ -1972,7 +1968,7 @@ dependencies = [ requires-dist = [ { name = "click", specifier = ">=8.0.1" }, { name = "devtools", specifier = ">=0.12" }, - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = 
"https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-atmosphere-diffusion", editable = "model/atmosphere/diffusion" }, { name = "icon4py-atmosphere-dycore", editable = "model/atmosphere/dycore" }, { name = "icon4py-common", editable = "model/common" }, @@ -2001,7 +1997,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "devtools", specifier = ">=0.12" }, - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-atmosphere-diffusion", editable = "model/atmosphere/diffusion" }, { name = "icon4py-atmosphere-dycore", editable = "model/atmosphere/dycore" }, { name = "icon4py-common", editable = "model/common" }, @@ -2031,7 +2027,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "filelock", specifier = ">=3.18.0" }, - { name = "gt4py", specifier = "==1.1.7" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-common", extras = ["io"], editable = "model/common" }, { name = "numpy", specifier = ">=1.23.3" }, { name = "packaging", specifier = ">=20.0" }, @@ -2083,9 +2079,9 @@ requires-dist = [ { name = "cupy-cuda11x", marker = "extra == 'cuda11'", specifier = ">=13.0" }, { name = "cupy-cuda12x", marker = "extra == 'cuda12'", specifier = ">=13.0" }, { name = "fprettify", specifier = ">=0.3.7" }, - { name = "gt4py", specifier = "==1.1.7" }, - { name = "gt4py", extras = ["cuda11"], marker = "extra == 'cuda11'" }, - { name = "gt4py", extras = ["cuda12"], marker = "extra == 'cuda12'" }, + { name = "gt4py", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, + { name = "gt4py", extras = ["cuda11"], marker = "extra == 'cuda11'", git = 
"https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, + { name = "gt4py", extras = ["cuda12"], marker = "extra == 'cuda12'", git = "https://github.com/GridTools/gt4py?branch=workaround_caching_issue_with_embedded_inverse_image_caching" }, { name = "icon4py-atmosphere-advection", editable = "model/atmosphere/advection" }, { name = "icon4py-atmosphere-diffusion", editable = "model/atmosphere/diffusion" }, { name = "icon4py-atmosphere-dycore", editable = "model/atmosphere/dycore" },