From c4ae7ce78b4d8e65fe1dbf18451c71230c45f0db Mon Sep 17 00:00:00 2001 From: Anna Polova Date: Sat, 29 Nov 2025 10:51:40 +0100 Subject: [PATCH] Extend CMSSW to a distributed application over MPI Let multiple CMSSW processes on the same or different machines coordinate event processing and transfer data products over MPI. The implementation is based on four CMSSW modules. Two are responsible for setting up the communication channels and coordinating the event processing: - a "remote controller" called MPIController; - a "remote source" called MPISource; and two are responsible for the transfer of data products: - a "sender" called MPISender; - a "receiver" called MPIReceiver. Data products can be serialised and transferred using the trivial serialisation from HeterogeneousCore/TrivialSerialisation - if available - or the ROOT-based serialisation. Various tests are used to validate the implementation: matching the local and remote event id, transferring SoA products with trivial serialisation, and transferring legacy products with ROOT serialisation. Co-authored-by: Andrea Bocci Co-authored-by: Anna Polova Co-authored-by: Mario Gonzalez --- HeterogeneousCore/MPICore/BuildFile.xml | 7 + HeterogeneousCore/MPICore/README.md | 105 ++++++ .../MPICore/interface/MPIToken.h | 26 ++ HeterogeneousCore/MPICore/interface/api.h | 182 ++++++++++ .../MPICore/interface/conversion.h | 26 ++ .../MPICore/interface/messages.h | 119 +++++++ .../MPICore/interface/metadata.h | 114 ++++++ .../MPICore/plugins/BuildFile.xml | 14 + .../MPICore/plugins/MPIController.cc | 293 ++++++++++++++++ .../MPICore/plugins/MPIReceiver.cc | 196 +++++++++++ .../MPICore/plugins/MPIReporter.cc | 59 ++++ .../MPICore/plugins/MPISender.cc | 263 ++++++++++++++ .../MPICore/plugins/MPISource.cc | 332 ++++++++++++++++++ HeterogeneousCore/MPICore/src/api.cc | 238 +++++++++++++ HeterogeneousCore/MPICore/src/classes.h | 2 + HeterogeneousCore/MPICore/src/classes_def.xml | 4 + HeterogeneousCore/MPICore/src/conversion.cc | 86 +++++ HeterogeneousCore/MPICore/src/macros.h | 149 ++++++++ HeterogeneousCore/MPICore/src/messages.cc | 81 +++++ HeterogeneousCore/MPICore/src/metadata.cc | 241 +++++++++++++ HeterogeneousCore/MPICore/test/BuildFile.xml | 6 + .../MPICore/test/controller_cfg.py | 58 +++ .../MPICore/test/controller_complex_cfg.py | 95 +++++ .../MPICore/test/controller_soa_cfg.py | 56 +++ .../MPICore/test/eventlist_cff.py | 39 ++ .../MPICore/test/follower_cfg.py | 51 +++ .../MPICore/test/follower_complex_cfg.py | 90 +++++ .../MPICore/test/follower_soa_cfg.py | 57 +++ .../MPICore/test/testMPICommWorld.sh | 17 + .../MPICore/test/testMPIInterCommunicator.sh | 71 ++++ 30 files changed, 3077 insertions(+) create mode 100644 HeterogeneousCore/MPICore/BuildFile.xml create mode 100644 HeterogeneousCore/MPICore/README.md create mode 100644 HeterogeneousCore/MPICore/interface/MPIToken.h create mode 100644 HeterogeneousCore/MPICore/interface/api.h create mode 100644 HeterogeneousCore/MPICore/interface/conversion.h create mode 100644 HeterogeneousCore/MPICore/interface/messages.h create mode 100644 HeterogeneousCore/MPICore/interface/metadata.h create mode 100644 HeterogeneousCore/MPICore/plugins/BuildFile.xml create mode 100644 HeterogeneousCore/MPICore/plugins/MPIController.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPIReceiver.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPIReporter.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPISender.cc create mode 100644 HeterogeneousCore/MPICore/plugins/MPISource.cc create mode 100644
HeterogeneousCore/MPICore/src/api.cc create mode 100644 HeterogeneousCore/MPICore/src/classes.h create mode 100644 HeterogeneousCore/MPICore/src/classes_def.xml create mode 100644 HeterogeneousCore/MPICore/src/conversion.cc create mode 100644 HeterogeneousCore/MPICore/src/macros.h create mode 100644 HeterogeneousCore/MPICore/src/messages.cc create mode 100644 HeterogeneousCore/MPICore/src/metadata.cc create mode 100644 HeterogeneousCore/MPICore/test/controller_cfg.py create mode 100644 HeterogeneousCore/MPICore/test/controller_complex_cfg.py create mode 100644 HeterogeneousCore/MPICore/test/controller_soa_cfg.py create mode 100644 HeterogeneousCore/MPICore/test/eventlist_cff.py create mode 100644 HeterogeneousCore/MPICore/test/follower_cfg.py create mode 100644 HeterogeneousCore/MPICore/test/follower_complex_cfg.py create mode 100644 HeterogeneousCore/MPICore/test/follower_soa_cfg.py create mode 100755 HeterogeneousCore/MPICore/test/testMPICommWorld.sh create mode 100755 HeterogeneousCore/MPICore/test/testMPIInterCommunicator.sh diff --git a/HeterogeneousCore/MPICore/BuildFile.xml b/HeterogeneousCore/MPICore/BuildFile.xml new file mode 100644 index 0000000000000..235463762b1ae --- /dev/null +++ b/HeterogeneousCore/MPICore/BuildFile.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/HeterogeneousCore/MPICore/README.md b/HeterogeneousCore/MPICore/README.md new file mode 100644 index 0000000000000..38ab5ebe80343 --- /dev/null +++ b/HeterogeneousCore/MPICore/README.md @@ -0,0 +1,105 @@ +# Extend CMSSW to a fully distributed application + +Let multiple CMSSW processes on the same or different machines coordinate event processing and transfer data products +over MPI. + +The implementation is based on four CMSSW modules. +Two are responsible for setting up the communication channels and coordinating the event processing: + - the `MPIController` + - the `MPISource` + +and two are responsible for the transfer of data products: + - the `MPISender` + - the `MPIReceiver` + +## `MPIController` class + +The `MPIController` is an `EDProducer` running in a regular CMSSW process. After setting up the communication with an +`MPISource`, it transmits to it all EDM run, lumi and event transitions, and instructs the `MPISource` to replicate them +in the second process. + + +## `MPISource` class + +The `MPISource` is a `Source` controlling the execution of a second CMSSW process. After setting up the communication +with an `MPIController`, it listens for EDM run, lumi and event transitions, and replicates them in its own process. + + +## `MPISender` class + +The `MPISender` is an `EDProducer` that can read any number of collections of arbitrary types from the `Event`. +For each event, it first sends a metadata message describing the products to be transferred, including their number and +basic characteristics. + +If `TrivialCopyTraits` are defined for a given product, the data are transferred directly from the product's memory +regions; otherwise, the product is serialised into a single buffer using its ROOT dictionary. The regions and the buffer +are sent to another process over the MPI communication channel. + +The number and types of the collections to be read from the `Event` are determined by the module configuration.
The + configuration can specify a list of module labels, branch names, or a mix of the two: + - a module label selects all collections produced by that module, irrespective of the type and instance; + - a branch name selects only the collections that match all the branch fields (type, label, instance, process name), + similar to an `OutputModule`'s `"keep ..."` statement. + +Wildcards (`?` and `*`) are allowed in a module label or in each field of a branch name. + + +## `MPIReceiver` class + +The `MPIReceiver` is an `EDProducer` that can receive any number of collections of arbitrary types over the MPI +communication channel. It first receives metadata, which is later used to initialise trivially copyable products and +allocate buffers for serialised products. + +For trivially copyable products, the receiver initialises the target objects using the metadata and performs an +`MPI_Recv` for each memory region. For non-trivially copyable products, it receives the serialised buffer and +deserialises it using the corresponding ROOT dictionary. + +All received products are put into the `Event`. The number, type and label of the collections to be produced are +determined by the module configuration. + +For each collection, the `type` indicates the C++ type as understood by the ROOT dictionary, and the `label` indicates +the module instance label to be used for producing that collection into the `Event`. + + +## `MPISender` and `MPIReceiver` instances + +Both `MPISender` and `MPIReceiver` are configured with an instance value that is used to match one `MPISender` in one +process to one `MPIReceiver` in another process. Using different instance values allows the use of multiple pairs of +`MPISender`/`MPIReceiver` modules in a process. +A configuration sketch of a matched pair is shown at the end of this README. + + +## MPI communication channel + +The `MPIController` and `MPISource` produce an `MPIToken`, a special data product that encapsulates the information +about the MPI communication channel. + +Both `MPISender` and `MPIReceiver` obtain the MPI communication channel by reading an `MPIToken` from the event, identified +by the `upstream` parameter. +They also produce a copy of the `MPIToken`, so other modules can consume it to declare a dependency on those modules. + + +## Testing + +An automated test is available in the `test/` directory. + + +## Current limitations + + - `MPIController` is a "one" module that supports only a single luminosity block at a time; + - there is only a partial check that the number, type and order of collections sent by the `MPISender` match those + expected by the `MPIReceiver`. + + +## Notes for future developments + + - implement efficient GPU-direct transfers for trivially serialisable products (in progress); + - check that the collections sent by the `MPISender` and those expected by the `MPIReceiver` match; + - integrate filter decisions and GPU backend into the metadata message; + - improve the `MPIController` to be a `global` module rather than a `one` module; + - let an `MPISource` accept connections and events from multiple `MPIController` modules in different jobs; + - let an `MPIController` connect and send events to multiple `MPISource` modules in different jobs (in progress); + - support multiple concurrent runs and luminosity blocks, up to a given maximum; + - when a run, luminosity block or event is received, check that they belong to the same `ProcessHistory` as the + ongoing run?
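## Configuration example

A minimal sketch of a matched `MPISender`/`MPIReceiver` pair in `CommWorld` mode. The module labels, the `TestProducer` module and its `std::vector<double>` product are hypothetical placeholders, and the `MPISource` is assumed to accept the same `mode` parameter as the `MPIController`:

```python
# controller job: drives the remote MPISource and sends one collection
import FWCore.ParameterSet.Config as cms

process = cms.Process("CONTROLLER")
process.source = cms.Source("EmptySource")
process.maxEvents = cms.untracked.PSet(input = cms.untracked.int32(10))

# set up the MPI channel and forward all transitions to the follower job
process.mpiController = cms.EDProducer("MPIController",
    mode = cms.untracked.string("CommWorld")
)

# hypothetical producer of a std::vector<double> collection
process.values = cms.EDProducer("TestProducer")

# send every collection produced by the module "values" over channel instance 1
process.sender = cms.EDProducer("MPISender",
    upstream = cms.InputTag("mpiController"),
    instance = cms.int32(1),
    products = cms.vstring("values")
)

process.path = cms.Path(process.mpiController + process.values + process.sender)
```

```python
# follower job: replicates the transitions and receives the collection
import FWCore.ParameterSet.Config as cms

process = cms.Process("FOLLOWER")

# replicate the run, lumi and event transitions driven by the remote MPIController
process.source = cms.Source("MPISource",
    mode = cms.untracked.string("CommWorld")
)

# receive the collection sent over channel instance 1 and put it into the Event
process.receiver = cms.EDProducer("MPIReceiver",
    upstream = cms.InputTag("source"),
    instance = cms.int32(1),
    products = cms.VPSet(
        cms.PSet(
            type = cms.string("std::vector<double>"),
            label = cms.string("")
        )
    )
)

process.path = cms.Path(process.receiver)
```

In `CommWorld` mode the two jobs must form a single two-rank MPI world, e.g. `mpirun -n 1 cmsRun controller_cfg.py : -n 1 cmsRun follower_cfg.py`.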
diff --git a/HeterogeneousCore/MPICore/interface/MPIToken.h b/HeterogeneousCore/MPICore/interface/MPIToken.h new file mode 100644 index 0000000000000..ec31dd2f6e7f9 --- /dev/null +++ b/HeterogeneousCore/MPICore/interface/MPIToken.h @@ -0,0 +1,26 @@ +#ifndef HeterogeneousCore_MPICore_MPIToken_h +#define HeterogeneousCore_MPICore_MPIToken_h + +// C++ standard library headers +#include <memory> + +// forward declaration +class MPIChannel; + +class MPIToken { +public: + // default constructor, needed to write the type's dictionary + MPIToken() = default; + + // user-defined constructor + explicit MPIToken(std::shared_ptr<MPIChannel> channel) : channel_(channel) {} + + // access the data member + MPIChannel* channel() const { return channel_.get(); } + +private: + // wrap the MPI communicator and destination + std::shared_ptr<MPIChannel> channel_; +}; + +#endif // HeterogeneousCore_MPICore_MPIToken_h diff --git a/HeterogeneousCore/MPICore/interface/api.h b/HeterogeneousCore/MPICore/interface/api.h new file mode 100644 index 0000000000000..8cfc0d2bb1295 --- /dev/null +++ b/HeterogeneousCore/MPICore/interface/api.h @@ -0,0 +1,182 @@ +#ifndef HeterogeneousCore_MPICore_interface_api_h +#define HeterogeneousCore_MPICore_interface_api_h + +// C++ standard library headers +#include +#include +#include +#include + +// MPI headers +#include <mpi.h> + +// ROOT headers +#include + +// CMSSW headers +#include "DataFormats/Common/interface/WrapperBase.h" +#include "DataFormats/Provenance/interface/ProvenanceFwd.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" +#include "HeterogeneousCore/MPICore/interface/messages.h" +#include "HeterogeneousCore/MPICore/interface/metadata.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h" + +class MPIChannel { +public: + MPIChannel() = default; + MPIChannel(MPI_Comm comm, int destination) : comm_(comm), dest_(destination) {} + + // build a new MPIChannel that uses a duplicate of the underlying communicator and the same destination + MPIChannel duplicate() const; + + // close the underlying communicator and reset the MPIChannel to an invalid state + void reset(); + + // announce that a client has just connected + void sendConnect() { sendEmpty_(EDM_MPI_Connect); } + + // announce that the client will disconnect + void sendDisconnect() { sendEmpty_(EDM_MPI_Disconnect); } + + // signal the beginning of the stream + void sendBeginStream() { sendEmpty_(EDM_MPI_BeginStream); } + + // signal the end of the stream + void sendEndStream() { sendEmpty_(EDM_MPI_EndStream); } + + // signal a new run, and transmit the RunAuxiliary + void sendBeginRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_BeginRun, aux); } + + // signal the end of run, and re-transmit the RunAuxiliary + void sendEndRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_EndRun, aux); } + + // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary + void sendBeginLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) { + sendLuminosityBlockAuxiliary_(EDM_MPI_BeginLuminosityBlock, aux); + } + + // signal the end of luminosity block, and re-transmit the LuminosityBlockAuxiliary + void sendEndLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) { + sendLuminosityBlockAuxiliary_(EDM_MPI_EndLuminosityBlock, aux); + } + + // signal a new event, and transmit the EventAuxiliary + void sendEvent(edm::EventAuxiliary const& aux) { sendEventAuxiliary_(aux); } + + /* + // start processing a new event, and receive the
EventAuxiliary + MPI_Status receiveEvent(edm::EventAuxiliary& aux, int source) { + return receiveEventAuxiliary_(aux, source, EDM_MPI_ProcessEvent); + } + */ + + // start processing a new event, and receive the EventAuxiliary + MPI_Status receiveEvent(edm::EventAuxiliary& aux, MPI_Message& message) { + return receiveEventAuxiliary_(aux, message); + } + + void sendMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta); + void receiveMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta); + + // send a buffer of serialized products + void sendBuffer(const void* buf, size_t size, int instance, EDM_MPI_MessageTag tag); + + // serialize an object of type T using its ROOT dictionary, and transmit it + template <typename T> + void sendProduct(int instance, T const& product) { + if constexpr (std::is_fundamental_v<T>) { + sendTrivialProduct_(instance, product); + } else { + static const TClass* type = TClass::GetClass<T>(); + if (!type) { + throw std::runtime_error("ROOT dictionary not found for type " + std::string(typeid(T).name())); + } + sendSerializedProduct_(instance, type, &product); + } + } + + // receive an object of type T, and deserialize it using its ROOT dictionary + template <typename T> + void receiveProduct(int instance, T& product) { + if constexpr (std::is_fundamental_v<T>) { + receiveTrivialProduct_(instance, product); + } else { + static const TClass* type = TClass::GetClass<T>(); + if (!type) { + throw std::runtime_error("ROOT dictionary not found for type " + std::string(typeid(T).name())); + } + receiveSerializedProduct_(instance, type, &product); + } + } + + // serialize a generic object using its ROOT dictionary, and send the binary blob + void sendSerializedProduct_(int instance, TClass const* type, void const* product); + + // receive a binary blob, and deserialize an object of generic type using its ROOT dictionary + void receiveSerializedProduct_(int instance, TClass const* type, void* product); + + // receive a product buffer of known size + std::unique_ptr<TBufferFile> receiveSerializedBuffer(int instance, int bufSize); + + // transfer a wrapped object using its MemoryCopyTraits + void sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader); + + // receive into a wrapped object + void receiveInitializedTrivialCopy(int instance, ngt::WriterBase& writer); + +private: + // serialize an EDM object to a simplified representation that can be transmitted as an MPI message + void edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux); + void edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux); + void edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux); + + // deserialize an EDM object from a simplified representation transmitted as an MPI message + void edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux); + void edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux); + void edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux); + + // fill and send an EDM_MPI_Empty_t buffer + void sendEmpty_(int tag); + + // fill and send an EDM_MPI_RunAuxiliary_t buffer + void sendRunAuxiliary_(int tag, edm::RunAuxiliary const& aux); + + // fill and send an EDM_MPI_LuminosityBlock_t buffer + void sendLuminosityBlockAuxiliary_(int tag, edm::LuminosityBlockAuxiliary const& aux); + + // fill and send an EDM_MPI_EventAuxiliary_t buffer + void sendEventAuxiliary_(edm::EventAuxiliary const& aux); + + // receive an EDM_MPI_EventAuxiliary_t buffer and populate an
edm::EventAuxiliary + MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag); + MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message); + + // this is what is used for sending when the product is of a raw fundamental type + template <typename T> + void sendTrivialProduct_(int instance, T const& product) { + int tag = EDM_MPI_SendTrivialProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Send(&product, sizeof(T), MPI_BYTE, dest_, tag, comm_); + } + + // this is what is used for receiving when the product is of a raw fundamental type + template <typename T> + void receiveTrivialProduct_(int instance, T& product) { + int tag = EDM_MPI_SendTrivialProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Message message; + MPI_Status status; + MPI_Mprobe(dest_, tag, comm_, &message, &status); + int size; + MPI_Get_count(&status, MPI_BYTE, &size); + assert(static_cast<int>(sizeof(T)) == size); + MPI_Mrecv(&product, size, MPI_BYTE, &message, MPI_STATUS_IGNORE); + } + + // MPI intercommunicator + MPI_Comm comm_ = MPI_COMM_NULL; + + // MPI destination + int dest_ = MPI_UNDEFINED; +}; + +#endif // HeterogeneousCore_MPICore_interface_api_h diff --git a/HeterogeneousCore/MPICore/interface/conversion.h b/HeterogeneousCore/MPICore/interface/conversion.h new file mode 100644 index 0000000000000..3ac4062b3d845 --- /dev/null +++ b/HeterogeneousCore/MPICore/interface/conversion.h @@ -0,0 +1,26 @@ +#ifndef HeterogeneousCore_MPICore_interface_conversion_h +#define HeterogeneousCore_MPICore_interface_conversion_h + +// CMSSW headers +#include "DataFormats/Provenance/interface/ProvenanceFwd.h" +#include "HeterogeneousCore/MPICore/interface/messages.h" + +// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary_t buffer +void edmFromBuffer(EDM_MPI_RunAuxiliary_t const &, edm::RunAuxiliary &); + +// fill an EDM_MPI_RunAuxiliary_t buffer from an edm::RunAuxiliary object +void edmToBuffer(EDM_MPI_RunAuxiliary_t &, edm::RunAuxiliary const &); + +// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary_t buffer +void edmFromBuffer(EDM_MPI_LuminosityBlockAuxiliary_t const &, edm::LuminosityBlockAuxiliary &); + +// fill an EDM_MPI_LuminosityBlockAuxiliary_t buffer from an edm::LuminosityBlockAuxiliary object +void edmToBuffer(EDM_MPI_LuminosityBlockAuxiliary_t &, edm::LuminosityBlockAuxiliary const &); + +// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary_t buffer +void edmFromBuffer(EDM_MPI_EventAuxiliary_t const &, edm::EventAuxiliary &); + +// fill an EDM_MPI_EventAuxiliary_t buffer from an edm::EventAuxiliary object +void edmToBuffer(EDM_MPI_EventAuxiliary_t &, edm::EventAuxiliary const &); + +#endif // HeterogeneousCore_MPICore_interface_conversion_h diff --git a/HeterogeneousCore/MPICore/interface/messages.h b/HeterogeneousCore/MPICore/interface/messages.h new file mode 100644 index 0000000000000..de07f6ac0db67 --- /dev/null +++ b/HeterogeneousCore/MPICore/interface/messages.h @@ -0,0 +1,119 @@ +#ifndef HeterogeneousCore_MPICore_interface_messages_h +#define HeterogeneousCore_MPICore_interface_messages_h + +// C++ standard library headers +#include <cstdint> + +// MPI headers +#include <mpi.h> + +/* register the MPI message types for the EDM communication + */ +void EDM_MPI_build_types(); + +/* MPI message tags corresponding to EDM transitions + */ +enum EDM_MPI_MessageTag { + EDM_MPI_Connect, + EDM_MPI_Disconnect, + EDM_MPI_BeginStream, + EDM_MPI_EndStream, + EDM_MPI_BeginRun, + EDM_MPI_EndRun, + EDM_MPI_BeginLuminosityBlock, + EDM_MPI_EndLuminosityBlock, + EDM_MPI_ProcessEvent,
EDM_MPI_SendMetadata, + EDM_MPI_SendSerializedProduct, + EDM_MPI_SendTrivialProduct, + EDM_MPI_SendTrivialCopyProduct, + EDM_MPI_MessageTagCount_ +}; + +/* Ensure that the MPI message tags can fit in a single byte + */ +inline constexpr int EDM_MPI_MessageTagWidth_ = 256; +static_assert(EDM_MPI_MessageTagCount_ <= EDM_MPI_MessageTagWidth_); + +extern MPI_Datatype EDM_MPI_MessageType[EDM_MPI_MessageTagCount_]; + +/* Common header for EDM MPI messages, containing + * - the message type (to allow decoding the message further) + */ +struct __attribute__((aligned(8))) EDM_MPI_Header_t { + uint32_t messageTag; // EDM_MPI_MessageTag +}; + +/* Empty EDM MPI message, used when only the header is needed + */ +struct EDM_MPI_Empty_t : public EDM_MPI_Header_t {}; + +// corresponding MPI type +extern MPI_Datatype EDM_MPI_Empty; + +/* Run information stored in edm::RunAuxiliary, + * augmented with the MPI message id + * + * See DataFormats/Provenance/interface/RunAuxiliary.h + */ +struct EDM_MPI_RunAuxiliary_t : public EDM_MPI_Header_t { + // from DataFormats/Provenance/interface/RunAuxiliary.h + char processHistoryID[16]; // edm::ProcessHistoryID::compactForm() + uint64_t beginTime; // edm::TimeValue_t + uint64_t endTime; // edm::TimeValue_t + uint32_t run; // edm::RunNumber_t +}; + +// corresponding MPI type +extern MPI_Datatype EDM_MPI_RunAuxiliary; + +/* LuminosityBlock information stored in edm::LuminosityBlockAuxiliary, + * augmented with the MPI message id + * + * See DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h + */ +struct EDM_MPI_LuminosityBlockAuxiliary_t : public EDM_MPI_Header_t { + // from DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h + char processHistoryID[16]; // edm::ProcessHistoryID::compactForm() + uint64_t beginTime; // edm::TimeValue_t + uint64_t endTime; // edm::TimeValue_t + uint32_t run; // edm::RunNumber_t + uint32_t lumi; // edm::LuminosityBlockNumber_t +}; + +// corresponding MPI type +extern MPI_Datatype EDM_MPI_LuminosityBlockAuxiliary; + +/* Event information stored in edm::EventAuxiliary, + * augmented with the MPI message id + * + * See DataFormats/Provenance/interface/EventAuxiliary.h + */ +struct EDM_MPI_EventAuxiliary_t : public EDM_MPI_Header_t { + // from DataFormats/Provenance/interface/EventAuxiliary.h + char processHistoryID[16]; // edm::ProcessHistoryID::compactForm() + char processGuid[16]; // process GUID + uint64_t time; // edm::TimeValue_t + int32_t realData; // real data (true) vs simulation (false) + int32_t experimentType; // edm::EventAuxiliary::ExperimentType + int32_t bunchCrossing; // LHC bunch crossing + int32_t orbitNumber; // LHC orbit number + int32_t storeNumber; // LHC fill number ? 
+ uint32_t run; // edm::RunNumber_t + uint32_t lumi; // edm::LuminosityBlockNumber_t + uint32_t event; // edm::EventNumber_t +}; + +// corresponding MPI type +extern MPI_Datatype EDM_MPI_EventAuxiliary; + +// union of all possible messages +union EDM_MPI_Any_t { + EDM_MPI_Header_t header; + EDM_MPI_Empty_t empty; + EDM_MPI_RunAuxiliary_t runAuxiliary; + EDM_MPI_LuminosityBlockAuxiliary_t luminosityBlockAuxiliary; + EDM_MPI_EventAuxiliary_t eventAuxiliary; +}; + +#endif // HeterogeneousCore_MPICore_interface_messages_h diff --git a/HeterogeneousCore/MPICore/interface/metadata.h b/HeterogeneousCore/MPICore/interface/metadata.h new file mode 100644 index 0000000000000..f131fd1ce5c14 --- /dev/null +++ b/HeterogeneousCore/MPICore/interface/metadata.h @@ -0,0 +1,114 @@ +#ifndef HeterogeneousCore_MPICore_interface_metadata_h +#define HeterogeneousCore_MPICore_interface_metadata_h + +// C++ standard library headers +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// MPI headers +#include <mpi.h> + +enum ProductFlags : uint8_t { + HasMissing = 1 << 0, + HasSerialized = 1 << 1, + HasTrivialCopy = 1 << 2, +}; + +struct ProductMetadata { + enum class Kind : uint8_t { Missing = 0, Serialized = 1, TrivialCopy = 2 }; + + Kind kind; + size_t sizeMeta = 0; + const uint8_t* trivialCopyOffset = nullptr; // Only valid if kind == TrivialCopy +}; + +class ProductMetadataBuilder { +public: + ProductMetadataBuilder(); + explicit ProductMetadataBuilder(size_t productCount); + ~ProductMetadataBuilder(); + + // No copy + ProductMetadataBuilder(const ProductMetadataBuilder&) = delete; + ProductMetadataBuilder& operator=(const ProductMetadataBuilder&) = delete; + + // Move + ProductMetadataBuilder(ProductMetadataBuilder&& other) noexcept; + ProductMetadataBuilder& operator=(ProductMetadataBuilder&& other) noexcept; + + // Sender-side: pre-allocate + void reserve(size_t bytes); + + // set or reset the number of products; must be called before sending + void setProductCount(size_t prod_num) { productCount_ = prod_num; } + void setHeader(); + + // Sender API + void addMissing(); + void addSerialized(size_t size); + void addTrivialCopy(const std::byte* buffer, size_t size); + + const uint8_t* data() const; + uint8_t* data(); + size_t size() const; + std::span buffer() const; + + // Receiver-side + void receiveMetadata(int src, int tag, MPI_Comm comm); + + // Not memory safe for trivial copy products. + // Please make sure that the ProductMetadataBuilder lives longer than the returned ProductMetadata + ProductMetadata getNext(); + + int64_t productCount() const { return productCount_; } + int64_t serializedBufferSize() const { return serializedBufferSize_; } + bool hasMissing() const { return productFlags_ & HasMissing; } + bool hasSerialized() const { return productFlags_ & HasSerialized; } + bool hasTrivialCopy() const { return productFlags_ & HasTrivialCopy; } + + void debugPrintMetadataSummary() const; + +private: + uint8_t* buffer_; + size_t capacity_; + size_t size_; + size_t readOffset_; + const size_t headerSize_ = 13; // header is always present in the metadata object to describe the products in general + const size_t maxMetadataSize_ = 1024; // default size for buffer initialization.
must fit any metadata + int serializedBufferSize_ = 0; + uint8_t productFlags_ = 0; + int64_t productCount_ = 0; + + void resizeBuffer(size_t newCap); + void ensureCapacity(size_t needed); + + void appendBytes(const std::byte* src, size_t size); + + template + void append(T value) { + static_assert(std::is_trivially_copyable_v); + ensureCapacity(sizeof(T)); + std::memcpy(buffer_ + size_, &value, sizeof(T)); + size_ += sizeof(T); + } + + template + T consume() { + static_assert(std::is_trivially_copyable_v); + if (readOffset_ + sizeof(T) > size_) + throw std::runtime_error("Buffer underflow"); + T val; + std::memcpy(&val, buffer_ + readOffset_, sizeof(T)); + readOffset_ += sizeof(T); + return val; + } +}; + +#endif // HeterogeneousCore_MPICore_interface_metadata_h diff --git a/HeterogeneousCore/MPICore/plugins/BuildFile.xml b/HeterogeneousCore/MPICore/plugins/BuildFile.xml new file mode 100644 index 0000000000000..0b56675f5b4d6 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/BuildFile.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/HeterogeneousCore/MPICore/plugins/MPIController.cc b/HeterogeneousCore/MPICore/plugins/MPIController.cc new file mode 100644 index 0000000000000..f62faaa81b287 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPIController.cc @@ -0,0 +1,293 @@ +// C++ headers +#include +#include +#include + +// MPI headers +#include + +// ROOT headers +#include +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/BranchKey.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/LuminosityBlock.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/Framework/interface/Run.h" +#include "FWCore/Framework/interface/one/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/EmptyGroupDescription.h" +#include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/ObjectWithDict.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/Utilities/interface/Guid.h" +#include "HeterogeneousCore/MPICore/interface/api.h" +#include "HeterogeneousCore/MPICore/interface/messages.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/MPIServices/interface/MPIService.h" + +/* MPIController class + * + * This module runs inside a CMSSW job (the "controller") and connects to an "MPISource" in a separate CMSSW job (the "follower"). + * The follower is informed of all transitions seen by the controller, and can replicate them in its own process. 
+ * + * Current limitations: + * - support a single "follower" + * + * Future work: + * - support multiple "followers" + */ + +class MPIController : public edm::one::EDProducer { +public: + explicit MPIController(edm::ParameterSet const& config); + ~MPIController() override; + + void beginJob() override; + void endJob() override; + + void beginRun(edm::Run const& run, edm::EventSetup const& setup) override; + void endRun(edm::Run const& run, edm::EventSetup const& setup) override; + + void beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override; + void endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override; + + void produce(edm::Event& event, edm::EventSetup const& setup) override; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + +private: + enum Mode { kInvalid = 0, kCommWorld, kIntercommunicator }; + static constexpr const char* ModeDescription[] = {"Invalid", "CommWorld", "Intercommunicator"}; + Mode parseMode(std::string const& label) { + if (label == ModeDescription[kCommWorld]) + return kCommWorld; + else if (label == ModeDescription[kIntercommunicator]) + return kIntercommunicator; + else + return kInvalid; + } + + MPI_Comm comm_ = MPI_COMM_NULL; + MPIChannel channel_; + edm::EDPutTokenT<MPIToken> token_; + Mode mode_; +}; + +MPIController::MPIController(edm::ParameterSet const& config) + : token_(produces<MPIToken>()), + mode_(parseMode(config.getUntrackedParameter<std::string>("mode"))) // +{ + // make sure that MPI is initialised + MPIService::required(); + + // make sure the EDM MPI types are available + EDM_MPI_build_types(); + + if (mode_ == kCommWorld) { + // All processes are in MPI_COMM_WORLD. + // The current implementation supports only two processes: one controller and one source. + edm::LogAbsolute("MPI") << "MPIController in " << ModeDescription[mode_] << " mode."; + + // Check how many processes there are in MPI_COMM_WORLD + int size; + MPI_Comm_size(MPI_COMM_WORLD, &size); + if (size != 2) { + throw edm::Exception(edm::errors::Configuration) + << "The current implementation supports only two processes: one controller and one source."; + } + + // Check the rank of this process, and determine the rank of the other process. + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + edm::LogAbsolute("MPI") << "MPIController has rank " << rank << " in MPI_COMM_WORLD."; + int other_rank = 1 - rank; + comm_ = MPI_COMM_WORLD; + channel_ = MPIChannel(comm_, other_rank); + } else if (mode_ == kIntercommunicator) { + // Use an intercommunicator to let two groups of processes communicate with each other. + // The current implementation supports only two processes: one controller and one source. + edm::LogAbsolute("MPI") << "MPIController in " << ModeDescription[mode_] << " mode."; + + // Check how many processes there are in MPI_COMM_WORLD + int size; + MPI_Comm_size(MPI_COMM_WORLD, &size); + if (size != 1) { + throw edm::Exception(edm::errors::Configuration) + << "The current implementation supports only two processes: one controller and one source."; + } + + // Look for the port published under the name given by the "name" parameter (default "server"). + std::string name = config.getUntrackedParameter<std::string>("name", "server"); + char port[MPI_MAX_PORT_NAME]; + MPI_Lookup_name(name.c_str(), MPI_INFO_NULL, port); + edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port; + + // Create an intercommunicator and connect to the server.
+ MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_); + MPI_Comm_remote_size(comm_, &size); + if (size != 1) { + throw edm::Exception(edm::errors::Configuration) + << "The current implementation supports only two processes: one controller and one source."; + } + edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? " server" : " servers"); + channel_ = MPIChannel(comm_, 0); + } else { + // Invalid mode. + throw edm::Exception(edm::errors::Configuration) + << "Invalid mode \"" << config.getUntrackedParameter("mode") << "\""; + } +} + +MPIController::~MPIController() { + // Close the intercommunicator. + if (mode_ == kIntercommunicator) { + MPI_Comm_disconnect(&comm_); + } +} + +void MPIController::beginJob() { + // signal the connection + channel_.sendConnect(); + + /* is there a way to access all known process histories ? + edm::ProcessHistoryRegistry const& registry = * edm::ProcessHistoryRegistry::instance(); + edm::LogAbsolute("MPI") << "ProcessHistoryRegistry:"; + for (auto const& keyval: registry) { + edm::LogAbsolute("MPI") << keyval.first << ": " << keyval.second; + } + */ +} + +void MPIController::endJob() { + // signal the disconnection + channel_.sendDisconnect(); +} + +void MPIController::beginRun(edm::Run const& run, edm::EventSetup const& setup) { + // signal a new run, and transmit the RunAuxiliary + /* FIXME + * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and + * we could simply do + + channel_.sendBeginRun(run.runAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the + * _parent_ process. + * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and + * transmit the modified RunAuxiliary. + */ + auto aux = run.runAuxiliary(); + aux.setProcessHistoryID(run.processHistory().id()); + channel_.sendBeginRun(aux); + + // transmit the ProcessHistory + channel_.sendProduct(0, run.processHistory()); +} + +void MPIController::endRun(edm::Run const& run, edm::EventSetup const& setup) { + // signal the end of run + /* FIXME + * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and + * we could simply do + + channel_.sendEndRun(run.runAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the + * _parent_ process. + * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and + * transmit the modified RunAuxiliary. + */ + auto aux = run.runAuxiliary(); + aux.setProcessHistoryID(run.processHistory().id()); + channel_.sendEndRun(aux); +} + +void MPIController::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) { + // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary + /* FIXME + * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the + * correct one, and we could simply do + + channel_.sendBeginLuminosityBlock(lumi.luminosityBlockAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is + * that of the _parent_ process. + * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct + * value, and transmit the modified LuminosityBlockAuxiliary. 
+ */ + auto aux = lumi.luminosityBlockAuxiliary(); + aux.setProcessHistoryID(lumi.processHistory().id()); + channel_.sendBeginLuminosityBlock(aux); +} + +void MPIController::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) { + // signal the end of luminosity block + /* FIXME + * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the + * correct one, and we could simply do + + channel_.sendEndLuminosityBlock(lumi.luminosityBlockAuxiliary()); + + * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is + * that of the _parent_ process. + * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct + * value, and transmit the modified LuminosityBlockAuxiliary. + */ + auto aux = lumi.luminosityBlockAuxiliary(); + aux.setProcessHistoryID(lumi.processHistory().id()); + channel_.sendEndLuminosityBlock(aux); +} + +void MPIController::produce(edm::Event& event, edm::EventSetup const& setup) { + { + edm::LogInfo log("MPI"); + log << "processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event " << event.id().event(); + log << "\nprocess history: " << event.processHistory(); + log << "\nprocess history id: " << event.processHistory().id(); + log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)"; + log << "\nisRealData " << event.eventAuxiliary().isRealData(); + log << "\nexperimentType " << event.eventAuxiliary().experimentType(); + log << "\nbunchCrossing " << event.eventAuxiliary().bunchCrossing(); + log << "\norbitNumber " << event.eventAuxiliary().orbitNumber(); + log << "\nstoreNumber " << event.eventAuxiliary().storeNumber(); + log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID(); + log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString(); + } + + // signal a new event, and transmit the EventAuxiliary + channel_.sendEvent(event.eventAuxiliary()); + + // duplicate the MPIChannel and put the copy into the Event + std::shared_ptr<MPIChannel> link(new MPIChannel(channel_.duplicate()), [](MPIChannel* ptr) { + ptr->reset(); + delete ptr; + }); + event.emplace(token_, std::move(link)); +} + +void MPIController::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + "This module connects to an \"MPISource\" in a separate CMSSW job, and transmits all Run, LuminosityBlock and " + "Event transitions from the current process to the remote one."); + + edm::ParameterSetDescription desc; + desc.ifValue( + edm::ParameterDescription<std::string>("mode", "CommWorld", false), + ModeDescription[kCommWorld] >> edm::EmptyGroupDescription() or + ModeDescription[kIntercommunicator] >> edm::ParameterDescription<std::string>("name", "server", false)) + ->setComment( + "Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to set up an " + "intercommunicator)."); + + descriptions.addWithDefaultLabel(desc); +} + +DEFINE_FWK_MODULE(MPIController); diff --git a/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc new file mode 100644 index 0000000000000..b6b36baffc067 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc @@ -0,0 +1,196 @@ +// C++ include files +#include +#include +#include +#include + +// ROOT headers +#include +#include + +// CMSSW include files +#include "FWCore/Concurrency/interface/Async.h" +#include
"FWCore/Concurrency/interface/chain_first.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/Framework/interface/WrapperBaseOrphanHandle.h" +#include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/Framework/interface/stream/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/ServiceRegistry/interface/ServiceMaker.h" +#include "FWCore/Utilities/interface/Exception.h" +#include "HeterogeneousCore/MPICore/interface/api.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/AnyBuffer.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h" + +class MPIReceiver : public edm::stream::EDProducer { +public: + MPIReceiver(edm::ParameterSet const& config) + : upstream_(consumes(config.getParameter("upstream"))), + token_(produces()), + instance_(config.getParameter("instance")) // + { + // instance 0 is reserved for the MPIController / MPISource pair + // instance values greater than 255 may not fit in the MPI tag + if (instance_ < 1 or instance_ > 255) { + throw cms::Exception("InvalidValue") + << "Invalid MPIReceiver instance value, please use a value between 1 and 255"; + } + + auto const& products = config.getParameter>("products"); + products_.reserve(products.size()); + for (auto const& product : products) { + auto const& type = product.getParameter("type"); + auto const& label = product.getParameter("label"); + Entry entry; + entry.type = edm::TypeWithDict::byName(type); + entry.wrappedType = edm::TypeWithDict::byName("edm::Wrapper<" + type + ">"); + entry.token = produces(edm::TypeID{entry.type.typeInfo()}, label); + + LogTrace("MPIReceiver") << "receive type \"" << entry.type.name() << "\" for label \"" << label + << "\" over MPI channel instance " << this->instance_; + + products_.emplace_back(std::move(entry)); + } + } + + void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final { + const MPIToken& token = event.get(upstream_); + + //also try unique or optional + received_meta_ = std::make_shared(); + + edm::Service as; + as->runAsync( + std::move(holder), + [this, token]() { token.channel()->receiveMetadata(instance_, received_meta_); }, + []() { return "Calling MPIReceiver::acquire()"; }); + } + + void produce(edm::Event& event, edm::EventSetup const&) final { + // read the MPIToken used to establish the communication channel + MPIToken token = event.get(upstream_); +#ifdef EDM_ML_DEBUG + // dump the summary of metadata + received_meta_->debugPrintMetadataSummary(); +#endif + + // if filter was false before the sender, receive nothing + if (received_meta_->productCount() == -1) { + event.emplace(token_, token); + return; + } + + int buffer_offset = 0; + std::unique_ptr serialized_buffer; + if (received_meta_->hasSerialized()) { + serialized_buffer = token.channel()->receiveSerializedBuffer(instance_, received_meta_->serializedBufferSize()); +#ifdef EDM_ML_DEBUG + { + edm::LogSystem msg("MPISender"); + msg << "Received serialised product:\n"; + for (int i = 0; i < serialized_buffer->Length(); ++i) 
{ + msg << "0x" << std::hex << std::setw(2) << std::setfill('0') + << (unsigned int)(unsigned char)serialized_buffer->Buffer()[i] << (i % 16 == 15 ? '\n' : ' '); + } + } +#endif + } + + for (auto const& entry : products_) { + auto product_meta = received_meta_->getNext(); + if (product_meta.kind == ProductMetadata::Kind::Missing) { + edm::LogWarning("MPIReceiver") << "Product " << entry.type.name() << " was not received."; + continue; // Skip products that weren't sent + } + + else if (product_meta.kind == ProductMetadata::Kind::Serialized) { + std::unique_ptr<edm::WrapperBase> wrapper( + reinterpret_cast<edm::WrapperBase*>(entry.wrappedType.getClass()->New())); + auto productBuffer = TBufferFile(TBuffer::kRead, product_meta.sizeMeta); + // assert(!wrapper->hasTrivialCopyTraits() && "mismatch between expected and actual metadata type"); + assert(buffer_offset < serialized_buffer->BufferSize() && "serialized data buffer is shorter than expected"); + productBuffer.SetBuffer(serialized_buffer->Buffer() + buffer_offset, product_meta.sizeMeta, false); + buffer_offset += product_meta.sizeMeta; + entry.wrappedType.getClass()->Streamer(wrapper.get(), productBuffer); + // put the data into the Event + event.put(entry.token, std::move(wrapper)); + } + + else if (product_meta.kind == ProductMetadata::Kind::TrivialCopy) { + // assert(wrapper->hasTrivialCopyTraits() && "mismatch between expected and actual metadata type"); + std::unique_ptr<ngt::SerialiserBase> serialiser = + ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name()); + if (not serialiser) { + throw cms::Exception("SerializerError") << "Receiver could not retrieve its serializer when it was expected"; + } + auto writer = serialiser->writer(); + ngt::AnyBuffer buffer = writer->uninitialized_parameters(); // constructs buffer with typeid + assert(buffer.size_bytes() == product_meta.sizeMeta); + std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta); + // why are both of these methods called initialize? I find this rather confusing + writer->initialize(buffer); + token.channel()->receiveInitializedTrivialCopy(instance_, *writer); + writer->finalize(); + // put the data into the Event + event.put(entry.token, writer->get()); + } + } + + if (received_meta_->hasSerialized()) { + assert(buffer_offset == received_meta_->serializedBufferSize() && + "serialized data buffer is not equal to the expected length"); + } + + // write a shallow copy of the channel to the output, so other modules can consume it + // to indicate that they should run after this + event.emplace(token_, token); + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + "This module can receive arbitrary event products from an \"MPISender\" module in a separate CMSSW job, and " + "produce them into the event."); + + edm::ParameterSetDescription product; + product.add<std::string>("type")->setComment("C++ type of the product to be received."); + product.add<std::string>("label", "")->setComment("Product instance label to be associated to the product."); + + edm::ParameterSetDescription desc; + desc.add<edm::InputTag>("upstream", {"source"}) + ->setComment( + "MPI communication channel. Can be an \"MPIController\", \"MPISource\", \"MPISender\" or \"MPIReceiver\". " + "Passing an \"MPIController\" or \"MPISource\" only identifies the pair of local and remote applications " + "that communicate.
Passing an \"MPISender\" or \"MPIReceiver\" in addition in addition imposes a " + "scheduling dependency."); + desc.addVPSet("products", product, {}) + ->setComment("Products to be received by a separate CMSSW job and produced into the event."); + desc.add("instance", 0) + ->setComment("A value between 1 and 255 used to identify a matching pair of \"MPISender\"/\"MPIRecevier\"."); + + descriptions.addWithDefaultLabel(desc); + } + +private: + struct Entry { + edm::TypeWithDict type; + edm::TypeWithDict wrappedType; + edm::EDPutToken token; + }; + + edm::EDGetTokenT const upstream_; // MPIToken used to establish the communication channel + edm::EDPutTokenT const token_; // copy of the MPIToken that may be used to implement an ordering relation + std::vector products_; // data to be read over the channel and put into the Event + int32_t const instance_; // instance used to identify the source-destination pair + + std::shared_ptr received_meta_; +}; + +#include "FWCore/Framework/interface/MakerMacros.h" +DEFINE_FWK_MODULE(MPIReceiver); diff --git a/HeterogeneousCore/MPICore/plugins/MPIReporter.cc b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc new file mode 100644 index 0000000000000..49ec608e0a0f5 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc @@ -0,0 +1,59 @@ +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/stream/EDAnalyzer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/Guid.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" + +/* MPIReporter class + * + */ + +class MPIReporter : public edm::stream::EDAnalyzer<> { +public: + explicit MPIReporter(edm::ParameterSet const& config); + ~MPIReporter() override = default; + + void analyze(edm::Event const& event, edm::EventSetup const& setup) override; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + +private: + edm::EDGetTokenT token_; +}; + +MPIReporter::MPIReporter(edm::ParameterSet const& config) : token_(consumes(edm::InputTag("source"))) {} + +void MPIReporter::analyze(edm::Event const& event, edm::EventSetup const& setup) { + { + edm::LogAbsolute log("MPI"); + log << "stream " << event.streamID() << ": processing run " << event.run() << ", lumi " << event.luminosityBlock() + << ", event " << event.id().event(); + log << "\nprocess history: " << event.processHistory(); + log << "\nprocess history id: " << event.processHistory().id(); + log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)"; + log << "\nisRealData " << event.eventAuxiliary().isRealData(); + log << "\nexperimentType " << event.eventAuxiliary().experimentType(); + log << "\nbunchCrossing " << event.eventAuxiliary().bunchCrossing(); + log << "\norbitNumber " << event.eventAuxiliary().orbitNumber(); + log << "\nstoreNumber " << event.eventAuxiliary().storeNumber(); + log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID(); + log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString(); + } + + auto const& token = event.get(token_); + { + edm::LogAbsolute log("MPI"); + log << "got the MPIToken opaque wrapper around the MPIChannel at " << &token; + } +} + +void MPIReporter::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { 
+ edm::ParameterSetDescription desc; + descriptions.addWithDefaultLabel(desc); +} + +#include "FWCore/Framework/interface/MakerMacros.h" +DEFINE_FWK_MODULE(MPIReporter); diff --git a/HeterogeneousCore/MPICore/plugins/MPISender.cc b/HeterogeneousCore/MPICore/plugins/MPISender.cc new file mode 100644 index 0000000000000..8e87a9ae7c77b --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPISender.cc @@ -0,0 +1,263 @@ +// C++ include files +#include +#include +#include +#include +#include +#include +#include + +// ROOT headers +#include +#include + +// CMSSW include files +#include "DataFormats/Provenance/interface/ProductDescription.h" +#include "DataFormats/Provenance/interface/ProductNamePattern.h" +#include "FWCore/Concurrency/interface/Async.h" +#include "FWCore/Concurrency/interface/chain_first.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/GenericHandle.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/Framework/interface/WrapperBaseHandle.h" +#include "FWCore/Framework/interface/global/EDProducer.h" +#include "FWCore/Framework/interface/stream/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/ServiceRegistry/interface/ServiceMaker.h" +#include "FWCore/Utilities/interface/Exception.h" +#include "HeterogeneousCore/MPICore/interface/api.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/AnyBuffer.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h" + +class MPISender : public edm::stream::EDProducer { +public: + MPISender(edm::ParameterSet const& config) + : upstream_(consumes(config.getParameter("upstream"))), + token_(produces()), + patterns_(edm::productPatterns(config.getParameter>("products"))), + instance_(config.getParameter("instance")), + buffer_(std::make_unique(TBuffer::kWrite)), + buffer_offset_(0), + metadata_size_(0) { + // instance 0 is reserved for the MPIController / MPISource pair + // instance values greater than 255 may not fit in the MPI tag + if (instance_ < 1 or instance_ > 255) { + throw cms::Exception("InvalidValue") << "Invalid MPISender instance value, please use a value between 1 and 255"; + } + + products_.resize(patterns_.size()); + + callWhenNewProductsRegistered([this](edm::ProductDescription const& product) { + static const std::string_view kPathStatus("edm::PathStatus"); + static const std::string_view kEndPathStatus("edm::EndPathStatus"); + + // std::cout << "MPISender: considering product " << product.friendlyClassName() << '_' + // << product.moduleLabel() << '_' << product.productInstanceName() << '_' << product.processName() + // << " of type " << product.unwrappedType().name() << " branch type " << product.branchType() << "\n"; + + switch (product.branchType()) { + case edm::InEvent: + if (product.className() == kPathStatus or product.className() == kEndPathStatus) + return; + for (size_t pattern_index = 0; pattern_index < patterns_.size(); pattern_index++) { + if (patterns_[pattern_index].match(product)) { + Entry entry; + entry.type = product.unwrappedType(); + 
entry.wrappedType = product.wrappedType(); + // TODO move this to EDConsumerBase::consumes() ? + entry.token = this->consumes( + edm::TypeToGet{product.unwrappedTypeID(), edm::PRODUCT_TYPE}, + edm::InputTag{product.moduleLabel(), product.productInstanceName(), product.processName()}); + + LogDebug("MPISender") << "send product \"" << product.friendlyClassName() << '_' << product.moduleLabel() + << '_' << product.productInstanceName() << '_' << product.processName() + << "\" of type \"" << entry.type.name() << "\" over MPI channel instance " + << instance_; + + products_[pattern_index] = std::move(entry); + break; + } + } + break; + + case edm::InLumi: + case edm::InRun: + case edm::InProcess: + // lumi, run and process products are not supported + break; + + default: + throw edm::Exception(edm::errors::LogicError) + << "Unexpected branch type " << product.branchType() << "\nPlease contact a Framework developer\n"; + } + }); + + // TODO add an error if a pattern does not match any branches? how? + } + + void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final { + const MPIToken& token = event.get(upstream_); + // we need 1 byte for the type, 8 bytes for the size and at least 8 bytes for the trivial copy parameters buffer + auto meta = std::make_shared<ProductMetadataBuilder>(products_.size() * 24); + size_t index = 0; + // this seems to work fine, but does this vector indeed persist between acquire() and produce()? + // serializedBuffers_.clear(); + buffer_->Reset(); + buffer_offset_ = 0; + meta->setProductCount(products_.size()); + has_serialized_ = false; + is_active_ = true; + + // estimate the buffer size in the constructor + + for (auto const& entry : products_) { + // Get the product + edm::Handle handle(entry.type.typeInfo()); + event.getByToken(entry.token, handle); + + // a product count of -1 indicates that the event was filtered out on a given path + if (not handle.isValid() and entry.type.name() == "edm::PathStateToken") { + meta->setProductCount(-1); + is_active_ = false; + break; + } + + if (handle.isValid()) { + edm::WrapperBase const* wrapper = handle.product(); + std::unique_ptr<ngt::SerialiserBase> serialiser = + ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name()); + + if (serialiser) { + LogDebug("MPISender") << "Found serializer for type \"" << entry.type.name() << "\" (" + << entry.type.typeInfo().name() << ")"; + auto reader = serialiser->reader(*wrapper); + ngt::AnyBuffer buffer = reader->parameters(); + meta->addTrivialCopy(buffer.data(), buffer.size_bytes()); + } else { + LogDebug("MPISender") << "No serializer for type \"" << entry.type.name() << "\" (" + << entry.type.typeInfo().name() << "), using ROOT serialization"; + TClass* cls = entry.wrappedType.getClass(); + if (!cls) { + throw cms::Exception("MPISender") << "Failed to get TClass for type: " << entry.type.name(); + } + size_t bufLen = serializeAndStoreBuffer_(index, cls, wrapper); + meta->addSerialized(bufLen); + has_serialized_ = true; + } + + } else { + // handle a missing product + meta->addMissing(); + } + index++; + } + + // Submit sending of all products to run in the additional asynchronous thread pool + edm::Service<edm::Async> as; + as->runAsync( + std::move(holder), + [this, token, meta = std::move(meta)]() { token.channel()->sendMetadata(instance_, meta); }, + []() { return "Calling MPISender::acquire()"; }); + } + + void produce(edm::Event& event, edm::EventSetup const&) final { + MPIToken token = event.get(upstream_); + + if (!is_active_) { + event.emplace(token_, token); + return; + } + + if
(has_serialized_) {
+#ifdef EDM_ML_DEBUG
+      {
+        edm::LogSystem msg("MPISender");
+        msg << "Sending serialised product:\n";
+        for (int i = 0; i < buffer_->Length(); ++i) {
+          msg << "0x" << std::hex << std::setw(2) << std::setfill('0')
+              << (unsigned int)(unsigned char)buffer_->Buffer()[i] << (i % 16 == 15 ? '\n' : ' ');
+        }
+      }
+#endif
+      token.channel()->sendBuffer(buffer_->Buffer(), buffer_->Length(), instance_, EDM_MPI_SendSerializedProduct);
+    }
+
+    for (auto const& entry : products_) {
+      edm::Handle handle(entry.type.typeInfo());
+      event.getByToken(entry.token, handle);
+      edm::WrapperBase const* wrapper = handle.product();
+      // we don't send missing products
+      if (handle.isValid()) {
+        std::unique_ptr serialiser =
+            ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name());
+        if (serialiser) {
+          auto reader = serialiser->reader(*wrapper);
+          token.channel()->sendTrivialCopyProduct(instance_, *reader);
+        }
+      }
+    }
+    // write a shallow copy of the channel to the output, so other modules can consume it
+    // to indicate that they should run after this one
+    event.emplace(token_, token);
+  }
+
+  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
+    descriptions.setComment(
+        "This module can consume arbitrary event products and copy them to an \"MPIReceiver\" module in a separate "
+        "CMSSW job.");
+
+    edm::ParameterSetDescription desc;
+    desc.add("upstream", {"source"})
+        ->setComment(
+            "MPI communication channel. Can be an \"MPIController\", \"MPISource\", \"MPISender\" or \"MPIReceiver\". "
+            "Passing an \"MPIController\" or \"MPISource\" only identifies the pair of local and remote applications "
+            "that communicate. Passing an \"MPISender\" or \"MPIReceiver\" in addition imposes a "
+            "scheduling dependency.");
+    desc.add>("products", {})
+        ->setComment(
+            "Event products to be consumed and copied over to a separate CMSSW job. Can be a list of module labels, "
+            "branch names (similar to an OutputModule's \"keep ...\" statement), or a mix of the two. Wildcards (\"?\" "
+            "and \"*\") are allowed in a module label or in each field of a branch name.");
+    desc.add("instance", 0)
+        ->setComment("A value between 1 and 255 used to identify a matching pair of \"MPISender\"/\"MPIReceiver\".");
+
+    descriptions.addWithDefaultLabel(desc);
+  }
+
+private:
+  size_t serializeAndStoreBuffer_(size_t index, TClass* type, void const* product) {
+    buffer_->ResetMap();
+    type->Streamer(const_cast(product), *buffer_);
+    size_t prod_size = buffer_->Length() - buffer_offset_;
+    buffer_offset_ = buffer_->Length();
+    return prod_size;
+  }
+
+  struct Entry {
+    edm::TypeWithDict type;
+    edm::TypeWithDict wrappedType;
+    edm::EDGetToken token;
+  };
+
+  // TODO consider if upstream_ should be a vector instead of a single token ?
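+  // The instance value is folded into the MPI tag together with the message type by the
+  // MPIChannel send and receive calls, so several MPISender/MPIReceiver pairs can share
+  // the same controller/source channel without mixing up their messages.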
+ edm::EDGetTokenT const upstream_; // MPIToken used to establish the communication channel + edm::EDPutTokenT const token_; // copy of the MPIToken that may be used to implement an ordering relation + std::vector patterns_; // branches to read from the Event and send over the MPI channel + std::vector products_; // types and tokens corresponding to the branches + int32_t const instance_; // instance used to identify the source-destination pair + std::unique_ptr buffer_; + size_t buffer_offset_; + size_t metadata_size_; + bool has_serialized_ = false; + bool is_active_ = true; +}; + +#include "FWCore/Framework/interface/MakerMacros.h" +DEFINE_FWK_MODULE(MPISender); diff --git a/HeterogeneousCore/MPICore/plugins/MPISource.cc b/HeterogeneousCore/MPICore/plugins/MPISource.cc new file mode 100644 index 0000000000000..cf44ca57a3a3c --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/MPISource.cc @@ -0,0 +1,332 @@ +// C++ headers +#include +#include +#include + +// MPI headers +#include + +// ROOT headers +#include +#include +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/BranchListIndex.h" +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include "DataFormats/Provenance/interface/EventSelectionID.h" +#include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h" +#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/ProcessHistory.h" +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/EventPrincipal.h" +#include "FWCore/Framework/interface/InputSource.h" +#include "FWCore/Framework/interface/InputSourceDescription.h" +#include "FWCore/Framework/interface/InputSourceMacros.h" +#include "FWCore/Framework/interface/ProductProvenanceRetriever.h" +#include "FWCore/MessageLogger/interface/ErrorObj.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/EmptyGroupDescription.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescriptionFiller.h" +#include "FWCore/Sources/interface/ProducerSourceBase.h" +#include "FWCore/Utilities/interface/EDMException.h" +#include "HeterogeneousCore/MPICore/interface/api.h" +#include "HeterogeneousCore/MPICore/interface/conversion.h" +#include "HeterogeneousCore/MPICore/interface/messages.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/MPIServices/interface/MPIService.h" + +class MPISource : public edm::ProducerSourceBase { +public: + explicit MPISource(edm::ParameterSet const& config, edm::InputSourceDescription const& desc); + ~MPISource() override; + using InputSource::processHistoryRegistryForUpdate; + using InputSource::productRegistryUpdate; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + +private: + bool setRunAndEventInfo(edm::EventID& id, edm::TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override; + void produce(edm::Event&) override; + + enum Mode { kInvalid = 0, kCommWorld, kIntercommunicator }; + static constexpr const char* ModeDescription[] = {"Invalid", "CommWorld", "Intercommunicator"}; + Mode parseMode(std::string const& label) { + if (label == 
ModeDescription[kCommWorld]) + return kCommWorld; + else if (label == ModeDescription[kIntercommunicator]) + return kIntercommunicator; + else + return kInvalid; + } + + char port_[MPI_MAX_PORT_NAME]; + MPI_Comm comm_ = MPI_COMM_NULL; + MPIChannel channel_; + edm::EDPutTokenT token_; + Mode mode_; + + edm::ProcessHistory history_; +}; + +MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescription const& desc) + : // note that almost all configuration parameters passed to IDGeneratorSourceBase via ProducerSourceBase will + // effectively be ignored, because this ConfigurableSource will explicitly set the run, lumi, and event + // numbers, the timestamp, and the event type + edm::ProducerSourceBase(config, desc, false), + token_(produces()), + mode_(parseMode(config.getUntrackedParameter("mode"))) // +{ + // make sure that MPI is initialised + + MPIService::required(); + + // Make sure the EDM MPI types are available. + EDM_MPI_build_types(); + + if (mode_ == kCommWorld) { + // All processes are in MPI_COMM_WORLD. + // The current implementation supports only two processes: one controller and one source. + edm::LogAbsolute("MPI") << "MPISource in " << ModeDescription[mode_] << " mode."; + + // Check how many processes are there in MPI_COMM_WORLD + int size; + MPI_Comm_size(MPI_COMM_WORLD, &size); + if (size != 2) { + throw edm::Exception(edm::errors::Configuration) + << "The current implementation supports only two processes: one controller and one source."; + } + + // Check the rank of this process, and determine the rank of the other process. + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + edm::LogAbsolute("MPI") << "MPISource has rank " << rank << " in MPI_COMM_WORLD."; + int other_rank = 1 - rank; + comm_ = MPI_COMM_WORLD; + channel_ = MPIChannel(comm_, other_rank); + } else if (mode_ == kIntercommunicator) { + // Use an intercommunicator to let two groups of processes communicate with each other. + // The current implementation supports only two processes: one controller and one source. + edm::LogAbsolute("MPI") << "MPISource in " << ModeDescription[mode_] << " mode."; + + // Check how many processes are there in MPI_COMM_WORLD + int size; + MPI_Comm_size(MPI_COMM_WORLD, &size); + if (size != 1) { + throw edm::Exception(edm::errors::Configuration) + << "The current implementation supports only two processes: one controller and one source."; + } + + // Open a server-side port. + MPI_Open_port(MPI_INFO_NULL, port_); + + // Publish the port under the name indicated by the parameter "server". + std::string name = config.getUntrackedParameter("name", "server"); + MPI_Info port_info; + MPI_Info_create(&port_info); + MPI_Info_set(port_info, "ompi_global_scope", "true"); + MPI_Info_set(port_info, "ompi_unique", "true"); + MPI_Publish_name(name.c_str(), port_info, port_); + + // Create an intercommunicator and accept a client connection. + edm::LogAbsolute("MPI") << "Waiting for a connection to the MPI server at port " << port_; + + MPI_Comm_accept(port_, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_); + edm::LogAbsolute("MPI") << "Connection accepted."; + channel_ = MPIChannel(comm_, 0); + } else { + // Invalid mode. + throw edm::Exception(edm::errors::Configuration) + << "Invalid mode \"" << config.getUntrackedParameter("mode") << "\""; + } + + // Wait for a client to connect. 
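+  // The remote MPIController opens the exchange by sending an EDM_MPI_Connect message;
+  // block here until it arrives, so that no data is processed before a peer is connected.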
+ MPI_Status status; + EDM_MPI_Empty_t buffer; + MPI_Recv(&buffer, 1, EDM_MPI_Empty, MPI_ANY_SOURCE, EDM_MPI_Connect, comm_, &status); + edm::LogAbsolute("MPI") << "connected from " << status.MPI_SOURCE; +} + +MPISource::~MPISource() { + if (mode_ == kIntercommunicator) { + // Close the intercommunicator. + MPI_Comm_disconnect(&comm_); + + // Unpublish and close the port. + MPI_Info port_info; + MPI_Info_create(&port_info); + MPI_Info_set(port_info, "ompi_global_scope", "true"); + MPI_Info_set(port_info, "ompi_unique", "true"); + MPI_Unpublish_name("server", port_info, port_); + MPI_Close_port(port_); + } +} + +//MPISource::ItemTypeInfo MPISource::getNextItemType() { +bool MPISource::setRunAndEventInfo(edm::EventID& event, + edm::TimeValue_t& time, + edm::EventAuxiliary::ExperimentType& type) { + while (true) { + MPI_Status status; + MPI_Message message; + MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm_, &message, &status); + switch (status.MPI_TAG) { + // Connect message + case EDM_MPI_Connect: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // the Connect message is unexpected here (see above) + throw cms::Exception("InvalidValue") + << "The MPISource has received an EDM_MPI_Connect message after the initial connection"; + return false; + } + + // Disconnect message + case EDM_MPI_Disconnect: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // signal the end of the input data + return false; + } + + // BeginStream message + case EDM_MPI_BeginStream: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // receive the next message + break; + } + + // EndStream message + case EDM_MPI_EndStream: { + // receive the message header + EDM_MPI_Empty_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status); + + // receive the next message + break; + } + + // BeginRun message + case EDM_MPI_BeginRun: { + // receive the RunAuxiliary + EDM_MPI_RunAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_RunAuxiliary, &message, &status); + // TODO this is currently not used + edm::RunAuxiliary runAuxiliary; + edmFromBuffer(buffer, runAuxiliary); + + // receive the ProcessHistory + history_.clear(); + channel_.receiveProduct(0, history_); + history_.initializeTransients(); + if (processHistoryRegistryForUpdate().registerProcessHistory(history_)) { + edm::LogAbsolute("MPI") << "new ProcessHistory registered: " << history_; + } + + // receive the next message + break; + } + + // EndRun message + case EDM_MPI_EndRun: { + // receive the RunAuxiliary message + EDM_MPI_RunAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_RunAuxiliary, &message, &status); + + // receive the next message + break; + } + + // BeginLuminosityBlock message + case EDM_MPI_BeginLuminosityBlock: { + // receive the LuminosityBlockAuxiliary + EDM_MPI_LuminosityBlockAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, &message, &status); + // TODO this is currently not used + edm::LuminosityBlockAuxiliary luminosityBlockAuxiliary; + edmFromBuffer(buffer, luminosityBlockAuxiliary); + + // receive the next message + break; + } + + // EndLuminosityBlock message + case EDM_MPI_EndLuminosityBlock: { + // receive the LuminosityBlockAuxiliary + EDM_MPI_LuminosityBlockAuxiliary_t buffer; + MPI_Mrecv(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, &message, &status); + + // receive the next message + 
break; + } + + // ProcessEvent message + case EDM_MPI_ProcessEvent: { + // receive the EventAuxiliary + edm::EventAuxiliary aux; + status = channel_.receiveEvent(aux, message); + + // extract the rank of the other process (currently unused) + int source = status.MPI_SOURCE; + (void)source; + + // fill the event details + event = aux.id(); + time = aux.time().value(); + type = aux.experimentType(); + + // signal a new event + return true; + } + + // unexpected message + default: { + throw cms::Exception("InvalidValue") + << "The MPISource has received an unknown message with tag " << status.MPI_TAG; + return false; + } + } + } +} + +void MPISource::produce(edm::Event& event) { + // duplicate the MPIChannel and put the copy into the Event + std::shared_ptr channel(new MPIChannel(channel_.duplicate()), [](MPIChannel* ptr) { + ptr->reset(); + delete ptr; + }); + event.emplace(token_, std::move(channel)); +} + +void MPISource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + "This module connects to an \"MPIController\" in a separate CMSSW job, receives all Run, LuminosityBlock and " + "Event transitions from the remote process and reproduces them in the local one."); + + edm::ParameterSetDescription desc; + edm::ProducerSourceBase::fillDescription(desc); + desc.ifValue( + edm::ParameterDescription("mode", "CommWorld", false), + ModeDescription[kCommWorld] >> edm::EmptyGroupDescription() or + ModeDescription[kIntercommunicator] >> edm::ParameterDescription("name", "server", false)) + ->setComment( + "Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to setup an " + "intercommunicator)."); + + descriptions.add("source", desc); +} + +#include "FWCore/Framework/interface/InputSourceMacros.h" +DEFINE_FWK_INPUT_SOURCE(MPISource); diff --git a/HeterogeneousCore/MPICore/src/api.cc b/HeterogeneousCore/MPICore/src/api.cc new file mode 100644 index 0000000000000..978c25c6f5d19 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/api.cc @@ -0,0 +1,238 @@ +// C++ standard library headers +#include +#include +#include +#include +#include + +// ROOT headers +#include +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "HeterogeneousCore/MPICore/interface/api.h" +#include "HeterogeneousCore/MPICore/interface/conversion.h" +#include "HeterogeneousCore/MPICore/interface/messages.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h" + +namespace { + // copy the content of an std::string-like object to an N-sized char buffer: + // if the string is larger than the buffer, copy only the first N bytes; + // if the string is smaller than the buffer, fill the rest of the buffer with NUL characters. 
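+  // For example, copying the 7-character string "abcdefg" into a char[4] stores only
+  // "abcd", while copying it into a char[16] stores the 7 characters followed by 9 NUL
+  // bytes.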
+ template + void copy_and_fill(char (&dest)[N], S const& src) { + if (std::size(src) < N) { + memset(dest, 0x00, N); + memcpy(dest, src.data(), std::size(src)); + } else { + memcpy(dest, src.data(), N); + } + } +} // namespace + +// build a new MPIChannel that uses a duplicate of the underlying communicator and the same destination +MPIChannel MPIChannel::duplicate() const { + MPI_Comm newcomm; + MPI_Comm_dup(comm_, &newcomm); + return MPIChannel(newcomm, dest_); +} + +// close the underlying communicator and reset the MPIChannel to an invalid state +void MPIChannel::reset() { + MPI_Comm_disconnect(&comm_); + dest_ = MPI_UNDEFINED; +} + +// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary buffer +void MPIChannel::edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux) { + aux = edm::RunAuxiliary(buffer.run, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_RunAuxiliary buffer from an edm::RunAuxiliary object +void MPIChannel::edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); +} + +// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary buffer +void MPIChannel::edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux) { + aux = edm::LuminosityBlockAuxiliary( + buffer.run, buffer.lumi, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_LuminosityBlockAuxiliary buffer from an edm::LuminosityBlockAuxiliary object +void MPIChannel::edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); +} + +// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary buffer +void MPIChannel::edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux) { + aux = edm::EventAuxiliary({buffer.run, buffer.lumi, buffer.event}, + std::string(buffer.processGuid, std::size(buffer.processGuid)), + edm::Timestamp(buffer.time), + buffer.realData, + static_cast(buffer.experimentType), + buffer.bunchCrossing, + buffer.storeNumber, + buffer.orbitNumber); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_EventAuxiliary buffer from an edm::EventAuxiliary object +void MPIChannel::edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + copy_and_fill(buffer.processGuid, aux.processGUID()); + buffer.time = aux.time().value(); + buffer.realData = aux.isRealData(); + buffer.experimentType = aux.experimentType(); + buffer.bunchCrossing = aux.bunchCrossing(); + buffer.orbitNumber = aux.orbitNumber(); + buffer.storeNumber = aux.storeNumber(); + buffer.run = 
aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); + buffer.event = aux.id().event(); +} + +// fill and send an EDM_MPI_Empty_t buffer +void MPIChannel::sendEmpty_(int tag) { + EDM_MPI_Empty_t buffer; + buffer.messageTag = tag; + MPI_Send(&buffer, 1, EDM_MPI_Empty, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_RunAuxiliary_t buffer +void MPIChannel::sendRunAuxiliary_(int tag, edm::RunAuxiliary const& aux) { + EDM_MPI_RunAuxiliary_t buffer; + buffer.messageTag = tag; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_RunAuxiliary, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_RunAuxiliary_t buffer +void MPIChannel::sendLuminosityBlockAuxiliary_(int tag, edm::LuminosityBlockAuxiliary const& aux) { + EDM_MPI_LuminosityBlockAuxiliary_t buffer; + buffer.messageTag = tag; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, dest_, tag, comm_); +} + +// fill and send an EDM_MPI_EventAuxiliary_t buffer +void MPIChannel::sendEventAuxiliary_(edm::EventAuxiliary const& aux) { + EDM_MPI_EventAuxiliary_t buffer; + buffer.messageTag = EDM_MPI_ProcessEvent; + edmToBuffer_(buffer, aux); + MPI_Send(&buffer, 1, EDM_MPI_EventAuxiliary, dest_, EDM_MPI_ProcessEvent, comm_); +} + +/* +// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary +MPI_Status MPIChannel::receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag) { + MPI_Status status; + EDM_MPI_EventAuxiliary_t buffer; + MPI_Recv(&buffer, 1, EDM_MPI_EventAuxiliary, source, tag, comm_, &status); + edmFromBuffer_(buffer, aux); + return status; +} +*/ + +// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary +MPI_Status MPIChannel::receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message) { + MPI_Status status = {}; + EDM_MPI_EventAuxiliary_t buffer; +#ifdef EDM_ML_DEBUG + memset(&buffer, 0x00, sizeof(buffer)); +#endif + status.MPI_ERROR = MPI_Mrecv(&buffer, 1, EDM_MPI_EventAuxiliary, &message, &status); + edmFromBuffer_(buffer, aux); + return status; +} + +void MPIChannel::sendMetadata(int instance, std::shared_ptr meta) { + int tag = EDM_MPI_SendMetadata | instance * EDM_MPI_MessageTagWidth_; + meta->setHeader(); + MPI_Ssend(meta->data(), meta->size(), MPI_BYTE, dest_, tag, comm_); +} + +void MPIChannel::receiveMetadata(int instance, std::shared_ptr meta) { + int tag = EDM_MPI_SendMetadata | instance * EDM_MPI_MessageTagWidth_; + meta->receiveMetadata(dest_, tag, comm_); +} + +void MPIChannel::sendBuffer(const void* buf, size_t size, int instance, EDM_MPI_MessageTag tag) { + int commtag = tag | instance * EDM_MPI_MessageTagWidth_; + MPI_Send(buf, size, MPI_BYTE, dest_, commtag, comm_); +} + +void MPIChannel::sendSerializedProduct_(int instance, TClass const* type, void const* product) { + TBufferFile buffer{TBuffer::kWrite}; + type->Streamer(const_cast(product), buffer); + int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Send(buffer.Buffer(), buffer.Length(), MPI_BYTE, dest_, tag, comm_); +} + +std::unique_ptr MPIChannel::receiveSerializedBuffer(int instance, int bufSize) { + int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Status status; + auto buffer = std::make_unique(TBuffer::kRead, bufSize); +#ifdef EDM_ML_DEBUG + memset(buffer->Buffer(), 0xff, buffer->BufferSize()); +#endif + MPI_Recv(buffer->Buffer(), bufSize, MPI_BYTE, dest_, tag, comm_, &status); + int receivedCount = 0; + MPI_Get_count(&status, MPI_BYTE, &receivedCount); + 
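+  // the sender transmits exactly the number of bytes advertised in the metadata, so a
+  // mismatch here means the sender and receiver have gone out of sync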
assert(receivedCount == bufSize && "received serialized buffer size mismatches the size expected from metadata"); + // set the buffer length + buffer->SetBufferOffset(receivedCount); + return buffer; +} + +void MPIChannel::receiveSerializedProduct_(int instance, TClass const* type, void* product) { + int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Message message; + MPI_Status status; + MPI_Mprobe(dest_, tag, comm_, &message, &status); + int size; + MPI_Get_count(&status, MPI_BYTE, &size); + TBufferFile buffer{TBuffer::kRead, size}; + MPI_Mrecv(buffer.Buffer(), size, MPI_BYTE, &message, &status); + type->Streamer(product, buffer); +} + +void MPIChannel::sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader) { + int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_; + // transfer the memory regions + auto regions = reader.regions(); + // TODO send the number of regions ? + for (size_t i = 0; i < regions.size(); ++i) { + assert(regions[i].data() != nullptr); + MPI_Send(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_); + } +} + +void MPIChannel::receiveInitializedTrivialCopy(int instance, ngt::WriterBase& writer) { + int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Status status; + // receive the memory regions + auto regions = writer.regions(); + // TODO receive and validate the number of regions ? + for (size_t i = 0; i < regions.size(); ++i) { + assert(regions[i].data() != nullptr); + MPI_Recv(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_, &status); + } +} diff --git a/HeterogeneousCore/MPICore/src/classes.h b/HeterogeneousCore/MPICore/src/classes.h new file mode 100644 index 0000000000000..6909ea88bb3c9 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/classes.h @@ -0,0 +1,2 @@ +#include "DataFormats/Common/interface/Wrapper.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" diff --git a/HeterogeneousCore/MPICore/src/classes_def.xml b/HeterogeneousCore/MPICore/src/classes_def.xml new file mode 100644 index 0000000000000..5f4cdb8fcbbf5 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/classes_def.xml @@ -0,0 +1,4 @@ + + + + diff --git a/HeterogeneousCore/MPICore/src/conversion.cc b/HeterogeneousCore/MPICore/src/conversion.cc new file mode 100644 index 0000000000000..3272bb827cda8 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/conversion.cc @@ -0,0 +1,86 @@ +// C++ standard library headers +#include +#include + +// CMSSW headers +#include "DataFormats/Provenance/interface/EventAuxiliary.h" +#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" +#include "DataFormats/Provenance/interface/RunAuxiliary.h" +#include "HeterogeneousCore/MPICore/interface/conversion.h" +#include "HeterogeneousCore/MPICore/interface/messages.h" + +namespace { + // copy the content of an std::string-like object to an N-sized char buffer: + // if the string is larger than the buffer, copy only the first N bytes; + // if the string is smaller than the buffer, fill the rest of the buffer with NUL characters. 
+ template + void copy_and_fill(char (&dest)[N], S const& src) { + if (std::size(src) < N) { + memset(dest, 0x00, N); + memcpy(dest, src.data(), std::size(src)); + } else { + memcpy(dest, src.data(), N); + } + } +} // namespace + +// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary buffer +void edmFromBuffer(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux) { + aux = edm::RunAuxiliary(buffer.run, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_RunAuxiliary buffer from an edm::RunAuxiliary object +void edmToBuffer(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); +} + +// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary buffer +void edmFromBuffer(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux) { + aux = edm::LuminosityBlockAuxiliary( + buffer.run, buffer.lumi, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime)); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_LuminosityBlockAuxiliary buffer from an edm::LuminosityBlockAuxiliary object +void edmToBuffer(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + buffer.beginTime = aux.beginTime().value(); + buffer.endTime = aux.endTime().value(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); +} + +// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary buffer +void edmFromBuffer(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux) { + aux = edm::EventAuxiliary({buffer.run, buffer.lumi, buffer.event}, + std::string(buffer.processGuid, std::size(buffer.processGuid)), + edm::Timestamp(buffer.time), + buffer.realData, + static_cast(buffer.experimentType), + buffer.bunchCrossing, + buffer.storeNumber, + buffer.orbitNumber); + aux.setProcessHistoryID( + edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID)))); +} + +// fill an EDM_MPI_EventAuxiliary buffer from an edm::EventAuxiliary object +void edmToBuffer(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux) { + copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm()); + copy_and_fill(buffer.processGuid, aux.processGUID()); + buffer.time = aux.time().value(); + buffer.realData = aux.isRealData(); + buffer.experimentType = aux.experimentType(); + buffer.bunchCrossing = aux.bunchCrossing(); + buffer.orbitNumber = aux.orbitNumber(); + buffer.storeNumber = aux.storeNumber(); + buffer.run = aux.id().run(); + buffer.lumi = aux.id().luminosityBlock(); + buffer.event = aux.id().event(); +} diff --git a/HeterogeneousCore/MPICore/src/macros.h b/HeterogeneousCore/MPICore/src/macros.h new file mode 100644 index 0000000000000..142d830595775 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/macros.h @@ -0,0 +1,149 @@ +#ifndef HeterogeneousCore_MPICore_interface_macros_h +#define HeterogeneousCore_MPICore_interface_macros_h + +// C++ standard library headers +#include + +// MPI headers 
+#include + +// Boost headers +#include + +namespace mpi_traits { + template + constexpr inline size_t mpi_length = 1; + + template + constexpr inline size_t mpi_length = N; + + template + struct mpi_type { + inline static const MPI_Datatype value = MPI_DATATYPE_NULL; + }; + + template + struct mpi_type { + inline static const MPI_Datatype value = mpi_type::value; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_CHAR; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_CHAR; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_WCHAR; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_SHORT; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_SHORT; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_INT; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_LONG_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_UNSIGNED_LONG_LONG; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_FLOAT; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_DOUBLE; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_LONG_DOUBLE; + }; + + template <> + struct mpi_type { + inline static const MPI_Datatype value = MPI_BYTE; + }; +} // namespace mpi_traits + +// clang-format off + +#define _GET_MPI_TYPE_LENGTH_IMPL(STRUCT, FIELD) \ + mpi_traits::mpi_length + +#define _GET_MPI_TYPE_LENGTH(R, STRUCT, FIELD) \ + _GET_MPI_TYPE_LENGTH_IMPL(STRUCT, FIELD), + +#define _GET_MPI_TYPE_LENGTHS(STRUCT, ...) \ + BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_LENGTH, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__)) + +#define _GET_MPI_TYPE_OFFSET_IMPL(STRUCT, FIELD) \ + offsetof(STRUCT, FIELD) + +#define _GET_MPI_TYPE_OFFSET(R, STRUCT, FIELD) \ + _GET_MPI_TYPE_OFFSET_IMPL(STRUCT, FIELD), + +#define _GET_MPI_TYPE_OFFSETS(STRUCT, ...) \ + BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_OFFSET, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__)) + +#define _GET_MPI_TYPE_TYPEID_IMPL(STRUCT, FIELD) \ + mpi_traits::mpi_type::value + +#define _GET_MPI_TYPE_TYPEID(R, STRUCT, FIELD) \ + _GET_MPI_TYPE_TYPEID_IMPL(STRUCT, FIELD), + +#define _GET_MPI_TYPE_TYPEIDS(STRUCT, ...) \ + BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_TYPEID, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__)) + +#define DECLARE_MPI_TYPE(TYPE, STRUCT, ...) 
\
+  _Pragma("GCC diagnostic push");                                                      \
+  _Pragma("GCC diagnostic ignored \"-Winvalid-offsetof\"");                            \
+  {                                                                                    \
+    constexpr int lengths[] = {_GET_MPI_TYPE_LENGTHS(STRUCT, __VA_ARGS__)};            \
+    constexpr MPI_Aint displacements[] = {_GET_MPI_TYPE_OFFSETS(STRUCT, __VA_ARGS__)}; \
+    const MPI_Datatype types[] = {_GET_MPI_TYPE_TYPEIDS(STRUCT, __VA_ARGS__)};         \
+    MPI_Type_create_struct(std::size(lengths), lengths, displacements, types, &TYPE);  \
+    MPI_Type_commit(&TYPE);                                                            \
+  }                                                                                    \
+  _Pragma("GCC diagnostic pop")

// clang-format on

#endif  // HeterogeneousCore_MPICore_interface_macros_h
diff --git a/HeterogeneousCore/MPICore/src/messages.cc b/HeterogeneousCore/MPICore/src/messages.cc
new file mode 100644
index 0000000000000..7bd318e4f7bb2
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/messages.cc
@@ -0,0 +1,81 @@
+// C++ standard library headers
+#include
+
+// MPI headers
+#include
+
+// Boost headers
+#include
+
+// CMSSW headers
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+
+// local headers
+#include "macros.h"
+
+MPI_Datatype EDM_MPI_Empty;
+MPI_Datatype EDM_MPI_RunAuxiliary;
+MPI_Datatype EDM_MPI_LuminosityBlockAuxiliary;
+MPI_Datatype EDM_MPI_EventAuxiliary;
+
+MPI_Datatype EDM_MPI_MessageType[EDM_MPI_MessageTagCount_];
+
+void EDM_MPI_build_types_() {
+  // EDM_MPI_Empty
+  DECLARE_MPI_TYPE(EDM_MPI_Empty,    // MPI_Datatype
+                   EDM_MPI_Empty_t,  // C++ struct
+                   messageTag);      // EDM_MPI_MessageTag
+
+  // EDM_MPI_RunAuxiliary
+  DECLARE_MPI_TYPE(EDM_MPI_RunAuxiliary,    // MPI_Datatype
+                   EDM_MPI_RunAuxiliary_t,  // C++ struct
+                   messageTag,              // EDM_MPI_MessageTag
+                   processHistoryID,        // edm::ProcessHistoryID::compactForm()
+                   beginTime,               // edm::TimeValue_t
+                   endTime,                 // edm::TimeValue_t
+                   run);                    // edm::RunNumber_t
+
+  // EDM_MPI_LuminosityBlockAuxiliary
+  DECLARE_MPI_TYPE(EDM_MPI_LuminosityBlockAuxiliary,    // MPI_Datatype
+                   EDM_MPI_LuminosityBlockAuxiliary_t,  // C++ struct
+                   messageTag,                          // EDM_MPI_MessageTag
+                   processHistoryID,                    // edm::ProcessHistoryID::compactForm()
+                   beginTime,                           // edm::TimeValue_t
+                   endTime,                             // edm::TimeValue_t
+                   run,                                 // edm::RunNumber_t
+                   lumi);                               // edm::LuminosityBlockNumber_t
+
+  // EDM_MPI_EventAuxiliary
+  DECLARE_MPI_TYPE(EDM_MPI_EventAuxiliary,    // MPI_Datatype
+                   EDM_MPI_EventAuxiliary_t,  // C++ struct
+                   messageTag,                // EDM_MPI_MessageTag
+                   processHistoryID,          // edm::ProcessHistoryID::compactForm()
+                   processGuid,               // process GUID
+                   time,                      // edm::TimeValue_t
+                   realData,                  // real data (true) vs simulation (false)
+                   experimentType,            // edm::EventAuxiliary::ExperimentType
+                   bunchCrossing,             // LHC bunch crossing
+                   orbitNumber,               // LHC orbit number
+                   storeNumber,               // LHC fill number ?
+ run, // edm::RunNumber_t + lumi, // edm::LuminosityBlockNumber_t + event); // edm::EventNumber_t + + EDM_MPI_MessageType[EDM_MPI_Connect] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_Disconnect] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_BeginStream] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_EndStream] = EDM_MPI_Empty; // + EDM_MPI_MessageType[EDM_MPI_BeginRun] = EDM_MPI_RunAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_EndRun] = EDM_MPI_RunAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_BeginLuminosityBlock] = EDM_MPI_LuminosityBlockAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_EndLuminosityBlock] = EDM_MPI_LuminosityBlockAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_ProcessEvent] = EDM_MPI_EventAuxiliary; // + EDM_MPI_MessageType[EDM_MPI_SendSerializedProduct] = MPI_BYTE; // variable-length binary blob + EDM_MPI_MessageType[EDM_MPI_SendTrivialProduct] = MPI_BYTE; // variable-length binary blob + EDM_MPI_MessageType[EDM_MPI_SendMetadata] = MPI_BYTE; // +} + +void EDM_MPI_build_types() { + static std::once_flag flag; + std::call_once(flag, EDM_MPI_build_types_); +} diff --git a/HeterogeneousCore/MPICore/src/metadata.cc b/HeterogeneousCore/MPICore/src/metadata.cc new file mode 100644 index 0000000000000..f130ddf0ba080 --- /dev/null +++ b/HeterogeneousCore/MPICore/src/metadata.cc @@ -0,0 +1,241 @@ +// C++ standard library headers +#include +#include +#include + +// CMSSW headers +#include "HeterogeneousCore/MPICore/interface/metadata.h" + +ProductMetadataBuilder::ProductMetadataBuilder() : buffer_(nullptr), capacity_(0), size_(0), readOffset_(0) { + // reserve at least 13 bytes for header + reserve(maxMetadataSize_); + size_ = headerSize_; +} + +ProductMetadataBuilder::ProductMetadataBuilder(size_t expectedSize) + : buffer_(nullptr), capacity_(0), size_(0), readOffset_(0) { + reserve(expectedSize + headerSize_); + size_ = headerSize_; +} + +ProductMetadataBuilder::~ProductMetadataBuilder() { std::free(buffer_); } + +ProductMetadataBuilder::ProductMetadataBuilder(ProductMetadataBuilder&& other) noexcept + : buffer_(other.buffer_), capacity_(other.capacity_), size_(other.size_), readOffset_(other.readOffset_) { + other.buffer_ = nullptr; + other.capacity_ = 0; + other.size_ = 0; + other.readOffset_ = 0; +} + +ProductMetadataBuilder& ProductMetadataBuilder::operator=(ProductMetadataBuilder&& other) noexcept { + if (this != &other) { + std::free(buffer_); + buffer_ = other.buffer_; + capacity_ = other.capacity_; + size_ = other.size_; + readOffset_ = other.readOffset_; + other.buffer_ = nullptr; + other.capacity_ = 0; + other.size_ = 0; + other.readOffset_ = 0; + } + return *this; +} + +void ProductMetadataBuilder::reserve(size_t bytes) { + if (capacity_ >= bytes) + return; + resizeBuffer(bytes); +} + +void ProductMetadataBuilder::setHeader() { + assert(size_ >= headerSize_ && "Buffer must reserve space for header"); + std::memcpy(buffer_, &productCount_, sizeof(int64_t)); // first 8 bytes + buffer_[8] = productFlags_; // indicate which products are present + std::memcpy(buffer_ + 9, &serializedBufferSize_, sizeof(int)); // size of serialized products +} + +void ProductMetadataBuilder::addMissing() { + productFlags_ |= HasMissing; + append(static_cast(ProductMetadata::Kind::Missing)); +} + +void ProductMetadataBuilder::addSerialized(size_t size) { + productFlags_ |= HasSerialized; + append(static_cast(ProductMetadata::Kind::Serialized)); + append(size); + serializedBufferSize_ += size; +} + +void ProductMetadataBuilder::addTrivialCopy(const std::byte* buffer, size_t size) { + 
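+  // wire format of a trivially-copyable entry: one kind byte, the size of the trivial
+  // copy parameter blob, then the blob itself stored inline in the metadata buffer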
productFlags_ |= HasTrivialCopy;
+  append(static_cast(ProductMetadata::Kind::TrivialCopy));
+  append(size);
+  appendBytes(buffer, size);
+}
+
+const uint8_t* ProductMetadataBuilder::data() const { return buffer_; }
+uint8_t* ProductMetadataBuilder::data() { return buffer_; }
+size_t ProductMetadataBuilder::size() const { return size_; }
+std::span ProductMetadataBuilder::buffer() const { return {buffer_, size_}; }
+
+void ProductMetadataBuilder::receiveMetadata(int src, int tag, MPI_Comm comm) {
+  MPI_Status status;
+  MPI_Recv(buffer_, maxMetadataSize_, MPI_BYTE, src, tag, comm, &status);
+  // TODO: add error handling if the message is too long
+  int receivedBytes = 0;
+  MPI_Get_count(&status, MPI_BYTE, &receivedBytes);
+  assert(static_cast(receivedBytes) >= headerSize_ && "received metadata was less than header size");
+  productCount_ = consume();
+  productFlags_ = consume();
+  serializedBufferSize_ = consume();
+  size_ = receivedBytes;
+}
+
+ProductMetadata ProductMetadataBuilder::getNext() {
+  if (readOffset_ >= size_)
+    throw std::out_of_range("No more metadata entries");
+
+  ProductMetadata meta;
+  auto kind = static_cast(consume());
+  meta.kind = kind;
+
+  switch (kind) {
+    case ProductMetadata::Kind::Missing:
+      break;
+
+    case ProductMetadata::Kind::Serialized:
+      meta.sizeMeta = consume();
+      break;
+
+    case ProductMetadata::Kind::TrivialCopy: {
+      size_t blobSize = consume();
+      if (readOffset_ + blobSize > size_) {
+        throw std::runtime_error("Metadata buffer too short for trivialCopy data");
+      }
+      meta.sizeMeta = blobSize;
+      meta.trivialCopyOffset = buffer_ + readOffset_;
+      readOffset_ += blobSize;
+      break;
+    }
+
+    default:
+      throw std::runtime_error("Unknown metadata kind");
+  }
+
+  return meta;
+}
+
+void ProductMetadataBuilder::resizeBuffer(size_t newCap) {
+  uint8_t* newBuf = static_cast(std::realloc(buffer_, newCap));
+  if (!newBuf)
+    throw std::bad_alloc();
+  buffer_ = newBuf;
+  capacity_ = newCap;
+}
+
+void ProductMetadataBuilder::ensureCapacity(size_t needed) {
+  if (size_ + needed <= capacity_)
+    return;
+
+  size_t newCapacity = capacity_ ? capacity_ : 64;
+  while (size_ + needed > newCapacity)
+    newCapacity *= 2;
+
+  uint8_t* newData = static_cast(std::realloc(buffer_, newCapacity));
+  if (!newData)
+    throw std::bad_alloc();
+  buffer_ = newData;
+  capacity_ = newCapacity;
+}
+
+void ProductMetadataBuilder::appendBytes(const std::byte* src, size_t size) {
+  ensureCapacity(size);
+  std::memcpy(buffer_ + size_, src, size);
+  size_ += size;
+}
+
+void ProductMetadataBuilder::debugPrintMetadataSummary() const {
+  if (size_ < headerSize_) {
+    std::cerr << "ERROR: Buffer too small to contain header\n";
+    return;
+  }
+
+  std::ostringstream out;
+  size_t offset = headerSize_;  // Skip the header
+  size_t count = 0;
+  size_t numMissing = 0;
+  size_t numSerialized = 0;
+  size_t numTrivial = 0;
+
+  uint64_t headerCount = 0;
+  std::memcpy(&headerCount, buffer_, sizeof(uint64_t));
+  uint8_t flags = buffer_[sizeof(uint64_t)];
+
+  out << "---- ProductMetadata Debug Summary ----\n";
+  out << "Header:\n";
+  out << "  Product count: " << headerCount << "\n";
+  out << "  Flags: " << ((flags & HasMissing) ? "Missing " : "") << ((flags & HasSerialized) ? "Serialized " : "")
+      << ((flags & HasTrivialCopy) ?
"TrivialCopy " : "") << "\n\n"; + + while (offset < size_) { + uint8_t kindVal = buffer_[offset]; + auto kind = static_cast(kindVal); + offset += sizeof(uint8_t); + count++; + + out << "Product #" << count << ": "; + + switch (kind) { + case ProductMetadata::Kind::Missing: + numMissing++; + out << "Missing\n"; + break; + + case ProductMetadata::Kind::Serialized: { + if (offset + sizeof(size_t) > size_) { + out << "ERROR: Corrupted serialized metadata\n"; + return; + } + size_t sz; + std::memcpy(&sz, buffer_ + offset, sizeof(size_t)); + offset += sizeof(size_t); + numSerialized++; + out << "Serialized, size = " << sz << "\n"; + break; + } + + case ProductMetadata::Kind::TrivialCopy: { + if (offset + sizeof(size_t) > size_) { + out << "ERROR: Corrupted trivial copy metadata\n"; + return; + } + size_t sz; + std::memcpy(&sz, buffer_ + offset, sizeof(size_t)); + offset += sizeof(size_t); + if (offset + sz > size_) { + out << "ERROR: Trivial copy data overflows buffer\n"; + return; + } + offset += sz; + numTrivial++; + out << "TrivialCopy, size = " << sz << "\n"; + break; + } + + default: + out << "Unknown kind: " << static_cast(kindVal) << "\n"; + return; + } + } + + out << "----------------------------------------\n"; + out << "Total entries parsed: " << count << "\n"; + out << " Missing: " << numMissing << "\n"; + out << " Serialized: " << numSerialized << "\n"; + out << " TrivialCopy: " << numTrivial << "\n"; + out << "Total buffer size: " << size_ << " bytes\n"; + + std::cerr << out.str() << std::flush; +} diff --git a/HeterogeneousCore/MPICore/test/BuildFile.xml b/HeterogeneousCore/MPICore/test/BuildFile.xml index 63b96e9989075..bdc6dc143aad6 100644 --- a/HeterogeneousCore/MPICore/test/BuildFile.xml +++ b/HeterogeneousCore/MPICore/test/BuildFile.xml @@ -3,6 +3,12 @@ + + + + + + diff --git a/HeterogeneousCore/MPICore/test/controller_cfg.py b/HeterogeneousCore/MPICore/test/controller_cfg.py new file mode 100644 index 0000000000000..dc03bff4d1a27 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/controller_cfg.py @@ -0,0 +1,58 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIServer") + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 4 +# MPIController supports a single concurrent LuminosityBlock +process.options.numberOfConcurrentLuminosityBlocks = 1 +process.options.numberOfConcurrentRuns = 1 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") + +from eventlist_cff import eventlist +process.source = cms.Source("EmptySourceFromEventIDs", + events = cms.untracked(eventlist) +) + +process.maxEvents.input = 100 + +from HeterogeneousCore.MPICore.modules import * + +process.mpiController = MPIController( + mode = 'CommWorld' +) + +process.ids = cms.EDProducer("edmtest::EventIDProducer") + +process.initialcheck = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('ids') +) + +process.sender = MPISender( + upstream = "mpiController", + instance = 42, + products = [ "edmEventID_ids__*" ] +) + +process.othersender = MPISender( + upstream = "mpiController", + instance = 19, + products = [ "edmEventID_ids__*" ] +) + +process.receiver = MPIReceiver( + upstream = "othersender", # guarantees that this module will only run after "othersender" has run + instance = 99, + products = [ dict( + type = "edm::EventID", + label = "" + )] +) + +process.finalcheck = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('receiver') +) + +process.path = 
cms.Path(process.mpiController + process.ids + process.initialcheck + process.sender + process.othersender + process.receiver + process.finalcheck)
diff --git a/HeterogeneousCore/MPICore/test/controller_complex_cfg.py b/HeterogeneousCore/MPICore/test/controller_complex_cfg.py
new file mode 100644
index 0000000000000..87cb7920a617b
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/controller_complex_cfg.py
@@ -0,0 +1,95 @@
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("MPIServer")
+
+process.load("FWCore.MessageService.MessageLogger_cfi")
+process.MessageLogger.cerr.INFO.limit = 10000000
+
+process.options.numberOfThreads = 4
+process.options.numberOfStreams = 4
+# MPIController supports a single concurrent LuminosityBlock
+process.options.numberOfConcurrentLuminosityBlocks = 1
+process.options.numberOfConcurrentRuns = 1
+process.options.wantSummary = False
+
+process.source = cms.Source("EmptySource")
+process.maxEvents.input = 10
+
+process.load("HeterogeneousCore.MPIServices.MPIService_cfi")
+
+from HeterogeneousCore.MPICore.modules import *
+
+process.mpiController = MPIController(
+    mode = 'CommWorld'
+)
+
+# Phase-1 FED RAW data collection pseudo object
+process.fedRawDataCollectionProducer = cms.EDProducer("TestWriteFEDRawDataCollection",
+    # Test values below are meaningless. We just make sure when we read
+    # we get the same values.
+    FEDData0 = cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7),
+    FEDData3 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107)
+)
+
+# Phase-2 RAW data buffer pseudo object
+process.rawDataBufferProducer = cms.EDProducer("TestWriteRawDataBuffer",
+    # Test values below are meaningless. We just make sure when we read
+    # we get the same values.
+    dataPattern1 = cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+    dataPattern2 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115)
+)
+
+# HLT trigger event pseudo object
+process.triggerEventProducer = cms.EDProducer("TestWriteTriggerEvent",
+    # Test values below are meaningless. We just make sure when we read
+    # we get the same values.
+    usedProcessName = cms.string("testName"),
+    collectionTags = cms.vstring('moduleA', 'moduleB', 'moduleC'),
+    collectionKeys = cms.vuint32(11, 21, 31),
+    ids = cms.vint32(1, 3, 5),
+    # stick to values exactly convertible from double to float
+    # to avoid potential rounding issues in the test, because
+    # the configuration only supports double not float and
+    # the data format holds floats.
+    pts = cms.vdouble(11.0, 21.0, 31.0),
+    etas = cms.vdouble(101.0, 102.0, 103.0),
+    phis = cms.vdouble(201.0, 202.0, 203.0),
+    masses = cms.vdouble(301.0, 302.0, 303.0),
+    filterTags = cms.vstring('moduleAA', 'moduleBB'),
+    elementsPerVector = cms.uint32(2),
+    filterIds = cms.vint32(1001, 1002, 1003, 1004),
+    filterKeys = cms.vuint32(2001, 2002, 2003, 2004)
+)
+
+# EDM trigger results pseudo object
+process.triggerResultsProducer = cms.EDProducer("TestWriteTriggerResults",
+    # Test values below are meaningless. We just make sure when we read
+    # we get the same values.
+    parameterSetID = cms.string('8b99d66b6c3865c75e460791f721202d'),
+    # names should normally be empty. Only extremely old data
+    # has names filled and not empty. If it is not empty, the
+    # ParameterSetID is ignored and left default constructed.
+ names = cms.vstring(), + hltStates = cms.vuint32(0, 1, 2, 3), + moduleIndexes = cms.vuint32(11, 21, 31, 41) +) + +process.sender = MPISender( + upstream = "mpiController", + instance = 42, + products = [ + "FEDRawDataCollection_fedRawDataCollectionProducer__*", + "RawDataBuffer_rawDataBufferProducer__*", + "edmTriggerResults_triggerResultsProducer__*", + "triggerTriggerEvent_triggerEventProducer__*" + ] +) + +process.path = cms.Path( + process.mpiController + + process.fedRawDataCollectionProducer + + process.rawDataBufferProducer + + process.triggerEventProducer + + process.triggerResultsProducer + + process.sender +) diff --git a/HeterogeneousCore/MPICore/test/controller_soa_cfg.py b/HeterogeneousCore/MPICore/test/controller_soa_cfg.py new file mode 100644 index 0000000000000..d3e39b0597c11 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/controller_soa_cfg.py @@ -0,0 +1,56 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIServer") + +process.load("FWCore.MessageService.MessageLogger_cfi") +process.MessageLogger.cerr.INFO.limit = 10000000 + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 4 +# MPIController supports a single concurrent LuminosityBlock +process.options.numberOfConcurrentLuminosityBlocks = 1 +process.options.numberOfConcurrentRuns = 1 +process.options.wantSummary = False + +process.source = cms.Source("EmptySource") +process.maxEvents.input = 10 + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") + +# produce and send a portable object, a portable collection, and some portable multicollections +process.load("Configuration.StandardSequences.Accelerators_cff") +process.load("HeterogeneousCore.AlpakaCore.ProcessAcceleratorAlpaka_cfi") + +from HeterogeneousCore.MPICore.modules import * + +process.mpiController = MPIController( + mode = 'CommWorld' +) + +process.producePortableObjects = cms.EDProducer("TestAlpakaProducer@alpaka", + size = cms.int32(42), + size2 = cms.int32(33), + size3 = cms.int32(61), + alpaka = cms.untracked.PSet( + # "serial_sync", "cuda_async", or "rocm_async" + backend = cms.untracked.string("") + ) +) + +process.sender = MPISender( + upstream = "mpiController", + instance = 42, + products = [ + "portabletestTestStructPortableHostObject_producePortableObjects__*", + "128falseportabletestTestSoALayoutPortableHostCollection_producePortableObjects__*", + "128falseportabletestSoABlocks2PortableHostCollection_producePortableObjects__*", + "128falseportabletestSoABlocks3PortableHostCollection_producePortableObjects__*", + "ushort_producePortableObjects_backend_*" + ] +) + +process.pathSoA = cms.Path( + process.mpiController + + process.producePortableObjects + + process.sender +) diff --git a/HeterogeneousCore/MPICore/test/eventlist_cff.py b/HeterogeneousCore/MPICore/test/eventlist_cff.py new file mode 100644 index 0000000000000..abeb05bf21d7e --- /dev/null +++ b/HeterogeneousCore/MPICore/test/eventlist_cff.py @@ -0,0 +1,39 @@ +import FWCore.ParameterSet.Config as cms + +eventlist = cms.VEventID( + # Run 100 + cms.EventID(100, 1, 1), + cms.EventID(100, 1, 4), + cms.EventID(100, 3, 13), + cms.EventID(100, 3, 17), + cms.EventID(100, 5, 18), + cms.EventID(100, 5, 28), + # Run 101 + cms.EventID(101, 1, 2), + cms.EventID(101, 1, 9), + cms.EventID(101, 2, 10), + cms.EventID(101, 2, 14), + cms.EventID(101, 4, 15), + cms.EventID(101, 4, 16), + # Run 102 + cms.EventID(102, 1, 1), + cms.EventID(102, 1, 18), + cms.EventID(102, 1, 43), + cms.EventID(102, 5, 59), + cms.EventID(102, 8, 85), + cms.EventID(102, 8, 
89), + # Run 103 + cms.EventID(103, 1, 13), + cms.EventID(103, 4, 42), + cms.EventID(103, 4, 43), + cms.EventID(103, 4, 44), + cms.EventID(103, 9, 95), + cms.EventID(103, 9, 99), + # Run 104 + cms.EventID(104, 3, 31), + cms.EventID(104, 5, 52), + cms.EventID(104, 8, 83), + cms.EventID(104, 8, 84), + cms.EventID(104, 8, 85), + cms.EventID(104, 8, 89), +) diff --git a/HeterogeneousCore/MPICore/test/follower_cfg.py b/HeterogeneousCore/MPICore/test/follower_cfg.py new file mode 100644 index 0000000000000..443dff041f4c1 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/follower_cfg.py @@ -0,0 +1,51 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIClient") + +process.options.numberOfThreads = 8 +process.options.numberOfStreams = 8 +process.options.numberOfConcurrentLuminosityBlocks = 2 +process.options.numberOfConcurrentRuns = 2 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") + +from HeterogeneousCore.MPICore.modules import * + +process.source = MPISource() + +process.maxEvents.input = -1 + +# very verbose +#from HeterogeneousCore.MPICore.mpiReporter_cfi import mpiReporter as mpiReporter_ +#process.reporter = mpiReporter_.clone() + +process.receiver = MPIReceiver( + upstream = "source", + instance = 42, + products = [ dict( + type = "edm::EventID", + label = "" + )] +) + +process.otherreceiver = MPIReceiver( + upstream = "source", + instance = 19, + products = [ dict( + type = "edm::EventID", + label = "" + )] +) + +process.sender = MPISender( + upstream = "otherreceiver", # guarantees that this module will only run after otherreceiver has run + instance = 99, + products = [ "edmEventID_otherreceiver__*" ] +) + +process.analyzer = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag("receiver") +) + +process.path = cms.Path(process.receiver + process.analyzer + process.otherreceiver + process.sender) diff --git a/HeterogeneousCore/MPICore/test/follower_complex_cfg.py b/HeterogeneousCore/MPICore/test/follower_complex_cfg.py new file mode 100644 index 0000000000000..e72bbd99ce7f5 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/follower_complex_cfg.py @@ -0,0 +1,90 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIClient") + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 4 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") + +from HeterogeneousCore.MPICore.modules import * + +process.source = MPISource() + +process.maxEvents.input = -1 + +process.receiver = MPIReceiver( + upstream = "source", + instance = 42, + products = [ + dict( + type = "FEDRawDataCollection", + label = "fedRawDataCollectionProducer" + ), + dict( + type = "RawDataBuffer", + label = "rawDataBufferProducer" + ), + dict( + type = "edm::TriggerResults", + label = "triggerResultsProducer" + ), + dict( + type = "trigger::TriggerEvent", + label = "triggerEventProducer" + ) + ] +) + +# Phase-1 FED RAW data collection pseudo object +process.testReadFEDRawDataCollection = cms.EDAnalyzer("TestReadFEDRawDataCollection", + fedRawDataCollectionTag = cms.InputTag("receiver", "fedRawDataCollectionProducer"), + expectedFEDData0 = cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7), + expectedFEDData3 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107) +) + +# Phase-2 RAW data buffer pseudo object +process.testReadRawDataBuffer = cms.EDAnalyzer("TestReadRawDataBuffer", + rawDataBufferTag = cms.InputTag("receiver", "rawDataBufferProducer"), + dataPattern1 = 
cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15), + dataPattern2 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115) +) + +# HLT trigger event pseudo object +process.testReadTriggerEvent = cms.EDAnalyzer("TestReadTriggerEvent", + expectedUsedProcessName = cms.string("testName"), + expectedCollectionTags = cms.vstring('moduleA', 'moduleB', 'moduleC'), + expectedCollectionKeys = cms.vuint32(11, 21, 31), + expectedIds = cms.vint32(1, 3, 5), + # stick to values exactly convertible from double to float + # to avoid potential rounding issues in the test, because + # the configuration only supports double not float and + # the data format holds floats. + expectedPts = cms.vdouble(11.0, 21.0, 31.0), + expectedEtas = cms.vdouble(101.0, 102.0, 103.0), + expectedPhis = cms.vdouble(201.0, 202.0, 203.0), + expectedMasses = cms.vdouble(301.0, 302.0, 303.0), + expectedFilterTags = cms.vstring('moduleAA', 'moduleBB'), + expectedElementsPerVector = cms.uint32(2), + expectedFilterIds = cms.vint32(1001, 1002, 1003, 1004), + expectedFilterKeys = cms.vuint32(2001, 2002, 2003, 2004), + triggerEventTag = cms.InputTag("receiver", "triggerEventProducer") +) + +# EDM trigger results pseudo object +process.testReadTriggerResults = cms.EDAnalyzer("TestReadTriggerResults", + triggerResultsTag = cms.InputTag("receiver", "triggerResultsProducer"), + expectedParameterSetID = cms.string('8b99d66b6c3865c75e460791f721202d'), + expectedNames = cms.vstring(), + expectedHLTStates = cms.vuint32(0, 1, 2, 3), + expectedModuleIndexes = cms.vuint32(11, 21, 31, 41) +) + +process.path = cms.Path( + process.receiver + + process.testReadFEDRawDataCollection + + process.testReadRawDataBuffer + + process.testReadTriggerEvent + + process.testReadTriggerResults +) diff --git a/HeterogeneousCore/MPICore/test/follower_soa_cfg.py b/HeterogeneousCore/MPICore/test/follower_soa_cfg.py new file mode 100644 index 0000000000000..b250af51332cd --- /dev/null +++ b/HeterogeneousCore/MPICore/test/follower_soa_cfg.py @@ -0,0 +1,57 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIClient") + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 4 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") + +from HeterogeneousCore.MPICore.modules import * + +process.source = MPISource() + +process.maxEvents.input = -1 + +# receive and validate a portable object, a portable collection, and some portable multi-block collections +process.receiver = MPIReceiver( + upstream = "source", + instance = 42, + products = [ + dict( + type = "PortableHostObject", + label = "" + ), + dict( + type = "PortableHostCollection >", + label = "" + ), + dict( + type = "PortableHostCollection >", + label = "" + ), + dict( + type = "PortableHostCollection >", + label = "" + ), + dict( + type = "ushort", + label = "backend" + ) + ] +) + +process.validatePortableCollections = cms.EDAnalyzer("TestAlpakaAnalyzer", + source = cms.InputTag("receiver") +) + +process.validatePortableObject = cms.EDAnalyzer("TestAlpakaObjectAnalyzer", + source = cms.InputTag("receiver") +) + +process.pathSoA = cms.Path( + process.receiver + + process.validatePortableCollections + + process.validatePortableObject +) diff --git a/HeterogeneousCore/MPICore/test/testMPICommWorld.sh b/HeterogeneousCore/MPICore/test/testMPICommWorld.sh new file mode 100755 index 0000000000000..e9739bb0e8cfc --- /dev/null +++ b/HeterogeneousCore/MPICore/test/testMPICommWorld.sh @@ -0,0 
+1,17 @@
+#! /bin/bash
+
+# Shell script for testing CMSSW over MPI.
+CONTROLLER=$(realpath $1)
+FOLLOWER=$(realpath $2)
+
+# Make sure the CMSSW environment has been loaded.
+if [ -z "$CMSSW_BASE" ]; then
+  eval `scram runtime -sh`
+fi
+
+# The CM pml leads to silent communication failures on some machines.
+# Until this is understood and fixed, keep it disabled.
+export OMPI_MCA_pml='^cm'
+
+# Launch the controller and follower processes
+mpirun -n 1 cmsRun ${CONTROLLER} : -n 1 cmsRun ${FOLLOWER}
diff --git a/HeterogeneousCore/MPICore/test/testMPIInterCommunicator.sh b/HeterogeneousCore/MPICore/test/testMPIInterCommunicator.sh
new file mode 100755
index 0000000000000..c33ccd0ebe004
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/testMPIInterCommunicator.sh
@@ -0,0 +1,71 @@
+#! /bin/bash
+# Shell script for testing CMSSW over MPI
+CONTROLLER=$(realpath $1)
+FOLLOWER=$(realpath $2)
+
+# Make sure the CMSSW environment has been loaded.
+if [ -z "$CMSSW_BASE" ]; then
+  eval `scram runtime -sh`
+fi
+
+# The CM pml leads to silent communication failures on some machines.
+# Until this is understood and fixed, keep it disabled.
+export OMPI_MCA_pml='^cm'
+
+mkdir -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test
+DIR=$(mktemp -d -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test)
+echo "Running MPI tests at $DIR/"
+pushd $DIR > /dev/null
+
+# Start an MPI server to let independent CMSSW processes find each other.
+echo "Starting the Open RTE data server"
+ompi-server -r server.uri -d >& ompi-server.log &
+SERVER_PID=$!
+disown
+# wait until the ORTE server logs 'up and running'
+while ! grep -q 'up and running' ompi-server.log; do
+  sleep 1s
+done
+
+# Note: "mpirun --mca pmix_server_uri file:server.uri" is required to make the
+# tests work inside a singularity/apptainer container. Without a container the
+# cmsRun commands can be used directly.
+
+# Start the "follower" CMSSW job(s).
+{
+  mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $FOLLOWER >& follower.log
+  echo $? > follower.status
+} &
+FOLLOWER_PID=$!
+
+# Wait until the MPISource has published its port and is waiting for a connection.
+while ! grep -q 'Waiting for a connection to the MPI server' follower.log; do
+  sleep 1s
+done
+
+# Start the "controller" CMSSW job(s).
+{
+  mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $CONTROLLER >& controller.log
+  echo $? > controller.status
+} &
+CONTROLLER_PID=$!
+
+# Wait for the CMSSW jobs to finish.
+wait $CONTROLLER_PID $FOLLOWER_PID
+
+# Print the jobs' output and check the jobs' exit status.
+echo '========== testMPIController ==========='
+cat controller.log
+MPICONTROLLER_STATUS=$(< controller.status)
+echo '========================================'
+echo
+echo '=========== testMPIFollower ============'
+cat follower.log
+MPISOURCE_STATUS=$(< follower.status)
+echo '========================================'
+
+# Stop the MPI server and clean up the URI file.
+kill $SERVER_PID
+
+popd > /dev/null
+exit $((MPISOURCE_STATUS > MPICONTROLLER_STATUS ? MPISOURCE_STATUS : MPICONTROLLER_STATUS))
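
The receive side of MPISource is built on MPI's matched-probe idiom: a plain MPI_Recv is not enough, because the type of the payload is only known after the tag has been inspected. The sketch below is a minimal, self-contained illustration of that pattern (MPI_Mprobe followed by MPI_Mrecv). It is not part of the patch: the kTagEvent and kTagDisconnect names are illustrative placeholders, not the EDM_MPI_* tags defined in messages.h. Build it with mpicxx and run it with "mpirun -n 2".

// probe_dispatch.cc - matched-probe receive loop, in the style of MPISource
#include <cstdio>
#include <initializer_list>
#include <mpi.h>

constexpr int kTagEvent = 1;       // placeholder for EDM_MPI_ProcessEvent
constexpr int kTagDisconnect = 2;  // placeholder for EDM_MPI_Disconnect

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  if (rank == 0) {
    // the "controller" side: send two event payloads, then a disconnect message
    for (int value : {42, 43}) {
      MPI_Send(&value, 1, MPI_INT, 1, kTagEvent, MPI_COMM_WORLD);
    }
    int dummy = 0;
    MPI_Send(&dummy, 0, MPI_INT, 1, kTagDisconnect, MPI_COMM_WORLD);
  } else if (rank == 1) {
    // the "source" side: probe first, inspect the tag to learn which payload
    // follows, then consume exactly that message with MPI_Mrecv
    while (true) {
      MPI_Message message;
      MPI_Status status;
      MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &message, &status);
      if (status.MPI_TAG == kTagEvent) {
        int value;
        MPI_Mrecv(&value, 1, MPI_INT, &message, &status);
        std::printf("received event payload %d\n", value);
      } else {
        // kTagDisconnect: consume the empty message and leave the loop
        int dummy;
        MPI_Mrecv(&dummy, 0, MPI_INT, &message, &status);
        break;
      }
    }
  }

  MPI_Finalize();
  return 0;
}

Compared to MPI_Probe, the matched probe returns a message handle that the following MPI_Mrecv is guaranteed to consume, so concurrent receives on the same communicator (as in a multi-threaded cmsRun process) cannot steal the probed message.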