Skip to content

Commit

Permalink
Revert "Improving Trip Segmentation by reducing DB calls"
Browse files Browse the repository at this point in the history
This reverts commit d95b2a0.
  • Loading branch information
humbleOldSage committed Feb 6, 2024
1 parent d95b2a0 commit fe4c33b
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 85 deletions.
37 changes: 17 additions & 20 deletions emission/analysis/intake/segmentation/restart_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,36 @@
import emission.core.wrapper.transition as ecwt
import emission.storage.timeseries.timequery as estt

def is_tracking_restarted_in_range(start_ts, end_ts, transition_df):
def is_tracking_restarted_in_range(start_ts, end_ts, timeseries):
"""
Check to see if tracking was restarted between the times specified
:param start_ts: the start of the time range to check
:param end_ts: the end of the time range to check
:param timeseries: the timeseries to use for checking
:return:
"""
transition_df_start_idx=transition_df.ts.searchsorted(start_ts,side='left')
transition_df_end_idx=transition_df.ts.searchsorted(end_ts,side='right')
transition_df_for_current=transition_df.iloc[transition_df_start_idx:transition_df_end_idx]
import emission.storage.timeseries.timequery as estt

if len(transition_df_for_current) == 0:
tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
endTs=end_ts)
transition_df = timeseries.get_data_df("statemachine/transition", tq)
if len(transition_df) == 0:
logging.debug("In range %s -> %s found no transitions" %
(start_ts, end_ts))
(tq.startTs, tq.endTs))
return False
logging.debug("In range %s -> %s found transitions %s" %
(start_ts, end_ts, transition_df_for_current[["fmt_time", "curr_state", "transition"]]))
return _is_tracking_restarted_android(transition_df_for_current) or \
_is_tracking_restarted_ios(transition_df_for_current)
(tq.startTs, tq.endTs, transition_df[["fmt_time", "curr_state", "transition"]]))
return _is_tracking_restarted_android(transition_df) or \
_is_tracking_restarted_ios(transition_df)

def get_ongoing_motion_in_range(start_ts, end_ts, motion_df):
## If we receive an empty dataframe, there is nothing to
## process
if motion_df.shape == (0,0):
return motion_df

motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left')
motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right')
filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx]
def get_ongoing_motion_in_range(start_ts, end_ts, timeseries):
tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts,
endTs = end_ts)
motion_list = list(timeseries.find_entries(["background/motion_activity"], tq))
logging.debug("Found %s motion_activity entries in range %s -> %s" %
(len(filtered_motion_df),start_ts, end_ts))
return filtered_motion_df
(len(motion_list), tq.startTs, tq.endTs))
logging.debug("sample activities are %s" % motion_list[0:5])
return motion_list

def _is_tracking_restarted_android(transition_df):
"""
Expand Down
29 changes: 11 additions & 18 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from builtins import *
from builtins import object
import logging
import pandas as pd

import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.place_queries as esdp
Expand Down Expand Up @@ -66,12 +65,6 @@ def segment_current_trips(user_id):
# We need to use the appropriate filter based on the incoming data
# So let's read in the location points for the specified query
loc_df = ts.get_data_df("background/filtered_location", time_query)
transition_df = ts.get_data_df("statemachine/transition", time_query)
motion_df = ts.get_data_df("background/motion_activity",time_query)
if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
else:
logging.debug("no transitions found. This can happen for continuous sensing")
if len(loc_df) == 0:
# no new segments, no need to keep looking at these again
logging.debug("len(loc_df) == 0, early return")
Expand All @@ -96,12 +89,12 @@ def segment_current_trips(user_id):
if len(filters_in_df) == 1:
# Common case - let's make it easy

segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts,
segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts,
time_query)
else:
segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
filters_in_df,
filter_methods,transition_df,motion_df)
filter_methods)
# Create and store trips and places based on the segmentation points
if segmentation_points is None:
epq.mark_segmentation_failed(user_id)
Expand All @@ -111,13 +104,13 @@ def segment_current_trips(user_id):
epq.mark_segmentation_done(user_id, None)
else:
try:
create_places_and_trips(user_id, transition_df, segmentation_points, filter_method_names[filters_in_df[0]])
create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]])
epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods))
except:
logging.exception("Trip generation failed for user %s" % user_id)
epq.mark_segmentation_failed(user_id)

def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods, transition_df, motion_df):
def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
"""
We can have mixed filters in a particular time range for multiple reasons.
a) user switches phones from one platform to another
Expand Down Expand Up @@ -156,7 +149,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt
time_query.endTs = loc_df.iloc[endIndex+1].ts
logging.debug("for filter %s, startTs = %d and endTs = %d" %
(curr_filter, time_query.startTs, time_query.endTs))
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query)
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query)
logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys()))
sortedStartTsList = sorted(segmentation_map.keys())
segmentation_points = []
Expand All @@ -178,7 +171,7 @@ def get_last_ts_processed(filter_methods):
logging.info("Returning last_ts_processed = %s" % last_ts_processed)
return last_ts_processed

def create_places_and_trips(user_id, transition_df, segmentation_points, segmentation_method_name):
def create_places_and_trips(user_id, segmentation_points, segmentation_method_name):
# new segments, need to deal with them
# First, retrieve the last place so that we can stitch it to the newly created trip.
# Again, there are easy and hard. In the easy case, the trip was
Expand Down Expand Up @@ -221,7 +214,7 @@ def create_places_and_trips(user_id, transition_df, segmentation_points, segment
new_place_entry = ecwe.Entry.create_entry(user_id,
"segmentation/raw_place", new_place, create_id = True)

if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name):
if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name):
# Fill in the gap in the chain with an untracked period
curr_untracked = ecwut.Untrackedtime()
curr_untracked.source = segmentation_method_name
Expand Down Expand Up @@ -261,7 +254,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start
# it will be lost
ts.update(last_place_entry)

