Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 79 additions & 24 deletions src/V0.2.3/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Author: PamirAI
#Date: 2025-05-05
#Version: 0.1.0
#Date: 2025-07-14
#Version: 1.0.0
#Description: This is the main program for the RP2040 SAM

import machine
Expand Down Expand Up @@ -48,8 +48,8 @@ def switch_usb(usb_type):
if PRODUCTION:
switch_usb("SOM_USB") # Disable SAM USB

# Setup UART0 on GPIO0 (TX) and GPIO1 (RX)
uart0 = machine.UART(0, baudrate=115200, tx=machine.Pin(0), rx=machine.Pin(1))
# Setup UART2 on GPIO4 (TX) and GPIO5 (RX) - matches CM5 UART2 connection
uart2 = machine.UART(0, baudrate=115200, tx=machine.Pin(4), rx=machine.Pin(5))
einkMux.low() # EINK OFF
einkStatus.low() # SOM CONTROL E-INK

Expand All @@ -58,7 +58,7 @@ def switch_usb(usb_type):
# Function to handle UART debug messages
def debug_print(message):
if UART_DEBUG:
uart0.write(message)
uart2.write(message)
print(message)


Expand All @@ -77,21 +77,23 @@ def debug_print(message):
# LED completion callback function
def led_completion_callback(led_id, sequence_length):
"""Callback function called when LED animations complete"""
global protocol, uart0, uart_lock, PRODUCTION
global protocol, uart2, uart_lock, PRODUCTION

try:
with uart_lock:
if sequence_length > 0:
# Send completion acknowledgment
packet = protocol.create_led_completion_packet(led_id, sequence_length)
uart0.write(packet)
uart2.write(packet)
uart2.flush() # Ensure immediate transmission
if not PRODUCTION:
print(f"LED completion ACK sent: LED{led_id}, {sequence_length} commands")
elif sequence_length < 0:
# Send error report (sequence_length is negative error code)
error_code = abs(sequence_length)
packet = protocol.create_led_error_packet(led_id, error_code)
uart0.write(packet)
uart2.write(packet)
uart2.flush() # Ensure immediate transmission
if not PRODUCTION:
print(f"LED error ACK sent: LED{led_id}, error {error_code}")
except Exception as e:
Expand Down Expand Up @@ -173,6 +175,50 @@ def process_uart_packet(packet_bytes):
# Handle button packets if needed (currently only send, not receive)
pass

elif packet_type == protocol.TYPE_SYSTEM:
# Parse system packet (SoM → RP2040 system commands)
valid, system_data = protocol.parse_system_packet(packet_bytes)
if valid:
cmd_type = system_data.get('command', 'unknown')

if cmd_type == 'ping':
# Respond to ping with pong
try:
with uart_lock:
pong_packet = protocol.create_system_pong_packet()
uart2.write(pong_packet)
uart2.flush() # Ensure immediate transmission
if not PRODUCTION:
print("Ping received, pong sent")
except Exception as e:
print(f"Failed to send pong: {e}")

elif cmd_type == 'version_request':
# Send firmware version response
try:
with uart_lock:
version_packet = protocol.create_firmware_version_packet()
uart2.write(version_packet)
uart2.flush() # Ensure immediate transmission
if not PRODUCTION:
print(f"Version sent: {protocol.FIRMWARE_VERSION_MAJOR}.{protocol.FIRMWARE_VERSION_MINOR}.{protocol.FIRMWARE_VERSION_PATCH}")
except Exception as e:
print(f"Failed to send version: {e}")

elif cmd_type == 'reset':
# Handle reset command
reset_type = system_data.get('reset_type', 0)
if not PRODUCTION:
print(f"Reset command received: type={reset_type}")
# TODO: Implement actual reset logic based on reset_type

else:
if not PRODUCTION:
print(f"Unknown system command: {cmd_type}")
else:
if not PRODUCTION:
print(f"Invalid system packet: {[hex(b) for b in packet_bytes]}")

