Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving Trip Segmentation by reducing DB calls #958

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions emission/analysis/intake/segmentation/restart_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,40 @@
import emission.core.wrapper.transition as ecwt
import emission.storage.timeseries.timequery as estt

def is_tracking_restarted_in_range(start_ts, end_ts, timeseries):
def is_tracking_restarted_in_range(start_ts, end_ts, transition_df):
"""
Check to see if tracing was restarted between the times specified
:param start_ts: the start of the time range to check
:param end_ts: the end of the time range to check
:param timeseries: the timeseries to use for checking
:return:
"""
import emission.storage.timeseries.timequery as estt
transition_df_start_idx=transition_df.ts.searchsorted(start_ts,side='left')
transition_df_end_idx=transition_df.ts.searchsorted(end_ts,side='right')
transition_df_for_current=transition_df.iloc[transition_df_start_idx:transition_df_end_idx]
Comment on lines +20 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason you are using this instead of something like

    transition_df_for_current=transition_df[transition_df.ts >= start_ts && transition_df.ts <= end_ts]

or

    transition_df_for_current=transition_df.query('ts >= start_ts & .ts <= end_ts')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First was the performance reason, O(log (n)) here vs O (n) in others.
Second, in case of query(), it creates a boolean series that marks rows to keep or discard, which happens internally, which increases temporary memory usage. For very large DataFrames, this can be an issue.

can use this one ,if log(n) vs O(n) is not an issue :

transition_df_for_current=transition_df[transition_df.ts >= start_ts && transition_df.ts <= end_ts]


tq = estt.TimeQuery(timeType="data.ts", startTs=start_ts,
endTs=end_ts)
transition_df = timeseries.get_data_df("statemachine/transition", tq)
if len(transition_df) == 0:
if len(transition_df_for_current) == 0:
logging.debug("In range %s -> %s found no transitions" %
(tq.startTs, tq.endTs))
(start_ts, end_ts))
return False
logging.debug("In range %s -> %s found transitions %s" %
(tq.startTs, tq.endTs, transition_df[["fmt_time", "curr_state", "transition"]]))
return _is_tracking_restarted_android(transition_df) or \
_is_tracking_restarted_ios(transition_df)
(start_ts, end_ts, transition_df_for_current[["fmt_time", "curr_state", "transition"]]))
return _is_tracking_restarted_android(transition_df_for_current) or \
_is_tracking_restarted_ios(transition_df_for_current)

def get_ongoing_motion_in_range(start_ts, end_ts, timeseries):
tq = estt.TimeQuery(timeType = "data.ts", startTs = start_ts,
endTs = end_ts)
motion_list = list(timeseries.find_entries(["background/motion_activity"], tq))
def get_ongoing_motion_in_range(start_ts, end_ts, motion_df):
## in case when we receive an empty dataframe, there's nothing to
## process
if motion_df.shape == (0,0):
return motion_df

motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left')
motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right')
filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx]
Comment on lines +39 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

logging.debug("Found %s motion_activity entries in range %s -> %s" %
(len(motion_list), tq.startTs, tq.endTs))
logging.debug("sample activities are %s" % motion_list[0:5])
return motion_list
(len(filtered_motion_df),start_ts, end_ts))
#logging.debug("sample activities are %s" % filtered_motion_df.head())
return filtered_motion_df

