-
Notifications
You must be signed in to change notification settings - Fork 0
/
okhttp_log_parser
executable file
·142 lines (125 loc) · 3.66 KB
/
okhttp_log_parser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/python -u
import json
import os
import re
import subprocess
import sys
from subprocess import PIPE

import jsonpath_rw_ext as jp
import yaml
# Per-user YAML configuration file. expanduser() is used instead of
# os.environ['HOME'] so that importing the script does not raise KeyError
# when HOME is not set in the environment.
CONFIG_FILE = os.path.join(os.path.expanduser('~'), 'okhttp_log_parser.yml')

# Only log lines whose tag equals this value are processed.
TAG = 'OkHttp'

# One line of `adb logcat -v brief` output. Groups: a single uppercase
# letter (the log level), the tag, the numeric pid and the message text.
LOG_LINE = re.compile(r'^([A-Z])/(.+?)\( *(\d+)\): (.*?)$')

# Message that opens a logged response: "<-- " followed by a number
# (presumably the HTTP status code).
START_RESPONSE_PATTERN = re.compile(r'^<-- \d+.*$')

# Message that closes a logged response.
END_RESPONSE_PATTERN = re.compile(r'^<-- END HTTP.*$')

# Header-like message: letters optionally joined by '-' or '_', then ':'.
HEADER_PATTERN = re.compile(r'^[A-Za-z]*((-|_)[A-Za-z]*)*:.*$')

# Line of `adb devices` output that ends in the word "device"; group 1 is
# the leading non-whitespace token (the device serial).
DEVICE_PATTERN = re.compile(r'^(\S*)\s*device$')

# Colour indices 0-7, consumed by termcolor() to build escape codes.
colors = BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)

# Escape sequence that resets all terminal attributes.
RESET = '\033[0m'

# JSONPath expressions read from CONFIG_FILE by get_json_paths().
PATHS_TO_MATCH = []

# Maps each JSONPath expression to its colourised label for printing.
PATHS_TO_DISPLAY = {}
def termcolor(fg=None, bg=None):
    """Return the terminal escape sequence selecting the given colours.

    Args:
        fg: Foreground colour index, emitted as the parameter ``3<fg>``;
            ``None`` leaves the foreground unset.
        bg: Background colour index, emitted as the parameter ``10<bg>``;
            ``None`` leaves the background unset.

    Returns:
        ``'\\033[<params>m'`` with the parameters joined by ``;``, or an
        empty string when neither colour is given.
    """
    parts = [
        template % value
        for template, value in (('3%d', fg), ('10%d', bg))
        # Compare against None explicitly: index 0 is a valid colour.
        if value is not None
    ]
    if not parts:
        return ''
    return '\033[%sm' % ';'.join(parts)
def colorize(message, fg=None, bg=None):
    """Wrap *message* in the colour sequence for *fg*/*bg* followed by RESET.

    Args:
        message: Text to colour.
        fg: Foreground colour index passed to termcolor(), or ``None``.
        bg: Background colour index passed to termcolor(), or ``None``.

    Returns:
        The colour prefix, the message and the reset sequence concatenated.
    """
    prefix = termcolor(fg, bg)
    return prefix + message + RESET
def get_json_paths():
    """Load the JSONPath expressions to watch from CONFIG_FILE.

    Reads the ``json_paths`` entry of the YAML file into PATHS_TO_MATCH and
    rebuilds PATHS_TO_DISPLAY so that each expression maps to a colourised
    label (black text on a background colour cycling through indices 1-7).

    An empty file, a file whose top level is not a mapping, or a missing or
    null ``json_paths`` entry all result in an empty list of paths. A single
    string is accepted as a one-element list.

    Raises:
        Whatever ``open`` raises when CONFIG_FILE cannot be read, and
        ``yaml.YAMLError`` when it is not valid YAML.
    """
    global PATHS_TO_MATCH
    with open(CONFIG_FILE) as f:
        # safe_load is enough for a plain list of strings and never builds
        # arbitrary Python objects from tags in the file.
        config = yaml.safe_load(f)
    if not isinstance(config, dict):
        # yaml returns None for an empty document; treat that as "no paths".
        config = {}
    paths = config.get('json_paths') or []
    if isinstance(paths, str):
        paths = [paths]
    PATHS_TO_MATCH = list(paths)

    # Drop labels left over from an earlier load before rebuilding the map.
    PATHS_TO_DISPLAY.clear()
    for index, path in enumerate(PATHS_TO_MATCH):
        # Skip index 0 (BLACK), which is used as the text colour.
        bg_color = (index % (len(colors) - 1)) + 1
        PATHS_TO_DISPLAY[path] = colorize(path, BLACK, bg_color)
# True while messages between a response start line and its END line are
# being collected by process_message().
response_started = False
# Body text accumulated for the response currently being collected.
response = ''


def process_response():
    """Print the configured JSONPath matches found in the collected body.

    Parses the module-level ``response`` buffer as JSON and, for every
    expression in PATHS_TO_MATCH, prints ``<label>: <value>`` for the first
    match. String values are printed as-is; any other value is rendered as
    JSON so that numbers, booleans, lists and objects are shown instead of
    being dropped.

    Nothing is printed when the buffer is empty or is not valid JSON. A
    failure while evaluating one expression is reported on stderr and does
    not prevent the remaining expressions from being evaluated.
    """
    if not response:
        return
    try:
        response_dict = json.loads(response)
    except ValueError:
        # Bodies that are not JSON (or were truncated in the log) are skipped.
        return

    for path in PATHS_TO_MATCH:
        try:
            match = jp.match1(path, response_dict)
        except Exception as error:
            # Best effort: one bad expression must not stop the log tail.
            sys.stderr.write(
                "Could not evaluate {0}: {1}\n".format(path, error))
            continue
        if match is None:
            continue
        value = match if isinstance(match, str) else json.dumps(match)
        print(PATHS_TO_DISPLAY.get(path, path) + ": " + value)
