-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathdefinitions.py
104 lines (92 loc) · 3.37 KB
/
definitions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from pathlib import Path
from datetime import timedelta
import pandas as pd
from feast import (
Entity,
FeatureService,
FeatureView,
Field,
FileSource,
PushSource,
RequestSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64
# Location of the parquet file that backs the driver statistics source.
DATA_PATH: Path = Path("dataset/driver_stats.parquet")
# Names exported by `from <module> import *`.
__all__ = [
    "driver",
    "driver_stats_source",
    "driver_stats_feature_view",
    "transformed_stats",
    "driver_stats_push_source",
    "driver_activity",
    "driver_stats_fresh_feature_view",
]
# The driver entity. An entity plays the role of a primary key when fetching
# features; this one uses `driver_id` as its join key.
driver = Entity(
    join_keys=["driver_id"],
    name="driver",
)
# Batch source backed by a parquet file, which is convenient during
# development. In production a data warehouse of your choice (for example
# BigQuery) can be used instead; see the Feast documentation for details.
driver_stats_source = FileSource(
    name="driver_hourly_stats_source",
    path=str(DATA_PATH),
    created_timestamp_column="created",
    timestamp_field="event_timestamp",
)
# The dataset holds a `driver_id`, a timestamp and several feature columns.
# This feature view serves that data to the model online.
driver_stats_feature_view = FeatureView(
    # Feature view names must be unique within a single project.
    name="driver_hourly_stats",
    source=driver_stats_source,
    entities=[driver],
    # The fields listed here define the db schema.
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(
            name="avg_daily_trips",
            dtype=Int64,
            description="Average daily trips",
        ),
    ],
    ttl=timedelta(days=1),
    online=True,
    tags={"team": "driver_performance"},
)
# Push source: a way to push data into Feast (made available offline, online
# or both). It reuses the parquet-backed source as its batch source.
driver_stats_push_source = PushSource(
    batch_source=driver_stats_source,
    name="driver_stats_push_source",
)
# On demand feature view: generates new features from existing feature views
# (on demand views may also draw on RequestSource features; this one reads
# only from `driver_stats_feature_view`).
@on_demand_feature_view(
    schema=[
        Field(name="conv_plus_trips", dtype=Float64),
        Field(name="acc_plus_trips", dtype=Float64),
    ],
    sources=[driver_stats_feature_view],
)
def transformed_stats(inputs: pd.DataFrame) -> pd.DataFrame:
    """Combine each driver rate with the average daily trip count.

    Args:
        inputs: Frame providing the ``conv_rate``, ``acc_rate`` and
            ``avg_daily_trips`` columns.

    Returns:
        A frame with two columns, ``conv_plus_trips`` and
        ``acc_plus_trips``, each being the corresponding rate multiplied by
        ``avg_daily_trips``.

    NOTE(review): the output names say "plus" but the values are products,
    not sums -- confirm which is intended before renaming or changing the
    arithmetic.
    """
    conv_scaled = inputs["conv_rate"] * inputs["avg_daily_trips"]
    acc_scaled = inputs["acc_rate"] * inputs["avg_daily_trips"]
    return pd.DataFrame(
        {
            "conv_plus_trips": conv_scaled,
            "acc_plus_trips": acc_scaled,
        }
    )
# Feature service bundling the hourly driver stats view together with the
# on demand `transformed_stats` view.
driver_activity = FeatureService(
    name="driver_activity",
    features=[
        driver_stats_feature_view,
        transformed_stats,
    ],
)
# A slightly modified variant of the hourly stats feature view whose source is
# the push source rather than the file source, so that fresh features can be
# pushed directly to the online store.
driver_stats_fresh_feature_view = FeatureView(
    name="driver_hourly_stats_fresh",
    source=driver_stats_push_source,  # The only change from the view above
    entities=[driver],
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    ttl=timedelta(days=1),
    online=True,
    tags={"team": "driver_performance"},
)