-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_socketio_data.py
executable file
·134 lines (121 loc) · 3.57 KB
/
get_socketio_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from time import sleep
import socketio
import argparse
import json
from can import LEDCan
from threading import Timer
HOST = '192.168.50.100'
PORT = 5100
sio = socketio.Client()
led = LEDCan('can0')
light_status = 'none'
last_light_status = 'none'
def setInterval(func, sec):
def func_wrapper():
setInterval(func, sec)
func()
t = Timer(sec, func_wrapper)
t.start()
return t
@sio.on('sensor')
def handleMsg(msg):
global light_status
data = json.loads(msg)
# print('=================', data)
for sensor in data:
if sensor['name'] == 'STEERING':
if sensor['data'] > 3*3.1415/180:
# print('Left')
light_status='left'
elif sensor['data'] < -3*3.1415/180:
# print('Right')
light_status='right'
else:
# print('None')
if light_status is not 'hazzard':
light_status='none'
elif sensor['name'] == 'DRIVE_STATUS':
# return to manual mode
if sensor['data'] == 2 and light_status == 'hazzard':
print('recover')
light_status='none'
elif sensor['name'] == 'LED':
# print(sensor['data'])
if sensor['data']:
led = json.loads(sensor['data'])
@sio.on('ids')
def handleMsg(msg):
global light_status
msg = json.loads(msg)
if msg["Classification"] != "Benign":
if msg["Attack_type"] == 'DoS':
print("IDS Alert: DoS Detected!")
light_status='hazzard'
elif msg["Attack_type"] == 'Spoofing':
print("IDS Alert: Spoofing Detected!")
light_status='hazzard'
else:
print("Unexpected Alert!")
@sio.on('rsu')
def handleMsg(msg):
global light_status
msg = json.loads(msg)
if 'RSU Cert' in msg:
if msg['RSU Cert'] == 'Valid':
light = msg['Traffic_Light']
remain = msg['Remaining_Time']
else:
print('OBU Alert: RSU Invalid!')
light_status='hazzard'
else:
print('OBU Alert: RSU Invalid!')
light_status='hazzard'
def process_command():
parser = argparse.ArgumentParser()
parser.add_argument('--host', '-host', type=str,
required=False, help='Set Host', default=HOST)
parser.add_argument('--port', '-p', type=int,
required=False, help='Set Port', default=PORT)
return parser.parse_args()
def send_can():
global light_status, led
# print(light_status)
if light_status == 'none':
#led.TestNone()
led.Non()
elif light_status == 'left':
#led.TestLeft()
led.Left()
elif light_status == 'right':
#led.TestRight()
led.Right()
elif light_status == 'hazzard':
led.Hazzard()
def send_can_once():
global light_status, last_light_status, led
# print(light_status)
if light_status == last_light_status:
return
if light_status == 'none':
led.Non()
elif light_status == 'left':
led.Left()
elif light_status == 'right':
led.Right()
elif light_status == 'hazzard':
led.Hazzard()
if __name__ == '__main__':
args = process_command()
host = args.host
port = args.port
print('socketio connecting...')
while (not sio.connected):
try:
sio.connect(f"http://{host}:{port}")
except:
sleep(3)
print('try again...')
print('connected')
setInterval(send_can, 0.1)