def process_message(message):
    """Feed one OkHttp log message into the response-collecting state.

    A message matching START_RESPONSE_PATTERN begins a new response and a
    message matching END_RESPONSE_PATTERN ends it, at which point the
    collected text is handed to process_response() and the buffer is
    cleared. In between, header-like messages and messages starting with
    ``-->`` are ignored and everything else is appended to the buffer.

    Args:
        message: The message part of a log line. Bytes are decoded as UTF-8
            first, because the patterns are text patterns and matching them
            against bytes raises TypeError on Python 3.
    """
    global response
    global response_started

    if isinstance(message, bytes):
        message = message.decode('utf-8', 'replace')

    if START_RESPONSE_PATTERN.match(message):
        response_started = True
        # Discard leftovers from a response whose END line never arrived so
        # they cannot corrupt the body that starts here.
        response = ''
    elif END_RESPONSE_PATTERN.match(message):
        response_started = False
        process_response()
        response = ''
    elif HEADER_PATTERN.match(message) or message.startswith('-->'):
        # Headers and request lines are not part of the response body.
        pass
    elif response_started:
        response += message
def poll(device=None):
    """Tail ``adb logcat`` and process every message logged under TAG.

    Loads the JSONPath configuration, starts ``adb logcat -v brief`` and
    passes the message of each line whose tag equals TAG to
    process_message(). Returns when adb's output ends, adb exits, or the
    user presses Ctrl-C; the adb process is stopped before returning.

    Args:
        device: Optional device serial passed to adb with ``-s``.
    """
    get_json_paths()

    adb_command = ['adb']
    if device:
        adb_command.extend(['-s', device])
    adb_command.extend(['logcat', '-v', 'brief'])

    adb = subprocess.Popen(adb_command, stdin=PIPE, stdout=PIPE)
    try:
        while adb.poll() is None:
            raw_line = adb.stdout.readline()
            if not raw_line:
                # readline() returns an empty value only at end of stream; a
                # blank log line still carries its newline and is not EOF.
                break
            line = raw_line.decode('utf-8', 'replace').strip()
            log_line = LOG_LINE.match(line)
            if not log_line:
                continue
            _, tag, _, message = log_line.groups()
            if tag.strip() == TAG:
                # Pass text, not bytes: the message patterns are str patterns.
                process_message(message)
    except KeyboardInterrupt:
        pass
    finally:
        # Do not leave a logcat process running once we stop reading it.
        if adb.poll() is None:
            adb.terminate()
        adb.stdout.close()
        adb.stdin.close()
        adb.wait()
def find_devices():
    """Return the serials listed by ``adb devices`` in the ``device`` state.

    The command's complete output is collected before parsing. Reading it
    line by line while polling the process can miss everything, because
    ``adb devices`` may already have exited before the first read.

    Returns:
        A list with group 1 of DEVICE_PATTERN for every matching output
        line; empty when nothing matches or the call is interrupted with
        Ctrl-C.
    """
    adb = subprocess.Popen(['adb', 'devices'], stdin=PIPE, stdout=PIPE)
    try:
        output, _ = adb.communicate()
    except KeyboardInterrupt:
        # As before, an interrupt ends the scan instead of aborting the tool.
        return []

    devices = []
    for line in output.decode('utf-8', 'replace').splitlines():
        match = DEVICE_PATTERN.match(line.strip())
        if match:
            devices.append(match.group(1))
    return devices
def try_again(devices):
    """Report an invalid choice and ask the user to pick a device again.

    Args:
        devices: The device serials to choose from.

    Returns:
        Whatever select_device() returns for the new prompt.
    """
    print("Invalid Option. Try Again")
    selection = select_device(devices)
    return selection
def select_device(devices):
    """Ask the user to choose one of *devices* by its 1-based number.

    The numbered list is printed and a line is read from stdin. Input that
    is not a number in range is rejected with a message and the prompt is
    repeated. The retry is a loop, so repeated bad input cannot exhaust the
    recursion limit, and only parse errors are treated as bad input, so
    Ctrl-C and end-of-input propagate instead of re-prompting forever.

    Args:
        devices: Sequence of device serials to choose from.

    Returns:
        The chosen element of *devices*.

    Raises:
        ValueError: If *devices* is empty, since no answer could be valid.
        EOFError: If stdin is exhausted before a valid choice is made.
    """
    if not devices:
        raise ValueError("no devices to choose from")

    while True:
        print("Selct Device")
        for index, value in enumerate(devices):
            print('{0}.) {1}'.format(index + 1, value))
        option = input()
        try:
            option_number = int(option) - 1
        except (TypeError, ValueError):
            option_number = -1  # not a number: handled as out of range
        if 0 <= option_number < len(devices):
            return devices[option_number]
        print("Invalid Option. Try Again")
def cli():
    """Entry point: choose a device when several are attached, then tail it.

    With zero or one detected device, poll() is started without a serial;
    with more than one, the user is asked to pick and that serial is used.
    """
    devices = find_devices()
    if len(devices) > 1:
        chosen = select_device(devices)
        poll(str(chosen))
    else:
        poll()


if __name__ == '__main__':
    cli()