Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .DS_Store
Binary file not shown.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@
__pycache__
.mypy_cache
*.bag
rosdep/
rosdep/
*.DS_Store
build/
install/
log/
6 changes: 6 additions & 0 deletions mrobosub_teleop/launch/teleop_launch.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<launch>
<node name="joystick_teleop" pkg="mrobosub_teleop" exec="joystick_teleop" output="screen">
<param from="$(find-pkg-share mrobosub_teleop)/params/joystick_controls_b2a.yaml"/>
</node>
<node name="joy_node" pkg="joy" exec="joy_node"/>
</launch>
212 changes: 212 additions & 0 deletions mrobosub_teleop/mrobosub_teleop/joystick_teleop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#!/usr/bin/env python

from enum import Enum
from typing import List
from sensor_msgs.msg import Joy
from std_msgs.msg import Float64
from typing import Dict, Tuple
from mrobosub_lib import Node
import yaml
import rclpy

class ButtonCommand(int, Enum):
INCREASE = 1
DECREASE = -1
TOGGLE = 0


class Button:
"""Store commands tied to buttons"""

def __init__(self, idx: int, commandstr: str) -> None:
self.idx = idx
self.commandstr = commandstr
self._ispressed = False
self._isrising_edge = False
self.name, self._action = self.__split_command(commandstr)

@staticmethod
def __split_command(comstr: str) -> Tuple[str, ButtonCommand]:
"""Split command string into (axis, command)"""
if comstr.endswith(".increase"):
return (comstr[:-9], ButtonCommand.INCREASE)
elif comstr.endswith(".decrease"):
return (comstr[:-9], ButtonCommand.DECREASE)
elif comstr.endswith(".toggle"):
return (comstr[:-7], ButtonCommand.TOGGLE)
# Special commands
elif comstr.endswith("estop") or comstr.endswith("switch"):
return (comstr[4:], ButtonCommand.TOGGLE)
else:
raise ValueError(f"Button command {comstr} not recognised")

def update(self, ispress: bool) -> None:
self._isrising_edge = (not self.pressed) and ispress
self._ispressed = ispress

@property
def rise_edge(self) -> bool:
return self._isrising_edge

@property
def pressed(self) -> bool:
return self._ispressed

@property
def command(self) -> Tuple[str, ButtonCommand]:
return self.name, self._action


class DOF:
"""Store state of a degree of freedom"""

class DOFState(int, Enum):
POSE = 0
TWIST = 1

def __init__(
self,
id: int,
name: str,
scale_t: float,
scale_p: float,
istoggleable: bool,
node: Node,
) -> None:
self.idx = id
self.name = name

## state variables
self.pos = 0.0 # current position
self.state = DOF.DOFState.TWIST
self.scale = 0.0 # current scale
self.setPoint = 0.0 # target position

## configs
self.scale_t = scale_t
self.scale_p = scale_p
self.istogglable = istoggleable

self.twist_pub = node.create_publisher(Float64, f"/target_twist/{name}", qos_profile=1)
self.pose_pub = node.create_publisher(Float64, f"/target_pose/{name}", qos_profile=1)
self.pose_sub = node.create_subscription( Float64, f"/pose/{name}", self.pose_callback, qos_profile=1)

def pose_callback(self, msg: Float64) -> None:
self.pos = msg.data

def update(self, command: ButtonCommand, scale: float) -> None:
self.state ^= int(not bool(abs(command)) and self.istogglable) # toggle if command is TOGGLE && istoggleable
self.scale = ( self.scale_t * self.state + self.scale_p * (1 - self.state)) * scale # set scale based on state
self.setPoint = self.pos + self.scale_p * scale

def publish(self) -> None:
if self.state == DOF.DOFState.POSE:
self.pose_pub.publish(Float64(data=self.setPoint))
elif self.state == DOF.DOFState.TWIST:
self.twist_pub.publish(Float64(data=self.scale))


class Joystick_teleop(Node):
"""
Publishers
- /target_pose/heave
- /target_twist/heave
- /output_wrench/heave
- /target_twist/surge
- /output_wrench/surge
- /target_twist/sway
- /output_wrench/sway
- /target_pose/yaw
- /target_twist/yaw
- /output_wrench/yaw
- /target_pose/roll
- /target_twist/roll
- /output_wrench/roll
- /target_pose/pitch
- /target_twist/pitch
- /output_wrench/pitch

Subscribers
- /joy
- /pose/heave
- /pose/yaw
- /pose/roll
- /pose/pitch
"""

