diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a3cca14..bf83331b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -285,7 +285,8 @@ jobs: \"service-port\": ${RABBITMQ_PORT}, \"service-host\": \"${RABBITMQ_HOST}\", \"rabbitmq-vhost\": \"/\", - \"rabbitmq-queue-physics\": \"test-ci\", + \"rabbitmq-exchange-physics\": \"exchange-ci\", + \"rabbitmq-key-physics\": \"queue-ci\", \"rabbitmq-exchange-training\": \"ams-fanout\", \"rabbitmq-key-training\": \"training\" }""" > rmq.json diff --git a/src/AMSWorkflow/ams/rmq.py b/src/AMSWorkflow/ams/rmq.py index 61f65967..31ed4655 100644 --- a/src/AMSWorkflow/ams/rmq.py +++ b/src/AMSWorkflow/ams/rmq.py @@ -18,7 +18,6 @@ import json import pika - class AMSMessage(object): """ Represents a RabbitMQ incoming message from AMSLib. @@ -225,12 +224,16 @@ class AMSChannel: def __init__( self, connection, - q_name, + exchange, + routing_key, + queue = "", callback: Optional[Callable] = None, logger: Optional[logging.Logger] = None, ): self.connection = connection - self.q_name = q_name + self.exchange = exchange + self.routing_key = routing_key + self.q_name = queue self.logger = logger if logger else logging.getLogger(__name__) self.callback = callback if callback else self.default_callback @@ -247,7 +250,16 @@ def default_callback(self, method, properties, body): def open(self): self.channel = self.connection.channel() - self.channel.queue_declare(queue=self.q_name) + if self.exchange != '': + self.logger.info(f"Declared exchange {self.exchange}") + self.channel.exchange_declare(exchange = self.exchange, exchange_type = "direct") + + result = self.channel.queue_declare(queue = self.q_name, exclusive = False, durable = False) + self.q_name = result.method.queue + self.logger.info(f"Declared queue {self.q_name}") + if self.exchange != '': + self.logger.info(f"Binding queue {self.q_name} to exchange {self.exchange}") + self.channel.queue_bind(exchange = self.exchange, queue = self.q_name, routing_key = self.routing_key) def close(self): self.channel.close() @@ -308,7 +320,7 @@ def send(self, text: str, exchange: str = ""): @param text The text to send @param exchange Exchange to use """ - self.channel.basic_publish(exchange=exchange, routing_key=self.q_name, body=text) + self.channel.basic_publish(exchange = exchange, routing_key = self.routing_key, body=text) return def get_messages(self): @@ -374,9 +386,11 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.connection.close() - def connect(self, queue): - """Connect to the queue""" - return AMSChannel(self.connection, queue, self.callback) + def connect(self, exchange, routing_key, queue = ""): + """ + Connect to the exchange and routing key. + """ + return AMSChannel(self.connection, exchange, routing_key, queue, self.callback) class StatusPoller(BlockingClient): @@ -401,8 +415,11 @@ def __init__( user: str, password: str, cert: str, - queue: str, - prefetch_count: int = 1, + exchange: str, + routing_key: str, + queue: str = "", + prefetch_count: int = 0, + exchange_type: str = "direct", on_message_cb: Optional[Callable] = None, on_close_cb: Optional[Callable] = None, logger: Optional[logging.Logger] = None, @@ -425,6 +442,9 @@ def __init__( self._vhost = vhost self._cacert = cert self._queue = queue + self._exchange = exchange + self._exchange_type = exchange_type + self._routing_key = routing_key self.should_reconnect = False # Holds the latest error/reason to reconnect @@ -568,8 +588,7 @@ def on_channel_open(self, channel): self._channel = channel self.logger.debug("Channel opened") self.add_on_channel_close_callback() - # we do not set up exchange first here, we use the default exchange '' - self.setup_queue(self._queue) + self.setup_exchange(self._exchange, self._exchange_type) def add_on_channel_close_callback(self): """This method tells pika to call the on_channel_closed method if @@ -595,6 +614,33 @@ def on_channel_closed(self, channel, reason): self._on_close_cb() # running user callback self.close_connection() + def setup_exchange(self, exchange_name, exchange_type): + """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC + command. When it is complete, the on_exchange_declareok method will + be invoked by pika. + + :param str|unicode exchange_name: The name of the exchange to declare + + """ + self.logger.debug(f"Declaring exchange: '{exchange_name}'") + cb = functools.partial( + self.on_exchange_declareok, userdata = exchange_name) + self._channel.exchange_declare( + exchange = exchange_name, + exchange_type = exchange_type, + callback = cb) + + def on_exchange_declareok(self, _unused_frame, userdata): + """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC + command. + + :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame + :param str|unicode userdata: Extra user data (exchange name) + + """ + self.logger.debug(f"Exchange declared: '{userdata}'") + self.setup_queue(self._queue) + def setup_queue(self, queue_name): """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC command. When it is complete, the on_queue_declareok method will @@ -603,7 +649,7 @@ def setup_queue(self, queue_name): :param str|unicode queue_name: The name of the queue to declare. """ - self.logger.debug(f'Declaring queue "{queue_name}"') + self.logger.debug(f"Declaring queue '{queue_name}'") cb = functools.partial(self.on_queue_declareok, userdata=queue_name) # arguments = {"x-consumer-timeout":1800000} # 30 minutes in ms self._channel.queue_declare(queue=queue_name, exclusive=False, callback=cb) @@ -620,7 +666,23 @@ def on_queue_declareok(self, _unused_frame, userdata): """ queue_name = userdata - self.logger.debug(f'Queue "{queue_name}" declared') + self.logger.info(f"Binding {self._exchange} to queue '{queue_name}' with key '{self._routing_key}'") + cb = functools.partial(self.on_bindok, userdata=queue_name) + self._channel.queue_bind( + queue_name, + self._exchange, + routing_key=self._routing_key, + callback=cb) + + def on_bindok(self, _unused_frame, userdata): + """Invoked by pika when the Queue.Bind method has completed. At this + point we will set the prefetch count for the channel. + + :param pika.frame.Method _unused_frame: The Queue.BindOk response frame + :param str|unicode userdata: Extra user data (queue name) + + """ + self.logger.debug(f"Queue bound: '{userdata}'") self.set_qos() def set_qos(self): @@ -772,51 +834,29 @@ def __init__( user: str, password: str, cert: str, - queue: str, - prefetch_count: int = 1, + routing_key: str, + prefetch_count: int = 0, on_message_cb: Optional[Callable] = None, on_close_cb: Optional[Callable] = None, logger: Optional[logging.Logger] = None, ): super().__init__( - host, - port, - vhost, - user, - password, - cert, - queue, - prefetch_count, - on_message_cb, - on_close_cb, - logger, - ) - - # Callback when the channel is open - def on_channel_open(self, channel): - self._channel = channel - self.logger.debug("Channel opened") - self.add_on_channel_close_callback() - self._channel.exchange_declare( + host=host, + port=port, + vhost=vhost, + user=user, + password=password, + cert=cert, exchange="control-panel", + routing_key=routing_key, + queue="", exchange_type="fanout", - callback=self.on_exchange_declared, + prefetch_count=prefetch_count, + on_message_cb=on_message_cb, + on_close_cb=on_close_cb, + logger=logger, ) - # Callback when the exchange is declared - def on_exchange_declared(self, frame): - self._channel.queue_declare(queue="", exclusive=True, callback=self.on_queue_declared) - - # Callback when the queue is declared - def on_queue_declared(self, queue_result): - self._queue = queue_result.method.queue - self._channel.queue_bind(exchange="control-panel", queue=self._queue, callback=self.on_queue_bound) - - # Callback when the queue is bound to the exchange - def on_queue_bound(self, frame): - self.set_qos() - - class AMSSyncProducer: def __init__( self, @@ -947,7 +987,8 @@ class AMSRMQConfiguration: "rabbitmq-user": "", "rabbitmq-vhost": "", "rabbitmq-cert": "", - "rabbitmq-queue-physics": "", + "rabbitmq-exchange-physics": "", + "rabbitmq-key-physics": "", "rabbitmq-exchange-training": "", "rabbitmq-key-training": "" }, @@ -962,7 +1003,8 @@ class AMSRMQConfiguration: rabbitmq_user: str rabbitmq_vhost: str rabbitmq_cert: str - rabbitmq_queue_physics: str + rabbitmq_exchange_physics: str + rabbitmq_key_physics: str rabbitmq_exchange_training: str = "" rabbitmq_key_training: str = "" rabbitmq_ml_submit_queue: str = "" @@ -994,7 +1036,8 @@ def to_dict(self, AMSlib=False): "rabbitmq-user": self.rabbitmq_user, "rabbitmq-vhost": self.rabbitmq_vhost, "rabbitmq-cert": self.rabbitmq_cert, - "rabbitmq-queue-physics": self.rabbitmq_queue_physics, + "rabbitmq-exchange-physics": self.rabbitmq_exchange_physics, + "rabbitmq-key-physics": self.rabbitmq_key_physics, "rabbitmq-exchange-training": self.rabbitmq_exchange_training, "rabbitmq-key-training": self.rabbitmq_key_training, "rabbitmq-ml-submit-queue": self.rabbitmq_ml_submit_queue, diff --git a/src/AMSWorkflow/ams/stage.py b/src/AMSWorkflow/ams/stage.py index 3442b7dd..a91a723f 100644 --- a/src/AMSWorkflow/ams/stage.py +++ b/src/AMSWorkflow/ams/stage.py @@ -262,7 +262,7 @@ class RMQDomainDataLoaderTask(Task): """ A RMQDomainDataLoaderTask consumes 'AMSMessages' from RabbitMQ bundles the data of the files into batches and forwards them to the next task waiting on the - output queuee. + output queue. Attributes: o_queue: The output queue to write the transformed messages @@ -281,14 +281,17 @@ def __init__( user, password, cert, - rmq_queue, + rmq_exchange, + rmq_routing_key, policy, - prefetch_count=1, + prefetch_count = 0, signals=[signal.SIGINT, signal.SIGUSR1], ): self.o_queue = o_queue self.cert = cert - self.rmq_queue = rmq_queue + # self.rmq_queue = rmq_queue + self.rmq_exchange = rmq_exchange + self.rmq_routing_key = rmq_routing_key self.prefetch_count = prefetch_count self.datasize_byte = 0 self.total_time_ns = 0 @@ -311,7 +314,9 @@ def __init__( user=user, password=password, cert=self.cert, - queue=self.rmq_queue, + exchange=self.rmq_exchange, + routing_key=self.rmq_routing_key, + queue="", on_message_cb=self.callback_message, on_close_cb=self.callback_close, prefetch_count=self.prefetch_count, @@ -428,7 +433,7 @@ def __init__( user: str, password: str, cert: str, - prefetch_count: int = 1, + prefetch_count: int = 0, ): self._consumers = consumers super().__init__( @@ -1061,7 +1066,8 @@ def __init__( user, password, cert, - data_queue, + exchange, + routing_key, model_update_queue=None, ): """ @@ -1075,10 +1081,12 @@ def __init__( self._user = user self._password = password self._cert = Path(cert) - self._data_queue = data_queue + # self._data_queue = data_queue + self._exchange = exchange + self._routing_key = routing_key self._model_update_queue = model_update_queue - print("Received a data queue of", self._data_queue) - print("Received a model_update queue of", self._model_update_queue) + print(f"Received data from exchange {self._exchange} / rkey {self._routing_key}") + print(f"Received a model_update queue of {self._model_update_queue}") self._gracefull_shutdown = None self._o_queue = None @@ -1101,9 +1109,10 @@ def get_load_task(self, o_queue, policy): self._user, self._password, self._cert, - self._data_queue, + self._exchange, + self._routing_key, policy, - prefetch_count=1, + prefetch_count = 0, ) self._o_queue = o_queue self._gracefull_shutdown = AMSShutdown( @@ -1176,7 +1185,8 @@ def from_cli(cls, args): config.rabbitmq_user, config.rabbitmq_password, config.rabbitmq_cert, - config.rabbitmq_queue_physics, + config.rabbitmq_exchange_physics, + config.rabbitmq_key_physics, config.rabbitmq_exchange_training if args.update_rmq_models else None, ) diff --git a/src/AMSlib/AMS.cpp b/src/AMSlib/AMS.cpp index b665b36a..4d892521 100644 --- a/src/AMSlib/AMS.cpp +++ b/src/AMSlib/AMS.cpp @@ -226,11 +226,13 @@ class AMSWrap getEntry(rmq_entry, "rabbitmq-password"); std::string rmq_user = getEntry(rmq_entry, "rabbitmq-user"); std::string rmq_vhost = getEntry(rmq_entry, "rabbitmq-vhost"); - std::string rmq_out_queue = - getEntry(rmq_entry, "rabbitmq-queue-physics"); - std::string exchange = + std::string exchange_physics = + getEntry(rmq_entry, "rabbitmq-exchange-physics"); + std::string routing_key_physics = + getEntry(rmq_entry, "rabbitmq-key-physics"); + std::string exchange_ml = getEntry(rmq_entry, "rabbitmq-exchange-training"); - std::string routing_key = + std::string routing_key_ml = getEntry(rmq_entry, "rabbitmq-key-training"); bool update_surrogate = getEntry(entry, "update_surrogate"); @@ -240,13 +242,13 @@ class AMSWrap rmq_cert = getEntry(rmq_entry, "rabbitmq-cert"); CFATAL(AMS, - (exchange == "" || routing_key == "") && update_surrogate, + (exchange_ml == "" || routing_key_ml == "") && update_surrogate, "Found empty RMQ exchange / routing-key, model update is not " "possible. " "Please provide a RMQ exchange or deactivate surrogate model " "update.") - if (exchange == "" || routing_key == "") { + if (exchange_ml == "" || routing_key_ml == "") { WARNING(AMS, "Found empty RMQ exchange or routing-key, deactivating model " "update") @@ -260,9 +262,10 @@ class AMSWrap rmq_user, rmq_vhost, rmq_cert, - rmq_out_queue, - exchange, - routing_key, + exchange_physics, + routing_key_physics, + exchange_ml, + routing_key_ml, update_surrogate); } diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index 8d375a3a..468c5dc2 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -934,16 +934,20 @@ class ConnectionManagerAMQP std::atomic _stop; /** @brief True if currently reconnectiong */ std::atomic _reconnecting; - std::string _queue_sender; - /** @brief name of the exchange */ - std::string _exchange; - /** @brief name of the routing binded to exchange */ - std::string _routing_key; + /** @brief Exchange name for outgoing communication */ + std::string _exchange_sender; + /** @brief Routing key for outgoing communication */ + std::string _routing_key_sender; + /** @brief name of the receiving exchange */ + std::string _exchange_receiver; + /** @brief name of the routing binded to receiving exchange */ + std::string _routing_key_receiver; /** @brief True if connection */ std::atomic _isConnected; - /** @brief Number of messages not acked / nacked */ std::atomic _nbProcessingMsg; + /** @brief Store queue name if explicit queue name requested */ + std::string _queue_name; public: ConnectionManagerAMQP(uint64_t id, @@ -953,10 +957,12 @@ class ConnectionManagerAMQP std::string service_host, int service_port, std::string rmq_cert, - std::string outbound_queue, - std::string exchange, - std::string routing_key, - bool connectionDrop = false) + std::string exchange_physics, + std::string routing_key_physics, + std::string exchange_ml, + std::string routing_key_ml, + bool connectionDrop = false, + std::string queue_name = "") : _rId(id), _address(service_host, service_port, @@ -964,12 +970,14 @@ class ConnectionManagerAMQP rmq_vhost, rmq_cert.empty() ? false : true), _stop(false), - _queue_sender(outbound_queue), - _exchange(exchange), - _routing_key(routing_key), + _exchange_sender(exchange_physics), + _exchange_receiver(exchange_ml), + _routing_key_sender(routing_key_physics), + _routing_key_receiver(routing_key_ml), _reconnecting(false), _isConnected(false), - _nbProcessingMsg(0) + _nbProcessingMsg(0), + _queue_name(queue_name) { #ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED evthread_use_pthreads(); @@ -988,8 +996,8 @@ class ConnectionManagerAMQP _address.hostname().c_str(), _address.port(), _address.vhost().c_str(), - _exchange.c_str(), - _routing_key.c_str()) + _exchange_receiver.c_str(), + _routing_key_receiver.c_str()) _base = event_base_new(); _handler = std::make_shared(_base, rmq_cert); @@ -1167,8 +1175,8 @@ class ConnectionManagerAMQP // Publish using the reliable channel if available. if (_reliableChannel) { _reliableChannel - ->publish("", - _queue_sender, + ->publish(_exchange_sender, + _routing_key_sender, reinterpret_cast(msg.dPtr.get()), msg.size) .onAck([this, msg]() { @@ -1276,24 +1284,59 @@ class ConnectionManagerAMQP _isConnected = false; }); - _channel->declareQueue(_queue_sender) - .onSuccess([](const std::string& name, - uint32_t messagecount, - uint32_t consumercount) { + _channel->declareExchange(_exchange_sender, AMQP::ExchangeType::direct) + .onSuccess([&]() { DBG(ConnectionManagerAMQP, - "declared queue: %s (messagecount=%d, " - "consumercount=%d)", - name.c_str(), - messagecount, - consumercount) - }) + "declared exchange: %s", + _exchange_sender.c_str()) + + _channel->declareQueue(_queue_name) + .onSuccess([&](const std::string& queue_name, + uint32_t messagecount, + uint32_t consumercount) { + DBG(ConnectionManagerAMQP, + "declared queue: %s (messagecount=%d, " + "consumercount=%d)", + queue_name.c_str(), + messagecount, + consumercount) + // We bind the anonymous queue to the exchange + _channel + ->bindQueue(_exchange_sender, + queue_name, + _routing_key_sender) + .onSuccess([&, queue_name]() { + DBG(ConnectionManagerAMQP, + "Bounded queue %s to exchange %s with " + "routing key = %s", + queue_name.c_str(), + _exchange_sender.c_str(), + _routing_key_sender.c_str()) + }) // bindQueue + .onError([&](const char* message) { + WARNING(ConnectionManagerAMQP, + "Error while binding queue to exchange " + "%s", + message) + _isConnected = false; + }); // bindQueue + }) //declareQueue + .onError([&](const char* message) { + WARNING(ConnectionManagerAMQP, + "Error while creating queue: " + "%s", + message) + _isConnected = false; + }); //declareQueue + }) // declareExchange .onError([&](const char* message) { WARNING(ConnectionManagerAMQP, - "Error while creating broker queue: " + "Error while creating exchange: " "%s", message) _isConnected = false; - }); + }); // declareExchange + _isConnected = true; _reliableChannel = std::make_shared>(*_channel); @@ -1401,18 +1444,8 @@ class ConnectionManagerAMQP class RMQInterface { private: - /** @brief Path of the config file (JSON) */ - std::string _config; /** @brief MPI rank (0 if no MPI support) */ uint64_t _rId; - /** @brief name of the queue to send data */ - std::string _queue_sender; - /** @brief name of the exchange to receive data */ - std::string _exchange; - /** @brief name of the routing key to receive data */ - std::string _routing_key; - /** @brief TLS certificate path */ - std::string _cacert; /** @brief Represent the ID of the last message sent */ int _msg_tag; /** @brief True if we support surrogate update */ @@ -1432,9 +1465,10 @@ class RMQInterface * @param[in] service_port The port number * @param[in] service_host URL of RabbitMQ server * @param[in] rmq_cert Path to TLS certificate - * @param[in] outbound_queue Name of the queue on which AMSlib publishes (send) messages - * @param[in] exchange Exchange for incoming messages - * @param[in] routing_key Routing key for incoming messages (must match what the AMS Python side is using) + * @param[in] exchange_physics Name of the exchange on which AMSlib publishes (send) messages + * @param[in] routing_key_physics Routing key used by AMSlib to send messages + * @param[in] exchange_ml Exchange for incoming messages + * @param[in] routing_key_ml Routing key for incoming messages (must match what the AMS Python side is using) */ void connect(std::string rmq_user, std::string rmq_password, @@ -1442,25 +1476,41 @@ class RMQInterface std::string service_host, int service_port, std::string rmq_cert, - std::string outbound_queue, - std::string exchange, - std::string routing_key, + std::string exchange_physics, + std::string routing_key_physics, + std::string exchange_ml, + std::string routing_key_ml, bool updateSurrogate) { bool amsRMQFailure = checkEnvVariable("AMS_SIMULATE_RMQ_FAILURE"); + bool amsRMQNamedQueue = checkEnvVariable("AMS_USE_NAMED_QUEUE"); CWARNING(RMQInterface, amsRMQFailure, "Simulating connetion drops") + CWARNING(RMQInterface, + amsRMQNamedQueue, + "Using named queue for RabbitMQ (slower)") + + // If the queue_name is equals to "" RabbitMQ will create a queue for us (anonymous queue) + // For debug and test we can also force RMQ to use a specific queue name which enforce message + // retention even if the other side (consumer) is not listening when we send messages + std::string queue_name = ""; + if (amsRMQNamedQueue) { + queue_name = "ams-debug-queue-" + std::to_string(_rId); + } - _publishingManager = std::make_unique(_rId, - rmq_user, - rmq_password, - rmq_vhost, - service_host, - service_port, - rmq_cert, - outbound_queue, - exchange, - routing_key, - amsRMQFailure); + _publishingManager = + std::make_unique(_rId, + rmq_user, + rmq_password, + rmq_vhost, + service_host, + service_port, + rmq_cert, + exchange_physics, + routing_key_physics, + exchange_ml, + routing_key_ml, + amsRMQFailure, + queue_name); _updateSurrogate = updateSurrogate; } @@ -1869,9 +1919,10 @@ class DBManager std::string& rmq_user, std::string& rmq_vhost, std::string& rmq_cert, - std::string& outbound_queue, - std::string& exchange, - std::string& routing_key, + std::string& exchange_physics, + std::string& routing_key_physics, + std::string& exchange_ml, + std::string& routing_key_ml, bool update_surrogate) { fs::path Path(rmq_cert); @@ -1890,9 +1941,10 @@ class DBManager host, port, rmq_cert, - outbound_queue, - exchange, - routing_key, + exchange_physics, + routing_key_physics, + exchange_ml, + routing_key_ml, update_surrogate); #else FATAL(DBManager, diff --git a/tests/AMSlib/ams_interface/CMakeLists.txt b/tests/AMSlib/ams_interface/CMakeLists.txt index 7d34ec0a..de418674 100644 --- a/tests/AMSlib/ams_interface/CMakeLists.txt +++ b/tests/AMSlib/ams_interface/CMakeLists.txt @@ -1,31 +1,33 @@ function(ADD_API_UNIT_TEST gname name cmd) - add_test(NAME ${name} COMMAND bash -c "${cmd}") + add_test(NAME ${name} COMMAND ${CMAKE_COMMAND} -E env --unset=AMS_OBJECTS bash -c "${cmd}") set_tests_properties(${name} PROPERTIES LABELS ${gname}) endfunction() function(JSON_TESTS db_type) - configure_file("${CMAKE_CURRENT_SOURCE_DIR}/json_configs/env_2_models_fs_rand_uq.json.in" "${JSON_FP}" @ONLY) - + if(NOT EXISTS "${JSON_FP}") + configure_file("${CMAKE_CURRENT_SOURCE_DIR}/json_configs/env_2_models_fs_rand_uq.json.in" "${JSON_FP}" @ONLY) + endif() + # Tests Random models with different percentages both models store to file - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random10::Random50::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_random_50;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_random_50") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random10::Random50::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_random_50;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_random_50") # Tests delta-uq models with different aggregation both models store to file - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean app_uq_max") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean app_uq_max") # Tests detla uq model with a random uq model both models store to files - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::DuqMax::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_uq_max") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::DuqMax::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_uq_max;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_uq_max") # Tests detla uq model with no model. uq model both store to files - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::NoModel::Double::DB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_no_model;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_no_model") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::Random::NoModel::Double::DB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_random_10 app_no_model;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_random_10 app_no_model") # Tests 2 delta uq models with no deb . uq model both store to files - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::NODB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean_ndb app_uq_max_ndb;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean_ndb app_uq_max_ndb") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::DuqMean::DuqMax::Double::NODB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_uq_mean_ndb app_uq_max_ndb;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_uq_mean_ndb app_uq_max_ndb") # Tests null models null dbs - ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::None::None::Double::NODB::${db_type}::HOST "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_no_model_no_db app_no_model_no_db ;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_no_model_no_db app_no_model_no_db") + ADD_API_UNIT_TEST(APIEnvAPI AMS::ENV::None::None::Double::NODB::${db_type}::HOST "AMS_USE_NAMED_QUEUE=1 AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 8 \"double\" 1 512 app_no_model_no_db app_no_model_no_db ;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 8 \"double\" 512 app_no_model_no_db app_no_model_no_db") unset(AMS_DB_TEST_TYPE) unset(JSON_FP) @@ -42,7 +44,7 @@ function(CHECK_RMQ_CONFIG file) string(JSON RMQ_PORT GET ${DB_CONF} "service-port") if(NOT "${RMQ_HOST}" STREQUAL "" AND NOT "${RMQ_PORT}" STREQUAL "0") - message(STATUS "RabbitMQ config ${file}: ${RMQ_HOST}:${RMQ_PORT}") + message(STATUS "RabbitMQ config ${file}: ${RMQ_HOST}:${RMQ_PORT}") else() message(WARNING "RabbitMQ config file ${file} looks empty! Make sure to fill these fields before running the tests") endif() diff --git a/tests/AMSlib/ams_interface/json_configs/rmq.json.in b/tests/AMSlib/ams_interface/json_configs/rmq.json.in index 7aec73d8..c3156602 100644 --- a/tests/AMSlib/ams_interface/json_configs/rmq.json.in +++ b/tests/AMSlib/ams_interface/json_configs/rmq.json.in @@ -9,7 +9,8 @@ "rabbitmq-user": "", "rabbitmq-vhost": "", "rabbitmq-cert": "", - "rabbitmq-queue-physics": "", + "rabbitmq-exchange-physics": "", + "rabbitmq-key-physics": "", "rabbitmq-exchange-training": "", "rabbitmq-key-training": "" }, diff --git a/tests/AMSlib/ams_interface/verify_ete.py b/tests/AMSlib/ams_interface/verify_ete.py index fccc61e5..29257581 100644 --- a/tests/AMSlib/ams_interface/verify_ete.py +++ b/tests/AMSlib/ams_interface/verify_ete.py @@ -265,7 +265,7 @@ def from_cli(argv): return error -def get_rmq_data(ams_config, domain_names, num_iterations, timeout=1): +def get_rmq_data(ams_config, domain_names, num_iterations, timeout=10): from ams.rmq import BlockingClient, default_ams_callback rmq_json = ams_config["db"]["rmq_config"] @@ -275,13 +275,19 @@ def get_rmq_data(ams_config, domain_names, num_iterations, timeout=1): port = rmq_json["service-port"] user = rmq_json["rabbitmq-user"] password = rmq_json["rabbitmq-password"] - queue = rmq_json["rabbitmq-queue-physics"] + exchange = rmq_json["rabbitmq-exchange-physics"] + rkey = rmq_json["rabbitmq-key-physics"] cert = None if "rabbitmq-cert" in rmq_json: cert = rmq_json["rabbitmq-cert"] cert = None if cert == "" else cert with BlockingClient(host, port, vhost, user, password, cert, default_ams_callback) as client: - with client.connect(queue) as channel: + # For testing purpose we expect data from Rank 0 + # using the debug mode enabled by setting AMS_USE_NAMED_QUEUE + # This allows us to use named queue which can retain data. + # without that, running the sender before the receiver would not work. + queue = "ams-debug-queue-0" + with client.connect(exchange, rkey, queue) as channel: msgs = channel.receive(n_msg=num_iterations * len(domain_names), timeout=timeout) dns = set(domain_names)