Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
1fe22d8
fix(planning): stop publishing path when robot reaches goal (< 0.3m)
Apr 27, 2026
2c76cfe
fix(planning): compare camera position to target_pose for goal-reache…
Apr 27, 2026
1dc69b9
fix(nav): stop robot properly when no active nav target
Apr 28, 2026
3f47431
change dist_to_goal
Apr 28, 2026
4b8ede8
refactor(planning): remove redundant goal-reached check
Apr 28, 2026
70933de
Remove unnecessary path publishing in planning_node
xiaolefang-dm Apr 29, 2026
31237ac
refine planning and cmd_vel coordination for safer maneuvering
xiaolefang-dm Apr 29, 2026
7dd4372
fix(planning): remove front_dist topic and gate reverse in planner cost
xiaolefang-dm Apr 29, 2026
3a57d2c
chore: add trailing newlines to planning and control files
xiaolefang-dm Apr 29, 2026
5108b9c
fix(control): remove leftover merge conflict marker in cmd_vel_control
xiaolefang-dm Apr 29, 2026
a7a9e12
feat(planning): introduce predefined trajectory vocabularies and stre…
xiaolefang-dm Apr 29, 2026
e72469f
refactor(planning): enhance trajectory penalty calculations and clari…
xiaolefang-dm Apr 29, 2026
a902327
fix(planning): correct typo prefined -> predefined in trajectory voca…
xiaolefang-dm Apr 29, 2026
c6c6d4b
test(control): pass reverse commands through directly
xiaolefang-dm May 8, 2026
62f9e79
test(control): simplify cmd vel smoothing and force large heading turns
xiaolefang-dm May 9, 2026
7ca1988
Merge branch 'xiaole/test/cmd-vel-heading-turn' into xiaole/optimized…
xiaolefang-dm May 9, 2026
d721212
go2 config
xiaolefang-dm May 9, 2026
9181274
fix(control): remove yaw accel limit for cmd_vel
xiaolefang-dm May 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 93 additions & 40 deletions tinynav/core/planning_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ def footprint_from_control(self):

GO2_CONFIG = RobotConfig(
name='go2', shape='square',
length=0.7, width=0.3,
camera_x=0.35, camera_y=0.0,
length=0.4, width=0.3,
camera_x=0.2, camera_y=0.0,
control_x=0.0, control_y=0.0,
safety_radius=0.1,
safety_radius=0.2,
)

