-
Notifications
You must be signed in to change notification settings - Fork 9
Parallelization of data ingestion from AMSlib #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
…stager to exchange-based communications. This will allow RMQ server to mutliple cores to process the data Signed-off-by: Loic Pottier <[email protected]>
Signed-off-by: Loic Pottier <[email protected]>
…ectly from queue Signed-off-by: Loic Pottier <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cpp-linter Review
Used clang-format v18.1.8
Only 5 out of 6 clang-format concerns fit within this pull request's diff.
Click here for the full clang-format patch
diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp
index 52c51fe..468c5dc 100644
--- a/src/AMSlib/wf/basedb.hpp
+++ b/src/AMSlib/wf/basedb.hpp
@@ -1288,7 +1288 @@ private:
- .onSuccess([&]() {
- DBG(ConnectionManagerAMQP, "declared exchange: %s", _exchange_sender.c_str())
-
- _channel->declareQueue(_queue_name)
- .onSuccess([&](const std::string& queue_name,
- uint32_t messagecount,
- uint32_t consumercount) {
+ .onSuccess([&]() {
@@ -1296,23 +1290,42 @@ private:
- "declared queue: %s (messagecount=%d, "
- "consumercount=%d)",
- queue_name.c_str(),
- messagecount,
- consumercount)
- // We bind the anonymous queue to the exchange
- _channel->bindQueue(_exchange_sender, queue_name, _routing_key_sender)
- .onSuccess([&, queue_name]() {
- DBG(ConnectionManagerAMQP,
- "Bounded queue %s to exchange %s with "
- "routing key = %s",
- queue_name.c_str(),
- _exchange_sender.c_str(),
- _routing_key_sender.c_str())
- }) // bindQueue
- .onError([&](const char* message) {
- WARNING(ConnectionManagerAMQP,
- "Error while binding queue to exchange "
- "%s",
- message)
- _isConnected = false;
- }); // bindQueue
- }) //declareQueue
+ "declared exchange: %s",
+ _exchange_sender.c_str())
+
+ _channel->declareQueue(_queue_name)
+ .onSuccess([&](const std::string& queue_name,
+ uint32_t messagecount,
+ uint32_t consumercount) {
+ DBG(ConnectionManagerAMQP,
+ "declared queue: %s (messagecount=%d, "
+ "consumercount=%d)",
+ queue_name.c_str(),
+ messagecount,
+ consumercount)
+ // We bind the anonymous queue to the exchange
+ _channel
+ ->bindQueue(_exchange_sender,
+ queue_name,
+ _routing_key_sender)
+ .onSuccess([&, queue_name]() {
+ DBG(ConnectionManagerAMQP,
+ "Bounded queue %s to exchange %s with "
+ "routing key = %s",
+ queue_name.c_str(),
+ _exchange_sender.c_str(),
+ _routing_key_sender.c_str())
+ }) // bindQueue
+ .onError([&](const char* message) {
+ WARNING(ConnectionManagerAMQP,
+ "Error while binding queue to exchange "
+ "%s",
+ message)
+ _isConnected = false;
+ }); // bindQueue
+ }) //declareQueue
+ .onError([&](const char* message) {
+ WARNING(ConnectionManagerAMQP,
+ "Error while creating queue: "
+ "%s",
+ message)
+ _isConnected = false;
+ }); //declareQueue
+ }) // declareExchange
@@ -1321 +1334 @@ private:
- "Error while creating queue: "
+ "Error while creating exchange: "
@@ -1325,9 +1338 @@ private:
- }); //declareQueue
- }) // declareExchange
- .onError([&](const char* message) {
- WARNING(ConnectionManagerAMQP,
- "Error while creating exchange: "
- "%s",
- message)
- _isConnected = false;
- }); // declareExchange
+ }); // declareExchange
@@ -1483 +1488,3 @@ public:
- CWARNING(RMQInterface, amsRMQNamedQueue, "Using named queue for RabbitMQ (slower)")
+ CWARNING(RMQInterface,
+ amsRMQNamedQueue,
+ "Using named queue for RabbitMQ (slower)")
@@ -1493,13 +1500,14 @@ public:
- _publishingManager = std::make_unique<ConnectionManagerAMQP>(_rId,
- rmq_user,
- rmq_password,
- rmq_vhost,
- service_host,
- service_port,
- rmq_cert,
- exchange_physics,
- routing_key_physics,
- exchange_ml,
- routing_key_ml,
- amsRMQFailure,
- queue_name);
+ _publishingManager =
+ std::make_unique<ConnectionManagerAMQP>(_rId,
+ rmq_user,
+ rmq_password,
+ rmq_vhost,
+ service_host,
+ service_port,
+ rmq_cert,
+ exchange_physics,
+ routing_key_physics,
+ exchange_ml,
+ routing_key_ml,
+ amsRMQFailure,
+ queue_name);
Have any feedback or feature suggestions? Share it here.
Signed-off-by: Loic Pottier <[email protected]>
Signed-off-by: Loic Pottier <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cpp-linter Review
Used clang-format v18.1.8
Only 5 out of 6 clang-format concerns fit within this pull request's diff.
Click here for the full clang-format patch
diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp
index 52c51fe..468c5dc 100644
--- a/src/AMSlib/wf/basedb.hpp
+++ b/src/AMSlib/wf/basedb.hpp
@@ -1288,7 +1288 @@ private:
- .onSuccess([&]() {
- DBG(ConnectionManagerAMQP, "declared exchange: %s", _exchange_sender.c_str())
-
- _channel->declareQueue(_queue_name)
- .onSuccess([&](const std::string& queue_name,
- uint32_t messagecount,
- uint32_t consumercount) {
+ .onSuccess([&]() {
@@ -1296,23 +1290,42 @@ private:
- "declared queue: %s (messagecount=%d, "
- "consumercount=%d)",
- queue_name.c_str(),
- messagecount,
- consumercount)
- // We bind the anonymous queue to the exchange
- _channel->bindQueue(_exchange_sender, queue_name, _routing_key_sender)
- .onSuccess([&, queue_name]() {
- DBG(ConnectionManagerAMQP,
- "Bounded queue %s to exchange %s with "
- "routing key = %s",
- queue_name.c_str(),
- _exchange_sender.c_str(),
- _routing_key_sender.c_str())
- }) // bindQueue
- .onError([&](const char* message) {
- WARNING(ConnectionManagerAMQP,
- "Error while binding queue to exchange "
- "%s",
- message)
- _isConnected = false;
- }); // bindQueue
- }) //declareQueue
+ "declared exchange: %s",
+ _exchange_sender.c_str())
+
+ _channel->declareQueue(_queue_name)
+ .onSuccess([&](const std::string& queue_name,
+ uint32_t messagecount,
+ uint32_t consumercount) {
+ DBG(ConnectionManagerAMQP,
+ "declared queue: %s (messagecount=%d, "
+ "consumercount=%d)",
+ queue_name.c_str(),
+ messagecount,
+ consumercount)
+ // We bind the anonymous queue to the exchange
+ _channel
+ ->bindQueue(_exchange_sender,
+ queue_name,
+ _routing_key_sender)
+ .onSuccess([&, queue_name]() {
+ DBG(ConnectionManagerAMQP,
+ "Bounded queue %s to exchange %s with "
+ "routing key = %s",
+ queue_name.c_str(),
+ _exchange_sender.c_str(),
+ _routing_key_sender.c_str())
+ }) // bindQueue
+ .onError([&](const char* message) {
+ WARNING(ConnectionManagerAMQP,
+ "Error while binding queue to exchange "
+ "%s",
+ message)
+ _isConnected = false;
+ }); // bindQueue
+ }) //declareQueue
+ .onError([&](const char* message) {
+ WARNING(ConnectionManagerAMQP,
+ "Error while creating queue: "
+ "%s",
+ message)
+ _isConnected = false;
+ }); //declareQueue
+ }) // declareExchange
@@ -1321 +1334 @@ private:
- "Error while creating queue: "
+ "Error while creating exchange: "
@@ -1325,9 +1338 @@ private:
- }); //declareQueue
- }) // declareExchange
- .onError([&](const char* message) {
- WARNING(ConnectionManagerAMQP,
- "Error while creating exchange: "
- "%s",
- message)
- _isConnected = false;
- }); // declareExchange
+ }); // declareExchange
@@ -1483 +1488,3 @@ public:
- CWARNING(RMQInterface, amsRMQNamedQueue, "Using named queue for RabbitMQ (slower)")
+ CWARNING(RMQInterface,
+ amsRMQNamedQueue,
+ "Using named queue for RabbitMQ (slower)")
@@ -1493,13 +1500,14 @@ public:
- _publishingManager = std::make_unique<ConnectionManagerAMQP>(_rId,
- rmq_user,
- rmq_password,
- rmq_vhost,
- service_host,
- service_port,
- rmq_cert,
- exchange_physics,
- routing_key_physics,
- exchange_ml,
- routing_key_ml,
- amsRMQFailure,
- queue_name);
+ _publishingManager =
+ std::make_unique<ConnectionManagerAMQP>(_rId,
+ rmq_user,
+ rmq_password,
+ rmq_vhost,
+ service_host,
+ service_port,
+ rmq_cert,
+ exchange_physics,
+ routing_key_physics,
+ exchange_ml,
+ routing_key_ml,
+ amsRMQFailure,
+ queue_name);
Have any feedback or feature suggestions? Share it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cpp-linter Review
Used clang-format v18.1.8
Only 5 out of 6 clang-format concerns fit within this pull request's diff.
Click here for the full clang-format patch
diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp
index 52c51fe..468c5dc 100644
--- a/src/AMSlib/wf/basedb.hpp
+++ b/src/AMSlib/wf/basedb.hpp
@@ -1288,7 +1288 @@ private:
- .onSuccess([&]() {
- DBG(ConnectionManagerAMQP, "declared exchange: %s", _exchange_sender.c_str())
-
- _channel->declareQueue(_queue_name)
- .onSuccess([&](const std::string& queue_name,
- uint32_t messagecount,
- uint32_t consumercount) {
+ .onSuccess([&]() {
@@ -1296,23 +1290,42 @@ private:
- "declared queue: %s (messagecount=%d, "
- "consumercount=%d)",
- queue_name.c_str(),
- messagecount,
- consumercount)
- // We bind the anonymous queue to the exchange
- _channel->bindQueue(_exchange_sender, queue_name, _routing_key_sender)
- .onSuccess([&, queue_name]() {
- DBG(ConnectionManagerAMQP,
- "Bounded queue %s to exchange %s with "
- "routing key = %s",
- queue_name.c_str(),
- _exchange_sender.c_str(),
- _routing_key_sender.c_str())
- }) // bindQueue
- .onError([&](const char* message) {
- WARNING(ConnectionManagerAMQP,
- "Error while binding queue to exchange "
- "%s",
- message)
- _isConnected = false;
- }); // bindQueue
- }) //declareQueue
+ "declared exchange: %s",
+ _exchange_sender.c_str())
+
+ _channel->declareQueue(_queue_name)
+ .onSuccess([&](const std::string& queue_name,
+ uint32_t messagecount,
+ uint32_t consumercount) {
+ DBG(ConnectionManagerAMQP,
+ "declared queue: %s (messagecount=%d, "
+ "consumercount=%d)",
+ queue_name.c_str(),
+ messagecount,
+ consumercount)
+ // We bind the anonymous queue to the exchange
+ _channel
+ ->bindQueue(_exchange_sender,
+ queue_name,
+ _routing_key_sender)
+ .onSuccess([&, queue_name]() {
+ DBG(ConnectionManagerAMQP,
+ "Bounded queue %s to exchange %s with "
+ "routing key = %s",
+ queue_name.c_str(),
+ _exchange_sender.c_str(),
+ _routing_key_sender.c_str())
+ }) // bindQueue
+ .onError([&](const char* message) {
+ WARNING(ConnectionManagerAMQP,
+ "Error while binding queue to exchange "
+ "%s",
+ message)
+ _isConnected = false;
+ }); // bindQueue
+ }) //declareQueue
+ .onError([&](const char* message) {
+ WARNING(ConnectionManagerAMQP,
+ "Error while creating queue: "
+ "%s",
+ message)
+ _isConnected = false;
+ }); //declareQueue
+ }) // declareExchange
@@ -1321 +1334 @@ private:
- "Error while creating queue: "
+ "Error while creating exchange: "
@@ -1325,9 +1338 @@ private:
- }); //declareQueue
- }) // declareExchange
- .onError([&](const char* message) {
- WARNING(ConnectionManagerAMQP,
- "Error while creating exchange: "
- "%s",
- message)
- _isConnected = false;
- }); // declareExchange
+ }); // declareExchange
@@ -1483 +1488,3 @@ public:
- CWARNING(RMQInterface, amsRMQNamedQueue, "Using named queue for RabbitMQ (slower)")
+ CWARNING(RMQInterface,
+ amsRMQNamedQueue,
+ "Using named queue for RabbitMQ (slower)")
@@ -1493,13 +1500,14 @@ public:
- _publishingManager = std::make_unique<ConnectionManagerAMQP>(_rId,
- rmq_user,
- rmq_password,
- rmq_vhost,
- service_host,
- service_port,
- rmq_cert,
- exchange_physics,
- routing_key_physics,
- exchange_ml,
- routing_key_ml,
- amsRMQFailure,
- queue_name);
+ _publishingManager =
+ std::make_unique<ConnectionManagerAMQP>(_rId,
+ rmq_user,
+ rmq_password,
+ rmq_vhost,
+ service_host,
+ service_port,
+ rmq_cert,
+ exchange_physics,
+ routing_key_physics,
+ exchange_ml,
+ routing_key_ml,
+ amsRMQFailure,
+ queue_name);
Have any feedback or feature suggestions? Share it here.
Signed-off-by: Loic Pottier <[email protected]>
Signed-off-by: Loic Pottier <[email protected]>
koparasy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure that this does what we want. From briefly reading the code all producers publish to the exchange and use a fixed routing key. The producers also bind a specific single queue name to that exchange. Which means (I think) that the server will have a single erlang process to support that single qname and route all messages to that queue. I am guessing we want to have multiple queues right to enable multi processing on the server side. Correct?
I am not sure I know the specification good enough. But from a first read that I did this is what I understand.
|
@koparasy Yes the idea is that all producers (MPI ranks) subscribe to the same exchange and send their messages with the same routing key. Note that the routing key could different for an ensemble run where the different domain names could be the routing keys, it would allow the stagers to only fetch specific domain names for example.
The previous design looks like this with the default exchange Of course, the more complex design with the exchange and the different queues introduces some overhead (creating the exchange and binding the queues which are all synchronous operations and routing the messages). I am benchmarking this PR to know the potential performance gains (or loss). However, one important point is that the number of anonymous queues that are created by RMQ is related to the number of consumers not producers. So, if you have 32 stagers, we will have 32 queues, not matter the number of MPI ranks on the other side. We could think of a design where explicitly create named queues for each MPI rank and we listen to them on the consumers side. |
|
@koparasy I have found a flaw in the current design: messages are going to be duplicated similar to a |


