-
Notifications
You must be signed in to change notification settings - Fork 0
/
dump_traffic_read_property_multiple.py
104 lines (79 loc) · 2.87 KB
/
dump_traffic_read_property_multiple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/python
"""
Utility application that reads in the hex dump traffic from the BMR application
and decodes it.
$ python dump_traffic_read_property_multiple.py config.json dump.txt
"""
import sys
import json
from bacpypes.debugging import xtob
from bacpypes.pdu import Address
from bacpypes.analysis import decode_packet
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array, AnyAtomic
from bacpypes.object import get_datatype
from bacpypes.apdu import ReadPropertyMultipleRequest, ReadPropertyMultipleACK, Error
class Traffic:
def __init__(self, req):
self.req = req
self.resp = None
self.retry = 1
# humph
null_address = Address("0x000000000000")
# read in configuration
with open(sys.argv[1]) as bmr_config_file:
bmr_config = json.load(bmr_config_file)
inside_address = Address(bmr_config["inside"]["_deviceAddress"])
outside_address = Address(bmr_config["outside"]["_deviceAddress"])
# read in the file, split into lines
with open(sys.argv[2]) as infile:
lines = infile.readlines()
# strip off eol, split tabs to fields
lines = [line[:-1].split("\t") for line in lines]
traffic = []
requests = {}
# dump out the header and the decoded packet
for indx, line in enumerate(lines):
timestamp, stack, direction, source, destination, data = line
pkt = decode_packet(b"\0" * 14 + xtob(data))
if not pkt:
pkt_type = "unnable to decode"
else:
pkt_type = pkt.__class__.__name__
pkt._indx = indx + 1
if pkt.pduSource == null_address:
if direction == ">>>":
pkt.pduSource = Address(source)
elif direction == "<<<":
if stack == "inside":
pkt.pduSource = inside_address
elif stack == "outside":
pkt.pduSource = outside_address
if pkt.pduDestination == null_address:
pkt.pduDestination = Address(destination)
# check for reads
if isinstance(pkt, ReadPropertyMultipleRequest):
key = (pkt.pduSource, pkt.pduDestination, pkt.apduInvokeID)
if key in requests:
requests[key].retry += 1
else:
msg = Traffic(pkt)
requests[key] = msg
traffic.append(msg)
# now check for results
elif isinstance(pkt, (ReadPropertyMultipleACK, Error)):
key = (pkt.pduDestination, pkt.pduSource, pkt.apduInvokeID)
req = requests.get(key, None)
if req:
requests[key].resp = pkt
# delete the request, it stays in the traffic list
del requests[key]
# dump everything
for msg in traffic:
req = msg.req
resp = msg.resp
# start with request packet number, source, and destination
summary = f"{req._indx}/{resp._indx if resp else '*'}\t{req.pduSource}\t{req.pduDestination}"
if isinstance(resp, Error):
summary += f"\t{resp.errorClass}/{resp.errorCode}"
print(summary)