Scalability fixes - Load model once per user #944

Merged: 18 commits, Dec 2, 2023
227 changes: 132 additions & 95 deletions emission/analysis/classification/inference/labels/inferrers.py
@@ -4,6 +4,8 @@
import logging
import random
import copy
import time
import arrow

import emission.analysis.modelling.tour_model_first_only.load_predict as lp
import emission.analysis.modelling.trip_model.run_model as eamur
@@ -20,109 +22,128 @@
# the user walks to the location it is to shop (e.g., because they don't have a basket on
# their bike), and the user bikes to the location four times more than they walk there.
# Obviously, it is a simplification.
def placeholder_predictor_0(trip):
return [
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
]
def placeholder_predictor_0(trip_list):
predictions_list = []
for trip in trip_list:
predictions = [
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
]
predictions_list.append(predictions)
return predictions_list


# The next placeholder scenario provides that same set of labels in 75% of cases and no
# labels in the rest.
def placeholder_predictor_1(trip):
return [
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
] if random.random() > 0.25 else []
def placeholder_predictor_1(trip_list):
predictions_list = []
for trip in trip_list:
predictions = [
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
] if random.random() > 0.25 else []
predictions_list.append(predictions)
return predictions_list



# This third scenario provides labels designed to test the soundness and resilience of
# the client-side inference processing algorithms.
def placeholder_predictor_2(trip):
# Timestamp2index gives us a deterministic way to match test trips with labels
# Hardcoded to match "test_july_22" -- clearly, this is just for testing
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
return [
[

],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
],
[
{"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
]
][index]
def placeholder_predictor_2(trip_list):
predictions_list = []
for trip in trip_list:
# Timestamp2index gives us a deterministic way to match test trips with labels
# Hardcoded to match "test_july_22" -- clearly, this is just for testing
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
predictions = [
[

],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
],
[
{"labels": {"mode_confirm": "drove_alone"}, "p": 0.8},
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.8},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.2}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
]
][index]
predictions_list.append(predictions)
return predictions_list


# This fourth scenario provides labels designed to test the expectation and notification system.
def placeholder_predictor_3(trip):
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
return [
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
],
[
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
]
][index]
def placeholder_predictor_3(trip_list):
predictions_list = []
for trip in trip_list:
timestamp2index = {494: 5, 565: 4, 795: 3, 805: 2, 880: 1, 960: 0}
timestamp = trip["data"]["start_local_dt"]["hour"]*60+trip["data"]["start_local_dt"]["minute"]
index = timestamp2index[timestamp] if timestamp in timestamp2index else 0
predictions = [
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.80},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.20}
],
[
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "entertainment"}, "p": 0.70},
],
[
{"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.96},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.04}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.45},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.35},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.15},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.05}
],
[
{"labels": {"mode_confirm": "walk", "purpose_confirm": "shopping"}, "p": 0.60},
{"labels": {"mode_confirm": "walk", "purpose_confirm": "entertainment"}, "p": 0.25},
{"labels": {"mode_confirm": "drove_alone", "purpose_confirm": "work"}, "p": 0.11},
{"labels": {"mode_confirm": "shared_ride", "purpose_confirm": "work"}, "p": 0.04}
]
][index]
predictions_list.append(predictions)
return predictions_list

# Placeholder that is suitable for a demo.
# Finds all unique label combinations for this user and picks one randomly
def placeholder_predictor_demo(trip):
def placeholder_predictor_demo(trip_list):
import random

import emission.core.get_database as edb
user = trip["user_id"]
unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user}).distinct("data.user_input")

unique_user_inputs = edb.get_analysis_timeseries_db().find({"user_id": user_id}).distinct("data.user_input")
predictions_list = []
if len(unique_user_inputs) == 0:
return []
predictions_list.append([])
return predictions_list
random_user_input = random.choice(unique_user_inputs) if random.randrange(0,10) > 0 else []

logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user}, returning value {random_user_input}")
return [{"labels": random_user_input, "p": random.random()}]
logging.debug(f"In placeholder_predictor_demo: found {len(unique_user_inputs)} for user {user_id}, returning value {random_user_input}")
predictions_list.append([{"labels": random_user_input, "p": random.random()}])
return predictions_list

# Non-placeholder implementation. First bins the trips, and then clusters every bin
# See emission.analysis.modelling.tour_model for more details
@@ -141,18 +162,34 @@ def n_to_confidence_coeff(n, max_confidence=None, first_confidence=None, confidence_multiplier=None):
return max_confidence-(max_confidence-first_confidence)*(1-confidence_multiplier)**(n-1) # This is the u = ... formula in the issue
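As a side note for readers of this diff: below is a small standalone sketch of the discounting curve u = A - (A - f)*(1 - m)^(n - 1) that the return line above implements. The parameter values are assumptions chosen purely for illustration; the real ones come from the application config.

def discount_coeff(n, max_confidence=0.99, first_confidence=0.80, confidence_multiplier=0.30):
    # u = A - (A - f) * (1 - m)^(n - 1): equals first_confidence at n=1
    # and approaches max_confidence as n grows
    return max_confidence - (max_confidence - first_confidence) * (1 - confidence_multiplier) ** (n - 1)

for n in (1, 2, 5, 20):
    print(n, round(discount_coeff(n), 4))   # -> 0.8, 0.857, 0.9444, 0.9898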

# predict_two_stage_bin_cluster but with the above reduction in confidence
def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
def predict_cluster_confidence_discounting(trip_list, max_confidence=None, first_confidence=None, confidence_multiplier=None):
# load application config
model_type = eamtc.get_model_type()
model_storage = eamtc.get_model_storage()
labels, n = eamur.predict_labels_with_n(trip, model_type, model_storage)
if n <= 0: # No model data or trip didn't match a cluster
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
return labels

confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")

labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
return labels
# assert and fetch unique user id for trip_list
user_id_list = []
for trip in trip_list:
user_id_list.append(trip['user_id'])
assert user_id_list.count(user_id_list[0]) == len(user_id_list), "Multiple user_ids found for trip_list, expected unique user_id for all trips"
Review comment (Contributor):

nit, future fix: you should add some additional details to the log - what were the unique user IDs, and how many is "multiple"? Otherwise, if I see this log, I am not sure how to debug it.
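One way to address that nit, sketched here as a hypothetical follow-up rather than code from this PR, is to fold the distinct IDs and counts into the assertion message:

# Hypothetical follow-up to the review comment above; not part of this PR.
unique_user_ids = set(user_id_list)
assert len(unique_user_ids) == 1, (
    f"Expected a single user_id for all trips, but found {len(unique_user_ids)} "
    f"distinct user_ids {unique_user_ids} across {len(trip_list)} trips")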

# Assertion successful, use unique user_id
user_id = user_id_list[0]

# load model
start_model_load_time = time.process_time()
model = eamur._load_stored_trip_model(user_id, model_type, model_storage)
print(f"{arrow.now()} Inside predict_labels_n: Model load time = {time.process_time() - start_model_load_time}")

labels_n_list = eamur.predict_labels_with_n(trip_list, model)
predictions_list = []
for labels, n in labels_n_list:
if n <= 0: # No model data or trip didn't match a cluster
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; returning as-is")
predictions_list.append(labels)
continue
confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")
labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
predictions_list.append(labels)
return predictions_list
72 changes: 47 additions & 25 deletions emission/analysis/classification/inference/labels/pipeline.py
@@ -2,6 +2,8 @@
import logging
import random
import copy
import time
import arrow

# Our imports
import emission.storage.pipeline_queries as epq
@@ -55,48 +57,68 @@ def run_prediction_pipeline(self, user_id, time_range):
self.ts = esta.TimeSeries.get_time_series(user_id)
self.toPredictTrips = esda.get_entries(
esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
for cleaned_trip in self.toPredictTrips:
# Create an inferred trip

cleaned_trip_list = self.toPredictTrips
inferred_trip_list = []
results_dict = {}
ensemble_list = []

# Create list of inferred trips
for cleaned_trip in cleaned_trip_list:
cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

inferred_trip_list.append(inferred_trip)

if inferred_trip_list:
Review comment (Contributor):
Isn't this always going to be true, given that you define inferred_trip_list on line 62?
Are you actually testing against the "no trip" case?
I suspect this doesn't matter, since we just iterate through the trips downstream, but then we can skip this check altogether, given that it will always be true.

Reply (Contributor Author):
I did test this out, and Python evaluates an empty list as False when used in an if condition.
So the code does not enter the if block unless trips were loaded into inferred_trip_list in the for loop above it, which makes the list non-empty and therefore truthy.
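A quick standalone illustration of that behaviour (generic Python, independent of this code):

# Empty lists evaluate as False; non-empty lists evaluate as True.
inferred_trip_list = []
if inferred_trip_list:
    print("not reached while the list is empty")
inferred_trip_list.append("some trip")
if inferred_trip_list:
    print("reached once at least one trip has been appended")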

# Computing outside loop by passing trip_list to ensure model loads once
# Run the algorithms and the ensemble, store results
results = self.compute_and_save_algorithms(inferred_trip)
ensemble = self.compute_and_save_ensemble(inferred_trip, results)
results_dict = self.compute_and_save_algorithms(inferred_trip_list)
ensemble_list = self.compute_and_save_ensemble(inferred_trip_list, results_dict)

start_insert_inferred_trip_time = time.process_time()
for cleaned_trip, inferred_trip, ensemble in zip(cleaned_trip_list, inferred_trip_list, ensemble_list):
# Put final results into the inferred trip and store it
inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
self.ts.insert(inferred_trip)

if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
self._last_trip_done = cleaned_trip
# print(f"{arrow.now()} Inside run_prediction_pipeline: Saving inferred_trip total time = {time.process_time() - start_insert_inferred_trip_time}")

# This is where the labels for a given trip are actually predicted.
# Though the only information passed in is the trip object, the trip object can provide the
# user_id and other potentially useful information.
def compute_and_save_algorithms(self, trip):
predictions = []
def compute_and_save_algorithms(self, trip_list):
predictions_dict = {trip.get_id(): [] for trip in trip_list}
for algorithm_id, algorithm_fn in primary_algorithms.items():
prediction = algorithm_fn(trip)
lp = ecwl.Labelprediction()
lp.trip_id = trip.get_id()
lp.algorithm_id = algorithm_id
lp.prediction = prediction
lp.start_ts = trip["data"]["start_ts"]
lp.end_ts = trip["data"]["end_ts"]
self.ts.insert_data(self.user_id, "inference/labels", lp)
predictions.append(lp)
return predictions
prediction_list = algorithm_fn(trip_list)
start_insert_inference_labels_time = time.process_time()
for trip, prediction in zip(trip_list, prediction_list):
lp = ecwl.Labelprediction()
lp.algorithm_id = algorithm_id
lp.trip_id = trip.get_id()
lp.prediction = prediction
lp.start_ts = trip["data"]["start_ts"]
lp.end_ts = trip["data"]["end_ts"]
self.ts.insert_data(self.user_id, "inference/labels", lp)
predictions_dict[trip.get_id()].append(lp)
# print(f"{arrow.now()} Inside compute_and_save_algorithms: Saving inference/labels total time = {time.process_time() - start_insert_inference_labels_time}")
return predictions_dict

# Combine all our predictions into a single ensemble prediction.
# As a placeholder, we just take the first prediction.
# TODO: implement a real combination algorithm.
def compute_and_save_ensemble(self, trip, predictions):
il = ecwl.Labelprediction()
il.trip_id = trip.get_id()
il.start_ts = trip["data"]["start_ts"]
il.end_ts = trip["data"]["end_ts"]
(il.algorithm_id, il.prediction) = ensemble(trip, predictions)
self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
return il
def compute_and_save_ensemble(self, trip_list, predictions_dict):
start_insert_inferred_labels_time = time.process_time()
il_list = []
for trip, key in zip(trip_list, predictions_dict):
il = ecwl.Labelprediction()
il.trip_id = trip.get_id()
il.start_ts = trip["data"]["start_ts"]
il.end_ts = trip["data"]["end_ts"]
(il.algorithm_id, il.prediction) = ensemble(trip, predictions_dict[key])
self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
il_list.append(il)
# print(f"{arrow.now()} Inside compute_and_save_ensemble: Saving inferred_labels total time = {time.process_time() - start_insert_inferred_labels_time}")
return il_list
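The ensemble helper called above is not part of this diff. Going only by the comment earlier in the file ("As a placeholder, we just take the first prediction"), a minimal sketch of that placeholder could look like the following; the function body and the None fallback are assumptions inferred from the call site, while the attribute names match the Labelprediction fields set in compute_and_save_algorithms.

import copy

# Hypothetical sketch of the placeholder ensemble; the real helper lives
# elsewhere in this module and may differ.
def ensemble(trip, predictions):
    # predictions: list of Labelprediction entries stored for this trip
    if not predictions:
        return (None, [])   # nothing was predicted for this trip
    first = predictions[0]
    return (first.algorithm_id, copy.deepcopy(first.prediction))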