diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index ffb6760..619ccae 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -7,7 +7,7 @@ set -o xtrace script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image=rabbitmq:4.1.0-management +readonly rabbitmq_image=rabbitmq:4.2-rc-management readonly docker_name_prefix='rabbitmq-amqp-python-client' diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index 9f5c2dc..1365866 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -72,7 +72,7 @@ def main() -> None: See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions """ - queue_name = "stream-example-with_filtering-queue" + queue_name = "stream-example-with-message-properties-filter-queue" logger.info("Creating connection") environment = Environment("amqp://guest:guest@localhost:5672/") connection = create_connection(environment) diff --git a/examples/streams_with_sql_filters/example_streams_with_sql_filters.py b/examples/streams_with_sql_filters/example_streams_with_sql_filters.py new file mode 100644 index 0000000..435b8f9 --- /dev/null +++ b/examples/streams_with_sql_filters/example_streams_with_sql_filters.py @@ -0,0 +1,144 @@ +# type: ignore +import logging + +from rabbitmq_amqp_python_client import ( + AddressHelper, + AMQPMessagingHandler, + Connection, + ConnectionClosed, + Converter, + Environment, + Event, + Message, + OffsetSpecification, + StreamConsumerOptions, + StreamFilterOptions, + StreamSpecification, +) + +MESSAGES_TO_PUBLISH = 100 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_amqp_message(self, event: Event): + # only messages with banana filters and with subject yellow + # and application property from = italy get received + self._count = self._count + 1 + logger.info( + "Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format( + Converter.bytes_to_string(event.message.body), + event.message.subject, + event.message.application_properties, + self._count, + ) + ) + self.delivery_context.accept(event) + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + + return connection + + +logging.basicConfig() +logger = logging.getLogger("[streams_with_filters]") +logger.setLevel(logging.INFO) + + +def main() -> None: + """ + In this example we create a stream queue and a consumer with SQL filter + + See: https://www.rabbitmq.com/docs/next/stream-filtering#stage-2-amqp-filter-expressions + """ + + queue_name = "stream-example-with-sql-filter-queue" + logger.info("Creating connection") + environment = Environment("amqp://guest:guest@localhost:5672/") + connection = create_connection(environment) + management = connection.management() + # delete the queue if it exists + management.delete_queue(queue_name) + # create a stream queue + management.declare_queue(StreamSpecification(name=queue_name)) + + addr_queue = AddressHelper.queue_address(queue_name) + + consumer_connection = create_connection(environment) + sql = ( + "properties.subject LIKE '%in_the_filter%' " + "AND a_in_the_filter_key = 'a_in_the_filter_value'" + ) + + consumer = consumer_connection.consumer( + addr_queue, + message_handler=MyMessageHandler(), + stream_consumer_options=StreamConsumerOptions( + offset_specification=OffsetSpecification.first, + filter_options=StreamFilterOptions(sql=sql), + ), + ) + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + + # print("create a publisher and publish a test message") + publisher = connection.publisher(addr_queue) + + # publish messages won't match the filter + for i in range(MESSAGES_TO_PUBLISH): + publisher.publish(Message(Converter.string_to_bytes(body="apple: " + str(i)))) + + # publish messages that will match the filter + for i in range(MESSAGES_TO_PUBLISH): + msqMatch = Message( + body=Converter.string_to_bytes("the_right_one_sql"), + # will match due of % + subject="something_in_the_filter_{}".format(i), + application_properties={"a_in_the_filter_key": "a_in_the_filter_value"}, + ) + publisher.publish(msqMatch) + + publisher.close() + + while True: + try: + consumer.run() + except KeyboardInterrupt: + pass + except ConnectionClosed: + print("connection closed") + continue + except Exception as e: + print("consumer exited for exception " + str(e)) + + break + + # + logger.info("consumer exited, deleting queue") + management.delete_queue(queue_name) + + print("closing connections") + management.close() + print("after management closing") + environment.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 5423006..54491ba 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -7,6 +7,8 @@ from .exceptions import ValidationCodeException from .qpid.proton._data import Described, symbol +SQL_FILTER = "sql-filter" +AMQP_SQL_FILTER = "amqp:sql-filter" STREAM_FILTER_SPEC = "rabbitmq:stream-filter" STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" @@ -159,7 +161,6 @@ class MessageProperties: Attributes: message_id: Uniquely identifies a message within the system (int, UUID, bytes, or str). user_id: Identity of the user responsible for producing the message. - to: Intended destination node of the message. subject: Summary information about the message content and purpose. reply_to: Address of the node to send replies to. correlation_id: Client-specific id for marking or identifying messages (int, UUID, bytes, or str). @@ -174,7 +175,6 @@ class MessageProperties: message_id: Optional[Union[int, str, bytes]] = None user_id: Optional[bytes] = None - to: Optional[str] = None subject: Optional[str] = None reply_to: Optional[str] = None correlation_id: Optional[Union[int, str, bytes]] = None @@ -245,20 +245,24 @@ def __init__( if offset_specification is not None: self._offset(offset_specification) - if filter_options is not None and filter_options.values is not None: + if filter_options is None: + return + + if filter_options.values is not None: self._filter_values(filter_options.values) - if filter_options is not None and filter_options.match_unfiltered: + if filter_options.match_unfiltered: self._filter_match_unfiltered(filter_options.match_unfiltered) - if filter_options is not None and filter_options.message_properties is not None: + if filter_options.message_properties is not None: self._filter_message_properties(filter_options.message_properties) - if ( - filter_options is not None - and filter_options.application_properties is not None - ): + + if filter_options.application_properties is not None: self._filter_application_properties(filter_options.application_properties) + if filter_options.sql is not None and filter_options.sql != "": + self._filter_sql(filter_options.sql) + def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ Set the offset specification for the stream. @@ -334,6 +338,15 @@ def _filter_application_properties( Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop) ) + def _filter_sql(self, sql: str) -> None: + """ + Set SQL filter for the stream. + + Args: + sql: SQL string to apply as a filter + """ + self._filter_set[symbol(SQL_FILTER)] = Described(symbol(AMQP_SQL_FILTER), sql) + def filter_set(self) -> Dict[symbol, Described]: """ Get the current filter set configuration. diff --git a/tests/test_streams.py b/tests/test_streams.py index a3b49f2..2e503fb 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -54,10 +54,9 @@ def test_stream_read_from_last_default( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_last( @@ -91,10 +90,9 @@ def test_stream_read_from_last( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_offset_zero( @@ -128,10 +126,9 @@ def test_stream_read_from_offset_zero( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_offset_first( @@ -165,10 +162,9 @@ def test_stream_read_from_offset_first( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_read_from_offset_ten( @@ -203,10 +199,9 @@ def test_stream_read_from_offset_ten( # this will finish after 10 messages read except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_filtering(connection: Connection, environment: Environment) -> None: @@ -240,10 +235,9 @@ def test_stream_filtering(connection: Connection, environment: Environment) -> N # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_filtering_mixed( @@ -281,10 +275,9 @@ def test_stream_filtering_mixed( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_filtering_not_present( @@ -362,10 +355,9 @@ def test_stream_match_unfiltered( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) def test_stream_reconnection( @@ -403,10 +395,9 @@ def test_stream_reconnection( # ack to terminate the consumer except ConsumerTestException: pass - - consumer.close() - - management.delete_queue(stream_name) + finally: + consumer.close() + management.delete_queue(stream_name) class MyMessageHandlerMessagePropertiesFilter(AMQPMessagingHandler): @@ -468,11 +459,10 @@ def test_stream_filter_message_properties( # ack to terminate the consumer except ConsumerTestException: pass - - if consumer is not None: - consumer.close() - - management.delete_queue(stream_name) + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name) class MyMessageHandlerApplicationPropertiesFilter(AMQPMessagingHandler): @@ -529,11 +519,86 @@ def test_stream_filter_application_properties( # ack to terminate the consumer except ConsumerTestException: pass + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name) - if consumer is not None: - consumer.close() +class MyMessageHandlerSQLFilter(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.body == Converter.string_to_bytes("the_right_one_sql") + assert event.message.subject == "something_in_the_filter" + assert event.message.reply_to == "the_reply_to" + assert ( + event.message.application_properties["a_in_the_filter_key"] + == "a_in_the_filter_value" + ) + + raise ConsumerTestException("consumed") + + +def test_stream_filter_sql(connection: Connection, environment: Environment) -> None: + consumer = None + stream_name = "test_stream_filter_sql" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() management.delete_queue(stream_name) + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + sql = ( + "properties.subject LIKE '%in_the_filter%' AND properties.reply_to = 'the_reply_to' " + "AND a_in_the_filter_key = 'a_in_the_filter_value'" + ) + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerSQLFilter(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions(sql=sql) + ), + ) + publisher = connection.publisher(addr_queue) + # won't match + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + ) + publisher.publish(msg) + + # the only one that will match + msqMatch = Message( + body=Converter.string_to_bytes("the_right_one_sql"), + subject="something_in_the_filter", + reply_to="the_reply_to", + application_properties={"a_in_the_filter_key": "a_in_the_filter_value"}, + ) + + publisher.publish(msqMatch) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name) class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler): @@ -605,8 +670,7 @@ def test_stream_filter_mixing_different( # ack to terminate the consumer except ConsumerTestException: pass - - if consumer is not None: - consumer.close() - - management.delete_queue(stream_name) + finally: + if consumer is not None: + consumer.close() + management.delete_queue(stream_name)