From 373e624abb8779b8a60d30aa08d25414d987bb1b Mon Sep 17 00:00:00 2001 From: Tornike Gurgenidze Date: Wed, 7 Feb 2024 03:26:32 +0400 Subject: [PATCH 1/8] feat: Add gRPC Registry Server (#3924) --- protos/feast/registry/RegistryServer.proto | 230 ++++++++++++++++++ sdk/python/feast/cli.py | 21 +- sdk/python/feast/constants.py | 3 + sdk/python/feast/feature_store.py | 7 + .../feast/infra/registry/base_registry.py | 4 +- sdk/python/feast/infra/registry/registry.py | 8 +- sdk/python/feast/registry_server.py | 202 +++++++++++++++ sdk/python/tests/unit/test_registry_server.py | 60 +++++ setup.py | 2 +- 9 files changed, 532 insertions(+), 5 deletions(-) create mode 100644 protos/feast/registry/RegistryServer.proto create mode 100644 sdk/python/feast/registry_server.py create mode 100644 sdk/python/tests/unit/test_registry_server.py diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto new file mode 100644 index 0000000000..3e7773e89a --- /dev/null +++ b/protos/feast/registry/RegistryServer.proto @@ -0,0 +1,230 @@ +syntax = "proto3"; + +package feast.registry; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/empty.proto"; +import "feast/core/Registry.proto"; +import "feast/core/Entity.proto"; +import "feast/core/DataSource.proto"; +import "feast/core/FeatureView.proto"; +import "feast/core/RequestFeatureView.proto"; +import "feast/core/StreamFeatureView.proto"; +import "feast/core/OnDemandFeatureView.proto"; +import "feast/core/FeatureService.proto"; +import "feast/core/SavedDataset.proto"; +import "feast/core/ValidationProfile.proto"; +import "feast/core/InfraObject.proto"; + +service RegistryServer{ + // Entity RPCs + rpc GetEntity (GetEntityRequest) returns (feast.core.Entity) {} + rpc ListEntities (ListEntitiesRequest) returns (ListEntitiesResponse) {} + + // DataSource RPCs + rpc GetDataSource (GetDataSourceRequest) returns (feast.core.DataSource) {} + rpc ListDataSources (ListDataSourcesRequest) returns (ListDataSourcesResponse) {} + + // FeatureView RPCs + rpc GetFeatureView (GetFeatureViewRequest) returns (feast.core.FeatureView) {} + rpc ListFeatureViews (ListFeatureViewsRequest) returns (ListFeatureViewsResponse) {} + + // RequestFeatureView RPCs + rpc GetRequestFeatureView (GetRequestFeatureViewRequest) returns (feast.core.RequestFeatureView) {} + rpc ListRequestFeatureViews (ListRequestFeatureViewsRequest) returns (ListRequestFeatureViewsResponse) {} + + // StreamFeatureView RPCs + rpc GetStreamFeatureView (GetStreamFeatureViewRequest) returns (feast.core.StreamFeatureView) {} + rpc ListStreamFeatureViews (ListStreamFeatureViewsRequest) returns (ListStreamFeatureViewsResponse) {} + + // OnDemandFeatureView RPCs + rpc GetOnDemandFeatureView (GetOnDemandFeatureViewRequest) returns (feast.core.OnDemandFeatureView) {} + rpc ListOnDemandFeatureViews (ListOnDemandFeatureViewsRequest) returns (ListOnDemandFeatureViewsResponse) {} + + // FeatureService RPCs + rpc GetFeatureService (GetFeatureServiceRequest) returns (feast.core.FeatureService) {} + rpc ListFeatureServices (ListFeatureServicesRequest) returns (ListFeatureServicesResponse) {} + + // SavedDataset RPCs + rpc GetSavedDataset (GetSavedDatasetRequest) returns (feast.core.SavedDataset) {} + rpc ListSavedDatasets (ListSavedDatasetsRequest) returns (ListSavedDatasetsResponse) {} + + // ValidationReference RPCs + rpc GetValidationReference (GetValidationReferenceRequest) returns (feast.core.ValidationReference) {} + rpc ListValidationReferences 
(ListValidationReferencesRequest) returns (ListValidationReferencesResponse) {} + + rpc ListProjectMetadata (ListProjectMetadataRequest) returns (ListProjectMetadataResponse) {} + rpc GetInfra (GetInfraRequest) returns (feast.core.Infra) {} + rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {} + rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {} + +} + +message RefreshRequest { + string project = 1; +} + +message GetInfraRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListProjectMetadataRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListProjectMetadataResponse { + repeated feast.core.ProjectMetadata project_metadata = 1; +} + +message GetEntityRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListEntitiesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListEntitiesResponse { + repeated feast.core.Entity entities = 1; +} + +// DataSources + +message GetDataSourceRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListDataSourcesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListDataSourcesResponse { + repeated feast.core.DataSource data_sources = 1; +} + +// FeatureViews + +message GetFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListFeatureViewsResponse { + repeated feast.core.FeatureView feature_views = 1; +} + +// RequestFeatureView + +message GetRequestFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListRequestFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListRequestFeatureViewsResponse { + repeated feast.core.RequestFeatureView request_feature_views = 1; +} + +// StreamFeatureView + +message GetStreamFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListStreamFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListStreamFeatureViewsResponse { + repeated feast.core.StreamFeatureView stream_feature_views = 1; +} + +// OnDemandFeatureView + +message GetOnDemandFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListOnDemandFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListOnDemandFeatureViewsResponse { + repeated feast.core.OnDemandFeatureView on_demand_feature_views = 1; +} + +// FeatureServices + +message GetFeatureServiceRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListFeatureServicesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListFeatureServicesResponse { + repeated feast.core.FeatureService feature_services = 1; +} + +// SavedDataset + +message GetSavedDatasetRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListSavedDatasetsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListSavedDatasetsResponse { + repeated feast.core.SavedDataset saved_datasets = 1; +} + +// ValidationReference + +message GetValidationReferenceRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListValidationReferencesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListValidationReferencesResponse { + repeated feast.core.ValidationReference 
validation_references = 1; +} diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 2eb2c27bcb..985c44b821 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -25,7 +25,10 @@ from pygments import formatters, highlight, lexers from feast import utils -from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT +from feast.constants import ( + DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, + DEFAULT_REGISTRY_SERVER_PORT, +) from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError from feast.feature_view import FeatureView from feast.infra.contrib.grpc_server import get_grpc_server @@ -753,6 +756,22 @@ def serve_transformations_command(ctx: click.Context, port: int): store.serve_transformations(port) +@cli.command("serve_registry") +@click.option( + "--port", + "-p", + type=click.INT, + default=DEFAULT_REGISTRY_SERVER_PORT, + help="Specify a port for the server", +) +@click.pass_context +def serve_registry_command(ctx: click.Context, port: int): + """Start a registry server locally on a given port.""" + store = create_feature_store(ctx) + + store.serve_registry(port) + + @cli.command("validate") @click.option( "--feature-service", diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 574d79f416..c022ecba55 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -44,5 +44,8 @@ # Default FTS port DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT = 6569 +# Default registry server port +DEFAULT_REGISTRY_SERVER_PORT = 6570 + # Environment variable for feature server docker image tag DOCKER_IMAGE_TAG_ENV_NAME: str = "FEAST_SERVER_DOCKER_IMAGE_TAG" diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d3f98f8032..4a53672b2e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2278,6 +2278,13 @@ def serve_ui( root_path=root_path, ) + @log_exceptions_and_usage + def serve_registry(self, port: int) -> None: + """Start registry server locally on a given port.""" + from feast import registry_server + + registry_server.start_server(self, port) + @log_exceptions_and_usage def serve_transformations(self, port: int) -> None: """Start the feature transformation server locally on a given port.""" diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 14b098bb12..8928a5800d 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -329,7 +329,9 @@ def list_feature_views( # request feature view operations @abstractmethod - def get_request_feature_view(self, name: str, project: str) -> RequestFeatureView: + def get_request_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> RequestFeatureView: """ Retrieves a request feature view. 
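For context on the server being added in this patch: once it is running (`feast serve_registry`, default port 6570 per the new constant), any process can read registry metadata over gRPC using the stubs generated from `RegistryServer.proto`. A minimal client sketch — the project and entity names below are illustrative placeholders, not part of this patch:

```python
import grpc
from google.protobuf.empty_pb2 import Empty

from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc

# Connect to a server started with `feast serve_registry`.
channel = grpc.insecure_channel("localhost:6570")
stub = RegistryServer_pb2_grpc.RegistryServerStub(channel)

# Look up a single entity; allow_cache mirrors the flag on the Registry API.
entity_proto = stub.GetEntity(
    RegistryServer_pb2.GetEntityRequest(
        name="driver", project="my_project", allow_cache=False
    )
)
print(entity_proto.spec.name)

# Or dump the entire registry as a feast.core.Registry proto.
registry_proto = stub.Proto(Empty())
```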
diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 1a72cbb4a5..fc7be75e0d 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -528,8 +528,12 @@ def list_feature_views( ) return proto_registry_utils.list_feature_views(registry_proto, project) - def get_request_feature_view(self, name: str, project: str): - registry_proto = self._get_registry_proto(project=project, allow_cache=False) + def get_request_feature_view( + self, name: str, project: str, allow_cache: bool = False + ): + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) return proto_registry_utils.get_request_feature_view( registry_proto, name, project ) diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py new file mode 100644 index 0000000000..221715480e --- /dev/null +++ b/sdk/python/feast/registry_server.py @@ -0,0 +1,202 @@ +from concurrent import futures + +import grpc +from google.protobuf.empty_pb2 import Empty + +from feast import FeatureStore +from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc + + +class RegistryServer(RegistryServer_pb2_grpc.RegistryServerServicer): + def __init__(self, store: FeatureStore) -> None: + super().__init__() + self.proxied_registry = store.registry + + def GetEntity(self, request: RegistryServer_pb2.GetEntityRequest, context): + return self.proxied_registry.get_entity( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListEntities(self, request, context): + return RegistryServer_pb2.ListEntitiesResponse( + entities=[ + entity.to_proto() + for entity in self.proxied_registry.list_entities( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetDataSource(self, request: RegistryServer_pb2.GetDataSourceRequest, context): + return self.proxied_registry.get_data_source( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListDataSources(self, request, context): + return RegistryServer_pb2.ListDataSourcesResponse( + data_sources=[ + data_source.to_proto() + for data_source in self.proxied_registry.list_data_sources( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetFeatureView( + self, request: RegistryServer_pb2.GetFeatureViewRequest, context + ): + return self.proxied_registry.get_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListFeatureViews(self, request, context): + return RegistryServer_pb2.ListFeatureViewsResponse( + feature_views=[ + feature_view.to_proto() + for feature_view in self.proxied_registry.list_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetRequestFeatureView( + self, request: RegistryServer_pb2.GetRequestFeatureViewRequest, context + ): + return self.proxied_registry.get_request_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListRequestFeatureViews(self, request, context): + return RegistryServer_pb2.ListRequestFeatureViewsResponse( + request_feature_views=[ + request_feature_view.to_proto() + for request_feature_view in self.proxied_registry.list_request_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetStreamFeatureView( + self, request: 
RegistryServer_pb2.GetStreamFeatureViewRequest, context + ): + return self.proxied_registry.get_stream_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListStreamFeatureViews(self, request, context): + return RegistryServer_pb2.ListStreamFeatureViewsResponse( + stream_feature_views=[ + stream_feature_view.to_proto() + for stream_feature_view in self.proxied_registry.list_stream_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetOnDemandFeatureView( + self, request: RegistryServer_pb2.GetOnDemandFeatureViewRequest, context + ): + return self.proxied_registry.get_on_demand_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListOnDemandFeatureViews(self, request, context): + return RegistryServer_pb2.ListOnDemandFeatureViewsResponse( + on_demand_feature_views=[ + on_demand_feature_view.to_proto() + for on_demand_feature_view in self.proxied_registry.list_on_demand_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetFeatureService( + self, request: RegistryServer_pb2.GetFeatureServiceRequest, context + ): + return self.proxied_registry.get_feature_service( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListFeatureServices( + self, request: RegistryServer_pb2.ListFeatureServicesRequest, context + ): + return RegistryServer_pb2.ListFeatureServicesResponse( + feature_services=[ + feature_service.to_proto() + for feature_service in self.proxied_registry.list_feature_services( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetSavedDataset( + self, request: RegistryServer_pb2.GetSavedDatasetRequest, context + ): + return self.proxied_registry.get_saved_dataset( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListSavedDatasets( + self, request: RegistryServer_pb2.ListSavedDatasetsRequest, context + ): + return RegistryServer_pb2.ListSavedDatasetsResponse( + saved_datasets=[ + saved_dataset.to_proto() + for saved_dataset in self.proxied_registry.list_saved_datasets( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetValidationReference( + self, request: RegistryServer_pb2.GetValidationReferenceRequest, context + ): + return self.proxied_registry.get_validation_reference( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListValidationReferences( + self, request: RegistryServer_pb2.ListValidationReferencesRequest, context + ): + return RegistryServer_pb2.ListValidationReferencesResponse( + validation_references=[ + validation_reference.to_proto() + for validation_reference in self.proxied_registry.list_validation_references( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def ListProjectMetadata( + self, request: RegistryServer_pb2.ListProjectMetadataRequest, context + ): + return RegistryServer_pb2.ListProjectMetadataResponse( + project_metadata=[ + project_metadata.to_proto() + for project_metadata in self.proxied_registry.list_project_metadata( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetInfra(self, request: RegistryServer_pb2.GetInfraRequest, context): + return self.proxied_registry.get_infra( + project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def Refresh(self, request, 
context): + self.proxied_registry.refresh(request.project) + return Empty() + + def Proto(self, request, context): + return self.proxied_registry.proto() + + +def start_server(store: FeatureStore, port: int): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + RegistryServer_pb2_grpc.add_RegistryServerServicer_to_server( + RegistryServer(store), server + ) + server.add_insecure_port(f"[::]:{port}") + server.start() + server.wait_for_termination() diff --git a/sdk/python/tests/unit/test_registry_server.py b/sdk/python/tests/unit/test_registry_server.py new file mode 100644 index 0000000000..734bbfe19b --- /dev/null +++ b/sdk/python/tests/unit/test_registry_server.py @@ -0,0 +1,60 @@ +import assertpy +import grpc_testing +import pytest +from google.protobuf.empty_pb2 import Empty + +from feast import Entity, FeatureStore +from feast.protos.feast.registry import RegistryServer_pb2 +from feast.registry_server import RegistryServer + + +def call_registry_server(server, method: str, request=None): + service = RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"] + rpc = server.invoke_unary_unary( + service.methods_by_name[method], (), request if request else Empty(), None + ) + + return rpc.termination() + + +@pytest.fixture +def registry_server(environment): + store: FeatureStore = environment.feature_store + + servicer = RegistryServer(store=store) + + return grpc_testing.server_from_dictionary( + {RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"]: servicer}, + grpc_testing.strict_real_time(), + ) + + +def test_registry_server_get_entity(environment, registry_server): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.get_entity(entity.name) + + get_entity_request = RegistryServer_pb2.GetEntityRequest( + name=entity.name, project=store.project, allow_cache=False + ) + response, trailing_metadata, code, details = call_registry_server( + registry_server, "GetEntity", get_entity_request + ) + response_entity = Entity.from_proto(response) + + assertpy.assert_that(response_entity).is_equal_to(expected) + + +def test_registry_server_proto(environment, registry_server): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.registry.proto() + response, trailing_metadata, code, details = call_registry_server( + registry_server, "Proto" + ) + + assertpy.assert_that(response).is_equal_to(expected) diff --git a/setup.py b/setup.py index 4905a7697d..29b8dc5a68 100644 --- a/setup.py +++ b/setup.py @@ -234,7 +234,7 @@ else: use_scm_version = None -PROTO_SUBDIRS = ["core", "serving", "types", "storage"] +PROTO_SUBDIRS = ["core", "registry", "serving", "types", "storage"] PYTHON_CODE_PREFIX = "sdk/python" From b4aed657bf830502344ede5c98841d0d77ebf4ef Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 7 Feb 2024 13:50:12 +0700 Subject: [PATCH 2/8] chore: Set upper bound for moto package (#3937) chore: set upper bound for moto package Signed-off-by: Hai Nguyen --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 29b8dc5a68..4fb80871b2 100644 --- a/setup.py +++ b/setup.py @@ -155,7 +155,7 @@ "grpcio-testing>=1.56.2,<2", "minio==7.1.0", "mock==2.0.0", - "moto", + "moto<5", "mypy>=0.981,<0.990", "avro==1.10.0", "fsspec<2023.10.0", From 49d2988a562c66b3949cf2368fe44ed41e767eab Mon Sep 17 00:00:00 2001 From: Dongwoo Park 
<40623259+Woo-Dong@users.noreply.github.com>
Date: Thu, 8 Feb 2024 14:39:29 +0900
Subject: [PATCH 3/8] fix: Trino as an OfflineStore Access Denied when BasicAuthentication (#3898)

---
 .../infra/offline_stores/contrib/trino_offline_store/trino.py  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
index f662cda913..d4cfdb6632 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
@@ -40,7 +40,7 @@

 class BasicAuthModel(FeastConfigBaseModel):
     username: StrictStr
-    password: SecretStr
+    password: StrictStr


 class KerberosAuthModel(FeastConfigBaseModel):

From c16e5afcc5273b0c26b79dd4e233a28618ac490a Mon Sep 17 00:00:00 2001
From: TS <67011812+tsisodia10@users.noreply.github.com>
Date: Thu, 8 Feb 2024 09:54:41 -0500
Subject: [PATCH 4/8] fix: Typo Correction in Feast UI Readme (#3939)

Modify the README to point to the correct project list

Signed-off-by: Twinkll Sisodia

---
 ui/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ui/README.md b/ui/README.md
index e91a8741ec..a9ce5d3ec7 100644
--- a/ui/README.md
+++ b/ui/README.md
@@ -46,7 +46,7 @@ ReactDOM.render(
 );
 ```

-When you start the React app, it will look for `project-list.json` to find a list of your projects. The JSON should looks something like this.
+When you start the React app, it will look for `projects-list.json` to find a list of your projects. The JSON should look something like this.

 ```json
 {

From bdce99d8e4581b7c59558b91840f019a16194b41 Mon Sep 17 00:00:00 2001
From: Attila Tóth
Date: Fri, 9 Feb 2024 23:44:47 +0100
Subject: [PATCH 5/8] docs: Add ScyllaDB as online store alternative (fixed DCO) (#3944)

---
 docs/SUMMARY.md                          |  1 +
 docs/reference/online-stores/README.md   |  4 +-
 docs/reference/online-stores/scylladb.md | 94 ++++++++++++++++++++++++
 3 files changed, 98 insertions(+), 1 deletion(-)
 create mode 100644 docs/reference/online-stores/scylladb.md

diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index c80ded2adf..8affea898e 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -99,6 +99,7 @@
   * [MySQL (contrib)](reference/online-stores/mysql.md)
   * [Rockset (contrib)](reference/online-stores/rockset.md)
   * [Hazelcast (contrib)](reference/online-stores/hazelcast.md)
+  * [ScyllaDB (contrib)](reference/online-stores/scylladb.md)
 * [Providers](reference/providers/README.md)
   * [Local](reference/providers/local.md)
   * [Google Cloud Platform](reference/providers/google-cloud-platform.md)
diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md
index f86e6f6a1d..d90bfcf163 100644
--- a/docs/reference/online-stores/README.md
+++ b/docs/reference/online-stores/README.md
@@ -54,4 +54,6 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli
 [hazelcast.md](hazelcast.md)
 {% endcontent-ref %}

-
+{% content-ref url="scylladb.md" %}
+[scylladb.md](scylladb.md)
+{% endcontent-ref %}
diff --git a/docs/reference/online-stores/scylladb.md b/docs/reference/online-stores/scylladb.md
new file mode 100644
index 0000000000..e28e810e21
--- /dev/null
+++ b/docs/reference/online-stores/scylladb.md
@@ -0,0 +1,94 @@
+# ScyllaDB Cloud online store
+
+## Description
+
+ScyllaDB is a low-latency and high-performance Cassandra-compatible (uses CQL) database. You can use the existing Cassandra connector to use ScyllaDB as an online store in Feast.
+
+The [ScyllaDB](https://www.scylladb.com/) online store provides support for materializing feature values into a ScyllaDB or [ScyllaDB Cloud](https://www.scylladb.com/product/scylla-cloud/) cluster for serving online features in real time.
+
+## Getting started
+
+Install Feast with Cassandra support:
+```bash
+pip install "feast[cassandra]"
+```
+
+Create a new Feast project:
+```bash
+feast init REPO_NAME -t cassandra
+```
+
+### Example (ScyllaDB)
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: scylla_feature_repo
+registry: data/registry.db
+provider: local
+online_store:
+    type: cassandra
+    hosts:
+        - 172.17.0.2
+    keyspace: feast
+    username: scylla
+    password: password
+```
+{% endcode %}
+
+### Example (ScyllaDB Cloud)
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: scylla_feature_repo
+registry: data/registry.db
+provider: local
+online_store:
+    type: cassandra
+    hosts:
+        - node-0.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud
+        - node-1.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud
+        - node-2.aws_us_east_1.xxxxxxxx.clusters.scylla.cloud
+    keyspace: feast
+    username: scylla
+    password: password
+```
+{% endcode %}
+
+
+The full set of configuration options is available in [CassandraOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStoreConfig).
+For a full explanation of configuration options, please see the file
+`sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/README.md`.
+
+Storage specifications can be found at `docs/specs/online_store_format.md`.
+
+## Functionality Matrix
+
+The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
+Below is a matrix indicating which functionality is supported by the Cassandra plugin.
+
+|                                                            | Cassandra |
+| :--------------------------------------------------------- | :-------- |
+| write feature values to the online store                   | yes       |
+| read feature values from the online store                  | yes       |
+| update infrastructure (e.g. tables) in the online store    | yes       |
+| teardown infrastructure (e.g. tables) in the online store  | yes       |
+| generate a plan of infrastructure changes                  | yes       |
+| support for on-demand transforms                           | yes       |
+| readable by Python SDK                                     | yes       |
+| readable by Java                                           | no        |
+| readable by Go                                             | no        |
+| support for entityless feature views                       | yes       |
+| support for concurrent writing to the same key             | no        |
+| support for ttl (time to live) at retrieval                | no        |
+| support for deleting expired data                          | no        |
+| collocated by feature view                                 | yes       |
+| collocated by feature service                              | no        |
+| collocated by entity key                                   | no        |
+
+To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).
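+
+## Example usage
+
+Once `feast apply` has run against a repository with one of the configurations above, the store behaves like any other Feast online store. A minimal sketch, assuming the `driver_hourly_stats` feature view and `driver_id` join key that the `cassandra` template bootstraps:
+
+```python
+from datetime import datetime
+
+from feast import FeatureStore
+
+store = FeatureStore(repo_path=".")
+
+# Load feature values from the offline store into ScyllaDB.
+store.materialize_incremental(end_date=datetime.utcnow())
+
+# Read the materialized values back at serving time.
+online_features = store.get_online_features(
+    features=["driver_hourly_stats:conv_rate"],
+    entity_rows=[{"driver_id": 1001}],
+).to_dict()
+print(online_features)
+```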
+ +## Resources + +* [Sample application with ScyllaDB](https://feature-store.scylladb.com/stable/) +* [ScyllaDB website](https://www.scylladb.com/) +* [ScyllaDB Cloud documentation](https://cloud.docs.scylladb.com/stable/) From 7d75fc525a7f2f46811d168ce71f91b5736ad788 Mon Sep 17 00:00:00 2001 From: Job Almekinders <55230856+job-almekinders@users.noreply.github.com> Date: Fri, 9 Feb 2024 23:45:22 +0100 Subject: [PATCH 6/8] fix: Add conn.commit() to Postgresonline_write_batch.online_write_batch (#3904) --- sdk/python/feast/infra/online_stores/contrib/postgres.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index a12e66f109..49f87ddb0a 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -99,6 +99,7 @@ def online_write_batch( cur_batch, page_size=batch_size, ) + conn.commit() if progress: progress(len(cur_batch)) From d3a2a45d9bc2b690a7aa784ec7b0411e91244dab Mon Sep 17 00:00:00 2001 From: Tornike Gurgenidze Date: Sat, 10 Feb 2024 02:45:38 +0400 Subject: [PATCH 7/8] fix: Transformation server doesn't generate files from proto (#3902) --- sdk/python/feast/infra/transformation_servers/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/transformation_servers/Dockerfile b/sdk/python/feast/infra/transformation_servers/Dockerfile index c072ed0160..41f272c757 100644 --- a/sdk/python/feast/infra/transformation_servers/Dockerfile +++ b/sdk/python/feast/infra/transformation_servers/Dockerfile @@ -15,7 +15,7 @@ COPY README.md README.md # Install dependencies -RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[gcp,aws]' +RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[gcp,aws]' # Start feature transformation server CMD [ "python", "app.py" ] From 4e450ad3b1b6d2f66fd87e07805bb57772390142 Mon Sep 17 00:00:00 2001 From: Chester Date: Sat, 10 Feb 2024 21:00:09 +0800 Subject: [PATCH 8/8] chore: Bumping fastapi + starlette (#3938) --- Makefile | 2 +- sdk/python/feast/data_source.py | 19 +++++----- sdk/python/feast/feature_service.py | 2 +- sdk/python/feast/feature_view.py | 2 +- sdk/python/feast/importer.py | 3 +- .../infra/contrib/spark_kafka_processor.py | 11 +++++- .../feast/infra/contrib/stream_processor.py | 11 +++--- .../athena_offline_store/athena_source.py | 6 +-- .../athena_offline_store/tests/data_source.py | 2 +- .../mssql_offline_store/tests/data_source.py | 8 ++-- .../tests/data_source.py | 2 +- .../spark_offline_store/tests/data_source.py | 4 +- .../feast/infra/offline_stores/file_source.py | 2 +- .../infra/offline_stores/offline_store.py | 37 +++++++------------ .../feast/infra/offline_stores/redshift.py | 6 +-- .../infra/offline_stores/snowflake_source.py | 4 +- .../feast/infra/online_stores/dynamodb.py | 4 +- .../feast/infra/passthrough_provider.py | 2 +- sdk/python/feast/infra/provider.py | 2 +- .../feast/infra/registry/base_registry.py | 2 + .../feast/infra/registry/registry_store.py | 4 +- sdk/python/feast/infra/registry/snowflake.py | 2 +- sdk/python/feast/infra/utils/aws_utils.py | 2 +- sdk/python/feast/infra/utils/hbase_utils.py | 8 ++-- .../infra/utils/snowflake/snowflake_utils.py | 6 ++- sdk/python/feast/type_map.py | 9 +++-- .../requirements/py3.10-ci-requirements.txt | 26 ++++--------- .../requirements/py3.10-requirements.txt | 12 ++---- 
.../requirements/py3.8-ci-requirements.txt | 26 ++++--------- .../requirements/py3.8-requirements.txt | 12 ++---- .../requirements/py3.9-ci-requirements.txt | 26 ++++--------- .../requirements/py3.9-requirements.txt | 12 ++---- sdk/python/tests/data/data_creator.py | 2 +- sdk/python/tests/foo_provider.py | 8 ++-- .../universal/data_source_creator.py | 4 +- .../universal/data_sources/bigquery.py | 2 +- .../universal/data_sources/file.py | 6 +-- .../universal/data_sources/redshift.py | 2 +- .../universal/data_sources/snowflake.py | 2 +- .../universal/online_store_creator.py | 4 +- .../offline_stores/test_offline_store.py | 15 +++++--- setup.py | 4 +- 42 files changed, 147 insertions(+), 178 deletions(-) diff --git a/Makefile b/Makefile index 4b85c0e448..6736e64078 100644 --- a/Makefile +++ b/Makefile @@ -310,7 +310,7 @@ format-python: cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests lint-python: - cd ${ROOT_DIR}/sdk/python; python -m mypy + cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ --follow-imports=skip feast cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/ cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index b7ce19aad9..3421fd5d30 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import enum import warnings from abc import ABC, abstractmethod @@ -485,12 +484,12 @@ def to_proto(self) -> DataSourceProto: return data_source_proto def validate(self, config: RepoConfig): - pass + raise NotImplementedError def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: - pass + raise NotImplementedError @staticmethod def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: @@ -534,12 +533,12 @@ def __init__( self.schema = schema def validate(self, config: RepoConfig): - pass + raise NotImplementedError def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: - pass + raise NotImplementedError def __eq__(self, other): if not isinstance(other, RequestSource): @@ -610,12 +609,12 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: @typechecked class KinesisSource(DataSource): def validate(self, config: RepoConfig): - pass + raise NotImplementedError def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: - pass + raise NotImplementedError @staticmethod def from_proto(data_source: DataSourceProto): @@ -639,7 +638,7 @@ def from_proto(data_source: DataSourceProto): @staticmethod def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: - pass + raise NotImplementedError def get_table_query_string(self) -> str: raise NotImplementedError @@ -772,12 +771,12 @@ def __hash__(self): return super().__hash__() def validate(self, config: RepoConfig): - pass + raise NotImplementedError def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: - pass + raise NotImplementedError @staticmethod def from_proto(data_source: DataSourceProto): diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index c3037a55da..7ec923205a 100644 --- 
a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -56,7 +56,7 @@ def __init__( *, name: str, features: List[Union[FeatureView, OnDemandFeatureView]], - tags: Dict[str, str] = None, + tags: Optional[Dict[str, str]] = None, description: str = "", owner: str = "", logging_config: Optional[LoggingConfig] = None, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 67f9662d31..f87ae7ab13 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -101,7 +101,7 @@ def __init__( name: str, source: DataSource, schema: Optional[List[Field]] = None, - entities: List[Entity] = None, + entities: Optional[List[Entity]] = None, ttl: Optional[timedelta] = timedelta(days=0), online: bool = True, description: str = "", diff --git a/sdk/python/feast/importer.py b/sdk/python/feast/importer.py index bbd592101a..d1d7d62901 100644 --- a/sdk/python/feast/importer.py +++ b/sdk/python/feast/importer.py @@ -1,4 +1,5 @@ import importlib +from typing import Optional from feast.errors import ( FeastClassImportError, @@ -7,7 +8,7 @@ ) -def import_class(module_name: str, class_name: str, class_type: str = None): +def import_class(module_name: str, class_name: str, class_type: Optional[str] = None): """ Dynamically loads and returns a class from a module. diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index ea55d89988..bac1c28b06 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -5,6 +5,7 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.avro.functions import from_avro from pyspark.sql.functions import col, from_json +from pyspark.sql.streaming import StreamingQuery from feast.data_format import AvroFormat, JsonFormat from feast.data_source import KafkaSource, PushMode @@ -63,7 +64,13 @@ def __init__( self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities] super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source) - def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: + # Type hinting for data_source type. + # data_source type has been checked to be an instance of KafkaSource. + self.data_source: KafkaSource = self.data_source # type: ignore + + def ingest_stream_feature_view( + self, to: PushMode = PushMode.ONLINE + ) -> StreamingQuery: ingested_stream_df = self._ingest_stream_data() transformed_df = self._construct_transformation_plan(ingested_stream_df) online_store_query = self._write_stream_data(transformed_df, to) @@ -122,7 +129,7 @@ def _ingest_stream_data(self) -> StreamTable: def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: return self.sfv.udf.__call__(df) if self.sfv.udf else df - def _write_stream_data(self, df: StreamTable, to: PushMode): + def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery: # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema. 
def batch_write(row: DataFrame, batch_id: int): rows: pd.DataFrame = row.toPandas() diff --git a/sdk/python/feast/infra/contrib/stream_processor.py b/sdk/python/feast/infra/contrib/stream_processor.py index 24817c82ea..df4e144f8c 100644 --- a/sdk/python/feast/infra/contrib/stream_processor.py +++ b/sdk/python/feast/infra/contrib/stream_processor.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING, Optional from pyspark.sql import DataFrame +from typing_extensions import TypeAlias from feast.data_source import DataSource, PushMode from feast.importer import import_class @@ -17,7 +18,7 @@ } # TODO: support more types other than just Spark. -StreamTable = DataFrame +StreamTable: TypeAlias = DataFrame class ProcessorConfig(FeastConfigBaseModel): @@ -54,13 +55,13 @@ def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: Ingests data from the stream source attached to the stream feature view; transforms the data and then persists it to the online store and/or offline store, depending on the 'to' parameter. """ - pass + raise NotImplementedError def _ingest_stream_data(self) -> StreamTable: """ Ingests data into a StreamTable. """ - pass + raise NotImplementedError def _construct_transformation_plan(self, table: StreamTable) -> StreamTable: """ @@ -68,14 +69,14 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable: evaluation, the StreamTable will not be materialized until it is actually evaluated. For example: df.collect() in spark or tbl.execute() in Flink. """ - pass + raise NotImplementedError def _write_stream_data(self, table: StreamTable, to: PushMode) -> None: """ Launches a job to persist stream data to the online store and/or offline store, depending on the 'to' parameter, and returns a handle for the job. 
""" - pass + raise NotImplementedError def get_stream_processor_object( diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py index 8e9e3893f3..0aca42cd68 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py @@ -297,9 +297,9 @@ class SavedDatasetAthenaStorage(SavedDatasetStorage): def __init__( self, table_ref: str, - query: str = None, - database: str = None, - data_source: str = None, + query: Optional[str] = None, + database: Optional[str] = None, + data_source: Optional[str] = None, ): self.athena_options = AthenaOptions( table=table_ref, query=query, database=database, data_source=data_source diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py index 384ab69e81..f68e109d6c 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/tests/data_source.py @@ -51,7 +51,7 @@ def create_data_source( suffix: Optional[str] = None, timestamp_field="ts", created_timestamp_column="created_ts", - field_mapping: Dict[str, str] = None, + field_mapping: Optional[Dict[str, str]] = None, ) -> DataSource: table_name = destination_name diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py index 9b751d98ef..2604cf7c18 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/tests/data_source.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Dict, List, Optional import pandas as pd import pytest @@ -66,7 +66,7 @@ def create_data_source( destination_name: str, timestamp_field="ts", created_timestamp_column="created_ts", - field_mapping: Dict[str, str] = None, + field_mapping: Optional[Dict[str, str]] = None, **kwargs, ) -> DataSource: # Make sure the field mapping is correct and convert the datetime datasources. 
@@ -99,10 +99,10 @@ def create_data_source( ) def create_saved_dataset_destination(self) -> SavedDatasetStorage: - pass + raise NotImplementedError def get_prefixed_table_name(self, destination_name: str) -> str: return f"{self.project_name}_{destination_name}" def teardown(self): - pass + raise NotImplementedError diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py index f447950132..224fcea30f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py @@ -85,7 +85,7 @@ def create_data_source( suffix: Optional[str] = None, timestamp_field="ts", created_timestamp_column="created_ts", - field_mapping: Dict[str, str] = None, + field_mapping: Optional[Dict[str, str]] = None, ) -> DataSource: destination_name = self.get_prefixed_table_name(destination_name) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py index 71c07b20c2..7b4fda3b5f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/tests/data_source.py @@ -2,7 +2,7 @@ import shutil import tempfile import uuid -from typing import Dict, List +from typing import Dict, List, Optional import pandas as pd from pyspark import SparkConf @@ -70,7 +70,7 @@ def create_data_source( destination_name: str, timestamp_field="ts", created_timestamp_column="created_ts", - field_mapping: Dict[str, str] = None, + field_mapping: Optional[Dict[str, str]] = None, **kwargs, ) -> DataSource: if timestamp_field in df: diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index ac824b359f..887b410079 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -183,7 +183,7 @@ def create_filesystem_and_path( return None, path def get_table_query_string(self) -> str: - pass + raise NotImplementedError class FileOptions: diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 6141e3c435..30135feccb 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import warnings -from abc import ABC, abstractmethod +from abc import ABC from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union @@ -150,9 +150,8 @@ def to_sql(self) -> str: """ Return RetrievalJob generated SQL statement if applicable. """ - pass + raise NotImplementedError - @abstractmethod def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: """ Synchronously executes the underlying query and returns the result as a pandas dataframe. @@ -162,9 +161,8 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: Does not handle on demand transformations or dataset validation. For either of those, `to_df` should be used. 
""" - pass + raise NotImplementedError - @abstractmethod def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: """ Synchronously executes the underlying query and returns the result as an arrow table. @@ -174,21 +172,18 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: Does not handle on demand transformations or dataset validation. For either of those, `to_arrow` should be used. """ - pass + raise NotImplementedError @property - @abstractmethod def full_feature_names(self) -> bool: """Returns True if full feature names should be applied to the results of the query.""" - pass + raise NotImplementedError @property - @abstractmethod def on_demand_feature_views(self) -> List[OnDemandFeatureView]: """Returns a list containing all the on demand feature views to be handled.""" - pass + raise NotImplementedError - @abstractmethod def persist( self, storage: SavedDatasetStorage, @@ -204,13 +199,12 @@ def persist( allow_overwrite: If True, a pre-existing location (e.g. table or file) can be overwritten. Currently not all individual offline store implementations make use of this parameter. """ - pass + raise NotImplementedError @property - @abstractmethod def metadata(self) -> Optional[RetrievalMetadata]: """Returns metadata about the retrieval job.""" - pass + raise NotImplementedError def supports_remote_storage_export(self) -> bool: """Returns True if the RetrievalJob supports `to_remote_storage`.""" @@ -226,7 +220,7 @@ def to_remote_storage(self) -> List[str]: Returns: A list of parquet file paths in remote storage. """ - raise NotImplementedError() + raise NotImplementedError class OfflineStore(ABC): @@ -239,7 +233,6 @@ class OfflineStore(ABC): """ @staticmethod - @abstractmethod def pull_latest_from_table_or_query( config: RepoConfig, data_source: DataSource, @@ -270,10 +263,9 @@ def pull_latest_from_table_or_query( Returns: A RetrievalJob that can be executed to get the entity rows. """ - pass + raise NotImplementedError @staticmethod - @abstractmethod def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], @@ -302,10 +294,9 @@ def get_historical_features( Returns: A RetrievalJob that can be executed to get the features. """ - pass + raise NotImplementedError @staticmethod - @abstractmethod def pull_all_from_table_or_query( config: RepoConfig, data_source: DataSource, @@ -334,7 +325,7 @@ def pull_all_from_table_or_query( Returns: A RetrievalJob that can be executed to get the entity rows. """ - pass + raise NotImplementedError @staticmethod def write_logged_features( @@ -358,7 +349,7 @@ def write_logged_features( logging_config: A LoggingConfig object that determines where the logs will be written. registry: The registry for the current feature store. """ - raise NotImplementedError() + raise NotImplementedError @staticmethod def offline_write_batch( @@ -377,4 +368,4 @@ def offline_write_batch( progress: Function to be called once a portion of the data has been written, used to show progress. 
""" - raise NotImplementedError() + raise NotImplementedError diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 837cf49655..6034bf5ac7 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -51,13 +51,13 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel): type: Literal["redshift"] = "redshift" """ Offline store type selector""" - cluster_id: Optional[StrictStr] + cluster_id: Optional[StrictStr] = None """ Redshift cluster identifier, for provisioned clusters """ - user: Optional[StrictStr] + user: Optional[StrictStr] = None """ Redshift user name, only required for provisioned clusters """ - workgroup: Optional[StrictStr] + workgroup: Optional[StrictStr] = None """ Redshift workgroup identifier, for serverless """ region: StrictStr diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 0cbf82dd1c..e29197c68d 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -1,5 +1,5 @@ import warnings -from typing import Callable, Dict, Iterable, Optional, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple from typeguard import typechecked @@ -223,7 +223,7 @@ def get_table_column_names_and_types( query = f"SELECT * FROM {self.get_table_query_string()} LIMIT 5" cursor = execute_snowflake_statement(conn, query) - metadata = [ + metadata: List[Dict[str, Any]] = [ { "column_name": column.name, "type_code": column.type_code, diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 525978e736..a1eef16f40 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -288,12 +288,12 @@ def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None ) return self._dynamodb_resource - def _sort_dynamodb_response(self, responses: list, order: list): + def _sort_dynamodb_response(self, responses: list, order: list) -> Any: """DynamoDB Batch Get Item doesn't return items in a particular order.""" # Assign an index to order order_with_index = {value: idx for idx, value in enumerate(order)} # Sort table responses by index - table_responses_ordered = [ + table_responses_ordered: Any = [ (order_with_index[tbl_res["entity_id"]], tbl_res) for tbl_res in responses ] table_responses_ordered = sorted( diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 28b10c1259..811abe106c 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -180,7 +180,7 @@ def online_read( config: RepoConfig, table: FeatureView, entity_keys: List[EntityKeyProto], - requested_features: List[str] = None, + requested_features: Optional[List[str]] = None, ) -> List: set_usage_attribute("provider", self.__class__.__name__) result = [] diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 82879b264a..2a9670cace 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -211,7 +211,7 @@ def online_read( config: RepoConfig, table: FeatureView, entity_keys: List[EntityKeyProto], - requested_features: List[str] = None, + requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], 
Optional[Dict[str, ValueProto]]]]: """ Reads features values for the given entity keys. diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 8928a5800d..f89b079478 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -503,7 +503,9 @@ def list_validation_references( Returns: List of request feature views """ + raise NotImplementedError + @abstractmethod def list_project_metadata( self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: diff --git a/sdk/python/feast/infra/registry/registry_store.py b/sdk/python/feast/infra/registry/registry_store.py index c42a55cd9d..5151fd74b2 100644 --- a/sdk/python/feast/infra/registry/registry_store.py +++ b/sdk/python/feast/infra/registry/registry_store.py @@ -17,7 +17,7 @@ def get_registry_proto(self) -> RegistryProto: Returns: Returns either the registry proto stored at the registry path, or an empty registry proto. """ - pass + raise NotImplementedError @abstractmethod def update_registry_proto(self, registry_proto: RegistryProto): @@ -40,7 +40,7 @@ def teardown(self): class NoopRegistryStore(RegistryStore): def get_registry_proto(self) -> RegistryProto: - pass + return RegistryProto() def update_registry_proto(self, registry_proto: RegistryProto): pass diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index 56c7bc1f65..c1ebf13d6b 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -418,7 +418,7 @@ def _delete_object( """ cursor = execute_snowflake_statement(conn, query) - if cursor.rowcount < 1 and not_found_exception: + if cursor.rowcount < 1 and not_found_exception: # type: ignore raise not_found_exception(name, project) self._set_last_updated_metadata(datetime.utcnow(), project) diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index ef83c6d1c6..c3604ee41f 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -816,7 +816,7 @@ def execute_athena_query( database: str, workgroup: str, query: str, - temp_table: str = None, + temp_table: Optional[str] = None, ) -> str: """Execute athena statement synchronously. Waits for the query to finish. 
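One pattern in this commit deserves a note: alongside the mypy bump from 0.982 to 1.8.0 in the CI requirements, implicit-optional defaults such as `temp_table: str = None` above are rewritten as explicit `Optional[...]`, since mypy enables the `no_implicit_optional` check by default from 0.990 onward. A minimal sketch of the rewrite (the function name is hypothetical):

```python
from typing import Optional


# Before: the `= None` default contradicts the declared type; modern mypy
# rejects this under its now-default no_implicit_optional check.
def lookup(table: str = None):
    ...


# After: the explicit form this commit converts signatures to.
def lookup(table: Optional[str] = None):
    ...
```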
diff --git a/sdk/python/feast/infra/utils/hbase_utils.py b/sdk/python/feast/infra/utils/hbase_utils.py index d44f93f161..72afda2ef3 100644 --- a/sdk/python/feast/infra/utils/hbase_utils.py +++ b/sdk/python/feast/infra/utils/hbase_utils.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Optional from happybase import ConnectionPool @@ -38,9 +38,9 @@ class HBaseConnector: def __init__( self, - pool: ConnectionPool = None, - host: str = None, - port: int = None, + pool: Optional[ConnectionPool] = None, + host: Optional[str] = None, + port: Optional[int] = None, connection_pool_size: int = 4, ): if pool is None: diff --git a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py index a4cda89a6f..8eb5177ac2 100644 --- a/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py +++ b/sdk/python/feast/infra/utils/snowflake/snowflake_utils.py @@ -43,7 +43,11 @@ class GetSnowflakeConnection: - def __init__(self, config: str, autocommit=True): + def __init__( + self, + config: str, + autocommit=True, + ): self.config = config self.autocommit = autocommit diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index e51e1e743b..ad3e273d37 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -51,7 +51,7 @@ import pyarrow # null timestamps get converted to -9223372036854775808 -NULL_TIMESTAMP_INT_VALUE = np.datetime64("NaT").astype(int) +NULL_TIMESTAMP_INT_VALUE: int = np.datetime64("NaT").astype(int) def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: @@ -114,7 +114,10 @@ def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: def python_type_to_feast_value_type( - name: str, value: Any = None, recurse: bool = True, type_name: Optional[str] = None + name: str, + value: Optional[Any] = None, + recurse: bool = True, + type_name: Optional[str] = None, ) -> ValueType: """ Finds the equivalent Feast Value Type for a Python value. 
Both native @@ -321,7 +324,7 @@ def _python_datetime_to_int_timestamp( elif isinstance(value, Timestamp): int_timestamps.append(int(value.ToSeconds())) elif isinstance(value, np.datetime64): - int_timestamps.append(value.astype("datetime64[s]").astype(np.int_)) + int_timestamps.append(value.astype("datetime64[s]").astype(np.int_)) # type: ignore[attr-defined] elif isinstance(value, type(np.nan)): int_timestamps.append(NULL_TIMESTAMP_INT_VALUE) else: diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 740356907d..9435a68deb 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -121,9 +121,7 @@ comm==0.2.0 # ipykernel # ipywidgets coverage[toml]==7.3.2 - # via - # coverage - # pytest-cov + # via pytest-cov cryptography==41.0.6 # via # azure-identity @@ -173,7 +171,7 @@ execnet==2.0.2 # via pytest-xdist executing==2.0.1 # via stack-data -fastapi==0.99.1 +fastapi==0.109.1 # via feast (setup.py) fastavro==1.9.0 # via @@ -226,9 +224,7 @@ google-auth==2.23.4 google-auth-httplib2==0.1.1 # via google-api-python-client google-cloud-bigquery[pandas]==3.12.0 - # via - # feast (setup.py) - # google-cloud-bigquery + # via feast (setup.py) google-cloud-bigquery-storage==2.22.0 # via feast (setup.py) google-cloud-bigtable==2.21.0 @@ -462,7 +458,7 @@ msgpack==1.0.7 # via cachecontrol multiprocess==0.70.15 # via bytewax -mypy==0.982 +mypy==1.8.0 # via # feast (setup.py) # sqlalchemy @@ -801,9 +797,7 @@ sniffio==1.3.0 snowballstemmer==2.2.0 # via sphinx snowflake-connector-python[pandas]==3.5.0 - # via - # feast (setup.py) - # snowflake-connector-python + # via feast (setup.py) sortedcontainers==2.4.0 # via snowflake-connector-python soupsieve==2.5 @@ -829,14 +823,12 @@ sphinxcontrib-qthelp==1.0.6 sphinxcontrib-serializinghtml==1.1.9 # via sphinx sqlalchemy[mypy]==1.4.50 - # via - # feast (setup.py) - # sqlalchemy + # via feast (setup.py) sqlalchemy2-stubs==0.0.2a37 # via sqlalchemy stack-data==0.6.3 # via ipython -starlette==0.27.0 +starlette==0.35.1 # via fastapi tabulate==0.9.0 # via feast (setup.py) @@ -961,9 +953,7 @@ urllib3==1.26.18 # rockset # snowflake-connector-python uvicorn[standard]==0.24.0.post1 - # via - # feast (setup.py) - # uvicorn + # via feast (setup.py) uvloop==0.19.0 # via uvicorn virtualenv==20.23.0 diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt index 18486d7fa9..5d5d451e14 100644 --- a/sdk/python/requirements/py3.10-requirements.txt +++ b/sdk/python/requirements/py3.10-requirements.txt @@ -42,7 +42,7 @@ dill==0.3.7 # via feast (setup.py) exceptiongroup==1.1.3 # via anyio -fastapi==0.99.1 +fastapi==0.109.1 # via feast (setup.py) fastavro==1.9.0 # via @@ -175,12 +175,10 @@ sniffio==1.3.0 # anyio # httpx sqlalchemy[mypy]==1.4.50 - # via - # feast (setup.py) - # sqlalchemy + # via feast (setup.py) sqlalchemy2-stubs==0.0.2a37 # via sqlalchemy -starlette==0.27.0 +starlette==0.35.1 # via fastapi tabulate==0.9.0 # via feast (setup.py) @@ -210,9 +208,7 @@ typing-extensions==4.8.0 urllib3==2.1.0 # via requests uvicorn[standard]==0.24.0.post1 - # via - # feast (setup.py) - # uvicorn + # via feast (setup.py) uvloop==0.19.0 # via uvicorn volatile==2.1.0 diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 3bda9e72f9..808a58e11b 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ 
diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt
index 740356907d..9435a68deb 100644
--- a/sdk/python/requirements/py3.10-ci-requirements.txt
+++ b/sdk/python/requirements/py3.10-ci-requirements.txt
@@ -121,9 +121,7 @@ comm==0.2.0
     #   ipykernel
     #   ipywidgets
 coverage[toml]==7.3.2
-    # via
-    #   coverage
-    #   pytest-cov
+    # via pytest-cov
 cryptography==41.0.6
     # via
     #   azure-identity
@@ -173,7 +171,7 @@ execnet==2.0.2
     # via pytest-xdist
 executing==2.0.1
     # via stack-data
-fastapi==0.99.1
+fastapi==0.109.1
     # via feast (setup.py)
 fastavro==1.9.0
     # via
@@ -226,9 +224,7 @@ google-auth==2.23.4
 google-auth-httplib2==0.1.1
     # via google-api-python-client
 google-cloud-bigquery[pandas]==3.12.0
-    # via
-    #   feast (setup.py)
-    #   google-cloud-bigquery
+    # via feast (setup.py)
 google-cloud-bigquery-storage==2.22.0
     # via feast (setup.py)
 google-cloud-bigtable==2.21.0
@@ -462,7 +458,7 @@ msgpack==1.0.7
     # via cachecontrol
 multiprocess==0.70.15
     # via bytewax
-mypy==0.982
+mypy==1.8.0
     # via
     #   feast (setup.py)
     #   sqlalchemy
@@ -801,9 +797,7 @@ sniffio==1.3.0
 snowballstemmer==2.2.0
     # via sphinx
 snowflake-connector-python[pandas]==3.5.0
-    # via
-    #   feast (setup.py)
-    #   snowflake-connector-python
+    # via feast (setup.py)
 sortedcontainers==2.4.0
     # via snowflake-connector-python
 soupsieve==2.5
@@ -829,14 +823,12 @@ sphinxcontrib-qthelp==1.0.6
 sphinxcontrib-serializinghtml==1.1.9
     # via sphinx
 sqlalchemy[mypy]==1.4.50
-    # via
-    #   feast (setup.py)
-    #   sqlalchemy
+    # via feast (setup.py)
 sqlalchemy2-stubs==0.0.2a37
     # via sqlalchemy
 stack-data==0.6.3
     # via ipython
-starlette==0.27.0
+starlette==0.35.1
     # via fastapi
 tabulate==0.9.0
     # via feast (setup.py)
@@ -961,9 +953,7 @@ urllib3==1.26.18
     #   rockset
     #   snowflake-connector-python
 uvicorn[standard]==0.24.0.post1
-    # via
-    #   feast (setup.py)
-    #   uvicorn
+    # via feast (setup.py)
 uvloop==0.19.0
     # via uvicorn
 virtualenv==20.23.0
diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt
index 18486d7fa9..5d5d451e14 100644
--- a/sdk/python/requirements/py3.10-requirements.txt
+++ b/sdk/python/requirements/py3.10-requirements.txt
@@ -42,7 +42,7 @@ dill==0.3.7
     # via feast (setup.py)
 exceptiongroup==1.1.3
     # via anyio
-fastapi==0.99.1
+fastapi==0.109.1
     # via feast (setup.py)
 fastavro==1.9.0
     # via
@@ -175,12 +175,10 @@ sniffio==1.3.0
     #   anyio
     #   httpx
 sqlalchemy[mypy]==1.4.50
-    # via
-    #   feast (setup.py)
-    #   sqlalchemy
+    # via feast (setup.py)
 sqlalchemy2-stubs==0.0.2a37
     # via sqlalchemy
-starlette==0.27.0
+starlette==0.35.1
     # via fastapi
 tabulate==0.9.0
     # via feast (setup.py)
@@ -210,9 +208,7 @@ typing-extensions==4.8.0
 urllib3==2.1.0
     # via requests
 uvicorn[standard]==0.24.0.post1
-    # via
-    #   feast (setup.py)
-    #   uvicorn
+    # via feast (setup.py)
 uvloop==0.19.0
     # via uvicorn
 volatile==2.1.0
diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt
index 3bda9e72f9..808a58e11b 100644
--- a/sdk/python/requirements/py3.8-ci-requirements.txt
+++ b/sdk/python/requirements/py3.8-ci-requirements.txt
@@ -127,9 +127,7 @@ comm==0.2.0
     #   ipykernel
     #   ipywidgets
 coverage[toml]==7.3.2
-    # via
-    #   coverage
-    #   pytest-cov
+    # via pytest-cov
 cryptography==41.0.6
     # via
     #   azure-identity
@@ -178,7 +176,7 @@ execnet==2.0.2
     # via pytest-xdist
 executing==2.0.1
     # via stack-data
-fastapi==0.99.1
+fastapi==0.109.1
     # via feast (setup.py)
 fastavro==1.9.0
     # via
@@ -231,9 +229,7 @@ google-auth==2.23.4
 google-auth-httplib2==0.1.1
     # via google-api-python-client
 google-cloud-bigquery[pandas]==3.12.0
-    # via
-    #   feast (setup.py)
-    #   google-cloud-bigquery
+    # via feast (setup.py)
 google-cloud-bigquery-storage==2.22.0
     # via feast (setup.py)
 google-cloud-bigtable==2.21.0
@@ -478,7 +474,7 @@ msgpack==1.0.7
     # via cachecontrol
 multiprocess==0.70.15
     # via bytewax
-mypy==0.982
+mypy==1.8.0
     # via
     #   feast (setup.py)
     #   sqlalchemy
@@ -824,9 +820,7 @@ sniffio==1.3.0
 snowballstemmer==2.2.0
     # via sphinx
 snowflake-connector-python[pandas]==3.5.0
-    # via
-    #   feast (setup.py)
-    #   snowflake-connector-python
+    # via feast (setup.py)
 sortedcontainers==2.4.0
     # via snowflake-connector-python
 soupsieve==2.5
@@ -846,14 +840,12 @@ sphinxcontrib-qthelp==1.0.3
 sphinxcontrib-serializinghtml==1.1.5
     # via sphinx
 sqlalchemy[mypy]==1.4.50
-    # via
-    #   feast (setup.py)
-    #   sqlalchemy
+    # via feast (setup.py)
 sqlalchemy2-stubs==0.0.2a37
     # via sqlalchemy
 stack-data==0.6.3
     # via ipython
-starlette==0.27.0
+starlette==0.35.1
     # via fastapi
 tabulate==0.9.0
     # via feast (setup.py)
@@ -981,9 +973,7 @@ urllib3==1.26.18
     #   rockset
     #   snowflake-connector-python
 uvicorn[standard]==0.24.0.post1
-    # via
-    #   feast (setup.py)
-    #   uvicorn
+    # via feast (setup.py)
 uvloop==0.19.0
     # via uvicorn
 virtualenv==20.23.0
diff --git a/sdk/python/requirements/py3.8-requirements.txt b/sdk/python/requirements/py3.8-requirements.txt
index c180c50c81..163fa4c9a8 100644
--- a/sdk/python/requirements/py3.8-requirements.txt
+++ b/sdk/python/requirements/py3.8-requirements.txt
@@ -42,7 +42,7 @@ dill==0.3.7
     # via feast (setup.py)
 exceptiongroup==1.1.3
     # via anyio
-fastapi==0.99.1
+fastapi==0.109.1
     # via feast (setup.py)
 fastavro==1.9.0
     # via
@@ -180,12 +180,10 @@ sniffio==1.3.0
     #   anyio
     #   httpx
 sqlalchemy[mypy]==1.4.50
-    # via
-    #   feast (setup.py)
-    #   sqlalchemy
+    # via feast (setup.py)
 sqlalchemy2-stubs==0.0.2a37
     # via sqlalchemy
-starlette==0.27.0
+starlette==0.35.1
     # via fastapi
 tabulate==0.9.0
     # via feast (setup.py)
@@ -216,9 +214,7 @@ typing-extensions==4.8.0
 urllib3==2.1.0
     # via requests
 uvicorn[standard]==0.24.0.post1
-    # via
-    #   feast (setup.py)
-    #   uvicorn
+    # via feast (setup.py)
 uvloop==0.19.0
     # via uvicorn
 volatile==2.1.0
diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt
index 6989d5b4cc..f9d7ac3fb9 100644
--- a/sdk/python/requirements/py3.9-ci-requirements.txt
+++ b/sdk/python/requirements/py3.9-ci-requirements.txt
@@ -121,9 +121,7 @@ comm==0.2.0
     #   ipykernel
     #   ipywidgets
 coverage[toml]==7.3.2
-    # via
-    #   coverage
-    #   pytest-cov
+    # via pytest-cov
 cryptography==41.0.6
     # via
     #   azure-identity
@@ -173,7 +171,7 @@ execnet==2.0.2
     # via pytest-xdist
 executing==2.0.1
     # via stack-data
-fastapi==0.99.1
+fastapi==0.109.1
     # via feast (setup.py)
 fastavro==1.9.0
     # via
@@ -226,9 +224,7 @@ google-auth==2.23.4
 google-auth-httplib2==0.1.1
     # via google-api-python-client
 google-cloud-bigquery[pandas]==3.12.0
-    # via
-    #   feast (setup.py)
-    #   google-cloud-bigquery
+    # via feast (setup.py)
 google-cloud-bigquery-storage==2.22.0
     # via feast (setup.py)
 google-cloud-bigtable==2.21.0
@@ -469,7 +465,7 @@ msgpack==1.0.7
     # via cachecontrol
 multiprocess==0.70.15
     # via bytewax
-mypy==0.982
+mypy==1.8.0
     # via
     #   feast (setup.py)
     #   sqlalchemy
@@ -810,9 +806,7 @@ sniffio==1.3.0
 snowballstemmer==2.2.0
     # via sphinx
 snowflake-connector-python[pandas]==3.5.0
-    # via
-    #   feast (setup.py)
-    #   snowflake-connector-python
+    # via feast (setup.py)
 sortedcontainers==2.4.0
     # via snowflake-connector-python
 soupsieve==2.5
@@ -838,14 +832,12 @@ sphinxcontrib-qthelp==1.0.6
 sphinxcontrib-serializinghtml==1.1.9
     # via sphinx
 sqlalchemy[mypy]==1.4.50
-    # via
-    #   feast (setup.py)
-    #   sqlalchemy
+    # via feast (setup.py)
 sqlalchemy2-stubs==0.0.2a37
     # via sqlalchemy
 stack-data==0.6.3
     # via ipython
-starlette==0.27.0
+starlette==0.35.1
     # via fastapi
 tabulate==0.9.0
     # via feast (setup.py)
@@ -973,9 +965,7 @@ urllib3==1.26.18
     #   rockset
     #   snowflake-connector-python
 uvicorn[standard]==0.24.0.post1
-    # via
-    #   feast (setup.py)
-    #   uvicorn
+    # via feast (setup.py)
 uvloop==0.19.0
     # via uvicorn
 virtualenv==20.23.0
diff --git a/sdk/python/requirements/py3.9-requirements.txt b/sdk/python/requirements/py3.9-requirements.txt
index 3b6f88b4e2..4d9b8f107d 100644
--- a/sdk/python/requirements/py3.9-requirements.txt
+++ b/sdk/python/requirements/py3.9-requirements.txt
@@ -42,7 +42,7 @@ dill==0.3.7
     # via feast (setup.py)
 exceptiongroup==1.1.3
     # via anyio
-fastapi==0.99.1
+fastapi==0.109.1
     # via feast (setup.py)
 fastavro==1.9.0
     # via
@@ -175,12 +175,10 @@ sniffio==1.3.0
     #   anyio
     #   httpx
 sqlalchemy[mypy]==1.4.50
-    # via
-    #   feast (setup.py)
-    #   sqlalchemy
+    # via feast (setup.py)
 sqlalchemy2-stubs==0.0.2a37
     # via sqlalchemy
-starlette==0.27.0
+starlette==0.35.1
     # via fastapi
 tabulate==0.9.0
     # via feast (setup.py)
@@ -211,9 +209,7 @@ typing-extensions==4.8.0
 urllib3==2.1.0
     # via requests
 uvicorn[standard]==0.24.0.post1
-    # via
-    #   feast (setup.py)
-    #   uvicorn
+    # via feast (setup.py)
 uvloop==0.19.0
     # via uvicorn
 volatile==2.1.0
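All six lockfiles move in lockstep: `fastapi` 0.99.1 → 0.109.1 (which pulls `starlette` up to 0.35.1) and `mypy` 0.982 → 1.8.0, the upgrade that motivates the typing changes elsewhere in this patch. The collapsed `# via` blocks suggest the files were regenerated with a newer pip-compile that no longer emits self-referential entries for extras. A quick sanity check (an assumed workflow aid, not part of the patch) that an environment actually picked up the new pins:

from importlib.metadata import version

# Compare installed versions against the bumped pins; version() raises
# PackageNotFoundError if a package is absent from the environment.
for pkg, pinned in [("fastapi", "0.109.1"), ("starlette", "0.35.1"), ("mypy", "1.8.0")]:
    installed = version(pkg)
    status = "ok" if installed == pinned else "MISMATCH"
    print(f"{pkg}: installed {installed}, lockfile pins {pinned} [{status}]")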
diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py
index 8d5b1979fa..1fc66aee84 100644
--- a/sdk/python/tests/data/data_creator.py
+++ b/sdk/python/tests/data/data_creator.py
@@ -9,7 +9,7 @@

 def create_basic_driver_dataset(
     entity_type: FeastType = Int32,
-    feature_dtype: str = None,
+    feature_dtype: Optional[str] = None,
     feature_is_list: bool = False,
     list_has_empty_list: bool = False,
 ) -> pd.DataFrame:
diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py
index d27e2645d4..ba256a3813 100644
--- a/sdk/python/tests/foo_provider.py
+++ b/sdk/python/tests/foo_provider.py
@@ -71,16 +71,16 @@ def get_historical_features(
         project: str,
         full_feature_names: bool = False,
     ) -> RetrievalJob:
-        pass
+        return RetrievalJob()

     def online_read(
         self,
         config: RepoConfig,
         table: FeatureView,
         entity_keys: List[EntityKeyProto],
-        requested_features: List[str] = None,
+        requested_features: Optional[List[str]] = None,
     ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
-        pass
+        return []

     def retrieve_saved_dataset(self, config: RepoConfig, dataset: SavedDataset):
         pass
@@ -102,4 +102,4 @@ def retrieve_feature_service_logs(
         config: RepoConfig,
         registry: BaseRegistry,
     ) -> RetrievalJob:
-        pass
+        return RetrievalJob()
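The `FooProvider` stubs previously fell through and returned `None` despite non-`None` return annotations; mypy 1.x enables the `empty-body` error code by default, which rejects exactly that. A simplified sketch with a hypothetical class (not Feast's actual `Provider` interface):

from datetime import datetime
from typing import Dict, List, Optional, Tuple


class FakeProvider:
    # With a bare `pass` body, mypy 1.x reports:
    #   Missing return statement  [empty-body]
    # Returning an empty, correctly typed value satisfies the checker and
    # keeps the test double harmless if it is ever called.
    def online_read(
        self,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, int]]]]:
        return []


print(FakeProvider().online_read())  # []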
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py
index b36af0db47..d64463606f 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py
@@ -20,7 +20,7 @@ def create_data_source(
         destination_name: str,
         event_timestamp_column="ts",
         created_timestamp_column="created_ts",
-        field_mapping: Dict[str, str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
         timestamp_field: Optional[str] = None,
     ) -> DataSource:
         """
@@ -53,7 +53,7 @@ def create_saved_dataset_destination(self) -> SavedDatasetStorage:
         ...

     def create_logged_features_destination(self) -> LoggingDestination:
-        pass
+        raise NotImplementedError

     @abstractmethod
     def teardown(self):
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
index 384037eef1..215d19ba7f 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
@@ -66,7 +66,7 @@ def create_data_source(
         destination_name: str,
         timestamp_field="ts",
         created_timestamp_column="created_ts",
-        field_mapping: Dict[str, str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
         **kwargs,
     ) -> DataSource:

diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
index 124dd4c88d..3263785683 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
@@ -41,7 +41,7 @@ def create_data_source(
         destination_name: str,
         timestamp_field="ts",
         created_timestamp_column="created_ts",
-        field_mapping: Dict[str, str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
     ) -> DataSource:
         destination_name = self.get_prefixed_table_name(destination_name)

@@ -96,7 +96,7 @@ def create_data_source(
         destination_name: str,
         timestamp_field="ts",
         created_timestamp_column="created_ts",
-        field_mapping: Dict[str, str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
     ) -> DataSource:
         destination_name = self.get_prefixed_table_name(destination_name)

@@ -171,7 +171,7 @@ def create_data_source(
         suffix: Optional[str] = None,
         timestamp_field="ts",
         created_timestamp_column="created_ts",
-        field_mapping: Dict[str, str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
     ) -> DataSource:
         filename = f"{destination_name}.parquet"
         port = self.minio.get_exposed_port("9000")
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
index dfe8e3d33b..e6f20d6125 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py
@@ -51,7 +51,7 @@ def create_data_source(
         suffix: Optional[str] = None,
         timestamp_field="ts",
         created_timestamp_column="created_ts",
-        field_mapping: Dict[str, str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
     ) -> DataSource:
         destination_name = self.get_prefixed_table_name(destination_name)

diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
index c14780da97..1414291a18 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
@@ -51,7 +51,7 @@ def create_data_source(
         suffix: Optional[str] = None,
         timestamp_field="ts",
         created_timestamp_column="created_ts",
-        field_mapping: Dict[str, str] = None,
+        field_mapping: Optional[Dict[str, str]] = None,
     ) -> DataSource:
         destination_name = self.get_prefixed_table_name(destination_name)

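Alongside the `Optional` defaults, `create_logged_features_destination` above trades a silent `pass` for `raise NotImplementedError`, and the `OnlineStoreCreator` below gets the same treatment: a subclass that forgets the override now fails loudly instead of handing back `None`. A toy sketch with hypothetical names:

class BaseCreator:
    def create_destination(self) -> dict:
        # Fails fast at the call site if a subclass forgets to override;
        # also satisfies mypy's empty-body check without faking a value.
        raise NotImplementedError


class FileCreator(BaseCreator):
    def create_destination(self) -> dict:
        return {"type": "file", "path": "/tmp/logged_features"}


print(FileCreator().create_destination())
try:
    BaseCreator().create_destination()
except NotImplementedError:
    print("create_destination left unimplemented")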
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store_creator.py b/sdk/python/tests/integration/feature_repos/universal/online_store_creator.py
index c3872ea697..10a8143739 100644
--- a/sdk/python/tests/integration/feature_repos/universal/online_store_creator.py
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store_creator.py
@@ -8,7 +8,7 @@ def __init__(self, project_name: str, **kwargs):
         self.project_name = project_name

     def create_online_store(self) -> FeastConfigBaseModel:
-        ...
+        raise NotImplementedError

     def teardown(self):
-        ...
+        raise NotImplementedError
diff --git a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py
index ef0cce0470..220bdba0da 100644
--- a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py
+++ b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py
@@ -39,6 +39,9 @@


 class MockRetrievalJob(RetrievalJob):
+    def to_sql(self) -> str:
+        return ""
+
     def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         """
         Synchronously executes the underlying query and returns the result as a pandas dataframe.
@@ -46,7 +49,7 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         Does not handle on demand transformations or dataset validation. For either of those,
         `to_df` should be used.
         """
-        pass
+        return pd.DataFrame()

     def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         """
@@ -55,17 +58,17 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         Does not handle on demand transformations or dataset validation. For either of those,
         `to_arrow` should be used.
         """
-        pass
+        return pyarrow.Table()

     @property
     def full_feature_names(self) -> bool:
         """Returns True if full feature names should be applied to the results of the query."""
-        pass
+        return False

     @property
     def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         """Returns a list containing all the on demand feature views to be handled."""
-        pass
+        return []

     def persist(
         self,
@@ -87,7 +90,7 @@ def persist(
     @property
     def metadata(self) -> Optional[RetrievalMetadata]:
         """Returns metadata about the retrieval job."""
-        pass
+        raise NotImplementedError


 # Since RetreivalJob are not really tested for subclasses we add some tests here.
@@ -208,7 +211,7 @@ def retrieval_job(request, environment):


 def test_to_sql():
-    assert MockRetrievalJob().to_sql() is None
+    assert MockRetrievalJob().to_sql() == ""


 @pytest.mark.parametrize("timeout", (None, 30))
diff --git a/setup.py b/setup.py
index 4fb80871b2..81ae63a7a4 100644
--- a/setup.py
+++ b/setup.py
@@ -71,7 +71,7 @@
     "toml>=0.10.0,<1",
     "tqdm>=4,<5",
     "typeguard==2.13.3",
-    "fastapi>=0.68.0,<0.100",
+    "fastapi>=0.68.0",
     "uvicorn[standard]>=0.14.0,<1",
     "gunicorn",
     "dask>=2021.1.0",
@@ -156,7 +156,7 @@
     "minio==7.1.0",
     "mock==2.0.0",
     "moto<5",
-    "mypy>=0.981,<0.990",
+    "mypy>=1.4.1",
     "avro==1.10.0",
     "fsspec<2023.10.0",
     "urllib3>=1.25.4,<3",
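The `setup.py` change lifts the `fastapi<0.100` ceiling and raises the `mypy` floor so the constraints admit the versions pinned in the lockfiles above. A small check with the `packaging` library (an assumed verification step, not part of the patch):

from packaging.specifiers import SpecifierSet
from packaging.version import Version

# The relaxed specifiers now admit the lockfile pins.
print(Version("0.109.1") in SpecifierSet(">=0.68.0"))  # True (old ">=0.68.0,<0.100" excluded it)
print(Version("1.8.0") in SpecifierSet(">=1.4.1"))     # True (old ">=0.981,<0.990" excluded it)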