diff --git a/tinynav/core/planning_node.py b/tinynav/core/planning_node.py index c3237c3f..c9aeec77 100644 --- a/tinynav/core/planning_node.py +++ b/tinynav/core/planning_node.py @@ -51,10 +51,10 @@ def footprint_from_control(self): GO2_CONFIG = RobotConfig( name='go2', shape='square', - length=0.7, width=0.3, - camera_x=0.35, camera_y=0.0, + length=0.4, width=0.3, + camera_x=0.2, camera_y=0.0, control_x=0.0, control_y=0.0, - safety_radius=0.1, + safety_radius=0.2, ) B2_CONFIG = RobotConfig( @@ -151,11 +151,11 @@ def run_raycasting_loopy(depth_image, T_cam_to_world, grid_shape, fx, fy, cx, cy @dataclass class ObstacleConfig: - robot_z_bottom: float = -0.2 - robot_z_top: float = 0.5 + robot_z_bottom: float = -0.4 + robot_z_top: float = 0.4 occ_threshold: float = 0.1 - min_wall_span_m: float = 0.4 - dilation_cells: int = 3 + min_wall_span_m: float = 0.2 + dilation_cells: int = 2 def build_obstacle_map(occupancy_grid, origin, resolution, robot_z, config=None): @@ -184,48 +184,35 @@ def build_obstacle_map(occupancy_grid, origin, resolution, robot_z, config=None) @njit(cache=True) def generate_trajectory_library_3d( - num_samples=11, duration=2.0, dt=0.1, - acc_std=0.00001, omega_y_std_deg=20.0, - init_p=np.zeros(3), init_v=np.zeros(3), init_q=np.array([0, 0, 0, 1]) + num_samples=15, duration=3.0, dt=0.1, + init_p=np.zeros(3), init_q=np.array([0, 0, 0, 1]) ): + """Regular sampled lattice (forward-only).""" num_steps = int(duration / dt) + 1 - max_acc = 0.2 - acc_samples = np.linspace(-max_acc, max_acc, int(num_samples / 2)) - max_omega = np.pi / 8 - omega_y_samples = np.linspace(-max_omega, max_omega, num_samples) + vx_max = 0.5 + n_vx = max(3, int(num_samples / 2)) + vx_samples = np.linspace(0.0, vx_max, n_vx) + omega_y_samples = np.linspace(-np.pi / 3, np.pi / 3, num_samples) - num_samples = len(acc_samples) * len(omega_y_samples) + num_samples = len(vx_samples) * len(omega_y_samples) trajectories = np.empty((num_samples, num_steps, 7)) params = np.empty((num_samples, 2)) k = -1 - for i_acc in range(len(acc_samples)): + for i_vx in range(len(vx_samples)): for i_omega in range(len(omega_y_samples)): k += 1 - dv = acc_samples[i_acc] + vx = vx_samples[i_vx] omega_y = omega_y_samples[i_omega] p = init_p.copy() - v_world = init_v.copy() q = quat_to_matrix(init_q) traj = np.empty((num_steps, 7)) for i in range(num_steps): dq = rotvec_to_matrix(np.array([0.0, omega_y * dt, 0.0])) - v_world = (q @ dq) @ q.T @ v_world q = q @ dq - - acc_body = q.T @ v_world - norm_val = np.linalg.norm(acc_body) - if norm_val > 1e-3: - acc_body = acc_body / norm_val - else: - acc_body = np.array([0.0, 0.0, 0.0]) - acc_body = acc_body * dv - - acc_world = q @ acc_body - v_world += acc_world * dt - v_world = np.clip(v_world, -0.5, 0.5) + v_world = q @ np.array([0.0, 0.0, vx]) p += v_world * dt traj[i, :3] = p traj[i, 3:] = matrix_to_quat(q) @@ -233,10 +220,40 @@ def generate_trajectory_library_3d( for i in range(num_steps): traj[i, 2] = traj[0, 2] trajectories[k] = traj - params[k, 0] = dv + params[k, 0] = vx params[k, 1] = omega_y return trajectories, params + +def generate_predefined_trajectory_vocabularies( + duration=3.0, dt=0.1, + init_p=np.zeros(3), init_q=np.array([0, 0, 0, 1]) +): + """ + Predefined trajectory vocabularies. + """ + num_steps = int(duration / dt) + 1 + trajectories = [] + params = [] + + # constant reverse trajectory + # vx = -0.2 m/s, omega = 0 + reverse_speed = 0.2 + p = init_p.copy() + q = quat_to_matrix(init_q) + traj = np.empty((num_steps, 7), dtype=np.float64) + for i in range(num_steps): + v_world = q @ np.array([0.0, 0.0, -reverse_speed]) + p += v_world * dt + traj[i, :3] = p + traj[i, 3:] = matrix_to_quat(q) + for i in range(num_steps): + traj[i, 2] = traj[0, 2] + trajectories.append(traj) + params.append(np.array([-reverse_speed, 0.0], dtype=np.float64)) + + return np.asarray(trajectories), np.asarray(params) + @njit(cache=True) def score_trajectories_by_ESDF(trajectories, ESDF_map, origin, resolution, safety_radius=0.1, front_len=0.35, rear_len=0.35, half_w=0.15): @@ -421,6 +438,27 @@ def publish_footprint(self, T, stamp): msg.points = points self.footprint_pub.publish(msg) + def _front_obstacle_dist(self, T, obstacle_mask, max_dist=0.5): + """Distance from the robot's front face to the nearest obstacle in the forward corridor. + Scans start at the front face so the returned value matches physical clearance.""" + center = self.camera_to_robot_center(T) + fwd = T[:3, :3] @ np.array([0.0, 0.0, 1.0]) + n = (fwd[0] ** 2 + fwd[1] ** 2) ** 0.5 + fx, fy = (fwd[0] / n, fwd[1] / n) if n > 1e-6 else (1.0, 0.0) + lx, ly = -fy, fx + fl, _, hw = self.robot.footprint_from_control() + rows, cols = obstacle_mask.shape + steps = int(max_dist / self.resolution) + 1 + for step in range(steps): + d_from_face = step * self.resolution + d_from_center = fl + d_from_face + for w in (-hw, 0.0, hw): + xi = int((center[0] + fx * d_from_center + lx * w - self.origin[0]) / self.resolution) + yi = int((center[1] + fy * d_from_center + ly * w - self.origin[1]) / self.resolution) + if 0 <= xi < rows and 0 <= yi < cols and obstacle_mask[xi, yi]: + return d_from_face + return max_dist + 1.0 + def publish_obstacle_mask(self, mask, stamp): msg = OccupancyGrid() msg.header = Header() @@ -552,14 +590,16 @@ def sync_callback(self, depth_msg, odom_msg): self.publish_footprint(T, depth_msg.header.stamp) with Timer(name='traj gen', text="[{name}] Elapsed time: {milliseconds:.0f} ms"): - v_dir = T[:3, :3] @ np.array([0, 0, 1]) - magnitude = np.clip(self.smoothed_velocity, 0.05, 0.5) - init_v = v_dir * float(magnitude) + init_p = self.camera_to_robot_center(T) + init_q = np.array([odom_msg.pose.pose.orientation.x, odom_msg.pose.pose.orientation.y, odom_msg.pose.pose.orientation.z, odom_msg.pose.pose.orientation.w]) trajectories, params = generate_trajectory_library_3d( - init_p = self.camera_to_robot_center(T), - init_v = init_v, - init_q = np.array([odom_msg.pose.pose.orientation.x, odom_msg.pose.pose.orientation.y, odom_msg.pose.pose.orientation.z, odom_msg.pose.pose.orientation.w]) + init_p=init_p, init_q=init_q + ) + vocab_trajs, vocab_params = generate_predefined_trajectory_vocabularies( + init_p=init_p, init_q=init_q ) + trajectories = np.concatenate([trajectories, vocab_trajs], axis=0) + params = np.concatenate([params, vocab_params], axis=0) self.last_T = T self.last_stamp = stamp @@ -570,11 +610,25 @@ def sync_callback(self, depth_msg, odom_msg): top_indices = np.argsort(scores, kind='stable')[:top_k] with Timer(name='pub', text="[{name}] Elapsed time: {milliseconds:.0f} ms"): + front_clearance = self._front_obstacle_dist(T, obstacle_mask) + enter_threshold = 0.30 + def cost_function(traj, param, score, target_pose): + # predefined backward trajectory penalty + is_backward_traj = param[0] < 0.0 + should_reverse = front_clearance <= enter_threshold + reverse_gate_penalty = 0.0 + if should_reverse and not is_backward_traj: + reverse_gate_penalty = 1e9 + elif not should_reverse and is_backward_traj: + reverse_gate_penalty = 1e9 + + # regular trajectory penalty traj_end = np.array(traj[-1,:3]) target_end = target_pose if target_pose is not None else traj_end dist = np.linalg.norm(traj_end - target_end) - return score * 100000 + 100 * dist + 10 * abs(self.last_param[0] - param[0]) + 10 * abs(self.last_param[1] - param[1]) + + return score * 100000 + 100 * dist + 10 * abs(self.last_param[0] - param[0]) + 10 * abs(self.last_param[1] - param[1]) + reverse_gate_penalty top_k = 1 top_indices = np.argsort(np.array([cost_function(trajectories[i], params[i], scores[i], self.target_pose) for i in range(len(trajectories))]), kind='stable')[:top_k] @@ -595,7 +649,6 @@ def cost_function(traj, param, score, target_pose): for i in top_indices: for j in range(0, len(trajectories[i]), 10): x,y,z,qx,qy,qz,qw = trajectories[i][j] - pose = PoseStamped() pose.header = depth_msg.header pose.pose.position.x = x diff --git a/tinynav/platforms/cmd_vel_control.py b/tinynav/platforms/cmd_vel_control.py index 3afa4547..f6046b48 100644 --- a/tinynav/platforms/cmd_vel_control.py +++ b/tinynav/platforms/cmd_vel_control.py @@ -47,9 +47,13 @@ def __init__(self): self.path_filter_tau = 0.30 self.lookahead_steps = 1 # Static-friction compensation: very small vx often cannot move the robot. - self.min_effective_linear_speed = 0.2 + self.min_effective_linear_speed = 0.1 + self.min_effective_angular_speed = 0.1 self.linear_engage_threshold = 0.04 self.fixed_reverse_speed = 0.2 + # Hack: if path first segment points far away from robot heading, + # rotate in place instead of publishing near-zero cmd_vel. + self.force_turn_heading_threshold = np.deg2rad(80.0) self.latest_cmd = Twist() self.prev_cmd = Twist() @@ -59,7 +63,7 @@ def __init__(self): _latched_qos = QoSProfile(depth=1, durability=DurabilityPolicy.TRANSIENT_LOCAL) self.create_subscription(Bool, '/nav/paused', self._on_paused, _latched_qos) self.cmd_timer = self.create_timer(1.0 / self.cmd_rate_hz, self.cmd_timer_callback) - + def _on_paused(self, msg: Bool): self._paused = msg.data if not self._paused: @@ -96,20 +100,40 @@ def cmd_timer_callback(self): target_cmd.linear.x *= 0.3 target_cmd.angular.z *= 0.5 - # Acceleration limiting for smoother control. - max_dv = self.max_linear_acc * dt - max_dw = self.max_angular_acc * dt out = Twist() - out.linear.x = self._clamp_step(target_cmd.linear.x, self.prev_cmd.linear.x, max_dv) - out.angular.z = self._clamp_step(target_cmd.angular.z, self.prev_cmd.angular.z, max_dw) out.linear.y = 0.0 - # Dead-band: < 0.05 → 0; small positive → snap to min effective speed. - if abs(out.linear.x) < 0.05: - out.linear.x = 0.0 - elif 0 < out.linear.x < self.min_effective_linear_speed: - out.linear.x = self.min_effective_linear_speed - if abs(out.angular.z) < 0.05: + + # Reverse is a predefined planner vocabulary: straight back at fixed speed. + # Do not smooth or re-lock it here; just pass it through while stale/paused guards still work. + if target_cmd.linear.x < 0.0: + out.linear.x = target_cmd.linear.x out.angular.z = 0.0 + self.cmd_pub.publish(out) + self.prev_cmd = out + return + + # Forward/turning commands still get acceleration limiting and robot minimum-speed locks. + max_dv = self.max_linear_acc * dt + # If we just left reverse mode, do not let acceleration limiting leak another reverse command. + prev_linear_x = 0.0 if self.prev_cmd.linear.x < 0.0 else self.prev_cmd.linear.x + out.linear.x = self._clamp_step(target_cmd.linear.x, prev_linear_x, max_dv) + # Do not acceleration-limit yaw. The planner/control layer already decides the turn rate, + # and forced rotate-in-place should take effect immediately. + out.angular.z = target_cmd.angular.z + + # Linear x: robot cannot execute tiny non-zero speeds reliably. + # When engaging forward motion, snap to +min; when stopping/decaying, snap to 0. + if 0.0 < out.linear.x < self.min_effective_linear_speed: + out.linear.x = self.min_effective_linear_speed if target_cmd.linear.x >= self.min_effective_linear_speed else 0.0 + elif abs(out.linear.x) < self.min_effective_linear_speed: + out.linear.x = 0.0 + + # Angular z: same idea; tiny requested turns snap to executable min, decays snap to 0. + if 0.0 < abs(out.angular.z) < self.min_effective_angular_speed: + if abs(target_cmd.angular.z) >= self.min_effective_angular_speed: + out.angular.z = float(np.sign(target_cmd.angular.z) * self.min_effective_angular_speed) + else: + out.angular.z = 0.0 self.cmd_pub.publish(out) self.prev_cmd = out @@ -145,23 +169,41 @@ def msg2np(msg): T_robot_2 = T2 @ self.T_robot_to_camera T_robot_2_to_1 = np.linalg.inv(T_robot_1) @ T_robot_2 p = T_robot_2_to_1[:3, 3] + heading_err = float(np.arctan2(p[1], p[0])) # dt must match actual spacing between published Path poses, not raw trajectory dt. dt = self.planner_dt * self.path_pose_stride * max(1, step_idx) linear_velocity_vec = p / dt r = R.from_matrix(T_robot_2_to_1[:3, :3]) angular_velocity_vec = r.as_rotvec() / dt - vx = np.clip(linear_velocity_vec[0], -0.1, 0.5) - if vx < 0.0: + raw_vx = float(linear_velocity_vec[0]) + if raw_vx < 0.0: vx = -self.fixed_reverse_speed + else: + vx = float(np.clip(raw_vx, 0.0, 0.5)) vy = 0.0 vyaw = np.clip(angular_velocity_vec[2], -0.8, 0.8) - - # Filter planner updates to reduce visible jitter from 7-10 Hz updates. - alpha = np.clip(self.path_period_ema / (self.path_filter_tau + self.path_period_ema), 0.15, 0.75) - self.latest_cmd.linear.x = float((1.0 - alpha) * self.latest_cmd.linear.x + alpha * vx) + is_backward_segment = raw_vx < 0.0 + if is_backward_segment: + vyaw = 0.0 + + # Hack: if path first segment points >80 deg away from robot heading, + # force an in-place turn. Skip explicit backward segments because reverse + # naturally has heading_err close to +/-pi. + if (not is_backward_segment) and abs(heading_err) > self.force_turn_heading_threshold: + vx = 0.0 + vyaw = float(heading_err) + # Minimal rotate-first gate: apply only for forward motion. + elif vx > 0.0 and abs(heading_err) > 0.45: + vx = 0.0 + vyaw = float(np.clip(1.6 * heading_err, -0.6, 0.6)) + + # Store the latest target command directly. Smoothing is intentionally kept + # only in cmd_timer_callback via acceleration limiting, so planner/control + # behavior stays easy to reason about during tuning. + self.latest_cmd.linear.x = float(vx) self.latest_cmd.linear.y = float(vy) - self.latest_cmd.angular.z = float((1.0 - alpha) * self.latest_cmd.angular.z + alpha * vyaw) + self.latest_cmd.angular.z = float(vyaw) age = 0.0 if self.last_path_update_time is None else (time.monotonic() - self.last_path_update_time) self.logger.debug( f"cmd vx={self.latest_cmd.linear.x:.3f} vyaw={self.latest_cmd.angular.z:.3f} "