else:
if not PRODUCTION:
print(f"Unknown packet type: 0x{packet_type:02X}")
Expand Down Expand Up @@ -210,7 +256,8 @@ def send_button_state():
print(f"Button packet: {[hex(b) for b in packet]}")
print(f"Button states - UP: {up_pressed}, DOWN: {down_pressed}, SELECT: {select_pressed}")

uart0.write(packet)
uart2.write(packet)
uart2.flush() # Ensure immediate transmission

# Interrupt handler for down button
def button_handler(pin):
Expand Down Expand Up @@ -246,14 +293,15 @@ def send_boot_notification():
# Send power status packet indicating we're running
packet = protocol.create_power_status_packet_rp2040_to_som(
protocol.POWER_STATE_RUNNING, 0x00)
uart0.write(packet)
uart2.write(packet)
# Ensure packet is transmitted immediately
uart2.flush()
if not PRODUCTION:
print("[Boot] Boot notification sent to SoM")
except Exception as e:
print(f"[Boot] Failed to send boot notification: {e}")

# Send boot notification now that UART is initialized
send_boot_notification()
# Boot notification will be sent after UART reception starts

# Shared flag to coordinate thread handoff
thread_handoff_complete = False
Expand Down Expand Up @@ -321,14 +369,17 @@ def core1_task():
print("Starting UART reception loop on Core 1")
uart_buffer = bytearray()

# Send boot notification now that UART reception is ready
send_boot_notification()

while True:
try:
wdt.feed()

# Check for incoming UART data
if uart0.any():
if uart2.any():
with uart_lock:
data = uart0.read()
data = uart2.read()
if data:
uart_buffer.extend(data)

Expand Down Expand Up @@ -385,7 +436,7 @@ def core1_task():
# Send status packet
with uart_lock:
packet = protocol.create_led_status_packet(led_id, status_code, status_value)
uart0.write(packet)
uart2.write(packet)
if not PRODUCTION:
print(f"LED status response: LED{led_id}, code{status_code}, value{status_value}")

Expand Down Expand Up @@ -441,7 +492,7 @@ def core1_task():
current_state = power_manager.get_power_state()
with uart_lock:
packet = protocol.create_power_status_packet_rp2040_to_som(current_state, 0x00)
uart0.write(packet)
uart2.write(packet)
if not PRODUCTION:
print(f"Power status response sent: state=0x{current_state:02X}")

Expand All @@ -452,7 +503,7 @@ def core1_task():
# Send acknowledgment
with uart_lock:
packet = protocol.create_power_status_packet_rp2040_to_som(new_state, 0x00)
uart0.write(packet)
uart2.write(packet)
if not PRODUCTION:
print(f"Power state set to: 0x{new_state:02X}")

Expand All @@ -472,7 +523,7 @@ def core1_task():
with uart_lock:
packet = protocol.create_power_status_packet_rp2040_to_som(
protocol.POWER_STATE_OFF, shutdown_mode)
uart0.write(packet)
uart2.write(packet)
if not PRODUCTION:
print(f"Shutdown ACK sent: mode={shutdown_mode}")

Expand All @@ -484,22 +535,26 @@ def core1_task():
# Send current measurement
current_packet = protocol.create_power_metrics_packet_rp2040_to_som(
protocol.POWER_CMD_CURRENT, metrics['current_ma'])
uart0.write(current_packet)
uart2.write(current_packet)
uart2.flush() # Ensure immediate transmission

# Send battery percentage
battery_packet = protocol.create_power_metrics_packet_rp2040_to_som(
protocol.POWER_CMD_BATTERY, metrics['battery_percent'])
uart0.write(battery_packet)
uart2.write(battery_packet)
uart2.flush() # Ensure immediate transmission

# Send temperature
temp_packet = protocol.create_power_metrics_packet_rp2040_to_som(
protocol.POWER_CMD_TEMP, metrics['temperature_0_1c'])
uart0.write(temp_packet)
uart2.write(temp_packet)
uart2.flush() # Ensure immediate transmission

