1
- from .constants import JOYCON_VENDOR_ID , JOYCON_PRODUCT_IDS , JOYCON_L_PRODUCT_ID
1
+ from .constants import JOYCON_VENDOR_ID , JOYCON_PRODUCT_IDS
2
+ from .constants import JOYCON_L_PRODUCT_ID , JOYCON_R_PRODUCT_ID
2
3
import hid
3
4
import time
4
5
import threading
5
6
from typing import Optional
6
7
7
8
# TODO: disconnect, power off sequence
8
9
10
+
9
11
class JoyCon :
10
12
_INPUT_REPORT_SIZE = 49
11
13
_INPUT_REPORT_PERIOD = 0.015
@@ -18,16 +20,17 @@ class JoyCon:
18
20
color_body : (int , int , int )
19
21
color_btn : (int , int , int )
20
22
21
- def __init__ (self , vendor_id : int , product_id : int , serial : str = None ):
23
+ def __init__ (self , vendor_id : int , product_id : int , serial : str = None , simple_mode = False ):
22
24
if vendor_id != JOYCON_VENDOR_ID :
23
25
raise ValueError (f'vendor_id is invalid: { vendor_id !r} ' )
24
26
25
27
if product_id not in JOYCON_PRODUCT_IDS :
26
28
raise ValueError (f'product_id is invalid: { product_id !r} ' )
27
29
28
- self .vendor_id = vendor_id
29
- self .product_id = product_id
30
- self .serial = serial
30
+ self .vendor_id = vendor_id
31
+ self .product_id = product_id
32
+ self .serial = serial
33
+ self .simple_mode = simple_mode # TODO: It's for reporting mode 0x3f
31
34
32
35
# setup internal state
33
36
self ._input_hooks = []
@@ -69,25 +72,29 @@ def _read_input_report(self) -> bytes:
69
72
70
73
def _write_output_report (self , command , subcommand , argument ):
71
74
# TODO: add documentation
72
- self ._joycon_device .write (command
73
- + self ._packet_number .to_bytes (1 , byteorder = 'little' )
74
- + self ._RUMBLE_DATA
75
- + subcommand
76
- + argument )
75
+ self ._joycon_device .write (b'' .join ([
76
+ command ,
77
+ self ._packet_number .to_bytes (1 , byteorder = 'little' ),
78
+ self ._RUMBLE_DATA ,
79
+ subcommand ,
80
+ argument ,
81
+ ]))
77
82
self ._packet_number = (self ._packet_number + 1 ) & 0xF
78
83
79
84
def _send_subcmd_get_response (self , subcommand , argument ) -> (bool , bytes ):
80
85
# TODO: handle subcmd when daemon is running
81
86
self ._write_output_report (b'\x01 ' , subcommand , argument )
82
87
83
88
report = self ._read_input_report ()
84
- while report [0 ] != 0x21 : # TODO, avoid this, await daemon instead
89
+ while report [0 ] != 0x21 : # TODO, avoid this, await daemon instead
85
90
report = self ._read_input_report ()
86
- assert report [1 :2 ] != subcommand , "THREAD carefully" # TODO, remove, see the todo above
91
+
92
+ # TODO, remove, see the todo above
93
+ assert report [1 :2 ] != subcommand , "THREAD carefully"
87
94
88
95
# TODO: determine if the cut bytes are worth anything
89
96
90
- return report [13 ] & 0x80 , report [13 :] # (ack, data)
97
+ return report [13 ] & 0x80 , report [13 :] # (ack, data)
91
98
92
99
def _spi_flash_read (self , address , size ) -> bytes :
93
100
assert size <= 0x1d
@@ -105,25 +112,30 @@ def _spi_flash_read(self, address, size) -> bytes:
105
112
def _update_input_report (self ): # daemon thread
106
113
while True :
107
114
report = self ._read_input_report ()
108
- while report [0 ] != 0x30 : # TODO, handle input reports of type 0x21 and 0x3f
115
+ # TODO, handle input reports of type 0x21 and 0x3f
116
+ while report [0 ] != 0x30 :
109
117
report = self ._read_input_report ()
118
+
110
119
self ._input_report = report
111
120
112
121
for callback in self ._input_hooks :
113
122
callback (self )
114
123
115
124
def _read_joycon_data (self ):
116
125
color_data = self ._spi_flash_read (0x6050 , 6 )
117
- #stick_cal = self._spi_flash_read(0x8012 if self.is_left else 0x801D, 8) # TODO, this
126
+
127
+ # TODO: use this
128
+ # stick_cal_addr = 0x8012 if self.is_left else 0x801D
129
+ # stick_cal = self._spi_flash_read(stick_cal_addr, 8)
118
130
119
131
# user IME data
120
132
if self ._spi_flash_read (0x8026 , 2 ) == b"\xB2 \xA1 " :
121
- print (f"Calibrate { self .serial } IME with user data" )
133
+ # print(f"Calibrate {self.serial} IME with user data")
122
134
imu_cal = self ._spi_flash_read (0x8028 , 24 )
123
135
124
136
# factory IME data
125
137
else :
126
- print (f"Calibrate { self .serial } IME with factory data" )
138
+ # print(f"Calibrate {self.serial} IME with factory data")
127
139
imu_cal = self ._spi_flash_read (0x6020 , 24 )
128
140
129
141
self .color_body = tuple (color_data [:3 ])
@@ -165,32 +177,33 @@ def _to_int16le_from_2bytes(hbytebe, lbytebe):
165
177
return int16le
166
178
167
179
def _get_nbit_from_input_report (self , offset_byte , offset_bit , nbit ):
168
- return (self ._input_report [offset_byte ] >> offset_bit ) & ((1 << nbit ) - 1 )
180
+ byte = self ._input_report [offset_byte ]
181
+ return (byte >> offset_bit ) & ((1 << nbit ) - 1 )
169
182
170
183
def __del__ (self ):
171
184
self ._close ()
172
185
173
- def set_gyro_calibration (self , offset_xyz , coeff_xyz ):
174
- print ("set_gyro_calibration:" , offset_xyz , coeff_xyz ) # TODO: remove this
186
+ def set_gyro_calibration (self , offset_xyz = None , coeff_xyz = None ):
175
187
if offset_xyz :
176
188
self ._GYRO_OFFSET_X , \
177
189
self ._GYRO_OFFSET_Y , \
178
190
self ._GYRO_OFFSET_Z = offset_xyz
179
191
if coeff_xyz :
180
- self ._GYRO_COEFF_X = 0x343b / coeff_xyz [0 ] if coeff_xyz [0 ] != 0x343b else 1
181
- self ._GYRO_COEFF_Y = 0x343b / coeff_xyz [1 ] if coeff_xyz [1 ] != 0x343b else 1
182
- self ._GYRO_COEFF_Z = 0x343b / coeff_xyz [2 ] if coeff_xyz [2 ] != 0x343b else 1
192
+ cx , cy , cz = coeff_xyz
193
+ self ._GYRO_COEFF_X = 0x343b / cx if cx != 0x343b else 1
194
+ self ._GYRO_COEFF_Y = 0x343b / cy if cy != 0x343b else 1
195
+ self ._GYRO_COEFF_Z = 0x343b / cz if cz != 0x343b else 1
183
196
184
- def set_accel_calibration (self , offset_xyz , coeff_xyz ):
185
- print ("set_accel_calibration:" , offset_xyz , coeff_xyz ) # TODO: remove this
197
+ def set_accel_calibration (self , offset_xyz = None , coeff_xyz = None ):
186
198
if offset_xyz :
187
199
self ._ACCEL_OFFSET_X , \
188
200
self ._ACCEL_OFFSET_Y , \
189
201
self ._ACCEL_OFFSET_Z = offset_xyz
190
202
if coeff_xyz :
191
- self ._ACCEL_COEFF_X = 0x4000 / coeff_xyz [0 ] if coeff_xyz [0 ] != 0x4000 else 1
192
- self ._ACCEL_COEFF_Y = 0x4000 / coeff_xyz [1 ] if coeff_xyz [1 ] != 0x4000 else 1
193
- self ._ACCEL_COEFF_Z = 0x4000 / coeff_xyz [2 ] if coeff_xyz [2 ] != 0x4000 else 1
203
+ cx , cy , cz = coeff_xyz
204
+ self ._ACCEL_COEFF_X = 0x4000 / cx if cx != 0x4000 else 1
205
+ self ._ACCEL_COEFF_Y = 0x4000 / cy if cy != 0x4000 else 1
206
+ self ._ACCEL_COEFF_Z = 0x4000 / cz if cz != 0x4000 else 1
194
207
195
208
def register_update_hook (self , callback ):
196
209
self ._input_hooks .append (callback )
@@ -402,15 +415,18 @@ def get_status(self) -> dict:
402
415
403
416
def set_player_lamp_on (self , on_pattern : int ):
404
417
self ._write_output_report (
405
- b'\x01 ' , b'\x30 ' , (on_pattern & 0xF ).to_bytes (1 , byteorder = 'little' ))
418
+ b'\x01 ' , b'\x30 ' ,
419
+ (on_pattern & 0xF ).to_bytes (1 , byteorder = 'little' ))
406
420
407
421
def set_player_lamp_flashing (self , flashing_pattern : int ):
408
422
self ._write_output_report (
409
- b'\x01 ' , b'\x30 ' , ((flashing_pattern & 0xF ) << 4 ).to_bytes (1 , byteorder = 'little' ))
423
+ b'\x01 ' , b'\x30 ' ,
424
+ ((flashing_pattern & 0xF ) << 4 ).to_bytes (1 , byteorder = 'little' ))
410
425
411
426
def set_player_lamp (self , pattern : int ):
412
427
self ._write_output_report (
413
- b'\x01 ' , b'\x30 ' , pattern .to_bytes (1 , byteorder = 'little' ))
428
+ b'\x01 ' , b'\x30 ' ,
429
+ pattern .to_bytes (1 , byteorder = 'little' ))
414
430
415
431
416
432
if __name__ == '__main__' :
0 commit comments