-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathur3robot.py
305 lines (247 loc) · 11.3 KB
/
ur3robot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import logging
import time
from threading import Thread
from paho.mqtt import client as mqtt_client
from paho.mqtt.client import MQTT_ERR_SUCCESS
# Parameters concerning reconnection behaviour
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
class UR3Robot(object):
def __init__(self, broker, port, cmd_topic="ur3/set/cmd"):
# Hostname of broker (e.g. "urpi.local")
self.broker = broker
# Port of broker (default: 1883)
self.port = port
# Credentials if needed
# self.username = ''
# self.password = ''
# Topic where commands are published
self.cmd_topic = cmd_topic
# Global variable to store latest position vector of robot (x,y,z,ax,ay,az)
self.last_pose = None
# Global variable to store latest joints vector of robot (j1,j2,j3,j4,j5,j6)
self.last_joints = None
self.freedrive = False
# MQTT client to subscribe and publish to
self.client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2)
self._messages = list()
self.debug_timing = False
self.msg_counter = 0
self.published_messages = dict()
self.received_returns = dict()
def on_connect(client, userdata, flags, rc, properties):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
def on_disconnect(client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
# client.username_pw_set(self.username, self.password)
self.client.on_connect = on_connect
self.client.connect(self.broker, self.port)
self.client.on_disconnect = on_disconnect
# Subscribe to the val topic to get return values
self.subscribe("ur3/get/val")
self.subscriber_thread = Thread(target=self._handle_incoming_val)
self.subscriber_thread.start()
def start(self):
self.client.loop_start()
def stop(self):
self.subscriber_thread.join(2)
self.client.loop_stop()
def publish(self, msg: str, topic: str):
"""
Publish message on given topic to MQTT broker
"""
if self.debug_timing:
if "movejPose" in msg:
mid = self.msg_counter
self.msg_counter += 1
if self.published_messages.get(topic, None) is None:
self.published_messages[topic] = dict()
if self.published_messages[topic].get(mid, None):
raise Exception("Doubled message ID!")
self.published_messages[topic][mid] = (msg, time.time())
result = self.client.publish(topic, msg)
if result.rc is not MQTT_ERR_SUCCESS:
raise Exception(f"No success publishing message! RC: {result.rc}")
def subscribe(self, topic):
"""
Subscribe to given topic on MQTT broker
"""
self.client.subscribe(topic)
def on_message(client, userdata, msg):
"""
Callback function if client receives a message from MQTT broker
Function where returned position data can be handled
"""
self._messages.append((client, userdata, msg))
self.client.on_message = on_message
def _handle_incoming_val(self):
while True:
if self._messages:
client, userdata, msg = self._messages.pop(0)
# Decode payload from received message
decoded_payload = msg.payload.decode()
# "Type check"
if "mid" in decoded_payload:
mid = int(decoded_payload.split(":")[1])
print(f"MID:{mid}")
if self.debug_timing:
self.received_returns[mid] = (msg, time.time())
elif "joints" in decoded_payload:
# Extract float values from joints string
# (e.g. from [0.1, -0.2, 0.1, 1.2, -1.3, 1.1] to x=0.1 y=-0.2, ...)
j1, j2, j3, j4, j5, j6 = [float(b) for b in decoded_payload.split(":")[1][2:-1].split(", ")]
print(f"Received joints: j1={j1}, j2={j2}, j3={j3}, j4={j4}, j5={j5}, j6={j6}")
self.last_joints = [j1, j2, j3, j4, j5, j6]
elif "pose" in decoded_payload:
# Extract float values from pose string
# (e.g. from p[0.1, -0.2, 0.1, 1.2, -1.3, 1.1] to x=0.1 y=-0.2, ...)
x, y, z, ax, ay, az = [float(b) for b in
decoded_payload.split(":")[1].split("[")[1].split("]")[0].split(", ")]
print(f"Received position: x={x}, y={y}, z={z}, ax={ax}, ay={ay}, az={az}")
self.last_position = [x, y, z, ax, ay, az]
elif "force" in decoded_payload:
# Extract float values from pose string
# (e.g. from p[0.1, -0.2, 0.1, 1.2, -1.3, 1.1] to x=0.1 y=-0.2, ...)
fx, fy, fz, tr_x, tr_y, tr_z = [float(b) for b in
decoded_payload.split(":")[1].split("[")[1].split("]")[0].split(", ")]
print(f"Received forces: Fx={fx}, Fy={fy}, Fz={fz}, TRx={tr_x}, TRy={tr_y}, TRz={tr_z}")
# self.last_force = [x, y, z, ax, ay, az]
else:
print(f"??? Received {msg.payload.decode()}")
def move_pos(self, pos):
"""
Move robo arm to pre-programmed position
"""
self.publish(f"movePos:{pos}", self.cmd_topic)
def get_pose(self):
"""
Get position vector of robo arm
"""
self.publish(f"getPose:", self.cmd_topic)
def get_joints(self):
"""
Get joint positions of robo arm
"""
self.publish(f"getJoints:", self.cmd_topic)
def get_forces(self):
"""
Get forces of robo arm
"""
self.publish(f"getForces:", self.cmd_topic)
def start_freedrive(self):
"""
Start freedrive
"""
self.freedrive = True
self.publish(f"free:start", self.cmd_topic)
def stop_freedrive(self):
"""
End freedrive
"""
self.publish(f"free:end", self.cmd_topic)
self.freedrive = False
def move_j_pose(self, pose, velocity=None, acceleration=None, blend_radius=None, jtime=None, mid=None):
"""
Move robo arm to given position (Pose)
"""
if self.freedrive:
raise Exception("Can't movej while freedrive mode is active.")
# Send velocity and acceleration
if velocity:
self.publish(f"vel:{velocity}", self.cmd_topic)
if acceleration:
self.publish(f"acc:{acceleration}", self.cmd_topic)
# Send position
self.publish(f"pose:{pose[0]},{pose[1]},{pose[2]},{pose[3]},{pose[4]},{pose[5]}", self.cmd_topic)
# If move should be blended into the next one, send blend radius
if blend_radius:
self.publish(f"blend:{blend_radius}", self.cmd_topic)
# If move should take a certain time, send time
if jtime:
self.publish(f"time:{jtime}", self.cmd_topic)
if mid:
self.publish(f"mid:{mid}", self.cmd_topic)
self.publish(
f"movejPose:{'blend' if blend_radius else ''}{',' if blend_radius and jtime else ''}{'time' if jtime else ''}",
self.cmd_topic)
def move_j_joints(self, joints, velocity=1.0, acceleration=1.0, blend_radius=None, jtime=None):
"""
Move robo arm to given position (Joints)
"""
if self.freedrive:
raise Exception("Can't movej while freedrive mode is active.")
# Send velocity and acceleration
if velocity:
self.publish(f"vel:{velocity}", self.cmd_topic)
if acceleration:
self.publish(f"acc:{acceleration}", self.cmd_topic)
# Send joints
self.publish(f"joints:{joints[0]},{joints[1]},{joints[2]},{joints[3]},{joints[4]},{joints[5]}", self.cmd_topic)
# If move should be blended into the next one, send blend radius
if blend_radius:
self.publish(f"blend:{blend_radius}", self.cmd_topic)
# If move should take a certain time, send time
if jtime:
self.publish(f"time:{jtime}", self.cmd_topic)
self.publish(
f"movejJoints:{'blend' if blend_radius else ''}{',' if blend_radius and jtime else ''}{'time' if jtime else ''}",
self.cmd_topic)
def reduce_speed(self):
"""
Reduces every movement of the robot to 10 %
"""
self.publish(f"pause:", self.cmd_topic)
def reset_speed(self):
"""
Resets the movement speed after reducing it
"""
self.publish(f"continue:", self.cmd_topic)
def evaluate_timing(self):
"""
NOTES: - send message id with movej cmd and return a message id when it reaches the script
then add it to the received returns here
"""
if not self.debug_timing:
print("Timing debug is disabled. Set UR3Robot.debug_timing to True.")
else:
print("Timing evaluation")
print("-----------------")
timings = list()
for topic in self.published_messages:
print(f"Topic: {topic}")
print(f"Published messages: {len(self.published_messages[topic])}")
if len(self.received_returns) <= 0:
print(f"Received returns: 0")
print("No timings, since no messages were received.")
else:
print(f"Received returns: {len(self.received_returns)}")
print(f"Lost returns: {len(self.published_messages[topic]) - len(self.received_returns)}")
for mid in self.published_messages[topic]:
message, timing = self.published_messages[topic][mid]
r_message, r_timing = self.received_returns.get(mid, (None, None))
if r_message is not None:
elapsed_time = r_timing - timing
timings.append(elapsed_time)
print(f"Average elapsed time: {round((sum(timings) / len(timings)) * 1000, 1)} ms")
print(f"Max elapsed time: {round((max(timings)) * 1000, 1)} ms")
print(f"Min elapsed time: {round((min(timings)) * 1000, 1)} ms")
print()