Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions src/rov/rov_led_controller/rov_led_controller/WS2812.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# lifted from this lovely repo: https://github.com/seitomatsubara/Jetson-nano-WS2812-LED-

import spidev
import sys

class SPItoWS():
def __init__(self, ledc, bus=1, device=0):
self.led_count = ledc
self.X = '' # X is signal of WS281x
for i in range(self.led_count):
self.X = self.X + "100100100100100100100100100100100100100100100100100100100100100100100100"
self.led_brightness = 1.0
self.spi = spidev.SpiDev()
self.spi.open(bus, device)
self.spi.max_speed_hz = 2400000

def __del__(self):
self.spi.close()
print("destructor")

def _Bytesto3Bytes(self, num, RGB): # num is number of signal, RGB is 8 bits (1 byte) str
for i in range(8):
if RGB[i] == '0':
self.X = self.X[:num * 3 * 8 + i * 3] + '100' + self.X[num * 3 * 8 + i * 3 + 3:]
elif RGB[i] == '1':
self.X = self.X[:num * 3 * 8 + i * 3] + '110' + self.X[num * 3 * 8 + i * 3 + 3:]

def _BytesToHex(self, Bytes):
return ''.join(["0x%02X " % x for x in Bytes]).strip()

def LED_show(self):
Y = []
for i in range(self.led_count * 9):
Y.append(int(self.X[i*8:(i+1)*8],2))
WS = self._BytesToHex(Y)
self.spi.xfer3(Y, 2400000,0,8)

def RGBto3Bytes(self, led_num, R, G, B):
if (R > 255 or G > 255 or B > 255):
print("Invalid Value: RGB is over 255\n")
sys.exit(1)
if (led_num > self.led_count - 1):
print("Invalid Value: The number is over the number of LED")
sys.exit(1)
RR = format(round(R * self.led_brightness), '08b')
GG = format(round(G * self.led_brightness), '08b')
BB = format(round(B * self.led_brightness), '08b')
self._Bytesto3Bytes(led_num * 3, GG)
self._Bytesto3Bytes(led_num * 3 + 1, RR)
self._Bytesto3Bytes(led_num * 3 + 2, BB)

# debug
# print("led_num: %d, R: %d G: %d B: %d" % (led_num, round(R * self.led_brightness), round(G * self.led_brightness), round(B * self.led_brightness)))

def LED_OFF_ALL(self):
self.X = ''
for i in range(self.led_count):
self.X = self.X + "100100100100100100100100100100100100100100100100100100100100100100100100"
self.LED_show()

def set_brightness(self, brightness):
self.led_brightness = brightness


if __name__ == "__main__":
import time
LED_COUNT = 3
sig = SPItoWS(LED_COUNT)
sig.RGBto3Bytes(0, 255, 0, 0)
sig.RGBto3Bytes(1, 0, 255, 0)
sig.RGBto3Bytes(2, 0, 0, 255)
sig.LED_show()
time.sleep(1)
sig.LED_OFF_ALL()
227 changes: 149 additions & 78 deletions src/rov/rov_led_controller/rov_led_controller/led_controller.py
Original file line number Diff line number Diff line change
@@ -1,129 +1,200 @@
from common.csm_common_interfaces.msg import PinState
from std_msgs.msg import String
import Jetson.GPIO as GPIO
import rclpy
from rclpy.node import Node
from rainbowio import colorwheel

import board
import neopixel_spi
import time
import random
from WS2812 import SPItoWS

NUM_LIGHTS = 60
LED_PIN = board.SPI()._pins[0]
BLACK = (0,0,0)
WHITE = (255,255,255)
RED = (255, 0, 0)
YELLOW = (255, 150, 0)
GREEN = (0, 255, 0)
CYAN = (0, 255, 255)
BLUE = (0, 0, 255)
PURPLE = (180, 0, 255)
PIXELS = neopixel_spi.NeoPixel_SPI(board.SPI(), NUM_LIGHTS)
global ledState
global ledColor
PIXELS = SPItoWS(NUM_LIGHTS)

class LEDControllerNode(Node):

def __init__(self):
super().__init__(node_name="LEDController")
self.state_subscriber = self.create_subscription(PinState, "set_rov_gpio", self.on_set_rov_gpio, 10)
self.publisher_ = self.create_publisher(String, 'preprogrammed_animations', 10)
self.publisher_ = self.create_publisher(String, 'led_controller_output', 10)