# axes : Dict[str, Dict[str, Union[str,float]]]
# buttons : Dict[str, Dict[str, str]]

def __init__(self) -> None:
super().__init__("joystick_teleopn")
# load params

paramsfile = paramsfile = ("/home/ubuntu/ros2_ws/src/mrobosub_teleop/params/joystick_controls_b2a.yaml")
# load directly from params folder, instead of ros2 param server (do not support recursive dict)

with open(paramsfile, "r") as f:
params = yaml.safe_load(f)
self.axes = params["axes"]
self.buttons = params["buttons"]

print(self.axes)
print(self.buttons)

self.input_subscriber = self.create_subscription(Joy, "/joy", self.joystick_callback, qos_profile=1)
self.wrench_pubs = [self.create_publisher(Float64, f"/output_wrench/{axis}", qos_profile=1) for axis in ["sway", "surge", "heave", "yaw", "roll", "pitch"]]
# Globals
self.buttonsInstance: List[Button] = []
self.DOFInstance: Dict[str, DOF] = {}
self.stateMachineMode = False
self._hard_stop = False
self.timer = self.create_timer(1.0 / 50, self.loop)
for i in range(6):
if DOFConfig := self.axes.get(str(i)):
name = str(DOFConfig["use"])
self.DOFInstance[name] = DOF(
i,
name,
float(DOFConfig["scale_t"]),
float(DOFConfig["scale_p"]),
name in ["yaw", "heave", "roll", "pitch"],
self,
) # toggleable DOFs
else:
raise ValueError(f"Axis {i} not configured in params")
for i in range(10):
if ButtonConfig := self.buttons.get(str(i)):
self.buttonsInstance.append(Button(i, str(ButtonConfig["use"])))
else:
raise ValueError(f"Button {i} not configured in params")

def joystick_callback(self, msg: Joy):
for button in self.buttonsInstance:
button.update(msg.buttons[button.idx] == 1)
if not button.rise_edge:
continue
name, command = button.command
self._hard_stop |= (name == "estop")
self.stateMachineMode ^= (name == "switch")
if name not in ("estop", "switch"):
(self.DOFInstance.get(name) or (_ for _ in ()).throw(ValueError(f"Button command {name} not recognised"))).update(command, msg.axes[self.DOFInstance[name].idx])

def hard_stop(self):
for pub in self.wrench_pubs:
pub.publish(Float64(data=0.0))
return

def loop(self):
if not (self._hard_stop and self.stateMachineMode):
for dof in self.DOFInstance.values():
dof.publish()
else:
self.hard_stop()


def main():
rclpy.init()
node = Joystick_teleop()
rclpy.spin(node)

if __name__ == "__main__":
main()
58 changes: 58 additions & 0 deletions mrobosub_teleop/params/joystick_controls_b2a.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@

# Tie axis numbers to commands
axes:
"0":
use: sway
scale_t: -1
scale_p: 0
"1":
use: surge
scale_t: 1
scale_p: 0
"2":
use: yaw
scale_t: -0.2
scale_p: -1
"3":
use: heave
scale_t: -0.2
scale_p: -0.05
"4":
use: roll
scale_t: -0.2
scale_p: -0.1
"5":
use: pitch
scale_t: 0.2
scale_p: 0.1

# Tie button numbers to commands
buttons:
"0":
use: yaw.increase
scale: 30
"1":
use: heave.decrease
scale: -0.2
"2":
use: heave.increase
scale: 0.2
"3":
use: yaw.decrease
scale: -30
"4":
use: roll.toggle
"5":
use: pitch.toggle
"6":
use: yaw.toggle
"7":
use: heave.toggle
# Special commands
"8":
use: com.switch # between state machine mode
"9":
use: com.estop

# Rate to send commands at (Hz)
rate: 50
5 changes: 4 additions & 1 deletion mrobosub_teleop/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
license="BSD-2.0",
tests_require=["pytest"],
entry_points={
"console_scripts": ["all_dof_teleop = mrobosub_teleop.all_dof_teleop:main"],
"console_scripts": [
"all_dof_teleop = mrobosub_teleop.all_dof_teleop:run",
"joystick_teleop = mrobosub_teleop.joystick_teleop:main",
],
},
)