From fe4c33b657f6507ae5ad5036823a35158cecc3f1 Mon Sep 17 00:00:00 2001 From: $aTyam Date: Tue, 6 Feb 2024 09:39:57 -0500 Subject: [PATCH] Revert "Improving Trip Segmentation by reducing DB calls" This reverts commit d95b2a01f4e86df7e110c60a24610bd71a1e2a86. --- .../intake/segmentation/restart_checking.py | 37 +++++++++---------- .../intake/segmentation/trip_segmentation.py | 29 ++++++--------- .../dwell_segmentation_dist_filter.py | 12 +++--- .../dwell_segmentation_time_filter.py | 33 ++++++++--------- .../trip_end_detection_corner_cases.py | 4 +- emission/core/common.py | 16 +------- .../intakeTests/TestTripSegmentation.py | 8 +--- 7 files changed, 54 insertions(+), 85 deletions(-) diff --git a/emission/analysis/intake/segmentation/restart_checking.py b/emission/analysis/intake/segmentation/restart_checking.py index 2a0b484d8..8ecc25e98 100644 --- a/emission/analysis/intake/segmentation/restart_checking.py +++ b/emission/analysis/intake/segmentation/restart_checking.py @@ -9,7 +9,7 @@ import emission.core.wrapper.transition as ecwt import emission.storage.timeseries.timequery as estt -def is_tracking_restarted_in_range(start_ts, end_ts, transition_df): +def is_tracking_restarted_in_range(start_ts, end_ts, timeseries): """ Check to see if tracing was restarted between the times specified :param start_ts: the start of the time range to check @@ -17,31 +17,28 @@ def is_tracking_restarted_in_range(start_ts, end_ts, transition_df): :param timeseries: the timeseries to use for checking :return: """ - transition_df_start_idx=transition_df.ts.searchsorted(start_ts,side='left') - transition_df_end_idx=transition_df.ts.searchsorted(end_ts,side='right') - transition_df_for_current=transition_df.iloc[transition_df_start_idx:transition_df_end_idx] + import emission.storage.timeseries.timequery as estt - if len(transition_df_for_current) == 0: + tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts, + endTs=end_ts) + transition_df = timeseries.get_data_df("statemachine/transition", tq) + if len(transition_df) == 0: logging.debug("In range %s -> %s found no transitions" % - (start_ts, end_ts)) + (tq.startTs, tq.endTs)) return False logging.debug("In range %s -> %s found transitions %s" % - (start_ts, end_ts, transition_df_for_current[["fmt_time", "curr_state", "transition"]])) - return _is_tracking_restarted_android(transition_df_for_current) or \ - _is_tracking_restarted_ios(transition_df_for_current) + (tq.startTs, tq.endTs, transition_df[["fmt_time", "curr_state", "transition"]])) + return _is_tracking_restarted_android(transition_df) or \ + _is_tracking_restarted_ios(transition_df) -def get_ongoing_motion_in_range(start_ts, end_ts, motion_df): - ## in case when we receive an empty dataframe, there's nothing to - ## process - if motion_df.shape == (0,0): - return motion_df - - motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left') - motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right') - filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx] +def get_ongoing_motion_in_range(start_ts, end_ts, timeseries): + tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts, + endTs = end_ts) + motion_list = list(timeseries.find_entries(["background/motion_activity"], tq)) logging.debug("Found %s motion_activity entries in range %s -> %s" % - (len(filtered_motion_df),start_ts, end_ts)) - return filtered_motion_df + (len(motion_list), tq.startTs, tq.endTs)) + logging.debug("sample activities are %s" % motion_list[0:5]) + return motion_list def _is_tracking_restarted_android(transition_df): """ diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index 62856c3a5..d6828af77 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -7,7 +7,6 @@ from builtins import * from builtins import object import logging -import pandas as pd import emission.storage.timeseries.abstract_timeseries as esta import emission.storage.decorations.place_queries as esdp @@ -66,12 +65,6 @@ def segment_current_trips(user_id): # We need to use the appropriate filter based on the incoming data # So let's read in the location points for the specified query loc_df = ts.get_data_df("background/filtered_location", time_query) - transition_df = ts.get_data_df("statemachine/transition", time_query) - motion_df = ts.get_data_df("background/motion_activity",time_query) - if len(transition_df) > 0: - logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) - else: - logging.debug("no transitions found. This can happen for continuous sensing") if len(loc_df) == 0: # no new segments, no need to keep looking at these again logging.debug("len(loc_df) == 0, early return") @@ -96,12 +89,12 @@ def segment_current_trips(user_id): if len(filters_in_df) == 1: # Common case - let's make it easy - segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts, + segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts, time_query) else: segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, - filter_methods,transition_df,motion_df) + filter_methods) # Create and store trips and places based on the segmentation points if segmentation_points is None: epq.mark_segmentation_failed(user_id) @@ -111,13 +104,13 @@ def segment_current_trips(user_id): epq.mark_segmentation_done(user_id, None) else: try: - create_places_and_trips(user_id, transition_df, segmentation_points, filter_method_names[filters_in_df[0]]) + create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]]) epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods)) except: logging.exception("Trip generation failed for user %s" % user_id) epq.mark_segmentation_failed(user_id) -def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods, transition_df, motion_df): +def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): """ We can have mixed filters in a particular time range for multiple reasons. a) user switches phones from one platform to another @@ -156,7 +149,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt time_query.endTs = loc_df.iloc[endIndex+1].ts logging.debug("for filter %s, startTs = %d and endTs = %d" % (curr_filter, time_query.startTs, time_query.endTs)) - segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query) + segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query) logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys())) sortedStartTsList = sorted(segmentation_map.keys()) segmentation_points = [] @@ -178,7 +171,7 @@ def get_last_ts_processed(filter_methods): logging.info("Returning last_ts_processed = %s" % last_ts_processed) return last_ts_processed -def create_places_and_trips(user_id, transition_df, segmentation_points, segmentation_method_name): +def create_places_and_trips(user_id, segmentation_points, segmentation_method_name): # new segments, need to deal with them # First, retrieve the last place so that we can stitch it to the newly created trip. # Again, there are easy and hard. In the easy case, the trip was @@ -221,7 +214,7 @@ def create_places_and_trips(user_id, transition_df, segmentation_points, segment new_place_entry = ecwe.Entry.create_entry(user_id, "segmentation/raw_place", new_place, create_id = True) - if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name): + if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name): # Fill in the gap in the chain with an untracked period curr_untracked = ecwut.Untrackedtime() curr_untracked.source = segmentation_method_name @@ -261,7 +254,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start # it will be lost ts.update(last_place_entry) -def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name): +def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name): """ Check to see whether the two places are the same. This is a fix for https://github.com/e-mission/e-mission-server/issues/378 @@ -277,7 +270,7 @@ def found_untracked_period(transition_df, last_place, start_loc, segmentation_me logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted") return False - if _is_tracking_restarted(last_place, start_loc, transition_df): + if _is_tracking_restarted(last_place, start_loc, timeseries): logging.debug("tracking has been restarted, returning True") return True @@ -385,6 +378,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc): new_place_entry["data"] = new_place curr_trip_entry["data"] = curr_trip -def _is_tracking_restarted(last_place, start_loc, transition_df): - return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df) +def _is_tracking_restarted(last_place, start_loc, timeseries): + return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index 04f8f5c10..ea53c9abb 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): self.point_threshold = point_threshold self.distance_threshold = distance_threshold - def segment_into_trips(self, transition_df, motion_df, timeseries, time_query): + def segment_into_trips(self, timeseries, time_query): """ Examines the timeseries database for a specific range and returns the segmentation points. Note that the input is the entire timeseries and @@ -48,7 +48,7 @@ def segment_into_trips(self, transition_df, motion_df, timeseries, time_query): """ self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query) self.filtered_points_df.loc[:,"valid"] = True - self.transition_df = transition_df + self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) else: @@ -88,7 +88,7 @@ def segment_into_trips(self, transition_df, motion_df, timeseries, time_query): # So we reset_index upstream and use it here. last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] lastPoint = self.find_last_valid_point(idx) - if self.has_trip_ended(lastPoint, currPoint, timeseries, motion_df): + if self.has_trip_ended(lastPoint, currPoint, timeseries): last_trip_end_point = lastPoint logging.debug("Appending last_trip_end_point %s with index %s " % (last_trip_end_point, idx-1)) @@ -144,7 +144,7 @@ def segment_into_trips(self, transition_df, motion_df, timeseries, time_query): logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) return segmentation_points - def has_trip_ended(self, lastPoint, currPoint, timeseries, motion_df): + def has_trip_ended(self, lastPoint, currPoint, timeseries): # So we must not have been moving for the last _time filter_ # points. So the trip must have ended # Since this is a distance filter, we detect that the last @@ -173,14 +173,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries, motion_df): # for this kind of test speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2))) - if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df): + if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries): logging.debug("tracking was restarted, ending trip") return True # In general, we get multiple locations between each motion activity. If we see a bunch of motion activities # between two location points, and there is a large gap between the last location and the first # motion activity as well, let us just assume that there was a restart - ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, motion_df) + ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries) ongoing_motion_check = len(ongoing_motion_in_range) > 0 if timeDelta > self.time_threshold and not ongoing_motion_check: logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index f2832d1df..d75760cd9 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -20,7 +20,6 @@ import emission.core.wrapper.location as ecwl import emission.analysis.intake.segmentation.restart_checking as eaisr -import emission.core.common as ec class DwellSegmentationTimeFilter(eaist.TripSegmentationMethod): def __init__(self, time_threshold, point_threshold, distance_threshold): @@ -54,7 +53,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): self.point_threshold = point_threshold self.distance_threshold = distance_threshold - def segment_into_trips(self,transition_df,motion_df,timeseries, time_query): + def segment_into_trips(self, timeseries, time_query): """ Examines the timeseries database for a specific range and returns the segmentation points. Note that the input is the entire timeseries and @@ -114,26 +113,24 @@ def segment_into_trips(self,transition_df,motion_df,timeseries, time_query): # We are going to use the last 8 points for now. # TODO: Change this back to last 10 points once we normalize phone and this last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] - last10Points_coords=last10Points_df[['longitude','latitude']].to_numpy() - # create a similar dimension current cordintaes numpy array - currPoint_coords = np.repeat(np.array([[currPoint.longitude,currPoint.latitude]]),len(last10Points_df),axis=0) - #compute distance - last10PointsDistances=ec.calDistance(last10Points_coords,currPoint_coords) - # Reset current coordintes numpy array as per last 5 mins Points array's dimensions - currPoint_coords = np.repeat(np.array([[currPoint.longitude,currPoint.latitude]]),len(last5MinsPoints_df),axis=0) - # get 2d numpy array, from df - last5MinsPoints_coords=last5MinsPoints_df[['longitude','latitude']].to_numpy() - # calcualte distance - last5MinsDistances=ec.calDistance(last5MinsPoints_coords,currPoint_coords) + distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) + timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts + last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) + logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) + last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) + logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(), + len(last10PointsDistances), + last10PointsDistances.shape)) + # Fix for https://github.com/e-mission/e-mission-server/issues/348 - last5MinTimes = currPoint.ts-last5MinsPoints_df.ts + last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % (len(last10PointsDistances), len(last5MinsDistances))) logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" % (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) - if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df): + if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, last10Points_df, last5MinsPoints_df) segmentation_points.append((curr_trip_start_point, last_trip_end_point)) @@ -202,7 +199,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df): else: return False - def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df): + def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): # Another mismatch between phone and server. Phone stops tracking too soon, # so the distance is still greater than the threshold at the end of the trip. # But then the next point is a long time away, so we can split again (similar to a distance filter) @@ -217,11 +214,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc speedDelta = np.nan speedThreshold = old_div(float(self.distance_threshold), self.time_threshold) - if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, transition_df): + if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries): logging.debug("tracking was restarted, ending trip") return True - ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, motion_df)) > 0 + ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0 if timeDelta > 2 * self.time_threshold and not ongoing_motion_check: logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % (prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check)) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py index 76f13e2c9..c329fe69d 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py @@ -23,8 +23,8 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries, ecwm.MotionTypes.NONE.value, ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value] - non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)] - #logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) + non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100] + logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts) diff --git a/emission/core/common.py b/emission/core/common.py index a2594643c..4d97ce681 100644 --- a/emission/core/common.py +++ b/emission/core/common.py @@ -10,7 +10,6 @@ from random import randrange import logging import copy -import numpy as np from datetime import datetime, timedelta from dateutil import parser from pytz import timezone @@ -52,20 +51,7 @@ def calDistance(point1, point2, coordinates=False): # SHANKARI: Why do we have two calDistance() functions? # Need to combine into one # points are now in geojson format (lng,lat) - - #Added to Support vectorization when dealing with numpy array - if isinstance(point1,np.ndarray) and isinstance(point2,np.ndarray): - dLat = np.radians(point1[:,1]-point2[:,1]) - dLon = np.radians(point1[:,0]-point2[:,0]) - lat1 = np.radians(point1[:,1]) - lat2 = np.radians(point2[:,1]) - - a = (np.sin(dLat/2) ** 2) + ((np.sin(dLon/2) ** 2) * np.cos(lat1) * np.cos(lat2)) - c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a)) - d = earthRadius * c - - return d - elif coordinates: + if coordinates: dLat = math.radians(point1.lat-point2.lat) dLon = math.radians(point1.lon-point2.lon) lat1 = math.radians(point1.lat) diff --git a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py index 575fac606..0cc469fea 100644 --- a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py @@ -68,12 +68,10 @@ def testEmptyCall(self): def testSegmentationPointsDwellSegmentationTimeFilter(self): ts = esta.TimeSeries.get_time_series(self.androidUUID) tq = estt.TimeQuery("metadata.write_ts", 1440658800, 1440745200) - transition_df = ts.get_data_df("statemachine/transition", tq) - motion_df = ts.get_data_df("background/motion_activity",tq) dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins point_threshold = 10, distance_threshold = 100) # 100 m - segmentation_points = dstfsm.segment_into_trips(transition_df, motion_df, ts, tq) + segmentation_points = dstfsm.segment_into_trips(ts, tq) for (start, end) in segmentation_points: logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts)) self.assertIsNotNone(segmentation_points) @@ -88,12 +86,10 @@ def testSegmentationPointsDwellSegmentationTimeFilter(self): def testSegmentationPointsDwellSegmentationDistFilter(self): ts = esta.TimeSeries.get_time_series(self.iosUUID) tq = estt.TimeQuery("metadata.write_ts", 1446796800, 1446847600) - transition_df = ts.get_data_df("statemachine/transition", tq) - motion_df = ts.get_data_df("background/motion_activity",tq) dstdsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 5 mins point_threshold = 10, distance_threshold = 100) # 100 m - segmentation_points = dstdsm.segment_into_trips(transition_df, motion_df, ts, tq) + segmentation_points = dstdsm.segment_into_trips(ts, tq) for (start, end) in segmentation_points: logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts)) self.assertIsNotNone(segmentation_points)