diff --git a/bin/debug/extract_timeline_for_day_range_and_user.py b/bin/debug/extract_timeline_for_day_range_and_user.py
index 31329a8e2..3b8ff66ba 100644
--- a/bin/debug/extract_timeline_for_day_range_and_user.py
+++ b/bin/debug/extract_timeline_for_day_range_and_user.py
@@ -26,9 +26,7 @@
 # https://github.com/e-mission/e-mission-docs/issues/356#issuecomment-520630934
 import emission.export.export as eee
 
-def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name, databases):
-    logging.info("In export_timeline: Databases = %s" % args.databases)
-
+def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name):
     logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" %
                  (user_id, start_day_str, end_day_str, file_name))
@@ -40,7 +38,7 @@ def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name, da
                                                  end_day_ts, arrow.get(end_day_ts).to(timezone)))
     ts = esta.TimeSeries.get_time_series(user_id)
-    eee.export(user_id, ts, start_day_ts, end_day_ts, "%s_%s" % (file_name, user_id), True, databases=databases)
+    eee.export(user_id, ts, start_day_ts, end_day_ts, "%s_%s" % (file_name, user_id), True)
 
     import emission.core.get_database as edb
     pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
@@ -53,13 +51,12 @@ def export_timeline(user_id, start_day_str, end_day_str, timezone, file_name, da
                   gpfd, default=esj.wrapped_default, allow_nan=False, indent=4)
 
 def export_timeline_for_users(user_id_list, args):
-    logging.info("In export_timeline_for_users: Databases = %s" % args.databases)
     for curr_uuid in user_id_list:
         if curr_uuid != '':
             logging.info("=" * 50)
-            export_timeline(user_id=curr_uuid, databases=args.databases,
-                            start_day_str=args.start_day, end_day_str= args.end_day,
-                            timezone=args.timezone, file_name=args.file_prefix)
+            export_timeline(user_id=curr_uuid, start_day_str=args.start_day,
+                            end_day_str= args.end_day, timezone=args.timezone,
+                            file_name=args.file_prefix)
 
 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
@@ -70,9 +67,6 @@ def export_timeline_for_users(user_id_list, args):
     group.add_argument("-u", "--user_uuid", nargs="+")
     group.add_argument("-a", "--all", action="store_true")
     group.add_argument("-f", "--file")
