-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkflow.py
45 lines (36 loc) · 1.29 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from kfp import dsl
from mlrun.platforms import auto_mount
import mlrun
# Registry of project functions keyed by name; kfpipeline() reads funcs["test"].
# NOTE(review): presumably populated by MLRun before the pipeline is compiled — confirm.
funcs: dict = {}
# init_functions configures function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    """Apply the ``auto_mount()`` modifier to every function in ``functions``.

    Args:
        functions: Mapping whose values expose an ``apply()`` method; each
            value is modified in place.
        project: Accepted for interface compatibility; not used here.
        secrets: Accepted for interface compatibility; not used here.
    """
    for fn in functions.values():
        # A fresh modifier is created for each function, as in the original.
        mount_modifier = auto_mount()
        fn.apply(mount_modifier)
@dsl.pipeline(name="Kando MLRun demo", description="Kando MLRun demo")
def kfpipeline():
    """Define the demo pipeline: parallel fetch steps followed by one analysis step.

    A ``fetch_data`` step is declared inside a ``dsl.ParallelFor`` group over
    the list of source URLs, and a single ``analyze_data`` step is ordered
    after that whole group.
    """
    dataset_sources = [
        {"url": "https://s3.wasabisys.com/iguazio/data/Taxi/taxi_zones.csv"},
        {"url": "https://s3.wasabisys.com/iguazio/data/Taxi/yellow_tripdata_2019-01_subset.csv"},
    ]

    # Fan out: the step declared inside the group is instantiated per list item.
    fan_out = dsl.ParallelFor(dataset_sources)
    with fan_out as source:
        fetch = funcs["test"].as_step(
            name="fetch_data",
            handler="fetch_data",
            inputs={"dataset_url": source.url},
            outputs=["test-dataset"],
        )

    # Join and transform the data sets.
    # NOTE(review): the input is the literal 'test-ds', not the 'test-dataset'
    # output declared by fetch_data above — confirm this is intended.
    analyze_step = funcs["test"].as_step(
        name="analyze_data",
        handler="analyze_data",
        inputs={"dataset": "test-ds"},
        outputs=["result"],
    )
    # Declared outside the loop group and explicitly ordered after it.
    transform = analyze_step.after(fan_out)
# # Disabled alternative: join and transform the data sets using the fetch step's output
# transform = funcs["test"].as_step(
# name="analyze_data",
# handler='analyze_data',
# inputs={"dataset": fetch.outputs['test-dataset']},
# outputs=['result'])