# Send voltage
voltage_packet = protocol.create_power_metrics_packet_rp2040_to_som(
protocol.POWER_CMD_VOLTAGE, metrics['voltage_mv'])
uart0.write(voltage_packet)
uart2.write(voltage_packet)
uart2.flush() # Ensure immediate transmission

if not PRODUCTION:
print(f"Metrics sent: {metrics}")
Expand All @@ -522,13 +577,13 @@ def core1_task():
with uart_lock:
packet = protocol.create_power_status_packet_rp2040_to_som(
protocol.POWER_STATE_OFF, 0x00)
uart0.write(packet)
uart2.write(packet)
wdt.feed()
utime.sleep_ms(10)
if utime.ticks_diff(utime.ticks_ms(), start_time) >= 10000:
einkStatus.low()
einkMux.low() # SOM CONTROL E-INK
uart0.write("xSAM_USB\n")
uart2.write("xSAM_USB\n")
if PRODUCTION:
switch_usb("SAM_USB")
einkRunning = False
Expand Down
82 changes: 81 additions & 1 deletion src/V0.2.3/pamir_uart_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

class PamirUartProtocols:

# RP2040 Firmware version constants for protocol negotiation
FIRMWARE_VERSION_MAJOR = 1
FIRMWARE_VERSION_MINOR = 0
FIRMWARE_VERSION_PATCH = 0

# Message type constants (3 MSB of type_flags)
TYPE_BUTTON = 0x00 # 0b000xxxxx - Button state change events
TYPE_LED = 0x20 # 0b001xxxxx - LED control commands and status
Expand Down Expand Up @@ -490,4 +495,79 @@ def get_packet_type(self, packet_bytes):
return None

type_flags = packet_bytes[0]
return type_flags & 0xE0
return type_flags & 0xE0

# ==================== SYSTEM COMMANDS PROTOCOL ====================

def create_system_ping_packet(self):
"""Create system ping packet FROM RP2040 TO SoM

Returns:
bytes: 4-byte ping packet ready for UART transmission
"""
return self.create_packet(0xC0, 0x00, 0x00)

def create_system_pong_packet(self):
"""Create system pong response packet FROM RP2040 TO SoM

Returns:
bytes: 4-byte pong packet ready for UART transmission
"""
return self.create_packet(0xC0, 0x01, 0x00)

def create_firmware_version_packet(self):
"""Create firmware version packet FROM RP2040 TO SoM

Returns:
bytes: 4-byte version packet ready for UART transmission
"""
# Pack version as: MAJOR.MINOR.PATCH into 2 bytes
# data[0] = MAJOR (8 bits), data[1] = MINOR (4 bits) | PATCH (4 bits)
version_data0 = self.FIRMWARE_VERSION_MAJOR & 0xFF
version_data1 = ((self.FIRMWARE_VERSION_MINOR & 0x0F) << 4) | (self.FIRMWARE_VERSION_PATCH & 0x0F)
return self.create_packet(0xC2, version_data0, version_data1)

def parse_system_packet(self, packet_bytes):
"""Parse system packet and return system command data

Args:
packet_bytes: 4-byte packet from UART

Returns:
tuple: (valid, system_data) where system_data contains command info
"""
valid, parsed = self.validate_packet(packet_bytes)
if not valid:
return False, None

type_flags, data0, data1, checksum = parsed

# Check if this is a system packet
if (type_flags & 0xE0) != 0xC0:
return False, None

# Extract system command from full type_flags
command = type_flags & 0x1F

if command == 0x00:
# Ping command
system_data = {'command': 'ping'}
elif command == 0x01:
# Pong response
system_data = {'command': 'pong'}
elif command == 0x02:
# Version request
system_data = {'command': 'version_request'}
elif command == 0x03:
# Reset command
system_data = {'command': 'reset', 'reset_type': data0}
else:
# Unknown system command
system_data = {
'command': 'unknown',
'raw_command': command,
'data0': data0,
'data1': data1
}

return True, system_data