-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcan_byd_sim.py
146 lines (132 loc) · 6.05 KB
/
can_byd_sim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import sched
import can
from can_service_events import CanServiceEvents
from can_storage import CanStorage
from can_thread import CanThread
class CanBydSim:
def __init__(self, storage: CanStorage, can_bus: can.interface.Bus, service_mode: bool = False):
self.sto: CanStorage = storage
self.can_bus: can.interface.Bus = can_bus
self.scheduler: sched.scheduler = sched.scheduler()
self.thread: CanThread = CanThread('byd-sim', self.run)
self.events: CanServiceEvents = CanServiceEvents()
self.service_mode: bool = service_mode
def process_message(self, message: can.Message):
if not self.service_mode:
print(message)
self.sto.process_message(message)
self.events.on_received(message)
if message.arbitration_id == 0x151:
if message.data[0] == 0x1:
messages = [(0x250, b'\x03\x16\x00\x66\x00\x33\x02\x09'),
(0x290, b'\x06\x37\x10\xd9\x00\x00\x00\x00'),
(0x2d0, b'\x00' + b'BYD' + b'\x00' * 4),
(0x3d0, b'\x00' + b'Battery'),
(0x3d0, b'\x01' + b'-Box Pr'),
(0x3d0, b'\x02' + b'emium H'),
(0x3d0, b'\x03' + b'VS' + b'\x00' * 5)]
for message in messages:
can_message = can.Message(arbitration_id=message[0], data=message[1], is_extended_id=False)
if not self.service_mode:
self.sto.process_message(can_message)
try:
self.can_bus.send(can_message)
self.events.on_sent(can_message)
except can.CanError as e:
print(f'can write failed: {e}')
def run(self):
self.events.on_start()
self.init_scheduler()
if not self.service_mode:
self.sto.load_message_infos()
while self.thread.running:
self.scheduler.run(blocking=False)
try:
message = self.can_bus.recv(0.1)
except can.CanError as e:
print(f'can read failed: {e}')
continue
if message is not None:
self.process_message(message)
list(map(self.scheduler.cancel, self.scheduler.queue))
self.events.on_stop()
def init_scheduler(self):
self.scheduler.enter(1.9, 1, self.send_limits)
self.scheduler.enter(9.9, 1, self.send_states)
self.scheduler.enter(59.9, 1, self.send_alarm)
self.scheduler.enter(9.9, 1, self.send_battery_info)
self.scheduler.enter(9.9, 1, self.send_cell_info)
@staticmethod
def calculate_bytes(message_info: dict, value: float) -> bytearray:
data = value * (1.0 / message_info['scaling'])
data = int(data).to_bytes(message_info['length'], byteorder='big', signed=message_info['signed'])
return bytearray(data)
def calculate_message(self, can_id: int, initial_data=b'\x00' * 8) -> can.Message:
data = bytearray(initial_data)
if self.service_mode:
if can_id in self.sto.message_infos:
for startbit, message_info in self.sto.message_infos[can_id].items():
if 'overwrite' in message_info:
new_bytes = self.calculate_bytes(message_info, message_info['overwrite'])
data[startbit:message_info['endbit']] = new_bytes
else:
with self.sto.message_infos_lock:
if can_id in self.sto.message_infos:
for startbit, message_info in self.sto.message_infos[can_id].items():
if self.sto.overwrite and 'overwrite' in message_info:
new_bytes = self.calculate_bytes(message_info, message_info['overwrite'])
data[startbit:message_info['endbit']] = new_bytes
message = can.Message(arbitration_id=can_id, data=data, is_extended_id=False)
if not self.service_mode:
self.sto.process_message(message)
return message
def send_limits(self):
message = self.calculate_message(0x110, b'\x09\x20\x06\x40\x01\x00\x01\x00')
try:
self.can_bus.send(message)
self.events.on_sent(message)
except can.CanError as e:
print(f'can write failed: {e}')
if not self.service_mode:
print(message)
self.scheduler.enter(1.9, 1, self.send_limits)
def send_states(self):
message = self.calculate_message(0x150, b'\x26\x0c\x27\x10\x00\xf3\x00\xfa')
try:
self.can_bus.send(message)
self.events.on_sent(message)
except can.CanError as e:
print(f'can write failed: {e}')
if not self.service_mode:
print(message)
self.scheduler.enter(9.9, 1, self.send_states)
def send_alarm(self):
message = self.calculate_message(0x190, b'\x00' * 3 + b'\x04' + b'\x00' * 4)
try:
self.can_bus.send(message)
self.events.on_sent(message)
except can.CanError as e:
print(f'can write failed: {e}')
if not self.service_mode:
print(message)
self.scheduler.enter(59.9, 1, self.send_alarm)
def send_battery_info(self):
message = self.calculate_message(0x1d0, b'\x08\x49\x00\x00\x00\xb4\x03\x08')
try:
self.can_bus.send(message)
self.events.on_sent(message)
except can.CanError as e:
print(f'can write failed: {e}')
if not self.service_mode:
print(message)
self.scheduler.enter(9.9, 1, self.send_battery_info)
def send_cell_info(self):
message = self.calculate_message(0x210, b'\x00\xbe\x00\xb4' + b'\x00' * 4)
try:
self.can_bus.send(message)
self.events.on_sent(message)
except can.CanError as e:
print(f'can write failed: {e}')
if not self.service_mode:
print(message)
self.scheduler.enter(9.9, 1, self.send_cell_info)