-
-    parser.add_argument("--databases", nargs="+", default=None,
-                        help="List of databases to fetch data from (supported options: timeseries_db, analysis_timeseries_db, usercache)")
     parser.add_argument("--timezone", default="UTC")
     parser.add_argument("start_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
     parser.add_argument("end_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
'YYYY-MM-DD'" ) diff --git a/bin/debug/load_multi_timeline_for_range.py b/bin/debug/load_multi_timeline_for_range.py index 06c3f12ce..79973fde0 100644 --- a/bin/debug/load_multi_timeline_for_range.py +++ b/bin/debug/load_multi_timeline_for_range.py @@ -14,7 +14,7 @@ import emission.storage.json_wrappers as esj import argparse -import bin.debug.common +import bin.debug.common as common import os import gzip @@ -26,7 +26,7 @@ args = None -def register_fake_users(prefix, unique_user_list): +def register_fake_users(prefix, unique_user_list, verbose): logging.info("Creating user entries for %d users" % len(unique_user_list)) format_string = "{0}-%0{1}d".format(prefix, len(str(len(unique_user_list)))) @@ -34,11 +34,11 @@ def register_fake_users(prefix, unique_user_list): for i, uuid in enumerate(unique_user_list): username = (format_string % i) - if args.verbose is not None and i % args.verbose == 0: + if verbose is not None and i % verbose == 0: logging.info("About to insert mapping %s -> %s" % (username, uuid)) user = ecwu.User.registerWithUUID(username, uuid) -def register_mapped_users(mapfile, unique_user_list): +def register_mapped_users(mapfile, unique_user_list, verbose): uuid_entries = json.load(open(mapfile), object_hook=esj.wrapped_object_hook) logging.info("Creating user entries for %d users from map of length %d" % (len(unique_user_list), len(mapfile))) @@ -50,17 +50,17 @@ def register_mapped_users(mapfile, unique_user_list): # register this way # Pro: will do everything that register does, including creating the profile # Con: will insert only username and uuid - id and update_ts will be different - if args.verbose is not None and i % args.verbose == 0: + if verbose is not None and i % verbose == 0: logging.info("About to insert mapping %s -> %s" % (username, uuid)) user = ecwu.User.registerWithUUID(username, uuid) -def get_load_ranges(entries): - start_indices = list(range(0, len(entries), args.batch_size)) +def get_load_ranges(entries, batch_size): + start_indices = list(range(0, len(entries), batch_size)) ranges = list(zip(start_indices, start_indices[1:])) ranges.append((start_indices[-1], len(entries))) return ranges -def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error): +def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error, verbose): import emission.core.get_database as edb import pymongo @@ -70,7 +70,7 @@ def load_pipeline_states(file_prefix, all_uuid_list, continue_on_error): (curr_uuid, pipeline_filename)) with gzip.open(pipeline_filename) as gfd: states = json.load(gfd, object_hook = esj.wrapped_object_hook) - if args.verbose: + if verbose: logging.debug("Loading states of length %s" % len(states)) if len(states) > 0: try: @@ -109,8 +109,8 @@ def post_check(unique_user_list, all_rerun_list): else: logging.info("timeline contains a mixture of analysis results and raw data - complain to shankari!") -def load_multi_timeline_for_range(file_prefix, info_only, verbose, continue_on_error, mapfile, prefix): - fn = args.file_prefix +def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, continue_on_error=None, mapfile=None, prefix=None, batch_size=10000): + fn = file_prefix logging.info("Loading file or prefix %s" % fn) sel_file_list = common.read_files_with_prefix(fn) @@ -136,22 +136,22 @@ def load_multi_timeline_for_range(file_prefix, info_only, verbose, continue_on_e all_user_list.append(curr_uuid) all_rerun_list.append(needs_rerun) - load_ranges = get_load_ranges(entries) - if not args.info_only: + load_ranges = 
+        if not info_only:
             for j, curr_range in enumerate(load_ranges):
-                if args.verbose is not None and j % args.verbose == 0:
+                if verbose is not None and j % verbose == 0:
                     logging.info("About to load range %s -> %s" % (curr_range[0], curr_range[1]))
                 wrapped_entries = [ecwe.Entry(e) for e in entries[curr_range[0]:curr_range[1]]]
-                (tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, args.continue_on_error)
+                (tsdb_count, ucdb_count) = estcs.insert_entries(curr_uuid, wrapped_entries, continue_on_error)
                 print("For uuid %s, finished loading %d entries into the usercache and %d entries into the timeseries" %
                       (curr_uuid, ucdb_count, tsdb_count))
 
     unique_user_list = set(all_user_list)
-    if not args.info_only:
-        load_pipeline_states(args.file_prefix, unique_user_list, args.continue_on_error)
-        if args.mapfile is not None:
-            register_mapped_users(args.mapfile, unique_user_list)
-        elif args.prefix is not None:
-            register_fake_users(args.prefix, unique_user_list)
+    if not info_only:
+        load_pipeline_states(file_prefix, unique_user_list, continue_on_error, verbose)
+        if mapfile is not None:
+            register_mapped_users(mapfile, unique_user_list, verbose)
+        elif prefix is not None:
+            register_fake_users(prefix, unique_user_list, verbose)
 
     post_check(unique_user_list, all_rerun_list)
@@ -187,4 +187,4 @@ def load_multi_timeline_for_range(file_prefix, info_only, verbose, continue_on_e
     else:
         logging.basicConfig(level=logging.INFO)
 
-    load_multi_timeline_for_range(args)
+    load_multi_timeline_for_range(args.file_prefix, args.info_only, args.verbose, args.continue_on_error, args.mapfile, args.prefix, args.batch_size)
diff --git a/bin/purge_user_timeseries.py b/bin/purge_user_timeseries.py
index 836c9bc04..8c51aad42 100644
--- a/bin/purge_user_timeseries.py
+++ b/bin/purge_user_timeseries.py
@@ -5,15 +5,16 @@
 import emission.core.wrapper.user as ecwu
 import emission.core.get_database as edb
 import emission.core.wrapper.pipelinestate as ecwp
-import emission.core.wrapper.pipelinestate as ecwp
 import emission.storage.pipeline_queries as esp
 import pandas as pd
-import pymongo
 from bson import ObjectId
 import json
 from uuid import UUID
 import tempfile
 from datetime import datetime
+import emission.storage.timeseries.abstract_timeseries as esta
+import emission.storage.pipeline_queries as espq
+import emission.export.export as eee
 
 DEFAULT_DIR_NAME = tempfile.gettempdir()
 DEFAULT_FILE_PREFIX = "old_timeseries_"
@@ -85,18 +86,9 @@ def custom_encoder(obj):
 #
 #     logging.info("{} deleted entries since {}".format(result.deleted_count, datetime.fromtimestamp(last_ts_run)))
 
-import emission.storage.timeseries.abstract_timeseries as esta
-import gzip
-import emission.tests.common as etc
-import emission.pipeline.export_stage as epe
-import emission.storage.pipeline_queries as espq
-import emission.exportdata.export_data as eeed
-import emission.export.export as eee
-import os
-import emission.pipeline.export_stage as epe
-
-def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEFAULT_DIR_NAME, unsafe_ignore_save=False):
+# def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEFAULT_DIR_NAME, unsafe_ignore_save=False):
+def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, unsafe_ignore_save=False):
     if user_uuid:
         user_id = uuid.UUID(user_uuid)
     else:
@@ -106,14 +98,6 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF
     print("user_id: ", user_id)
 
-    # time_query = espq.get_time_range_for_export_data(user_id)
-    file_name = dir_name + "/" + file_prefix + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
-    export_dir_path = dir_name + "/" + file_prefix
-
-    print("file_name: ", file_name)
-    print("Start Ts: ", time_query.startTs)
-    print("End Ts: ", time_query.endTs)
-
     if unsafe_ignore_save is True:
         logging.warning("CSV export was ignored")
     else:
@@ -122,42 +106,42 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF
         ts = esta.TimeSeries.get_time_series(user_id)
         time_query = espq.get_time_range_for_export_data(user_id)
-        # file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
-        file_name = os.environ.get('DATA_DIR', '/Users/mmahadik/Documents/Work/OpenPATH/Code/GitHub/logs/data/export_purge_restore/purge/tests') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
+        export_file_name = dir_name + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
 
-        import datetime
-        print("Start Time: ", datetime.datetime.fromtimestamp(time_query.startTs).strftime('%Y-%m-%d %H:%M:%S'))
+
+        start_ts_datetime = datetime.datetime.fromtimestamp(time_query.startTs).strftime('%Y-%m-%d %H:%M:%S')
+        end_ts_datetime = datetime.datetime.fromtimestamp(time_query.endTs).strftime('%Y-%m-%d %H:%M:%S')
+        print("Start Time: ", start_ts_datetime)
         print("Start Ts: ", time_query.startTs)
-        print("End Time: ", datetime.datetime.fromtimestamp(time_query.endTs).strftime('%Y-%m-%d %H:%M:%S'))
+        print("End Time: ", end_ts_datetime)
         print("End Ts: ", time_query.endTs)
 
-        export_queries = eee.export(user_id, ts, time_query.startTs, time_query.endTs, file_name, False, databases)
-
-        database_dict = {
-            'timeseries_db': ts.timeseries_db,
-            'analysis_timeseries_db': ts.analysis_timeseries_db
-            # TODO: Add usercache
-        }
-
-        for database in databases:
-            for key, value in export_queries.items():
-                if value["type"] == "time":
-                    ts_query = ts._get_query(time_query=value["query"])
-                else:
-                    ts_query = value["query"]
-                delete_query = {"user_id": user_id, **ts_query}
-
-                # Get the count of matching documents
-                count = database_dict[database].count_documents(delete_query)
-                print(f"Number of documents matching for {database_dict[database]} with {key} query: {count}")
-                # delete_result = database_dict[database].delete_many(delete_query)
-                # deleted_count = delete_result.deleted_count
-                # print(f"Number of documents deleted for {database_dict[database]} with {key} query: {deleted_count}")
-
-
-        # logging.info("Deleting entries from database...")
-        # result = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
-        # logging.info("{} deleted entries since {}".format(result.deleted_count, datetime.fromtimestamp(last_ts_run)))
+        # export.py returns the queries it used to fetch the entries that were exported.
+        # We reuse those queries here so that we only delete the entries that were actually exported.
+        export_queries = eee.export(user_id, ts, time_query.startTs, time_query.endTs, export_file_name, False, databases=['timeseries_db'])
+
+        for key, value in export_queries.items():
+            if value["type"] == "time":
+                ts_query = ts._get_query(time_query=value["query"])
+                print(ts_query)
+            # Separate case for handling the first_place_extra_query from export.py
+            # else:
+            #     ts_query = ts._get_query(extra_query_list=[value["query"]])
+            #     print(ts_query)
+            #     sort_key = ts._get_sort_key(None)
+            #     print(len(list(ts.find_entries(key_list=None, time_query=None, extra_query_list=[value["query"]]))))
+            delete_query = {"user_id": user_id, **ts_query}
+
+            # Get the count of matching documents
+            count = ts.timeseries_db.count_documents(delete_query)
+            print(f"Number of documents matching for {ts.timeseries_db} with {key} query: {count}")
+            # print(f"Number of documents deleted for {ts.timeseries_db} with {key} query: {deleted_count}")
+
+            print("Deleting entries from database...")
+            # result = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
+            result = ts.timeseries_db.delete_many(delete_query)
+            print(f"Key query: {key}")
+            print("{} deleted entries from {} to {}".format(result.deleted_count, start_ts_datetime, end_ts_datetime))
 
     return file_name
@@ -170,9 +154,6 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF
     group.add_argument("-e", "--user_email")
     group.add_argument("-u", "--user_uuid")
 
-    parser.add_argument("--databases", nargs="+", default=None,
-        help="List of databases to fetch data from (supported options: timeseries_db, analysis_timeseries_db, usercache)"
-    )
     parser.add_argument(
         "-d", "--dir_name",
         help="Target directory for exported JSON data (defaults to {})".format(DEFAULT_DIR_NAME),
@@ -201,4 +182,4 @@ def purgeUserTimeseries(user_uuid, user_email=None, databases=None, dir_name=DEF
     # }
     logging.info(f"Default temporary directory: {DEFAULT_DIR_NAME}")
     # purgeUserTimeseries(exportFileFlags, args.user_uuid, args.user_email, args.dir_name, args.file_prefix, args.unsafe_ignore_save)
-    purgeUserTimeseries(args.user_uuid, args.user_email, args.databases, args.dir_name, args.unsafe_ignore_save)
\ No newline at end of file
+    purgeUserTimeseries(args.user_uuid, args.user_email, args.dir_name, args.unsafe_ignore_save)
\ No newline at end of file
diff --git a/emission/core/get_database.py b/emission/core/get_database.py
index 7b5121c73..0939b41d9 100644
--- a/emission/core/get_database.py
+++ b/emission/core/get_database.py
@@ -24,18 +24,14 @@
 try:
     parsed=pymongo.uri_parser.parse_uri(url)
 except:
-    # print("URL not formatted, defaulting to \"Stage_database\"")
-    # db_name = "Stage_database"
-    print("URL not formatted, defaulting to \"openpath_stage\"")
-    db_name = "openpath_stage"
+    print("URL not formatted, defaulting to \"Stage_database\"")
+    db_name = "Stage_database"
 else:
     if parsed['database']:
         db_name = parsed['database']
     else:
-        # print("URL does not specify a DB name, defaulting to \"Stage_database\"")
-        # db_name = "Stage_database"
-        print("URL does not specify a DB name, defaulting to \"openpath_stage\"")
-        db_name = "openpath_stage"
+        print("URL does not specify a DB name, defaulting to \"Stage_database\"")
+        db_name = "Stage_database"
 
 print("Connecting to database URL "+url)
 _current_db = MongoClient(url, uuidRepresentation='pythonLegacy')[db_name]
diff --git a/emission/export/export.py b/emission/export/export.py
index 34bdba826..ae9e11955 100644
--- a/emission/export/export.py
+++ b/emission/export/export.py
@@ -112,6 +112,8 @@ def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):
         json.dump(combined_list, gcfd, default=esj.wrapped_default, allow_nan=False, indent=4)
 
+    # Return the queries that were used to fetch the exported data entries.
+    # purge_user_timeseries.py needs them so that it only deletes the entries that were exported.
     return {
         'trip_time_query': { 'query': trip_time_query, 'type': "time" },
         'place_time_query': { 'query': place_time_query, 'type': "time" },
diff --git a/emission/exportdata/export_data.py b/emission/exportdata/export_data.py
index 29f3202e7..cc87a243d 100644
--- a/emission/exportdata/export_data.py
+++ b/emission/exportdata/export_data.py
@@ -11,11 +11,11 @@
 import bson.json_util as bju
 import os
 
-def export_data(user_id, databases=None, dir_name=None):
+def export_data(user_id):
     try:
         edp = ExportDataPipeline()
         edp.user_id = user_id
-        edp.run_export_data_pipeline(user_id, databases, dir_name)
+        edp.run_export_data_pipeline(user_id)
         if edp.last_trip_done is None:
             logging.debug("After run, last_trip_done == None, must be early return")
         espq.mark_export_data_done(user_id, edp.last_trip_done)
@@ -31,15 +31,12 @@ def __init__(self):
     def last_trip_done(self):
         return self._last_trip_done
 
-    def run_export_data_pipeline(self, user_id, databases=None, dir_name='emission/archived'):
+    def run_export_data_pipeline(self, user_id):
         ts = esta.TimeSeries.get_time_series(user_id)
         time_query = espq.get_time_range_for_export_data(user_id)
         if "DATA_DIR" in os.environ:
             if os.path.isdir(os.environ['DATA_DIR']) == False:
                 os.mkdir(os.environ['DATA_DIR'])
-        # file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
-        print("In export_data.run_export_data_pipeline: Start timestamp: ", time_query.startTs)
-        print("In export_data.run_export_data_pipeline: End timestamp: ", time_query.endTs)
-        file_name = dir_name + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
-        eee.export(user_id, ts, time_query.startTs, time_query.endTs, file_name, False, databases)
+        file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (user_id, time_query.startTs, time_query.endTs)
+        eee.export(user_id, ts, time_query.startTs, time_query.endTs, file_name, False)
diff --git a/emission/pipeline/export_stage.py b/emission/pipeline/export_stage.py
index eef293bf5..d43139770 100644
--- a/emission/pipeline/export_stage.py
+++ b/emission/pipeline/export_stage.py
@@ -20,7 +20,7 @@
 import emission.storage.decorations.stats_queries as esds
 import emission.exportdata.export_data as eeded
 
-def run_export_pipeline(process_number, uuid_list, databases=None, dir_name=None):
+def run_export_pipeline(process_number, uuid_list):
     try:
         with open("conf/log/export.conf", "r") as cf:
             export_log_config = json.load(cf)
@@ -43,18 +43,18 @@ def run_export_pipeline(process_number, uuid_list, databases=None, dir_name=None
             continue
 
         try:
-            run_export_pipeline_for_user(uuid, databases, dir_name)
+            run_export_pipeline_for_user(uuid)
         except Exception as e:
             esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None)
             logging.exception("Found error %s while processing pipeline "
                               "for user %s, skipping" % (e, uuid))
 
-def run_export_pipeline_for_user(uuid, databases=None, dir_name=None):
+def run_export_pipeline_for_user(uuid):
     with ect.Timer() as edt:
         logging.info("*" * 10 + "UUID %s: exporting data" % uuid + "*" * 10)
         print(str(arrow.now()) + "*" * 10 + "UUID %s: exporting data" % uuid + "*" * 10)
-        eeded.export_data(uuid, databases, dir_name)
+        eeded.export_data(uuid)
 
     esds.store_pipeline_time(uuid, ecwp.PipelineStages.EXPORT_DATA.name, time.time(), edt.elapsed)
diff --git a/emission/tests/binTests/TestExportPurgeRestoreUserTimeseries.py b/emission/tests/binTests/TestExportPurgeRestoreUserTimeseries.py
index bc91230ee..90161d98c 100644
--- a/emission/tests/binTests/TestExportPurgeRestoreUserTimeseries.py
+++ b/emission/tests/binTests/TestExportPurgeRestoreUserTimeseries.py
@@ -36,18 +36,11 @@ def testExportPurgeRestoreUserTimeseries(self):
             # Without running intake pipeline, for both usercache and analysis, no entries exported, export file not created since no data in usercache
             # With intake pipeline, can see same 3 analysis key entries: 'analysis/cleaned_place', 'segmentation/raw_place', 'analysis/confirmed_place'
-            # export_databases = ['usercache_db']
-            # export_databases = ['analysis_timeseries_db']
-            export_databases = ['timeseries_db']
-            export_file_name = bput.purgeUserTimeseries(str(self.testUUID), databases=export_databases, dir_name=tmpdirname)
+            export_file_name = bput.purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname)
             export_file_name += ".gz"
 
-            import argparse
-            parser = argparse.ArgumentParser()
-            parser.add_argument("file_prefix", help="the name of the file or file prefix that contains the json representation of the timeline")
-            args = parser.parse_args(["--file_prefix", export_file_name])
             print("In TestExportPurgeRestoreUserTimeseries: export_file_name: ", export_file_name)
 
-            lmtfr.load_multi_timeline_for_range(args)
+            lmtfr.load_multi_timeline_for_range(export_file_name)
diff --git a/emission/tests/exportTests/TestExportModule.py b/emission/tests/exportTests/TestExportModule.py
index 1a64a7cf7..f4ce7ed80 100644
--- a/emission/tests/exportTests/TestExportModule.py
+++ b/emission/tests/exportTests/TestExportModule.py
@@ -24,29 +24,15 @@ def testExportModule(self):
         ts = esta.TimeSeries.get_time_series(self.testUUID)
         time_query = espq.get_time_range_for_export_data(self.testUUID)
-        # file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (self.testUUID, time_query.startTs, time_query.endTs)
-        file_name = os.environ.get('DATA_DIR', '../logs/data/export_purge_restore/export/archived') + "/archive_%s_%s_%s" % (self.testUUID, time_query.startTs, time_query.endTs)
+        file_name = os.environ.get('DATA_DIR', 'emission/archived') + "/archive_%s_%s_%s" % (self.testUUID, time_query.startTs, time_query.endTs)
 
-        import datetime
-        print("UUID: ", self.testUUID)
-        print("File Name: ", file_name)
-
-        print("Start Time: ", datetime.datetime.fromtimestamp(time_query.startTs).strftime('%Y-%m-%d %H:%M:%S'))
-        print("Start Ts: ", time_query.startTs)
-        print("End Time: ", datetime.datetime.fromtimestamp(time_query.endTs).strftime('%Y-%m-%d %H:%M:%S'))
-        print("End Ts: ", time_query.endTs)
-
-        eee.export(self.testUUID, ts, time_query.startTs, time_query.endTs, file_name, False, databases=["timeseries_db"])
-        # eee.export(self.testUUID, ts, time_query.startTs, time_query.endTs, file_name, False, databases=["analysis_timeseries_db"])
-        # eee.export(self.testUUID, ts, time_query.startTs, time_query.endTs, file_name, False, databases=["usercache_db"])
+        eee.export(self.testUUID, ts, time_query.startTs, time_query.endTs, file_name, False)
         file_name += ".gz"
 
         #Assert the file exists after the export process
         self.assertTrue(pl.Path(file_name).is_file())
 
         with gzip.open(file_name, 'r') as ef:
             exported_data = json.loads(ef.read().decode('utf-8'))
-
-        print("Exported Data: ", len(exported_data))
 
         confirmed_trips_exported = []
         for t in exported_data:
@@ -83,57 +69,57 @@ def testExportModule(self):
         self.assertEqual(len(background_location_exported), len(background_location_raw))
         self.assertEqual(len(background_location_exported), len(background_location_db))
 
-    # def testExportPipelineFull(self):
-    #     #Testing functionality of the export pipeline entirely, first with tempdir usage
-    #     #Setup
-    #     etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22")
-    #     etc.runIntakePipeline(self.testUUID)
-
-    #     #Create a temporary directory within the emission folder
-    #     with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
-    #         self.assertTrue(path.isdir(tmpdirname))
-
-    #         #Set the envrionment variable
-    #         os.environ['DATA_DIR'] = tmpdirname
-    #         self.assertEqual(os.environ['DATA_DIR'], tmpdirname)
-
-    #         #Run the export pipeline
-    #         eeed.export_data(self.testUUID)
-    #         directory = os.listdir(tmpdirname)
-
-    #         #Check to see if there is a file in the temp directory
-    #         self.assertTrue(len(directory) == 1)
-
-    #         #Check to make sure the file is of the correct UUID
-    #         uuid = str(self.testUUID)
-    #         file_name = directory[0]
-    #         self.assertTrue(uuid in file_name)
-
-    # def testExportPipelineCreateDirectory(self):
-    #     #Testing for the creation of non existent directories (creation in export pipeline)
-    #     #Setup
-    #     etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22")
-    #     etc.runIntakePipeline(self.testUUID)
-
-    #     #Set the os.environ['DATA_DIR'] to a directory path that does not yet exist
-    #     os.environ['DATA_DIR'] = '/tmp/nonexistent'
-    #     self.assertTrue(os.environ['DATA_DIR'], '/tmp/nonexistent')
-
-    #     #Run the export pipeline
-    #     eeed.export_data(self.testUUID)
-    #     directory = os.listdir(os.environ['DATA_DIR'])
-
-    #     #Check to see if there is a file in the directory
-    #     uuid = str(self.testUUID)
-    #     file_name = directory[0]
-    #     self.assertTrue(uuid in file_name)
-
-    #     #Remove the file from the directory
-    #     dir = os.environ['DATA_DIR']
-    #     for f in os.listdir(dir):
-    #         os.remove(os.path.join(dir, f))
-    #     os.rmdir(dir)
-    #     self.assertFalse(os.path.isdir(os.environ['DATA_DIR']))
+    def testExportPipelineFull(self):
+        #Testing functionality of the export pipeline entirely, first with tempdir usage
+        #Setup
+        etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22")
+        etc.runIntakePipeline(self.testUUID)
+
+        #Create a temporary directory within the emission folder
+        with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
+            self.assertTrue(path.isdir(tmpdirname))
+
+            #Set the environment variable
+            os.environ['DATA_DIR'] = tmpdirname
+            self.assertEqual(os.environ['DATA_DIR'], tmpdirname)
+
+            #Run the export pipeline
+            eeed.export_data(self.testUUID)
+            directory = os.listdir(tmpdirname)
+
+            #Check to see if there is a file in the temp directory
+            self.assertTrue(len(directory) == 1)
+
+            #Check to make sure the file is of the correct UUID
+            uuid = str(self.testUUID)
+            file_name = directory[0]
+            self.assertTrue(uuid in file_name)
+
+    def testExportPipelineCreateDirectory(self):
+        #Testing for the creation of nonexistent directories (creation in export pipeline)
+        #Setup
+        etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22")
+        etc.runIntakePipeline(self.testUUID)
+
+        #Set the os.environ['DATA_DIR'] to a directory path that does not yet exist
+        os.environ['DATA_DIR'] = '/tmp/nonexistent'
+        self.assertEqual(os.environ['DATA_DIR'], '/tmp/nonexistent')
+
+        #Run the export pipeline
+        eeed.export_data(self.testUUID)
+        directory = os.listdir(os.environ['DATA_DIR'])
+
+        #Check to see if there is a file in the directory
+        uuid = str(self.testUUID)
+        file_name = directory[0]
+        self.assertTrue(uuid in file_name)
+
+        #Remove the file from the directory
+        dir = os.environ['DATA_DIR']
+        for f in os.listdir(dir):
+            os.remove(os.path.join(dir, f))
+        os.rmdir(dir)
+        self.assertFalse(os.path.isdir(os.environ['DATA_DIR']))
 
     def readDataFromFile(self, dataFile):
         with open(dataFile) as dect: