Skip to content

Added publish_image_as_message.py example #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: ver-2.1.0-main_scenario
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,110 @@ Python 3.7-3.9 environment required. (Data services lib don't support 3.10 yet)
1. User2 receives an Execution Report with ExecType=F on trade between **Order3** and **Order1**
1. User2 receives an Execution Report with ExecType=C on expired **Order3**

## Examples:

This repository contains some independent examples in `examples` folder.
Each example includes logic for solving specific issue and can be used as template for user scripts.

### Publish image as message

Images in `png` and `webp` formats can be published as messages into th2 storage for later viewing via `th2-rpt-viewer:5.2.9` (or higher version)

#### script location

[publish_image_as_message.py](examples/publish_image_as_message/publish_image_as_message.py)

#### requirements

The th2 components are required for uploading images

<details>
<summary>script.yml</summary>

```yaml
apiVersion: th2.exactpro.com/v2
kind: Th2Box
metadata:
name: script
spec:
imageName: dev-script
imageVersion: dev-script
type: th2-script
pins:
mq:
publishers:
- name: to_mstore_proto
attributes: [raw, publish]
extendedSettings:
k8sProbes: false
externalBox:
enabled: true
hostNetwork: false
service:
enabled: false
prometheus:
enabled: false
```
</details>

<details>
<summary>mstore-tp.yml</summary>

```yaml
apiVersion: th2.exactpro.com/v2
kind: Th2CoreBox
metadata:
name: mstore-tp
spec:
disabled: false
imageName: ghcr.io/th2-net/th2-mstore
imageVersion: 5.9.0-dev
type: th2-rpt-data-provider
pins:
mq:
subscribers:
- name: transport
attributes:
- transport-group
- subscribe
- name: proto
attributes:
- raw
- subscribe
linkTo:
- box: script
pin: to_mstore_proto
extendedSettings:
envVariables:
JAVA_TOOL_OPTIONS: >
-XX:+ExitOnOutOfMemoryError
-XX:+UseContainerSupport
-Ddatastax-java-driver.advanced.connection.init-query-timeout="5000 milliseconds"
-Ddatastax-java-driver.basic.request.timeout="5 seconds"
resources:
limits:
cpu: 1000m
memory: 1000Mi
requests:
cpu: 200m
memory: 200Mi
service:
enabled: false
```
</details>

th2 config files of the script component should be downloaded from the cluster config maps and put into `cfg` dir
* from `script-app-config` Config Map
* box.json
* mq.json
* prometheus.json
* from `rabbit-mq-external-app-config` Config Map
* rabbitMQ.json (from `rabbit-mq-external-app-config` Config Map)

#### run

```shell
export RABBITMQ_PASS=''
export PYTHONPATH="${PYTHONPATH}:$(pwd)"
python examples/publish_image_as_message/publish_image_as_message.py --config 'cfg' --image-files 'images/image.png' 'images/image.webp' --session-alias 'image'
```
78 changes: 66 additions & 12 deletions custom/support_functions.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
# Copyright 2020-2025 Exactpro (Exactpro Systems Limited)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import copy
import logging
import random
import uuid
import time
import uuid
from pathlib import Path
from th2_grpc_act_template.act_template_pb2 import PlaceMessageRequest

from google.protobuf.timestamp_pb2 import Timestamp
from th2_common.schema.factory.common_factory import CommonFactory
from th2_common.schema.message.message_router import MessageRouter
from th2_grpc_act_template.act_service import ActService
from th2_grpc_act_template.act_template_pb2 import PlaceMessageRequest
from th2_grpc_check1 import check1_pb2
from th2_grpc_check1.check1_service import Check1Service
from th2_grpc_common.common_pb2 import (ValueFilter, FilterOperation, MessageMetadata, MessageFilter, RootMessageFilter,
ConnectionID, EventID, ListValue, Value, Message,
ListValueFilter, MessageID, Event, EventBatch)
ListValueFilter, MessageID, Event, EventBatch, RawMessage, MessageGroupBatch,
MessageGroup, AnyMessage, RawMessageMetadata, Direction)


# -----------Connection functions
Expand Down Expand Up @@ -57,15 +73,15 @@ def to_msg_body(string):
def create_event_id(factory):
start_timestamp = Timestamp()
start_timestamp.GetCurrentTime()
book_name = factory['factory'].box_configuration.book_name
scope = factory['factory'].box_configuration.box_name
book_name = factory.box_configuration.book_name
scope = factory.box_configuration.box_name
return EventID(id=str(uuid.uuid1()), book_name=book_name, scope=scope, start_timestamp=start_timestamp)


