-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathl2pre_fms.py
executable file
·148 lines (116 loc) · 5.28 KB
/
l2pre_fms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python
"""
Something like
https://github.com/vs-uulm/nemesys/blob/c03cafdaed3175647d5bc690488742745acbd0eb/src/nemesys_fms.py
adopted to test l2pre with Fomat Match Score (FMS)
"""
# system import
import argparse
from IPython import embed
from time import time
# nemere import
from nemere.validation.dissectorMatcher import MessageComparator, DissectorMatcher
from nemere.utils.loader import SpecimenLoader
from nemere.utils import reportWriter
# netzob import
from netzob.Model.Vocabulary.Field import Field
from netzob.Model.Vocabulary.Types.Raw import Raw
# internal import
from utils import import_messages
from l2pre import analyze
debug = False
"""Some modules and methods contain debug output that can be activated by this flag."""
def prepareMessages(symbols, specimens):
"""Restores content of symbol.messages to the original, not deduplicated messages, including the
payload and appends a payload field to symbol.fields. Afterwards, connect the message object of
the comparator to the symbol, so that it is able to compare the format.
"""
def applySpecimenMessage(sym):
# match specimenloader logic by using their message object
newmsgs = []
for m in sym.messages:
newmsg = None
for specmsg in specimens.messagePool.keys():
if m.data == specmsg.data and m.date == specmsg.date:
newmsg = specmsg
break
if not newmsg:
raise
newmsgs.append(newmsg)
sym.messages = newmsgs
# restore original messages, as l2pre normally only outputs the unique ones
for sym in symbols:
if sym.orig_messages:
sym.messages = list(sym.orig_messages)
else:
raise ValueError("orig_messages do not exist")
# restore payload data by adding a field to symbol and appending payload data again
for sym in symbols:
# check if payloads exist and what the max size is (for payload field)
payloads = []
for m in sym.messages:
if m.payload:
payloads.append(m.payload_data)
if payloads:
max_payload_size = max([len(pl) for pl in payloads])
else:
applySpecimenMessage(sym)
continue
max_payload_size *= 8 # bits, not bytes
# create and add payload field
payload_field = Field(Raw(nbBytes=(0, max_payload_size)))
sym.fields.append(payload_field)
# append payload data to message.data
for m in sym.messages:
if m.payload:
m.data += m.payload_data
applySpecimenMessage(sym)
def main(args):
print("Load messages...")
# for FMS stuff
# TODO import multiple files in SpecimenLoader and everywhere else, basically...
specimens = SpecimenLoader(args.files[0], layer=args.layer,
relativeToIP=False)
comparator = MessageComparator(specimens, pcap=args.files[0], omitPayload=args.omit_payload,
failOnUndissectable=False, debug=debug)
# for l2pre
messages_list = import_messages(args.files, importLayer=args.layer)
print("Infer protocol via l2pre tool...")
inference_title = 'l2pre_inferred'
inference_start_time = time()
symbols = analyze(messages_list, args)
inference_runtime = time() - inference_start_time
# prepare messages to have the format, the comparator expects
prepareMessages(symbols, specimens)
# print inference results
comparator.pprintInterleaved(symbols)
# calc FMS per message
print("\nCalculate FMS...\n")
fms = DissectorMatcher.symbolListFMS(comparator, symbols)
# write report
reportWriter.writeReport(fms, inference_runtime, specimens, comparator, inference_title)
if args.interactive:
print("Start interactive session...\n")
print('Loaded PCAP in: specimens, comparator, messages_list')
print('Inferred messages in: symbols')
print('FMS of messages in: message2quality')
embed()
return
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Reverse engineer a layer 2 protocol with l2pre and evaluate against tshark '
'dissectors: Write a report containing the Format Match Score (FMS) for each '
'message and other evaluation data.')
parser.add_argument('files', help='pcap/pcapng files with network traffic to be analyzed', \
metavar='PCAPs', nargs='+')
parser.add_argument('-l', '--layer', default=1, type=int, \
help='Layer to import, defaults to 1 (use 2 if importing stuff in Radiotap header)')
parser.add_argument('-nt', '--no-tunnel', action='store_true', default=False, \
help='Do not look for Ethernet frames while searching for payloads. To use in case ' + \
'of layer 2 replacements of Ethernet, but NOT in case of tunneled Ethernet+X traffic')
parser.add_argument('-p', '--omit-payload', action='store_true', \
help='Ignore payload during format comparison. Useful for protocols with big payload.')
parser.add_argument('-i', '--interactive', action='store_true', \
help='Start interactive session after automatic protocol reversing')
args = parser.parse_args()
main(args)