Distribution of detector blocks across MPI processes #334

Open · wants to merge 15 commits into base: master
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
@@ -48,7 +48,7 @@ jobs:
- name: Install basic dependencies (MPI, FFTW, poetry, IMO)
run: |
./bin/install-deps.sh ${{ matrix.mpi }}
pip install poetry
pip install poetry==1.8.5
mkdir -p $HOME/.config/litebird_imo
echo -e "[[repositories]]\nlocation = \"$(pwd)/test/mock_imo/\"\nname = \"Mock IMO\"" | tee $HOME/.config/litebird_imo/imo.toml

@@ -87,6 +87,7 @@ jobs:
for proc in 1 5 9 ; do
echo "Running MPI test ($MPI) with $proc processes"
PYTHONPATH=. mpiexec -n $proc python3 ./test/test_mpi.py
PYTHONPATH=. mpiexec -n $proc python3 ./test/test_detector_blocks.py
done

- name: Tests OpenMPI
Expand All @@ -95,4 +96,5 @@ jobs:
for proc in 1 2 ; do
echo "Running MPI test ($MPI) with $proc processes"
PYTHONPATH=. mpiexec -n $proc python3 ./test/test_mpi.py
PYTHONPATH=. mpiexec -n $proc python3 ./test/test_detector_blocks.py
done
4 changes: 2 additions & 2 deletions .readthedocs.yml
@@ -10,8 +10,8 @@ build:
jobs:
pre_create_environment:
- asdf plugin add poetry
- asdf install poetry latest
- asdf global poetry latest
- asdf install poetry 1.8.5
- asdf global poetry 1.8.5
- poetry export --without-hashes > requirements.txt
post_install:
- pip install -r requirements.txt
Binary file added docs/source/images/detector_groups_case1.png
Binary file added docs/source/images/detector_groups_case2.png
Binary file added docs/source/images/detector_groups_case3.png
Binary file added docs/source/images/grid_communicator.png
Binary file added docs/source/images/time_block_communicator.png
25 changes: 25 additions & 0 deletions docs/source/mpi.rst
@@ -133,6 +133,31 @@ variable :data:`.MPI_ENABLED`::
To ensure that your code uses MPI in the proper way, you should always
use :data:`.MPI_COMM_WORLD` instead of importing ``mpi4py`` directly.

The simulation framework also provides a global object
:data:`.MPI_COMM_GRID`. It has two attributes:

- ``COMM_OBS_GRID``: An MPI communicator that contains all the
  MPI processes whose global rank is less than ``n_blocks_time * n_blocks_det``.
  It provides a safety net for the operations and MPI communications
  that must be performed only on the partition of :data:`.MPI_COMM_WORLD`
  that contains a non-zero number of pointings and TODs. By default,
  ``COMM_OBS_GRID`` points to the global MPI communicator :data:`.MPI_COMM_WORLD`;
  it is updated once the :class:`.Observation` objects are defined. For example,
  consider the case where a user runs the simulation with 10 MPI
  processes but, due to a specific ``det_blocks_attributes`` argument
  of the :class:`.Observation` class, the number of detector and time
  blocks are determined to be 2 and 4 respectively. Then the
  simulation framework stores the pointings and TODs only on
  :math:`2\times4=8` MPI processes, and the last two ranks of :data:`.MPI_COMM_WORLD`
  are left unused. Once this happens, ``COMM_OBS_GRID`` on the first 8
  ranks (rank 0 to 7) points to the local sub-communicator
  containing the processes with global rank 0 to 7. On the unused
  ranks, it simply points to the NULL communicator.
- ``COMM_NULL``: If :data:`.MPI_ENABLED` is ``True``, this object
  points to a NULL MPI communicator (``mpi4py.MPI.COMM_NULL``).
  Otherwise it is set to ``None``. The user should compare
  ``COMM_OBS_GRID`` with ``COMM_NULL`` on every MPI process in order
  to avoid running a piece of code on the unused MPI processes (see
  the sketch below).
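
A minimal sketch of this comparison (assuming that the observations
have already been created, so that ``COMM_OBS_GRID`` has been updated)::

    from litebird_sim import MPI_COMM_GRID

    if MPI_COMM_GRID.COMM_OBS_GRID != MPI_COMM_GRID.COMM_NULL:
        # This branch is executed only on the ranks that hold a
        # non-zero number of pointings and TODs
        ...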

Enabling/disabling MPI
----------------------
257 changes: 239 additions & 18 deletions docs/source/observations.rst
@@ -97,8 +97,15 @@ With this memory layout, typical operations look like this::
Parallel applications
---------------------

The only work that the :class:`.Observation` class actually does is handling
parallelism. ``obs.tod`` can be distributed over a
The :class:`.Observation` class allows the distribution of ``obs.tod`` over multiple MPI
processes to enable the parallelization of computations. The distribution of ``obs.tod``
can be achieved in two different ways:

1. Uniform distribution of detectors along the detector axis
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

With the ``n_blocks_det`` and ``n_blocks_time`` arguments of the :class:`.Observation` class,
``obs.tod`` is evenly distributed over a
``n_blocks_det`` by ``n_blocks_time`` grid of MPI ranks. The blocks can be
changed at run-time.

@@ -111,7 +118,7 @@ The main advantage is that the example operations in the Serial section are
achieved with the same lines of code.
The price to pay is that you have to set detector properties with special methods.

::
.. code-block:: python

import litebird_sim as lbs
from mpi4py import MPI
@@ -158,21 +165,235 @@ TOD) gets distributed.

.. image:: ./images/observation_data_distribution.png

When ``n_blocks_det != 1``, keep in mind that ``obs.tod[0]`` or
``obs.wn_levels[0]`` are quantities of the first *local* detector, not global.
This should not be a problem as the only thing that matters is that the two
quantities refer to the same detector. If you need the global detector index,
you can get it with ``obs.det_idx[0]``, which is created
at construction time.

To get a better understanding of how observations are being used in a
MPI simulation, use the method :meth:`.Simulation.describe_mpi_distribution`.
This method must be called *after* the observations have been allocated using
:meth:`.Simulation.create_observations`; it will return an instance of the
class :class:`.MpiDistributionDescr`, which can be inspected to determine
which detectors and time spans are covered by each observation in all the
MPI processes that are being used. For more information, refer to the Section
:ref:`simulations`.
2. Custom grouping of detectors along the detector axis
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

While a uniform distribution of detectors along the detector axis optimizes load
balancing, it is less suitable for simulating some effects, such as crosstalk and
noise correlation between detectors. For these effects, a uniform distribution
across MPI processes requires transferring large TOD arrays between multiple
MPI processes, which complicates the implementation and may lead
to a significant performance overhead. To avoid this situation, the
:class:`.Observation` class
accepts a ``det_blocks_attributes`` argument: a list of strings
naming the detector attributes used to group the detectors. Once the
detector groups are made, the detectors are distributed to the MPI processes in such
a way that all the detectors of a group reside on the same set of MPI processes.

If a valid ``det_blocks_attributes`` argument is passed to the :class:`.Observation`
class, the arguments ``n_blocks_det`` and ``n_blocks_time`` are ignored. Since
``det_blocks_attributes`` creates the detector blocks dynamically,
``n_blocks_time`` is computed at runtime from the size of the MPI communicator and
the number of detector blocks (``n_blocks_time = comm.size // n_blocks_det``).

The detector blocks made in this way can be accessed with
``Observation.detector_blocks``. It is a dictionary whose keys are the tuples of
``det_blocks_attributes`` values and whose values are the lists of detectors
belonging to each group. The dictionary is sorted so that the
group with the largest number of detectors comes first and the one with
the fewest detectors comes last.

The following example illustrates the distribution of the ``obs.tod`` matrix across
the MPI processes when ``det_blocks_attributes`` is specified.

.. code-block:: python

    import litebird_sim as lbs

    comm = lbs.MPI_COMM_WORLD

    start_time = 456
    duration_s = 100
    sampling_freq_Hz = 1

    # Creating a list of detectors.
    dets = [
        lbs.DetectorInfo(
            name="channel1_w9_detA",
            wafer="wafer_9",
            channel="channel1",
            sampling_rate_hz=sampling_freq_Hz,
        ),
        lbs.DetectorInfo(
            name="channel1_w3_detB",
            wafer="wafer_3",
            channel="channel1",
            sampling_rate_hz=sampling_freq_Hz,
        ),
        lbs.DetectorInfo(
            name="channel1_w1_detC",
            wafer="wafer_1",
            channel="channel1",
            sampling_rate_hz=sampling_freq_Hz,
        ),
        lbs.DetectorInfo(
            name="channel1_w1_detD",
            wafer="wafer_1",
            channel="channel1",
            sampling_rate_hz=sampling_freq_Hz,
        ),
        lbs.DetectorInfo(
            name="channel2_w4_detA",
            wafer="wafer_4",
            channel="channel2",
            sampling_rate_hz=sampling_freq_Hz,
        ),
        lbs.DetectorInfo(
            name="channel2_w4_detB",
            wafer="wafer_4",
            channel="channel2",
            sampling_rate_hz=sampling_freq_Hz,
        ),
    ]

    # Initializing a simulation
    sim = lbs.Simulation(
        start_time=start_time,
        duration_s=duration_s,
        random_seed=12345,
        mpi_comm=comm,
    )

    # Creating the observations with detector blocks
    sim.create_observations(
        detectors=dets,
        split_list_over_processes=False,
        num_of_obs_per_detector=3,
        det_blocks_attributes=["channel"],  # case 1 and 2
        # det_blocks_attributes=["channel", "wafer"],  # case 3
    )

With the list of detectors defined in the code snippet above, we can see how the
detector axis and the time axis are divided, depending on the size of the MPI
communicator and on ``det_blocks_attributes``.

**Case 1**

*Size of global MPI communicator = 3*, ``det_blocks_attributes=["channel"]``

.. image:: ./images/detector_groups_case1.png

**Case 2**

*Size of global MPI communicator = 4*, ``det_blocks_attributes=["channel"]``

.. image:: ./images/detector_groups_case2.png

**Case 3**

*Size of global MPI communicator = 10*, ``det_blocks_attributes=["channel", "wafer"]``

.. image:: ./images/detector_groups_case3.png
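
The detector groups themselves can be inspected through the
``detector_blocks`` dictionary described above. The following is a
minimal sketch, assuming that the observations created in the example
above are stored in ``sim.observations``::

    # Keys are tuples of the ``det_blocks_attributes`` values; values
    # are the lists of detectors belonging to each group
    obs = sim.observations[0]
    for key, group in obs.detector_blocks.items():
        print(key, len(group))

    # With det_blocks_attributes=["channel"], this is expected to print
    # something like:
    #   ('channel1',) 4
    #   ('channel2',) 2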

.. note::
   When ``n_blocks_det != 1``, keep in mind that ``obs.tod[0]`` or
   ``obs.wn_levels[0]`` are quantities of the first *local* detector, not the global
   one. This should not be a problem, as the only thing that matters is that the two
   quantities refer to the same detector. If you need the global detector index,
   you can get it with ``obs.det_idx[0]``, which is created at construction time.
   ``obs.det_idx`` stores the indices of the detectors available to the
   :class:`.Observation` instance, with respect to the global list of detectors
   stored in the ``obs.detectors_global`` variable.
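
   A minimal sketch of this mapping (assuming ``obs`` is one of the
   observations created above)::

       # Global index of the first local detector
       first_global_idx = obs.det_idx[0]

       # Corresponding entry in the global list of detectors
       first_global_det = obs.detectors_global[first_global_idx]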

.. note::
   To get a better understanding of how observations are being used in
   an MPI simulation, use the method :meth:`.Simulation.describe_mpi_distribution`.
   This method must be called *after* the observations have been allocated using
   :meth:`.Simulation.create_observations`; it will return an instance of the
   class :class:`.MpiDistributionDescr`, which can be inspected to determine
   which detectors and time spans are covered by each observation in all the
   MPI processes that are being used. For more information, refer to the Section
   :ref:`simulations`.

MPI communicators
^^^^^^^^^^^^^^^^^

The simulation framework exposes MPI communicators at different
levels. The first one is the global MPI communicator. It can be
accessed through the global variable :data:`.MPI_COMM_WORLD`, and it is
the same as ``mpi4py.MPI.COMM_WORLD``. It contains all the MPI processes
used by the script. The other MPI communicators defined in the
simulation framework are partitions of the global MPI communicator,
and they contain the MPI processes that share certain properties, as
explained below. For all the examples in this sub-section, we consider
the distribution of TODs across 10 MPI processes with ``n_blocks_time = 2``
and ``n_blocks_det = 4``.

To distribute the TODs across
``n_blocks_det`` :math:`\times` ``n_blocks_time`` :math:`=\, N`
MPI ranks, the script must be executed with at least
:math:`N` MPI processes. There is, however, no upper limit on the
number of MPI processes that can be used. When the number of MPI processes
is larger than :math:`N`, all the MPI processes with
``rank`` :math:`\geq N` are left with no detectors (and no TODs). In many
cases, it is useful to identify the unused ranks so that they can be skipped
while performing computations. The :class:`.Observation` class
makes this possible by creating a sub-communicator that contains only the
processes holding a non-zero number of detectors (and TODs).

However, once the detectors and TODs are distributed across several
processes, it is not trivial to find out all the ranks that contain
the TOD chunks of a given detector (or detector block). Likewise, it
is hard to find all the ranks that contain the TOD chunks
corresponding to the same time interval. To solve this issue, the
:class:`.Observation` class provides MPI sub-communicators corresponding
to the different groups of MPI processes.

The three sub-communicators provided by the framework, each obtained by
splitting the global MPI communicator, are listed below. The process
ranks in the sub-communicators follow the order of their global ranks;
a short usage sketch is given after the list:

- **Grid communicator**: Once the number of detector blocks and the
  number of time blocks are available, the :class:`.Observation` class splits the
  global communicator into a grid communicator and a null communicator.
  The grid communicator consists of all the MPI processes whose global rank is
  less than :math:`N`; the null communicator contains all the other MPI
  processes. On the MPI processes with global rank less than
  :math:`N`, the global variable ``MPI_COMM_GRID.COMM_OBS_GRID``
  points to the grid communicator. On the other MPI processes, it points
  to the null MPI communicator (``MPI_COMM_GRID.COMM_NULL``, which is
  the same as ``mpi4py.MPI.COMM_NULL``). In the example below, the
  processes with global rank from 0 to 7 belong to the grid
  sub-communicator. Since the processes with rank 8 and 9 are unused,
  they are excluded from the grid communicator.

.. image:: ./images/grid_communicator.png

Note that, to exclude the MPI processes belonging to the null
communicator from the computations, one should guard the computations
with an ``if`` condition comparing
``MPI_COMM_GRID.COMM_OBS_GRID`` against ``MPI_COMM_GRID.COMM_NULL``:

::

    if MPI_COMM_GRID.COMM_OBS_GRID != MPI_COMM_GRID.COMM_NULL:
        # proceed with the following computations when
        # MPI_COMM_GRID.COMM_OBS_GRID is not null
        ...

- **Detector-block communicator**: The detector-block communicator is
  made by splitting the grid communicator into ``n_blocks_det``
  sub-communicators, such that each sub-communicator contains the
  processes where the TODs of the same set of detectors reside. This
  sub-communicator can be accessed with the ``comm_det_blocks`` attribute
  of the :class:`.Observation` class. In the following example, the
  grid communicator is split into 4 detector-block communicators
  containing the processes with global rank 0-1, 2-3, 4-5, and 6-7
  respectively.

.. image:: ./images/detector_block_communicator.png

- **Time-block communicator**: The time-block communicator is made by
  splitting the grid communicator into ``n_blocks_time``
  sub-communicators, such that each sub-communicator contains the
  processes where the TOD chunks of the same time interval reside. This
  sub-communicator can be accessed with the ``comm_time_block`` attribute
  of the :class:`.Observation` class. In the example below, the grid
  communicator is split into 2 time-block communicators containing the
  processes with global rank 0-2-4-6 and 1-3-5-7 respectively.

.. image:: ./images/time_block_communicator.png
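
As a usage sketch (assuming ``obs`` is an :class:`.Observation`
distributed as in the figures above, and using the attribute names
introduced in this list), quantities can be combined along one axis of
the block grid with the corresponding sub-communicator::

    from litebird_sim import MPI_COMM_GRID

    if MPI_COMM_GRID.COMM_OBS_GRID != MPI_COMM_GRID.COMM_NULL:
        # Total number of time samples for the local set of detectors,
        # accumulated over the processes that share the same detectors
        n_samples_total = obs.comm_det_blocks.allreduce(obs.tod.shape[1])

        # Total number of detectors covering the same time interval,
        # accumulated over the processes that share the same time span
        n_dets_total = obs.comm_time_block.allreduce(obs.tod.shape[0])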

Other notable functionalities
-----------------------------
3 changes: 2 additions & 1 deletion litebird_sim/__init__.py
@@ -72,7 +72,7 @@
)
from .madam import save_simulation_for_madam
from .mbs.mbs import Mbs, MbsParameters, MbsSavedMapInfo
from .mpi import MPI_COMM_WORLD, MPI_ENABLED, MPI_CONFIGURATION
from .mpi import MPI_COMM_WORLD, MPI_ENABLED, MPI_CONFIGURATION, MPI_COMM_GRID
from .noise import (
add_white_noise,
add_one_over_f_noise,
@@ -218,6 +218,7 @@ def destripe_with_toast2(*args, **kwargs):
"MPI_COMM_WORLD",
"MPI_ENABLED",
"MPI_CONFIGURATION",
"MPI_COMM_GRID",
# observations.py
"Observation",
"TodDescription",
7 changes: 5 additions & 2 deletions litebird_sim/detectors.py
@@ -75,6 +75,9 @@ class DetectorInfo:

- channel (Union[str, None]): The channel. The default is None

- squid (Union[int, None]): The squid number of the detector.
The default value is None.

- sampling_rate_hz (float): The sampling rate of the ADC
associated with this detector. The default is 0.0

@@ -136,6 +139,7 @@ class DetectorInfo:
pixel: Union[int, None] = None
pixtype: Union[str, None] = None
channel: Union[str, None] = None
squid: Union[int, None] = None
sampling_rate_hz: float = 0.0
fwhm_arcmin: float = 0.0
ellipticity: float = 0.0
@@ -148,8 +152,6 @@
fknee_mhz: float = 0.0
fmin_hz: float = 0.0
alpha: float = 0.0
bandcenter_ghz: float = 0.0
bandwidth_ghz: float = 0.0
pol: Union[str, None] = None
orient: Union[str, None] = None
quat: Any = None
@@ -175,6 +177,7 @@ def from_dict(dictionary: Dict[str, Any]):
- ``pixel``
- ``pixtype``
- ``channel``
- ``squid``
- ``bandcenter_ghz``
- ``bandwidth_ghz``
- ``band_freqs_ghz``