-
Notifications
You must be signed in to change notification settings - Fork 1
/
pipeline.py
68 lines (49 loc) · 2.83 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import kfp
from kfp import dsl
from kfp.components import InputPath
# Pipeline graph: download imagery -> temporal statistics -> preprocessing ->
# XGBoost and LSTM classification on the same preprocessed data -> comparison
# of the two classification reports.
@dsl.pipeline(
    name='advanced-crop-classification-pipeline',
    description='Classify crops extracted from RPG.',
)
def crop_classification_pipeline(
    json_img: str,
    shp: str,
    cross_validation: bool = False,
    iterations: int = 2,
    cv: int = 2,
):
    # Body of the "compare" step. It receives the local paths of the two
    # report CSVs and returns "XGB" or "LSTM"; ties go to XGBoost.
    # pandas is imported inside the function because the function is turned
    # into a standalone component below.
    def compare_models(xgboost_csv: InputPath(str), lstm_csv: InputPath(str)) -> str:
        import pandas as pd

        # NOTE(review): the score is the 'precision' column at row label 2;
        # which report row that is depends on the CSV written by the
        # classifier components -- confirm against their output.
        xgb_score = pd.read_csv(xgboost_csv)['precision'][2]
        lstm_score = pd.read_csv(lstm_csv)['precision'][2]

        if xgb_score >= lstm_score:
            print("XGBoost model will be used for serving")
            return "XGB"

        print("LSTM will be used for serving")
        return "LSTM"

    # Component factories loaded from their YAML manifests.
    load_component = kfp.components.load_component_from_file
    download_op = load_component('process_img/process_img.yaml')
    temporal_stats_op = load_component('temporal_stats/temporal_stats.yaml')
    preprocess_op = load_component('preprocess_data/preprocess_data.yaml')
    xgboost_op = load_component('extreme_gradient_boost/extreme_gradient_boost.yaml')
    lstm_op = load_component('lstm/lstm.yaml')

    # Component built directly from the Python function above.
    # (Pass output_component_file='compare_models.yaml' to also export it.)
    compare_op = kfp.components.create_component_from_func(
        func=compare_models,
        packages_to_install=['pandas==0.24'],
        base_image='python:3.7',
    )

    # First task: takes the two pipeline inputs.
    imagery = download_op(json_img, shp)

    # Temporal statistics computed from the first task's output.
    stats = temporal_stats_op(imagery.output)

    # Data preprocessing.
    prepared = preprocess_op(stats.output)

    # Both classifiers receive the same preprocessed data and settings.
    classifier_args = (prepared.output, cross_validation, iterations, cv)
    xgboost_run = xgboost_op(*classifier_args)
    lstm_run = lstm_op(*classifier_args)

    # Compare the 'Report' outputs of the two classifiers.
    comparison = compare_op(
        xgboost_run.outputs['Report'],
        lstm_run.outputs['Report'],
    )
    # A zero-length staleness ("P0D") is set on the comparison task only; the
    # same attribute can be set on any other task above if it should not
    # reuse cached results either.
    comparison.execution_options.caching_strategy.max_cache_staleness = "P0D"
if __name__ == '__main__':
    # When run as a script, compile the pipeline function into a workflow
    # package written to the relative path below.
    package_path = 'advanced-crop-classification-pipeline.yaml'
    pipeline_compiler = kfp.compiler.Compiler()
    pipeline_compiler.compile(crop_classification_pipeline, package_path)