-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdriver.py
More file actions
102 lines (84 loc) · 2.93 KB
/
driver.py
File metadata and controls
102 lines (84 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import time
import struct
import threading
from collections import deque
from typing import Optional
import serial
from gpiozero import OutputDevice, InputDevice
from .pins import LoRaPins
from .enums import LoraMode
from .protocol import DataPacket, FMT
from .exceptions import AuxTimeoutError
class LoraDriver:
def __init__(self, port: str, baudrate: int, pins: LoRaPins, buffer_size: int = 200, start_listener: bool = True):
self.serial = serial.Serial(port, baudrate, timeout=1)
self.pins = pins
self._packet_size = struct.calcsize(FMT)
# GPIO Zero 초기화
self._m0 = OutputDevice(pins.m0)
self._m1 = OutputDevice(pins.m1)
self._aux = InputDevice(pins.aux)
self.set_mode(LoraMode.NORMAL)
# RX buffer & listener thread
self._buffer = deque(maxlen=buffer_size)
self._latest: Optional[DataPacket] = None
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
if start_listener:
self.start_listener()
def wait_aux(self, timeout=1.0):
start = time.time()
while self._aux.value == 0:
if time.time() - start > timeout:
raise AuxTimeoutError("AUX timeout")
time.sleep(0.001)
def set_mode(self, mode: LoraMode):
if mode.value[0]:
self._m0.on()
else:
self._m0.off()
if mode.value[1]:
self._m1.on()
else:
self._m1.off()
self.wait_aux()
def send(self, packet: DataPacket):
self.wait_aux()
self.serial.write(packet.encode())
self.wait_aux()
def _receive_once(self) -> Optional[DataPacket]:
if self.serial.in_waiting >= self._packet_size:
raw = self.serial.read(self._packet_size)
return DataPacket.decode(raw)
return None
def receive(self) -> Optional[DataPacket]:
# synchronous pull API (kept for backward compatibility)
return self._receive_once()
def _reader_loop(self):
while not self._stop_event.is_set():
pkt = self._receive_once()
if pkt:
self._latest = pkt
self._buffer.append(pkt)
else:
time.sleep(0.01)
def start_listener(self):
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._reader_loop, daemon=True)
self._thread.start()
def stop_listener(self):
self._stop_event.set()
if self._thread:
self._thread.join(timeout=1)
def latest(self) -> Optional[DataPacket]:
return self._latest
def buffer(self):
return list(self._buffer)
def close(self):
self.stop_listener()
self._m0.close()
self._m1.close()
self._aux.close()
self.serial.close()