-
Notifications
You must be signed in to change notification settings - Fork 0
/
kibana.py
142 lines (113 loc) · 4.05 KB
/
kibana.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import re
from urllib.parse import unquote, urlparse
import requests
# One ``key:value`` item of a Kibana match spec.  A value is either a
# single-quoted string (which may itself contain ',' or ':') or everything up
# to the next comma.
_QUERY_SPEC_ITEM = re.compile(r"([^:,]+):('[^']*'|[^,]*)")


def _parse_query_spec(spec):
    """Turn ``query:ERROR,type:phrase`` into ``{'query': 'ERROR', 'type': 'phrase'}``."""
    return {key: value.strip("'") for key, value in _QUERY_SPEC_ITEM.findall(spec)}


def parse(kibana_url: str, timeout: float = 10):
    """Parse a Kibana url into an es-stream-logs query string.

    The state Kibana keeps in the URL fragment is scanned for the time range,
    interval, selected columns, index pattern and ``match`` filters, which are
    assembled into ``index=...&from=...&to=...&interval=...`` followed by one
    ``field=value`` pair per match filter and an optional ``fields=`` list.

    Short ``/goto/`` links are resolved with an HTTP HEAD request, and an
    index that does not contain ``-*`` is treated as a saved index-pattern id
    whose title is fetched from the Kibana saved-objects API.

    The decoded fragment and every parsed ``query:(...)`` group are printed to
    stdout as debugging output.

    Args:
        kibana_url: The Kibana discover URL to convert.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The es-stream-logs query string (without a leading ``?``).

    Raises:
        Exception: For saved-search URLs, a missing interval or index, or a
            match filter that has no ``query`` entry.  Errors raised by
            ``requests`` (including ``raise_for_status``) propagate unchanged.
    """
    if re.search(r"/app/kibana#/discover/\w+-\w+-\w+-\w+-\w+", kibana_url):
        raise Exception("saved search not supported yet")

    if "/goto/" in kibana_url:
        # Short link: follow the redirects to obtain the full URL.
        resp = requests.head(kibana_url, allow_redirects=True, timeout=timeout)
        resp.raise_for_status()
        kibana_url = resp.url

    parsed_url = urlparse(kibana_url)
    kibana_query = unquote(parsed_url.fragment)
    print(kibana_query)

    timestamp = re.search(r"from:([^,]+),(mode:[^,]+,)?to:([^,)]+)", kibana_query)
    from_timestamp = timestamp[1] if timestamp else 'now-15m'
    to_timestamp = timestamp[3] if timestamp else 'now'

    # Exclude ')' so that a trailing "interval:auto)" does not capture the
    # parenthesis that closes the enclosing group.
    interval = re.search(r"interval:([^,)]+)", kibana_query)
    if not interval:
        raise Exception("missing interval")
    interval = interval[1]

    # columns:!(application_name,level,message)
    columns = re.search(r"columns:!\(([\w+,]+)\)", kibana_query)
    fields = columns[1] if columns else None
    if fields == "_source":
        # "_source" is handled as "no explicit column selection".
        fields = None

    # index:'application-*'
    index = re.search(r"index:'?([^,'\")]+)'?", kibana_query)
    if not index:
        raise Exception("missing index")
    index = index[1]

    if "-*" not in index:
        # Not a wildcard pattern: treat the value as a saved index-pattern id
        # and look up its title.
        kibana_base = parsed_url.scheme + "://" + parsed_url.netloc
        resp = requests.get(
            kibana_base + "/api/saved_objects/index-pattern/" + index,
            timeout=timeout,
        )
        resp.raise_for_status()
        index = resp.json()['attributes']['title']

    args = []

    # query:(match:(level:(query:ERROR,type:phrase)))
    #   ["match", ["level", ["query:ERROR,type:phrase"]]]
    # query:(match:(application_name:(query:api,type:phrase)))
    #   ["match", ["application_name", ["query:api,type:phrase"]]]
    # query:(match_all:())
    # query:(match:(geoip.as_number:(query:'34989',type:phrase)))
    # query:(language:lucene,query:'')
    query_marker = "query:("
    idx = 0
    try:
        while True:
            idx = kibana_query.index(query_marker, idx)
            # Step onto the opening parenthesis so the parser sees "(...)".
            idx += len(query_marker) - 1
            depth, query = parse_parentheses(kibana_query[idx:])
            print(idx, depth, query)

            # Only three-level "match" groups carry a field filter.
            if depth == 3 and query[0] == "match":
                var_name = query[1][0]
                query_spec = _parse_query_spec(query[1][1][0])
                if 'query' not in query_spec:
                    raise Exception(f'no query for "{var_name}"')
                args.append(f"{var_name}={query_spec['query']}")
    except ValueError:
        # Either str.index found no further marker or the parentheses were
        # unbalanced; in both cases stop scanning and keep what was collected.
        pass

    url = f"index={index}&from={from_timestamp}&to={to_timestamp}&interval={interval}"
    if args:
        url += "&" + "&".join(args)
    if fields:
        url += f"&fields=@timestamp,{fields}"

    return url
