chore: Bumping fastapi + starlette (feast-dev#3938)
bushwhackr authored Feb 10, 2024
1 parent d3a2a45 commit 4e450ad
Showing 42 changed files with 147 additions and 178 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -310,7 +310,7 @@ format-python:
cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests

lint-python:
- cd ${ROOT_DIR}/sdk/python; python -m mypy
+ cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ --follow-imports=skip feast
cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only
cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/
cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests
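The tightened mypy invocation scopes type checking to the `feast` package, excludes test modules, and stops mypy from following imports into third-party code. As a hedged aside (not part of this commit), the same check can be driven from Python via mypy's documented programmatic API:

```python
# Sketch only: run the same check as the updated lint-python target,
# through mypy's Python API. Assumes mypy is installed and the working
# directory is sdk/python.
from mypy import api

# api.run returns (stdout_report, stderr_messages, exit_status).
stdout, stderr, exit_status = api.run(
    ["--exclude=/tests/", "--follow-imports=skip", "feast"]
)
print(stdout)
print("mypy exit status:", exit_status)
```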
19 changes: 9 additions & 10 deletions sdk/python/feast/data_source.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import enum
import warnings
from abc import ABC, abstractmethod
@@ -485,12 +484,12 @@ def to_proto(self) -> DataSourceProto:
return data_source_proto

def validate(self, config: RepoConfig):
- pass
+ raise NotImplementedError

def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
- pass
+ raise NotImplementedError

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
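A recurring change in this file (and in several files below) replaces placeholder `pass` bodies with `raise NotImplementedError`. With `pass`, an unimplemented method silently returns `None` and the failure surfaces far from its cause; raising makes a missing override fail at the call site. A minimal sketch with hypothetical classes, not feast code:

```python
class Base:
    def validate(self, config):
        # Fails loudly if a subclass forgets to override this.
        raise NotImplementedError


class SilentBase:
    def validate(self, config):
        # Silently returns None; callers may propagate the bogus value.
        pass


class MySource(Base):
    pass  # validate() intentionally not implemented


try:
    MySource().validate(config={})
except NotImplementedError:
    print("missing override caught at the call site")

print(SilentBase().validate(config={}))  # prints None, no error at all
```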
@@ -534,12 +533,12 @@ def __init__(
self.schema = schema

def validate(self, config: RepoConfig):
- pass
+ raise NotImplementedError

def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
- pass
+ raise NotImplementedError

def __eq__(self, other):
if not isinstance(other, RequestSource):
@@ -610,12 +609,12 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
@typechecked
class KinesisSource(DataSource):
def validate(self, config: RepoConfig):
- pass
+ raise NotImplementedError

def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
- pass
+ raise NotImplementedError

@staticmethod
def from_proto(data_source: DataSourceProto):
@@ -639,7 +638,7 @@ def from_proto(data_source: DataSourceProto):

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
- pass
+ raise NotImplementedError

def get_table_query_string(self) -> str:
raise NotImplementedError
@@ -772,12 +771,12 @@ def __hash__(self):
return super().__hash__()

def validate(self, config: RepoConfig):
- pass
+ raise NotImplementedError

def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
- pass
+ raise NotImplementedError

@staticmethod
def from_proto(data_source: DataSourceProto):
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_service.py
@@ -56,7 +56,7 @@ def __init__(
*,
name: str,
features: List[Union[FeatureView, OnDemandFeatureView]],
- tags: Dict[str, str] = None,
+ tags: Optional[Dict[str, str]] = None,
description: str = "",
owner: str = "",
logging_config: Optional[LoggingConfig] = None,
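This `tags` change is one instance of a pattern repeated below (feature_view.py, importer.py, the offline-store test suites): under PEP 484, a `None` default does not implicitly make a parameter `Optional`, and recent mypy releases enforce this (`no_implicit_optional` defaults to true since mypy 0.990). A small sketch of the rule, independent of feast:

```python
from typing import Dict, Optional

# Rejected by recent mypy: the annotation says Dict[str, str], but the
# default is None, and PEP 484 forbids treating that as implicitly Optional.
# def make_tags(tags: Dict[str, str] = None) -> Dict[str, str]: ...


# Accepted: the Optional is spelled out explicitly.
def make_tags(tags: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    return dict(tags or {})


print(make_tags())                     # {}
print(make_tags({"team": "serving"}))  # {'team': 'serving'}
```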
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_view.py
@@ -101,7 +101,7 @@ def __init__(
name: str,
source: DataSource,
schema: Optional[List[Field]] = None,
- entities: List[Entity] = None,
+ entities: Optional[List[Entity]] = None,
ttl: Optional[timedelta] = timedelta(days=0),
online: bool = True,
description: str = "",
3 changes: 2 additions & 1 deletion sdk/python/feast/importer.py
@@ -1,4 +1,5 @@
import importlib
+ from typing import Optional

from feast.errors import (
FeastClassImportError,
@@ -7,7 +8,7 @@
)


- def import_class(module_name: str, class_name: str, class_type: str = None):
+ def import_class(module_name: str, class_name: str, class_type: Optional[str] = None):
"""
Dynamically loads and returns a class from a module.
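`import_class` loads a class by module and name at runtime. A self-contained sketch of the underlying importlib pattern — a toy stand-in, not feast's exact implementation or error types:

```python
import importlib
from typing import Optional


def load_class(module_name: str, class_name: str, class_type: Optional[str] = None):
    # importlib.import_module raises ModuleNotFoundError on a bad module name.
    module = importlib.import_module(module_name)
    # getattr raises AttributeError if the class is missing from the module.
    cls = getattr(module, class_name)
    if class_type is not None and not class_name.endswith(class_type):
        raise TypeError(f"{class_name} does not look like a {class_type}")
    return cls


# Usage: dynamically load a standard-library class.
OrderedDict = load_class("collections", "OrderedDict")
print(OrderedDict([("a", 1), ("b", 2)]))
```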
11 changes: 9 additions & 2 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -5,6 +5,7 @@
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, from_json
+ from pyspark.sql.streaming import StreamingQuery

from feast.data_format import AvroFormat, JsonFormat
from feast.data_source import KafkaSource, PushMode
@@ -63,7 +64,13 @@ def __init__(
self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities]
super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source)

- def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
+ # Type hinting for data_source type.
+ # data_source type has been checked to be an instance of KafkaSource.
+ self.data_source: KafkaSource = self.data_source  # type: ignore
+
+ def ingest_stream_feature_view(
+     self, to: PushMode = PushMode.ONLINE
+ ) -> StreamingQuery:
ingested_stream_df = self._ingest_stream_data()
transformed_df = self._construct_transformation_plan(ingested_stream_df)
online_store_query = self._write_stream_data(transformed_df, to)
@@ -122,7 +129,7 @@ def _ingest_stream_data(self) -> StreamTable:
def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
return self.sfv.udf.__call__(df) if self.sfv.udf else df

- def _write_stream_data(self, df: StreamTable, to: PushMode):
+ def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery:
# Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
def batch_write(row: DataFrame, batch_id: int):
rows: pd.DataFrame = row.toPandas()
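Two related fixes here: the ingest and write methods are annotated to return Spark's `StreamingQuery` handle instead of `None`, and `__init__` gains a re-annotation of `self.data_source`. The latter is a narrowing idiom: the base class declares the attribute as `DataSource`, and after the runtime `isinstance` check the subclass pins the tighter `KafkaSource` type so mypy accepts Kafka-specific attribute access. A dependency-free sketch with hypothetical classes:

```python
class DataSource:
    pass


class KafkaSource(DataSource):
    kafka_options = {"topic": "driver_stats"}


class Processor:
    def __init__(self, data_source: DataSource):
        self.data_source = data_source  # declared as the broad base type


class KafkaProcessor(Processor):
    def __init__(self, data_source: DataSource):
        if not isinstance(data_source, KafkaSource):
            raise ValueError("KafkaProcessor requires a KafkaSource")
        super().__init__(data_source)
        # Re-annotate so mypy treats self.data_source as KafkaSource from
        # here on; the ignore silences the attribute-redefinition complaint.
        self.data_source: KafkaSource = self.data_source  # type: ignore

    def topic(self) -> str:
        # OK for mypy: KafkaSource has kafka_options; plain DataSource does not.
        return self.data_source.kafka_options["topic"]


print(KafkaProcessor(KafkaSource()).topic())  # driver_stats
```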
11 changes: 6 additions & 5 deletions sdk/python/feast/infra/contrib/stream_processor.py
@@ -3,6 +3,7 @@
from typing import TYPE_CHECKING, Optional

from pyspark.sql import DataFrame
+ from typing_extensions import TypeAlias

from feast.data_source import DataSource, PushMode
from feast.importer import import_class
@@ -17,7 +18,7 @@
}

# TODO: support more types other than just Spark.
- StreamTable = DataFrame
+ StreamTable: TypeAlias = DataFrame


class ProcessorConfig(FeastConfigBaseModel):
@@ -54,28 +55,28 @@ def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
Ingests data from the stream source attached to the stream feature view; transforms the data
and then persists it to the online store and/or offline store, depending on the 'to' parameter.
"""
- pass
+ raise NotImplementedError

def _ingest_stream_data(self) -> StreamTable:
"""
Ingests data into a StreamTable.
"""
- pass
+ raise NotImplementedError

def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
Applies transformations on top of StreamTable object. Since stream engines use lazy
evaluation, the StreamTable will not be materialized until it is actually evaluated.
For example: df.collect() in spark or tbl.execute() in Flink.
"""
- pass
+ raise NotImplementedError

def _write_stream_data(self, table: StreamTable, to: PushMode) -> None:
"""
Launches a job to persist stream data to the online store and/or offline store, depending
on the 'to' parameter, and returns a handle for the job.
"""
- pass
+ raise NotImplementedError


def get_stream_processor_object(
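The `StreamTable: TypeAlias = DataFrame` annotation (PEP 613) declares the alias explicitly; a bare `StreamTable = DataFrame` is just a module-level assignment that type checkers must guess about, and the guess breaks down once an alias is reassigned or built conditionally. A sketch using a stand-in alias target so it runs without pyspark (assumes `typing_extensions` is installed, as the diff does):

```python
from typing import Dict, List
from typing_extensions import TypeAlias

# Explicit alias: checkers treat StreamTable as a type, never a variable.
StreamTable: TypeAlias = Dict[str, List[int]]


def head(table: StreamTable, n: int = 2) -> StreamTable:
    return {column: values[:n] for column, values in table.items()}


print(head({"driver_id": [1, 2, 3], "trips": [7, 9, 4]}))
```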
@@ -297,9 +297,9 @@ class SavedDatasetAthenaStorage(SavedDatasetStorage):
def __init__(
self,
table_ref: str,
- query: str = None,
- database: str = None,
- data_source: str = None,
+ query: Optional[str] = None,
+ database: Optional[str] = None,
+ data_source: Optional[str] = None,
):
self.athena_options = AthenaOptions(
table=table_ref, query=query, database=database, data_source=data_source
@@ -51,7 +51,7 @@ def create_data_source(
suffix: Optional[str] = None,
timestamp_field="ts",
created_timestamp_column="created_ts",
- field_mapping: Dict[str, str] = None,
+ field_mapping: Optional[Dict[str, str]] = None,
) -> DataSource:

table_name = destination_name
@@ -1,4 +1,4 @@
- from typing import Dict, List
+ from typing import Dict, List, Optional

import pandas as pd
import pytest
@@ -66,7 +66,7 @@ def create_data_source(
destination_name: str,
timestamp_field="ts",
created_timestamp_column="created_ts",
- field_mapping: Dict[str, str] = None,
+ field_mapping: Optional[Dict[str, str]] = None,
**kwargs,
) -> DataSource:
# Make sure the field mapping is correct and convert the datetime datasources.
@@ -99,10 +99,10 @@ def create_data_source(
)

def create_saved_dataset_destination(self) -> SavedDatasetStorage:
- pass
+ raise NotImplementedError

def get_prefixed_table_name(self, destination_name: str) -> str:
return f"{self.project_name}_{destination_name}"

def teardown(self):
- pass
+ raise NotImplementedError
@@ -85,7 +85,7 @@ def create_data_source(
suffix: Optional[str] = None,
timestamp_field="ts",
created_timestamp_column="created_ts",
- field_mapping: Dict[str, str] = None,
+ field_mapping: Optional[Dict[str, str]] = None,
) -> DataSource:
destination_name = self.get_prefixed_table_name(destination_name)

@@ -2,7 +2,7 @@
import shutil
import tempfile
import uuid
- from typing import Dict, List
+ from typing import Dict, List, Optional

import pandas as pd
from pyspark import SparkConf
@@ -70,7 +70,7 @@ def create_data_source(
destination_name: str,
timestamp_field="ts",
created_timestamp_column="created_ts",
- field_mapping: Dict[str, str] = None,
+ field_mapping: Optional[Dict[str, str]] = None,
**kwargs,
) -> DataSource:
if timestamp_field in df:
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/file_source.py
@@ -183,7 +183,7 @@ def create_filesystem_and_path(
return None, path

def get_table_query_string(self) -> str:
- pass
+ raise NotImplementedError


class FileOptions: