diff --git a/onboard/src/task_planning/task_planning/interface/cv.py b/onboard/src/task_planning/task_planning/interface/cv.py
index 690b6609..c811e04d 100644
--- a/onboard/src/task_planning/task_planning/interface/cv.py
+++ b/onboard/src/task_planning/task_planning/interface/cv.py
@@ -149,8 +149,9 @@ def __init__(self, node: Node, bypass: bool = False) -> None:
         )
 
         # Subscribe to angle topics
-        self._angles: dict[CVObjectType, float] = dict.fromkeys(self.ANGLE_TOPICS, 0)
-        self._angle_queues: dict[CVObjectType, list[float]] = {object_type: [] for object_type in self.ANGLE_TOPICS}
+        self._angles: dict[CVObjectType, float] = dict.fromkeys(self.ANGLE_TOPICS | self.BOUNDING_BOX_TOPICS, 0)
+        self._angle_queues: dict[CVObjectType, list[float]] = {
+            object_type: [] for object_type in self.ANGLE_TOPICS | self.BOUNDING_BOX_TOPICS}
         for object_type, object_topic in self.ANGLE_TOPICS.items():
             node.create_subscription(
                 Float64,
@@ -183,16 +184,13 @@ def lane_marker_data(self) -> dict:
         """The dictionary containing lane marker-specific data."""
         return self._lane_marker_data
 
-    def _on_receive_bounding_box_data(self, cv_data: CVObject, object_type: CVObjectType, filter_len: int = 10) -> None:
+    def _on_receive_bounding_box_data(self, cv_data: CVObject, object_type: CVObjectType) -> None:
         """
         Store the received CV bounding box.
 
         Args:
             cv_data (CVObject): The received CV data.
             object_type (CVObjectType): The name/type of the object.
-            filter_len (int, optional): The maximum number of distance data points to retain
-                for the moving average filter. Defaults to 10.
-
         """
         # Special filtering for TORPEDO_BANNER
         if object_type == CVObjectType.TORPEDO_BANNER:
@@ -221,9 +219,12 @@ def _on_receive_bounding_box_data(self, cv_data: CVObject, object_type: CVObjectType
             self._lane_marker_data['touching_top'] = cv_data.coords.y - cv_data.height / 2 <= 0
             self._lane_marker_data['touching_bottom'] = cv_data.coords.y + cv_data.height / 2 >= self.FRAME_HEIGHT
 
-        if object_type == CVObjectType.PATH_MARKER:
-            self._angles[object_type] = self.update_moving_average(self._angle_queues[object_type],
-                                                                   cv_data.yaw, filter_len)
+        # self._angles[object_type] = self.update_moving_average(self._angle_queues[object_type],
+        #                                                        cv_data.yaw, filter_len)
+        if object_type not in self._angles:
+            self._angles[object_type] = cv_data.yaw
+        else:
+            self._angles[object_type] = self.update_exponential_moving_average(self._angles[object_type], cv_data.yaw)
 
     def _on_receive_distance_data(self, distance_data: Point, object_type: CVObjectType, filter_len: int = 10) -> None:
         """
@@ -302,6 +303,20 @@ def update_moving_average(self, queue: list[float], new_value: float, filter_len
 
         return sum(queue) / len(queue)
 
+    def update_exponential_moving_average(self, old_value: float, new_value: float, smoothing_k: float = 0.2) -> float:
+        """
+        Update the exponential moving average filter with a new value.
+
+        Args:
+            old_value (float): The most recent filter output.
+            new_value (float): The new data point.
+            smoothing_k (float, optional): The smoothing factor; higher values prioritize newer data. Defaults to 0.2.
+
+        Returns:
+            float: The new moving average.
+        """
+        return old_value * (1 - smoothing_k) + new_value * smoothing_k if old_value != 0.0 else new_value
+
     def get_pose(self, name: CVObjectType) -> Pose:
         """
         Get the pose of a detected object.
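A quick standalone sketch of the new exponential moving average (a hypothetical demo, not part of the patch): with smoothing_k = 0.2, each new sample contributes 20% of the output. A stored value of exactly 0.0 is treated as "no data yet", so the filter seeds itself with the first sample; note this also means a true reading of exactly 0.0 would re-seed it.

# Hypothetical offline demo of the EMA update used in update_exponential_moving_average.
def ema(old_value: float, new_value: float, smoothing_k: float = 0.2) -> float:
    # Seed with the first sample (old_value == 0.0), then blend old and new.
    return old_value * (1 - smoothing_k) + new_value * smoothing_k if old_value != 0.0 else new_value

readings = [0.45, 0.60, 0.48, 0.55, 0.50]  # noisy yaw samples (radians)
filtered = 0.0  # matches the 0 that _angles entries are initialized with
for yaw in readings:
    filtered = ema(filtered, yaw)
    print(f'raw={yaw:.2f} filtered={filtered:.3f}')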
@@ -383,4 +398,3 @@ def get_sonar_sweep_params(self, name: CVObjectType) -> tuple[float, float, floa
         data = self._bounding_boxes[name]
 
         return (data.sonar_start_angle, data.sonar_end_angle, data.sonar_scan_distance)
-
diff --git a/onboard/src/task_planning/task_planning/robot/oogway.py b/onboard/src/task_planning/task_planning/robot/oogway.py
index cbaff9ae..535ce54f 100644
--- a/onboard/src/task_planning/task_planning/robot/oogway.py
+++ b/onboard/src/task_planning/task_planning/robot/oogway.py
@@ -8,6 +8,7 @@
 from task_planning.tasks import (
     buoyancy_tasks,
     comp_tasks,
+    cv_tasks,
     ivc_tasks,
     move_tasks,
     prequal_tasks,
@@ -28,10+29,9 @@ async def main(self: Task) -> Task[None, None, None]:
     tasks = [
         ######## Main competition tasks ########
         # ivc_tasks.delineate_ivc_log(parent=self),
-        comp_tasks.initial_submerge(DEPTH, parent=self),
+        # comp_tasks.initial_submerge(DEPTH, parent=self),
         # comp_tasks.gate_task_dead_reckoning(depth_level=-DEPTH, parent=self),
-        comp_tasks.torpedo_task(first_target=FIRST_TARGET, depth_level=DEPTH,
-                                direction=DIRECTION_OF_TORPEDO_BANNER, parent=self),
+        cv_tasks.yaw_to_cv_obj(FIRST_TARGET, parent=self),  # TODO: task not found???
         # comp_tasks.send_torpedo_ivc(parent=self),
         # comp_tasks.octagon_task(direction=1, parent=self),
diff --git a/onboard/src/task_planning/task_planning/tasks/cv_tasks.py b/onboard/src/task_planning/task_planning/tasks/cv_tasks.py
index e8cea0bc..9b9fb140 100644
--- a/onboard/src/task_planning/task_planning/tasks/cv_tasks.py
+++ b/onboard/src/task_planning/task_planning/tasks/cv_tasks.py
@@ -1,5 +1,11 @@
+import math
+
+from geometry_msgs.msg import Twist, Vector3
+from rclpy.clock import Clock
 from rclpy.logging import get_logger
 from task_planning.interface.cv import CV, CVObjectType
+from task_planning.interface.state import State
 from task_planning.task import Task, Yield, task
 from task_planning.tasks import move_tasks
 from task_planning.tasks.move_tasks import move_to_pose_local
+from task_planning.utils import geometry_utils
@@ -7,6 +13,135 @@
 logger = get_logger('cv_tasks')
 
 
+@task
+async def yaw_until_object_detection(self: Task, cv_object: CVObjectType, search_direction: int = 1,
+                                     depth_threshold: float = 0.2, depth_level: float = 0.5,
+                                     ) -> Task[None, str | None, None] | bool:
+    """
+    Yaw until an object is detected by CV.
+
+    Returns True once the object is detected, or False if the search pattern completes without a detection.
+
+    Args:
+        self: Task instance.
+        cv_object: CV class name of the object to yaw to.
+        search_direction: If no CV object is in view, the direction in which to search.
+            1 for positive yaw, -1 for negative yaw.
+        depth_threshold: The depth error that triggers a depth correction.
+        depth_level: Desired depth level to hold throughout the task.
+
+    Send:
+        CV class name of a new object to yaw to.
+    """
+    logger.info('Beginning yaw_until_object_detection task')
+
+    yaw_pid_step_size = math.radians(45)
+    num_steps = 3
+    depth_level = State().orig_depth - depth_level
+
+    @task
+    async def correct_depth(self: Task) -> None:
+        await move_tasks.depth_correction(desired_depth=depth_level, parent=self)
+
+    @task
+    async def object_search_pattern(self: Task) -> bool:
+        step = 0
+        while step <= num_steps * 2 + 1:
+            if step <= num_steps - 1:
+                angle = yaw_pid_step_size
+            elif step == num_steps:
+                angle = -3 * yaw_pid_step_size
+            elif step <= num_steps * 2:
+                angle = -1 * yaw_pid_step_size
+            else:
+                angle = 3 * yaw_pid_step_size
+                logger.info('Yawed to find object more than 7 times, breaking loop.')
+            await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, angle * search_direction),
+                                                depth_level=depth_level,
+                                                pose_tolerances=Twist(linear=Vector3(x=0.05, y=0.05, z=0.05),
+                                                                      angular=Vector3(x=0.2, y=0.3, z=0.3)),
+                                                timeout=10,
+                                                parent=self)
+            # TODO: confirm that this actually steps and doesn't go through the whole thing when we call the outer function
+            step += 1
+        return False
+
+    object_search_task = object_search_pattern(parent=self)
+
+    while not CV().is_receiving_recent_cv_data(cv_object, 10):
+        if object_search_task.done:
+            return False
+
+        if abs(State().depth - depth_level) > depth_threshold:
+            await correct_depth(parent=self)
+
+        object_search_task.step()
+
+    return True
+
+
+@task
+async def yaw_to_cv_obj(self: Task, cv_object: CVObjectType, search_direction: int = 1,
+                        yaw_threshold: float = math.radians(10), depth_threshold: float = 0.2,
+                        depth_level: float = 0.5, pid_timeout: float = 10) -> Task[None, str | None, None] | None:
+    """
+    Yaw to an object detected by CV.
+
+    Returns when the robot is looking at the CV object, within a small tolerance, or when the PID timeout elapses.
+
+    Args:
+        self: Task instance.
+        cv_object: CV class name of the object to yaw to.
+        search_direction: If no CV object is in view, the direction in which to search.
+            1 for positive yaw, -1 for negative yaw.
+        yaw_threshold: Tolerance for completing the task.
+        depth_threshold: The depth error that triggers a depth correction.
+        depth_level: Desired depth level to hold throughout the task.
+        pid_timeout: Time allowed in the PID loop before automatically breaking out.
+
+    Send:
+        CV class name of a new object to yaw to.
+    """
+    if not CV().is_receiving_recent_cv_data(cv_object, 10):
+        found_object = await yaw_until_object_detection(cv_object, search_direction,
+                                                        depth_threshold, depth_level, parent=self)
+        if not found_object:
+            return
+
+    # Convert the relative depth level to an absolute target only after calling
+    # yaw_until_object_detection, which performs the same conversion internally.
+    depth_level = State().orig_depth - depth_level
+
+    @task
+    async def correct_depth(self: Task) -> None:
+        await move_tasks.depth_correction(desired_depth=depth_level, parent=self)
+
+    logger.info('Starting yaw_to_cv_obj')
+
+    cv_object_yaw = CV().angles[cv_object]
+    # TODO: may need to multiply cv_object_yaw by a magic scaler
+    move_to_pose_task = move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, cv_object_yaw),
+                                                      depth_level=depth_level,
+                                                      pose_tolerances=Twist(linear=Vector3(x=0.05, y=0.05, z=0.05),
+                                                                            angular=Vector3(x=0.2, y=0.3, z=yaw_threshold)),
+                                                      timeout=10,
+                                                      parent=self)
+
+    starting_time = Clock().now()
+
+    while not move_to_pose_task.done:
+        if abs(State().depth - depth_level) > depth_threshold:
+            await correct_depth(parent=self)
+
+        move_to_pose_task.step()
+
+        cv_object_yaw = CV().angles[cv_object]
+        move_to_pose_task.send(cv_object_yaw)
+
+        if (Clock().now() - starting_time).nanoseconds / 1e9 > pid_timeout:
+            logger.info('Timeout elapsed, finishing yaw_to_cv_obj')
+            return
+
+    logger.info('PID loop complete, finishing yaw_to_cv_obj')
+    return
+
+
 # TODO: this task will likely be deleted once we complete the refactoring tasks in comp_tasks.py
 @task
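For reference, a standalone trace of the sweep that object_search_pattern commands (a hypothetical print-only demo, not part of the patch and with no ROS dependencies): the robot fans out to +135 degrees in 45-degree steps, snaps back through the start heading, mirrors the sweep to -135 degrees, and returns to center on the final step.

import math

# Trace the cumulative yaw offset commanded by the search pattern above,
# using the same step logic as object_search_pattern (search_direction = 1).
yaw_pid_step_size = math.radians(45)
num_steps = 3

heading = 0.0
for step in range(num_steps * 2 + 2):  # steps 0..7, matching the while loop bound
    if step <= num_steps - 1:
        angle = yaw_pid_step_size        # sweep out: +45 deg per step
    elif step == num_steps:
        angle = -3 * yaw_pid_step_size   # snap back through center: -135 deg
    elif step <= num_steps * 2:
        angle = -1 * yaw_pid_step_size   # sweep the other side: -45 deg per step
    else:
        angle = 3 * yaw_pid_step_size    # final return to center: +135 deg
    heading += angle
    print(f'step {step}: heading {math.degrees(heading):+.0f} deg')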