Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace mode pipeline #892

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
50 changes: 50 additions & 0 deletions conf/analysis/trip_model.conf.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,56 @@
"similarity_threshold_meters": 500,
"apply_cutoff": false,
"incremental_evaluation": false
},
"gbdt": {
"feature_list": {
"data.user_input.mode_confirm": [
"drive",
"bike",
"walk",
"transit"
],
"data.user_input.purpose_confirm": [
"work",
"home",
"school"
]
},
"dependent_var": {
"name": "data.user_input.replaced_mode",
"classes": [
"drive",
"walk",
"bike",
"transit"
]
},
"incremental_evaluation": false
},
"svm": {
"feature_list": {
"data.user_input.mode_confirm": [
"drive",
"bike",
"walk",
"transit"
],
"data.user_input.purpose_confirm": [
"work",
"home",
"school"
]
},
"dependent_var": {
"name": "data.user_input.replaced_mode",
"classes": [
"drive",
"walk",
"bike",
"transit"
]
},
"incremental_evaluation": true
}
}
}
16 changes: 16 additions & 0 deletions emission/analysis/classification/inference/labels/inferrers.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,19 @@ def predict_cluster_confidence_discounting(trip, max_confidence=None, first_conf
labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
return labels

def predict_gradient_boosted_decision_tree(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
# load application config
model_type = eamtc.get_model_type()
Comment on lines +160 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like it is just a copy/paste of the previous predict_cluster_confidence_discounting
Why does this have to be in the labels directory anyway?
labels is for predicting labels based on other labels
replaced_mode is for predicting the replaced mode based on other characteristics (e.g. demographics).

So while it is appropriate to have this be inspired by the label assist algorithm, it is its own algorithm/model, and for clarity, it should be in its own directory. Its scaffolding can be similar to the label assist, but it is not a label assist.

model_storage = eamtc.get_model_storage()
labels, n = eamur.predict_labels_with_gbdt(trip, model_type, model_storage)
if n <= 0: # No model data or trip didn't match a cluster
logging.debug(f"In predict_gradient_boosted_decision_tree: n={n}; returning as-is")
return labels

# confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
# logging.debug(f"In predict_cluster_confidence_discounting: n={n}; discounting with coefficient {confidence_coeff}")

labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
return labels
Comment on lines +172 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concretely, this is also wrong because there will not be a label array or probabilities.
Note that this code as written does not work because confidence_coeff is not defined.

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Standard imports
import logging
import random
import copy

# Our imports
import emission.storage.pipeline_queries as epq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.core.wrapper.labelprediction as ecwl
import emission.core.wrapper.entry as ecwe
import emission.analysis.classification.inference.labels.inferrers as eacili
import emission.analysis.classification.inference.labels.ensembles as eacile


# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which
# runs on the results of other algorithms), primary_algorithms specifies a corresponding
# function in eacili to run. This makes it easy to plug in additional algorithms later.
primary_algorithms = {
ecwl.AlgorithmTypes.GRADIENT_BOOSTED_DECISION_TREE: eacili.predict_gradient_boosted_decision_tree
}

# ensemble specifies which algorithm in eacile to run.
# This makes it easy to test various ways of combining various algorithms.
ensemble = eacile.ensemble_first_prediction


# Does all the work necessary for a given user
def infer_labels(user_id):
time_query = epq.get_time_range_for_label_inference(user_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, this is not the time range to query for because that will return the time range for the label inference algorithm. You are your own algorithm and you need your own time range

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break the pipeline unless changed.

try:
lip = LabelInferencePipeline()
lip.user_id = user_id
lip.run_prediction_pipeline(user_id, time_query)
if lip.last_trip_done is None:
logging.debug("After run, last_trip_done == None, must be early return")
epq.mark_label_inference_done(user_id, lip.last_trip_done)
except:
logging.exception("Error while inferring labels, timestamp is unchanged")
epq.mark_label_inference_failed(user_id)

# Code structure based on emission.analysis.classification.inference.mode.pipeline
# and emission.analysis.classification.inference.mode.rule_engine
class LabelInferencePipeline:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, this needs to change for clarity

def __init__(self):
self._last_trip_done = None

@property
def last_trip_done(self):
return self._last_trip_done

# For a given user and time range, runs all the primary algorithms and ensemble, saves results
# to the database, and records progress
def run_prediction_pipeline(self, user_id, time_range):
self.ts = esta.TimeSeries.get_time_series(user_id)
self.toPredictTrips = esda.get_entries(
esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
for cleaned_trip in self.toPredictTrips:
# Create an inferred trip
cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

Comment on lines +60 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have basically copy-pasted the other pipeline.py, you need to understand how it works and adapt it to be a separate step.

# Run the algorithms and the ensemble, store results
results = self.compute_and_save_algorithms(inferred_trip)
ensemble = self.compute_and_save_ensemble(inferred_trip, results)

# Put final results into the inferred trip and store it
inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
self.ts.insert(inferred_trip)

if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
self._last_trip_done = cleaned_trip

# This is where the labels for a given trip are actually predicted.
# Though the only information passed in is the trip object, the trip object can provide the
# user_id and other potentially useful information.
def compute_and_save_algorithms(self, trip):
predictions = []
for algorithm_id, algorithm_fn in primary_algorithms.items():
prediction = algorithm_fn(trip)
lp = ecwl.Labelprediction()
lp.trip_id = trip.get_id()
lp.algorithm_id = algorithm_id
lp.prediction = prediction
lp.start_ts = trip["data"]["start_ts"]
lp.end_ts = trip["data"]["end_ts"]
self.ts.insert_data(self.user_id, "inference/labels", lp)
predictions.append(lp)
return predictions

# Combine all our predictions into a single ensemble prediction.
# As a placeholder, we just take the first prediction.
# TODO: implement a real combination algorithm.
def compute_and_save_ensemble(self, trip, predictions):
il = ecwl.Labelprediction()
il.trip_id = trip.get_id()
il.start_ts = trip["data"]["start_ts"]
il.end_ts = trip["data"]["end_ts"]
(il.algorithm_id, il.prediction) = ensemble(trip, predictions)
self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
return il
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging
from tokenize import group
from typing import Dict, List, Optional, Tuple

import sklearn.ensemble as ske

import emission.analysis.modelling.trip_model.trip_model as eamuu
import emission.analysis.modelling.trip_model.util as eamtu
import emission.analysis.modelling.trip_model.config as eamtc
import emission.core.wrapper.confirmedtrip as ecwc


class GradientBoostedDecisionTree(eamuu.TripModel):

is_incremental: bool = False # overwritten during __init__

def __init__(self, config=None):
"""
Instantiate a gradient boosted decision tree for all users.

This uses the sklearn implementation of a gradient boosted
decision tree to classify unlabeled replacement modes.

https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingClassifier.html

Replacement modes are considered to be the second-best choice for
a given trip (i.e., what mode would have been chosen if the actual
choice wasn't available). These labels are gathered from the user
along with the chosen mode and trip purpose after the trip takes place.

The model is currently trained on data from all users.
"""
if config is None:
config = eamtc.get_config_value_or_raise('model_parameters.gbdt')
logging.debug(f'GradientBoostedDecisionTree loaded model config from file')
else:
logging.debug(f'GradientBoostedDecisionTree using model config argument')
expected_keys = [
'incremental_evaluation',
'feature_list',
'dependent_var'
]
for k in expected_keys:
if config.get(k) is None:
msg = f"gbdt trip model config missing expected key {k}"
raise KeyError(msg)
self.is_incremental = config['incremental_evaluation']
# use the sklearn implementation of a GBDT
self.gbdt = ske.GradientBoostingClassifier(n_estimators=50)
self.feature_list = config['feature_list']
self.dependent_var = config['dependent_var']

def fit(self, trips: List[ecwc.Confirmedtrip]):
"""train the model by passing data, where each row in the data
corresponds to a label at the matching index of the label input

:param trips: 2D array of features to train from
"""
logging.debug(f'fit called with {len(trips)} trips')
unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips))
if len(unlabeled) > 0:
msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}'
raise Exception(msg)
X_train, y_train = self.extract_features(trips)
self.gbdt.fit(X_train, y_train)
logging.info(f"gradient boosted decision tree model fit to {len(X_train)} rows of trip data")
logging.info(f"training features were {X_train.columns}")

def predict(self, trip: ecwc.Confirmedtrip) -> List[str]:
logging.debug(f"running gradient boosted mode prediction")
X_test, y_pred = self.extract_features(trip, is_prediction=True)
y_pred = self.gbdt.predict(X_test)
if y_pred is None:
logging.debug(f"unable to predict mode for trip {trip}")
return []
else:
logging.debug(f"made predictions {y_pred}")
return y_pred

def to_dict(self) -> Dict:
return self.gbdt.get_params()

def from_dict(self, model: Dict):
self.gbdt.set_params(model)

def extract_features(self, trips: ecwc.Confirmedtrip, is_prediction=False) -> List[float]:
return eamtu.get_replacement_mode_features(self.feature_list, self.dependent_var, trips, is_prediction)
51 changes: 51 additions & 0 deletions emission/analysis/modelling/trip_model/model_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,57 @@ def from_str(cls, str):
msg = f"{str} is not a known ModelStorage, must be one of {names}"
raise KeyError(msg)

def load_model_all_users(model_type: eamum.ModelType, model_storage: ModelStorage) -> Optional[Dict]:
"""load a user label model from a model storage location

:param user_id: the user to request a model for
:param model_type: expected type of model stored
:param model_storage: storage format
:return: the model representation as a Python Dict or None
:raises: TypeError if loaded model has different type than expected type
KeyError if the ModelType is not known
"""
if model_storage == ModelStorage.DOCUMENT_DATABASE:

# retrieve stored model with timestamp that matches/exceeds the most
# recent PipelineState.TRIP_MODEL entry
ms = esma.ModelStorage.get_model_storage(0)
latest_model_entry = ms.get_current_model(key=esda.REPLACE_MODEL_STORE_KEY)

if latest_model_entry is None:
logging.debug(f'no {model_type.name} model found')
return None

write_ts = latest_model_entry['metadata']['write_ts']
logging.debug(f'retrieved latest trip model recorded at timestamp {write_ts}')
logging.debug(latest_model_entry)

# parse str to enum for ModelType
latest_model_type_str = latest_model_entry.get('data', {}).get('model_type')
if latest_model_type_str is None:
raise TypeError('stored model does not have a model type')
latest_model_type = eamum.ModelType.from_str(latest_model_type_str)

# validate and return
if latest_model_entry is None:
return None
elif latest_model_type != model_type:
msg = (
f"loading model has model type '{latest_model_type.name}' "
f"but was expected to have model type {model_type.name}"
)
raise TypeError(msg)
else:
return latest_model_entry['data']['model']

else:
storage_types_str = ",".join(ModelStorage.names())
msg = (
f"unknown model storage type {model_storage}, must be one of "
f"{{{storage_types_str}}}"
)
raise TypeError(msg)

def load_model(user_id, model_type: eamum.ModelType, model_storage: ModelStorage) -> Optional[Dict]:
"""load a user label model from a model storage location

Expand Down
5 changes: 4 additions & 1 deletion emission/analysis/modelling/trip_model/model_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import emission.analysis.modelling.trip_model.trip_model as eamuu
import emission.analysis.modelling.similarity.od_similarity as eamso
import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamug
import emission.analysis.modelling.trip_model.gradient_boosted_decision_tree as eamtg


SIMILARITY_THRESHOLD_METERS=500
Expand All @@ -11,6 +12,7 @@
class ModelType(Enum):
# ENUM_NAME_CAPS = 'SHORTHAND_NAME_CAPS'
GREEDY_SIMILARITY_BINNING = 'GREEDY'
GRADIENT_BOOSTED_DECISION_TREE = 'GBDT'

def build(self, config=None) -> eamuu.TripModel:
"""
Expand All @@ -25,7 +27,8 @@ def build(self, config=None) -> eamuu.TripModel:
"""
# Dict[ModelType, TripModel]
MODELS = {
ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config)
ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config),
ModelType.GRADIENT_BOOSTED_DECISION_TREE: eamtg.GradientBoostedDecisionTree(config)
}
model = MODELS.get(self)
if model is None:
Expand Down
Loading