diff --git a/emission/analysis/intake/segmentation/restart_checking.py b/emission/analysis/intake/segmentation/restart_checking.py index 8ecc25e98..10729174e 100644 --- a/emission/analysis/intake/segmentation/restart_checking.py +++ b/emission/analysis/intake/segmentation/restart_checking.py @@ -9,7 +9,7 @@ import emission.core.wrapper.transition as ecwt import emission.storage.timeseries.timequery as estt -def is_tracking_restarted_in_range(start_ts, end_ts, timeseries): +def is_tracking_restarted_in_range(start_ts, end_ts, transition_df): """ Check to see if tracing was restarted between the times specified :param start_ts: the start of the time range to check @@ -17,28 +17,32 @@ def is_tracking_restarted_in_range(start_ts, end_ts, timeseries): :param timeseries: the timeseries to use for checking :return: """ - import emission.storage.timeseries.timequery as estt + transition_df_start_idx=transition_df.ts.searchsorted(start_ts,side='left') + transition_df_end_idx=transition_df.ts.searchsorted(end_ts,side='right') + transition_df_for_current=transition_df.iloc[transition_df_start_idx:transition_df_end_idx] - tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts, - endTs=end_ts) - transition_df = timeseries.get_data_df("statemachine/transition", tq) - if len(transition_df) == 0: + if len(transition_df_for_current) == 0: logging.debug("In range %s -> %s found no transitions" % - (tq.startTs, tq.endTs)) + (start_ts, end_ts)) return False logging.debug("In range %s -> %s found transitions %s" % - (tq.startTs, tq.endTs, transition_df[["fmt_time", "curr_state", "transition"]])) - return _is_tracking_restarted_android(transition_df) or \ - _is_tracking_restarted_ios(transition_df) + (start_ts, end_ts, transition_df_for_current[["fmt_time", "curr_state", "transition"]])) + return _is_tracking_restarted_android(transition_df_for_current) or \ + _is_tracking_restarted_ios(transition_df_for_current) -def get_ongoing_motion_in_range(start_ts, end_ts, timeseries): - tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts, - endTs = end_ts) - motion_list = list(timeseries.find_entries(["background/motion_activity"], tq)) +def get_ongoing_motion_in_range(start_ts, end_ts, motion_df): + ## in case when we receive an empty dataframe, there's nothing to + ## process + if motion_df.shape == (0,0): + return motion_df + + motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left') + motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right') + filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx] logging.debug("Found %s motion_activity entries in range %s -> %s" % - (len(motion_list), tq.startTs, tq.endTs)) - logging.debug("sample activities are %s" % motion_list[0:5]) - return motion_list + (len(filtered_motion_df),start_ts, end_ts)) + #logging.debug("sample activities are %s" % filtered_motion_df.head()) + return filtered_motion_df def _is_tracking_restarted_android(transition_df): """ diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index d6828af77..07fd7a941 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -65,6 +65,12 @@ def segment_current_trips(user_id): # We need to use the appropriate filter based on the incoming data # So let's read in the location points for the specified query loc_df = ts.get_data_df("background/filtered_location", time_query) + transition_df = ts.get_data_df("statemachine/transition", time_query) + motion_df = ts.get_data_df("background/motion_activity",time_query) + if len(transition_df) > 0: + logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) + else: + logging.debug("no transitions found. This can happen for continuous sensing") if len(loc_df) == 0: # no new segments, no need to keep looking at these again logging.debug("len(loc_df) == 0, early return") @@ -89,12 +95,12 @@ def segment_current_trips(user_id): if len(filters_in_df) == 1: # Common case - let's make it easy - segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, + segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts, time_query) else: segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, - filter_methods) + filter_methods,transition_df,motion_df) # Create and store trips and places based on the segmentation points if segmentation_points is None: epq.mark_segmentation_failed(user_id) @@ -104,13 +110,13 @@ def segment_current_trips(user_id): epq.mark_segmentation_done(user_id, None) else: try: - create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) + create_places_and_trips(user_id, transition_df, segmentation_points, filter_method_names[filters_in_df[0]]) epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) except: logging.exception("Trip generation failed for user %s" % user_id) epq.mark_segmentation_failed(user_id) -def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): +def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods,transition_df, motion_df): """ We can have mixed filters in a particular time range for multiple reasons. a) user switches phones from one platform to another @@ -149,7 +155,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt time_query.endTs = loc_df.iloc[endIndex+1].ts logging.debug("for filter %s, startTs = %d and endTs = %d" % (curr_filter, time_query.startTs, time_query.endTs)) - segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query) + segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query) logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys())) sortedStartTsList = sorted(segmentation_map.keys()) segmentation_points = [] @@ -171,7 +177,7 @@ def get_last_ts_processed(filter_methods): logging.info("Returning last_ts_processed = %s" % last_ts_processed) return last_ts_processed -def create_places_and_trips(user_id, segmentation_points, segmentation_method_name): +def create_places_and_trips(user_id,transition_df, segmentation_points, segmentation_method_name): # new segments, need to deal with them # First, retrieve the last place so that we can stitch it to the newly created trip. # Again, there are easy and hard. In the easy case, the trip was @@ -214,7 +220,7 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na new_place_entry = ecwe.Entry.create_entry(user_id, "segmentation/raw_place", new_place, create_id = True) - if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): + if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name): # Fill in the gap in the chain with an untracked period curr_untracked = ecwut.Untrackedtime() curr_untracked.source = segmentation_method_name @@ -254,7 +260,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start # it will be lost ts.update(last_place_entry) -def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name): +def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name): """ Check to see whether the two places are the same. This is a fix for https://github.com/e-mission/e-mission-server/issues/378 @@ -270,7 +276,7 @@ def found_untracked_period(timeseries, last_place, start_loc, segmentation_metho logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted") return False - if _is_tracking_restarted(last_place, start_loc, timeseries): + if _is_tracking_restarted(last_place, start_loc, transition_df): logging.debug("tracking has been restarted, returning True") return True @@ -378,6 +384,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc): new_place_entry["data"] = new_place curr_trip_entry["data"] = curr_trip -def _is_tracking_restarted(last_place, start_loc, timeseries): - return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries) +def _is_tracking_restarted(last_place, start_loc, transition_df): + return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index ea53c9abb..3c0a8eaae 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): self.point_threshold = point_threshold self.distance_threshold = distance_threshold - def segment_into_trips(self, timeseries, time_query): + def segment_into_trips(self, transition_df, motion_df, timeseries, time_query): """ Examines the timeseries database for a specific range and returns the segmentation points. Note that the input is the entire timeseries and @@ -48,7 +48,8 @@ def segment_into_trips(self, timeseries, time_query): """ self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query) self.filtered_points_df.loc[:,"valid"] = True - self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) + self.transition_df = transition_df + self.motion_df =motion_df if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) else: @@ -173,14 +174,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries): # for this kind of test speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2))) - if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries): + if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df): logging.debug("tracking was restarted, ending trip") return True # In general, we get multiple locations between each motion activity. If we see a bunch of motion activities # between two location points, and there is a large gap between the last location and the first # motion activity as well, let us just assume that there was a restart - ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries) + ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, self.motion_df) ongoing_motion_check = len(ongoing_motion_in_range) > 0 if timeDelta > self.time_threshold and not ongoing_motion_check: logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index d75760cd9..d419b71cb 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -53,7 +53,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): self.point_threshold = point_threshold self.distance_threshold = distance_threshold - def segment_into_trips(self, timeseries, time_query): + def segment_into_trips(self, transition_df,motion_df, timeseries, time_query): """ Examines the timeseries database for a specific range and returns the segmentation points. Note that the input is the entire timeseries and @@ -130,7 +130,7 @@ def segment_into_trips(self, timeseries, time_query): logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" % (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) - if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): + if self.has_trip_ended(prevPoint, currPoint, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df, motion_df): (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, last10Points_df, last5MinsPoints_df) segmentation_points.append((curr_trip_start_point, last_trip_end_point)) @@ -199,7 +199,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df): else: return False - def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): + def has_trip_ended(self, prev_point, curr_point, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df): # Another mismatch between phone and server. Phone stops tracking too soon, # so the distance is still greater than the threshold at the end of the trip. # But then the next point is a long time away, so we can split again (similar to a distance filter) @@ -214,11 +214,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc speedDelta = np.nan speedThreshold = old_div(float(self.distance_threshold), self.time_threshold) - if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries): + if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, transition_df): logging.debug("tracking was restarted, ending trip") return True - ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0 + ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, motion_df)) > 0 if timeDelta > 2 * self.time_threshold and not ongoing_motion_check: logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % (prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check)) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py index c329fe69d..d118868b0 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py @@ -23,9 +23,10 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries, ecwm.MotionTypes.NONE.value, ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value] - non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100] - logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) - + non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)] + #logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) + logging.debug("non_still_motions = %s" %[(ecwm.MotionTypes(row['type']),row['confidence'],row['fmt_time']) for index,row in non_still_motions.iterrows()]) + non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts) logging.debug("in is_huge_invalid_ts_offset: len(intermediate_transitions) = %d, non_still_motions = %d, time_diff = %s mins, non_still_motions_rate = %s" % (len(intermediate_transitions), len(non_still_motions), (currPoint.ts - lastPoint.ts)/60, non_still_motions_rate)) diff --git a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py index 0cc469fea..575fac606 100644 --- a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py @@ -68,10 +68,12 @@ def testEmptyCall(self): def testSegmentationPointsDwellSegmentationTimeFilter(self): ts = esta.TimeSeries.get_time_series(self.androidUUID) tq = estt.TimeQuery("metadata.write_ts", 1440658800, 1440745200) + transition_df = ts.get_data_df("statemachine/transition", tq) + motion_df = ts.get_data_df("background/motion_activity",tq) dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins point_threshold = 10, distance_threshold = 100) # 100 m - segmentation_points = dstfsm.segment_into_trips(ts, tq) + segmentation_points = dstfsm.segment_into_trips(transition_df, motion_df, ts, tq) for (start, end) in segmentation_points: logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts)) self.assertIsNotNone(segmentation_points) @@ -86,10 +88,12 @@ def testSegmentationPointsDwellSegmentationTimeFilter(self): def testSegmentationPointsDwellSegmentationDistFilter(self): ts = esta.TimeSeries.get_time_series(self.iosUUID) tq = estt.TimeQuery("metadata.write_ts", 1446796800, 1446847600) + transition_df = ts.get_data_df("statemachine/transition", tq) + motion_df = ts.get_data_df("background/motion_activity",tq) dstdsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 5 mins point_threshold = 10, distance_threshold = 100) # 100 m - segmentation_points = dstdsm.segment_into_trips(ts, tq) + segmentation_points = dstdsm.segment_into_trips(transition_df, motion_df, ts, tq) for (start, end) in segmentation_points: logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts)) self.assertIsNotNone(segmentation_points)