Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions onboard/src/task_planning/task_planning/interface/cv.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ def __init__(self, node: Node, bypass: bool = False) -> None:
)

# Subscribe to angle topics
self._angles: dict[CVObjectType, float] = dict.fromkeys(self.ANGLE_TOPICS, 0)
self._angle_queues: dict[CVObjectType, list[float]] = {object_type: [] for object_type in self.ANGLE_TOPICS}
self._angles: dict[CVObjectType, float] = dict.fromkeys(self.ANGLE_TOPICS | self.BOUNDING_BOX_TOPICS, 0)
self._angle_queues: dict[CVObjectType, list[float]] = {
object_type: [] for object_type in self.ANGLE_TOPICS | self.BOUNDING_BOX_TOPICS}
for object_type, object_topic in self.ANGLE_TOPICS.items():
node.create_subscription(
Float64,
Expand Down Expand Up @@ -183,16 +184,13 @@ def lane_marker_data(self) -> dict:
"""The dictionary containing lane marker-specific data."""
return self._lane_marker_data

def _on_receive_bounding_box_data(self, cv_data: CVObject, object_type: CVObjectType, filter_len: int = 10) -> None:
def _on_receive_bounding_box_data(self, cv_data: CVObject, object_type: CVObjectType) -> None:
"""
Store the received CV bounding box.

Args:
cv_data (CVObject): The received CV data.
object_type (CVObjectType): The name/type of the object.
filter_len (int, optional): The maximum number of distance data points to retain
for the moving average filter. Defaults to 10.

"""
# Special filtering for TORPEDO_BANNER
if object_type == CVObjectType.TORPEDO_BANNER:
Expand Down Expand Up @@ -221,9 +219,12 @@ def _on_receive_bounding_box_data(self, cv_data: CVObject, object_type: CVObject
self._lane_marker_data['touching_top'] = cv_data.coords.y - cv_data.height / 2 <= 0
self._lane_marker_data['touching_bottom'] = cv_data.coords.y + cv_data.height / 2 >= self.FRAME_HEIGHT

if object_type == CVObjectType.PATH_MARKER:
self._angles[object_type] = self.update_moving_average(self._angle_queues[object_type],
cv_data.yaw, filter_len)
# self._angles[object_type] = self.update_moving_average(self._angle_queues[object_type],
# cv_data.yaw, filter_len)
if object_type not in self._angles:
self._angles[object_type] = cv_data.yaw
else:
self._angles[object_type] = self.update_exponential_moving_average(self._angles[object_type], cv_data.yaw)

def _on_receive_distance_data(self, distance_data: Point, object_type: CVObjectType, filter_len: int = 10) -> None:
"""
Expand Down Expand Up @@ -302,6 +303,20 @@ def update_moving_average(self, queue: list[float], new_value: float, filter_len

return sum(queue) / len(queue)

def update_exponential_moving_average(self, old_value: float, new_value: float, smoothing_k: float = 0.2) -> float:
"""
Update the exponential moving average filter with a new value.

Args:
old_value (lifloatst): The most recent value output.
new_value (float): The new data point.
smoothing_k (float, optional): The smoothing factor. Higher prioritizes newer values

Returns:
float: The new moving average.
"""
return old_value * (1 - smoothing_k) + new_value * smoothing_k if old_value != 0.0 else new_value

def get_pose(self, name: CVObjectType) -> Pose:
"""
Get the pose of a detected object.
Expand Down Expand Up @@ -383,4 +398,3 @@ def get_sonar_sweep_params(self, name: CVObjectType) -> tuple[float, float, floa

data = self._bounding_boxes[name]
return (data.sonar_start_angle, data.sonar_end_angle, data.sonar_scan_distance)

6 changes: 3 additions & 3 deletions onboard/src/task_planning/task_planning/robot/oogway.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from task_planning.tasks import (
buoyancy_tasks,
comp_tasks,
cv_tasks,
ivc_tasks,
move_tasks,
prequal_tasks,
Expand All @@ -28,10 +29,9 @@ async def main(self: Task) -> Task[None, None, None]:
tasks = [
######## Main competition tasks ########
# ivc_tasks.delineate_ivc_log(parent=self),
comp_tasks.initial_submerge(DEPTH, parent=self),
# comp_tasks.initial_submerge(DEPTH, parent=self),
# comp_tasks.gate_task_dead_reckoning(depth_level=-DEPTH, parent=self),
comp_tasks.torpedo_task(first_target=FIRST_TARGET, depth_level=DEPTH,
direction=DIRECTION_OF_TORPEDO_BANNER, parent=self),
cv_tasks.yaw_to_cv_obj(FIRST_TARGET, parent=self),
# TODO: task not found???
# comp_tasks.send_torpedo_ivc(parent=self),
# comp_tasks.octagon_task(direction=1, parent=self),
Expand Down
134 changes: 134 additions & 0 deletions onboard/src/task_planning/task_planning/tasks/cv_tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,146 @@
import math

from geometry_msgs.msg import Twist, Vector3
from rclpy.clock import Clock
from rclpy.logging import get_logger
from task_planning.interface.cv import CV, CVObjectType
from task_planning.interface.state import State
from task_planning.task import Task, Yield, task
from task_planning.tasks import move_tasks
from task_planning.tasks.move_tasks import move_to_pose_local
from task_planning.utils import geometry_utils

logger = get_logger('cv_tasks')

@task
async def yaw_until_object_detection(self: Task, cv_object: CVObjectType, search_direction: int = 1,
depth_threshold: float = 0.2, depth_level: float = 0.5,
) -> Task[None, str | None, None] | bool:
"""
Yaws until an object is detected by CV.

Returns when the robot looking at the CV object, within a small tolerance.

Args:
self: Task instance.
cv_object: CV class name of the object to yaw to
search_direction: If no CV object in view, which direction should it search in.
1 for positive yaw, -1 for negative yaw.
depth_threshold: The error in depth that will cause a depth-correct call
depth_level: Desire depth level to hold throughout the task

Send:
CV class name of new object to yaw to
"""
logger.info('Beginning yaw_util_object_detection task')

yaw_pid_step_size = math.radians(45)
num_steps = 3
depth_level = State().orig_depth - depth_level

@task
async def correct_depth(self: Task) -> None:
await move_tasks.depth_correction(desired_depth=depth_level, parent=self)

@task
async def object_search_pattern(self: Task) -> bool:
step = 0
while step <= num_steps * 2 + 1:
if step <= num_steps - 1:
angle = yaw_pid_step_size
elif step == num_steps:
angle = -3 * yaw_pid_step_size
elif step <= num_steps * 2:
angle = -1 * yaw_pid_step_size
else:
angle = 3 * yaw_pid_step_size
logger.info('Yawed to find object more than 7 times, breaking loop.')
await move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, angle * search_direction),
depth_level=depth_level,
pose_tolerances=Twist(linear=Vector3(x=0.05, y=0.05, z=0.05),
angular=Vector3(x=0.2, y=0.3, z=0.3)),
timeout=10,
parent=self)
# TODO: confirm that this actually steps and doesn't go through the whole thing when we call outer function
step += 1
return False

object_search_task = object_search_pattern(parent=self)

while not CV().is_receiving_recent_cv_data(cv_object, 10):
if object_search_task.done:
return False

if (abs(State().depth - depth_level) < depth_threshold):
await correct_depth(parent=self)

object_search_task.step()

return True

@task
async def yaw_to_cv_obj(self: Task, cv_object: CVObjectType , search_direction : int = 1,
yaw_threshold : float = math.radians(10), depth_threshold : float = 0.2,
depth_level : float =0.5, pid_timeout : float = 10) -> Task[None, str | None, None] | None:
"""
Yaw to an object detected by CV.

Returns when the robot looking at the CV object, within a small tolerance.

Args:
self: Task instance.
cv_object: CV class name of the object to yaw to
search_direction: If no CV object in view, which direction should it search in.
1 for positive yaw, -1 for negative yaw.
yaw_threshold: Tolerance for completing the task
depth_threshold: The error in depth that will cause a depth-correct call
depth_level: Desire depth level to hold throughout the task
pid_timeout: Time within PID loop before automatically breaking out

Send:
CV class name of new object to yaw to
"""
depth_level = State().orig_depth - depth_level

@task
async def correct_depth(self: Task) -> None:
await move_tasks.depth_correction(desired_depth=depth_level, parent=self)

if not CV().is_receiving_recent_cv_data(cv_object, 10):
found_object = await yaw_until_object_detection(cv_object, search_direction,
depth_threshold, depth_level, parent=self)
if not found_object:
return

logger.info('Starting yaw_to_cv_object')

cv_object_yaw = CV().angles[cv_object]
#TODO: may need to multiply cv_object_yaw by a magic scaler
move_to_pose_task = move_tasks.move_to_pose_local(geometry_utils.create_pose(0, 0, 0, 0, 0, cv_object_yaw),
depth_level=depth_level,
pose_tolerances=Twist(linear=Vector3(x=0.05, y=0.05, z=0.05),
angular=Vector3(x=0.2, y=0.3, z=yaw_threshold)),
timout=10,
parent=self)

starting_time = Clock().now()

while not move_to_pose_task.done:
if abs(State().depth - depth_level) < depth_threshold:
await correct_depth(parent=self)

move_to_pose_task.step()

cv_object_yaw = CV().angles[cv_object]
move_to_pose_task.send(cv_object_yaw)

if Clock.now() - starting_time > pid_timeout:
logger.info('Timeout elapsed, Finishing Yaw To CV Object')
return

logger.info('PID Loop Complete, Finishing Yaw To CV Object')
return


# TODO: this task will likely be depleted once we complete the refactoring tasks in comp_tasks.py
@task
Expand Down
Loading