diff --git a/HeterogeneousCore/MPICore/BuildFile.xml b/HeterogeneousCore/MPICore/BuildFile.xml
new file mode 100644
index 0000000000000..235463762b1ae
--- /dev/null
+++ b/HeterogeneousCore/MPICore/BuildFile.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
diff --git a/HeterogeneousCore/MPICore/README.md b/HeterogeneousCore/MPICore/README.md
new file mode 100644
index 0000000000000..38ab5ebe80343
--- /dev/null
+++ b/HeterogeneousCore/MPICore/README.md
@@ -0,0 +1,105 @@
+# Extend CMSSW to a fully distributed application
+
+Let multiple CMSSW processes on the same or different machines coordinate event processing and transfer data products
+over MPI.
+
+The implementation is based on four CMSSW modules.
+Two are responsible for setting up the communication channels and coordinating the event processing:
+ - the `MPIController`
+ - the `MPISource`
+
+and two are responsible for the transfer of data products:
+ - the `MPISender`
+ - the `MPIReceiver`
+
+
+## `MPIController` class
+
+The `MPIController` is an `EDProducer` running in a regular CMSSW process. After setting up the communication with an
+`MPISource`, it transmits all EDM run, lumi and event transitions to the `MPISource`, instructing it to replicate them
+in the second process.
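+
+A minimal configuration sketch for the controller-side job (the module label is arbitrary, and the
+parameters follow `MPIController::fillDescriptions`):
+
+```python
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("CONTROLLER")
+# "CommWorld" pairs the two jobs inside MPI_COMM_WORLD; "Intercommunicator" looks up
+# an MPI name server and connects to the port published under the "name" parameter.
+process.mpiController = cms.EDProducer("MPIController",
+    mode = cms.untracked.string("CommWorld")
+)
+```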
+
+
+## `MPISource` class
+
+The `MPISource` is a `Source` controlling the execution of a second CMSSW process. After setting up the communication
+with an `MPIController`, it listens for EDM run, lumi and event transitions, and replicates them in its own process.
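+
+The matching follower-side sketch (assuming the source accepts the same `mode` parameter as the
+`MPIController`):
+
+```python
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("FOLLOWER")
+process.source = cms.Source("MPISource",
+    mode = cms.untracked.string("CommWorld")
+)
+```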
+
+
+## `MPISender` class
+
+The `MPISender` is an `EDProducer` that can read any number of collections of arbitrary types from the `Event`.
+For each event, it first sends a metadata message describing the products to be transferred, including their number and
+basic characteristics.
+
+If `TrivialCopyTraits` are defined for a given product, the data are transferred directly from the product's memory
+regions; otherwise, the product is serialised into a single buffer using its ROOT dictionary. The regions and the buffer
+are sent to another process over the MPI communication channel.
+
+The number and types of the collections to be read from the `Event` are determined by the module configuration. The
+configuration can specify a list of module labels, branch names, or a mix of the two:
+ - a module label selects all collections produced by that module, irrespective of the type and instance;
+ - a branch name selects only the collections that match all the branch fields (type, label, instance, process name),
+ similar to an `OutputModule`'s `"keep ..."` statement.
+
+Wildcards (`?` and `*`) are allowed in a module label or in each field of a branch name.
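+
+A configuration sketch (the module label and branch pattern are hypothetical):
+
+```python
+process.mpiSender = cms.EDProducer("MPISender",
+    upstream = cms.InputTag("mpiController"),
+    products = cms.vstring(
+        "pixelTracks",          # module label: all products of this module
+        "*_pixelVertices_*_*"   # branch name: type_label_instance_process, with wildcards
+    ),
+    instance = cms.int32(1)
+)
+```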
+
+
+## `MPIReceiver` class
+
+The `MPIReceiver` is an `EDProducer` that can receive any number of collections of arbitrary types over the MPI
+communication channel. It first receives metadata, which is later used to initialise trivially copyable products and
+allocate buffers for serialised products.
+
+For trivially copyable products, the receiver initialises the target objects using the metadata and performs an
+`MPI_Recv` for each memory region. For non-trivially copyable products, it receives the serialised buffer and
+deserialises it using the corresponding ROOT dictionary.
+
+All received products are put into the `Event`. The number, type and label of the collections to be produced are
+determined by the module configuration.
+
+For each collection, the `type` indicates the C++ type as understood by the ROOT dictionary, and the `label` indicates
+the module instance label to be used for producing that collection into the `Event`.
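+
+A configuration sketch (the product type is hypothetical):
+
+```python
+process.mpiReceiver = cms.EDProducer("MPIReceiver",
+    upstream = cms.InputTag("source"),
+    products = cms.VPSet(
+        cms.PSet(
+            type  = cms.string("std::vector<reco::Track>"),  # C++ type known to ROOT
+            label = cms.string("")                           # product instance label
+        )
+    ),
+    instance = cms.int32(1)
+)
+```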
+
+
+## `MPISender` and `MPIReceiver` instances
+
+Both `MPISender` and `MPIReceiver` are configured with an instance value that is used to match one `MPISender` in one
+process to one `MPIReceiver` in another process. Using different instance values allows the use of multiple pairs of
+`MPISender`/`MPIReceiver` modules in a process.
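+
+For example, two independent pairs sharing the same channel are matched by their `instance` values
+(a sketch with hypothetical module labels):
+
+```python
+process.mpiSenderTracks = cms.EDProducer("MPISender",
+    upstream = cms.InputTag("mpiController"),
+    products = cms.vstring("pixelTracks"),
+    instance = cms.int32(1)   # matches the receiver configured with instance 1
+)
+process.mpiSenderVertices = cms.EDProducer("MPISender",
+    upstream = cms.InputTag("mpiController"),
+    products = cms.vstring("pixelVertices"),
+    instance = cms.int32(2)   # matches the receiver configured with instance 2
+)
+```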
+
+
+## MPI communication channel
+
+The `MPIController` and `MPISource` produce an `MPIToken`, a special data product that encapsulates the information
+about the MPI communication channel.
+
+Both `MPISender` and `MPIReceiver` obtain the MPI communication channel by reading an `MPIToken` from the event,
+identified by the `upstream` parameter.
+They also produce a copy of the `MPIToken`, so other modules can consume it to declare a dependency on those modules.
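+
+For example, pointing `upstream` at another module's `MPIToken` copy imposes a scheduling
+dependency (a sketch with hypothetical labels):
+
+```python
+# mpiReceiverVertices consumes the MPIToken produced by mpiReceiverTracks,
+# so it is scheduled to run after it.
+process.mpiReceiverVertices = cms.EDProducer("MPIReceiver",
+    upstream = cms.InputTag("mpiReceiverTracks"),
+    products = cms.VPSet(
+        cms.PSet(
+            type  = cms.string("std::vector<reco::Vertex>"),
+            label = cms.string("")
+        )
+    ),
+    instance = cms.int32(2)
+)
+```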
+
+
+## Testing
+
+An automated test is available in the `test/` directory.
+
+
+## Current limitations
+
+ - `MPIController` is a "one" module that supports only a single luminosity block at a time;
+ - there is only a partial check that the number, type and order of collections sent by the `MPISender` match those
+   expected by the `MPIReceiver`.
+
+
+## Notes for future developments
+
+ - implement efficient GPU-direct transfers for trivially serialisable products (in progress);
+ - check that the collections sent by the `MPISender` and those expected by the `MPIReceiver` match;
+ - integrate filter decisions and the GPU backend into the metadata message;
+ - improve the `MPIController` to be a `global` module rather than a `one` module;
+ - let an `MPISource` accept connections and events from multiple `MPIController` modules in different jobs;
+ - let an `MPIController` connect and send events to multiple `MPISource` modules in different jobs (in progress);
+ - support multiple concurrent runs and luminosity blocks, up to a given maximum;
+ - when a run, luminosity block or event is received, check that they belong to the same `ProcessingHistory` as the
+ ongoing run?
diff --git a/HeterogeneousCore/MPICore/interface/MPIToken.h b/HeterogeneousCore/MPICore/interface/MPIToken.h
new file mode 100644
index 0000000000000..ec31dd2f6e7f9
--- /dev/null
+++ b/HeterogeneousCore/MPICore/interface/MPIToken.h
@@ -0,0 +1,26 @@
+#ifndef HeterogeneousCore_MPICore_MPIToken_h
+#define HeterogeneousCore_MPICore_MPIToken_h
+
+// C++ standard library headers
+#include <memory>
+
+// forward declaration
+class MPIChannel;
+
+class MPIToken {
+public:
+ // default constructor, needed to write the type's dictionary
+ MPIToken() = default;
+
+ // user-defined constructor
+  explicit MPIToken(std::shared_ptr<MPIChannel> channel) : channel_(channel) {}
+
+ // access the data member
+ MPIChannel* channel() const { return channel_.get(); }
+
+private:
+ // wrap the MPI communicator and destination
+  std::shared_ptr<MPIChannel> channel_;
+};
+
+#endif // HeterogeneousCore_MPICore_MPIToken_h
diff --git a/HeterogeneousCore/MPICore/interface/api.h b/HeterogeneousCore/MPICore/interface/api.h
new file mode 100644
index 0000000000000..8cfc0d2bb1295
--- /dev/null
+++ b/HeterogeneousCore/MPICore/interface/api.h
@@ -0,0 +1,182 @@
+#ifndef HeterogeneousCore_MPICore_interface_api_h
+#define HeterogeneousCore_MPICore_interface_api_h
+
+// C++ standard library headers
+#include <cassert>
+#include <cstddef>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <type_traits>
+#include <typeinfo>
+
+// MPI headers
+#include <mpi.h>
+
+// ROOT headers
+#include <TBufferFile.h>
+#include <TClass.h>
+
+// CMSSW headers
+#include "DataFormats/Common/interface/WrapperBase.h"
+#include "DataFormats/Provenance/interface/ProvenanceFwd.h"
+#include "FWCore/Reflection/interface/ObjectWithDict.h"
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+#include "HeterogeneousCore/MPICore/interface/metadata.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h"
+
+class MPIChannel {
+public:
+ MPIChannel() = default;
+ MPIChannel(MPI_Comm comm, int destination) : comm_(comm), dest_(destination) {}
+
+ // build a new MPIChannel that uses a duplicate of the underlying communicator and the same destination
+ MPIChannel duplicate() const;
+
+ // close the underlying communicator and reset the MPIChannel to an invalid state
+ void reset();
+
+ // announce that a client has just connected
+ void sendConnect() { sendEmpty_(EDM_MPI_Connect); }
+
+ // announce that the client will disconnect
+ void sendDisconnect() { sendEmpty_(EDM_MPI_Disconnect); }
+
+ // signal the begin of stream
+ void sendBeginStream() { sendEmpty_(EDM_MPI_BeginStream); }
+
+ // signal the end of stream
+ void sendEndStream() { sendEmpty_(EDM_MPI_EndStream); }
+
+ // signal a new run, and transmit the RunAuxiliary
+ void sendBeginRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_BeginRun, aux); }
+
+ // signal the end of run, and re-transmit the RunAuxiliary
+ void sendEndRun(edm::RunAuxiliary const& aux) { sendRunAuxiliary_(EDM_MPI_EndRun, aux); }
+
+ // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary
+ void sendBeginLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) {
+ sendLuminosityBlockAuxiliary_(EDM_MPI_BeginLuminosityBlock, aux);
+ }
+
+ // signal the end of luminosity block, and re-transmit the LuminosityBlockAuxiliary
+ void sendEndLuminosityBlock(edm::LuminosityBlockAuxiliary const& aux) {
+ sendLuminosityBlockAuxiliary_(EDM_MPI_EndLuminosityBlock, aux);
+ }
+
+ // signal a new event, and transmit the EventAuxiliary
+ void sendEvent(edm::EventAuxiliary const& aux) { sendEventAuxiliary_(aux); }
+
+ /*
+ // start processing a new event, and receive the EventAuxiliary
+ MPI_Status receiveEvent(edm::EventAuxiliary& aux, int source) {
+ return receiveEventAuxiliary_(aux, source, EDM_MPI_ProcessEvent);
+ }
+ */
+
+ // start processing a new event, and receive the EventAuxiliary
+ MPI_Status receiveEvent(edm::EventAuxiliary& aux, MPI_Message& message) {
+ return receiveEventAuxiliary_(aux, message);
+ }
+
+  void sendMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta);
+  void receiveMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta);
+
+ // send buffer of serialized products
+ void sendBuffer(const void* buf, size_t size, int instance, EDM_MPI_MessageTag tag);
+
+ // serialize an object of type T using its ROOT dictionary, and transmit it
+  template <typename T>
+ void sendProduct(int instance, T const& product) {
+    if constexpr (std::is_fundamental_v<T>) {
+ sendTrivialProduct_(instance, product);
+ } else {
+      static const TClass* type = TClass::GetClass<T>();
+ if (!type) {
+ throw std::runtime_error("ROOT dictionary not found for type " + std::string(typeid(T).name()));
+ }
+ sendSerializedProduct_(instance, type, &product);
+ }
+ }
+
+  // receive an object of type T, and deserialize it using its ROOT dictionary
+  template <typename T>
+ void receiveProduct(int instance, T& product) {
+    if constexpr (std::is_fundamental_v<T>) {
+ receiveTrivialProduct_(instance, product);
+ } else {
+      static const TClass* type = TClass::GetClass<T>();
+ if (!type) {
+ throw std::runtime_error("ROOT dictionary not found for type " + std::string(typeid(T).name()));
+ }
+ receiveSerializedProduct_(instance, type, &product);
+ }
+ }
+
+ // serialize a generic object using its ROOT dictionary, and send the binary blob
+ void sendSerializedProduct_(int instance, TClass const* type, void const* product);
+
+ // receive a binary blob, and deserialize an object of generic type using its ROOT dictionary
+ void receiveSerializedProduct_(int instance, TClass const* type, void* product);
+
+ // receive product buffer of known size
+  std::unique_ptr<TBufferFile> receiveSerializedBuffer(int instance, int bufSize);
+
+ // transfer a wrapped object using its MemoryCopyTraits
+ void sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader);
+
+ // receive into wrapped object
+ void receiveInitializedTrivialCopy(int instance, ngt::WriterBase& writer);
+
+private:
+ // serialize an EDM object to a simplified representation that can be transmitted as an MPI message
+ void edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux);
+ void edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux);
+ void edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux);
+
+  // deserialize an EDM object from a simplified representation transmitted as an MPI message
+ void edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux);
+ void edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux);
+ void edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux);
+
+ // fill and send an EDM_MPI_Empty_t buffer
+ void sendEmpty_(int tag);
+
+ // fill and send an EDM_MPI_RunAuxiliary_t buffer
+ void sendRunAuxiliary_(int tag, edm::RunAuxiliary const& aux);
+
+ // fill and send an EDM_MPI_LuminosityBlock_t buffer
+ void sendLuminosityBlockAuxiliary_(int tag, edm::LuminosityBlockAuxiliary const& aux);
+
+ // fill and send an EDM_MPI_EventAuxiliary_t buffer
+ void sendEventAuxiliary_(edm::EventAuxiliary const& aux);
+
+ // receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary
+ MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, int source, int tag);
+ MPI_Status receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message);
+
+  // used to send a product of a raw fundamental type
+  template <typename T>
+ void sendTrivialProduct_(int instance, T const& product) {
+ int tag = EDM_MPI_SendTrivialProduct | instance * EDM_MPI_MessageTagWidth_;
+ MPI_Send(&product, sizeof(T), MPI_BYTE, dest_, tag, comm_);
+ }
+
+  // used to receive a product of a raw fundamental type
+  template <typename T>
+ void receiveTrivialProduct_(int instance, T& product) {
+ int tag = EDM_MPI_SendTrivialProduct | instance * EDM_MPI_MessageTagWidth_;
+ MPI_Message message;
+ MPI_Status status;
+ MPI_Mprobe(dest_, tag, comm_, &message, &status);
+ int size;
+ MPI_Get_count(&status, MPI_BYTE, &size);
+    assert(static_cast<int>(sizeof(T)) == size);
+ MPI_Mrecv(&product, size, MPI_BYTE, &message, MPI_STATUS_IGNORE);
+ }
+
+ // MPI intercommunicator
+ MPI_Comm comm_ = MPI_COMM_NULL;
+
+ // MPI destination
+ int dest_ = MPI_UNDEFINED;
+};
+
+#endif // HeterogeneousCore_MPICore_interface_api_h
diff --git a/HeterogeneousCore/MPICore/interface/conversion.h b/HeterogeneousCore/MPICore/interface/conversion.h
new file mode 100644
index 0000000000000..3ac4062b3d845
--- /dev/null
+++ b/HeterogeneousCore/MPICore/interface/conversion.h
@@ -0,0 +1,26 @@
+#ifndef HeterogeneousCore_MPICore_interface_conversion_h
+#define HeterogeneousCore_MPICore_interface_conversion_h
+
+// CMSSW headers
+#include "DataFormats/Provenance/interface/ProvenanceFwd.h"
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+
+// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary_t buffer
+void edmFromBuffer(EDM_MPI_RunAuxiliary_t const &, edm::RunAuxiliary &);
+
+// fill an EDM_MPI_RunAuxiliary_t buffer from an edm::RunAuxiliary object
+void edmToBuffer(EDM_MPI_RunAuxiliary_t &, edm::RunAuxiliary const &);
+
+// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary_t buffer
+void edmFromBuffer(EDM_MPI_LuminosityBlockAuxiliary_t const &, edm::LuminosityBlockAuxiliary &);
+
+// fill an EDM_MPI_LuminosityBlockAuxiliary_t buffer from an edm::LuminosityBlockAuxiliary object
+void edmToBuffer(EDM_MPI_LuminosityBlockAuxiliary_t &, edm::LuminosityBlockAuxiliary const &);
+
+// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary_t buffer
+void edmFromBuffer(EDM_MPI_EventAuxiliary_t const &, edm::EventAuxiliary &);
+
+// fill an EDM_MPI_EventAuxiliary_t buffer from an edm::EventAuxiliary object
+void edmToBuffer(EDM_MPI_EventAuxiliary_t &, edm::EventAuxiliary const &);
+
+#endif // HeterogeneousCore_MPICore_interface_conversion_h
diff --git a/HeterogeneousCore/MPICore/interface/messages.h b/HeterogeneousCore/MPICore/interface/messages.h
new file mode 100644
index 0000000000000..de07f6ac0db67
--- /dev/null
+++ b/HeterogeneousCore/MPICore/interface/messages.h
@@ -0,0 +1,119 @@
+#ifndef HeterogeneousCore_MPICore_interface_messages_h
+#define HeterogeneousCore_MPICore_interface_messages_h
+
+// C++ standard library headers
+#include <cstdint>
+
+// MPI headers
+#include <mpi.h>
+
+/* register the MPI message types for the EDM communication
+ */
+void EDM_MPI_build_types();
+
+/* MPI message tags corresponding to EDM transitions
+ */
+enum EDM_MPI_MessageTag {
+ EDM_MPI_Connect,
+ EDM_MPI_Disconnect,
+ EDM_MPI_BeginStream,
+ EDM_MPI_EndStream,
+ EDM_MPI_BeginRun,
+ EDM_MPI_EndRun,
+ EDM_MPI_BeginLuminosityBlock,
+ EDM_MPI_EndLuminosityBlock,
+ EDM_MPI_ProcessEvent,
+ EDM_MPI_SendMetadata,
+ EDM_MPI_SendSerializedProduct,
+ EDM_MPI_SendTrivialProduct,
+ EDM_MPI_SendTrivialCopyProduct,
+ EDM_MPI_MessageTagCount_
+};
+
+/* Ensure that the MPI message tags can fit in a single byte
+ */
+inline constexpr int EDM_MPI_MessageTagWidth_ = 256;
+static_assert(EDM_MPI_MessageTagCount_ <= EDM_MPI_MessageTagWidth_);
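+
+/* The composed MPI tag is (message tag | instance * EDM_MPI_MessageTagWidth_): the low byte
+ * carries the message type, while the remaining bits carry the MPISender/MPIReceiver instance.
+ */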
+
+extern MPI_Datatype EDM_MPI_MessageType[EDM_MPI_MessageTagCount_];
+
+/* Common header for EDM MPI messages, containing
+ * - the message type (to allow decoding the message further)
+ */
+struct __attribute__((aligned(8))) EDM_MPI_Header_t {
+ uint32_t messageTag; // EDM_MPI_MessageTag
+};
+
+/* Empty EDM MPI message, used when only the header is needed
+ */
+struct EDM_MPI_Empty_t : public EDM_MPI_Header_t {};
+
+// corresponding MPI type
+extern MPI_Datatype EDM_MPI_Empty;
+
+/* Run information stored in edm::RunAuxiliary,
+ * augmented with the MPI message id
+ *
+ * See DataFormats/Provenance/interface/RunAuxiliary.h
+ */
+struct EDM_MPI_RunAuxiliary_t : public EDM_MPI_Header_t {
+ // from DataFormats/Provenance/interface/RunAuxiliary.h
+ char processHistoryID[16]; // edm::ProcessHistoryID::compactForm()
+ uint64_t beginTime; // edm::TimeValue_t
+ uint64_t endTime; // edm::TimeValue_t
+ uint32_t run; // edm::RunNumber_t
+};
+
+// corresponding MPI type
+extern MPI_Datatype EDM_MPI_RunAuxiliary;
+
+/* LuminosityBlock information stored in edm::LuminosityBlockAuxiliary,
+ * augmented with the MPI message id
+ *
+ * See DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h
+ */
+struct EDM_MPI_LuminosityBlockAuxiliary_t : public EDM_MPI_Header_t {
+ // from DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h
+ char processHistoryID[16]; // edm::ProcessHistoryID::compactForm()
+ uint64_t beginTime; // edm::TimeValue_t
+ uint64_t endTime; // edm::TimeValue_t
+ uint32_t run; // edm::RunNumber_t
+ uint32_t lumi; // edm::LuminosityBlockNumber_t
+};
+
+// corresponding MPI type
+extern MPI_Datatype EDM_MPI_LuminosityBlockAuxiliary;
+
+/* Event information stored in edm::EventAuxiliary,
+ * augmented with the MPI message id
+ *
+ * See DataFormats/Provenance/interface/EventAuxiliary.h
+ */
+struct EDM_MPI_EventAuxiliary_t : public EDM_MPI_Header_t {
+ // from DataFormats/Provenance/interface/EventAuxiliary.h
+ char processHistoryID[16]; // edm::ProcessHistoryID::compactForm()
+ char processGuid[16]; // process GUID
+ uint64_t time; // edm::TimeValue_t
+ int32_t realData; // real data (true) vs simulation (false)
+ int32_t experimentType; // edm::EventAuxiliary::ExperimentType
+ int32_t bunchCrossing; // LHC bunch crossing
+ int32_t orbitNumber; // LHC orbit number
+ int32_t storeNumber; // LHC fill number ?
+ uint32_t run; // edm::RunNumber_t
+ uint32_t lumi; // edm::LuminosityBlockNumber_t
+ uint32_t event; // edm::EventNumber_t
+};
+
+// corresponding MPI type
+extern MPI_Datatype EDM_MPI_EventAuxiliary;
+
+// union of all possible messages
+union EDM_MPI_Any_t {
+ EDM_MPI_Header_t header;
+ EDM_MPI_Empty_t empty;
+ EDM_MPI_RunAuxiliary_t runAuxiliary;
+ EDM_MPI_LuminosityBlockAuxiliary_t luminosityBlockAuxiliary;
+ EDM_MPI_EventAuxiliary_t eventAuxiliary;
+};
+
+#endif // HeterogeneousCore_MPICore_interface_messages_h
diff --git a/HeterogeneousCore/MPICore/interface/metadata.h b/HeterogeneousCore/MPICore/interface/metadata.h
new file mode 100644
index 0000000000000..f131fd1ce5c14
--- /dev/null
+++ b/HeterogeneousCore/MPICore/interface/metadata.h
@@ -0,0 +1,114 @@
+#ifndef HeterogeneousCore_MPICore_interface_metadata_h
+#define HeterogeneousCore_MPICore_interface_metadata_h
+
+// C++ standard library headers
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <span>
+#include <stdexcept>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+// MPI headers
+#include <mpi.h>
+
+enum ProductFlags : uint8_t {
+ HasMissing = 1 << 0,
+ HasSerialized = 1 << 1,
+ HasTrivialCopy = 1 << 2,
+};
+
+struct ProductMetadata {
+ enum class Kind : uint8_t { Missing = 0, Serialized = 1, TrivialCopy = 2 };
+
+ Kind kind;
+ size_t sizeMeta = 0;
+ const uint8_t* trivialCopyOffset = nullptr; // Only valid if kind == TrivialCopy
+};
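+
+/* Each per-product record in the metadata buffer consists of one byte for the kind, eight bytes
+ * for the size and, for trivially copyable products, the trivial-copy parameters themselves
+ * (cf. the buffer sizing in MPISender::acquire).
+ */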
+
+class ProductMetadataBuilder {
+public:
+ ProductMetadataBuilder();
+ explicit ProductMetadataBuilder(size_t productCount);
+ ~ProductMetadataBuilder();
+
+ // No copy
+ ProductMetadataBuilder(const ProductMetadataBuilder&) = delete;
+ ProductMetadataBuilder& operator=(const ProductMetadataBuilder&) = delete;
+
+ // Move
+ ProductMetadataBuilder(ProductMetadataBuilder&& other) noexcept;
+ ProductMetadataBuilder& operator=(ProductMetadataBuilder&& other) noexcept;
+
+ // Sender-side: pre-allocate
+ void reserve(size_t bytes);
+
+ // set or reset number of products. will fail if not set called before sending
+ void setProductCount(size_t prod_num) { productCount_ = prod_num; }
+ void setHeader();
+
+ // Sender API
+ void addMissing();
+ void addSerialized(size_t size);
+ void addTrivialCopy(const std::byte* buffer, size_t size);
+
+ const uint8_t* data() const;
+ uint8_t* data();
+ size_t size() const;
+  std::span<const uint8_t> buffer() const;
+
+ // Receiver-side
+ void receiveMetadata(int src, int tag, MPI_Comm comm);
+
+  // Not memory safe for trivial copy products:
+  // make sure that the ProductMetadataBuilder outlives the returned ProductMetadata
+ ProductMetadata getNext();
+
+ int64_t productCount() const { return productCount_; }
+ int64_t serializedBufferSize() const { return serializedBufferSize_; }
+ bool hasMissing() const { return productFlags_ & HasMissing; }
+ bool hasSerialized() const { return productFlags_ & HasSerialized; }
+ bool hasTrivialCopy() const { return productFlags_ & HasTrivialCopy; }
+
+ void debugPrintMetadataSummary() const;
+
+private:
+ uint8_t* buffer_;
+ size_t capacity_;
+ size_t size_;
+ size_t readOffset_;
+  const size_t headerSize_ = 13;  // header is always present in the metadata object to describe the products in general
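+  // presumably: product flags (1 byte) + serialized buffer size (4 bytes) + product count (8 bytes)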
+ const size_t maxMetadataSize_ = 1024; // default size for buffer initialization. must fit any metadata
+ int serializedBufferSize_ = 0;
+ uint8_t productFlags_ = 0;
+ int64_t productCount_ = 0;
+
+ void resizeBuffer(size_t newCap);
+ void ensureCapacity(size_t needed);
+
+ void appendBytes(const std::byte* src, size_t size);
+
+  template <typename T>
+ void append(T value) {
+    static_assert(std::is_trivially_copyable_v<T>);
+ ensureCapacity(sizeof(T));
+ std::memcpy(buffer_ + size_, &value, sizeof(T));
+ size_ += sizeof(T);
+ }
+
+  template <typename T>
+ T consume() {
+    static_assert(std::is_trivially_copyable_v<T>);
+ if (readOffset_ + sizeof(T) > size_)
+ throw std::runtime_error("Buffer underflow");
+ T val;
+ std::memcpy(&val, buffer_ + readOffset_, sizeof(T));
+ readOffset_ += sizeof(T);
+ return val;
+ }
+};
+
+#endif // HeterogeneousCore_MPICore_interface_metadata_h
diff --git a/HeterogeneousCore/MPICore/plugins/BuildFile.xml b/HeterogeneousCore/MPICore/plugins/BuildFile.xml
new file mode 100644
index 0000000000000..0b56675f5b4d6
--- /dev/null
+++ b/HeterogeneousCore/MPICore/plugins/BuildFile.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/HeterogeneousCore/MPICore/plugins/MPIController.cc b/HeterogeneousCore/MPICore/plugins/MPIController.cc
new file mode 100644
index 0000000000000..f62faaa81b287
--- /dev/null
+++ b/HeterogeneousCore/MPICore/plugins/MPIController.cc
@@ -0,0 +1,293 @@
+// C++ headers
+#include <memory>
+#include <string>
+#include <utility>
+
+// MPI headers
+#include <mpi.h>
+
+// ROOT headers
+#include <TBufferFile.h>
+#include <TClass.h>
+
+// CMSSW headers
+#include "DataFormats/Provenance/interface/BranchKey.h"
+#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
+#include "FWCore/Framework/interface/Event.h"
+#include "FWCore/Framework/interface/LuminosityBlock.h"
+#include "FWCore/Framework/interface/MakerMacros.h"
+#include "FWCore/Framework/interface/Run.h"
+#include "FWCore/Framework/interface/one/EDProducer.h"
+#include "FWCore/MessageLogger/interface/MessageLogger.h"
+#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
+#include "FWCore/ParameterSet/interface/EmptyGroupDescription.h"
+#include "FWCore/ParameterSet/interface/ParameterDescriptionNode.h"
+#include "FWCore/ParameterSet/interface/ParameterSet.h"
+#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
+#include "FWCore/Reflection/interface/ObjectWithDict.h"
+#include "FWCore/Reflection/interface/TypeWithDict.h"
+#include "FWCore/ServiceRegistry/interface/Service.h"
+#include "FWCore/Utilities/interface/Guid.h"
+#include "HeterogeneousCore/MPICore/interface/api.h"
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+#include "HeterogeneousCore/MPICore/interface/MPIToken.h"
+#include "HeterogeneousCore/MPIServices/interface/MPIService.h"
+
+/* MPIController class
+ *
+ * This module runs inside a CMSSW job (the "controller") and connects to an "MPISource" in a separate CMSSW job (the "follower").
+ * The follower is informed of all transitions seen by the controller, and can replicate them in its own process.
+ *
+ * Current limitations:
+ * - support a single "follower"
+ *
+ * Future work:
+ * - support multiple "followers"
+ */
+
+class MPIController : public edm::one::EDProducer<edm::one::WatchRuns, edm::one::WatchLuminosityBlocks> {
+public:
+ explicit MPIController(edm::ParameterSet const& config);
+ ~MPIController() override;
+
+ void beginJob() override;
+ void endJob() override;
+
+ void beginRun(edm::Run const& run, edm::EventSetup const& setup) override;
+ void endRun(edm::Run const& run, edm::EventSetup const& setup) override;
+
+ void beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override;
+ void endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) override;
+
+ void produce(edm::Event& event, edm::EventSetup const& setup) override;
+
+ static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
+
+private:
+ enum Mode { kInvalid = 0, kCommWorld, kIntercommunicator };
+ static constexpr const char* ModeDescription[] = {"Invalid", "CommWorld", "Intercommunicator"};
+ Mode parseMode(std::string const& label) {
+ if (label == ModeDescription[kCommWorld])
+ return kCommWorld;
+ else if (label == ModeDescription[kIntercommunicator])
+ return kIntercommunicator;
+ else
+ return kInvalid;
+ }
+
+ MPI_Comm comm_ = MPI_COMM_NULL;
+ MPIChannel channel_;
+  edm::EDPutTokenT<MPIToken> token_;
+ Mode mode_;
+};
+
+MPIController::MPIController(edm::ParameterSet const& config)
+    : token_(produces<MPIToken>()),
+      mode_(parseMode(config.getUntrackedParameter<std::string>("mode"))) //
+{
+ // make sure that MPI is initialised
+ MPIService::required();
+
+ // make sure the EDM MPI types are available
+ EDM_MPI_build_types();
+
+ if (mode_ == kCommWorld) {
+ // All processes are in MPI_COMM_WORLD.
+ // The current implementation supports only two processes: one controller and one source.
+ edm::LogAbsolute("MPI") << "MPIController in " << ModeDescription[mode_] << " mode.";
+
+    // Check how many processes there are in MPI_COMM_WORLD
+ int size;
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+ if (size != 2) {
+ throw edm::Exception(edm::errors::Configuration)
+ << "The current implementation supports only two processes: one controller and one source.";
+ }
+
+ // Check the rank of this process, and determine the rank of the other process.
+ int rank;
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ edm::LogAbsolute("MPI") << "MPIController has rank " << rank << " in MPI_COMM_WORLD.";
+ int other_rank = 1 - rank;
+ comm_ = MPI_COMM_WORLD;
+ channel_ = MPIChannel(comm_, other_rank);
+ } else if (mode_ == kIntercommunicator) {
+ // Use an intercommunicator to let two groups of processes communicate with each other.
+ // The current implementation supports only two processes: one controller and one source.
+ edm::LogAbsolute("MPI") << "MPISource in " << ModeDescription[mode_] << " mode.";
+
+    // Check how many processes there are in MPI_COMM_WORLD
+ int size;
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+ if (size != 1) {
+ throw edm::Exception(edm::errors::Configuration)
+ << "The current implementation supports only two processes: one controller and one source.";
+ }
+
+ // Look for the port under the name indicated by the parameter "server".
+    std::string name = config.getUntrackedParameter<std::string>("name", "server");
+ char port[MPI_MAX_PORT_NAME];
+ MPI_Lookup_name(name.c_str(), MPI_INFO_NULL, port);
+ edm::LogAbsolute("MPI") << "Trying to connect to the MPI server on port " << port;
+
+ // Create an intercommunicator and connect to the server.
+ MPI_Comm_connect(port, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_);
+ MPI_Comm_remote_size(comm_, &size);
+ if (size != 1) {
+ throw edm::Exception(edm::errors::Configuration)
+ << "The current implementation supports only two processes: one controller and one source.";
+ }
+ edm::LogAbsolute("MPI") << "Client connected to " << size << (size == 1 ? " server" : " servers");
+ channel_ = MPIChannel(comm_, 0);
+ } else {
+ // Invalid mode.
+ throw edm::Exception(edm::errors::Configuration)
+ << "Invalid mode \"" << config.getUntrackedParameter("mode") << "\"";
+ }
+}
+
+MPIController::~MPIController() {
+ // Close the intercommunicator.
+ if (mode_ == kIntercommunicator) {
+ MPI_Comm_disconnect(&comm_);
+ }
+}
+
+void MPIController::beginJob() {
+ // signal the connection
+ channel_.sendConnect();
+
+ /* is there a way to access all known process histories ?
+ edm::ProcessHistoryRegistry const& registry = * edm::ProcessHistoryRegistry::instance();
+ edm::LogAbsolute("MPI") << "ProcessHistoryRegistry:";
+ for (auto const& keyval: registry) {
+ edm::LogAbsolute("MPI") << keyval.first << ": " << keyval.second;
+ }
+ */
+}
+
+void MPIController::endJob() {
+ // signal the disconnection
+ channel_.sendDisconnect();
+}
+
+void MPIController::beginRun(edm::Run const& run, edm::EventSetup const& setup) {
+ // signal a new run, and transmit the RunAuxiliary
+ /* FIXME
+ * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and
+ * we could simply do
+
+ channel_.sendBeginRun(run.runAuxiliary());
+
+ * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the
+ * _parent_ process.
+ * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and
+ * transmit the modified RunAuxiliary.
+ */
+ auto aux = run.runAuxiliary();
+ aux.setProcessHistoryID(run.processHistory().id());
+ channel_.sendBeginRun(aux);
+
+ // transmit the ProcessHistory
+ channel_.sendProduct(0, run.processHistory());
+}
+
+void MPIController::endRun(edm::Run const& run, edm::EventSetup const& setup) {
+ // signal the end of run
+ /* FIXME
+ * Ideally the ProcessHistoryID stored in the run.runAuxiliary() should be the correct one, and
+ * we could simply do
+
+ channel_.sendEndRun(run.runAuxiliary());
+
+ * Instead, it looks like the ProcessHistoryID stored in the run.runAuxiliary() is that of the
+ * _parent_ process.
+ * So, we make a copy of the RunAuxiliary, set the ProcessHistoryID to the correct value, and
+ * transmit the modified RunAuxiliary.
+ */
+ auto aux = run.runAuxiliary();
+ aux.setProcessHistoryID(run.processHistory().id());
+ channel_.sendEndRun(aux);
+}
+
+void MPIController::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) {
+ // signal a new luminosity block, and transmit the LuminosityBlockAuxiliary
+ /* FIXME
+ * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the
+ * correct one, and we could simply do
+
+ channel_.sendBeginLuminosityBlock(lumi.luminosityBlockAuxiliary());
+
+ * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is
+ * that of the _parent_ process.
+ * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct
+ * value, and transmit the modified LuminosityBlockAuxiliary.
+ */
+ auto aux = lumi.luminosityBlockAuxiliary();
+ aux.setProcessHistoryID(lumi.processHistory().id());
+ channel_.sendBeginLuminosityBlock(aux);
+}
+
+void MPIController::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& setup) {
+ // signal the end of luminosity block
+ /* FIXME
+ * Ideally the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() should be the
+ * correct one, and we could simply do
+
+ channel_.sendEndLuminosityBlock(lumi.luminosityBlockAuxiliary());
+
+ * Instead, it looks like the ProcessHistoryID stored in the lumi.luminosityBlockAuxiliary() is
+ * that of the _parent_ process.
+ * So, we make a copy of the LuminosityBlockAuxiliary, set the ProcessHistoryID to the correct
+ * value, and transmit the modified LuminosityBlockAuxiliary.
+ */
+ auto aux = lumi.luminosityBlockAuxiliary();
+ aux.setProcessHistoryID(lumi.processHistory().id());
+ channel_.sendEndLuminosityBlock(aux);
+}
+
+void MPIController::produce(edm::Event& event, edm::EventSetup const& setup) {
+ {
+ edm::LogInfo log("MPI");
+ log << "processing run " << event.run() << ", lumi " << event.luminosityBlock() << ", event " << event.id().event();
+ log << "\nprocess history: " << event.processHistory();
+ log << "\nprocess history id: " << event.processHistory().id();
+ log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)";
+ log << "\nisRealData " << event.eventAuxiliary().isRealData();
+ log << "\nexperimentType " << event.eventAuxiliary().experimentType();
+ log << "\nbunchCrossing " << event.eventAuxiliary().bunchCrossing();
+ log << "\norbitNumber " << event.eventAuxiliary().orbitNumber();
+ log << "\nstoreNumber " << event.eventAuxiliary().storeNumber();
+ log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID();
+ log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString();
+ }
+
+ // signal a new event, and transmit the EventAuxiliary
+ channel_.sendEvent(event.eventAuxiliary());
+
+ // duplicate the MPIChannel and put the copy into the Event
+  std::shared_ptr<MPIChannel> link(new MPIChannel(channel_.duplicate()), [](MPIChannel* ptr) {
+ ptr->reset();
+ delete ptr;
+ });
+ event.emplace(token_, std::move(link));
+}
+
+void MPIController::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
+ descriptions.setComment(
+ "This module connects to an \"MPISource\" in a separate CMSSW job, and transmits all Run, LuminosityBlock and "
+ "Event transitions from the current process to the remote one.");
+
+ edm::ParameterSetDescription desc;
+ desc.ifValue(
+      edm::ParameterDescription<std::string>("mode", "CommWorld", false),
+ ModeDescription[kCommWorld] >> edm::EmptyGroupDescription() or
+          ModeDescription[kIntercommunicator] >> edm::ParameterDescription<std::string>("name", "server", false))
+ ->setComment(
+ "Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to setup an "
+ "intercommunicator).");
+
+ descriptions.addWithDefaultLabel(desc);
+}
+
+DEFINE_FWK_MODULE(MPIController);
diff --git a/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc
new file mode 100644
index 0000000000000..b6b36baffc067
--- /dev/null
+++ b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc
@@ -0,0 +1,196 @@
+// C++ include files
+#include <iomanip>
+#include <memory>
+#include <string>
+#include <vector>
+
+// ROOT headers
+#include <TBufferFile.h>
+#include <TClass.h>
+
+// CMSSW include files
+#include "FWCore/Concurrency/interface/Async.h"
+#include "FWCore/Concurrency/interface/chain_first.h"
+#include "FWCore/Framework/interface/Event.h"
+#include "FWCore/Framework/interface/MakerMacros.h"
+#include "FWCore/Framework/interface/WrapperBaseOrphanHandle.h"
+#include "FWCore/Framework/interface/global/EDProducer.h"
+#include "FWCore/Framework/interface/stream/EDProducer.h"
+#include "FWCore/MessageLogger/interface/MessageLogger.h"
+#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
+#include "FWCore/ParameterSet/interface/ParameterSet.h"
+#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
+#include "FWCore/ServiceRegistry/interface/Service.h"
+#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
+#include "FWCore/Utilities/interface/Exception.h"
+#include "HeterogeneousCore/MPICore/interface/api.h"
+#include "HeterogeneousCore/MPICore/interface/MPIToken.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/AnyBuffer.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserBase.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h"
+
+class MPIReceiver : public edm::stream::EDProducer<edm::ExternalWork> {
+public:
+ MPIReceiver(edm::ParameterSet const& config)
+      : upstream_(consumes<MPIToken>(config.getParameter<edm::InputTag>("upstream"))),
+        token_(produces<MPIToken>()),
+        instance_(config.getParameter<int32_t>("instance")) //
+ {
+ // instance 0 is reserved for the MPIController / MPISource pair
+ // instance values greater than 255 may not fit in the MPI tag
+ if (instance_ < 1 or instance_ > 255) {
+ throw cms::Exception("InvalidValue")
+ << "Invalid MPIReceiver instance value, please use a value between 1 and 255";
+ }
+
+    auto const& products = config.getParameter<std::vector<edm::ParameterSet>>("products");
+ products_.reserve(products.size());
+ for (auto const& product : products) {
+      auto const& type = product.getParameter<std::string>("type");
+      auto const& label = product.getParameter<std::string>("label");
+ Entry entry;
+ entry.type = edm::TypeWithDict::byName(type);
+ entry.wrappedType = edm::TypeWithDict::byName("edm::Wrapper<" + type + ">");
+ entry.token = produces(edm::TypeID{entry.type.typeInfo()}, label);
+
+ LogTrace("MPIReceiver") << "receive type \"" << entry.type.name() << "\" for label \"" << label
+ << "\" over MPI channel instance " << this->instance_;
+
+ products_.emplace_back(std::move(entry));
+ }
+ }
+
+ void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final {
+ const MPIToken& token = event.get(upstream_);
+
+    // TODO: consider unique or optional ownership instead of shared_ptr
+    received_meta_ = std::make_shared<ProductMetadataBuilder>();
+
+    edm::Service<edm::Async> as;
+ as->runAsync(
+ std::move(holder),
+ [this, token]() { token.channel()->receiveMetadata(instance_, received_meta_); },
+ []() { return "Calling MPIReceiver::acquire()"; });
+ }
+
+ void produce(edm::Event& event, edm::EventSetup const&) final {
+ // read the MPIToken used to establish the communication channel
+ MPIToken token = event.get(upstream_);
+#ifdef EDM_ML_DEBUG
+ // dump the summary of metadata
+ received_meta_->debugPrintMetadataSummary();
+#endif
+
+ // if filter was false before the sender, receive nothing
+ if (received_meta_->productCount() == -1) {
+ event.emplace(token_, token);
+ return;
+ }
+
+ int buffer_offset = 0;
+    std::unique_ptr<TBufferFile> serialized_buffer;
+ if (received_meta_->hasSerialized()) {
+ serialized_buffer = token.channel()->receiveSerializedBuffer(instance_, received_meta_->serializedBufferSize());
+#ifdef EDM_ML_DEBUG
+ {
+ edm::LogSystem msg("MPISender");
+ msg << "Received serialised product:\n";
+ for (int i = 0; i < serialized_buffer->Length(); ++i) {
+ msg << "0x" << std::hex << std::setw(2) << std::setfill('0')
+ << (unsigned int)(unsigned char)serialized_buffer->Buffer()[i] << (i % 16 == 15 ? '\n' : ' ');
+ }
+ }
+#endif
+ }
+
+ for (auto const& entry : products_) {
+ auto product_meta = received_meta_->getNext();
+ if (product_meta.kind == ProductMetadata::Kind::Missing) {
+ edm::LogWarning("MPIReceiver") << "Product " << entry.type.name() << " was not received.";
+ continue; // Skip products that weren't sent
+ }
+
+ else if (product_meta.kind == ProductMetadata::Kind::Serialized) {
+        std::unique_ptr<edm::WrapperBase> wrapper(
+            reinterpret_cast<edm::WrapperBase*>(entry.wrappedType.getClass()->New()));
+ auto productBuffer = TBufferFile(TBuffer::kRead, product_meta.sizeMeta);
+ // assert(!wrapper->hasTrivialCopyTraits() && "mismatch between expected and actual metadata type");
+ assert(buffer_offset < serialized_buffer->BufferSize() && "serialized data buffer is shorter than expected");
+ productBuffer.SetBuffer(serialized_buffer->Buffer() + buffer_offset, product_meta.sizeMeta, false);
+ buffer_offset += product_meta.sizeMeta;
+ entry.wrappedType.getClass()->Streamer(wrapper.get(), productBuffer);
+ // put the data into the Event
+ event.put(entry.token, std::move(wrapper));
+ }
+
+ else if (product_meta.kind == ProductMetadata::Kind::TrivialCopy) {
+ // assert(wrapper->hasTrivialCopyTraits() && "mismatch between expected and actual metadata type");
+        std::unique_ptr<ngt::SerialiserBase> serialiser =
+            ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name());
+ if (not serialiser) {
+ throw cms::Exception("SerializerError") << "Receiver could not retrieve its serializer when it was expected";
+ }
+ auto writer = serialiser->writer();
+ ngt::AnyBuffer buffer = writer->uninitialized_parameters(); // constructs buffer with typeid
+ assert(buffer.size_bytes() == product_meta.sizeMeta);
+ std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta);
+        // why are both of these methods called initialize? I find this rather confusing
+ writer->initialize(buffer);
+ token.channel()->receiveInitializedTrivialCopy(instance_, *writer);
+ writer->finalize();
+ // put the data into the Event
+ event.put(entry.token, writer->get());
+ }
+ }
+
+ if (received_meta_->hasSerialized()) {
+ assert(buffer_offset == received_meta_->serializedBufferSize() &&
+ "serialized data buffer is not equal to the expected length");
+ }
+
+ // write a shallow copy of the channel to the output, so other modules can consume it
+ // to indicate that they should run after this
+ event.emplace(token_, token);
+ }
+
+ static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
+ descriptions.setComment(
+ "This module can receive arbitrary event products from an \"MPISender\" module in a separate CMSSW job, and "
+ "produce them into the event.");
+
+ edm::ParameterSetDescription product;
+    product.add<std::string>("type")->setComment("C++ type of the product to be received.");
+    product.add<std::string>("label", "")->setComment("Product instance label to be associated to the product.");
+
+ edm::ParameterSetDescription desc;
+ desc.add("upstream", {"source"})
+ ->setComment(
+ "MPI communication channel. Can be an \"MPIController\", \"MPISource\", \"MPISender\" or \"MPIReceiver\". "
+ "Passing an \"MPIController\" or \"MPISource\" only identifies the pair of local and remote application "
+ "that communicate. Passing an \"MPISender\" or \"MPIReceiver\" in addition in addition imposes a "
+ "scheduling dependency.");
+ desc.addVPSet("products", product, {})
+ ->setComment("Products to be received by a separate CMSSW job and produced into the event.");
+ desc.add("instance", 0)
+ ->setComment("A value between 1 and 255 used to identify a matching pair of \"MPISender\"/\"MPIRecevier\".");
+
+ descriptions.addWithDefaultLabel(desc);
+ }
+
+private:
+ struct Entry {
+ edm::TypeWithDict type;
+ edm::TypeWithDict wrappedType;
+ edm::EDPutToken token;
+ };
+
+  edm::EDGetTokenT<MPIToken> const upstream_;  // MPIToken used to establish the communication channel
+  edm::EDPutTokenT<MPIToken> const token_;     // copy of the MPIToken that may be used to implement an ordering relation
+  std::vector<Entry> products_;                // data to be read over the channel and put into the Event
+ int32_t const instance_; // instance used to identify the source-destination pair
+
+  std::shared_ptr<ProductMetadataBuilder> received_meta_;
+};
+
+#include "FWCore/Framework/interface/MakerMacros.h"
+DEFINE_FWK_MODULE(MPIReceiver);
diff --git a/HeterogeneousCore/MPICore/plugins/MPIReporter.cc b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc
new file mode 100644
index 0000000000000..49ec608e0a0f5
--- /dev/null
+++ b/HeterogeneousCore/MPICore/plugins/MPIReporter.cc
@@ -0,0 +1,59 @@
+#include "FWCore/Framework/interface/Event.h"
+#include "FWCore/Framework/interface/stream/EDAnalyzer.h"
+#include "FWCore/MessageLogger/interface/MessageLogger.h"
+#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
+#include "FWCore/ParameterSet/interface/ParameterSet.h"
+#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
+#include "FWCore/Utilities/interface/Guid.h"
+#include "HeterogeneousCore/MPICore/interface/MPIToken.h"
+
+/* MPIReporter class
+ *
+ */
+
+class MPIReporter : public edm::stream::EDAnalyzer<> {
+public:
+ explicit MPIReporter(edm::ParameterSet const& config);
+ ~MPIReporter() override = default;
+
+ void analyze(edm::Event const& event, edm::EventSetup const& setup) override;
+
+ static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
+
+private:
+  edm::EDGetTokenT<MPIToken> token_;
+};
+
+MPIReporter::MPIReporter(edm::ParameterSet const& config) : token_(consumes<MPIToken>(edm::InputTag("source"))) {}
+
+void MPIReporter::analyze(edm::Event const& event, edm::EventSetup const& setup) {
+ {
+ edm::LogAbsolute log("MPI");
+ log << "stream " << event.streamID() << ": processing run " << event.run() << ", lumi " << event.luminosityBlock()
+ << ", event " << event.id().event();
+ log << "\nprocess history: " << event.processHistory();
+ log << "\nprocess history id: " << event.processHistory().id();
+ log << "\nprocess history id: " << event.eventAuxiliary().processHistoryID() << " (from eventAuxiliary)";
+ log << "\nisRealData " << event.eventAuxiliary().isRealData();
+ log << "\nexperimentType " << event.eventAuxiliary().experimentType();
+ log << "\nbunchCrossing " << event.eventAuxiliary().bunchCrossing();
+ log << "\norbitNumber " << event.eventAuxiliary().orbitNumber();
+ log << "\nstoreNumber " << event.eventAuxiliary().storeNumber();
+ log << "\nprocessHistoryID " << event.eventAuxiliary().processHistoryID();
+ log << "\nprocessGUID " << edm::Guid(event.eventAuxiliary().processGUID(), true).toString();
+ }
+
+ auto const& token = event.get(token_);
+ {
+ edm::LogAbsolute log("MPI");
+ log << "got the MPIToken opaque wrapper around the MPIChannel at " << &token;
+ }
+}
+
+void MPIReporter::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
+ edm::ParameterSetDescription desc;
+ descriptions.addWithDefaultLabel(desc);
+}
+
+#include "FWCore/Framework/interface/MakerMacros.h"
+DEFINE_FWK_MODULE(MPIReporter);
diff --git a/HeterogeneousCore/MPICore/plugins/MPISender.cc b/HeterogeneousCore/MPICore/plugins/MPISender.cc
new file mode 100644
index 0000000000000..8e87a9ae7c77b
--- /dev/null
+++ b/HeterogeneousCore/MPICore/plugins/MPISender.cc
@@ -0,0 +1,263 @@
+// C++ include files
+#include <cstddef>
+#include <iomanip>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+// ROOT headers
+#include <TBufferFile.h>
+#include <TClass.h>
+
+// CMSSW include files
+#include "DataFormats/Provenance/interface/ProductDescription.h"
+#include "DataFormats/Provenance/interface/ProductNamePattern.h"
+#include "FWCore/Concurrency/interface/Async.h"
+#include "FWCore/Concurrency/interface/chain_first.h"
+#include "FWCore/Framework/interface/Event.h"
+#include "FWCore/Framework/interface/GenericHandle.h"
+#include "FWCore/Framework/interface/MakerMacros.h"
+#include "FWCore/Framework/interface/WrapperBaseHandle.h"
+#include "FWCore/Framework/interface/global/EDProducer.h"
+#include "FWCore/Framework/interface/stream/EDProducer.h"
+#include "FWCore/MessageLogger/interface/MessageLogger.h"
+#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
+#include "FWCore/ParameterSet/interface/ParameterSet.h"
+#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
+#include "FWCore/Reflection/interface/TypeWithDict.h"
+#include "FWCore/ServiceRegistry/interface/Service.h"
+#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
+#include "FWCore/Utilities/interface/Exception.h"
+#include "HeterogeneousCore/MPICore/interface/api.h"
+#include "HeterogeneousCore/MPICore/interface/MPIToken.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/AnyBuffer.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserBase.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h"
+
+class MPISender : public edm::stream::EDProducer<edm::ExternalWork> {
+public:
+ MPISender(edm::ParameterSet const& config)
+      : upstream_(consumes<MPIToken>(config.getParameter<edm::InputTag>("upstream"))),
+        token_(produces<MPIToken>()),
+        patterns_(edm::productPatterns(config.getParameter<std::vector<std::string>>("products"))),
+        instance_(config.getParameter<int32_t>("instance")),
+        buffer_(std::make_unique<TBufferFile>(TBuffer::kWrite)),
+ buffer_offset_(0),
+ metadata_size_(0) {
+ // instance 0 is reserved for the MPIController / MPISource pair
+ // instance values greater than 255 may not fit in the MPI tag
+ if (instance_ < 1 or instance_ > 255) {
+ throw cms::Exception("InvalidValue") << "Invalid MPISender instance value, please use a value between 1 and 255";
+ }
+
+ products_.resize(patterns_.size());
+
+ callWhenNewProductsRegistered([this](edm::ProductDescription const& product) {
+ static const std::string_view kPathStatus("edm::PathStatus");
+ static const std::string_view kEndPathStatus("edm::EndPathStatus");
+
+ // std::cout << "MPISender: considering product " << product.friendlyClassName() << '_'
+ // << product.moduleLabel() << '_' << product.productInstanceName() << '_' << product.processName()
+ // << " of type " << product.unwrappedType().name() << " branch type " << product.branchType() << "\n";
+
+ switch (product.branchType()) {
+ case edm::InEvent:
+ if (product.className() == kPathStatus or product.className() == kEndPathStatus)
+ return;
+ for (size_t pattern_index = 0; pattern_index < patterns_.size(); pattern_index++) {
+ if (patterns_[pattern_index].match(product)) {
+ Entry entry;
+ entry.type = product.unwrappedType();
+ entry.wrappedType = product.wrappedType();
+ // TODO move this to EDConsumerBase::consumes() ?
+ entry.token = this->consumes(
+ edm::TypeToGet{product.unwrappedTypeID(), edm::PRODUCT_TYPE},
+ edm::InputTag{product.moduleLabel(), product.productInstanceName(), product.processName()});
+
+ LogDebug("MPISender") << "send product \"" << product.friendlyClassName() << '_' << product.moduleLabel()
+ << '_' << product.productInstanceName() << '_' << product.processName()
+ << "\" of type \"" << entry.type.name() << "\" over MPI channel instance "
+ << instance_;
+
+ products_[pattern_index] = std::move(entry);
+ break;
+ }
+ }
+ break;
+
+ case edm::InLumi:
+ case edm::InRun:
+ case edm::InProcess:
+ // lumi, run and process products are not supported
+ break;
+
+ default:
+ throw edm::Exception(edm::errors::LogicError)
+ << "Unexpected branch type " << product.branchType() << "\nPlease contact a Framework developer\n";
+ }
+ });
+
+ // TODO add an error if a pattern does not match any branches? how?
+ }
+
+ void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final {
+ const MPIToken& token = event.get(upstream_);
+ // we need 1 byte for type, 8 bytes for size and at least 8 bytes for trivial copy parameters buffer
+    auto meta = std::make_shared<ProductMetadataBuilder>(products_.size() * 24);
+ size_t index = 0;
+ // this seems to work fine, but does this vector indeed persist between acquire() and produce()?
+ // serializedBuffers_.clear();
+ buffer_->Reset();
+ buffer_offset_ = 0;
+ meta->setProductCount(products_.size());
+ has_serialized_ = false;
+ is_active_ = true;
+
+ // estimate buffer size in the constructor
+
+ for (auto const& entry : products_) {
+ // Get the product
+      edm::Handle<edm::WrapperBase> handle(entry.type.typeInfo());
+ event.getByToken(entry.token, handle);
+
+      // a product count of -1 indicates that the event was filtered out on the given path
+ if (not handle.isValid() and entry.type.name() == "edm::PathStateToken") {
+ meta->setProductCount(-1);
+ is_active_ = false;
+ break;
+ }
+
+ if (handle.isValid()) {
+ edm::WrapperBase const* wrapper = handle.product();
+        std::unique_ptr<ngt::SerialiserBase> serialiser =
+            ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name());
+
+ if (serialiser) {
+ LogDebug("MPISender") << "Found serializer for type \"" << entry.type.name() << "\" ("
+ << entry.type.typeInfo().name() << ")";
+ auto reader = serialiser->reader(*wrapper);
+ ngt::AnyBuffer buffer = reader->parameters();
+ meta->addTrivialCopy(buffer.data(), buffer.size_bytes());
+ } else {
+ LogDebug("MPISender") << "No serializer for type \"" << entry.type.name() << "\" ("
+ << entry.type.typeInfo().name() << "), using ROOT serialization";
+ TClass* cls = entry.wrappedType.getClass();
+ if (!cls) {
+ throw cms::Exception("MPISender") << "Failed to get TClass for type: " << entry.type.name();
+ }
+ size_t bufLen = serializeAndStoreBuffer_(index, cls, wrapper);
+ meta->addSerialized(bufLen);
+ has_serialized_ = true;
+ }
+
+ } else {
+ // handle missing product
+ meta->addMissing();
+ }
+ index++;
+ }
+
+ // Submit sending of all products to run in the additional asynchronous threadpool
+    edm::Service<edm::Async> as;
+ as->runAsync(
+ std::move(holder),
+ [this, token, meta = std::move(meta)]() { token.channel()->sendMetadata(instance_, meta); },
+ []() { return "Calling MPISender::acquire()"; });
+ }
+
+ void produce(edm::Event& event, edm::EventSetup const&) final {
+ MPIToken token = event.get(upstream_);
+
+ if (!is_active_) {
+ event.emplace(token_, token);
+ return;
+ }
+
+ if (has_serialized_) {
+#ifdef EDM_ML_DEBUG
+ {
+ edm::LogSystem msg("MPISender");
+ msg << "Sending serialised product:\n";
+ for (int i = 0; i < buffer_->Length(); ++i) {
+ msg << "0x" << std::hex << std::setw(2) << std::setfill('0')
+ << (unsigned int)(unsigned char)buffer_->Buffer()[i] << (i % 16 == 15 ? '\n' : ' ');
+ }
+ }
+#endif
+ token.channel()->sendBuffer(buffer_->Buffer(), buffer_->Length(), instance_, EDM_MPI_SendSerializedProduct);
+ }
+
+ for (auto const& entry : products_) {
+      edm::Handle<edm::WrapperBase> handle(entry.type.typeInfo());
+ event.getByToken(entry.token, handle);
+ edm::WrapperBase const* wrapper = handle.product();
+ // we don't send missing products
+ if (handle.isValid()) {
+        std::unique_ptr<ngt::SerialiserBase> serialiser =
+            ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name());
+ if (serialiser) {
+ auto reader = serialiser->reader(*wrapper);
+ token.channel()->sendTrivialCopyProduct(instance_, *reader);
+ }
+ }
+ }
+ // write a shallow copy of the channel to the output, so other modules can consume it
+ // to indicate that they should run after this
+ event.emplace(token_, token);
+ }
+
+ static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
+ descriptions.setComment(
+ "This module can consume arbitrary event products and copy them to an \"MPIReceiver\" module in a separate "
+ "CMSSW job.");
+
+ edm::ParameterSetDescription desc;
+ desc.add("upstream", {"source"})
+ ->setComment(
+ "MPI communication channel. Can be an \"MPIController\", \"MPISource\", \"MPISender\" or \"MPIReceiver\". "
+ "Passing an \"MPIController\" or \"MPISource\" only identifies the pair of local and remote application "
+ "that communicate. Passing an \"MPISender\" or \"MPIReceiver\" in addition in addition imposes a "
+ "scheduling dependency.");
+    desc.add<std::vector<std::string>>("products", {})
+ ->setComment(
+ "Event products to be consumed and copied over to a separate CMSSW job. Can be a list of module labels, "
+ "branch names (similar to an OutputModule's \"keep ...\" statement), or a mix of the two. Wildcards (\"?\" "
+ "and \"*\") are allowed in a module label or in each field of a branch name.");
+ desc.add("instance", 0)
+ ->setComment("A value between 1 and 255 used to identify a matching pair of \"MPISender\"/\"MPIRecevier\".");
+
+ descriptions.addWithDefaultLabel(desc);
+ }
+
+private:
+ size_t serializeAndStoreBuffer_(size_t index, TClass* type, void const* product) {
+ buffer_->ResetMap();
+    type->Streamer(const_cast<void*>(product), *buffer_);
+ size_t prod_size = buffer_->Length() - buffer_offset_;
+ buffer_offset_ = buffer_->Length();
+ return prod_size;
+ }
+
+ struct Entry {
+ edm::TypeWithDict type;
+ edm::TypeWithDict wrappedType;
+ edm::EDGetToken token;
+ };
+
+ // TODO consider if upstream_ should be a vector instead of a single token ?
+  edm::EDGetTokenT<MPIToken> const upstream_;  // MPIToken used to establish the communication channel
+  edm::EDPutTokenT<MPIToken> const token_;     // copy of the MPIToken that may be used to implement an ordering relation
+  std::vector<edm::ProductNamePattern> patterns_;  // branches to read from the Event and send over the MPI channel
+  std::vector<Entry> products_;                // types and tokens corresponding to the branches
+  int32_t const instance_;                     // instance used to identify the source-destination pair
+  std::unique_ptr<TBufferFile> buffer_;
+ size_t buffer_offset_;
+ size_t metadata_size_;
+ bool has_serialized_ = false;
+ bool is_active_ = true;
+};
+
+#include "FWCore/Framework/interface/MakerMacros.h"
+DEFINE_FWK_MODULE(MPISender);
diff --git a/HeterogeneousCore/MPICore/plugins/MPISource.cc b/HeterogeneousCore/MPICore/plugins/MPISource.cc
new file mode 100644
index 0000000000000..cf44ca57a3a3c
--- /dev/null
+++ b/HeterogeneousCore/MPICore/plugins/MPISource.cc
@@ -0,0 +1,332 @@
+// C++ headers
+#include <memory>
+#include <string>
+#include <utility>
+
+// MPI headers
+#include <mpi.h>
+
+// ROOT headers
+#include
+#include
+#include
+
+// CMSSW headers
+#include "DataFormats/Provenance/interface/BranchListIndex.h"
+#include "DataFormats/Provenance/interface/EventAuxiliary.h"
+#include "DataFormats/Provenance/interface/EventSelectionID.h"
+#include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
+#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
+#include "DataFormats/Provenance/interface/ProcessHistory.h"
+#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
+#include "DataFormats/Provenance/interface/RunAuxiliary.h"
+#include "FWCore/Framework/interface/Event.h"
+#include "FWCore/Framework/interface/EventPrincipal.h"
+#include "FWCore/Framework/interface/InputSource.h"
+#include "FWCore/Framework/interface/InputSourceDescription.h"
+#include "FWCore/Framework/interface/InputSourceMacros.h"
+#include "FWCore/Framework/interface/ProductProvenanceRetriever.h"
+#include "FWCore/MessageLogger/interface/ErrorObj.h"
+#include "FWCore/MessageLogger/interface/MessageLogger.h"
+#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
+#include "FWCore/ParameterSet/interface/EmptyGroupDescription.h"
+#include "FWCore/ParameterSet/interface/ParameterSet.h"
+#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
+#include "FWCore/ParameterSet/interface/ParameterSetDescriptionFiller.h"
+#include "FWCore/Sources/interface/ProducerSourceBase.h"
+#include "FWCore/Utilities/interface/EDMException.h"
+#include "HeterogeneousCore/MPICore/interface/api.h"
+#include "HeterogeneousCore/MPICore/interface/conversion.h"
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+#include "HeterogeneousCore/MPICore/interface/MPIToken.h"
+#include "HeterogeneousCore/MPIServices/interface/MPIService.h"
+
+class MPISource : public edm::ProducerSourceBase {
+public:
+ explicit MPISource(edm::ParameterSet const& config, edm::InputSourceDescription const& desc);
+ ~MPISource() override;
+ using InputSource::processHistoryRegistryForUpdate;
+ using InputSource::productRegistryUpdate;
+
+ static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
+
+private:
+ bool setRunAndEventInfo(edm::EventID& id, edm::TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override;
+ void produce(edm::Event&) override;
+
+ enum Mode { kInvalid = 0, kCommWorld, kIntercommunicator };
+ static constexpr const char* ModeDescription[] = {"Invalid", "CommWorld", "Intercommunicator"};
+ Mode parseMode(std::string const& label) {
+ if (label == ModeDescription[kCommWorld])
+ return kCommWorld;
+ else if (label == ModeDescription[kIntercommunicator])
+ return kIntercommunicator;
+ else
+ return kInvalid;
+ }
+
+ char port_[MPI_MAX_PORT_NAME];
+ MPI_Comm comm_ = MPI_COMM_NULL;
+ MPIChannel channel_;
+ edm::EDPutTokenT<MPIToken> token_;
+ Mode mode_;
+
+ edm::ProcessHistory history_;
+};
+
+MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescription const& desc)
+ : // note that almost all configuration parameters passed to IDGeneratorSourceBase via ProducerSourceBase will
+ // effectively be ignored, because this source will explicitly set the run, lumi, and event
+ // numbers, the timestamp, and the event type
+ edm::ProducerSourceBase(config, desc, false),
+ token_(produces()),
+ mode_(parseMode(config.getUntrackedParameter("mode"))) //
+{
+ // make sure that MPI is initialised
+
+ MPIService::required();
+
+ // Make sure the EDM MPI types are available.
+ EDM_MPI_build_types();
+
+ if (mode_ == kCommWorld) {
+ // All processes are in MPI_COMM_WORLD.
+ // The current implementation supports only two processes: one controller and one source.
+ edm::LogAbsolute("MPI") << "MPISource in " << ModeDescription[mode_] << " mode.";
+
+ // Check how many processes are there in MPI_COMM_WORLD
+ int size;
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+ if (size != 2) {
+ throw edm::Exception(edm::errors::Configuration)
+ << "The current implementation supports only two processes: one controller and one source.";
+ }
+
+ // Check the rank of this process, and determine the rank of the other process.
+ int rank;
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ edm::LogAbsolute("MPI") << "MPISource has rank " << rank << " in MPI_COMM_WORLD.";
+ int other_rank = 1 - rank;
+ comm_ = MPI_COMM_WORLD;
+ channel_ = MPIChannel(comm_, other_rank);
+ } else if (mode_ == kIntercommunicator) {
+ // Use an intercommunicator to let two groups of processes communicate with each other.
+ // The current implementation supports only two processes: one controller and one source.
+ edm::LogAbsolute("MPI") << "MPISource in " << ModeDescription[mode_] << " mode.";
+
+ // Check how many processes are there in MPI_COMM_WORLD
+ int size;
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+ if (size != 1) {
+ throw edm::Exception(edm::errors::Configuration)
+ << "The current implementation supports only two processes: one controller and one source.";
+ }
+
+ // Open a server-side port.
+ MPI_Open_port(MPI_INFO_NULL, port_);
+
+ // Publish the port under the name given by the untracked "name" parameter (default: "server").
+ std::string name = config.getUntrackedParameter<std::string>("name", "server");
+ MPI_Info port_info;
+ MPI_Info_create(&port_info);
+ MPI_Info_set(port_info, "ompi_global_scope", "true");
+ MPI_Info_set(port_info, "ompi_unique", "true");
+ MPI_Publish_name(name.c_str(), port_info, port_);
+
+ // Create an intercommunicator and accept a client connection.
+ edm::LogAbsolute("MPI") << "Waiting for a connection to the MPI server at port " << port_;
+
+ MPI_Comm_accept(port_, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm_);
+ edm::LogAbsolute("MPI") << "Connection accepted.";
+ channel_ = MPIChannel(comm_, 0);
+ } else {
+ // Invalid mode.
+ throw edm::Exception(edm::errors::Configuration)
+ << "Invalid mode \"" << config.getUntrackedParameter("mode") << "\"";
+ }
+
+ // Wait for a client to connect.
+ MPI_Status status;
+ EDM_MPI_Empty_t buffer;
+ MPI_Recv(&buffer, 1, EDM_MPI_Empty, MPI_ANY_SOURCE, EDM_MPI_Connect, comm_, &status);
+ edm::LogAbsolute("MPI") << "connected from " << status.MPI_SOURCE;
+}
+
+MPISource::~MPISource() {
+ if (mode_ == kIntercommunicator) {
+ // Close the intercommunicator.
+ MPI_Comm_disconnect(&comm_);
+
+ // Unpublish and close the port.
+ MPI_Info port_info;
+ MPI_Info_create(&port_info);
+ MPI_Info_set(port_info, "ompi_global_scope", "true");
+ MPI_Info_set(port_info, "ompi_unique", "true");
+ MPI_Unpublish_name("server", port_info, port_);
+ MPI_Close_port(port_);
+ }
+}
+
+bool MPISource::setRunAndEventInfo(edm::EventID& event,
+ edm::TimeValue_t& time,
+ edm::EventAuxiliary::ExperimentType& type) {
+ while (true) {
+ MPI_Status status;
+ MPI_Message message;
+ MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm_, &message, &status);
+ switch (status.MPI_TAG) {
+ // Connect message
+ case EDM_MPI_Connect: {
+ // receive the message header
+ EDM_MPI_Empty_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status);
+
+ // the Connect message is unexpected here (see above)
+ throw cms::Exception("InvalidValue")
+ << "The MPISource has received an EDM_MPI_Connect message after the initial connection";
+ return false;
+ }
+
+ // Disconnect message
+ case EDM_MPI_Disconnect: {
+ // receive the message header
+ EDM_MPI_Empty_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status);
+
+ // signal the end of the input data
+ return false;
+ }
+
+ // BeginStream message
+ case EDM_MPI_BeginStream: {
+ // receive the message header
+ EDM_MPI_Empty_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status);
+
+ // receive the next message
+ break;
+ }
+
+ // EndStream message
+ case EDM_MPI_EndStream: {
+ // receive the message header
+ EDM_MPI_Empty_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_Empty, &message, &status);
+
+ // receive the next message
+ break;
+ }
+
+ // BeginRun message
+ case EDM_MPI_BeginRun: {
+ // receive the RunAuxiliary
+ EDM_MPI_RunAuxiliary_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_RunAuxiliary, &message, &status);
+ // TODO this is currently not used
+ edm::RunAuxiliary runAuxiliary;
+ edmFromBuffer(buffer, runAuxiliary);
+
+ // receive the ProcessHistory
+ history_.clear();
+ channel_.receiveProduct(0, history_);
+ history_.initializeTransients();
+ if (processHistoryRegistryForUpdate().registerProcessHistory(history_)) {
+ edm::LogAbsolute("MPI") << "new ProcessHistory registered: " << history_;
+ }
+
+ // receive the next message
+ break;
+ }
+
+ // EndRun message
+ case EDM_MPI_EndRun: {
+ // receive the RunAuxiliary message
+ EDM_MPI_RunAuxiliary_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_RunAuxiliary, &message, &status);
+
+ // receive the next message
+ break;
+ }
+
+ // BeginLuminosityBlock message
+ case EDM_MPI_BeginLuminosityBlock: {
+ // receive the LuminosityBlockAuxiliary
+ EDM_MPI_LuminosityBlockAuxiliary_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, &message, &status);
+ // TODO this is currently not used
+ edm::LuminosityBlockAuxiliary luminosityBlockAuxiliary;
+ edmFromBuffer(buffer, luminosityBlockAuxiliary);
+
+ // receive the next message
+ break;
+ }
+
+ // EndLuminosityBlock message
+ case EDM_MPI_EndLuminosityBlock: {
+ // receive the LuminosityBlockAuxiliary
+ EDM_MPI_LuminosityBlockAuxiliary_t buffer;
+ MPI_Mrecv(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, &message, &status);
+
+ // receive the next message
+ break;
+ }
+
+ // ProcessEvent message
+ case EDM_MPI_ProcessEvent: {
+ // receive the EventAuxiliary
+ edm::EventAuxiliary aux;
+ status = channel_.receiveEvent(aux, message);
+
+ // extract the rank of the other process (currently unused)
+ int source = status.MPI_SOURCE;
+ (void)source;
+
+ // fill the event details
+ event = aux.id();
+ time = aux.time().value();
+ type = aux.experimentType();
+
+ // signal a new event
+ return true;
+ }
+
+ // unexpected message
+ default: {
+ throw cms::Exception("InvalidValue")
+ << "The MPISource has received an unknown message with tag " << status.MPI_TAG;
+ return false;
+ }
+ }
+ }
+}
+
+void MPISource::produce(edm::Event& event) {
+ // duplicate the MPIChannel and put the copy into the Event
+ std::shared_ptr<MPIChannel> channel(new MPIChannel(channel_.duplicate()), [](MPIChannel* ptr) {
+ ptr->reset();
+ delete ptr;
+ });
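+ // the custom deleter above disconnects the duplicated communicator when the
+ // last copy of the shared pointer goes away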
+ event.emplace(token_, std::move(channel));
+}
+
+void MPISource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
+ descriptions.setComment(
+ "This module connects to an \"MPIController\" in a separate CMSSW job, receives all Run, LuminosityBlock and "
+ "Event transitions from the remote process and reproduces them in the local one.");
+
+ edm::ParameterSetDescription desc;
+ edm::ProducerSourceBase::fillDescription(desc);
+ desc.ifValue(
+ edm::ParameterDescription("mode", "CommWorld", false),
+ ModeDescription[kCommWorld] >> edm::EmptyGroupDescription() or
+ ModeDescription[kIntercommunicator] >> edm::ParameterDescription("name", "server", false))
+ ->setComment(
+ "Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to setup an "
+ "intercommunicator).");
+
+ descriptions.add("source", desc);
+}
+
+#include "FWCore/Framework/interface/InputSourceMacros.h"
+DEFINE_FWK_INPUT_SOURCE(MPISource);
diff --git a/HeterogeneousCore/MPICore/src/api.cc b/HeterogeneousCore/MPICore/src/api.cc
new file mode 100644
index 0000000000000..978c25c6f5d19
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/api.cc
@@ -0,0 +1,238 @@
+// C++ standard library headers
+#include
+#include
+#include
+#include
+#include
+
+// ROOT headers
+#include <TBufferFile.h>
+#include <TClass.h>
+
+// CMSSW headers
+#include "DataFormats/Provenance/interface/EventAuxiliary.h"
+#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
+#include "DataFormats/Provenance/interface/RunAuxiliary.h"
+#include "FWCore/MessageLogger/interface/MessageLogger.h"
+#include "HeterogeneousCore/MPICore/interface/api.h"
+#include "HeterogeneousCore/MPICore/interface/conversion.h"
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h"
+#include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h"
+
+namespace {
+ // copy the content of an std::string-like object to an N-sized char buffer:
+ // if the string is larger than the buffer, copy only the first N bytes;
+ // if the string is smaller than the buffer, fill the rest of the buffer with NUL characters.
+ template <size_t N, typename S>
+ void copy_and_fill(char (&dest)[N], S const& src) {
+ if (std::size(src) < N) {
+ memset(dest, 0x00, N);
+ memcpy(dest, src.data(), std::size(src));
+ } else {
+ memcpy(dest, src.data(), N);
+ }
+ }
+} // namespace
+
+// build a new MPIChannel that uses a duplicate of the underlying communicator and the same destination
+MPIChannel MPIChannel::duplicate() const {
+ MPI_Comm newcomm;
+ MPI_Comm_dup(comm_, &newcomm);
+ return MPIChannel(newcomm, dest_);
+}
+
+// close the underlying communicator and reset the MPIChannel to an invalid state
+void MPIChannel::reset() {
+ MPI_Comm_disconnect(&comm_);
+ dest_ = MPI_UNDEFINED;
+}
+
+// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary buffer
+void MPIChannel::edmFromBuffer_(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux) {
+ aux = edm::RunAuxiliary(buffer.run, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime));
+ aux.setProcessHistoryID(
+ edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID))));
+}
+
+// fill an EDM_MPI_RunAuxiliary buffer from an edm::RunAuxiliary object
+void MPIChannel::edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux) {
+ copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm());
+ buffer.beginTime = aux.beginTime().value();
+ buffer.endTime = aux.endTime().value();
+ buffer.run = aux.id().run();
+}
+
+// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary buffer
+void MPIChannel::edmFromBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux) {
+ aux = edm::LuminosityBlockAuxiliary(
+ buffer.run, buffer.lumi, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime));
+ aux.setProcessHistoryID(
+ edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID))));
+}
+
+// fill an EDM_MPI_LuminosityBlockAuxiliary buffer from an edm::LuminosityBlockAuxiliary object
+void MPIChannel::edmToBuffer_(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux) {
+ copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm());
+ buffer.beginTime = aux.beginTime().value();
+ buffer.endTime = aux.endTime().value();
+ buffer.run = aux.id().run();
+ buffer.lumi = aux.id().luminosityBlock();
+}
+
+// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary buffer
+void MPIChannel::edmFromBuffer_(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux) {
+ aux = edm::EventAuxiliary({buffer.run, buffer.lumi, buffer.event},
+ std::string(buffer.processGuid, std::size(buffer.processGuid)),
+ edm::Timestamp(buffer.time),
+ buffer.realData,
+ static_cast<edm::EventAuxiliary::ExperimentType>(buffer.experimentType),
+ buffer.bunchCrossing,
+ buffer.storeNumber,
+ buffer.orbitNumber);
+ aux.setProcessHistoryID(
+ edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID))));
+}
+
+// fill an EDM_MPI_EventAuxiliary buffer from an edm::EventAuxiliary object
+void MPIChannel::edmToBuffer_(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux) {
+ copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm());
+ copy_and_fill(buffer.processGuid, aux.processGUID());
+ buffer.time = aux.time().value();
+ buffer.realData = aux.isRealData();
+ buffer.experimentType = aux.experimentType();
+ buffer.bunchCrossing = aux.bunchCrossing();
+ buffer.orbitNumber = aux.orbitNumber();
+ buffer.storeNumber = aux.storeNumber();
+ buffer.run = aux.id().run();
+ buffer.lumi = aux.id().luminosityBlock();
+ buffer.event = aux.id().event();
+}
+
+// fill and send an EDM_MPI_Empty_t buffer
+void MPIChannel::sendEmpty_(int tag) {
+ EDM_MPI_Empty_t buffer;
+ buffer.messageTag = tag;
+ MPI_Send(&buffer, 1, EDM_MPI_Empty, dest_, tag, comm_);
+}
+
+// fill and send an EDM_MPI_RunAuxiliary_t buffer
+void MPIChannel::sendRunAuxiliary_(int tag, edm::RunAuxiliary const& aux) {
+ EDM_MPI_RunAuxiliary_t buffer;
+ buffer.messageTag = tag;
+ edmToBuffer_(buffer, aux);
+ MPI_Send(&buffer, 1, EDM_MPI_RunAuxiliary, dest_, tag, comm_);
+}
+
+// fill and send an EDM_MPI_LuminosityBlockAuxiliary_t buffer
+void MPIChannel::sendLuminosityBlockAuxiliary_(int tag, edm::LuminosityBlockAuxiliary const& aux) {
+ EDM_MPI_LuminosityBlockAuxiliary_t buffer;
+ buffer.messageTag = tag;
+ edmToBuffer_(buffer, aux);
+ MPI_Send(&buffer, 1, EDM_MPI_LuminosityBlockAuxiliary, dest_, tag, comm_);
+}
+
+// fill and send an EDM_MPI_EventAuxiliary_t buffer
+void MPIChannel::sendEventAuxiliary_(edm::EventAuxiliary const& aux) {
+ EDM_MPI_EventAuxiliary_t buffer;
+ buffer.messageTag = EDM_MPI_ProcessEvent;
+ edmToBuffer_(buffer, aux);
+ MPI_Send(&buffer, 1, EDM_MPI_EventAuxiliary, dest_, EDM_MPI_ProcessEvent, comm_);
+}
+
+// receive an EDM_MPI_EventAuxiliary_t buffer and populate an edm::EventAuxiliary
+MPI_Status MPIChannel::receiveEventAuxiliary_(edm::EventAuxiliary& aux, MPI_Message& message) {
+ MPI_Status status = {};
+ EDM_MPI_EventAuxiliary_t buffer;
+#ifdef EDM_ML_DEBUG
+ memset(&buffer, 0x00, sizeof(buffer));
+#endif
+ status.MPI_ERROR = MPI_Mrecv(&buffer, 1, EDM_MPI_EventAuxiliary, &message, &status);
+ edmFromBuffer_(buffer, aux);
+ return status;
+}
+
+void MPIChannel::sendMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta) {
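+ // the MPI tag encodes both the message type and the instance number, so that
+ // several MPISender/MPIReceiver pairs can share the same communicator without
+ // mixing up their messages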
+ int tag = EDM_MPI_SendMetadata | instance * EDM_MPI_MessageTagWidth_;
+ meta->setHeader();
+ MPI_Ssend(meta->data(), meta->size(), MPI_BYTE, dest_, tag, comm_);
+}
+
+void MPIChannel::receiveMetadata(int instance, std::shared_ptr<ProductMetadataBuilder> meta) {
+ int tag = EDM_MPI_SendMetadata | instance * EDM_MPI_MessageTagWidth_;
+ meta->receiveMetadata(dest_, tag, comm_);
+}
+
+void MPIChannel::sendBuffer(const void* buf, size_t size, int instance, EDM_MPI_MessageTag tag) {
+ int commtag = tag | instance * EDM_MPI_MessageTagWidth_;
+ MPI_Send(buf, size, MPI_BYTE, dest_, commtag, comm_);
+}
+
+void MPIChannel::sendSerializedProduct_(int instance, TClass const* type, void const* product) {
+ TBufferFile buffer{TBuffer::kWrite};
+ type->Streamer(const_cast<void*>(product), buffer);
+ int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_;
+ MPI_Send(buffer.Buffer(), buffer.Length(), MPI_BYTE, dest_, tag, comm_);
+}
+
+std::unique_ptr<TBufferFile> MPIChannel::receiveSerializedBuffer(int instance, int bufSize) {
+ int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_;
+ MPI_Status status;
+ auto buffer = std::make_unique<TBufferFile>(TBuffer::kRead, bufSize);
+#ifdef EDM_ML_DEBUG
+ memset(buffer->Buffer(), 0xff, buffer->BufferSize());
+#endif
+ MPI_Recv(buffer->Buffer(), bufSize, MPI_BYTE, dest_, tag, comm_, &status);
+ int receivedCount = 0;
+ MPI_Get_count(&status, MPI_BYTE, &receivedCount);
+ assert(receivedCount == bufSize && "received serialized buffer size mismatches the size expected from metadata");
+ // set the buffer length
+ buffer->SetBufferOffset(receivedCount);
+ return buffer;
+}
+
+void MPIChannel::receiveSerializedProduct_(int instance, TClass const* type, void* product) {
+ int tag = EDM_MPI_SendSerializedProduct | instance * EDM_MPI_MessageTagWidth_;
+ MPI_Message message;
+ MPI_Status status;
+ MPI_Mprobe(dest_, tag, comm_, &message, &status);
+ int size;
+ MPI_Get_count(&status, MPI_BYTE, &size);
+ TBufferFile buffer{TBuffer::kRead, size};
+ MPI_Mrecv(buffer.Buffer(), size, MPI_BYTE, &message, &status);
+ type->Streamer(product, buffer);
+}
+
+void MPIChannel::sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader) {
+ int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_;
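+ // each memory region described by the reader is sent as a separate message with
+ // the same tag; receiveInitializedTrivialCopy must loop over its regions in the
+ // same order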
+ // transfer the memory regions
+ auto regions = reader.regions();
+ // TODO send the number of regions ?
+ for (size_t i = 0; i < regions.size(); ++i) {
+ assert(regions[i].data() != nullptr);
+ MPI_Send(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_);
+ }
+}
+
+void MPIChannel::receiveInitializedTrivialCopy(int instance, ngt::WriterBase& writer) {
+ int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_;
+ MPI_Status status;
+ // receive the memory regions
+ auto regions = writer.regions();
+ // TODO receive and validate the number of regions ?
+ for (size_t i = 0; i < regions.size(); ++i) {
+ assert(regions[i].data() != nullptr);
+ MPI_Recv(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_, &status);
+ }
+}
diff --git a/HeterogeneousCore/MPICore/src/classes.h b/HeterogeneousCore/MPICore/src/classes.h
new file mode 100644
index 0000000000000..6909ea88bb3c9
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/classes.h
@@ -0,0 +1,2 @@
+#include "DataFormats/Common/interface/Wrapper.h"
+#include "HeterogeneousCore/MPICore/interface/MPIToken.h"
diff --git a/HeterogeneousCore/MPICore/src/classes_def.xml b/HeterogeneousCore/MPICore/src/classes_def.xml
new file mode 100644
index 0000000000000..5f4cdb8fcbbf5
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/classes_def.xml
@@ -0,0 +1,4 @@
+<lcgdict>
+ <class name="MPIToken"/>
+ <class name="edm::Wrapper<MPIToken>"/>
+</lcgdict>
diff --git a/HeterogeneousCore/MPICore/src/conversion.cc b/HeterogeneousCore/MPICore/src/conversion.cc
new file mode 100644
index 0000000000000..3272bb827cda8
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/conversion.cc
@@ -0,0 +1,86 @@
+// C++ standard library headers
+#include
+#include
+
+// CMSSW headers
+#include "DataFormats/Provenance/interface/EventAuxiliary.h"
+#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
+#include "DataFormats/Provenance/interface/RunAuxiliary.h"
+#include "HeterogeneousCore/MPICore/interface/conversion.h"
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+
+namespace {
+ // copy the content of an std::string-like object to an N-sized char buffer:
+ // if the string is larger than the buffer, copy only the first N bytes;
+ // if the string is smaller than the buffer, fill the rest of the buffer with NUL characters.
+ template <size_t N, typename S>
+ void copy_and_fill(char (&dest)[N], S const& src) {
+ if (std::size(src) < N) {
+ memset(dest, 0x00, N);
+ memcpy(dest, src.data(), std::size(src));
+ } else {
+ memcpy(dest, src.data(), N);
+ }
+ }
+} // namespace
+
+// fill an edm::RunAuxiliary object from an EDM_MPI_RunAuxiliary buffer
+void edmFromBuffer(EDM_MPI_RunAuxiliary_t const& buffer, edm::RunAuxiliary& aux) {
+ aux = edm::RunAuxiliary(buffer.run, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime));
+ aux.setProcessHistoryID(
+ edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID))));
+}
+
+// fill an EDM_MPI_RunAuxiliary buffer from an edm::RunAuxiliary object
+void edmToBuffer(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux) {
+ copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm());
+ buffer.beginTime = aux.beginTime().value();
+ buffer.endTime = aux.endTime().value();
+ buffer.run = aux.id().run();
+}
+
+// fill an edm::LuminosityBlockAuxiliary object from an EDM_MPI_LuminosityBlockAuxiliary buffer
+void edmFromBuffer(EDM_MPI_LuminosityBlockAuxiliary_t const& buffer, edm::LuminosityBlockAuxiliary& aux) {
+ aux = edm::LuminosityBlockAuxiliary(
+ buffer.run, buffer.lumi, edm::Timestamp(buffer.beginTime), edm::Timestamp(buffer.endTime));
+ aux.setProcessHistoryID(
+ edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID))));
+}
+
+// fill an EDM_MPI_LuminosityBlockAuxiliary buffer from an edm::LuminosityBlockAuxiliary object
+void edmToBuffer(EDM_MPI_LuminosityBlockAuxiliary_t& buffer, edm::LuminosityBlockAuxiliary const& aux) {
+ copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm());
+ buffer.beginTime = aux.beginTime().value();
+ buffer.endTime = aux.endTime().value();
+ buffer.run = aux.id().run();
+ buffer.lumi = aux.id().luminosityBlock();
+}
+
+// fill an edm::EventAuxiliary object from an EDM_MPI_EventAuxiliary buffer
+void edmFromBuffer(EDM_MPI_EventAuxiliary_t const& buffer, edm::EventAuxiliary& aux) {
+ aux = edm::EventAuxiliary({buffer.run, buffer.lumi, buffer.event},
+ std::string(buffer.processGuid, std::size(buffer.processGuid)),
+ edm::Timestamp(buffer.time),
+ buffer.realData,
+ static_cast<edm::EventAuxiliary::ExperimentType>(buffer.experimentType),
+ buffer.bunchCrossing,
+ buffer.storeNumber,
+ buffer.orbitNumber);
+ aux.setProcessHistoryID(
+ edm::ProcessHistoryID(std::string(buffer.processHistoryID, std::size(buffer.processHistoryID))));
+}
+
+// fill an EDM_MPI_EventAuxiliary buffer from an edm::EventAuxiliary object
+void edmToBuffer(EDM_MPI_EventAuxiliary_t& buffer, edm::EventAuxiliary const& aux) {
+ copy_and_fill(buffer.processHistoryID, aux.processHistoryID().compactForm());
+ copy_and_fill(buffer.processGuid, aux.processGUID());
+ buffer.time = aux.time().value();
+ buffer.realData = aux.isRealData();
+ buffer.experimentType = aux.experimentType();
+ buffer.bunchCrossing = aux.bunchCrossing();
+ buffer.orbitNumber = aux.orbitNumber();
+ buffer.storeNumber = aux.storeNumber();
+ buffer.run = aux.id().run();
+ buffer.lumi = aux.id().luminosityBlock();
+ buffer.event = aux.id().event();
+}
diff --git a/HeterogeneousCore/MPICore/src/macros.h b/HeterogeneousCore/MPICore/src/macros.h
new file mode 100644
index 0000000000000..142d830595775
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/macros.h
@@ -0,0 +1,149 @@
+#ifndef HeterogeneousCore_MPICore_interface_macros_h
+#define HeterogeneousCore_MPICore_interface_macros_h
+
+// C++ standard library headers
+#include
+
+// MPI headers
+#include <mpi.h>
+
+// Boost headers
+#include <boost/preprocessor.hpp>
+
+namespace mpi_traits {
+ template <typename T>
+ constexpr inline size_t mpi_length = 1;
+
+ template <typename T, size_t N>
+ constexpr inline size_t mpi_length<T[N]> = N;
+
+ template <typename T>
+ struct mpi_type {
+ inline static const MPI_Datatype value = MPI_DATATYPE_NULL;
+ };
+
+ template <typename T, size_t N>
+ struct mpi_type<T[N]> {
+ inline static const MPI_Datatype value = mpi_type<T>::value;
+ };
+
+ template <>
+ struct mpi_type<char> {
+ inline static const MPI_Datatype value = MPI_CHAR;
+ };
+
+ template <>
+ struct mpi_type<unsigned char> {
+ inline static const MPI_Datatype value = MPI_UNSIGNED_CHAR;
+ };
+
+ template <>
+ struct mpi_type<wchar_t> {
+ inline static const MPI_Datatype value = MPI_WCHAR;
+ };
+
+ template <>
+ struct mpi_type<short> {
+ inline static const MPI_Datatype value = MPI_SHORT;
+ };
+
+ template <>
+ struct mpi_type<unsigned short> {
+ inline static const MPI_Datatype value = MPI_UNSIGNED_SHORT;
+ };
+
+ template <>
+ struct mpi_type<int> {
+ inline static const MPI_Datatype value = MPI_INT;
+ };
+
+ template <>
+ struct mpi_type<unsigned int> {
+ inline static const MPI_Datatype value = MPI_UNSIGNED;
+ };
+
+ template <>
+ struct mpi_type<long> {
+ inline static const MPI_Datatype value = MPI_LONG;
+ };
+
+ template <>
+ struct mpi_type<unsigned long> {
+ inline static const MPI_Datatype value = MPI_UNSIGNED_LONG;
+ };
+
+ template <>
+ struct mpi_type<long long> {
+ inline static const MPI_Datatype value = MPI_LONG_LONG;
+ };
+
+ template <>
+ struct mpi_type<unsigned long long> {
+ inline static const MPI_Datatype value = MPI_UNSIGNED_LONG_LONG;
+ };
+
+ template <>
+ struct mpi_type<float> {
+ inline static const MPI_Datatype value = MPI_FLOAT;
+ };
+
+ template <>
+ struct mpi_type<double> {
+ inline static const MPI_Datatype value = MPI_DOUBLE;
+ };
+
+ template <>
+ struct mpi_type<long double> {
+ inline static const MPI_Datatype value = MPI_LONG_DOUBLE;
+ };
+
+ template <>
+ struct mpi_type<std::byte> {
+ inline static const MPI_Datatype value = MPI_BYTE;
+ };
+} // namespace mpi_traits
+
+// clang-format off
+
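+// DECLARE_MPI_TYPE(TYPE, STRUCT, fields...) fills TYPE with a committed MPI
+// datatype that mirrors the layout of STRUCT. For example (illustrative names):
+//
+//   MPI_Datatype ExampleType;
+//   DECLARE_MPI_TYPE(ExampleType, Example_t, tag, values);
+//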
+#define _GET_MPI_TYPE_LENGTH_IMPL(STRUCT, FIELD) \
+ mpi_traits::mpi_length<decltype(STRUCT::FIELD)>
+
+#define _GET_MPI_TYPE_LENGTH(R, STRUCT, FIELD) \
+ _GET_MPI_TYPE_LENGTH_IMPL(STRUCT, FIELD),
+
+#define _GET_MPI_TYPE_LENGTHS(STRUCT, ...) \
+ BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_LENGTH, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__))
+
+#define _GET_MPI_TYPE_OFFSET_IMPL(STRUCT, FIELD) \
+ offsetof(STRUCT, FIELD)
+
+#define _GET_MPI_TYPE_OFFSET(R, STRUCT, FIELD) \
+ _GET_MPI_TYPE_OFFSET_IMPL(STRUCT, FIELD),
+
+#define _GET_MPI_TYPE_OFFSETS(STRUCT, ...) \
+ BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_OFFSET, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__))
+
+#define _GET_MPI_TYPE_TYPEID_IMPL(STRUCT, FIELD) \
+ mpi_traits::mpi_type<decltype(STRUCT::FIELD)>::value
+
+#define _GET_MPI_TYPE_TYPEID(R, STRUCT, FIELD) \
+ _GET_MPI_TYPE_TYPEID_IMPL(STRUCT, FIELD),
+
+#define _GET_MPI_TYPE_TYPEIDS(STRUCT, ...) \
+ BOOST_PP_SEQ_FOR_EACH(_GET_MPI_TYPE_TYPEID, STRUCT, BOOST_PP_VARIADIC_TO_SEQ(__VA_ARGS__))
+
+#define DECLARE_MPI_TYPE(TYPE, STRUCT, ...) \
+ _Pragma("GCC diagnostic push"); \
+ _Pragma("GCC diagnostic ignored \"-Winvalid-offsetof\""); \
+ { \
+ constexpr int lengths[] = {_GET_MPI_TYPE_LENGTHS(STRUCT, __VA_ARGS__)}; \
+ constexpr MPI_Aint displacements[] = {_GET_MPI_TYPE_OFFSETS(STRUCT, __VA_ARGS__)}; \
+ const MPI_Datatype types[] = {_GET_MPI_TYPE_TYPEIDS(STRUCT, __VA_ARGS__)}; \
+ MPI_Type_create_struct(std::size(lengths), lengths, displacements, types, &TYPE); \
+ MPI_Type_commit(&TYPE); \
+ } \
+ _Pragma("GCC diagnostic pop")
+
+// clang-format on
+
+#endif // HeterogeneousCore_MPICore_interface_macros_h
diff --git a/HeterogeneousCore/MPICore/src/messages.cc b/HeterogeneousCore/MPICore/src/messages.cc
new file mode 100644
index 0000000000000..7bd318e4f7bb2
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/messages.cc
@@ -0,0 +1,81 @@
+// C++ standard library headers
+#include <mutex>
+
+// MPI headers
+#include <mpi.h>
+
+// Boost headers
+#include <boost/preprocessor.hpp>
+
+// CMSSW headers
+#include "HeterogeneousCore/MPICore/interface/messages.h"
+
+// local headers
+#include "macros.h"
+
+MPI_Datatype EDM_MPI_Empty;
+MPI_Datatype EDM_MPI_RunAuxiliary;
+MPI_Datatype EDM_MPI_LuminosityBlockAuxiliary;
+MPI_Datatype EDM_MPI_EventAuxiliary;
+
+MPI_Datatype EDM_MPI_MessageType[EDM_MPI_MessageTagCount_];
+
+void EDM_MPI_build_types_() {
+ // EDM_MPI_Empty
+ DECLARE_MPI_TYPE(EDM_MPI_Empty, // MPI_Datatype
+ EDM_MPI_Empty_t, // C++ struct
+ messageTag); // EDM_MPI_MessageTag
+
+ // EDM_MPI_RunAuxiliary
+ DECLARE_MPI_TYPE(EDM_MPI_RunAuxiliary, // MPI_Datatype
+ EDM_MPI_RunAuxiliary_t, // C++ struct
+ messageTag, // EDM_MPI_MessageTag
+ processHistoryID, // edm::ProcessHistoryID::compactForm()
+ beginTime, // edm::TimeValue_t
+ endTime, // edm::TimeValue_t
+ run); // edm::RunNumber_t
+
+ // EDM_MPI_LuminosityBlockAuxiliary
+ DECLARE_MPI_TYPE(EDM_MPI_LuminosityBlockAuxiliary, // MPI_Datatype
+ EDM_MPI_LuminosityBlockAuxiliary_t, // C++ struct
+ messageTag, // EDM_MPI_MessageTag
+ processHistoryID, // edm::ProcessHistoryID::compactForm()
+ beginTime, // edm::TimeValue_t
+ endTime, // edm::TimeValue_t
+ run, // edm::RunNumber_t
+ lumi); // edm::LuminosityBlockNumber_t
+
+ // EDM_MPI_EventAuxiliary
+ DECLARE_MPI_TYPE(EDM_MPI_EventAuxiliary, // MPI_Datatype
+ EDM_MPI_EventAuxiliary_t, // C++ struct
+ messageTag, // EDM_MPI_MessageTag
+ processHistoryID, // edm::ProcessHistoryID::compactForm()
+ processGuid, // process GUID
+ time, // edm::TimeValue_t
+ realData, // real data (true) vs simulation (false)
+ experimentType, // edm::EventAuxiliary::ExperimentType
+ bunchCrossing, // LHC bunch crossing
+ orbitNumber, // LHC orbit number
+ storeNumber, // LHC fill number ?
+ run, // edm::RunNumber_t
+ lumi, // edm::LuminosityBlockNumber_t
+ event); // edm::EventNumber_t
+
+ EDM_MPI_MessageType[EDM_MPI_Connect] = EDM_MPI_Empty; //
+ EDM_MPI_MessageType[EDM_MPI_Disconnect] = EDM_MPI_Empty; //
+ EDM_MPI_MessageType[EDM_MPI_BeginStream] = EDM_MPI_Empty; //
+ EDM_MPI_MessageType[EDM_MPI_EndStream] = EDM_MPI_Empty; //
+ EDM_MPI_MessageType[EDM_MPI_BeginRun] = EDM_MPI_RunAuxiliary; //
+ EDM_MPI_MessageType[EDM_MPI_EndRun] = EDM_MPI_RunAuxiliary; //
+ EDM_MPI_MessageType[EDM_MPI_BeginLuminosityBlock] = EDM_MPI_LuminosityBlockAuxiliary; //
+ EDM_MPI_MessageType[EDM_MPI_EndLuminosityBlock] = EDM_MPI_LuminosityBlockAuxiliary; //
+ EDM_MPI_MessageType[EDM_MPI_ProcessEvent] = EDM_MPI_EventAuxiliary; //
+ EDM_MPI_MessageType[EDM_MPI_SendSerializedProduct] = MPI_BYTE; // variable-length binary blob
+ EDM_MPI_MessageType[EDM_MPI_SendTrivialCopyProduct] = MPI_BYTE; // variable-length binary blob
+ EDM_MPI_MessageType[EDM_MPI_SendMetadata] = MPI_BYTE; //
+}
+
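+// Thread-safe entry point: the MPI datatypes above are built exactly once, even
+// if several modules call EDM_MPI_build_types() concurrently.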
+void EDM_MPI_build_types() {
+ static std::once_flag flag;
+ std::call_once(flag, EDM_MPI_build_types_);
+}
diff --git a/HeterogeneousCore/MPICore/src/metadata.cc b/HeterogeneousCore/MPICore/src/metadata.cc
new file mode 100644
index 0000000000000..f130ddf0ba080
--- /dev/null
+++ b/HeterogeneousCore/MPICore/src/metadata.cc
@@ -0,0 +1,241 @@
+// C++ standard library headers
+#include
+#include
+#include
+
+// CMSSW headers
+#include "HeterogeneousCore/MPICore/interface/metadata.h"
+
+ProductMetadataBuilder::ProductMetadataBuilder() : buffer_(nullptr), capacity_(0), size_(0), readOffset_(0) {
+ // reserve at least 13 bytes for header
+ reserve(maxMetadataSize_);
+ size_ = headerSize_;
+}
+
+ProductMetadataBuilder::ProductMetadataBuilder(size_t expectedSize)
+ : buffer_(nullptr), capacity_(0), size_(0), readOffset_(0) {
+ reserve(expectedSize + headerSize_);
+ size_ = headerSize_;
+}
+
+ProductMetadataBuilder::~ProductMetadataBuilder() { std::free(buffer_); }
+
+ProductMetadataBuilder::ProductMetadataBuilder(ProductMetadataBuilder&& other) noexcept
+ : buffer_(other.buffer_), capacity_(other.capacity_), size_(other.size_), readOffset_(other.readOffset_) {
+ other.buffer_ = nullptr;
+ other.capacity_ = 0;
+ other.size_ = 0;
+ other.readOffset_ = 0;
+}
+
+ProductMetadataBuilder& ProductMetadataBuilder::operator=(ProductMetadataBuilder&& other) noexcept {
+ if (this != &other) {
+ std::free(buffer_);
+ buffer_ = other.buffer_;
+ capacity_ = other.capacity_;
+ size_ = other.size_;
+ readOffset_ = other.readOffset_;
+ other.buffer_ = nullptr;
+ other.capacity_ = 0;
+ other.size_ = 0;
+ other.readOffset_ = 0;
+ }
+ return *this;
+}
+
+void ProductMetadataBuilder::reserve(size_t bytes) {
+ if (capacity_ >= bytes)
+ return;
+ resizeBuffer(bytes);
+}
+
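+// The metadata message starts with a fixed 13-byte header:
+//   bytes 0-7:  number of products (int64_t)
+//   byte  8:    flags indicating which kinds of products are present
+//   bytes 9-12: total size of the serialised products (int)
+// followed by one variable-length entry per product.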
+void ProductMetadataBuilder::setHeader() {
+ assert(size_ >= headerSize_ && "Buffer must reserve space for header");
+ std::memcpy(buffer_, &productCount_, sizeof(int64_t)); // first 8 bytes
+ buffer_[8] = productFlags_; // indicate which products are present
+ std::memcpy(buffer_ + 9, &serializedBufferSize_, sizeof(int)); // size of serialized products
+}
+
+void ProductMetadataBuilder::addMissing() {
+ productFlags_ |= HasMissing;
+ append(static_cast<uint8_t>(ProductMetadata::Kind::Missing));
+}
+
+void ProductMetadataBuilder::addSerialized(size_t size) {
+ productFlags_ |= HasSerialized;
+ append(static_cast<uint8_t>(ProductMetadata::Kind::Serialized));
+ append(size);
+ serializedBufferSize_ += size;
+}
+
+void ProductMetadataBuilder::addTrivialCopy(const std::byte* buffer, size_t size) {
+ productFlags_ |= HasTrivialCopy;
+ append(static_cast<uint8_t>(ProductMetadata::Kind::TrivialCopy));
+ append(size);
+ appendBytes(buffer, size);
+}
+
+const uint8_t* ProductMetadataBuilder::data() const { return buffer_; }
+uint8_t* ProductMetadataBuilder::data() { return buffer_; }
+size_t ProductMetadataBuilder::size() const { return size_; }
+std::span ProductMetadataBuilder::buffer() const { return {buffer_, size_}; }
+
+void ProductMetadataBuilder::receiveMetadata(int src, int tag, MPI_Comm comm) {
+ MPI_Status status;
+ MPI_Recv(buffer_, maxMetadataSize_, MPI_BYTE, src, tag, comm, &status);
+ // TODO: add error handling if the received message is longer than maxMetadataSize_
+ int receivedBytes = 0;
+ MPI_Get_count(&status, MPI_BYTE, &receivedBytes);
+ assert(static_cast(receivedBytes) >= headerSize_ && "received metadata was less than header size");
+ productCount_ = consume<int64_t>();
+ productFlags_ = consume<uint8_t>();
+ serializedBufferSize_ = consume<int>();
+ size_ = receivedBytes;
+}
+
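+// Each product entry starts with a one-byte kind; Serialized and TrivialCopy
+// entries are followed by the payload size (size_t), and TrivialCopy entries
+// additionally embed their metadata blob directly in the buffer.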
+ProductMetadata ProductMetadataBuilder::getNext() {
+ if (readOffset_ >= size_)
+ throw std::out_of_range("No more metadata entries");
+
+ ProductMetadata meta;
+ auto kind = static_cast<ProductMetadata::Kind>(consume<uint8_t>());
+ meta.kind = kind;
+
+ switch (kind) {
+ case ProductMetadata::Kind::Missing:
+ break;
+
+ case ProductMetadata::Kind::Serialized:
+ meta.sizeMeta = consume<size_t>();
+ break;
+
+ case ProductMetadata::Kind::TrivialCopy: {
+ size_t blobSize = consume<size_t>();
+ if (readOffset_ + blobSize > size_) {
+ throw std::runtime_error("Metadata buffer too short for trivialCopy data");
+ }
+ meta.sizeMeta = blobSize;
+ meta.trivialCopyOffset = buffer_ + readOffset_;
+ readOffset_ += blobSize;
+ break;
+ }
+
+ default:
+ throw std::runtime_error("Unknown metadata kind");
+ }
+
+ return meta;
+}
+
+void ProductMetadataBuilder::resizeBuffer(size_t newCap) {
+ uint8_t* newBuf = static_cast<uint8_t*>(std::realloc(buffer_, newCap));
+ if (!newBuf)
+ throw std::bad_alloc();
+ buffer_ = newBuf;
+ capacity_ = newCap;
+}
+
+void ProductMetadataBuilder::ensureCapacity(size_t needed) {
+ if (size_ + needed <= capacity_)
+ return;
+
+ size_t newCapacity = capacity_ ? capacity_ : 64;
+ while (size_ + needed > newCapacity)
+ newCapacity *= 2;
+
+ uint8_t* newData = static_cast<uint8_t*>(std::realloc(buffer_, newCapacity));
+ if (!newData)
+ throw std::bad_alloc();
+ buffer_ = newData;
+ capacity_ = newCapacity;
+}
+
+void ProductMetadataBuilder::appendBytes(const std::byte* src, size_t size) {
+ ensureCapacity(size);
+ std::memcpy(buffer_ + size_, src, size);
+ size_ += size;
+}
+
+void ProductMetadataBuilder::debugPrintMetadataSummary() const {
+ if (size_ < headerSize_) {
+ std::cerr << "ERROR: Buffer too small to contain header\n";
+ return;
+ }
+
+ std::ostringstream out;
+ size_t offset = headerSize_; // Skip the header
+ size_t count = 0;
+ size_t numMissing = 0;
+ size_t numSerialized = 0;
+ size_t numTrivial = 0;
+
+ uint64_t headerCount = 0;
+ std::memcpy(&headerCount, buffer_, sizeof(uint64_t));
+ uint8_t flags = buffer_[sizeof(uint64_t)];
+
+ out << "---- ProductMetadata Debug Summary ----\n";
+ out << "Header:\n";
+ out << " Product count: " << headerCount << "\n";
+ out << " Flags: " << ((flags & HasMissing) ? "Missing " : "") << ((flags & HasSerialized) ? "Serialized " : "")
+ << ((flags & HasTrivialCopy) ? "TrivialCopy " : "") << "\n\n";
+
+ while (offset < size_) {
+ uint8_t kindVal = buffer_[offset];
+ auto kind = static_cast(kindVal);
+ offset += sizeof(uint8_t);
+ count++;
+
+ out << "Product #" << count << ": ";
+
+ switch (kind) {
+ case ProductMetadata::Kind::Missing:
+ numMissing++;
+ out << "Missing\n";
+ break;
+
+ case ProductMetadata::Kind::Serialized: {
+ if (offset + sizeof(size_t) > size_) {
+ out << "ERROR: Corrupted serialized metadata\n";
+ return;
+ }
+ size_t sz;
+ std::memcpy(&sz, buffer_ + offset, sizeof(size_t));
+ offset += sizeof(size_t);
+ numSerialized++;
+ out << "Serialized, size = " << sz << "\n";
+ break;
+ }
+
+ case ProductMetadata::Kind::TrivialCopy: {
+ if (offset + sizeof(size_t) > size_) {
+ out << "ERROR: Corrupted trivial copy metadata\n";
+ return;
+ }
+ size_t sz;
+ std::memcpy(&sz, buffer_ + offset, sizeof(size_t));
+ offset += sizeof(size_t);
+ if (offset + sz > size_) {
+ out << "ERROR: Trivial copy data overflows buffer\n";
+ return;
+ }
+ offset += sz;
+ numTrivial++;
+ out << "TrivialCopy, size = " << sz << "\n";
+ break;
+ }
+
+ default:
+ out << "Unknown kind: " << static_cast(kindVal) << "\n";
+ return;
+ }
+ }
+
+ out << "----------------------------------------\n";
+ out << "Total entries parsed: " << count << "\n";
+ out << " Missing: " << numMissing << "\n";
+ out << " Serialized: " << numSerialized << "\n";
+ out << " TrivialCopy: " << numTrivial << "\n";
+ out << "Total buffer size: " << size_ << " bytes\n";
+
+ std::cerr << out.str() << std::flush;
+}
diff --git a/HeterogeneousCore/MPICore/test/BuildFile.xml b/HeterogeneousCore/MPICore/test/BuildFile.xml
index 63b96e9989075..bdc6dc143aad6 100644
--- a/HeterogeneousCore/MPICore/test/BuildFile.xml
+++ b/HeterogeneousCore/MPICore/test/BuildFile.xml
@@ -3,6 +3,12 @@
+
+
+
+
+
+
diff --git a/HeterogeneousCore/MPICore/test/controller_cfg.py b/HeterogeneousCore/MPICore/test/controller_cfg.py
new file mode 100644
index 0000000000000..dc03bff4d1a27
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/controller_cfg.py
@@ -0,0 +1,58 @@
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("MPIServer")
+
+process.options.numberOfThreads = 4
+process.options.numberOfStreams = 4
+# MPIController supports a single concurrent LuminosityBlock
+process.options.numberOfConcurrentLuminosityBlocks = 1
+process.options.numberOfConcurrentRuns = 1
+process.options.wantSummary = False
+
+process.load("HeterogeneousCore.MPIServices.MPIService_cfi")
+
+from eventlist_cff import eventlist
+process.source = cms.Source("EmptySourceFromEventIDs",
+ events = cms.untracked(eventlist)
+)
+
+process.maxEvents.input = 100
+
+from HeterogeneousCore.MPICore.modules import *
+
+process.mpiController = MPIController(
+ mode = 'CommWorld'
+)
+
+process.ids = cms.EDProducer("edmtest::EventIDProducer")
+
+process.initialcheck = cms.EDAnalyzer("edmtest::EventIDValidator",
+ source = cms.untracked.InputTag('ids')
+)
+
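+# Each MPISender below is paired with the MPIReceiver in the follower job that
+# uses the same "instance" value (42 and 19 match the receivers in follower_cfg.py).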
+process.sender = MPISender(
+ upstream = "mpiController",
+ instance = 42,
+ products = [ "edmEventID_ids__*" ]
+)
+
+process.othersender = MPISender(
+ upstream = "mpiController",
+ instance = 19,
+ products = [ "edmEventID_ids__*" ]
+)
+
+process.receiver = MPIReceiver(
+ upstream = "othersender", # guarantees that this module will only run after "othersender" has run
+ instance = 99,
+ products = [ dict(
+ type = "edm::EventID",
+ label = ""
+ )]
+)
+
+process.finalcheck = cms.EDAnalyzer("edmtest::EventIDValidator",
+ source = cms.untracked.InputTag('receiver')
+)
+
+process.path = cms.Path(process.mpiController + process.ids + process.initialcheck + process.sender + process.othersender + process.receiver + process.finalcheck)
diff --git a/HeterogeneousCore/MPICore/test/controller_complex_cfg.py b/HeterogeneousCore/MPICore/test/controller_complex_cfg.py
new file mode 100644
index 0000000000000..87cb7920a617b
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/controller_complex_cfg.py
@@ -0,0 +1,95 @@
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("MPIServer")
+
+process.load("FWCore.MessageService.MessageLogger_cfi")
+process.MessageLogger.cerr.INFO.limit = 10000000
+
+process.options.numberOfThreads = 4
+process.options.numberOfStreams = 4
+# MPIController supports a single concurrent LuminosityBlock
+process.options.numberOfConcurrentLuminosityBlocks = 1
+process.options.numberOfConcurrentRuns = 1
+process.options.wantSummary = False
+
+process.source = cms.Source("EmptySource")
+process.maxEvents.input = 10
+
+process.load("HeterogeneousCore.MPIServices.MPIService_cfi")
+
+from HeterogeneousCore.MPICore.modules import *
+
+process.mpiController = MPIController(
+ mode = 'CommWorld'
+)
+
+# Phase-1 FED RAW data collection pseudo object
+process.fedRawDataCollectionProducer = cms.EDProducer("TestWriteFEDRawDataCollection",
+ # Test values below are meaningless. We just make sure when we read
+ # we get the same values.
+ FEDData0 = cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7),
+ FEDData3 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107)
+)
+
+# Phase-2 RAW data buffer pseudo object
+process.rawDataBufferProducer = cms.EDProducer("TestWriteRawDataBuffer",
+ # Test values below are meaningless. We just make sure when we read
+ # we get the same values.
+ dataPattern1 = cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+ dataPattern2 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115)
+)
+
+# HLT trigger event pseudo object
+process.triggerEventProducer = cms.EDProducer("TestWriteTriggerEvent",
+ # Test values below are meaningless. We just make sure when we read
+ # we get the same values.
+ usedProcessName = cms.string("testName"),
+ collectionTags = cms.vstring('moduleA', 'moduleB', 'moduleC'),
+ collectionKeys = cms.vuint32(11, 21, 31),
+ ids = cms.vint32(1, 3, 5),
+ # stick to values exactly convertible from double to float
+ # to avoid potential rounding issues in the test, because
+ # the configuration only supports double not float and
+ # the data format holds floats.
+ pts = cms.vdouble(11.0, 21.0, 31.0),
+ etas = cms.vdouble(101.0, 102.0, 103.0),
+ phis = cms.vdouble(201.0, 202.0, 203.0),
+ masses = cms.vdouble(301.0, 302.0, 303.0),
+ filterTags = cms.vstring('moduleAA', 'moduleBB'),
+ elementsPerVector = cms.uint32(2),
+ filterIds = cms.vint32(1001, 1002, 1003, 1004),
+ filterKeys = cms.vuint32(2001, 2002, 2003, 2004)
+)
+
+# EDM trigger results pseudo object
+process.triggerResultsProducer = cms.EDProducer("TestWriteTriggerResults",
+ # Test values below are meaningless. We just make sure when we read
+ # we get the same values.
+ parameterSetID = cms.string('8b99d66b6c3865c75e460791f721202d'),
+ # names should normally be empty. Only extremely old data or
+ # has names filled and not empty. If it is not empty, the
+ # ParameterSetID is ignored and left default constructed.
+ names = cms.vstring(),
+ hltStates = cms.vuint32(0, 1, 2, 3),
+ moduleIndexes = cms.vuint32(11, 21, 31, 41)
+)
+
+process.sender = MPISender(
+ upstream = "mpiController",
+ instance = 42,
+ products = [
+ "FEDRawDataCollection_fedRawDataCollectionProducer__*",
+ "RawDataBuffer_rawDataBufferProducer__*",
+ "edmTriggerResults_triggerResultsProducer__*",
+ "triggerTriggerEvent_triggerEventProducer__*"
+ ]
+)
+
+process.path = cms.Path(
+ process.mpiController +
+ process.fedRawDataCollectionProducer +
+ process.rawDataBufferProducer +
+ process.triggerEventProducer +
+ process.triggerResultsProducer +
+ process.sender
+)
diff --git a/HeterogeneousCore/MPICore/test/controller_soa_cfg.py b/HeterogeneousCore/MPICore/test/controller_soa_cfg.py
new file mode 100644
index 0000000000000..d3e39b0597c11
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/controller_soa_cfg.py
@@ -0,0 +1,56 @@
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("MPIServer")
+
+process.load("FWCore.MessageService.MessageLogger_cfi")
+process.MessageLogger.cerr.INFO.limit = 10000000
+
+process.options.numberOfThreads = 4
+process.options.numberOfStreams = 4
+# MPIController supports a single concurrent LuminosityBlock
+process.options.numberOfConcurrentLuminosityBlocks = 1
+process.options.numberOfConcurrentRuns = 1
+process.options.wantSummary = False
+
+process.source = cms.Source("EmptySource")
+process.maxEvents.input = 10
+
+process.load("HeterogeneousCore.MPIServices.MPIService_cfi")
+
+# produce and send a portable object, a portable collection, and some portable multicollections
+process.load("Configuration.StandardSequences.Accelerators_cff")
+process.load("HeterogeneousCore.AlpakaCore.ProcessAcceleratorAlpaka_cfi")
+
+from HeterogeneousCore.MPICore.modules import *
+
+process.mpiController = MPIController(
+ mode = 'CommWorld'
+)
+
+process.producePortableObjects = cms.EDProducer("TestAlpakaProducer@alpaka",
+ size = cms.int32(42),
+ size2 = cms.int32(33),
+ size3 = cms.int32(61),
+ alpaka = cms.untracked.PSet(
+ # "serial_sync", "cuda_async", or "rocm_async"
+ backend = cms.untracked.string("")
+ )
+)
+
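+# The product patterns below are branch names built from the ROOT "friendly"
+# class names, e.g. "128falseportabletestTestSoALayoutPortableHostCollection"
+# corresponds to PortableHostCollection<portabletest::TestSoALayout<128, false>>.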
+process.sender = MPISender(
+ upstream = "mpiController",
+ instance = 42,
+ products = [
+ "portabletestTestStructPortableHostObject_producePortableObjects__*",
+ "128falseportabletestTestSoALayoutPortableHostCollection_producePortableObjects__*",
+ "128falseportabletestSoABlocks2PortableHostCollection_producePortableObjects__*",
+ "128falseportabletestSoABlocks3PortableHostCollection_producePortableObjects__*",
+ "ushort_producePortableObjects_backend_*"
+ ]
+)
+
+process.pathSoA = cms.Path(
+ process.mpiController +
+ process.producePortableObjects +
+ process.sender
+)
diff --git a/HeterogeneousCore/MPICore/test/eventlist_cff.py b/HeterogeneousCore/MPICore/test/eventlist_cff.py
new file mode 100644
index 0000000000000..abeb05bf21d7e
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/eventlist_cff.py
@@ -0,0 +1,39 @@
+import FWCore.ParameterSet.Config as cms
+
+eventlist = cms.VEventID(
+ # Run 100
+ cms.EventID(100, 1, 1),
+ cms.EventID(100, 1, 4),
+ cms.EventID(100, 3, 13),
+ cms.EventID(100, 3, 17),
+ cms.EventID(100, 5, 18),
+ cms.EventID(100, 5, 28),
+ # Run 101
+ cms.EventID(101, 1, 2),
+ cms.EventID(101, 1, 9),
+ cms.EventID(101, 2, 10),
+ cms.EventID(101, 2, 14),
+ cms.EventID(101, 4, 15),
+ cms.EventID(101, 4, 16),
+ # Run 102
+ cms.EventID(102, 1, 1),
+ cms.EventID(102, 1, 18),
+ cms.EventID(102, 1, 43),
+ cms.EventID(102, 5, 59),
+ cms.EventID(102, 8, 85),
+ cms.EventID(102, 8, 89),
+ # Run 103
+ cms.EventID(103, 1, 13),
+ cms.EventID(103, 4, 42),
+ cms.EventID(103, 4, 43),
+ cms.EventID(103, 4, 44),
+ cms.EventID(103, 9, 95),
+ cms.EventID(103, 9, 99),
+ # Run 104
+ cms.EventID(104, 3, 31),
+ cms.EventID(104, 5, 52),
+ cms.EventID(104, 8, 83),
+ cms.EventID(104, 8, 84),
+ cms.EventID(104, 8, 85),
+ cms.EventID(104, 8, 89),
+)
diff --git a/HeterogeneousCore/MPICore/test/follower_cfg.py b/HeterogeneousCore/MPICore/test/follower_cfg.py
new file mode 100644
index 0000000000000..443dff041f4c1
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/follower_cfg.py
@@ -0,0 +1,51 @@
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("MPIClient")
+
+process.options.numberOfThreads = 8
+process.options.numberOfStreams = 8
+process.options.numberOfConcurrentLuminosityBlocks = 2
+process.options.numberOfConcurrentRuns = 2
+process.options.wantSummary = False
+
+process.load("HeterogeneousCore.MPIServices.MPIService_cfi")
+
+from HeterogeneousCore.MPICore.modules import *
+
+process.source = MPISource()
+
+process.maxEvents.input = -1
+
+# very verbose
+#from HeterogeneousCore.MPICore.mpiReporter_cfi import mpiReporter as mpiReporter_
+#process.reporter = mpiReporter_.clone()
+
+process.receiver = MPIReceiver(
+ upstream = "source",
+ instance = 42,
+ products = [ dict(
+ type = "edm::EventID",
+ label = ""
+ )]
+)
+
+process.otherreceiver = MPIReceiver(
+ upstream = "source",
+ instance = 19,
+ products = [ dict(
+ type = "edm::EventID",
+ label = ""
+ )]
+)
+
+process.sender = MPISender(
+ upstream = "otherreceiver", # guarantees that this module will only run after otherreceiver has run
+ instance = 99,
+ products = [ "edmEventID_otherreceiver__*" ]
+)
+
+process.analyzer = cms.EDAnalyzer("edmtest::EventIDValidator",
+ source = cms.untracked.InputTag("receiver")
+)
+
+process.path = cms.Path(process.receiver + process.analyzer + process.otherreceiver + process.sender)
diff --git a/HeterogeneousCore/MPICore/test/follower_complex_cfg.py b/HeterogeneousCore/MPICore/test/follower_complex_cfg.py
new file mode 100644
index 0000000000000..e72bbd99ce7f5
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/follower_complex_cfg.py
@@ -0,0 +1,90 @@
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("MPIClient")
+
+process.options.numberOfThreads = 4
+process.options.numberOfStreams = 4
+process.options.wantSummary = False
+
+process.load("HeterogeneousCore.MPIServices.MPIService_cfi")
+
+from HeterogeneousCore.MPICore.modules import *
+
+process.source = MPISource()
+
+process.maxEvents.input = -1
+
+process.receiver = MPIReceiver(
+ upstream = "source",
+ instance = 42,
+ products = [
+ dict(
+ type = "FEDRawDataCollection",
+ label = "fedRawDataCollectionProducer"
+ ),
+ dict(
+ type = "RawDataBuffer",
+ label = "rawDataBufferProducer"
+ ),
+ dict(
+ type = "edm::TriggerResults",
+ label = "triggerResultsProducer"
+ ),
+ dict(
+ type = "trigger::TriggerEvent",
+ label = "triggerEventProducer"
+ )
+ ]
+)
+
+# Phase-1 FED RAW data collection pseudo object
+process.testReadFEDRawDataCollection = cms.EDAnalyzer("TestReadFEDRawDataCollection",
+ fedRawDataCollectionTag = cms.InputTag("receiver", "fedRawDataCollectionProducer"),
+ expectedFEDData0 = cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7),
+ expectedFEDData3 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107)
+)
+
+# Phase-2 RAW data buffer pseudo object
+process.testReadRawDataBuffer = cms.EDAnalyzer("TestReadRawDataBuffer",
+ rawDataBufferTag = cms.InputTag("receiver", "rawDataBufferProducer"),
+ dataPattern1 = cms.vuint32(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
+ dataPattern2 = cms.vuint32(100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115)
+)
+
+# HLT trigger event pseudo object
+process.testReadTriggerEvent = cms.EDAnalyzer("TestReadTriggerEvent",
+ expectedUsedProcessName = cms.string("testName"),
+ expectedCollectionTags = cms.vstring('moduleA', 'moduleB', 'moduleC'),
+ expectedCollectionKeys = cms.vuint32(11, 21, 31),
+ expectedIds = cms.vint32(1, 3, 5),
+ # stick to values exactly convertible from double to float
+ # to avoid potential rounding issues in the test, because
+ # the configuration only supports double not float and
+ # the data format holds floats.
+ expectedPts = cms.vdouble(11.0, 21.0, 31.0),
+ expectedEtas = cms.vdouble(101.0, 102.0, 103.0),
+ expectedPhis = cms.vdouble(201.0, 202.0, 203.0),
+ expectedMasses = cms.vdouble(301.0, 302.0, 303.0),
+ expectedFilterTags = cms.vstring('moduleAA', 'moduleBB'),
+ expectedElementsPerVector = cms.uint32(2),
+ expectedFilterIds = cms.vint32(1001, 1002, 1003, 1004),
+ expectedFilterKeys = cms.vuint32(2001, 2002, 2003, 2004),
+ triggerEventTag = cms.InputTag("receiver", "triggerEventProducer")
+)
+
+# EDM trigger results pseudo object
+process.testReadTriggerResults = cms.EDAnalyzer("TestReadTriggerResults",
+ triggerResultsTag = cms.InputTag("receiver", "triggerResultsProducer"),
+ expectedParameterSetID = cms.string('8b99d66b6c3865c75e460791f721202d'),
+ expectedNames = cms.vstring(),
+ expectedHLTStates = cms.vuint32(0, 1, 2, 3),
+ expectedModuleIndexes = cms.vuint32(11, 21, 31, 41)
+)
+
+process.path = cms.Path(
+ process.receiver +
+ process.testReadFEDRawDataCollection +
+ process.testReadRawDataBuffer +
+ process.testReadTriggerEvent +
+ process.testReadTriggerResults
+)
diff --git a/HeterogeneousCore/MPICore/test/follower_soa_cfg.py b/HeterogeneousCore/MPICore/test/follower_soa_cfg.py
new file mode 100644
index 0000000000000..b250af51332cd
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/follower_soa_cfg.py
@@ -0,0 +1,57 @@
+import FWCore.ParameterSet.Config as cms
+
+process = cms.Process("MPIClient")
+
+process.options.numberOfThreads = 4
+process.options.numberOfStreams = 4
+process.options.wantSummary = False
+
+process.load("HeterogeneousCore.MPIServices.MPIService_cfi")
+
+from HeterogeneousCore.MPICore.modules import *
+
+process.source = MPISource()
+
+process.maxEvents.input = -1
+
+# receive and validate a portable object, a portable collection, and some portable multi-block collections
+process.receiver = MPIReceiver(
+ upstream = "source",
+ instance = 42,
+ products = [
+ dict(
+ type = "PortableHostObject",
+ label = ""
+ ),
+ dict(
+ type = "PortableHostCollection >",
+ label = ""
+ ),
+ dict(
+ type = "PortableHostCollection >",
+ label = ""
+ ),
+ dict(
+ type = "PortableHostCollection >",
+ label = ""
+ ),
+ dict(
+ type = "ushort",
+ label = "backend"
+ )
+ ]
+)
+
+process.validatePortableCollections = cms.EDAnalyzer("TestAlpakaAnalyzer",
+ source = cms.InputTag("receiver")
+)
+
+process.validatePortableObject = cms.EDAnalyzer("TestAlpakaObjectAnalyzer",
+ source = cms.InputTag("receiver")
+)
+
+process.pathSoA = cms.Path(
+ process.receiver +
+ process.validatePortableCollections +
+ process.validatePortableObject
+)
diff --git a/HeterogeneousCore/MPICore/test/testMPICommWorld.sh b/HeterogeneousCore/MPICore/test/testMPICommWorld.sh
new file mode 100755
index 0000000000000..e9739bb0e8cfc
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/testMPICommWorld.sh
@@ -0,0 +1,17 @@
+#! /bin/bash
+
+# Shell script for testing CMSSW over MPI.
+CONTROLLER=$(realpath $1)
+FOLLOWER=$(realpath $2)
+
+# Make sure the CMSSW environment has been loaded.
+if [ -z "$CMSSW_BASE" ]; then
+ eval `scram runtime -sh`
+fi
+
+# The CM pml leads to silent communication failures on some machines.
+# Until this is understood and fixed, keep it disabled.
+export OMPI_MCA_pml='^cm'
+
+# Launch the controller and follower processes
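+# mpirun's colon syntax starts both executables as a single MPMD job, so the two
+# cmsRun processes share MPI_COMM_WORLD.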
+mpirun -n 1 cmsRun ${CONTROLLER} : -n 1 cmsRun ${FOLLOWER}
diff --git a/HeterogeneousCore/MPICore/test/testMPIInterCommunicator.sh b/HeterogeneousCore/MPICore/test/testMPIInterCommunicator.sh
new file mode 100755
index 0000000000000..c33ccd0ebe004
--- /dev/null
+++ b/HeterogeneousCore/MPICore/test/testMPIInterCommunicator.sh
@@ -0,0 +1,71 @@
+#! /bin/bash
+# Shell script for testing CMSSW over MPI
+CONTROLLER=$(realpath $1)
+FOLLOWER=$(realpath $2)
+
+# Make sure the CMSSW environment has been loaded.
+if [ -z "$CMSSW_BASE" ]; then
+ eval `scram runtime -sh`
+fi
+
+# The CM pml leads to silent communication failures on some machines.
+# Until this is understood and fixed, keep it disabled.
+export OMPI_MCA_pml='^cm'
+
+mkdir -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test
+DIR=$(mktemp -d -p $CMSSW_BASE/tmp/$SCRAM_ARCH/test)
+echo "Running MPI tests at $DIR/"
+pushd $DIR > /dev/null
+
+# Start an MPI server to let independent CMSSW processes find each other.
+echo "Starting the Open RTE data server"
+ompi-server -r server.uri -d >& ompi-server.log &
+SERVER_PID=$!
+disown
+# wait until the ORTE server logs 'up and running'
+while ! grep -q 'up and running' ompi-server.log; do
+ sleep 1s
+done
+
+# Note: "mpirun --mca pmix_server_uri file:server.uri" is required to make the
+# tests work inside a singularity/apptainer container. Without a container the
+# cmsRun commands can be used directly.
+
+# Start the "follower" CMSSW job(s).
+{
+ mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $FOLLOWER >& follower.log
+ echo $? > follower.status
+} &
+FOLLOWER_PID=$!
+
+# Wait until the MPISource has established the connection to the ORTE server.
+while ! grep -q 'Waiting for a connection to the MPI server' follower.log; do
+ sleep 1s
+done
+
+# Start the "controller" CMSSW job(s).
+{
+ mpirun --mca pmix_server_uri file:server.uri -n 1 -- cmsRun $CONTROLLER >& controller.log
+ echo $? > controller.status
+} &
+CONTROLLER_PID=$!
+
+# Wait for the CMSSW jobs to finish.
+wait $CONTROLLER_PID $FOLLOWER_PID
+
+# Print the jobs' output and check the jobs' exit status.
+echo '========== testMPIController ==========='
+cat controller.log
+MPICONTROLLER_STATUS=$(< controller.status)
+echo '========================================'
+echo
+echo '=========== testMPIFollower ============'
+cat follower.log
+MPISOURCE_STATUS=$(< follower.status)
+echo '========================================'
+
+# Stop the MPI server.
+kill $SERVER_PID
+
+popd > /dev/null
+exit $((MPISOURCE_STATUS > MPICONTROLLER_STATUS ? MPISOURCE_STATUS : MPICONTROLLER_STATUS))