def store_event(factory, name, event_id=None, parent_id=None, body=b"", status='SUCCESS', etype=''):
new_event_id = event_id
if new_event_id is None:
new_event_id = create_event_id(factory)
new_event_id = create_event_id(factory['factory'])
submit_event(
estore=factory['estore'],
event_batch=create_event_batch(
Expand All @@ -78,7 +94,8 @@ def store_event(factory, name, event_id=None, parent_id=None, body=b"", status='
return new_event_id


def create_event_batch(report_name, event_id, parent_id=None, status='SUCCESS', body=b"", etype=''):
def create_event_batch(report_name, event_id, parent_id=None, status='SUCCESS', body=b"", etype='',
attached_message_ids: list[MessageID] = None):
current_timestamp = Timestamp()
current_timestamp.GetCurrentTime()
logging.info(f'Storing event {report_name}...')
Expand All @@ -89,11 +106,48 @@ def create_event_batch(report_name, event_id, parent_id=None, status='SUCCESS',
body=body,
type=etype,
end_timestamp=current_timestamp,
parent_id=parent_id)
event_batch = EventBatch()
event_batch.events.append(event)

return event_batch
parent_id=parent_id
)
if attached_message_ids:
for attached_message_id in attached_message_ids:
event.attached_message_ids.append(attached_message_id)

return EventBatch(events=[event])


# -------mstore functions
def submit_message(mstore: MessageRouter, message_batch: MessageGroupBatch):
logging.debug(f'Message content:{str(message_batch)}')
response = mstore.send(message_batch, 'raw')
logging.debug(f'Mstore response content:{str(response)}')


def create_message_id(factory: CommonFactory, session_alias: str, direction: Direction, sequence: int):
timestamp = Timestamp()
timestamp.GetCurrentTime()
book_name = factory.box_configuration.book_name
return MessageID(book_name=book_name, connection_id=ConnectionID(session_alias=session_alias), direction=direction,
timestamp=timestamp, sequence=sequence)


def create_raw_message(message_id: MessageID, body: bytes, properties: dict[str, str] = None,
protocol: str = None) -> RawMessage:
metadata = RawMessageMetadata(id=message_id)
if properties:
for key, value in properties.items():
metadata.properties[key] = value
if protocol:
metadata.protocol = protocol
return RawMessage(metadata=metadata, body=body)


def create_message_group_batch(*messages: RawMessage) -> MessageGroupBatch:
return MessageGroupBatch(
groups=[
MessageGroup(messages=[
AnyMessage(raw_message=message) for message in messages
])
])


# -------------act functions
Expand Down
106 changes: 106 additions & 0 deletions examples/publish_image_as_message/publish_image_as_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright 2025 Exactpro (Exactpro Systems Limited)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import os
import time
from pathlib import Path

from th2_common.schema.factory.common_factory import CommonFactory
from th2_grpc_common.common_pb2 import Direction

from custom import support_functions as sf

SUPPORTED_IMAGE_EXTENSIONS = ['png', 'webp']


def verify_args(args):
if not os.path.dirname(os.path.realpath(args.config)):
raise ValueError(f"'{args.config}' path for th2 configs doesn't exist or isn't directory")

for image_file in args.image_files:
image_path: Path = Path(image_file)
if not (image_path.exists() and image_path.is_file()):
raise ValueError(f"'{image_file}' path for image doesn't exist or isn't file")

image_path_extension = image_path.suffix.lstrip('.').lower()
if image_path_extension not in SUPPORTED_IMAGE_EXTENSIONS:
raise ValueError(
f"'{image_file}' path for image hasn't got supported extension {SUPPORTED_IMAGE_EXTENSIONS}")

return args


def scenario(factory: CommonFactory, image_files: list[str], session_alias: str):
event_router = factory.event_batch_router
message_router = factory.message_group_batch_router

scenario_id = 1
sequence = time.time_ns()

root_event_id = sf.create_event_id(factory)
sf.submit_event(
estore=event_router,
event_batch=sf.create_event_batch(
report_name=f"[TS_{scenario_id}] send image as message",
event_id=root_event_id,
parent_id=None
)
)

for image_file in image_files:
sequence += 1
message_id = sf.create_message_id(factory, session_alias, Direction.FIRST, sequence)
image_path: Path = Path(image_file)
image_bytes = image_path.read_bytes()
image_path_extension = image_path.suffix.lstrip('.').lower()

sf.submit_message(
message_router, sf.create_message_group_batch(
sf.create_raw_message(message_id, image_bytes, protocol=f"image/{image_path_extension}")
)
)

event_id = sf.create_event_id(factory)
sf.submit_event(
estore=event_router,
event_batch=sf.create_event_batch(
report_name=f"Image '{image_path.name}'",
etype='UploadImage',
event_id=event_id,
parent_id=root_event_id,
attached_message_ids=[message_id]
)
)


if __name__ == '__main__':
try:
parser = argparse.ArgumentParser(description='Publish image as th2 message')
parser.add_argument('--config', required=True, type=str, help='directory with th2 config files')
parser.add_argument('--image-files', required=True, nargs='+', type=str,
help='image files in png or webp format and appropriate extension')
parser.add_argument('--session-alias', type=str, default='screenshot',
help='th2 session alias for raw message publishing')

args = verify_args(parser.parse_args())

factory = CommonFactory(config_path=args.config)
try:
scenario(factory, args.image_files, args.session_alias)
finally:
factory.close()
except Exception as e:
logging.error(e)
raise e
17 changes: 15 additions & 2 deletions run.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# Copyright 2021-2025 Exactpro (Exactpro Systems Limited)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import datetime
Expand All @@ -20,7 +33,7 @@

def scenario(factory, parent=None):
# Storing EventID object of root Event.
report_id = sf.create_event_id(factory)
report_id = sf.create_event_id(factory['factory'])

# Initialize chain_id for script
ver1_chain = None
Expand Down Expand Up @@ -50,7 +63,7 @@ def scenario(factory, parent=None):
f"Case[TC_{scenario_id}.{case_id}]: "
f"Trader {trader1} vs trader {trader2} for instrument {instrument['SecurityID']}",
report_id, {
'case_id': sf.create_event_id(factory),
'case_id': sf.create_event_id(factory['factory']),
'Instrument': instrument['SecurityID'],
'Order1Price': instrument['Price'],
'Order1Qty': 30,
Expand Down
Loading