Previous design
This PR changes how AMSlib send data to the to RabbitMQ and to the stager. Prior that PR, AMSlib design was one queue for every MPI ranks to send data to RMQ. It's a simple design but it does not scale as RabbitMQ adopted a design "one core for one queue".
New design
In order to take advantage of parallelization inside the RabbitMQ server, we must use more than one queue. The solution is to create an exchange for the data going to the stager and let RabbitMQ on both side create anonymous queues that are bound to that exchange with a given routing key.
This design is much more flexible and also allows filtering on the stager side per domain for example etc (we could create one routing key per domain and filter on that using RabbitMQ
topicfeature).Debug notes
Note that this design introduces one complication: because RabbitMQ does not hold messages that are routed to anonymous queues if there is no consumers listening. The consequence is that the receiver must start before the sender. It posed some issues with our CTest where the sender starts first and then the receiver starts. To overcome that, I introduce a debug mode in AMS which is triggered when the variable
AMS_USE_NAMED_QUEUEis set. This forces AMS to use named queues (ams-debug-queue-${RANK}). This allows us to start the sender first.Side notes
This PR also improves two aspects:
0(it was1before).0means that there is no limit on the number of unacknowledged messages per consumer;AMS_OBJECTSwas set it could disrupt RabbitMQ tests. Now, we unset that variable when running tests (it unset the variable just for the tests, it does unset it in your terminal).