PyEmits is a Python package for easy manipulation of time-series data.
The ultimate goals:
- Keep it simple and stupid
- Make everything configurable
- Uniform API for machine learning and deep learning
Time-series data is very common in real life, for example in:
- Engineering
- FSI industry (Financial Services Industry)
- FMCG (Fast-Moving Consumer Goods)
A data scientist's work consists of:
- forecasting
- prediction/simulation
- data preparation
- cleansing
- anomaly detection
- descriptive data analysis/exploratory data analysis/data profile
- data processing and ETL pipeline scripts
Each new business unit ends up reinventing the following wheels again and again:
- data processing and ETL pipeline
- extraction
- transformation
- cleansing
- feature engineering
- remove outliers
- AI landing for prediction, forecasting
- write it back to database
- ml framework
- multiple model training
- multiple model prediction
- kfold validation
- anomaly detection
- forecasting
- develop deep learning model (regression)
- ensemble modelling
- exploratory data analysis
- descriptive data analysis
- data profile
- data set comparison
Data scientists need to write different code for each library to develop their models. Is there a package that integrates all ML libraries behind a single, simple API? That is why I created this project.
This project is under active development and free to use (Apache 2.0). Contributions that advance its features are welcome.
db connection and manipulation
| Features | Progress | Available at version | Notes |
| --- | --- | --- | --- |
| PyOD integration | 80% | 0.1.2 | model parameters config are not yet finished |
| XGBoost integration | 80% | 0.1.2 | model parameters config are not yet finished |
| LightGBM integration | 80% | 0.1.2 | model parameters config are not yet finished |
| Sklearn model integration | 80% | 0.1.2 | model parameters config are not yet finished |
| Keras integration | 100% | 0.1.2 | |
| Pytorch_lightning integration | 100% | 0.1.2 | |
| MXnet integration | 0% | tbc | |
| DB connection | 0% | tbc | |
| aggregation | 0% | 0.1.3 | |
| cleansing | 0% | 0.1.3 | |
| dimensional reduction | 0% | 0.1.3 | |
| Kalman filtering | 0% | 0.1.3 or later | |
| model evaluation and visualization | 0% | 0.1.3 or later | |
| data profile for exploration | 20% | 0.1.3 or later | finished data statistics only |
| forecast at scale | 100% | 0.1.2 | see preprocessing.scaling.py |
| Version | Features | Notes |
| --- | --- | --- |
| 0.1 | initialization of project | |
| 0.1.1 | RegTrainer/ParallelTrainer/KFoldCV | |
| 0.1.2 | PyOD/Keras/Pytorch_lightning/scaling/splitting | |
pip install pyemits
scikit-learn API style
inherit the design concept of pyecharts, "everything is configurable"
highly flexible configuration items, can easily integrate with existing model
easily integrate to SaaS product for product proof of concept
import numpy as np
from pyemits.core.ml.regression.trainer import RegTrainer, RegressionDataModel
X = np.random.randint(1, 100, size=(1000, 10))
y = np.random.randint(1, 100, size=(1000, 1))
raw_data_model = RegressionDataModel(X, y)
trainer = RegTrainer(['XGBoost'], [None], raw_data_model)
trainer.fit()
import numpy as np
from pyemits.core.ml.regression.trainer import RegTrainer, RegressionDataModel
from pyemits.core.ml.regression.nn import KerasWrapper
X = np.random.randint(1, 100, size=(1000, 10, 10))
y = np.random.randint(1, 100, size=(1000, 4))
keras_lstm_model = KerasWrapper.from_simple_lstm_model((10, 10), 4)
raw_data_model = RegressionDataModel(X, y)
trainer = RegTrainer([keras_lstm_model], [None], raw_data_model)
trainer.fit()
It also stays flexible for customized models:
import numpy as np
from pyemits.core.ml.regression.trainer import RegTrainer, RegressionDataModel
from pyemits.core.ml.regression.nn import KerasWrapper
X = np.random.randint(1, 100, size=(1000, 10, 10))
y = np.random.randint(1, 100, size=(1000, 4))
from keras.layers import Dense, Dropout, LSTM
from keras import Sequential
model = Sequential()
model.add(LSTM(128,
               activation='softmax',
               input_shape=(10, 10),
               ))
model.add(Dropout(0.1))
model.add(Dense(4))
model.compile(loss='mse', optimizer='adam', metrics=['mse'])
keras_lstm_model = KerasWrapper(model, nickname='LSTM')
raw_data_model = RegressionDataModel(X, y)
trainer = RegTrainer([keras_lstm_model], [None], raw_data_model)
trainer.fit()
Or attach the layers through the algorithm config:
import numpy as np
from pyemits.core.ml.regression.trainer import RegTrainer, RegressionDataModel
from pyemits.core.ml.regression.nn import KerasWrapper
from pyemits.common.config_model import KerasSequentialConfig
X = np.random.randint(1, 100, size=(1000, 10, 10))
y = np.random.randint(1, 100, size=(1000, 4))
from keras.layers import Dense, Dropout, LSTM
from keras import Sequential
keras_lstm_model = KerasWrapper(nickname='LSTM')
config = KerasSequentialConfig(layer=[LSTM(128,
                                           activation='softmax',
                                           input_shape=(10, 10),
                                           ),
                                      Dropout(0.1),
                                      Dense(4)],
                               compile=dict(loss='mse', optimizer='adam', metrics=['mse']))
raw_data_model = RegressionDataModel(X, y)
trainer = RegTrainer([keras_lstm_model],
                     [config],
                     raw_data_model,
                     {'fit_config': [dict(epochs=10, batch_size=32)]})
trainer.fit()
PyTorch and MXNet support is under development. Leave me a message if you want to contribute.
import numpy as np
from pyemits.core.ml.regression.trainer import RegressionDataModel, MultiOutputRegTrainer
from pyemits.core.preprocessing.splitting import SlidingWindowSplitter
X = np.random.randint(1, 100, size=(10000, 1))
y = np.random.randint(1, 100, size=(10000, 1))
# when using an auto-regressive trainer such as MultiOutputRegTrainer, set ravel=True
# set ravel=False when using a model such as an LSTM that supports multi-dimensional input
splitter = SlidingWindowSplitter(24, 24, ravel=True)
X, y = splitter.split(X, y)
raw_data_model = RegressionDataModel(X, y)
trainer = MultiOutputRegTrainer(['XGBoost'], [None], raw_data_model)
trainer.fit()
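To make the effect of `ravel` concrete, here is a NumPy-only sketch of a sliding-window split. It is not PyEmits' implementation: the helper name `sliding_window_split` is hypothetical, and it assumes the two positional arguments of `SlidingWindowSplitter(24, 24, ...)` are the input window size and the forecast horizon, and that `ravel` flattens both X and y.

```python
import numpy as np


def sliding_window_split(X, y, window_size, forecast_horizon, ravel=True):
    """Hypothetical helper: pair each window of past X with the next block of y."""
    X = np.asarray(X)
    y = np.asarray(y)
    n_samples = len(X) - window_size - forecast_horizon + 1
    if n_samples <= 0:
        raise ValueError("series too short for the requested window and horizon")
    X_windows, y_windows = [], []
    for start in range(n_samples):
        split = start + window_size
        X_windows.append(X[start:split])                      # past observations
        y_windows.append(y[split:split + forecast_horizon])   # values to forecast
    X_out = np.stack(X_windows)
    y_out = np.stack(y_windows)
    if ravel:
        # Flatten each window so tabular models receive 2-D input.
        X_out = X_out.reshape(len(X_out), -1)
        y_out = y_out.reshape(len(y_out), -1)
    return X_out, y_out


series = np.arange(100).reshape(-1, 1)
X_flat, y_flat = sliding_window_split(series, series, 24, 24, ravel=True)
X_seq, y_seq = sliding_window_split(series, series, 24, 24, ravel=False)
print(X_flat.shape, y_flat.shape)  # 2-D arrays for tabular models
print(X_seq.shape, y_seq.shape)    # 3-D arrays for sequence models
```

With `ravel=True` every sample is a flat vector, which is the shape a multi-output tabular regressor expects. With `ravel=False` the time axis is kept, which suits sequence models such as an LSTM.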
- provide fast training using parallel job
- use RegTrainer as base, but add Parallel running
import numpy as np
from pyemits.core.ml.regression.trainer import RegressionDataModel, ParallelRegTrainer
X = np.random.randint(1, 100, size=(10000, 1))
y = np.random.randint(1, 100, size=(10000, 1))
raw_data_model = RegressionDataModel(X, y)
trainer = ParallelRegTrainer(['XGBoost', 'LightGBM'], [None, None], raw_data_model)
trainer.fit()
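The idea of fitting several models as parallel jobs can be sketched with the standard library alone. This is only an illustration under stated assumptions: how `ParallelRegTrainer` schedules its jobs internally is not shown in this document, and the two toy fitters `fit_mean` and `fit_linear` are hypothetical stand-ins for real models.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def fit_mean(X, y):
    """Baseline stand-in model: always predict the training mean."""
    return {"name": "mean", "params": float(np.mean(y))}


def fit_linear(X, y):
    """Ordinary least squares with an intercept column."""
    design = np.column_stack([X, np.ones(len(X))])
    coef, *_ = np.linalg.lstsq(design, y, rcond=None)
    return {"name": "linear", "params": coef}


rng = np.random.default_rng(0)
X = rng.integers(1, 100, size=(1000, 1)).astype(float)
y = 3.0 * X[:, 0] + 2.0

fitters = [fit_mean, fit_linear]
with ThreadPoolExecutor(max_workers=len(fitters)) as pool:
    # Each fitter runs as its own job; map returns results in submission order.
    fitted = list(pool.map(lambda fit: fit(X, y), fitters))

print([model["name"] for model in fitted])
```

Because the jobs are independent, the wall-clock time approaches that of the slowest model rather than the sum of all models, provided the underlying libraries release the GIL or a process pool is used instead.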
Or you can use RegTrainer for multiple models, but they are not trained in parallel:
import numpy as np
from pyemits.core.ml.regression.trainer import RegressionDataModel, RegTrainer
X = np.random.randint(1, 100, size=(10000, 1))
y = np.random.randint(1, 100, size=(10000, 1))
raw_data_model = RegressionDataModel(X, y)
trainer = RegTrainer(['XGBoost', 'LightGBM'], [None, None], raw_data_model)
trainer.fit()
- KFoldConfig is a global config and applies to all models
import numpy as np
from pyemits.core.ml.regression.trainer import RegressionDataModel, KFoldCVTrainer
from pyemits.common.config_model import KFoldConfig
X = np.random.randint(1, 100, size=(10000, 1))
y = np.random.randint(1, 100, size=(10000, 1))
raw_data_model = RegressionDataModel(X, y)
trainer = KFoldCVTrainer(['XGBoost', 'LightGBM'], [None, None], raw_data_model,
                         {'kfold_config': KFoldConfig(n_splits=10)})
trainer.fit()
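For readers unfamiliar with what `n_splits=10` implies, the following NumPy sketch shows how a plain k-fold split partitions the sample indices. It is an assumption-laden illustration, not the code behind `KFoldCVTrainer`: the helper `kfold_indices` is hypothetical, and it uses contiguous, unshuffled folds.

```python
import numpy as np


def kfold_indices(n_samples, n_splits):
    """Yield (train_idx, valid_idx) pairs using contiguous, unshuffled folds."""
    folds = np.array_split(np.arange(n_samples), n_splits)
    for i, valid_idx in enumerate(folds):
        # Everything outside the current fold is used for training.
        train_idx = np.concatenate([f for j, f in enumerate(folds) if j != i])
        yield train_idx, valid_idx


splits = list(kfold_indices(n_samples=10000, n_splits=10))
first_train, first_valid = splits[0]
print(len(splits), len(first_train), len(first_valid))
```

Every sample lands in exactly one validation fold, so each model is evaluated on all the data without ever being scored on rows it was trained on.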
import numpy as np
from pyemits.core.ml.regression.trainer import RegressionDataModel, RegTrainer
from pyemits.core.ml.regression.predictor import RegPredictor
X = np.random.randint(1, 100, size=(10000, 1))
y = np.random.randint(1, 100, size=(10000, 1))
raw_data_model = RegressionDataModel(X, y)
trainer = RegTrainer(['XGBoost', 'LightGBM'], [None, None], raw_data_model)
trainer.fit()
predictor = RegPredictor(trainer.clf_models, 'RegTrainer')
predictor.predict(RegressionDataModel(X))
- see examples: forecast at scale.ipynb
from pyemits.common.data_model import RegressionDataModel
import numpy as np
X = np.random.randint(1, 100, size=(1000, 10, 10))
y = np.random.randint(1, 100, size=(1000, 1))
data_model = RegressionDataModel(X, y)
directly write an attribute to the data model
data_model._update_attributes('X_shape', (1000, 10, 10))
data_model.X_shape
>>> (1000, 10, 10)
write something to the meta data
data_model.add_meta_data('dimension', (1000, 10, 10))
data_model.meta_data
>>> {'dimension': (1000, 10, 10)}
- see: anomaly detection
- root cause analyzer (under development)
- Kalman filter (under development)
from pyemits.core.ml.anomaly_detection.predictor import AnomalyPredictor
from pyemits.core.ml.anomaly_detection.trainer import AnomalyTrainer, PyodWrapper
from pyemits.common.data_model import AnomalyDataModel
from pyemits.common.config_model import PyodIforestConfig
from pyod.models.iforest import IForest
from pyod.utils import generate_data
contamination = 0.1  # fraction of outliers
n_train = 1000  # number of training points
n_test = 200  # number of testing points
X_train, y_train, X_test, y_test = generate_data(
    n_train=n_train, n_test=n_test, contamination=contamination)
# highly flexible model config: accepts a str, or a PyodWrapper with or without an initialized model
# use either one of the following
trainer = AnomalyTrainer(['IForest', PyodWrapper(IForest()), PyodWrapper(IForest), 'IForest', 'IForest', 'IForest'],
                         None, AnomalyDataModel(X_train))
trainer = AnomalyTrainer([PyodWrapper(IForest(contamination=0.05)), PyodWrapper(IForest)],
                         [None, PyodIforestConfig(contamination=0.05)], AnomalyDataModel(X_train))
trainer.fit()
# option 1
predictor = AnomalyPredictor(trainer.clf_models)
# option 2
predictor = AnomalyPredictor(trainer.clf_models,
                             other_config={'standard_scaler': predictor.misc_container['standard_scaler']})
# option 3
predictor = AnomalyPredictor(trainer.clf_models,
                             other_config={'standard_scaler': predictor.misc_container['standard_scaler'],
                                           'combination_config': {'n_buckets': 5}}, combination_method='moa')
predictor.predict(AnomalyDataModel(X_test))
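The `combination_method='moa'` option with `n_buckets` refers to the Maximum of Average scheme for combining detector scores. The NumPy sketch below illustrates the idea together with its sibling, Average of Maximum. It is not the PyEmits or combo implementation: the function names are hypothetical and it assumes static, contiguous buckets. In practice the scores would normally be standardized per detector first, which is presumably the role of the `standard_scaler` entry above.

```python
import numpy as np


def _buckets(n_detectors, n_buckets):
    """Split detector column indices into contiguous buckets (an assumption)."""
    if not 1 <= n_buckets <= n_detectors:
        raise ValueError("n_buckets must be between 1 and the number of detectors")
    return np.array_split(np.arange(n_detectors), n_buckets)


def moa(scores, n_buckets):
    """Maximum of Average: average inside each bucket, then take the maximum."""
    groups = _buckets(scores.shape[1], n_buckets)
    bucket_means = np.column_stack([scores[:, g].mean(axis=1) for g in groups])
    return bucket_means.max(axis=1)


def aom(scores, n_buckets):
    """Average of Maximum: maximum inside each bucket, then take the average."""
    groups = _buckets(scores.shape[1], n_buckets)
    bucket_maxes = np.column_stack([scores[:, g].max(axis=1) for g in groups])
    return bucket_maxes.mean(axis=1)


# Rows are samples, columns are detectors (3 samples, 4 detectors).
scores = np.array([[1.0, 2.0, 3.0, 4.0],
                   [4.0, 3.0, 2.0, 1.0],
                   [0.0, 0.0, 8.0, 8.0]])
moa_scores = moa(scores, n_buckets=2)
aom_scores = aom(scores, n_buckets=2)
print(moa_scores)  # [3.5 3.5 8. ]
print(aom_scores)  # [3. 3. 4.]
```

Averaging inside a bucket dampens noisy detectors, while the maximum keeps a strong signal from any single bucket, so MOA sits between plain averaging and plain maximization.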
It offers the following features:
- easy configuration
- register steps, tasks in data processing pipeline
- log the data result of each task and each step
- record the flow of the pipeline, from steps to tasks (from macro to micro)
You can embed other functionality in a task, but the parameter "data" must always be passed in,
e.g. add an email notification, add logging, upload to a database, etc.
from pyemits.core.preprocessing.pipeline import (DataNode, NumpyDataNode, PandasDataFrameDataNode,
                                                 PandasSeriesDataNode, Pipeline, Step, Task)
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.random(size=(20, 20)))
dn = PandasDataFrameDataNode.from_pandas(df)
def sum_each_col(data, a=1, b=2):
    return data.sum()
def sum_series(data):
    return np.array([data.sum()])
task registration and arguments registration
task_a = Task(sum_each_col)
task_a.register_args(a=10, b=10)
task_b = Task(sum_series)
pipeline register step and execute
pipeline = Pipeline()
step_a = Step('step_a', [task_a], '')
step_b = Step('step_b', [task_b], '')
pipeline.register_step(step_a)
pipeline.register_step(step_b)
pipeline.execute(dn)
See the steps and their tasks from start to end:
pipeline.get_step_task_mapping()
>>> {0: ('step_a', ['sum_each_col']), 1: ('step_b', ['sum_series'])}
See the snapshot result of each step and each task, which is handy for debugging:
pipeline.get_pipeline_snapshot_res(step_id=1, tasks_id=0)
>>> array([197.70351007])
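The register-then-execute flow with per-task snapshots can be sketched in plain Python. This is a simplified illustration of the pattern, not the PyEmits pipeline module: `MiniTask` and `MiniPipeline` are hypothetical names, and the only rule carried over from the description is that each task function takes the data as its first parameter.

```python
import numpy as np


class MiniTask:
    """Wraps a function whose first parameter is the data being processed."""

    def __init__(self, func):
        self.func = func
        self.kwargs = {}

    def register_args(self, **kwargs):
        self.kwargs = kwargs

    def execute(self, data):
        return self.func(data, **self.kwargs)


class MiniPipeline:
    """Runs steps in order and keeps a snapshot of every task's output."""

    def __init__(self):
        self.steps = []      # list of (step_name, [tasks])
        self.snapshots = {}  # (step_id, task_id) -> result

    def register_step(self, name, tasks):
        self.steps.append((name, tasks))

    def execute(self, data):
        for step_id, (_, tasks) in enumerate(self.steps):
            for task_id, task in enumerate(tasks):
                data = task.execute(data)  # output feeds the next task
                self.snapshots[(step_id, task_id)] = data
        return data

    def step_task_mapping(self):
        return {i: (name, [t.func.__name__ for t in tasks])
                for i, (name, tasks) in enumerate(self.steps)}


def sum_each_col(data, scale=1.0):
    return data.sum(axis=0) * scale


def sum_all(data):
    return np.array([data.sum()])


task_a = MiniTask(sum_each_col)
task_a.register_args(scale=2.0)
mini = MiniPipeline()
mini.register_step("step_a", [task_a])
mini.register_step("step_b", [MiniTask(sum_all)])
result = mini.execute(np.ones((20, 20)))
mapping = mini.step_task_mapping()
print(mapping)
print(mini.snapshots[(1, 0)])
```

Storing intermediate results under a `(step_id, task_id)` key is what makes it possible to inspect any point of the flow after a run without re-executing earlier tasks.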
- see module: evaluation
- backtesting
- model evaluation
- blending
- stacking
- voting
- by combo package
- moa
- aom
- average
- median
- maximization
- refer to the following package
- continuous evaluation
- aggregation
- dimensional reduction
- data profile (intensive data overview)
....
The following libraries gave me ideas and insight:
- greykite
- changepoint detection
- model summary
- seasonality
- pytorch-forecasting
- darts
- pyaf
- orbit
- kats/prophet by Facebook
- sktime
- gluon ts
- tslearn
- pyts
- luminaire
- tods
- autots
- pyodds
- scikit-hts