Skip to content

Commit ac7f409

Browse files
committed
More attempts to replicate cpp implementation
1 parent 562f1d4 commit ac7f409

File tree

1 file changed

+90
-84
lines changed

1 file changed

+90
-84
lines changed

rclpy/rclpy/action/client.py

+90-84
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,10 @@ def __init__(
182182
self._node.add_waitable(self)
183183
self._logger = self._node.get_logger().get_child('action_client')
184184

185-
self._lock_goal_request = threading.Lock()
186-
self._lock_cancel_request = threading.Lock()
187-
self._lock_result_request = threading.Lock()
188-
self._lock_feedback = threading.Lock()
189-
self._lock_status = threading.Lock()
185+
self._lock_goal_handles = threading.Lock()
186+
self._lock_goal_requests = threading.Lock()
187+
self._lock_cancel_requests = threading.Lock()
188+
self._lock_result_requests = threading.Lock()
190189

191190
def _generate_random_uuid(self):
192191
return UUID(uuid=list(uuid.uuid4().bytes))
@@ -216,21 +215,24 @@ def _remove_pending_request(self, future, pending_requests):
216215
return None
217216

218217
def _remove_pending_goal_request(self, future):
219-
seq = self._remove_pending_request(future, self._pending_goal_requests)
220-
if seq in self._goal_sequence_number_to_goal_id:
221-
del self._goal_sequence_number_to_goal_id[seq]
218+
with self._lock_goal_requests:
219+
seq = self._remove_pending_request(future, self._pending_goal_requests)
220+
if seq in self._goal_sequence_number_to_goal_id:
221+
del self._goal_sequence_number_to_goal_id[seq]
222222

223223
def _remove_pending_cancel_request(self, future):
224-
self._remove_pending_request(future, self._pending_cancel_requests)
224+
with self._lock_cancel_requests:
225+
self._remove_pending_request(future, self._pending_cancel_requests)
225226

226227
def _remove_pending_result_request(self, future):
227-
seq = self._remove_pending_request(future, self._pending_result_requests)
228-
if seq in self._result_sequence_number_to_goal_id:
229-
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
230-
del self._result_sequence_number_to_goal_id[seq]
231-
# remove feeback_callback if user is aware of result and it's been received
232-
if goal_uuid in self._feedback_callbacks:
233-
del self._feedback_callbacks[goal_uuid]
228+
with self._lock_result_requests:
229+
seq = self._remove_pending_request(future, self._pending_result_requests)
230+
if seq in self._result_sequence_number_to_goal_id:
231+
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
232+
del self._result_sequence_number_to_goal_id[seq]
233+
# remove feeback_callback if user is aware of result and it's been received
234+
if goal_uuid in self._feedback_callbacks:
235+
del self._feedback_callbacks[goal_uuid]
234236

235237
# Start Waitable API
236238
def is_ready(self, wait_set):
@@ -247,39 +249,39 @@ def take_data(self):
247249
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
248250
data = {}
249251
if self._is_goal_response_ready:
250-
with self._lock_goal_request:
252+
with self._node.handle:
251253
taken_data = self._client_handle.take_goal_response(
252254
self._action_type.Impl.SendGoalService.Response)
253255
# If take fails, then we get (None, None)
254256
if all(taken_data):
255257
data['goal'] = taken_data
256258

257259
if self._is_cancel_response_ready:
258-
with self._lock_cancel_request:
260+
with self._node.handle:
259261
taken_data = self._client_handle.take_cancel_response(
260262
self._action_type.Impl.CancelGoalService.Response)
261263
# If take fails, then we get (None, None)
262264
if all(taken_data):
263265
data['cancel'] = taken_data
264266

265267
if self._is_result_response_ready:
266-
with self._lock_result_request:
268+
with self._node.handle:
267269
taken_data = self._client_handle.take_result_response(
268270
self._action_type.Impl.GetResultService.Response)
269271
# If take fails, then we get (None, None)
270272
if all(taken_data):
271273
data['result'] = taken_data
272274

273275
if self._is_feedback_ready:
274-
with self._lock_feedback:
276+
with self._node.handle:
275277
taken_data = self._client_handle.take_feedback(
276278
self._action_type.Impl.FeedbackMessage)
277279
# If take fails, then we get None
278280
if taken_data is not None:
279281
data['feedback'] = taken_data
280282

281283
if self._is_status_ready:
282-
with self._lock_status:
284+
with self._node.handle:
283285
taken_data = self._client_handle.take_status(
284286
self._action_type.Impl.GoalStatusMessage)
285287
# If take fails, then we get None
@@ -296,46 +298,50 @@ async def execute(self, taken_data):
296298
call any user-defined callbacks (e.g. feedback).
297299
"""
298300
if 'goal' in taken_data:
299-
sequence_number, goal_response = taken_data['goal']
300-
if sequence_number in self._goal_sequence_number_to_goal_id:
301-
goal_handle = ClientGoalHandle(
302-
self,
303-
self._goal_sequence_number_to_goal_id[sequence_number],
304-
goal_response)
305-
306-
if goal_handle.accepted:
307-
goal_uuid = bytes(goal_handle.goal_id.uuid)
308-
if goal_uuid in self._goal_handles:
309-
raise RuntimeError(
310-
'Two goals were accepted with the same ID ({})'.format(goal_handle))
311-
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)
312-
313-
self._pending_goal_requests[sequence_number].set_result(goal_handle)
314-
else:
315-
self._logger.warning(
316-
'Ignoring unexpected goal response. There may be more than '
317-
f"one action server for the action '{self._action_name}'"
318-
)
301+
with self._lock_goal_requests:
302+
sequence_number, goal_response = taken_data['goal']
303+
if sequence_number in self._goal_sequence_number_to_goal_id:
304+
goal_handle = ClientGoalHandle(
305+
self,
306+
self._goal_sequence_number_to_goal_id[sequence_number],
307+
goal_response)
308+
309+
if goal_handle.accepted:
310+
goal_uuid = bytes(goal_handle.goal_id.uuid)
311+
with self._lock_goal_handles:
312+
if goal_uuid in self._goal_handles:
313+
raise RuntimeError(
314+
'Two goals were accepted with the same ID ({})'.format(goal_handle))
315+
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)
316+
317+
self._pending_goal_requests[sequence_number].set_result(goal_handle)
318+
else:
319+
self._logger.warning(
320+
'Ignoring unexpected goal response. There may be more than '
321+
f"one action server for the action '{self._action_name}'"
322+
)
319323

320324
if 'cancel' in taken_data:
321-
sequence_number, cancel_response = taken_data['cancel']
322-
if sequence_number in self._pending_cancel_requests:
323-
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
324-
else:
325-
self._logger.warning(
326-
'Ignoring unexpected cancel response. There may be more than '
327-
f"one action server for the action '{self._action_name}'"
328-
)
325+
with self._lock_cancel_requests:
326+
sequence_number, cancel_response = taken_data['cancel']
327+
if sequence_number in self._pending_cancel_requests:
328+
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
329+
else:
330+
self._logger.warning(
331+
'Ignoring unexpected cancel response. There may be more than '
332+
f"one action server for the action '{self._action_name}'"
333+
)
329334

330335
if 'result' in taken_data:
331-
sequence_number, result_response = taken_data['result']
332-
if sequence_number in self._pending_result_requests:
333-
self._pending_result_requests[sequence_number].set_result(result_response)
334-
else:
335-
self._logger.warning(
336-
'Ignoring unexpected result response. There may be more than '
337-
f"one action server for the action '{self._action_name}'"
338-
)
336+
with self._lock_result_requests:
337+
sequence_number, result_response = taken_data['result']
338+
if sequence_number in self._pending_result_requests:
339+
self._pending_result_requests[sequence_number].set_result(result_response)
340+
else:
341+
self._logger.warning(
342+
'Ignoring unexpected result response. There may be more than '
343+
f"one action server for the action '{self._action_name}'"
344+
)
339345

340346
if 'feedback' in taken_data:
341347
feedback_msg = taken_data['feedback']
@@ -349,19 +355,19 @@ async def execute(self, taken_data):
349355
for status_msg in taken_data['status'].status_list:
350356
goal_uuid = bytes(status_msg.goal_info.goal_id.uuid)
351357
status = status_msg.status
352-
353-
if goal_uuid in self._goal_handles:
354-
goal_handle = self._goal_handles[goal_uuid]()
355-
if goal_handle is not None:
356-
goal_handle._status = status
357-
# Remove "done" goals from the list
358-
if (GoalStatus.STATUS_SUCCEEDED == status or
359-
GoalStatus.STATUS_CANCELED == status or
360-
GoalStatus.STATUS_ABORTED == status):
358+
with self._lock_goal_handles:
359+
if goal_uuid in self._goal_handles:
360+
goal_handle = self._goal_handles[goal_uuid]()
361+
if goal_handle is not None:
362+
goal_handle._status = status
363+
# Remove "done" goals from the list
364+
if (GoalStatus.STATUS_SUCCEEDED == status or
365+
GoalStatus.STATUS_CANCELED == status or
366+
GoalStatus.STATUS_ABORTED == status):
367+
del self._goal_handles[goal_uuid]
368+
else:
369+
# Weak reference is None
361370
del self._goal_handles[goal_uuid]
362-
else:
363-
# Weak reference is None
364-
del self._goal_handles[goal_uuid]
365371

366372
def get_num_entities(self):
367373
"""Return number of each type of entity used in the wait set."""
@@ -370,7 +376,8 @@ def get_num_entities(self):
370376

371377
def add_to_wait_set(self, wait_set):
372378
"""Add entities to wait set."""
373-
self._client_handle.add_to_waitset(wait_set)
379+
with self._node.handle:
380+
self._client_handle.add_to_waitset(wait_set)
374381

375382
def __enter__(self):
376383
return self._client_handle.__enter__()
@@ -449,23 +456,22 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
449456
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
450457
request.goal = goal
451458
future = Future()
452-
with self._lock_goal_request:
459+
with self._lock_goal_requests:
453460
sequence_number = self._client_handle.send_goal_request(request)
454461
if sequence_number in self._pending_goal_requests:
455462
raise RuntimeError(
456463
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
457464
self._pending_goal_requests[sequence_number] = future
458465
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
466+
future.add_done_callback(self._remove_pending_goal_request)
467+
# Add future so executor is aware
468+
self.add_future(future)
459469

460470
if feedback_callback is not None:
461471
# TODO(jacobperron): Move conversion function to a general-use package
462472
goal_uuid = bytes(request.goal_id.uuid)
463473
self._feedback_callbacks[goal_uuid] = feedback_callback
464474

465-
future.add_done_callback(self._remove_pending_goal_request)
466-
# Add future so executor is aware
467-
self.add_future(future)
468-
469475
return future
470476

471477
def _cancel_goal(self, goal_handle):
@@ -508,16 +514,15 @@ def _cancel_goal_async(self, goal_handle):
508514
cancel_request = CancelGoal.Request()
509515
cancel_request.goal_info.goal_id = goal_handle.goal_id
510516
future = Future()
511-
with self._lock_cancel_request:
517+
with self._lock_cancel_requests:
512518
sequence_number = self._client_handle.send_cancel_request(cancel_request)
513519
if sequence_number in self._pending_cancel_requests:
514520
raise RuntimeError(
515521
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
516522
self._pending_cancel_requests[sequence_number] = future
517-
518-
future.add_done_callback(self._remove_pending_cancel_request)
519-
# Add future so executor is aware
520-
self.add_future(future)
523+
future.add_done_callback(self._remove_pending_cancel_request)
524+
# Add future so executor is aware
525+
self.add_future(future)
521526

522527
return future
523528

@@ -561,17 +566,18 @@ def _get_result_async(self, goal_handle):
561566
result_request = self._action_type.Impl.GetResultService.Request()
562567
result_request.goal_id = goal_handle.goal_id
563568
future = Future()
564-
with self._lock_result_request:
569+
with self._lock_result_requests:
565570
sequence_number = self._client_handle.send_result_request(result_request)
566571
if sequence_number in self._pending_result_requests:
567572
raise RuntimeError(
568573
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
574+
569575
self._pending_result_requests[sequence_number] = future
570576
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
571577

572-
future.add_done_callback(self._remove_pending_result_request)
573-
# Add future so executor is aware
574-
self.add_future(future)
578+
future.add_done_callback(self._remove_pending_result_request)
579+
# Add future so executor is aware
580+
self.add_future(future)
575581

576582
return future
577583

0 commit comments

Comments
 (0)