self.animations = [
"color_chase",
"rainbow_cycle",
"pulse",
"solid",
"seizure_disco"
]

self.colors = [
"WHITE",
"BLACK",
"RED",
"YELLOW",
"GREEN",
"CYAN",
"BLUE",
"PURPLE"
]
self.animations = {
"color_chase": self.color_chase,
"rainbow_cycle": self.rainbow_cycle,
"pulse": self.pulse,
"solid": self.solid,
"seizure_disco": self.seizure_disco,
"off": self.off
}

self.colors = {
"WHITE": (255,255,255),
"BLACK": (0,0,0),
"RED": (255, 0, 0),
"YELLOW": (255, 150, 0),
"GREEN": (0, 255, 0),
"CYAN": (0, 255, 255),
"BLUE": (0, 0, 255),
"PURPLE": (180, 0, 255)
}

# Rainbow RGB values
self.rainbowR = 255
self.rainbowG = 255
self.rainbowB = 0

# color value and current animation
self.ledColor = "WHITE"
self.currentAnimation = "solid"

# Listen for requests from the UI
self.ui_subscriber = self.create_subscription(String, 'ui_requests', self.ui_request_callback, 10)

PIXELS.fill(BLACK)
PIXELS.LED_OFF_ALL()


def ui_request_callback(self, msg):
if msg.data == 'get_animations':
# Split message
split = msg.data.split(",")
msgs = [s.strip() for s in split]

# Change brightness
if len(msgs) == 2:
try:
brightness = float(msgs[1])
if brightness > 1.0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the fancy min(max_bound, max(min_bound, number)) method

brightness = 1
elif brightness < 0:
brightness = 0
PIXELS.set_brightness(brightness)
self.get_logger().info("Changed brightness to: %f" % brightness)
except:
self.get_logger().info("Second argument, brightness, should be a float.")

# Change animation and colors
if msgs[0] == 'get_animations':
# Send the list of preprogrammed animations to the UI
animations_msg = String()
animations_msg.data = "\n".join(animation for animation in self.animations)
self.publisher_.publish(animations_msg)
self.get_logger().info('Sent preprogrammed animations to UI')
elif msg.data == 'get_colors':
elif msgs[0] == 'get_colors':
colors_msg = String()
colors_msg.data = "\n".join(color for color in self.colors)
self.publisher_.publish(colors_msg)
self.get_logger().info('Sent preprogrammed colors to UI')
else:
# Check if the requested animation exists
if msg.data in self.animations:
if msgs[0] in self.animations:
# Call the method corresponding to the requested animation

if ledState and (lastAnimation != msg.data):
exec(f"self.{msg.data}({ledColor}, 0.1)")()
else:
PIXELS.brightness(0)
lastAnimation = msg.data
elif msg.data in self.colors:
lastColor = ledColor
if ledState and (lastColor != msg.data):
exec(f"self.{lastAnimation}({msg.data}, 0.1)")()


def on_set_rov_gpio(self, message: PinState):
if message.pin not in LED_PIN:
pass
ledState = message.state
self.currentAnimation = msgs[0]
self.get_logger().info('New animation received: %s' % msgs[0])
self.animations[msgs[0]](self.colors[self.ledColor])
elif msgs[0] in self.colors:
# Change the color if message changes colors
self.ledColor = msgs[0]
self.get_logger().info('New color received: %s' % msgs[0])
self.animations[self.currentAnimation](self.colors[msgs[0]])

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a last case here which handles if it's "none of the above", aka an invalid message. Print out a something like "oi butt face you entered [blank] and that's wrong" - feel free to quote me



def color_chase(color, wait=0.1):
def color_chase(self, color, wait=0.1):
"""
Animates a single color across the LED strip
"""
for i in range(NUM_LIGHTS):
PIXELS[i] = color
PIXELS.RGBto3Bytes(i, color[0], color[1], color[2])
time.sleep(wait)
PIXELS.show()
PIXELS.LED_show()
time.sleep(0.5)