def _is_tracking_restarted_android(transition_df):
"""
Expand Down
29 changes: 18 additions & 11 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from builtins import *
from builtins import object
import logging
import pandas as pd
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to add this import?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused currently. Removed.


import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.place_queries as esdp
Expand Down Expand Up @@ -65,6 +66,12 @@ def segment_current_trips(user_id):
# We need to use the appropriate filter based on the incoming data
# So let's read in the location points for the specified query
loc_df = ts.get_data_df("background/filtered_location", time_query)
transition_df = ts.get_data_df("statemachine/transition", time_query)
motion_df = ts.get_data_df("background/motion_activity",time_query)
if len(transition_df) > 0:
logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]])
else:
logging.debug("no transitions found. This can happen for continuous sensing")
if len(loc_df) == 0:
# no new segments, no need to keep looking at these again
logging.debug("len(loc_df) == 0, early return")
Expand All @@ -89,12 +96,12 @@ def segment_current_trips(user_id):
if len(filters_in_df) == 1:
# Common case - let's make it easy

segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(ts,
segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts,
time_query)
else:
segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
filters_in_df,
filter_methods)
filter_methods,transition_df,motion_df)
# Create and store trips and places based on the segmentation points
if segmentation_points is None:
epq.mark_segmentation_failed(user_id)
Expand All @@ -104,13 +111,13 @@ def segment_current_trips(user_id):
epq.mark_segmentation_done(user_id, None)
else:
try:
create_places_and_trips(user_id, segmentation_points, filter_method_names[filters_in_df[0]])
create_places_and_trips(user_id, transition_df, segmentation_points, filter_method_names[filters_in_df[0]])
epq.mark_segmentation_done(user_id, get_last_ts_processed(filter_methods))
except:
logging.exception("Trip generation failed for user %s" % user_id)
epq.mark_segmentation_failed(user_id)

def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods,transition_df, motion_df):
"""
We can have mixed filters in a particular time range for multiple reasons.
a) user switches phones from one platform to another
Expand Down Expand Up @@ -149,7 +156,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt
time_query.endTs = loc_df.iloc[endIndex+1].ts
logging.debug("for filter %s, startTs = %d and endTs = %d" %
(curr_filter, time_query.startTs, time_query.endTs))
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query)
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query)
logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys()))
sortedStartTsList = sorted(segmentation_map.keys())
segmentation_points = []
Expand All @@ -171,7 +178,7 @@ def get_last_ts_processed(filter_methods):
logging.info("Returning last_ts_processed = %s" % last_ts_processed)
return last_ts_processed

def create_places_and_trips(user_id, segmentation_points, segmentation_method_name):
def create_places_and_trips(user_id,transition_df, segmentation_points, segmentation_method_name):
# new segments, need to deal with them
# First, retrieve the last place so that we can stitch it to the newly created trip.
# Again, there are easy and hard. In the easy case, the trip was
Expand Down Expand Up @@ -214,7 +221,7 @@ def create_places_and_trips(user_id, segmentation_points, segmentation_method_na
new_place_entry = ecwe.Entry.create_entry(user_id,
"segmentation/raw_place", new_place, create_id = True)

if found_untracked_period(ts, last_place_entry.data, start_loc, segmentation_method_name):
if found_untracked_period(transition_df, last_place_entry.data, start_loc, segmentation_method_name):
# Fill in the gap in the chain with an untracked period
curr_untracked = ecwut.Untrackedtime()
curr_untracked.source = segmentation_method_name
Expand Down Expand Up @@ -254,7 +261,7 @@ def _link_and_save(ts, last_place_entry, curr_trip_entry, new_place_entry, start
# it will be lost
ts.update(last_place_entry)

def found_untracked_period(timeseries, last_place, start_loc, segmentation_method_name):
def found_untracked_period(transition_df, last_place, start_loc, segmentation_method_name):
"""
Check to see whether the two places are the same.
This is a fix for https://github.com/e-mission/e-mission-server/issues/378
Expand All @@ -270,7 +277,7 @@ def found_untracked_period(timeseries, last_place, start_loc, segmentation_metho
logging.debug("start of a chain, unable to check for restart from previous trip end, assuming not restarted")
return False

if _is_tracking_restarted(last_place, start_loc, timeseries):
if _is_tracking_restarted(last_place, start_loc, transition_df):
logging.debug("tracking has been restarted, returning True")
return True

Expand Down Expand Up @@ -378,6 +385,6 @@ def stitch_together_end(new_place_entry, curr_trip_entry, end_loc):
new_place_entry["data"] = new_place
curr_trip_entry["data"] = curr_trip

def _is_tracking_restarted(last_place, start_loc, timeseries):
return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, timeseries)
def _is_tracking_restarted(last_place, start_loc, transition_df):
return eaisr.is_tracking_restarted_in_range(last_place.enter_ts, start_loc.ts, transition_df)

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self, timeseries, time_query):
def segment_into_trips(self, transition_df, motion_df, timeseries, time_query):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
Expand All @@ -48,7 +48,7 @@ def segment_into_trips(self, timeseries, time_query):
"""
self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query)
self.filtered_points_df.loc[:,"valid"] = True
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.transition_df = transition_df
if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
else:
Expand Down Expand Up @@ -88,7 +88,7 @@ def segment_into_trips(self, timeseries, time_query):
# So we reset_index upstream and use it here.
last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
lastPoint = self.find_last_valid_point(idx)
if self.has_trip_ended(lastPoint, currPoint, timeseries):
if self.has_trip_ended(lastPoint, currPoint, timeseries, motion_df):
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx-1))
Expand Down Expand Up @@ -144,7 +144,7 @@ def segment_into_trips(self, timeseries, time_query):
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
return segmentation_points

