
Commit

Cleaned up code before pushing commits.
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Jul 11, 2024
1 parent e601491 commit d7823b4
Showing 9 changed files with 135 additions and 186 deletions.
16 changes: 5 additions & 11 deletions bin/debug/extract_timeline_for_day_range_and_user.py
@@ -26,9 +26,7 @@
# https://github.com/e-mission/e-mission-docs/issues/356#issuecomment-520630934
import emission.export.export as eee

def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name, databases):
logging.info("In export_timeline: Databases = %s" % args.databases)

def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name):
logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" %
(user_id, start_day_str, end_day_str, file_name))

@@ -40,7 +38,7 @@ def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name, da
end_day_ts, arrow.get(end_day_ts).to(timezone)))

ts = esta.TimeSeries.get_time_series(user_id)
eee.export(user_id, ts, start_day_ts, end_day_ts, "%s_%s" % (file_name, user_id), True, databases=databases)
eee.export(user_id, ts, start_day_ts, end_day_ts, "%s_%s" % (file_name, user_id), True)

import emission.core.get_database as edb
pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
@@ -53,13 +51,12 @@ def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name, da
gpfd, default=esj.wrapped_default, allow_nan=False, indent=4)

def export_timeline_for_users(user_id_list, args):
logging.info("In export_timeline_for_users: Databases = %s" % args.databases)
for curr_uuid in user_id_list:
if curr_uuid != '':
logging.info("=" * 50)
export_timeline(user_id=curr_uuid, databases=args.databases,
start_day_str=args.start_day, end_day_str= args.end_day,
timezone=args.timezone, file_name=args.file_prefix)
export_timeline(user_id=curr_uuid, start_day_str=args.start_day,
end_day_str= args.end_day, timezone=args.timezone,
file_name=args.file_prefix)

if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
@@ -70,9 +67,6 @@ def export_timeline_for_users(user_id_list, args):
group.add_argument("-u", "--user_uuid", nargs="+")
group.add_argument("-a", "--all", action="store_true")
group.add_argument("-f", "--file")

parser.add_argument("--databases", nargs="+", default=None,
help="List of databases to fetch data from (supported options: timeseries_db, analysis_timeseries_db, usercache)")
parser.add_argument("--timezone", default="UTC")
parser.add_argument("start_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
parser.add_argument("end_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
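Note: with the --databases option removed, export_timeline() is back to its five core arguments. A minimal sketch of a direct call under the new signature (the import path and the UUID value are illustrative, not part of this commit):

    import logging
    import uuid

    import bin.debug.extract_timeline_for_day_range_and_user as etl  # assumed to be importable from the repo root

    logging.basicConfig(level=logging.DEBUG)

    # Placeholder UUID; in practice this must be the UUID of a registered user.
    user_id = uuid.UUID("00000000-0000-0000-0000-000000000000")

    # Exports the user's timeline for the two UTC days into files prefixed with /tmp/timeline
    # (e.g. /tmp/timeline_<uuid>), along with a pipeline-state dump for the same user.
    etl.export_timeline(user_id=user_id,
                        start_day_str="2024-07-01",
                        end_day_str="2024-07-02",
                        timezone="UTC",
                        file_name="/tmp/timeline")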
44 changes: 22 additions & 22 deletions bin/debug/load_multi_timeline_for_range.py
@@ -14,7 +14,7 @@
import emission.storage.json_wrappers as esj
import argparse

import bin.debug.common
import bin.debug.common as common
import os

import gzip
@@ -26,19 +26,19 @@

args = None

def register_fake_users(prefix, unique_user_list):
def register_fake_users(prefix, unique_user_list, verbose):
logging.info("Creating user entries for %d users" % len(unique_user_list))

format_string = "{0}-%0{1}d".format(prefix, len(str(len(unique_user_list))))
logging.info("pattern = %s" % format_string)

for i, uuid in enumerate(unique_user_list):
username = (format_string % i)
if args.verbose is not None and i % args.verbose == 0:
if verbose is not None and i % verbose == 0:
logging.info("About to insert mapping %s -> %s" % (username, uuid))
user = ecwu.User.registerWithUUID(username, uuid)

def register_mapped_users(mapfile, unique_user_list):
def register_mapped_users(mapfile, unique_user_list, verbose):
uuid_entries = json.load(open(mapfile), object_hook=esj.wrapped_object_hook)
logging.info("Creating user entries for %d users from map of length %d" % (len(unique_user_list), len(mapfile)))

@@ -50,17 +50,17 @@ def register_mapped_users(mapfile, unique_user_list):
# register this way
# Pro: will do everything that register does, including creating the profile
# Con: will insert only username and uuid - id and update_ts will be different
if args.verbose is not None and i % args.verbose == 0:
if verbose is not None and i % verbose == 0:
logging.info("About to insert mapping %s -> %s" % (username, uuid))
user = ecwu.User.registerWithUUID(username, uuid)

def get_load_ranges(entries):
start_indices = list(range(0, len(entries), args.batch_size))
def get_load_ranges(entries, batch_size):
start_indices = list(range(0, len(entries), batch_size))
ranges = list(zip(start_indices, start_indices[1:]))
ranges.append((start_indices[-1], len(entries)))
return ranges

def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error):
def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose):
import emission.core.get_database as edb
import pymongo

@@ -70,7 +70,7 @@ def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error):
(curr_uuid, pipeline_filename))
with gzip.open(pipeline_filename) as gfd:
states = json.load(gfd, object_hook = esj.wrapped_object_hook)
if args.verbose:
if verbose:
logging.debug("Loading states of length %s" % len(states))
if len(states) > 0:
try:
@@ -109,8 +109,8 @@ def post_check(unique_user_list, all_rerun_list):
else:
logging.info("timeline contains a mixture of analysis results and raw data - complain to shankari!")

def load_multi_timeline_for_range(file_prefix, info_only, verbose, continue_on_error, mapfile, prefix):
fn = args.file_prefix
def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000):
fn = file_prefix
logging.info("Loading file or prefix %s" % fn)
sel_file_list = common.read_files_with_prefix(fn)

@@ -136,22 +136,22 @@
all_user_list.append(curr_uuid)
all_rerun_list.append(needs_rerun)

load_ranges = get_load_ranges(entries)
if not args.info_only:
load_ranges = get_load_ranges(entries, batch_size)
if not info_only:
for j, curr_range in enumerate(load_ranges):
if args.verbose is not None and j % args.verbose == 0:
if verbose is not None and j % verbose == 0:
logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1]))
wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]]
(tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, args.continue_on_error)
(tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, continue_on_error)
print("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" % (curr_uuid, ucdb_count, tsdb_count))

unique_user_list = set(all_user_list)
if not args.info_only:
load_pipeline_states(args.file_prefix, unique_user_list, args.continue_on_error)
if args.mapfile is not None:
register_mapped_users(args.mapfile, unique_user_list)
elif args.prefix is not None:
register_fake_users(args.prefix, unique_user_list)
if not info_only:
load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose)
if mapfile is not None:
register_mapped_users(mapfile, unique_user_list, verbose)
elif prefix is not None:
register_fake_users(prefix, unique_user_list, verbose)

post_check(unique_user_list, all_rerun_list)

@@ -187,4 +187,4 @@ def load_multi_timeline_for_range(file_prefix, info_only, verbose, continue_on_e
else:
logging.basicConfig(level=logging.INFO)

load_multi_timeline_for_range(args)
load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size)
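Because load_multi_timeline_for_range() now takes explicit parameters instead of reading the module-level args, it can also be driven programmatically. A sketch, assuming the script is importable and that a dump produced by the export script exists at the given prefix:

    import logging

    import bin.debug.load_multi_timeline_for_range as lmt  # assumed import path

    logging.basicConfig(level=logging.INFO)

    # Loads every file matching the prefix, inserting entries in batches of 10000 (the default),
    # logging progress every 100th batch/user, and registering fake users under the "load-test" prefix.
    lmt.load_multi_timeline_for_range("/tmp/timeline",
                                      verbose=100,
                                      continue_on_error=False,
                                      prefix="load-test",
                                      batch_size=10000)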
95 changes: 38 additions & 57 deletions bin/purge_user_timeseries.py
@@ -5,15 +5,16 @@
import emission.core.wrapper.user as ecwu
import emission.core.get_database as edb
import emission.core.wrapper.pipelinestate as ecwp
import emission.core.wrapper.pipelinestate as ecwp
import emission.storage.pipeline_queries as esp
import pandas as pd
import pymongo
from bson import ObjectId
import json
from uuid import UUID
import tempfile
from datetime import datetime
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.pipeline_queries as espq
import emission.export.export as eee

DEFAULT_DIR_NAME = tempfile.gettempdir()
DEFAULT_FILE_PREFIX = "old_timeseries_"
Expand Down Expand Up @@ -85,18 +86,9 @@ def custom_encoder(obj):
# # logging.info("{} deleted entries since {}".format(result.deleted_count, datetime.fromtimestamp(last_ts_run)))


import emission.storage.timeseries.abstract_timeseries as esta
import gzip
import emission.tests.common as etc
import emission.pipeline.export_stage as epe
import emission.storage.pipeline_queries as espq
import emission.exportdata.export_data as eeed
import emission.export.export as eee
import os
import emission.pipeline.export_stage as epe


def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEFAULT_DIR_NAME, unsafe_ignore_save=False):
# def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEFAULT_DIR_NAME, unsafe_ignore_save=False):
def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, unsafe_ignore_save=False):
if user_uuid:
user_id = uuid.UUID(user_uuid)
else:
@@ -106,14 +98,6 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF

print("user_id: ", user_id)

# time_query = espq.get_time_range_for_export_data(user_id)
# file_name = dir_name + "/" + file_prefix + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
# export_dir_path = dir_name + "/" + file_prefix

# print("file_name: ", file_name)
# print("Start Ts: ", time_query.startTs)
# print("End Ts: ", time_query.endTs)

if unsafe_ignore_save is True:
logging.warning("CSV export was ignored")
else:
@@ -122,42 +106,42 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF

ts = esta.TimeSeries.get_time_series(user_id)
time_query = espq.get_time_range_for_export_data(user_id)
# file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
file_name = os.environ.get('DATA_DIR', '/Users/mmahadik/Documents/Work/OpenPATH/Code/GitHub/logs/data/export_purge_restore/purge/tests') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
export_file_name = dir_name + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)

import datetime
print("Start Time: ", datetime.datetime.fromtimestamp(time_query.startTs).strftime('%Y-%m-%d %H:%M:%S'))

start_ts_datetime = datetime.datetime.fromtimestamp(time_query.startTs).strftime('%Y-%m-%d %H:%M:%S')
end_ts_datetime = datetime.datetime.fromtimestamp(time_query.endTs).strftime('%Y-%m-%d %H:%M:%S')
print("Start Time: ", start_ts_datetime)
print("Start Ts: ", time_query.startTs)
print("End Time: ", datetime.datetime.fromtimestamp(time_query.endTs).strftime('%Y-%m-%d %H:%M:%S'))
print("End Time: ", end_ts_datetime)
print("End Ts: ", time_query.endTs)

export_queries = eee.export(user_id, ts, time_query.startTs, time_query.endTs, file_name, False, databases)

database_dict = {
'timeseries_db': ts.timeseries_db,
'analysis_timeseries_db': ts.analysis_timeseries_db
# TODO: Add usercache
}

for database in databases:
for key, value in export_queries.items():
if value["type"] == "time":
ts_query = ts._get_query(time_query=value["query"])
else:
ts_query = value["query"]
delete_query = {"user_id": user_id, **ts_query}

# Get the count of matching documents
count = database_dict[database].count_documents(delete_query)
print(f"Number of documents matching for {database_dict[database]} with {key} query: {count}")
# delete_result = database_dict[database].delete_many(delete_query)
# deleted_count = delete_result.deleted_count
# print(f"Number of documents deleted for {database_dict[database]} with {key} query: {deleted_count}")


# logging.info("Deleting entries from database...")
# result = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
# logging.info("{} deleted entries since {}".format(result.deleted_count, datetime.fromtimestamp(last_ts_run)))
# Receiving these queries from export.py that were used to fetch the data entries that were exported.
# Need these for use in the purge_user_timeseries.py script so that we only delete those entries that were exported
export_queries = eee.export(user_id, ts, time_query.startTs, time_query.endTs, export_file_name, False, databases=['timeseries_db'])

for key, value in export_queries.items():
if value["type"] == "time":
ts_query = ts._get_query(time_query=value["query"])
print(ts_query)
# Separate case for handling the first_place_extra_query from export.py
# else:
# ts_query = ts._get_query(extra_query_list=[value["query"]])
# print(ts_query)
# sort_key = ts._get_sort_key(None)
# print(len(list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[value["query"]]))))
delete_query = {"user_id": user_id, **ts_query}

# Get the count of matching documents
count = ts.timeseries_db.count_documents(delete_query)
print(f"Number of documents matching for {ts.timeseries_db} with {key} query: {count}")
# print(f"Number of documents deleted for {ts.timeseries_db} with {key} query: {deleted_count}")

print("Deleting entries from database...")
# result = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
result = ts.timeseries_db.delete_many(delete_query)
print(f"Key query: {key}")
print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))

return file_name

Expand All @@ -170,9 +154,6 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF
group.add_argument("-e", "--user_email")
group.add_argument("-u", "--user_uuid")

parser.add_argument("--databases", nargs="+", default=None,
help="List of databases to fetch data from (supported options: timeseries_db, analysis_timeseries_db, usercache)"
)
parser.add_argument(
"-d", "--dir_name",
help="Target directory for exported JSON data (defaults to {})".format(DEFAULT_DIR_NAME),
@@ -201,4 +182,4 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF
# }
logging.info(f"Default temporary directory: {DEFAULT_DIR_NAME}")
# purgeUserTimeseries(exportFileFlags, args.user_uuid, args.user_email, args.dir_name, args.file_prefix, args.unsafe_ignore_save)
purgeUserTimeseries(args.user_uuid, args.user_email, args.databases, args.dir_name, args.unsafe_ignore_save)
purgeUserTimeseries(args.user_uuid, args.user_email, args.dir_name, args.unsafe_ignore_save)
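Since the --databases flag was dropped here as well, the purge is scoped purely by the queries that emission.export.export.export() returns for the user's pending export range, and only timeseries_db is touched. A minimal invocation sketch (the UUID is a placeholder and a configured MongoDB connection is assumed):

    import bin.purge_user_timeseries as put  # assumed import path

    # Exports the user's entries for the pending range under dir_name, then deletes exactly
    # those entries from timeseries_db using the queries returned by export().
    put.purgeUserTimeseries(user_uuid="00000000-0000-0000-0000-000000000000",
                            dir_name="/tmp/purge_archive")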
12 changes: 4 additions & 8 deletions emission/core/get_database.py
@@ -24,18 +24,14 @@
try:
parsed=pymongo.uri_parser.parse_uri(url)
except:
# print("URL not formatted, defaulting to \"Stage_database\"")
# db_name = "Stage_database"
print("URL not formatted, defaulting to \"openpath_stage\"")
db_name = "openpath_stage"
print("URL not formatted, defaulting to \"Stage_database\"")
db_name = "Stage_database"
else:
if parsed['database']:
db_name = parsed['database']
else:
# print("URL does not specify a DB name, defaulting to \"Stage_database\"")
# db_name = "Stage_database"
print("URL does not specify a DB name, defaulting to \"openpath_stage\"")
db_name = "openpath_stage"
print("URL does not specify a DB name, defaulting to \"Stage_database\"")
db_name = "Stage_database"

print("Connecting to database URL "+url)
_current_db = MongoClient(url, uuidRepresentation='pythonLegacy')[db_name]
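The defaults here go back to "Stage_database"; which branch runs depends on what pymongo can parse out of the configured URL. A small illustration of the fallback logic using pymongo's URI parser directly (the URLs are examples only):

    import pymongo.uri_parser

    # A URL with a path component yields that database name ...
    parsed = pymongo.uri_parser.parse_uri("mongodb://localhost/openpath_prod")
    print(parsed['database'])   # -> 'openpath_prod'

    # ... while one without a path yields None, so get_database falls back to "Stage_database".
    parsed = pymongo.uri_parser.parse_uri("mongodb://localhost")
    print(parsed['database'])   # -> None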
2 changes: 2 additions & 0 deletions emission/export/export.py
@@ -112,6 +112,8 @@ def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):
json.dump(combined_list,
gcfd, default=esj.wrapped_default, allow_nan=False, indent=4)

# Returning these queries that were used to fetch the data entries that were exported.
# Need these for use in the purge_user_timeseries.py script so that we only delete those entries that were exported
return {
'trip_time_query': { 'query': trip_time_query, 'type': "time" },
'place_time_query': { 'query': place_time_query, 'type': "time" },
13 changes: 5 additions & 8 deletions emission/exportdata/export_data.py
@@ -11,11 +11,11 @@
import bson.json_util as bju
import os

def export_data(user_id, databases=None, dir_name=None):
def export_data(user_id):
try:
edp = ExportDataPipeline()
edp.user_id = user_id
edp.run_export_data_pipeline(user_id, databases, dir_name)
edp.run_export_data_pipeline(user_id)
if edp.last_trip_done is None:
logging.debug("After run, last_trip_done == None, must be early return")
espq.mark_export_data_done(user_id, edp.last_trip_done)
@@ -31,15 +31,12 @@ def __init__(self):
def last_trip_done(self):
return self._last_trip_done

def run_export_data_pipeline(self, user_id, databases=None, dir_name='emission/archived'):
def run_export_data_pipeline(self, user_id):
ts = esta.TimeSeries.get_time_series(user_id)
time_query = espq.get_time_range_for_export_data(user_id)
if "DATA_DIR" in os.environ:
if os.path.isdir(os.environ['DATA_DIR']) == False:
os.mkdir(os.environ['DATA_DIR'])
# file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
print("In export_data.run_export_data_pipeline: Start timestamp: ", time_query.startTs)
print("In export_data.run_export_data_pipeline: End timestamp: ", time_query.endTs)
file_name = dir_name + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
eee.export(user_id, ts, time_query.startTs, time_query.endTs, file_name, False, databases)
file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
eee.export(user_id, ts, time_query.startTs, time_query.endTs, file_name, False)
