forked from eclipse-ditto/ditto-clients-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_request_response_handling.py
96 lines (77 loc) · 3.4 KB
/
message_request_response_handling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Copyright (c) 2022 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0
#
# SPDX-License-Identifier: EPL-2.0
import json
import sys
import threading
import time
import paho.mqtt.client as mqtt
from ditto.client import Client
from ditto.model.namespaced_id import NamespacedID
from ditto.protocol.envelope import Envelope
from ditto.protocol.things.messages import Message
from ditto.model.feature import Feature
from ditto.model.definition_id import DefinitionID
thing_id = NamespacedID().from_string("test.ns:test-name")
feature_id = "MyFeatureID"
property_id = "myProperty"
definition_id = DefinitionID().from_string("my.model.namespace:FeatureModel:1.0.0")
my_feature = Feature().with_definition(definition_id).with_property(property_id, "myValue")
req_topic = "command///req/" + str(thing_id) + "/"
message_subject = "some-command"
def send_inbox_message():
live_message = Message(thing_id).inbox(message_subject).with_payload("some_payload")
live_message_envelope = live_message.envelope(response_required=True, correlation_id="example-correlation-id")
live_message_dict = live_message_envelope.to_ditto_dict()
live_message_json = json.dumps(live_message_dict)
# wait before sending the message to make sure the client's on connect has been executed
time.sleep(5)
paho_client.publish(topic=req_topic + message_subject, payload=live_message_json)
class MyClient(Client):
def on_connect(self, ditto_client: Client):
print("Ditto client connected")
self.subscribe(self.on_message)
print("subscribed")
def on_disconnect(self, ditto_client: Client):
print("Ditto client disconnected")
self.unsubscribe(self.on_message)
print("unsubscribed")
def on_message(self, request_id: str, message: Envelope):
print("request_id: {}, envelope: {}".format(request_id, message.to_ditto_dict()))
print(message.topic.__str__())
incoming_thing_id = NamespacedID(message.topic.namespace, message.topic.entity_id)
# create an example outbox message and reply
live_message = Message(incoming_thing_id).outbox(message_subject).with_payload(
dict(a="b", x=2))
# generate the respective Envelope
response_envelope = live_message.envelope(correlation_id=message.headers.correlation_id,
response_required=False).with_status(200)
# send the reply
self.reply(request_id, response_envelope)
def on_log(self, ditto_client: Client, level, string):
print("[{}] {}".format(level, string))
ditto_client: Client = None
def paho_on_connect(client, userdata, flags, rc):
global ditto_client
ditto_client = MyClient(paho_client=client)
ditto_client.enable_logger(True)
ditto_client.connect()
try:
paho_client = mqtt.Client()
paho_client.on_connect = paho_on_connect
paho_client.connect("localhost")
inbox_message_thread = threading.Thread(target=send_inbox_message)
inbox_message_thread.start()
paho_client.loop_forever()
except KeyboardInterrupt:
print("finished")
ditto_client.disconnect()
paho_client.disconnect()
sys.exit()