def has_trip_ended(self, lastPoint, currPoint, timeseries):
def has_trip_ended(self, lastPoint, currPoint, timeseries, motion_df):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you are already passing in the motion_df here, why do you need to also pass in the timeseries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's being used here :

   timeseries.invalidate_raw_entry(currPoint["_id"])

# So we must not have been moving for the last _time filter_
# points. So the trip must have ended
# Since this is a distance filter, we detect that the last
Expand Down Expand Up @@ -173,14 +173,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
# for this kind of test
speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2)))

if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries):
if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df):
Comment on lines -176 to +177
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is transition_df a module field?

logging.debug("tracking was restarted, ending trip")
return True

# In general, we get multiple locations between each motion activity. If we see a bunch of motion activities
# between two location points, and there is a large gap between the last location and the first
# motion activity as well, let us just assume that there was a restart
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries)
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, motion_df)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while motion_df is not. They are both read upfront and should be treated the same way for clarity

ongoing_motion_check = len(ongoing_motion_in_range) > 0
if timeDelta > self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self, timeseries, time_query):
def segment_into_trips(self, transition_df,motion_df, timeseries, time_query):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
Expand Down Expand Up @@ -130,7 +130,7 @@ def segment_into_trips(self, timeseries, time_query):
logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" %
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))

if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df, motion_df):
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, last5MinsPoints_df)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
Expand Down Expand Up @@ -199,7 +199,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df):
else:
return False

def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes):
def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes, transition_df, motion_df):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. if we are passing in the transition_df and motion_df, why do we also need the timeseries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, it's unused. Cleared.

# Another mismatch between phone and server. Phone stops tracking too soon,
# so the distance is still greater than the threshold at the end of the trip.
# But then the next point is a long time away, so we can split again (similar to a distance filter)
Expand All @@ -214,11 +214,11 @@ def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistanc
speedDelta = np.nan
speedThreshold = old_div(float(self.distance_threshold), self.time_threshold)

if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, timeseries):
if eaisr.is_tracking_restarted_in_range(prev_point.ts, curr_point.ts, transition_df):
logging.debug("tracking was restarted, ending trip")
return True

ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, timeseries)) > 0
ongoing_motion_check = len(eaisr.get_ongoing_motion_in_range(prev_point.ts, curr_point.ts, motion_df)) > 0
if timeDelta > 2 * self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
(prev_point.ts, curr_point.ts,self.time_threshold, curr_point.ts - prev_point.ts, ongoing_motion_check))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries,
ecwm.MotionTypes.NONE.value,
ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value]

non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100]
logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])
non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)]
#logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change doesn't appear to be related to this fix.


non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ def testEmptyCall(self):
def testSegmentationPointsDwellSegmentationTimeFilter(self):
ts = esta.TimeSeries.get_time_series(self.androidUUID)
tq = estt.TimeQuery("metadata.write_ts", 1440658800, 1440745200)
transition_df = ts.get_data_df("statemachine/transition", tq)
motion_df = ts.get_data_df("background/motion_activity",tq)
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins
point_threshold = 10,
distance_threshold = 100) # 100 m
segmentation_points = dstfsm.segment_into_trips(ts, tq)
segmentation_points = dstfsm.segment_into_trips(transition_df, motion_df, ts, tq)
for (start, end) in segmentation_points:
logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
self.assertIsNotNone(segmentation_points)
Expand All @@ -86,10 +88,12 @@ def testSegmentationPointsDwellSegmentationTimeFilter(self):
def testSegmentationPointsDwellSegmentationDistFilter(self):
ts = esta.TimeSeries.get_time_series(self.iosUUID)
tq = estt.TimeQuery("metadata.write_ts", 1446796800, 1446847600)
transition_df = ts.get_data_df("statemachine/transition", tq)
motion_df = ts.get_data_df("background/motion_activity",tq)
dstdsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 5 mins
point_threshold = 10,
distance_threshold = 100) # 100 m
segmentation_points = dstdsm.segment_into_trips(ts, tq)
segmentation_points = dstdsm.segment_into_trips(transition_df, motion_df, ts, tq)
for (start, end) in segmentation_points:
logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
self.assertIsNotNone(segmentation_points)
Expand Down
Loading