def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name):
def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name):
"""
Check to see whether the two places are the same.
This is a fix for https://github.com/e-mission/e-mission-server/issues/378
Expand All @@ -277,7 +270,7 @@ def found_untracked_period(transition_df, last_place, start_loc, segmentation_me
logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted")
return False

if _is_tracking_restarted(last_place, start_loc, transition_df):
if _is_tracking_restarted(last_place, start_loc, timeseries):
logging.debug("tracking has been restarted, returning True")
return True

Expand Down Expand Up @@ -385,6 +378,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc):
new_place_entry["data"] = new_place
curr_trip_entry["data"] = curr_trip

def _is_tracking_restarted(last_place, start_loc, transition_df):
return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df)
def _is_tracking_restarted(last_place, start_loc, timeseries):
return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries)

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self, transition_df, motion_df, timeseries, time_query):
def segment_into_trips(self, timeseries, time_query):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
Expand All @@ -48,7 +48,7 @@ def segment_into_trips(self, transition_df, motion_df, timeseries, time_query):
"""
self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query)
self.filtered_points_df.loc[:,"valid"] = True
self.transition_df = transition_df
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
else:
Expand Down Expand Up @@ -88,7 +88,7 @@ def segment_into_trips(self, transition_df, motion_df, timeseries, time_query):
# So we reset_index upstream and use it here.
last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
lastPoint = self.find_last_valid_point(idx)
if self.has_trip_ended(lastPoint, currPoint, timeseries, motion_df):
if self.has_trip_ended(lastPoint, currPoint, timeseries):
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx-1))
Expand Down Expand Up @@ -144,7 +144,7 @@ def segment_into_trips(self, transition_df, motion_df, timeseries, time_query):
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
return segmentation_points

def has_trip_ended(self, lastPoint, currPoint, timeseries, motion_df):
def has_trip_ended(self, lastPoint, currPoint, timeseries):
# So we must not have been moving for the last _time filter_
# points. So the trip must have ended
# Since this is a distance filter, we detect that the last
Expand Down Expand Up @@ -173,14 +173,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries, motion_df):
# for this kind of test
speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2)))

if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df):
if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries):
logging.debug("tracking was restarted, ending trip")
return True

# In general, we get multiple locations between each motion activity. If we see a bunch of motion activities
# between two location points, and there is a large gap between the last location and the first
# motion activity as well, let us just assume that there was a restart
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, motion_df)
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries)
ongoing_motion_check = len(ongoing_motion_in_range) > 0
if timeDelta > self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import emission.core.wrapper.location as ecwl

import emission.analysis.intake.segmentation.restart_checking as eaisr
import emission.core.common as ec

class DwellSegmentationTimeFilter(eaist.TripSegmentationMethod):
def __init__(self, time_threshold, point_threshold, distance_threshold):
Expand Down Expand Up @@ -54,7 +53,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self,transition_df,motion_df,timeseries, time_query):
def segment_into_trips(self, timeseries, time_query):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
Expand Down Expand Up @@ -114,26 +113,24 @@ def segment_into_trips(self,transition_df,motion_df,timeseries, time_query):
# We are going to use the last 8 points for now.
# TODO: Change this back to last 10 points once we normalize phone and this
last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
last10Points_coords=last10Points_df[['longitude','latitude']].to_numpy()
# create a numpy array of the current point's coordinates with the same dimensions
currPoint_coords = np.repeat(np.array([[currPoint.longitude,currPoint.latitude]]),len(last10Points_df),axis=0)
#compute distance
last10PointsDistances=ec.calDistance(last10Points_coords,currPoint_coords)
# Rebuild the current coordinates numpy array to match the dimensions of the last-5-minutes points array
currPoint_coords = np.repeat(np.array([[currPoint.longitude,currPoint.latitude]]),len(last5MinsPoints_df),axis=0)
# get 2d numpy array, from df
last5MinsPoints_coords=last5MinsPoints_df[['longitude','latitude']].to_numpy()
# calculate distance
last5MinsDistances=ec.calDistance(last5MinsPoints_coords,currPoint_coords)
distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint)
timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts
last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1)
logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances)))
last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1)
logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(),
len(last10PointsDistances),
last10PointsDistances.shape))

# Fix for https://github.com/e-mission/e-mission-server/issues/348
last5MinTimes = currPoint.ts-last5MinsPoints_df.ts
last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1)

logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" %
(len(last10PointsDistances), len(last5MinsDistances)))
logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" %
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))

if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df):
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, last5MinsPoints_df)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
Expand Down Expand Up @@ -202,7 +199,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df):
else:
return False

def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df):
def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
# Another mismatch between phone and server. Phone stops tracking too soon,
# so the distance is still greater than the threshold at the end of the trip.
# But then the next point is a long time away, so we can split again (similar to a distance filter)
Expand All @@ -217,11 +214,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc
speedDelta = np.nan
speedThreshold = old_div(float(self.distance_threshold), self.time_threshold)

if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, transition_df):
if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries):
logging.debug("tracking was restarted, ending trip")
return True

ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, motion_df)) > 0
ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0
if timeDelta > 2 * self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
(prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries,
ecwm.MotionTypes.NONE.value,
ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value]

non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)]
#logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])
non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100]
logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])

non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts)

Expand Down
Loading

0 comments on commit fe4c33b

Please sign in to comment.