From 6efc6de892f43a18b03511cd719969a50ef336b1 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Wed, 12 Mar 2025 18:57:39 +0400 Subject: [PATCH 1/2] Added publish_image_as_message.py example --- custom/support_functions.py | 78 +++++++++++-- .../publish_image_as_message.py | 106 ++++++++++++++++++ run.py | 17 ++- 3 files changed, 187 insertions(+), 14 deletions(-) create mode 100644 examples/publish_image_as_message/publish_image_as_message.py diff --git a/custom/support_functions.py b/custom/support_functions.py index 0e638aa..4824819 100644 --- a/custom/support_functions.py +++ b/custom/support_functions.py @@ -1,20 +1,36 @@ +# Copyright 2020-2025 Exactpro (Exactpro Systems Limited) +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import print_function import copy import logging import random -import uuid import time +import uuid from pathlib import Path -from th2_grpc_act_template.act_template_pb2 import PlaceMessageRequest + from google.protobuf.timestamp_pb2 import Timestamp from th2_common.schema.factory.common_factory import CommonFactory +from th2_common.schema.message.message_router import MessageRouter from th2_grpc_act_template.act_service import ActService +from th2_grpc_act_template.act_template_pb2 import PlaceMessageRequest from th2_grpc_check1 import check1_pb2 from th2_grpc_check1.check1_service import Check1Service from th2_grpc_common.common_pb2 import (ValueFilter, FilterOperation, MessageMetadata, MessageFilter, RootMessageFilter, ConnectionID, EventID, ListValue, Value, Message, - ListValueFilter, MessageID, Event, EventBatch) + ListValueFilter, MessageID, Event, EventBatch, RawMessage, MessageGroupBatch, + MessageGroup, AnyMessage, RawMessageMetadata, Direction) # -----------Connection functions @@ -57,15 +73,15 @@ def to_msg_body(string): def create_event_id(factory): start_timestamp = Timestamp() start_timestamp.GetCurrentTime() - book_name = factory['factory'].box_configuration.book_name - scope = factory['factory'].box_configuration.box_name + book_name = factory.box_configuration.book_name + scope = factory.box_configuration.box_name return EventID(id=str(uuid.uuid1()), book_name=book_name, scope=scope, start_timestamp=start_timestamp) def store_event(factory, name, event_id=None, parent_id=None, body=b"", status='SUCCESS', etype=''): new_event_id = event_id if new_event_id is None: - new_event_id = create_event_id(factory) + new_event_id = create_event_id(factory['factory']) submit_event( estore=factory['estore'], event_batch=create_event_batch( @@ -78,7 +94,8 @@ def store_event(factory, name, event_id=None, parent_id=None, body=b"", status=' return new_event_id -def create_event_batch(report_name, event_id, parent_id=None, status='SUCCESS', body=b"", etype=''): +def create_event_batch(report_name, event_id, parent_id=None, status='SUCCESS', body=b"", etype='', + attached_message_ids: list[MessageID] = None): current_timestamp = Timestamp() current_timestamp.GetCurrentTime() logging.info(f'Storing event {report_name}...') @@ -89,11 +106,48 @@ def create_event_batch(report_name, event_id, parent_id=None, status='SUCCESS', body=body, type=etype, end_timestamp=current_timestamp, - parent_id=parent_id) - event_batch = EventBatch() - event_batch.events.append(event) - - return event_batch + parent_id=parent_id + ) + if attached_message_ids: + for attached_message_id in attached_message_ids: + event.attached_message_ids.append(attached_message_id) + + return EventBatch(events=[event]) + + +# -------mstore functions +def submit_message(mstore: MessageRouter, message_batch: MessageGroupBatch): + logging.debug(f'Message content:{str(message_batch)}') + response = mstore.send(message_batch, 'raw') + logging.debug(f'Mstore response content:{str(response)}') + + +def create_message_id(factory: CommonFactory, session_alias: str, direction: Direction, sequence: int): + timestamp = Timestamp() + timestamp.GetCurrentTime() + book_name = factory.box_configuration.book_name + return MessageID(book_name=book_name, connection_id=ConnectionID(session_alias=session_alias), direction=direction, + timestamp=timestamp, sequence=sequence) + + +def create_raw_message(message_id: MessageID, body: bytes, properties: dict[str, str] = None, + protocol: str = None) -> RawMessage: + metadata = RawMessageMetadata(id=message_id) + if properties: + for key, value in properties.items(): + metadata.properties[key] = value + if protocol: + metadata.protocol = protocol + return RawMessage(metadata=metadata, body=body) + + +def create_message_group_batch(*messages: RawMessage) -> MessageGroupBatch: + return MessageGroupBatch( + groups=[ + MessageGroup(messages=[ + AnyMessage(raw_message=message) for message in messages + ]) + ]) # -------------act functions diff --git a/examples/publish_image_as_message/publish_image_as_message.py b/examples/publish_image_as_message/publish_image_as_message.py new file mode 100644 index 0000000..683ea52 --- /dev/null +++ b/examples/publish_image_as_message/publish_image_as_message.py @@ -0,0 +1,106 @@ +# Copyright 2025 Exactpro (Exactpro Systems Limited) +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +import os +import time +from pathlib import Path + +from th2_common.schema.factory.common_factory import CommonFactory +from th2_grpc_common.common_pb2 import Direction + +from custom import support_functions as sf + +SUPPORTED_IMAGE_EXTENSIONS = ['png', 'webp'] + + +def verify_args(args): + if not os.path.dirname(os.path.realpath(args.config)): + raise ValueError(f"'{args.config}' path for th2 configs doesn't exist or isn't directory") + + for image_file in args.image_files: + image_path: Path = Path(image_file) + if not (image_path.exists() and image_path.is_file()): + raise ValueError(f"'{image_file}' path for image doesn't exist or isn't file") + + image_path_extension = image_path.suffix.lstrip('.').lower() + if image_path_extension not in SUPPORTED_IMAGE_EXTENSIONS: + raise ValueError( + f"'{image_file}' path for image hasn't got supported extension {SUPPORTED_IMAGE_EXTENSIONS}") + + return args + + +def scenario(factory: CommonFactory, image_files: list[str], session_alias: str): + event_router = factory.event_batch_router + message_router = factory.message_group_batch_router + + scenario_id = 1 + sequence = time.time_ns() + + root_event_id = sf.create_event_id(factory) + sf.submit_event( + estore=event_router, + event_batch=sf.create_event_batch( + report_name=f"[TS_{scenario_id}] send image as message", + event_id=root_event_id, + parent_id=None + ) + ) + + for image_file in image_files: + sequence += 1 + message_id = sf.create_message_id(factory, session_alias, Direction.FIRST, sequence) + image_path: Path = Path(image_file) + image_bytes = image_path.read_bytes() + image_path_extension = image_path.suffix.lstrip('.').lower() + + sf.submit_message( + message_router, sf.create_message_group_batch( + sf.create_raw_message(message_id, image_bytes, protocol=f"image/{image_path_extension}") + ) + ) + + event_id = sf.create_event_id(factory) + sf.submit_event( + estore=event_router, + event_batch=sf.create_event_batch( + report_name=f"Image '{image_path.name}'", + etype='UploadImage', + event_id=event_id, + parent_id=root_event_id, + attached_message_ids=[message_id] + ) + ) + + +if __name__ == '__main__': + try: + parser = argparse.ArgumentParser(description='Publish image as th2 message') + parser.add_argument('--config', required=True, type=str, help='directory with th2 config files') + parser.add_argument('--image-files', required=True, nargs='+', type=str, + help='image files in png or webp format and appropriate extension') + parser.add_argument('--session-alias', type=str, default='screenshot', + help='th2 session alias for raw message publishing') + + args = verify_args(parser.parse_args()) + + factory = CommonFactory(config_path=args.config) + try: + scenario(factory, args.image_files, args.session_alias) + finally: + factory.close() + except Exception as e: + logging.error(e) + raise e diff --git a/run.py b/run.py index 80e643e..e13d507 100644 --- a/run.py +++ b/run.py @@ -1,3 +1,16 @@ +# Copyright 2021-2025 Exactpro (Exactpro Systems Limited) +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from __future__ import print_function import datetime @@ -20,7 +33,7 @@ def scenario(factory, parent=None): # Storing EventID object of root Event. - report_id = sf.create_event_id(factory) + report_id = sf.create_event_id(factory['factory']) # Initialize chain_id for script ver1_chain = None @@ -50,7 +63,7 @@ def scenario(factory, parent=None): f"Case[TC_{scenario_id}.{case_id}]: " f"Trader {trader1} vs trader {trader2} for instrument {instrument['SecurityID']}", report_id, { - 'case_id': sf.create_event_id(factory), + 'case_id': sf.create_event_id(factory['factory']), 'Instrument': instrument['SecurityID'], 'Order1Price': instrument['Price'], 'Order1Qty': 30, From 32cb25f8cc02bac2986fa13781d3a676c2f41bea Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Thu, 13 Mar 2025 16:28:04 +0400 Subject: [PATCH 2/2] Added readme for publish image as message example --- README.md | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/README.md b/README.md index 90ce51a..86d50ca 100644 --- a/README.md +++ b/README.md @@ -37,5 +37,110 @@ Python 3.7-3.9 environment required. (Data services lib don't support 3.10 yet) 1. User2 receives an Execution Report with ExecType=F on trade between **Order3** and **Order1** 1. User2 receives an Execution Report with ExecType=C on expired **Order3** +## Examples: +This repository contains some independent examples in `examples` folder. +Each example includes logic for solving specific issue and can be used as template for user scripts. +### Publish image as message + +Images in `png` and `webp` formats can be published as messages into th2 storage for later viewing via `th2-rpt-viewer:5.2.9` (or higher version) + +#### script location + +[publish_image_as_message.py](examples/publish_image_as_message/publish_image_as_message.py) + +#### requirements + +The th2 components are required for uploading images + +
+script.yml + +```yaml +apiVersion: th2.exactpro.com/v2 +kind: Th2Box +metadata: + name: script +spec: + imageName: dev-script + imageVersion: dev-script + type: th2-script + pins: + mq: + publishers: + - name: to_mstore_proto + attributes: [raw, publish] + extendedSettings: + k8sProbes: false + externalBox: + enabled: true + hostNetwork: false + service: + enabled: false + prometheus: + enabled: false +``` +
+ +
+mstore-tp.yml + +```yaml +apiVersion: th2.exactpro.com/v2 +kind: Th2CoreBox +metadata: + name: mstore-tp +spec: + disabled: false + imageName: ghcr.io/th2-net/th2-mstore + imageVersion: 5.9.0-dev + type: th2-rpt-data-provider + pins: + mq: + subscribers: + - name: transport + attributes: + - transport-group + - subscribe + - name: proto + attributes: + - raw + - subscribe + linkTo: + - box: script + pin: to_mstore_proto + extendedSettings: + envVariables: + JAVA_TOOL_OPTIONS: > + -XX:+ExitOnOutOfMemoryError + -XX:+UseContainerSupport + -Ddatastax-java-driver.advanced.connection.init-query-timeout="5000 milliseconds" + -Ddatastax-java-driver.basic.request.timeout="5 seconds" + resources: + limits: + cpu: 1000m + memory: 1000Mi + requests: + cpu: 200m + memory: 200Mi + service: + enabled: false +``` +
+ +th2 config files of the script component should be downloaded from the cluster config maps and put into `cfg` dir +* from `script-app-config` Config Map + * box.json + * mq.json + * prometheus.json +* from `rabbit-mq-external-app-config` Config Map + * rabbitMQ.json (from `rabbit-mq-external-app-config` Config Map) + +#### run + +```shell +export RABBITMQ_PASS='' +export PYTHONPATH="${PYTHONPATH}:$(pwd)" +python examples/publish_image_as_message/publish_image_as_message.py --config 'cfg' --image-files 'images/image.png' 'images/image.webp' --session-alias 'image' +``` \ No newline at end of file