Add error handling, progress bar, cleanup
Signed-off-by: Mikayla Thompson <[email protected]>
mikaylathompson committed Aug 19, 2023
1 parent dea732c commit 928e55c
Showing 2 changed files with 73 additions and 24 deletions.

Dockerfile
@@ -3,11 +3,13 @@ FROM ubuntu:focal
 ENV DEBIAN_FRONTEND noninteractive
 
 RUN apt-get update && \
-    apt-get install -y --no-install-recommends python3.11.4 python3-pip python3-dev gcc libc-dev git curl && \
-    pip3 install urllib3==1.25.11 opensearch-benchmark==1.1.0
+    apt-get install -y --no-install-recommends python3.9 python3-pip python3-dev gcc libc-dev git curl && \
+    pip3 install urllib3==1.25.11 opensearch-benchmark==1.1.0 tqdm
 
 COPY runTestBenchmarks.sh /root/
+COPY humanReadableLogs.py /root/
 RUN chmod ugo+x /root/runTestBenchmarks.sh
+RUN chmod ugo+x /root/humanReadableLogs.py
 WORKDIR /root
 
 CMD tail -f /dev/null

humanReadableLogs.py
@@ -6,6 +6,12 @@
 import json
 import pathlib
 from typing import Optional
+import logging
+
+from tqdm import tqdm
+from tqdm.contrib.logging import logging_redirect_tqdm
+
+logger = logging.getLogger(__name__)
 
 LOG_JSON_TUPLE_FIELD = "message"
 BASE64_ENCODED_TUPLE_PATHS = ["request.body", "primaryResponse.body", "shadowResponse.body"]
@@ -27,14 +33,21 @@
 BULK_URI_PATH = "_bulk"
 
 
-def get_element(element: str, dict_: dict) -> Optional[any]:
+class DictionaryPathException(Exception):
+    pass
+
+
+def get_element(element: str, dict_: dict, raise_on_error=False) -> Optional[any]:
     keys = element.split('.')
     rv = dict_
     for key in keys:
         try:
             rv = rv[key]
         except KeyError:
-            return None
+            if raise_on_error:
+                raise DictionaryPathException(f"Key {key} was not present.")
+            else:
+                return None
     return rv
 
 
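For illustration, a minimal sketch of how the reworked get_element resolves a dotted path and how the new raise_on_error flag changes the failure mode; the dictionary and key names below are hypothetical, not values from the commit.

    # Hypothetical data; the key names are illustrative only.
    doc = {"request": {"uri": "/index/_bulk"}}

    get_element("request.uri", doc)                         # -> "/index/_bulk"
    get_element("request.body", doc)                        # -> None (default: swallow the KeyError)
    get_element("request.body", doc, raise_on_error=True)   # raises DictionaryPathException("Key body was not present.")
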
@@ -53,33 +66,67 @@ def parse_args():
     return parser.parse_args()
 
 
-def parse_body_value(raw_value: str, content_encoding: Optional[str], content_type: Optional[str], is_bulk: bool):
-    b64decoded = base64.b64decode(raw_value)
+def parse_body_value(raw_value: str, content_encoding: Optional[str],
+                     content_type: Optional[str], is_bulk: bool, line_no: int):
+    try:
+        b64decoded = base64.b64decode(raw_value)
+    except Exception as e:
+        logger.error(f"Body value on line {line_no} could not be decoded: {e}. Skipping parsing body value.")
+        return None
     is_gzipped = content_encoding is not None and content_encoding == CONTENT_ENCODING_GZIP
     is_json = content_type is not None and CONTENT_TYPE_JSON in content_type
     if is_gzipped:
-        unzipped = gzip.decompress(b64decoded)
+        try:
+            unzipped = gzip.decompress(b64decoded)
+        except Exception as e:
+            logger.error(f"Body value on line {line_no} should be gzipped but could not be unzipped: {e}. "
+                         "Skipping parsing body value.")
+            return b64decoded
     else:
         unzipped = b64decoded
-    decoded = unzipped.decode("utf-8")
+    try:
+        decoded = unzipped.decode("utf-8")
+    except Exception as e:
+        logger.error(f"Body value on line {line_no} could not be decoded to utf-8: {e}. "
+                     "Skipping parsing body value.")
+        return unzipped
     if is_json and len(decoded) > 0:
         if is_bulk:
-            return [json.loads(line) for line in decoded.splitlines()]
-        return json.loads(decoded)
+            try:
+                return [json.loads(line) for line in decoded.splitlines()]
+            except Exception as e:
+                logger.error(f"Body value on line {line_no} should be a bulk json (list of json lines) but "
+                             f"could not be parsed: {e}. Skipping parsing body value.")
+                return decoded
+        try:
+            return json.loads(decoded)
+        except Exception as e:
+            logger.error(f"Body value on line {line_no} should be a json but could not be parsed: {e}. "
+                         "Skipping parsing body value.")
+            return decoded
     return decoded
 
 
-def parse_tuple(line):
+def parse_tuple(line: str, line_no: int):
     item = json.loads(line)
     message = item[LOG_JSON_TUPLE_FIELD]
     tuple = json.loads(message)
-    for path in BASE64_ENCODED_TUPLE_PATHS:
-        base64value = get_element(path, tuple)
-        content_encoding = get_element(CONTENT_ENCODING_PATH[path], tuple)
-        content_type = get_element(CONTENT_TYPE_PATH[path], tuple)
-        is_bulk_path = BULK_URI_PATH in get_element(URI_PATH, tuple)
-        value = parse_body_value(base64value, content_encoding, content_type, is_bulk_path)
-        set_element(path, tuple, value)
+    try:
+        is_bulk_path = BULK_URI_PATH in get_element(URI_PATH, tuple, raise_on_error=True)
+    except DictionaryPathException as e:
+        logger.error(f"`{URI_PATH}` on line {line_no} could not be loaded: {e} "
+                     f"Skipping parsing tuple.")
+        return tuple
+    for body_path in BASE64_ENCODED_TUPLE_PATHS:
+        base64value = get_element(body_path, tuple)
+        if base64value is None:
+            # This component has no body element, which is potentially valid.
+            continue
+        content_encoding = get_element(CONTENT_ENCODING_PATH[body_path], tuple)
+        content_type = get_element(CONTENT_TYPE_PATH[body_path], tuple)
+        value = parse_body_value(base64value, content_encoding, content_type, is_bulk_path, line_no)
+        if value:
+            set_element(body_path, tuple, value)
     return tuple
 
 
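As context for the body handling above: the values parse_tuple pulls out of BASE64_ENCODED_TUPLE_PATHS are base64-encoded and, when the content encoding says so, gzipped JSON; parse_body_value unwinds that chain step by step and, after this commit, logs and returns the partially decoded value instead of raising when a step fails. A standalone round-trip sketch of the encoding being reversed (illustrative payload, not data from the commit):

    import base64, gzip, json

    # Encode the way a captured body would arrive: JSON -> utf-8 -> gzip -> base64.
    body = json.dumps({"took": 3, "errors": False})
    raw_value = base64.b64encode(gzip.compress(body.encode("utf-8"))).decode("ascii")

    # The same chain reversed is what parse_body_value performs, with error
    # handling and logging around each step.
    decoded = gzip.decompress(base64.b64decode(raw_value)).decode("utf-8")
    assert json.loads(decoded) == {"took": 3, "errors": False}
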
@@ -90,10 +137,10 @@ def parse_tuple(line):
     else:
         outfile = args.infile.parent / f"readable-{args.infile.name}"
     print(f"Input file: {args.infile}; Output file: {outfile}")
-    with open(args.infile, 'r') as in_f:
-        with open(outfile, 'w') as out_f:
-            for line in in_f:
-                print(parse_tuple(line), file=out_f)
 
-    # TODO: add some try/catching
-    # TODO: add a progress indicator for large files
+    logging.basicConfig(level=logging.INFO)
+    with logging_redirect_tqdm():
+        with open(args.infile, 'r') as in_f:
+            with open(outfile, 'w') as out_f:
+                for i, line in tqdm(enumerate(in_f)):
+                    print(parse_tuple(line, i + 1), file=out_f)
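
The new main loop wraps the file iteration in tqdm for a progress indicator and in logging_redirect_tqdm so the errors logged by parse_tuple and parse_body_value are printed above the bar instead of breaking it. A self-contained sketch of that pattern, with a hypothetical in-memory input standing in for the tuples file:

    import logging
    from tqdm import tqdm
    from tqdm.contrib.logging import logging_redirect_tqdm

    logger = logging.getLogger(__name__)
    logging.basicConfig(level=logging.INFO)

    lines = ['{"message": "..."}'] * 1000   # stand-in for the input file
    with logging_redirect_tqdm():           # send log output through tqdm so it doesn't clobber the bar
        for i, line in tqdm(enumerate(lines), total=len(lines)):
            if i == 500:
                logger.error("Example error at line %d", i + 1)   # shows up above the bar

Because the script iterates enumerate(in_f) without a total, its bar reports a running line count rather than a percentage; passing total= (as in the sketch) only works when the number of lines is known up front.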
