7 changes: 7 additions & 0 deletions HeterogeneousCore/MPICore/BuildFile.xml
@@ -0,0 +1,7 @@
<use name="mpi"/>
<use name="DataFormats/Common"/>
<use name="FWCore/Framework"/>
<use name="HeterogeneousCore/TrivialSerialisation"/>
<export>
<lib name="1"/>
</export>
105 changes: 105 additions & 0 deletions HeterogeneousCore/MPICore/README.md
@@ -0,0 +1,105 @@
# Extend CMSSW to a fully distributed application

Let multiple CMSSW processes on the same or different machines coordinate event processing and transfer data products
over MPI.

The implementation is based on four CMSSW modules.
Two are responsible for setting up the communication channels and for coordinating the event processing:
- the `MPIController`
- the `MPISource`

and two are responsible for the transfer of data products:
- the `MPISender`
- the `MPIReceiver`


## `MPIController` class

The `MPIController` is an `EDProducer` running in a regular CMSSW process. After setting up the communication with an
`MPISource`, it transmits to it all EDM run, lumi and event transitions, and instructs the `MPISource` to replicate them
in the second process.
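
For illustration, a minimal and hypothetical configuration of the controller job could look like the sketch below; any
parameters the `MPIController` may need to establish the MPI connection are omitted, since they are not described here.

```python
# controller job: a regular CMSSW process driving a follower process over MPI (hypothetical sketch)
import FWCore.ParameterSet.Config as cms

process = cms.Process("CONTROLLER")

# any regular source can drive the controller job
process.source = cms.Source("EmptySource")
process.maxEvents = cms.untracked.PSet(input = cms.untracked.int32(10))

# the MPIController forwards every run, lumi and event transition to the MPISource;
# parameters needed to set up the MPI communication are intentionally left out of this sketch
process.mpiController = cms.EDProducer("MPIController")

process.path = cms.Path(process.mpiController)
```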


## `MPISource` class

The `MPISource` is a `Source` controlling the execution of a second CMSSW process. After setting up the communication
with an `MPIController`, it listens for EDM run, lumi and event transitions, and replicates them in its own process.
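
The corresponding follower job could be sketched as follows (again hypothetical, with any connection parameters omitted):

```python
# follower job: the MPISource replicates the transitions received from the MPIController (hypothetical sketch)
import FWCore.ParameterSet.Config as cms

process = cms.Process("FOLLOWER")

# the MPISource acts as the source of the follower process;
# parameters needed to set up the MPI communication are intentionally left out of this sketch
process.source = cms.Source("MPISource")
```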


## `MPISender` class

The `MPISender` is an `EDProducer` that can read any number of collections of arbitrary types from the `Event`.
For each event, it first sends a metadata message describing the products to be transferred, including their number and
basic characteristics.

If `TrivialCopyTraits` are defined for a given product, the data are transferred directly from the product's memory
regions; otherwise, the product is serialised into a single buffer using its ROOT dictionary. The regions and the buffer
are sent to another process over the MPI communication channel.

The number and types of the collections to be read from the `Event` are determined by the module configuration. The
configuration can specify a list of module labels, branch names, or a mix of the two:
- a module label selects all collections produced by that module, irrespective of the type and instance;
- a branch name selects only the collections that match all the branch fields (type, label, instance, process name),
similar to an `OutputModule`'s `"keep ..."` statement.

Wildcards (`?` and `*`) are allowed in a module label or in each field of a branch name.
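
A hypothetical fragment of the controller job configuration is sketched below. The `upstream` and `instance` parameters
are described in the following sections; the name and format of the product-selection parameter, and all parameter
types, are assumptions.

```python
# hypothetical MPISender configuration sketch
process.mpiSender = cms.EDProducer("MPISender",
    upstream = cms.InputTag("mpiController"),   # MPIToken describing the MPI communication channel
    instance = cms.uint32(1),                   # matches the MPIReceiver configured with the same instance
    products = cms.vstring(
        "hltPixelTracks",                       # module label: all collections produced by this module
        "*_hltPixelVertices_*_HLT"              # branch name with wildcards, as in a "keep ..." statement
    )
)
```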


## `MPIReceiver` class

The `MPIReceiver` is an `EDProducer` that can receive any number of collections of arbitrary types over the MPI
communication channel. It first receives metadata, which is later used to initialise trivially copyable products and
allocate buffers for serialised products.

For trivially copyable products, the receiver initialises the target objects using the metadata and performs an
`MPI_Recv` for each memory region. For non-trivially copyable products, it receives the serialised buffer and
deserialises it using the corresponding ROOT dictionary.

All received products are put into the `Event`. The number, type and label of the collections to be produced are
determined by the module configuration.

For each collection, the `type` indicates the C++ type as understood by the ROOT dictionary, and the `label` indicates
the module instance label to be used for producing that collection into the `Event`.
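
A hypothetical fragment of the follower job configuration is sketched below; the structure of the per-collection
parameters and all parameter types are assumptions, while `type` and `label` have the meaning described above.

```python
# hypothetical MPIReceiver configuration sketch
process.mpiReceiver = cms.EDProducer("MPIReceiver",
    upstream = cms.InputTag("source"),          # MPIToken produced by the MPISource
    instance = cms.uint32(1),                   # matches the MPISender configured with the same instance
    products = cms.VPSet(
        cms.PSet(
            type = cms.string("std::vector<reco::Track>"),   # C++ type as known to the ROOT dictionary
            label = cms.string("tracks")                     # instance label used to put the collection into the Event
        )
    )
)
```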


## `MPISender` and `MPIReceiver` instances

Both `MPISender` and `MPIReceiver` are configured with an instance value that is used to match one `MPISender` in one
process to one `MPIReceiver` in another process. Using different instance values allows the use of multiple pairs of
`MPISender`/`MPIReceiver` modules in a process.
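
For example, two independent transfers between the same pair of processes can be configured as two `MPISender` modules
with `instance` values 1 and 2 in one job, matched in the other job by two `MPIReceiver` modules with the same values.
A hypothetical sketch of the receiving side, with the same assumed parameter names as above:

```python
# hypothetical sketch: two MPIReceiver modules matching two MPISender modules in the other job
process.receiveTracks = cms.EDProducer("MPIReceiver",
    upstream = cms.InputTag("source"),
    instance = cms.uint32(1),                   # matches the MPISender configured with instance = 1
    products = cms.VPSet(cms.PSet(type = cms.string("std::vector<reco::Track>"),
                                  label = cms.string("")))
)
process.receiveVertices = cms.EDProducer("MPIReceiver",
    upstream = cms.InputTag("source"),
    instance = cms.uint32(2),                   # matches the MPISender configured with instance = 2
    products = cms.VPSet(cms.PSet(type = cms.string("std::vector<reco::Vertex>"),
                                  label = cms.string("")))
)
```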


## MPI communication channel

The `MPIController` and `MPISource` produce an `MPIToken`, a special data product that encapsulates the information
about the MPI communication channel.

Both `MPISender` and `MPIReceiver` obtain the MPI communication channel by reading an `MPIToken` from the event,
identified by the `upstream` parameter.
They also produce a copy of the `MPIToken`, so other modules can consume it to declare a dependency on those modules.
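
For example (with the hypothetical parameter names used above), a second `MPISender` can consume the `MPIToken` copy
produced by a first one, so that the framework schedules the two transfers one after the other:

```python
# hypothetical sketch: chaining MPIToken copies to declare a dependency between two senders
process.sendFirst = cms.EDProducer("MPISender",
    upstream = cms.InputTag("mpiController"),   # MPIToken produced by the MPIController
    instance = cms.uint32(1),
    products = cms.vstring("generalTracks")
)
process.sendSecond = cms.EDProducer("MPISender",
    upstream = cms.InputTag("sendFirst"),       # MPIToken copy produced by sendFirst: runs only after it
    instance = cms.uint32(2),
    products = cms.vstring("offlinePrimaryVertices")
)
```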


## Testing

An automated test is available in the `test/` directory.


## Current limitations

- `MPIController` is a `one` module that supports only a single luminosity block at a time;
- there is only a partial check that the number, type and order of the collections sent by the `MPISender` match those
expected by the `MPIReceiver`.


## Notes for future developments

- implement efficient GPU-direct transfers for trivially serialisable products (in progress);
- check that the collections sent by the `MPISender` and those expected by the `MPIReceiver` match;
- integrate the filter decisions and the GPU backend into the metadata message;
- improve the `MPIController` to be a `global` module rather than a `one` module;
- let an `MPISource` accept connections and events from multiple `MPIController` modules in different jobs;
- let an `MPIController` connect and send events to multiple `MPISource` modules in different jobs (in progress);
- support multiple concurrent runs and luminosity blocks, up to a given maximum;
- when a run, luminosity block or event is received, check that it belongs to the same `ProcessingHistory` as the
ongoing run?
26 changes: 26 additions & 0 deletions HeterogeneousCore/MPICore/interface/MPIToken.h
@@ -0,0 +1,26 @@
#ifndef HeterogeneousCore_MPICore_MPIToken_h
#define HeterogeneousCore_MPICore_MPIToken_h

// C++ standard library headers
#include <memory>

// forward declaration
class MPIChannel;

class MPIToken {
public:
// default constructor, needed to write the type's dictionary
MPIToken() = default;

// user-defined constructor
explicit MPIToken(std::shared_ptr<MPIChannel> channel) : channel_(channel) {}

// access the data member
MPIChannel* channel() const { return channel_.get(); }

private:
// wrap the MPI communicator and destination
std::shared_ptr<MPIChannel> channel_;
};

#endif // HeterogeneousCore_MPICore_MPIToken_h
182 changes: 182 additions & 0 deletions HeterogeneousCore/MPICore/interface/api.h
@@ -0,0 +1,182 @@
#ifndef HeterogeneousCore_MPICore_interface_api_h
#define HeterogeneousCore_MPICore_interface_api_h

// C++ standard library headers
#include <bitset>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <typeinfo>
#include <utility>

// MPI headers
#include <mpi.h>

// ROOT headers
#include <TBufferFile.h>
#include <TClass.h>

// CMSSW headers
#include "DataFormats/Common/interface/WrapperBase.h"
#include "DataFormats/Provenance/interface/ProvenanceFwd.h"
#include "FWCore/Reflection/interface/ObjectWithDict.h"
#include "HeterogeneousCore/MPICore/interface/messages.h"
#include "HeterogeneousCore/MPICore/interface/metadata.h"
#include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h"
#include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h"

class MPIChannel {
public:
MPIChannel() = default;
MPIChannel(MPI_Comm comm, int destination) : comm_(comm), dest_(destination) {}

// build a new MPIChannel that uses a duplicate of the underlying communicator and the same destination
MPIChannel duplicate() const;

// close the underlying communicator and reset the MPIChannel to an invalid state
void reset();

// announce that a client has just connected
void sendConnect() { sendEmpty_(EDM_MPI_Connect); }

// announce that the client will disconnect
void sendDisconnect() { sendEmpty_(EDM_MPI_Disconnect); }

// signal the begin of stream
void sendBeginStream() { sendEmpty_(EDM_MPI_BeginStream); }

// signal the end of stream
void sendEndStream() { sendEmpty_(EDM_MPI_EndStream); }

// signal a new run, and transmit the RunAuxiliary
void sendBeginRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_BeginRun, aux); }

// signal the end of run, and re-transmit the RunAuxiliary
void sendEndRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_EndRun, aux); }

// signal a new luminosity block, and transmit the LuminosityBlockAuxiliary
void sendBeginLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) {
sendLuminosityBlockAuxiliary_(EDM_MPI_BeginLuminosityBlock, aux);
}

// signal the end of luminosity block, and re-transmit the LuminosityBlockAuxiliary
void sendEndLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) {
sendLuminosityBlockAuxiliary_(EDM_MPI_EndLuminosityBlock, aux);
}

// signal a new event, and transmit the EventAuxiliary
void sendEvent(edm::EventAuxiliary const& aux) { sendEventAuxiliary_(aux); }

/*
// start processing a new event, and receive the EventAuxiliary
MPI_Status receiveEvent(edm::EventAuxiliary& aux, int source) {
return receiveEventAuxiliary_(aux, source, EDM_MPI_ProcessEvent);
}
*/

// start processing a new event, and receive the EventAuxiliary
MPI_Status receiveEvent(edm::EventAuxiliary& aux, MPI_Message& message) {
return receiveEventAuxiliary_(aux, message);
}

void sendMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta);
void receiveMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta);

// send buffer of serialized products
void sendBuffer(const void* buf, size_t size, int instance, EDM_MPI_MessageTag tag);

// serialize an object of type T using its ROOT dictionary, and transmit it
template <typename T>
void sendProduct(int instance, T const& product) {
if constexpr (std::is_fundamental_v<T>) {
sendTrivialProduct_(instance, product);
} else {
static const TClass* type = TClass::GetClass<T>();
if (!type) {
throw std::runtime_error("ROOT dictionary not found for type " + std::string(typeid(T).name()));
}
sendSerializedProduct_(instance, type, &product);
}
}

// receive an object of type T, and deserialize it using its ROOT dictionary
template <typename T>
void receiveProduct(int instance, T& product) {
if constexpr (std::is_fundamental_v<T>) {
receiveTrivialProduct_(instance, product);
} else {
static const TClass* type = TClass::GetClass<T>();
if (!type) {
throw std::runtime_error("ROOT dictionary not found for type " + std::string(typeid(T).name()));
}
receiveSerializedProduct_(instance, type, &product);
}
}

// serialize a generic object using its ROOT dictionary, and send the binary blob
void sendSerializedProduct_(int instance, TClass const* type, void const* product);

// receive a binary blob, and deserialize an object of generic type using its ROOT dictionary
void receiveSerializedProduct_(int instance, TClass const* type, void* product);

// receive product buffer of known size
std::unique_ptr<TBufferFile> receiveSerializedBuffer(int instance, int bufSize);

// transfer a wrapped object using its MemoryCopyTraits
void sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader);

// receive the data into a wrapped object that has already been initialised from the metadata
void receiveInitializedTrivialCopy(int instance, ngt::WriterBase& writer);

private:
// serialize an EDM object to a simplified representation that can be transmitted as an MPI message
void edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux);
void edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux);
void edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux);

// deserialize an EDM object from a simplified representation transmitted as an MPI message
void edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux);
void edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux);
void edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux);

// fill and send an EDM_MPI_Empty_t buffer
void sendEmpty_(int tag);

// fill and send an EDM_MPI_RunAuxiliary_t buffer
void sendRunAuxiliary_(int tag, edm::RunAuxiliary const& aux);

// fill and send an EDM_MPI_LuminosityBlockAuxiliary_t buffer
void sendLuminosityBlockAuxiliary_(int tag, edm::LuminosityBlockAuxiliary const& aux);

// fill and send an EDM_MPI_EventAuxiliary_t buffer
void sendEventAuxiliary_(edm::EventAuxiliary const& aux);

// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary
MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag);
MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message);

// send a product of a fundamental type as a raw byte buffer
template <typename T>
void sendTrivialProduct_(int instance, T const& product) {
int tag = EDM_MPI_SendTrivialProduct | instance * EDM_MPI_MessageTagWidth_;
MPI_Send(&product, sizeof(T), MPI_BYTE, dest_, tag, comm_);
}

// receive a product of a fundamental type as a raw byte buffer, checking that its size matches sizeof(T)
template <typename T>
void receiveTrivialProduct_(int instance, T& product) {
int tag = EDM_MPI_SendTrivialProduct | instance * EDM_MPI_MessageTagWidth_;
MPI_Message message;
MPI_Status status;
MPI_Mprobe(dest_, tag, comm_, &message, &status);
int size;
MPI_Get_count(&status, MPI_BYTE, &size);
assert(static_cast<int>(sizeof(T)) == size);
MPI_Mrecv(&product, size, MPI_BYTE, &message, MPI_STATUS_IGNORE);
}

// MPI intercommunicator
MPI_Comm comm_ = MPI_COMM_NULL;

// MPI destination
int dest_ = MPI_UNDEFINED;
};

#endif // HeterogeneousCore_MPICore_interface_api_h
26 changes: 26 additions & 0 deletions HeterogeneousCore/MPICore/interface/conversion.h
@@ -0,0 +1,26 @@
#ifndef HeterogeneousCore_MPICore_interface_conversion_h
#define HeterogeneousCore_MPICore_interface_conversion_h

// CMSSW headers
#include "DataFormats/Provenance/interface/ProvenanceFwd.h"
#include "HeterogeneousCore/MPICore/interface/messages.h"

// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary_t buffer
void edmFromBuffer(EDM_MPI_RunAuxiliary_t const &, edm::RunAuxiliary &);

// fill an EDM_MPI_RunAuxiliary_t buffer from an edm::RunAuxiliary object
void edmToBuffer(EDM_MPI_RunAuxiliary_t &, edm::RunAuxiliary const &);

// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary_t buffer
void edmFromBuffer(EDM_MPI_LuminosityBlockAuxiliary_t const &, edm::LuminosityBlockAuxiliary &);

// fill an EDM_MPI_LuminosityBlockAuxiliary_t buffer from an edm::LuminosityBlockAuxiliary object
void edmToBuffer(EDM_MPI_LuminosityBlockAuxiliary_t &, edm::LuminosityBlockAuxiliary const &);

// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary_t buffer
void edmFromBuffer(EDM_MPI_EventAuxiliary_t const &, edm::EventAuxiliary &);

// fill an EDM_MPI_EventAuxiliary_t buffer from an edm::EventAuxiliary object
void edmToBuffer(EDM_MPI_EventAuxiliary_t &, edm::EventAuxiliary const &);

#endif // HeterogeneousCore_MPICore_interface_conversion_h