def rainbow_cycle(color, wait=0.1):
for j in range(255):
for i in range(NUM_LIGHTS):
rc_index = (i * 256 // NUM_LIGHTS) + j
PIXELS[i] = colorwheel(rc_index & 255)
PIXELS.show()
time.sleep(wait)

def pulse(color, wait=0.1):
PIXELS.fill(color)
def rainbow_cycle(self, color, wait=0.1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ewwwww

"""
Cycles a wave of rainbow over the LEDs
"""
RGBincrement = float(255 * 3.0) / NUM_LIGHTS
# LED RGB values
tempR = self.rainbowR
tempG = self.rainbowG
tempB = self.rainbowB

# Assigning LED RGB values
for i in range(NUM_LIGHTS):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You couuuuld use HSV and cycle through Hues if you so chose.. colorsys has a thing for that and I think it's in the standard library (so no extra dependency)

if tempR == 255 and tempB == 0 and tempG < 255:
PIXELS.RGBto3Bytes(i, tempR, tempG, tempB)
tempG += RGBincrement
elif tempG == 255 and tempB == 0 and tempR > 0:
PIXELS.RGBto3Bytes(i, tempR, tempG, tempB)
tempR -= RGBincrement
elif tempG == 255 and tempR == 0 and tempB < 255:
PIXELS.RGBto3Bytes(i, tempR, tempG, tempB)
tempB += RGBincrement
elif tempB == 255 and tempR == 0 and tempG > 0:
PIXELS.RGBto3Bytes(i, tempR, tempG, tempB)
tempG -= RGBincrement
elif tempB == 255 and tempG == 0 and tempR < 255:
PIXELS.RGBto3Bytes(i, tempR, tempG, tempB)
tempR += RGBincrement
elif tempR == 255 and tempG == 0 and tempB > 0:
PIXELS.RGBto3Bytes(i, tempR, tempG, tempB)
tempB -= RGBincrement
PIXELS.LED_show()
time.sleep(wait)

# increment first rainbow RGB values to next value
if self.rainbowR == 255 and self.rainbowB == 0 and self.rainbowG < 255:
self.rainbowG += 1
elif self.rainbowG == 255 and self.rainbowB == 0 and self.rainbowR > 0:
self.rainbowR -= 1
elif self.rainbowG == 255 and self.rainbowR == 0 and self.rainbowB < 255:
self.rainbowB += 1
elif self.rainbowB == 255 and self.rainbowR == 0 and self.rainbowG > 0:
self.rainbowG -= 1
elif self.rainbowB == 255 and self.rainbowG == 0 and self.rainbowR < 255:
self.rainbowR += 1
elif self.rainbowR == 255 and self.rainbowG == 0 and self.rainbowB > 0:
self.rainbowB -= 1


def pulse(self, color, wait=0.1):
"""
Fades a single color in
"""
for i in range(255):
PIXELS.setBrightness(i)
for j in range(NUM_LIGHTS):
PIXELS.RGBto3Bytes(j, color[0] * (i / 255.0), color[1] * (i / 255.0), color[2] * (i / 255.0))
time.sleep(wait)
PIXELS.show()
PIXELS.LED_show()
time.sleep(wait)

def solid(color, wait=0.1):
PIXELS.fill(color)
PIXELS.show()
def solid(self, color, wait=0.1):
"""
Changes all LEDs to solid color
"""
for i in range (NUM_LIGHTS):
PIXELS.RGBto3Bytes(i, color[0], color[1], color[2])
PIXELS.LED_show()

def seizure_disco(color, wait=0.1):
def seizure_disco(self, color, wait=0.1):
"""
Changes all LEDs to a random color
"""
for i in range(NUM_LIGHTS):
R = random.randint(0,255)
G = random.randint(0,255)
B = random.randint(0,255)
PIXELS[i].fill(R, G, B)
PIXELS.show()
PIXELS.RGBto3Bytes(i, R, G, B)
PIXELS.LED_show()
time.sleep(wait)

def off(self, color):
PIXELS.LED_OFF_ALL()

def _set_all_pixels(self, color):
"""
Sets all pixels to a single
"""
for i in range (NUM_LIGHTS):
PIXELS.RGBto3Bytes(i, color[0], color[1], color[2])



def main(args=None):
rclpy.init(args=args)
Expand Down
8 changes: 5 additions & 3 deletions src/rov/rov_led_controller/setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import find_packages, setup

package_name = 'src/rov/rov_led_controller'
package_name = 'led_controller'

setup(
name=package_name,
Expand All @@ -10,17 +10,19 @@
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# add WS2812
('lib/' + package_name, [package_name+'/WS2812.py'])
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='catfishjw',
maintainer_email='[email protected]',
description='TODO: Package description',
description='It controls LEDs',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'led_controller = src/rov/rov_led_controller.led_controller:main'
'led_controller = led_controller.led_controller:main'
],
},
)
Loading