Skip to content

Commit

Permalink
manage json decode error
Browse files (browse the repository at this point in the history)
  • Loading branch information
nicolas-f authored Jan 10, 2024
1 parent fb2b0d6 commit dba9ef3
Showing 1 changed file with 30 additions and 27 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -61,33 +61,36 @@ def fetch_data(args):
try:
with gzip.open(file_path, 'rb') as f:
for line in f:
json_dict = json.loads(line)
if "_index" not in json_dict:
# must create index as it is not specified in the
# document
epoch = os.path.getmtime(file_path)
if "timestamp" in json_dict:
epoch = json_dict["timestamp"]
elif "_source" in json_dict and "timestamp" in \
json_dict["_source"]:
epoch = json_dict["_source"]["timestamp"]
elif "date" in json_dict:
epoch = calendar.timegm(datetime.datetime.strptime(
json_dict["date"], "%Y-%m-%dT%H:%M:%S.%fZ")
.timetuple())
dt = datetime.datetime.utcfromtimestamp(epoch)
stop_position = name.find("_")
if stop_position == -1:
stop_position = name.find(".")
json_dict["_index"] = name[:stop_position] + "_" + dt.strftime(args.time_format)
json_dict["_index"] = args.index_prepend + json_dict[
"_index"]
if "_id" not in json_dict:
# avoid duplicate by hashing the document
json_dict["_id"] = base64.b64encode(
hashlib.sha256(line).digest()).decode(
sys.getdefaultencoding())
yield json_dict
try:
json_dict = json.loads(line)
if "_index" not in json_dict:
# must create index as it is not specified in the
# document
epoch = os.path.getmtime(file_path)
if "timestamp" in json_dict:
epoch = json_dict["timestamp"]
elif "_source" in json_dict and "timestamp" in \
json_dict["_source"]:
epoch = json_dict["_source"]["timestamp"]
elif "date" in json_dict:
epoch = calendar.timegm(datetime.datetime.strptime(
json_dict["date"], "%Y-%m-%dT%H:%M:%S.%fZ")
.timetuple())
dt = datetime.datetime.utcfromtimestamp(epoch)
stop_position = name.find("_")
if stop_position == -1:
stop_position = name.find(".")
json_dict["_index"] = name[:stop_position] + "_" + dt.strftime(args.time_format)
json_dict["_index"] = args.index_prepend + json_dict[
"_index"]
if "_id" not in json_dict:
# avoid duplicate by hashing the document
json_dict["_id"] = base64.b64encode(
hashlib.sha256(line).digest()).decode(
sys.getdefaultencoding())
yield json_dict
except json.decoder.JSONDecodeError:
print("Cannot parse json: "+line)
except zlib.error as e:
print("Issue with compressed file named "+file_path, e)
if not args.keep_file:
Expand Down

0 comments on commit dba9ef3

Please sign in to comment.