def push(obj, levels, depth):
    """Append *obj* to the nested list ``depth`` levels below *levels*.

    Starting at *levels*, the last element is followed ``depth`` times and
    *obj* is appended to the list reached that way.  A string that ends with
    a colon has that single trailing colon removed first, so ``"match:"`` is
    stored as ``"match"``.

    Args:
        obj: The token (string) or new group (list) to store.
        levels: The outermost list of the structure being built; it is
            modified in place.
        depth: How many times to descend into the last element before
            appending.  Values of zero or less append to *levels* itself.
    """
    target = levels
    for _ in range(depth):
        target = target[-1]

    # endswith() is safe for an empty string, unlike indexing obj[-1].
    if isinstance(obj, str) and obj.endswith(":"):
        obj = obj[:-1]

    target.append(obj)
def parse_parentheses(s):
    """Parse the first parenthesised group of *s* into nested lists.

    Every ``(`` opens a new list inside the current one and every ``)``
    closes it.  The text between parentheses is stored as a single string in
    the list that is open at that point, with one trailing ``:`` removed
    (``"match:"`` becomes ``"match"``).  Parsing stops as soon as the
    outermost group is closed; anything after it is ignored.

    Example:
        ``"(match:(level:(query:ERROR,type:phrase)))"`` gives
        ``(3, ["match", ["level", ["query:ERROR,type:phrase"]]])``.

    Args:
        s: The text to parse, normally starting with ``(``.

    Returns:
        A ``(max_depth, group)`` tuple: the deepest nesting level reached and
        the first top-level element that was parsed.

    Raises:
        ValueError: If a group is left open, a ``)`` appears without a
            matching ``(``, or *s* contains no parenthesised group at all.
    """
    root = []
    # open_groups[-1] is the list currently being filled; the root list is
    # always at the bottom, so the nesting depth is len(open_groups) - 1.
    open_groups = [root]
    max_depth = 0
    pending = []

    def flush():
        """Store the accumulated characters as one token in the open group."""
        if not pending:
            return
        token = "".join(pending)
        pending.clear()
        if token.endswith(":"):
            token = token[:-1]
        open_groups[-1].append(token)

    for ch in s:
        if ch == '(':
            flush()
            group = []
            open_groups[-1].append(group)
            open_groups.append(group)
            max_depth = max(max_depth, len(open_groups) - 1)
        elif ch == ')':
            flush()
            if len(open_groups) == 1:
                # A closing parenthesis with nothing open.
                raise ValueError('parentheses mismatch')
            open_groups.pop()
            if len(open_groups) == 1:
                # The outermost group is complete.
                return max_depth, root[0]
        else:
            pending.append(ch)

    if len(open_groups) > 1:
        raise ValueError('parentheses mismatch')
    raise ValueError('no parenthesised group found')
if __name__ == "__main__":
    import sys

    # Command-line use: convert each argument and print the result; a failure
    # for one URL is printed and does not stop the remaining ones.
    for kibana_arg in sys.argv[1:]:
        print("parse", kibana_arg)
        try:
            print(parse(kibana_arg))
        except Exception as error:
            print(error)