B2_CONFIG = RobotConfig(
Expand Down Expand Up @@ -151,11 +151,11 @@ def run_raycasting_loopy(depth_image, T_cam_to_world, grid_shape, fx, fy, cx, cy

@dataclass
class ObstacleConfig:
robot_z_bottom: float = -0.2
robot_z_top: float = 0.5
robot_z_bottom: float = -0.4
robot_z_top: float = 0.4
occ_threshold: float = 0.1
min_wall_span_m: float = 0.4
dilation_cells: int = 3
min_wall_span_m: float = 0.2
dilation_cells: int = 2


def build_obstacle_map(occupancy_grid, origin, resolution, robot_z, config=None):
Expand Down Expand Up @@ -184,59 +184,76 @@ def build_obstacle_map(occupancy_grid, origin, resolution, robot_z, config=None)

@njit(cache=True)
def generate_trajectory_library_3d(
num_samples=11, duration=2.0, dt=0.1,
acc_std=0.00001, omega_y_std_deg=20.0,
init_p=np.zeros(3), init_v=np.zeros(3), init_q=np.array([0, 0, 0, 1])
num_samples=15, duration=3.0, dt=0.1,
init_p=np.zeros(3), init_q=np.array([0, 0, 0, 1])
):
"""Regular sampled lattice (forward-only)."""
num_steps = int(duration / dt) + 1

max_acc = 0.2
acc_samples = np.linspace(-max_acc, max_acc, int(num_samples / 2))
max_omega = np.pi / 8
omega_y_samples = np.linspace(-max_omega, max_omega, num_samples)
vx_max = 0.5
n_vx = max(3, int(num_samples / 2))
vx_samples = np.linspace(0.0, vx_max, n_vx)
omega_y_samples = np.linspace(-np.pi / 3, np.pi / 3, num_samples)

num_samples = len(acc_samples) * len(omega_y_samples)
num_samples = len(vx_samples) * len(omega_y_samples)

trajectories = np.empty((num_samples, num_steps, 7))
params = np.empty((num_samples, 2))

k = -1
for i_acc in range(len(acc_samples)):
for i_vx in range(len(vx_samples)):
for i_omega in range(len(omega_y_samples)):
k += 1
dv = acc_samples[i_acc]
vx = vx_samples[i_vx]
omega_y = omega_y_samples[i_omega]
p = init_p.copy()
v_world = init_v.copy()
q = quat_to_matrix(init_q)
traj = np.empty((num_steps, 7))
for i in range(num_steps):
dq = rotvec_to_matrix(np.array([0.0, omega_y * dt, 0.0]))
v_world = (q @ dq) @ q.T @ v_world
q = q @ dq

acc_body = q.T @ v_world
norm_val = np.linalg.norm(acc_body)
if norm_val > 1e-3:
acc_body = acc_body / norm_val
else:
acc_body = np.array([0.0, 0.0, 0.0])
acc_body = acc_body * dv

acc_world = q @ acc_body
v_world += acc_world * dt
v_world = np.clip(v_world, -0.5, 0.5)
v_world = q @ np.array([0.0, 0.0, vx])
p += v_world * dt
traj[i, :3] = p
traj[i, 3:] = matrix_to_quat(q)
#hack
for i in range(num_steps):
traj[i, 2] = traj[0, 2]
trajectories[k] = traj
params[k, 0] = dv
params[k, 0] = vx
params[k, 1] = omega_y
return trajectories, params


def generate_predefined_trajectory_vocabularies(
duration=3.0, dt=0.1,
init_p=np.zeros(3), init_q=np.array([0, 0, 0, 1])
):
"""
Predefined trajectory vocabularies.
"""
num_steps = int(duration / dt) + 1
trajectories = []
params = []

# constant reverse trajectory
# vx = -0.2 m/s, omega = 0
reverse_speed = 0.2
p = init_p.copy()
q = quat_to_matrix(init_q)
traj = np.empty((num_steps, 7), dtype=np.float64)
for i in range(num_steps):
v_world = q @ np.array([0.0, 0.0, -reverse_speed])
p += v_world * dt
traj[i, :3] = p
traj[i, 3:] = matrix_to_quat(q)
for i in range(num_steps):
traj[i, 2] = traj[0, 2]
trajectories.append(traj)
params.append(np.array([-reverse_speed, 0.0], dtype=np.float64))

return np.asarray(trajectories), np.asarray(params)

@njit(cache=True)
def score_trajectories_by_ESDF(trajectories, ESDF_map, origin, resolution, safety_radius=0.1,
front_len=0.35, rear_len=0.35, half_w=0.15):
Expand Down Expand Up @@ -421,6 +438,27 @@ def publish_footprint(self, T, stamp):
msg.points = points
self.footprint_pub.publish(msg)

def _front_obstacle_dist(self, T, obstacle_mask, max_dist=0.5):
"""Distance from the robot's front face to the nearest obstacle in the forward corridor.
Scans start at the front face so the returned value matches physical clearance."""
center = self.camera_to_robot_center(T)
fwd = T[:3, :3] @ np.array([0.0, 0.0, 1.0])
n = (fwd[0] ** 2 + fwd[1] ** 2) ** 0.5
fx, fy = (fwd[0] / n, fwd[1] / n) if n > 1e-6 else (1.0, 0.0)
lx, ly = -fy, fx
fl, _, hw = self.robot.footprint_from_control()
rows, cols = obstacle_mask.shape
steps = int(max_dist / self.resolution) + 1
for step in range(steps):
d_from_face = step * self.resolution
d_from_center = fl + d_from_face
for w in (-hw, 0.0, hw):
xi = int((center[0] + fx * d_from_center + lx * w - self.origin[0]) / self.resolution)
yi = int((center[1] + fy * d_from_center + ly * w - self.origin[1]) / self.resolution)
if 0 <= xi < rows and 0 <= yi < cols and obstacle_mask[xi, yi]:
return d_from_face
return max_dist + 1.0

def publish_obstacle_mask(self, mask, stamp):
msg = OccupancyGrid()
msg.header = Header()
Expand Down Expand Up @@ -552,14 +590,16 @@ def sync_callback(self, depth_msg, odom_msg):
self.publish_footprint(T, depth_msg.header.stamp)

with Timer(name='traj gen', text="[{name}] Elapsed time: {milliseconds:.0f} ms"):
v_dir = T[:3, :3] @ np.array([0, 0, 1])
magnitude = np.clip(self.smoothed_velocity, 0.05, 0.5)
init_v = v_dir * float(magnitude)
init_p = self.camera_to_robot_center(T)
init_q = np.array([odom_msg.pose.pose.orientation.x, odom_msg.pose.pose.orientation.y, odom_msg.pose.pose.orientation.z, odom_msg.pose.pose.orientation.w])
trajectories, params = generate_trajectory_library_3d(
init_p = self.camera_to_robot_center(T),
init_v = init_v,
init_q = np.array([odom_msg.pose.pose.orientation.x, odom_msg.pose.pose.orientation.y, odom_msg.pose.pose.orientation.z, odom_msg.pose.pose.orientation.w])
init_p=init_p, init_q=init_q
)
vocab_trajs, vocab_params = generate_predefined_trajectory_vocabularies(
init_p=init_p, init_q=init_q
)
trajectories = np.concatenate([trajectories, vocab_trajs], axis=0)
params = np.concatenate([params, vocab_params], axis=0)
self.last_T = T
self.last_stamp = stamp

Expand All @@ -570,11 +610,25 @@ def sync_callback(self, depth_msg, odom_msg):
top_indices = np.argsort(scores, kind='stable')[:top_k]

with Timer(name='pub', text="[{name}] Elapsed time: {milliseconds:.0f} ms"):
front_clearance = self._front_obstacle_dist(T, obstacle_mask)
enter_threshold = 0.30

def cost_function(traj, param, score, target_pose):
# predefined backward trajectory penalty
is_backward_traj = param[0] < 0.0
should_reverse = front_clearance <= enter_threshold
reverse_gate_penalty = 0.0
if should_reverse and not is_backward_traj:
reverse_gate_penalty = 1e9
elif not should_reverse and is_backward_traj:
reverse_gate_penalty = 1e9

# regular trajectory penalty
traj_end = np.array(traj[-1,:3])
target_end = target_pose if target_pose is not None else traj_end
dist = np.linalg.norm(traj_end - target_end)
return score * 100000 + 100 * dist + 10 * abs(self.last_param[0] - param[0]) + 10 * abs(self.last_param[1] - param[1])

return score * 100000 + 100 * dist + 10 * abs(self.last_param[0] - param[0]) + 10 * abs(self.last_param[1] - param[1]) + reverse_gate_penalty

top_k = 1
top_indices = np.argsort(np.array([cost_function(trajectories[i], params[i], scores[i], self.target_pose) for i in range(len(trajectories))]), kind='stable')[:top_k]
Expand All @@ -595,7 +649,6 @@ def cost_function(traj, param, score, target_pose):
for i in top_indices:
for j in range(0, len(trajectories[i]), 10):
x,y,z,qx,qy,qz,qw = trajectories[i][j]

pose = PoseStamped()
pose.header = depth_msg.header
pose.pose.position.x = x
Expand Down
82 changes: 62 additions & 20 deletions tinynav/platforms/cmd_vel_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ def __init__(self):
self.path_filter_tau = 0.30
self.lookahead_steps = 1
# Static-friction compensation: very small vx often cannot move the robot.
self.min_effective_linear_speed = 0.2
self.min_effective_linear_speed = 0.1
self.min_effective_angular_speed = 0.1
self.linear_engage_threshold = 0.04
self.fixed_reverse_speed = 0.2
# Hack: if path first segment points far away from robot heading,
# rotate in place instead of publishing near-zero cmd_vel.
self.force_turn_heading_threshold = np.deg2rad(80.0)

self.latest_cmd = Twist()
self.prev_cmd = Twist()
Expand All @@ -59,7 +63,7 @@ def __init__(self):
_latched_qos = QoSProfile(depth=1, durability=DurabilityPolicy.TRANSIENT_LOCAL)
self.create_subscription(Bool, '/nav/paused', self._on_paused, _latched_qos)
self.cmd_timer = self.create_timer(1.0 / self.cmd_rate_hz, self.cmd_timer_callback)

def _on_paused(self, msg: Bool):
self._paused = msg.data
if not self._paused:
Expand Down Expand Up @@ -96,20 +100,40 @@ def cmd_timer_callback(self):
target_cmd.linear.x *= 0.3
target_cmd.angular.z *= 0.5

# Acceleration limiting for smoother control.
max_dv = self.max_linear_acc * dt
max_dw = self.max_angular_acc * dt
out = Twist()
out.linear.x = self._clamp_step(target_cmd.linear.x, self.prev_cmd.linear.x, max_dv)
out.angular.z = self._clamp_step(target_cmd.angular.z, self.prev_cmd.angular.z, max_dw)
out.linear.y = 0.0
# Dead-band: < 0.05 → 0; small positive → snap to min effective speed.
if abs(out.linear.x) < 0.05:
out.linear.x = 0.0
elif 0 < out.linear.x < self.min_effective_linear_speed:
out.linear.x = self.min_effective_linear_speed
if abs(out.angular.z) < 0.05:

# Reverse is a predefined planner vocabulary: straight back at fixed speed.
# Do not smooth or re-lock it here; just pass it through while stale/paused guards still work.
if target_cmd.linear.x < 0.0:
out.linear.x = target_cmd.linear.x
out.angular.z = 0.0
self.cmd_pub.publish(out)
self.prev_cmd = out
return

# Forward/turning commands still get acceleration limiting and robot minimum-speed locks.
max_dv = self.max_linear_acc * dt
# If we just left reverse mode, do not let acceleration limiting leak another reverse command.
prev_linear_x = 0.0 if self.prev_cmd.linear.x < 0.0 else self.prev_cmd.linear.x
out.linear.x = self._clamp_step(target_cmd.linear.x, prev_linear_x, max_dv)
# Do not acceleration-limit yaw. The planner/control layer already decides the turn rate,
# and forced rotate-in-place should take effect immediately.
out.angular.z = target_cmd.angular.z

# Linear x: robot cannot execute tiny non-zero speeds reliably.
# When engaging forward motion, snap to +min; when stopping/decaying, snap to 0.
if 0.0 < out.linear.x < self.min_effective_linear_speed:
out.linear.x = self.min_effective_linear_speed if target_cmd.linear.x >= self.min_effective_linear_speed else 0.0
elif abs(out.linear.x) < self.min_effective_linear_speed:
out.linear.x = 0.0

# Angular z: same idea; tiny requested turns snap to executable min, decays snap to 0.
if 0.0 < abs(out.angular.z) < self.min_effective_angular_speed:
if abs(target_cmd.angular.z) >= self.min_effective_angular_speed:
out.angular.z = float(np.sign(target_cmd.angular.z) * self.min_effective_angular_speed)
else:
out.angular.z = 0.0

self.cmd_pub.publish(out)
self.prev_cmd = out
Expand Down Expand Up @@ -145,23 +169,41 @@ def msg2np(msg):
T_robot_2 = T2 @ self.T_robot_to_camera
T_robot_2_to_1 = np.linalg.inv(T_robot_1) @ T_robot_2
p = T_robot_2_to_1[:3, 3]
heading_err = float(np.arctan2(p[1], p[0]))
# dt must match actual spacing between published Path poses, not raw trajectory dt.
dt = self.planner_dt * self.path_pose_stride * max(1, step_idx)
linear_velocity_vec = p / dt
r = R.from_matrix(T_robot_2_to_1[:3, :3])
angular_velocity_vec = r.as_rotvec() / dt

vx = np.clip(linear_velocity_vec[0], -0.1, 0.5)
if vx < 0.0:
raw_vx = float(linear_velocity_vec[0])
if raw_vx < 0.0:
vx = -self.fixed_reverse_speed
else:
vx = float(np.clip(raw_vx, 0.0, 0.5))
vy = 0.0
vyaw = np.clip(angular_velocity_vec[2], -0.8, 0.8)

# Filter planner updates to reduce visible jitter from 7-10 Hz updates.
alpha = np.clip(self.path_period_ema / (self.path_filter_tau + self.path_period_ema), 0.15, 0.75)
self.latest_cmd.linear.x = float((1.0 - alpha) * self.latest_cmd.linear.x + alpha * vx)
is_backward_segment = raw_vx < 0.0
if is_backward_segment:
vyaw = 0.0

# Hack: if path first segment points >80 deg away from robot heading,
# force an in-place turn. Skip explicit backward segments because reverse
# naturally has heading_err close to +/-pi.
if (not is_backward_segment) and abs(heading_err) > self.force_turn_heading_threshold:
vx = 0.0
vyaw = float(heading_err)
# Minimal rotate-first gate: apply only for forward motion.
elif vx > 0.0 and abs(heading_err) > 0.45:
vx = 0.0
vyaw = float(np.clip(1.6 * heading_err, -0.6, 0.6))

# Store the latest target command directly. Smoothing is intentionally kept
# only in cmd_timer_callback via acceleration limiting, so planner/control
# behavior stays easy to reason about during tuning.
self.latest_cmd.linear.x = float(vx)
self.latest_cmd.linear.y = float(vy)
self.latest_cmd.angular.z = float((1.0 - alpha) * self.latest_cmd.angular.z + alpha * vyaw)
self.latest_cmd.angular.z = float(vyaw)
age = 0.0 if self.last_path_update_time is None else (time.monotonic() - self.last_path_update_time)
self.logger.debug(
f"cmd vx={self.latest_cmd.linear.x:.3f} vyaw={self.latest_cmd.angular.z:.3f} "
Expand Down
Loading