diff --git a/.github/workflows/test-with-manual-install.yml b/.github/workflows/test-with-manual-install.yml index 4a81eb000..bfb8b530b 100644 --- a/.github/workflows/test-with-manual-install.yml +++ b/.github/workflows/test-with-manual-install.yml @@ -70,4 +70,6 @@ jobs: - name: Teardown the test environment shell: bash -l {0} - run: source setup/teardown_tests.sh + run: | + source setup/activate_conda.sh + source setup/teardown_tests.sh diff --git a/bin/push/silent_ios_push.py b/bin/push/silent_ios_push.py index c4887abc9..03c7da5b2 100644 --- a/bin/push/silent_ios_push.py +++ b/bin/push/silent_ios_push.py @@ -7,6 +7,7 @@ from builtins import * import json import logging +logging.basicConfig(level=logging.DEBUG) import argparse import emission.net.ext_service.push.notify_usage as pnu diff --git a/emission/analysis/config.py b/emission/analysis/config.py index d484e5354..8dc514dcf 100644 --- a/emission/analysis/config.py +++ b/emission/analysis/config.py @@ -1,17 +1,21 @@ import json import os +ANALYSIS_CONF_PATH = "conf/analysis/debug.conf.json" +ANALYSIS_CONF_PROD_PATH = "conf/analysis/debug.conf.prod.json" +ANALYSIS_CONF_DEV_PATH = "conf/analysis/debug.conf.dev.json" + def get_config_data(): try: print("Trying to open debug.conf.json") - config_file = open('conf/analysis/debug.conf.json') + config_file = open(ANALYSIS_CONF_PATH) except: if os.getenv("PROD_STAGE") == "TRUE": print("In production environment, config not overridden, using default production debug.conf") - config_file = open('conf/analysis/debug.conf.prod.json') + config_file = open(ANALYSIS_CONF_PROD_PATH) else: print("analysis.debug.conf.json not configured, falling back to sample, default configuration") - config_file = open('conf/analysis/debug.conf.dev.json') + config_file = open(ANALYSIS_CONF_DEV_PATH) ret_val = json.load(config_file) config_file.close() return ret_val diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index 7b47bd49b..addac70e7 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -51,28 +51,20 @@ def segment_into_trips(self, timeseries, time_query): pass def segment_current_trips(user_id): - with ect.Timer() as t_get_time_series: - ts = esta.TimeSeries.get_time_series(user_id) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_series", time.time(), t_get_time_series.elapsed) - - with ect.Timer() as t_get_time_range: - time_query = epq.get_time_range_for_segmentation(user_id) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_time_range_for_segmentation", time.time(), t_get_time_range.elapsed) + ts = esta.TimeSeries.get_time_series(user_id) + time_query = epq.get_time_range_for_segmentation(user_id) import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_time_filter as dstf import emission.analysis.intake.segmentation.trip_segmentation_methods.dwell_segmentation_dist_filter as dsdf - with ect.Timer() as t_create_time_filter: - dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins - point_threshold=9, - distance_threshold=100) # 100 m - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_time_filter", time.time(), t_create_time_filter.elapsed) - with ect.Timer() as t_create_dist_filter: - dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins - 
point_threshold=9, - distance_threshold=50) # 50 m - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/create_dist_filter", time.time(), t_create_dist_filter.elapsed) + dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold=5 * 60, # 5 mins + point_threshold=9, + distance_threshold=100) # 100 m + + dsdfsm = dsdf.DwellSegmentationDistFilter(time_threshold=10 * 60, # 10 mins + point_threshold=9, + distance_threshold=50) # 50 m filter_methods = {"time": dstfsm, "distance": dsdfsm} filter_method_names = {"time": "DwellSegmentationTimeFilter", "distance": "DwellSegmentationDistFilter"} @@ -89,24 +81,20 @@ def segment_current_trips(user_id): epq.mark_segmentation_done(user_id, None) return - with ect.Timer() as t_handle_out_of_order: - out_of_order_points = loc_df[loc_df.ts.diff() < 0] - if len(out_of_order_points) > 0: - logging.info("Found out of order points!") - logging.info("%s" % out_of_order_points) - # drop from the table - loc_df = loc_df.drop(out_of_order_points.index.tolist()) - loc_df.reset_index(inplace=True) - # invalidate in the database. - out_of_order_id_list = out_of_order_points["_id"].tolist() - logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) - for ooid in out_of_order_id_list: - ts.invalidate_raw_entry(ooid) - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/handle_out_of_order_points", time.time(), t_handle_out_of_order.elapsed) - - with ect.Timer() as t_get_filters: - filters_in_df = loc_df["filter"].dropna().unique() - esds.store_pipeline_time(user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/get_filters_in_df", time.time(), t_get_filters.elapsed) + out_of_order_points = loc_df[loc_df.ts.diff() < 0] + if len(out_of_order_points) > 0: + logging.info("Found out of order points!") + logging.info("%s" % out_of_order_points) + # drop from the table + loc_df = loc_df.drop(out_of_order_points.index.tolist()) + loc_df.reset_index(inplace=True) + # invalidate in the database. 
+ out_of_order_id_list = out_of_order_points["_id"].tolist() + logging.debug("out_of_order_id_list = %s" % out_of_order_id_list) + for ooid in out_of_order_id_list: + ts.invalidate_raw_entry(ooid) + + filters_in_df = loc_df["filter"].dropna().unique() logging.debug("Filters in the dataframe = %s" % filters_in_df) if len(filters_in_df) == 1: diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index 2ffd6f058..388d23848 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -60,23 +60,9 @@ def segment_into_trips(self, timeseries, time_query): t_get_filtered_points.elapsed ) - with ect.Timer() as t_mark_valid: - self.filtered_points_df.loc[:, "valid"] = True - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/mark_valid", - time.time(), - t_mark_valid.elapsed - ) + self.filtered_points_df.loc[:, "valid"] = True - with ect.Timer() as t_get_transition_df: - self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_transition_df", - time.time(), - t_get_transition_df.elapsed - ) + self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) @@ -103,14 +89,7 @@ def segment_into_trips(self, timeseries, time_query): # segmentation_points.append(currPoint) if just_ended: - with ect.Timer() as t_continue_just_ended: - continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/continue_just_ended", - time.time(), - t_continue_just_ended.elapsed - ) + continue_flag = self.continue_just_ended(idx, currPoint, self.filtered_points_df) if continue_flag: # We have "processed" the currPoint by deciding to glom it @@ -119,14 +98,7 @@ def segment_into_trips(self, timeseries, time_query): # else: sel_point = currPoint logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - with ect.Timer() as t_set_start_point: - curr_trip_start_point = sel_point - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/set_new_trip_start_point", - time.time(), - t_set_start_point.elapsed - ) + curr_trip_start_point = sel_point just_ended = False else: with ect.Timer() as t_process_trip: @@ -137,48 +109,27 @@ def segment_into_trips(self, timeseries, time_query): max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 ] lastPoint = self.find_last_valid_point(idx) - with ect.Timer() as t_has_trip_ended: - trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/has_trip_ended", - time.time(), - t_has_trip_ended.elapsed - ) + trip_ended = self.has_trip_ended(lastPoint, currPoint, timeseries) if trip_ended: - with ect.Timer() as t_get_last_trip_end_point: - last_trip_end_point = lastPoint - logging.debug("Appending 
last_trip_end_point %s with index %s " % - (last_trip_end_point, idx - 1)) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/get_last_trip_end_point", - time.time(), - t_get_last_trip_end_point.elapsed - ) - - with ect.Timer() as t_handle_trip_end: - just_ended = True - # Now, we have finished processing the previous point as a trip - # end or not. But we still need to process this point by seeing - # whether it should represent a new trip start, or a glom to the - # previous trip - if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): - sel_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - curr_trip_start_point = sel_point - just_ended = False - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/handle_trip_end", - time.time(), - t_handle_trip_end.elapsed - ) + last_trip_end_point = lastPoint + logging.debug("Appending last_trip_end_point %s with index %s " % + (last_trip_end_point, idx - 1)) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + just_ended = True + # Now, we have finished processing the previous point as a trip + # end or not. But we still need to process this point by seeing + # whether it should represent a new trip start, or a glom to the + # previous trip + if not self.continue_just_ended(idx, currPoint, self.filtered_points_df): + sel_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + curr_trip_start_point = sel_point + just_ended = False + esds.store_pipeline_time( user_id, ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/loop", @@ -186,57 +137,44 @@ def segment_into_trips(self, timeseries, time_query): t_loop.elapsed ) - with ect.Timer() as t_post_loop: - # Since we only end a trip when we start a new trip, this means that - # the last trip that was pushed is ignored. Consider the example of - # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, - # so during that remote push, a trip end was not detected. And we got - # back home shortly after 5pm, so the trip end was only detected on the - # phone at 6pm. At that time, the following points were pushed: - # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, - # ..., 2016-02-22T16:57:04 - # Then, on the server, while iterating through the points, we detected - # a trip end at 16:04, and a new trip start at 16:49. But we did not - # detect the trip end at 16:57, because we didn't start a new point. - # This has two issues: - # - we won't see this trip until the next trip start, which may be on the next day - # - we won't see this trip at all, because when we run the pipeline the - # next time, we will only look at points from that time onwards. These - # points have been marked as "processed", so they won't even be considered. 
- - # There are multiple potential fixes: - # - we can mark only the completed trips as processed. This will solve (2) above, but not (1) - # - we can mark a trip end based on the fact that we only push data - # when a trip ends, so if we have data, it means that the trip has been - # detected as ended on the phone. - # This seems a bit fragile - what if we start pushing incomplete trip - # data for efficiency reasons? Therefore, we also check to see if there - # is a trip_end_detected in this timeframe after the last point. If so, - # then we end the trip at the last point that we have. - if not just_ended and len(self.transition_df) > 0: - with ect.Timer() as t_check_transitions: - stopped_moving_after_last = self.transition_df[ - (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) - ] - logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) - if len(stopped_moving_after_last) > 0: - logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) - segmentation_points.append((curr_trip_start_point, currPoint)) - self.last_ts_processed = currPoint.metadata_write_ts - else: - logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/check_transitions_post_loop", - time.time(), - t_check_transitions.elapsed - ) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_dist/post_loop", - time.time(), - t_post_loop.elapsed - ) + + # Since we only end a trip when we start a new trip, this means that + # the last trip that was pushed is ignored. Consider the example of + # 2016-02-22 when I took kids to karate. We arrived shortly after 4pm, + # so during that remote push, a trip end was not detected. And we got + # back home shortly after 5pm, so the trip end was only detected on the + # phone at 6pm. At that time, the following points were pushed: + # ..., 2016-02-22T16:04:02, 2016-02-22T16:49:34, 2016-02-22T16:49:50, + # ..., 2016-02-22T16:57:04 + # Then, on the server, while iterating through the points, we detected + # a trip end at 16:04, and a new trip start at 16:49. But we did not + # detect the trip end at 16:57, because we didn't start a new point. + # This has two issues: + # - we won't see this trip until the next trip start, which may be on the next day + # - we won't see this trip at all, because when we run the pipeline the + # next time, we will only look at points from that time onwards. These + # points have been marked as "processed", so they won't even be considered. + + # There are multiple potential fixes: + # - we can mark only the completed trips as processed. This will solve (2) above, but not (1) + # - we can mark a trip end based on the fact that we only push data + # when a trip ends, so if we have data, it means that the trip has been + # detected as ended on the phone. + # This seems a bit fragile - what if we start pushing incomplete trip + # data for efficiency reasons? Therefore, we also check to see if there + # is a trip_end_detected in this timeframe after the last point. If so, + # then we end the trip at the last point that we have. 
+ if not just_ended and len(self.transition_df) > 0: + stopped_moving_after_last = self.transition_df[ + (self.transition_df.ts > currPoint.ts) & (self.transition_df.transition == 2) + ] + logging.debug("stopped_moving_after_last = %s" % stopped_moving_after_last[["fmt_time", "transition"]]) + if len(stopped_moving_after_last) > 0: + logging.debug("Found %d transitions after last point, ending trip..." % len(stopped_moving_after_last)) + segmentation_points.append((curr_trip_start_point, currPoint)) + self.last_ts_processed = currPoint.metadata_write_ts + else: + logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) return segmentation_points diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index 3febdca20..4cf216e58 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -65,40 +65,17 @@ def segment_into_trips(self, timeseries, time_query): data that they want from the sensor streams in order to determine the segmentation points. """ - with ect.Timer() as t_get_filtered_points_pre: - filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) - user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_filtered_points_pre_ts_diff_df", - time.time(), - t_get_filtered_points_pre.elapsed - ) - - with ect.Timer() as t_filter_bogus_points: - # Sometimes, we can get bogus points because data.ts and - # metadata.write_ts are off by a lot. If we don't do this, we end up - # appearing to travel back in time - # https://github.com/e-mission/e-mission-server/issues/457 - filtered_points_df = filtered_points_pre_ts_diff_df[ - (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 - ] - filtered_points_df.reset_index(inplace=True) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/filter_bogus_points", - time.time(), - t_filter_bogus_points.elapsed - ) - - with ect.Timer() as t_get_transition_df: - transition_df = timeseries.get_data_df("statemachine/transition", time_query) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/get_transition_df", - time.time(), - t_get_transition_df.elapsed - ) + filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) + user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] + # Sometimes, we can get bogus points because data.ts and + # metadata.write_ts are off by a lot. 
If we don't do this, we end up + # appearing to travel back in time + # https://github.com/e-mission/e-mission-server/issues/457 + filtered_points_df = filtered_points_pre_ts_diff_df[ + (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 + ] + filtered_points_df.reset_index(inplace=True) + transition_df = timeseries.get_data_df("statemachine/transition", time_query) if len(transition_df) > 0: logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) @@ -135,47 +112,40 @@ def segment_into_trips(self, timeseries, time_query): curr_trip_start_point = sel_point just_ended = False - with ect.Timer() as t_calculations: - last5MinsPoints_df = filtered_points_df[np.logical_and( - np.logical_and( - filtered_points_df.ts > currPoint.ts - self.time_threshold, - filtered_points_df.ts < currPoint.ts - ), - filtered_points_df.ts >= curr_trip_start_point.ts - )] - # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. - # Using .iloc just ends up including points after this one. - # So we reset_index upstream and use it here. - # We are going to use the last 8 points for now. - # TODO: Change this back to last 10 points once we normalize phone and this - last10Points_df = filtered_points_df.iloc[ - max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 - ] - distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) - timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts - last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) - logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) - last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) - logging.debug("last10PointsDistances = %s with length %d, shape %s" % ( - last10PointsDistances.to_numpy(), - len(last10PointsDistances), - last10PointsDistances.shape - )) - - # Fix for https://github.com/e-mission/e-mission-server/issues/348 - last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) - - logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % - (len(last10PointsDistances), len(last5MinsDistances))) - logging.debug("last5MinTimes.max() = %s, time_threshold = %s" % - (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) + last5MinsPoints_df = filtered_points_df[np.logical_and( + np.logical_and( + filtered_points_df.ts > currPoint.ts - self.time_threshold, + filtered_points_df.ts < currPoint.ts + ), + filtered_points_df.ts >= curr_trip_start_point.ts + )] + # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. + # Using .iloc just ends up including points after this one. + # So we reset_index upstream and use it here. + # We are going to use the last 8 points for now. 
+ # TODO: Change this back to last 10 points once we normalize phone and this + last10Points_df = filtered_points_df.iloc[ + max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 + ] + distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) + timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts + last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) + logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) + last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) + logging.debug("last10PointsDistances = %s with length %d, shape %s" % ( + last10PointsDistances.to_numpy(), + len(last10PointsDistances), + last10PointsDistances.shape + )) + + # Fix for https://github.com/e-mission/e-mission-server/issues/348 + last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) + + logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % + (len(last10PointsDistances), len(last5MinsDistances))) + logging.debug("last5MinTimes.max() = %s, time_threshold = %s" % + (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/calculations_per_iteration", - time.time(), - t_calculations.elapsed - ) with ect.Timer() as t_has_trip_ended: if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): @@ -216,31 +186,24 @@ def segment_into_trips(self, timeseries, time_query): t_loop.elapsed ) - with ect.Timer() as t_post_loop: - logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % - (just_ended, len(transition_df))) - if not just_ended and len(transition_df) > 0: - stopped_moving_after_last = transition_df[ - (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) - ] - logging.debug("looking after %s, found transitions %s" % - (currPoint.ts, stopped_moving_after_last)) - if len(stopped_moving_after_last) > 0: - (unused, last_trip_end_point) = self.get_last_trip_end_point( - filtered_points_df, - last10Points_df, - None - ) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - esds.store_pipeline_time( - user_id, - ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips_time/post_loop", - time.time(), - t_post_loop.elapsed - ) + logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % + (just_ended, len(transition_df))) + if not just_ended and len(transition_df) > 0: + stopped_moving_after_last = transition_df[ + (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) + ] + logging.debug("looking after %s, found transitions %s" % + (currPoint.ts, stopped_moving_after_last)) + if len(stopped_moving_after_last) > 0: + (unused, last_trip_end_point) = self.get_last_trip_end_point( + filtered_points_df, + last10Points_df, + None + ) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts return segmentation_points diff --git 
a/emission/analysis/result/user_stat.py b/emission/analysis/result/user_stat.py new file mode 100644 index 000000000..fa1d7ac95 --- /dev/null +++ b/emission/analysis/result/user_stat.py @@ -0,0 +1,91 @@ +# emission/analysis/result/user_stat.py + +import logging +import pymongo +import arrow +from typing import Optional, Dict, Any +import emission.storage.timeseries.abstract_timeseries as esta +import emission.core.wrapper.user as ecwu + +def get_last_call_timestamp(ts: esta.TimeSeries) -> Optional[int]: + """ + Retrieves the last API call timestamp. + + :param ts: The time series object. + :type ts: esta.TimeSeries + :return: The last call timestamp or None if not found. + :rtype: Optional[int] + """ + last_call_ts = ts.get_first_value_for_field( + key='stats/server_api_time', + field='data.ts', + sort_order=pymongo.DESCENDING + ) + logging.debug(f"Last call timestamp: {last_call_ts}") + return None if last_call_ts == -1 else last_call_ts + + +def update_user_profile(user_id: str, data: Dict[str, Any]) -> None: + """ + Updates the user profile with the provided data. + + :param user_id: The UUID of the user. + :type user_id: str + :param data: The data to update in the user profile. + :type data: Dict[str, Any] + :return: None + """ + user = ecwu.User.fromUUID(user_id) + user.update(data) + logging.debug(f"User profile updated with data: {data}") + logging.debug(f"New profile: {user.getProfile()}") + + +def get_and_store_user_stats(user_id: str, trip_key: str) -> None: + """ + Aggregates and stores user statistics into the user profile. + + :param user_id: The UUID of the user. + :type user_id: str + :param trip_key: The key representing the trip data in the time series. + :type trip_key: str + :return: None + """ + try: + logging.info(f"Starting get_and_store_user_stats for user_id: {user_id}, trip_key: {trip_key}") + + ts = esta.TimeSeries.get_time_series(user_id) + start_ts_result = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) + start_ts = None if start_ts_result == -1 else start_ts_result + + end_ts_result = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) + end_ts = None if end_ts_result == -1 else end_ts_result + + total_trips = ts.find_entries_count(key_list=["analysis/confirmed_trip"]) + labeled_trips = ts.find_entries_count( + key_list=["analysis/confirmed_trip"], + extra_query_list=[{'data.user_input': {'$ne': {}}}] + ) + + logging.info(f"Total trips: {total_trips}, Labeled trips: {labeled_trips}") + logging.info(f"user_id type: {type(user_id)}") + + last_call_ts = get_last_call_timestamp(ts) + logging.info(f"Last call timestamp: {last_call_ts}") + + update_data = { + "pipeline_range": { + "start_ts": start_ts, + "end_ts": end_ts + }, + "total_trips": total_trips, + "labeled_trips": labeled_trips, + "last_call_ts": last_call_ts + } + + update_user_profile(user_id, update_data) + + logging.debug("User profile updated successfully.") + + except Exception as e: + logging.error(f"Error in get_and_store_user_stats for user_id {user_id}: {e}") \ No newline at end of file diff --git a/emission/net/api/usercache.py b/emission/net/api/usercache.py index d80cd03df..c16e62acc 100644 --- a/emission/net/api/usercache.py +++ b/emission/net/api/usercache.py @@ -29,6 +29,19 @@ def sync_server_to_phone(uuid): # logging.debug("retrievedData = %s" % retrievedData) return retrievedData +def _remove_dots(entry_doc): + for key in list(entry_doc): # iterate over a snapshot, since the loop body adds and removes keys + # print(f"Checking {key=}") + if isinstance(entry_doc[key], dict): + # print(f"Found dict for {key=}, 
recursing") + _remove_dots(entry_doc[key]) + if '.' in key: + munged_key = key.replace(".", "_") + logging.info(f"Found {key=} with dot, munged to {munged_key=}") + # Get and delete in one swoop + # https://stackoverflow.com/a/11277439 + entry_doc[munged_key] = entry_doc.pop(key, None) + def sync_phone_to_server(uuid, data_from_phone): """ Puts the blob from the phone into the cache @@ -44,6 +57,10 @@ def sync_phone_to_server(uuid, data_from_phone): if "ts" in data["data"] and ecc.isMillisecs(data["data"]["ts"]): data["data"]["ts"] = old_div(float(data["data"]["ts"]), 1000) + + # mongodb/documentDB don't support field names with `.` + # let's convert them all to `_` + _remove_dots(data) # logging.debug("After updating with UUId, we get %s" % data) document = {'$set': data} @@ -51,14 +68,19 @@ def sync_phone_to_server(uuid, data_from_phone): 'metadata.type': data["metadata"]["type"], 'metadata.write_ts': data["metadata"]["write_ts"], 'metadata.key': data["metadata"]["key"]} - result = usercache_db.update_one(update_query, - document, - upsert=True) - logging.debug("Updated result for user = %s, key = %s, write_ts = %s = %s" % - (uuid, data["metadata"]["key"], data["metadata"]["write_ts"], result.raw_result)) + try: + result = usercache_db.update_one(update_query, + document, + upsert=True) + logging.debug("Updated result for user = %s, key = %s, write_ts = %s = %s" % + (uuid, data["metadata"]["key"], data["metadata"]["write_ts"], result.raw_result)) - # I am not sure how to trigger a writer error to test this - # and whether this is the format expected from the server in the rawResult - if 'ok' in result.raw_result and result.raw_result['ok'] != 1.0: - logging.error("In sync_phone_to_server, err = %s" % result.raw_result['writeError']) - raise Exception() + # I am not sure how to trigger a writer error to test this + # and whether this is the format expected from the server in the rawResult + if 'ok' in result.raw_result and result.raw_result['ok'] != 1.0: + logging.error("In sync_phone_to_server, err = %s" % result.raw_result['writeError']) + raise Exception() + except pymongo.errors.PyMongoError as e: + logging.error(f"In sync_phone_to_server, while executing {update_query=} on {document=}") + logging.exception(e) + raise diff --git a/emission/net/ext_service/push/notify_interface_impl/firebase.py b/emission/net/ext_service/push/notify_interface_impl/firebase.py index d0b0671f2..4e6bc1b92 100644 --- a/emission/net/ext_service/push/notify_interface_impl/firebase.py +++ b/emission/net/ext_service/push/notify_interface_impl/firebase.py @@ -91,12 +91,17 @@ def map_existing_fcm_tokens(self, token_map): unmapped_token_list.append(token) return (mapped_token_map, unmapped_token_list) - def retrieve_fcm_tokens(self, token_list, dev): + def retrieve_fcm_tokens(self, push_service, token_list, dev): if len(token_list) == 0: logging.debug("len(token_list) == 0, skipping fcm token mapping to save API call") return [] importedResultList = [] - importHeaders = {"Authorization": "key=%s" % self.server_auth_token, + existing_headers = push_service.requests_session.headers + logging.debug(f"Reading existing headers from current session {existing_headers}") + # Copying over the authorization from existing headers since, as of Dec + # 2024, we cannot use the server API key and must use an OAuth2 token instead + importHeaders = {"Authorization": existing_headers['Authorization'], + "access_token_auth": "true", "Content-Type": "application/json"} for curr_first in range(0, len(token_list), 100): curr_batch = 
token_list[curr_first:curr_first + 100] @@ -115,7 +120,7 @@ def retrieve_fcm_tokens(self, token_list, dev): print("After appending result of size %s, total size = %s" % (len(importedResult), len(importedResultList))) else: - print(f"Received invalid result for batch starting at = {curr_first}") + print(f"Received invalid response {importResponse} for batch starting at = {curr_first}") return importedResultList def process_fcm_token_result(self, importedResultList): @@ -133,9 +138,9 @@ def process_fcm_token_result(self, importedResultList): (result, i)); return ret_list - def convert_to_fcm_if_necessary(self, token_map, dev): + def convert_to_fcm_if_necessary(self, push_service, token_map, dev): (mapped_token_map, unmapped_token_list) = self.map_existing_fcm_tokens(token_map) - importedResultList = self.retrieve_fcm_tokens(unmapped_token_list, dev) + importedResultList = self.retrieve_fcm_tokens(push_service, unmapped_token_list, dev) newly_mapped_token_list = self.process_fcm_token_result(importedResultList) print("after mapping iOS tokens, imported %s -> processed %s" % (len(importedResultList), len(newly_mapped_token_list))) @@ -152,15 +157,15 @@ def send_visible_notification(self, token_map, title, message, json_data, dev=Fa logging.info("len(token_map) == 0, early return to save api calls") return - # convert tokens if necessary - fcm_token_map = self.convert_to_fcm_if_necessary(token_map, dev) - push_service = FCMNotification( service_account_file=self.service_account_file, project_id=self.project_id) # Send android and iOS messages separately because they have slightly # different formats # https://github.com/e-mission/e-mission-server/issues/564#issuecomment-360720598 + # convert tokens if necessary + fcm_token_map = self.convert_to_fcm_if_necessary(push_service, token_map, dev) + android_response = self.notify_multiple_devices(push_service, fcm_token_map["android"], notification_body = message, @@ -192,7 +197,7 @@ def send_silent_notification(self, token_map, json_data, dev=False): project_id=self.project_id) # convert tokens if necessary - fcm_token_map = self.convert_to_fcm_if_necessary(token_map, dev) + fcm_token_map = self.convert_to_fcm_if_necessary(push_service, token_map, dev) response = {} response["ios"] = self.notify_multiple_devices(push_service, diff --git a/emission/net/ext_service/transit_matching/match_stops.py b/emission/net/ext_service/transit_matching/match_stops.py index afa5d6abd..3e6deb1c3 100644 --- a/emission/net/ext_service/transit_matching/match_stops.py +++ b/emission/net/ext_service/transit_matching/match_stops.py @@ -15,12 +15,12 @@ url = "https://lz4.overpass-api.de/" try: - query_file = open('conf/net/ext_service/overpass_transit_stops_query_template') -except: + with open('conf/net/ext_service/overpass_transit_stops_query_template', 'r', encoding='UTF-8') as query_file: + query_string = "".join(query_file.readlines()) +except FileNotFoundError: print("transit stops query not configured, falling back to default") - query_file = open('conf/net/ext_service/overpass_transit_stops_query_template.sample') - -query_string = "".join(query_file.readlines()) + with open('conf/net/ext_service/overpass_transit_stops_query_template.sample', 'r', encoding='UTF-8') as query_file: + query_string = "".join(query_file.readlines()) RETRY = -1 diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 424d85e32..9fb95c49a 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -13,6 +13,7 @@ from uuid import 
UUID import time import pymongo +from datetime import datetime import emission.core.get_database as edb import emission.core.timer as ect @@ -39,6 +40,7 @@ import emission.storage.decorations.stats_queries as esds import emission.core.wrapper.user as ecwu +import emission.analysis.result.user_stat as eaurs def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): """ @@ -74,6 +76,12 @@ def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): try: run_intake_pipeline_for_user(uuid, skip_if_no_new_data) + with ect.Timer() as gsr: + logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + esds.store_pipeline_time(uuid, 'STORE_USER_STATS', + time.time(), gsr.elapsed) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " @@ -198,23 +206,3 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name, time.time(), crt.elapsed) - with ect.Timer() as gsr: - logging.info("*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing user stats " % uuid + "*" * 10) - _get_and_store_range(uuid, "analysis/composite_trip") - - esds.store_pipeline_time(uuid, 'STORE_USER_STATS', - time.time(), gsr.elapsed) - -def _get_and_store_range(user_id, trip_key): - ts = esta.TimeSeries.get_time_series(user_id) - start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) - if start_ts == -1: - start_ts = None - end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) - if end_ts == -1: - end_ts = None - - user = ecwu.User(user_id) - user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}}) - logging.debug("After updating, new profiles is %s" % user.getProfile()) diff --git a/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py b/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py index 55ae19d90..e51e881a1 100644 --- a/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py +++ b/emission/tests/analysisTests/intakeTests/TestFilterAccuracy.py @@ -34,15 +34,14 @@ def setUp(self): import emission.core.get_database as edb import uuid - self.analysis_conf_path = \ - etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) + etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) self.testUUID = None def tearDown(self): import emission.core.get_database as edb edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID}) - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def checkSuccessfulRun(self): pipelineState = edb.get_pipeline_state_db().find_one({"user_id": self.testUUID, diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py b/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py index f01cdc042..35808bbcf 100644 --- a/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py +++ b/emission/tests/analysisTests/intakeTests/TestPipelineRealData.py @@ -60,8 +60,7 @@ class TestPipelineRealData(unittest.TestCase): def setUp(self): # Thanks to M&J for the number! 
np.random.seed(61297777) - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") + etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") logging.info("setUp complete") def tearDown(self): @@ -76,8 +75,7 @@ def tearDown(self): # to determine whether to switch to a new implementation if not hasattr(self, "evaluation") or not self.evaluation: self.clearRelatedDb() - if hasattr(self, "analysis_conf_path"): - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() if hasattr(self, "seed_mode_path"): os.remove(self.seed_mode_path) logging.info("tearDown complete") @@ -744,8 +742,7 @@ def testJackUntrackedTimeMar12(self): def testJackUntrackedTimeMar12InferredSections(self): # Setup to use the inferred sections - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/inferred_section") + etc.set_analysis_config("analysis.result.section.key", "analysis/inferred_section") # along with the proper random seed self.seed_mode_path = etc.copy_dummy_seed_for_inference() dataFile = "emission/tests/data/real_examples/jack_untracked_time_2023-03-12" diff --git a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py index df37a3c77..4162962fd 100644 --- a/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestSectionSegmentation.py @@ -41,8 +41,7 @@ class TestSectionSegmentation(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) + etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27") self.androidUUID = self.testUUID @@ -58,8 +57,7 @@ def setUp(self): def tearDown(self): if not hasattr(self, "evaluation") or not self.evaluation: self.clearRelatedDb() - if hasattr(self, "analysis_conf_path"): - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def clearRelatedDb(self): edb.get_timeseries_db().delete_many({"user_id": self.androidUUID}) diff --git a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py index 0cc469fea..37af7bea6 100644 --- a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py @@ -36,8 +36,7 @@ class TestTripSegmentation(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) + etc.set_analysis_config("intake.cleaning.filter_accuracy.enable", True) etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27") self.androidUUID = self.testUUID @@ -51,7 +50,7 @@ def setUp(self): logging.debug("androidUUID = %s, iosUUID = %s" % (self.androidUUID, self.iosUUID)) def tearDown(self): - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() edb.get_timeseries_db().delete_many({"user_id": self.androidUUID}) edb.get_timeseries_db().delete_many({"user_id": self.iosUUID}) edb.get_pipeline_state_db().delete_many({"user_id": self.androidUUID}) diff --git a/emission/tests/analysisTests/intakeTests/TestUserStat.py b/emission/tests/analysisTests/intakeTests/TestUserStat.py new file mode 100644 index 000000000..207aa0a98 --- /dev/null +++ 
b/emission/tests/analysisTests/intakeTests/TestUserStat.py @@ -0,0 +1,121 @@ +from __future__ import unicode_literals, print_function, division, absolute_import +import unittest +import uuid +import logging +import json +import os +import time +import pandas as pd + +from builtins import * +from future import standard_library +standard_library.install_aliases() + +# Standard imports +import emission.storage.json_wrappers as esj + +# Our imports +import emission.core.get_database as edb +import emission.storage.timeseries.timequery as estt +import emission.storage.timeseries.abstract_timeseries as esta +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.wrapper.user as ecwu +import emission.net.api.stats as enac +import emission.pipeline.intake_stage as epi + +# Test imports +import emission.tests.common as etc + + +class TestUserStats(unittest.TestCase): + def setUp(self): + """ + Set up the test environment by loading real example data for a single test user. + """ + # Set up the real example data with entries + self.testUUID = uuid.uuid4() + with open("emission/tests/data/real_examples/shankari_2015-aug-21") as fp: + self.entries = json.load(fp, object_hook = esj.wrapped_object_hook) + # Load the entries, then retrieve the user profile + etc.setupRealExampleWithEntries(self) + profile = edb.get_profile_db().find_one({"user_id": self.testUUID}) + if profile is None: + # Initialize the profile if it does not exist + edb.get_profile_db().insert_one({"user_id": self.testUUID}) + + # Run the intake pipeline, which also aggregates and stores the user stats + etc.runIntakePipeline(self.testUUID) + logging.debug("UUID = %s" % (self.testUUID)) + + def tearDown(self): + """ + Clean up the test environment by deleting test data from the databases. + """ + + edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) + edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID}) + edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID}) + edb.get_profile_db().delete_one({"user_id": self.testUUID}) + + def testGetAndStoreUserStats(self): + """ + Test get_and_store_user_stats for the user to ensure that user statistics + are correctly aggregated and stored in the user profile. 
+ """ + + # Retrieve the updated user profile from the database + profile = edb.get_profile_db().find_one({"user_id": self.testUUID}) + + # Ensure that the profile exists + self.assertIsNotNone(profile, "User profile should exist after storing stats.") + + # Verify that the expected fields are present + self.assertIn("total_trips", profile, "User profile should contain 'total_trips'.") + self.assertIn("labeled_trips", profile, "User profile should contain 'labeled_trips'.") + self.assertIn("pipeline_range", profile, "User profile should contain 'pipeline_range'.") + self.assertIn("last_call_ts", profile, "User profile should contain 'last_call_ts'.") + + expected_total_trips = 5 + expected_labeled_trips = 0 + + self.assertEqual(profile["total_trips"], expected_total_trips, + f"Expected total_trips to be {expected_total_trips}, got {profile['total_trips']}") + self.assertEqual(profile["labeled_trips"], expected_labeled_trips, + f"Expected labeled_trips to be {expected_labeled_trips}, got {profile['labeled_trips']}") + + # Verify pipeline range + pipeline_range = profile.get("pipeline_range", {}) + self.assertIn("start_ts", pipeline_range, "Pipeline range should contain 'start_ts'.") + self.assertIn("end_ts", pipeline_range, "Pipeline range should contain 'end_ts'.") + + expected_start_ts = 1440168891.095 + expected_end_ts = 1440209488.817 + + self.assertEqual(pipeline_range["start_ts"], expected_start_ts, + f"Expected start_ts to be {expected_start_ts}, got {pipeline_range['start_ts']}") + self.assertEqual(pipeline_range["end_ts"], expected_end_ts, + f"Expected end_ts to be {expected_end_ts}, got {pipeline_range['end_ts']}") + + def testLastCall(self): + # Call the function with all required arguments + test_call_ts = time.time() + enac.store_server_api_time(self.testUUID, "test_call_ts", test_call_ts, 69420) + etc.runIntakePipeline(self.testUUID) + + # Retrieve the profile from the database + profile = edb.get_profile_db().find_one({"user_id": self.testUUID}) + + # Verify that last_call_ts is updated correctly + expected_last_call_ts = test_call_ts + actual_last_call_ts = profile.get("last_call_ts") + + self.assertEqual( + actual_last_call_ts, + expected_last_call_ts, + f"Expected last_call_ts to be {expected_last_call_ts}, got {actual_last_call_ts}" + ) + +if __name__ == '__main__': + # Configure logging for the test + etc.configLogging() + unittest.main() diff --git a/emission/tests/common.py b/emission/tests/common.py index baae6053c..5c3ea8ca0 100644 --- a/emission/tests/common.py +++ b/emission/tests/common.py @@ -107,7 +107,7 @@ def getRealExampleEmail(testObj): def fillExistingUUID(testObj): userObj = ecwu.User.fromEmail(getRealExampleEmail(testObj)) print("Setting testUUID to %s" % userObj.uuid) - testObj.testUUID = userObj.uuir + testObj.testUUID = userObj.uuid def getRegEmailIfPresent(testObj): if hasattr(testObj, "evaluation") and testObj.evaluation: @@ -193,6 +193,7 @@ def runIntakePipeline(uuid): import emission.analysis.userinput.expectations as eaue import emission.analysis.classification.inference.labels.pipeline as eacilp import emission.analysis.plotting.composite_trip_creation as eapcc + import emission.analysis.result.user_stat as eaurs eaum.match_incoming_user_inputs(uuid) eaicf.filter_accuracy(uuid) @@ -205,6 +206,8 @@ def runIntakePipeline(uuid): eaue.populate_expectations(uuid) eaum.create_confirmed_objects(uuid) eapcc.create_composite_objects(uuid) + eaurs.get_and_store_user_stats(uuid, "analysis/composite_trip") + def configLogging(): """ @@ -263,25 +266,29 @@ def 
createDummyRequestEnviron(self, addl_headers, request_body): return test_environ def set_analysis_config(key, value): + """ + Tests that call this in their setUp must call clear_analysis_config in their tearDown + """ import emission.analysis.config as eac import shutil - analysis_conf_path = "conf/analysis/debug.conf.json" - shutil.copyfile("conf/analysis/debug.conf.dev.json", - analysis_conf_path) - with open(analysis_conf_path) as fd: + shutil.copyfile(eac.ANALYSIS_CONF_DEV_PATH, eac.ANALYSIS_CONF_PATH) + with open(eac.ANALYSIS_CONF_PATH) as fd: curr_config = json.load(fd) curr_config[key] = value - with open(analysis_conf_path, "w") as fd: + with open(eac.ANALYSIS_CONF_PATH, "w") as fd: json.dump(curr_config, fd, indent=4) - logging.debug("Finished setting up %s" % analysis_conf_path) - with open(analysis_conf_path) as fd: + logging.debug("Finished setting up %s" % eac.ANALYSIS_CONF_PATH) + with open(eac.ANALYSIS_CONF_PATH) as fd: logging.debug("Current values are %s" % json.load(fd)) eac.reload_config() - - # Return this so that we can delete it in the teardown - return analysis_conf_path + +def clear_analysis_config(): + import emission.analysis.config as eac + if os.path.exists(eac.ANALYSIS_CONF_PATH): + os.remove(eac.ANALYSIS_CONF_PATH) + eac.reload_config() def copy_dummy_seed_for_inference(): import shutil diff --git a/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py b/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py index 3d024be06..3f504af8e 100644 --- a/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py +++ b/emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py @@ -14,6 +14,7 @@ import uuid import attrdict as ad import time +import copy import geojson as gj # This change should be removed in the next server update, by which time hopefully the new geojson version will incorporate the long-term fix for their default precision # See - jazzband/geojson#177 @@ -273,6 +274,77 @@ def testTwoLongTermCalls(self): self.assertEqual(edb.get_timeseries_db().estimated_document_count(), 120) self.assertEqual(edb.get_timeseries_error_db().estimated_document_count(), 0) + def testRemoveDots(self): + test_template = {"ts":1735934360.256, + "client_app_version":"1.9.6", + "name":"open_notification", + "client_os_version":"15.5", + "reading":{ + "additionalData":{ + "google.c.sender.id":"FAKE_SENDER_ID", + "coldstart":False, + "notId":"1122334455667788", + "payload":1122334455667788, + "content-available":1, + "foreground":False, + "google.c.fid":"FAKE_FID", + "gcm.message_id":"FAKE_MESSAGE_ID"}}} + test_1 = copy.deepcopy(test_template) # deep copy, so munging test_1 does not mutate the nested dicts shared with test_template + self.assertEqual(len(test_1["reading"]["additionalData"]), 8) + self.assertIn("google.c.sender.id", + test_1["reading"]["additionalData"]) + self.assertIn("google.c.fid", + test_1["reading"]["additionalData"]) + self.assertIn("gcm.message_id", + test_1["reading"]["additionalData"]) + mauc._remove_dots(test_1) + self.assertEqual(len(test_1["reading"]["additionalData"]), 8) + self.assertIn("google_c_sender_id", + test_1["reading"]["additionalData"]) + self.assertIn("google_c_fid", + test_1["reading"]["additionalData"]) + self.assertIn("gcm_message_id", + test_1["reading"]["additionalData"]) + self.assertNotIn("google.c.sender.id", + test_1["reading"]["additionalData"]) + self.assertNotIn("google.c.fid", + test_1["reading"]["additionalData"]) + self.assertNotIn("gcm.message_id", + test_1["reading"]["additionalData"]) + + metadata_template = {'plugin': 'none', + 'write_ts': self.curr_ts - 25, + 'time_zone': u'America/Los_Angeles', 
'platform': u'ios', 'key': u'stats/client_time', 'read_ts': self.curr_ts - 27, 'type': u'message'} + + # there are 30 entries in the setup function + self.assertEqual(len(self.uc1.getMessage()), 30) + + three_entries_with_dots = [] + for i in range(3): + curr_md = copy.copy(metadata_template) + curr_md['write_ts'] = self.curr_ts - 25 + i + three_entries_with_dots.append({ + 'user_id': self.testUserUUID1, + 'data': copy.deepcopy(test_template), + 'metadata': curr_md}) + + print(f"AFTER {[e.get('metadata', None) for e in three_entries_with_dots]}") + + mauc.sync_phone_to_server(self.testUserUUID1, three_entries_with_dots) + # the keys were munged during the sync, so these new entries should also be saved + # and we should have 33 entries in the usercache + self.assertEqual(len(self.uc1.getMessage()), 33) + self.assertEqual(len(list(self.ts1.find_entries())), 0) + enuah.UserCacheHandler.getUserCacheHandler(self.testUserUUID1).moveToLongTerm() + # since they were munged before saving into the usercache, + # there should be no errors while copying to the timeseries + self.assertEqual(len(self.uc1.getMessage()), 0) + self.assertEqual(len(list(self.ts1.find_entries())), 33) + if __name__ == '__main__': import emission.tests.common as etc diff --git a/emission/tests/netTests/TestMetricsCleanedSections.py b/emission/tests/netTests/TestMetricsCleanedSections.py index 24eac37a6..66f4f08e6 100644 --- a/emission/tests/netTests/TestMetricsCleanedSections.py +++ b/emission/tests/netTests/TestMetricsCleanedSections.py @@ -23,8 +23,7 @@ class TestMetrics(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") + etc.set_analysis_config("analysis.result.section.key", "analysis/cleaned_section") etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-21") self.testUUID1 = self.testUUID @@ -41,7 +40,7 @@ def setUp(self): def tearDown(self): self.clearRelatedDb() - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def clearRelatedDb(self): edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) diff --git a/emission/tests/netTests/TestMetricsConfirmedTrips.py b/emission/tests/netTests/TestMetricsConfirmedTrips.py index 27a91dda0..444c5ed60 100644 --- a/emission/tests/netTests/TestMetricsConfirmedTrips.py +++ b/emission/tests/netTests/TestMetricsConfirmedTrips.py @@ -17,8 +17,7 @@ class TestMetrics(unittest.TestCase): def setUp(self): - self.analysis_conf_path = \ - etc.set_analysis_config("analysis.result.section.key", "analysis/confirmed_trip") + etc.set_analysis_config("analysis.result.section.key", "analysis/confirmed_trip") self._loadDataFileAndInputs("emission/tests/data/real_examples/shankari_2016-06-20") self.testUUID1 = self.testUUID self._loadDataFileAndInputs("emission/tests/data/real_examples/shankari_2016-06-21") @@ -39,7 +38,7 @@ def _loadDataFileAndInputs(self, dataFile): def tearDown(self): self.clearRelatedDb() - os.remove(self.analysis_conf_path) + etc.clear_analysis_config() def clearRelatedDb(self): edb.get_timeseries_db().delete_many({"user_id": self.testUUID1}) diff --git a/emission/tests/netTests/TestPipeline.py b/emission/tests/netTests/TestPipeline.py index 54f81ba8e..d404ac6e2 100644 --- a/emission/tests/netTests/TestPipeline.py +++ b/emission/tests/netTests/TestPipeline.py @@ -7,6 +7,7 @@ import emission.core.wrapper.localdate as ecwl import emission.tests.common as etc import emission.pipeline.intake_stage as epi +import emission.analysis.result.user_stat as eaurs 
from emission.net.api import pipeline @@ -38,6 +39,7 @@ def testNoAnalysisResults(self): def testAnalysisResults(self): self.assertEqual(pipeline.get_range(self.testUUID), (None, None)) epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) + eaurs.get_and_store_user_stats(self.testUUID, "analysis/composite_trip") pr = pipeline.get_range(self.testUUID) self.assertAlmostEqual(pr[0], 1440688739.672) self.assertAlmostEqual(pr[1], 1440729142.709)
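
Reviewer note, not part of the patch: since the `_remove_dots` helper added to emission/net/api/usercache.py is the crux of the usercache change, here is a minimal, self-contained sketch of the same dot-munging rule for reference. It is written against plain dicts with no e-mission imports; the sample keys are taken from the diff above, and everything else (the standalone `remove_dots` name, the sample document) is an illustrative assumption, not the project's API.

def remove_dots(entry_doc):
    # Iterate over a snapshot of the keys, because the loop body adds and
    # removes keys; mutating a dict while iterating over it directly is unsafe.
    for key in list(entry_doc):
        if isinstance(entry_doc[key], dict):
            remove_dots(entry_doc[key])
        if '.' in key:
            # MongoDB/DocumentDB reject field names that contain '.',
            # so replace the dots with underscores before upserting
            entry_doc[key.replace(".", "_")] = entry_doc.pop(key)

doc = {"reading": {"additionalData": {"gcm.message_id": "FAKE_MESSAGE_ID",
                                      "coldstart": False}}}
remove_dots(doc)
assert "gcm_message_id" in doc["reading"]["additionalData"]
assert "gcm.message_id" not in doc["reading"]["additionalData"]
assert doc["reading"]["additionalData"]["coldstart"] is False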