diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 165018d1..f06a977d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -27,6 +27,28 @@ jobs: - "hooks/openfeature-hooks-opentelemetry" - "providers/openfeature-provider-flagd" + services: + # flagd-testbed for flagd RPC provider e2e tests + flagd: + image: ghcr.io/open-feature/flagd-testbed:v0.5.4 + ports: + - 8013:8013 + # flagd-testbed for flagd RPC provider reconnect e2e tests + flagd-unstable: + image: ghcr.io/open-feature/flagd-testbed-unstable:v0.5.4 + ports: + - 8014:8013 + # sync-testbed for flagd in-process provider e2e tests + sync: + image: ghcr.io/open-feature/sync-testbed:v0.5.4 + ports: + - 9090:9090 + # sync-testbed for flagd in-process provider reconnect e2e tests + sync-unstable: + image: ghcr.io/open-feature/sync-testbed-unstable:v0.5.4 + ports: + - 9091:9090 + steps: - uses: actions/checkout@v4 with: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3f9c9096..a6188125 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -26,7 +26,15 @@ We use `pytest` for our unit testing, making use of `parametrized` to inject cas ### Integration tests -These are planned once the SDK has been stabilized and a Flagd provider implemented. At that point, we will utilize the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance. +The Flagd provider utilizes the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance. + +To run the integration tests: + +```bash +cd providers/openfeature-provider-flagd +docker-compose up -d # this runs the flagd sidecars +hatch run test +``` ## Pull Request @@ -62,7 +70,7 @@ To start working on a new feature or bugfix, create a new branch and start worki ```bash git checkout -b feat/NAME_OF_FEATURE # Make your changes -git commit +git commit -s -m "feat: my feature" git push fork feat/NAME_OF_FEATURE ``` diff --git a/providers/openfeature-provider-flagd/README.md b/providers/openfeature-provider-flagd/README.md index aa63c55c..43271f55 100644 --- a/providers/openfeature-provider-flagd/README.md +++ b/providers/openfeature-provider-flagd/README.md @@ -19,6 +19,19 @@ from openfeature.contrib.provider.flagd import FlagdProvider api.set_provider(FlagdProvider()) ``` + +To use in-process evaluation with flagd gRPC sync service: + +```python +from openfeature import api +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType + +api.set_provider(FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, +)) +``` + To use in-process evaluation in offline mode with a file as source: ```python @@ -36,12 +49,17 @@ api.set_provider(FlagdProvider( The default options can be defined in the FlagdProvider constructor. -| Option name | Type & Values | Default | -|----------------|---------------|-----------| -| host | str | localhost | -| port | int | 8013 | -| schema | str | http | -| timeout | int | 2 | +| Option name | Environment Variable Name | Type & Values | Default | +|-------------------------------|-------------------------------------|----------------|-----------| +| resolver_type | FLAGD_RESOLVER_TYPE | enum | grpc | +| host | FLAGD_HOST | str | localhost | +| port | FLAGD_PORT | int | 8013 | +| tls | FLAGD_TLS | bool | false | +| timeout | | int | 5 | +| retry_backoff_seconds | FLAGD_RETRY_BACKOFF_SECONDS | float | 2.0 | +| selector | FLAGD_SELECTOR | str | None | +| offline_flag_source_path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | str | None | +| offline_poll_interval_seconds | FLAGD_OFFLINE_POLL_INTERVAL_SECONDS | float | 1.0 | ## License diff --git a/providers/openfeature-provider-flagd/docker-compose.yaml b/providers/openfeature-provider-flagd/docker-compose.yaml new file mode 100644 index 00000000..71a14c02 --- /dev/null +++ b/providers/openfeature-provider-flagd/docker-compose.yaml @@ -0,0 +1,25 @@ +services: + flagd: + build: + context: test-harness + dockerfile: flagd/Dockerfile + ports: + - 8013:8013 + flagd-unstable: + build: + context: test-harness + dockerfile: flagd/Dockerfile.unstable + ports: + - 8014:8013 + flagd-sync: + build: + context: test-harness + dockerfile: sync/Dockerfile + ports: + - 9090:9090 + flagd-sync-unstable: + build: + context: test-harness + dockerfile: sync/Dockerfile.unstable + ports: + - 9091:9090 \ No newline at end of file diff --git a/providers/openfeature-provider-flagd/pyproject.toml b/providers/openfeature-provider-flagd/pyproject.toml index ff3daaea..5f7e64d0 100644 --- a/providers/openfeature-provider-flagd/pyproject.toml +++ b/providers/openfeature-provider-flagd/pyproject.toml @@ -58,6 +58,7 @@ cov = [ exclude = [ ".gitignore", "schemas", + "docker-compose.yaml", ] [tool.hatch.build.targets.wheel] diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index a95c3153..326b10b6 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -30,6 +30,8 @@ def __init__( # noqa: PLR0913 port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, timeout: typing.Optional[int] = None, + retry_backoff_seconds: typing.Optional[float] = None, + selector: typing.Optional[str] = None, resolver_type: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, offline_poll_interval_seconds: typing.Optional[float] = None, @@ -42,6 +44,14 @@ def __init__( # noqa: PLR0913 env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls ) self.timeout = 5 if timeout is None else timeout + self.retry_backoff_seconds: float = ( + float(env_or_default("FLAGD_RETRY_BACKOFF_SECONDS", 2.0)) + if retry_backoff_seconds is None + else retry_backoff_seconds + ) + self.selector = ( + env_or_default("FLAGD_SELECTOR", None) if selector is None else selector + ) self.resolver_type = ( ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc")) if resolver_type is None diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py index 299a004b..16c958c5 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2 +from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2 class ServiceStub(object): diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py index ce040715..16bed235 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from flagd.sync.v1 import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2 +from . import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2 class FlagSyncServiceStub(object): diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py index fa38ac67..2afaf263 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from sync.v1 import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2 +from . import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2 class FlagSyncServiceStub(object): diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 76307475..dca91821 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -24,6 +24,7 @@ import typing from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails from openfeature.flag_evaluation import FlagResolutionDetails from openfeature.provider.metadata import Metadata from openfeature.provider.provider import AbstractProvider @@ -43,6 +44,8 @@ def __init__( # noqa: PLR0913 port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, timeout: typing.Optional[int] = None, + retry_backoff_seconds: typing.Optional[float] = None, + selector: typing.Optional[str] = None, resolver_type: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, offline_poll_interval_seconds: typing.Optional[float] = None, @@ -60,6 +63,8 @@ def __init__( # noqa: PLR0913 port=port, tls=tls, timeout=timeout, + retry_backoff_seconds=retry_backoff_seconds, + selector=selector, resolver_type=resolver_type, offline_flag_source_path=offline_flag_source_path, offline_poll_interval_seconds=offline_poll_interval_seconds, @@ -71,12 +76,20 @@ def setup_resolver(self) -> AbstractResolver: if self.config.resolver_type == ResolverType.GRPC: return GrpcResolver(self.config) elif self.config.resolver_type == ResolverType.IN_PROCESS: - return InProcessResolver(self.config, self) + return InProcessResolver( + self.config, + self.emit_provider_ready, + self.emit_provider_error, + self.emit_provider_configuration_changed, + ) else: raise ValueError( f"`resolver_type` parameter invalid: {self.config.resolver_type}" ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.resolver.initialize(evaluation_context) + def shutdown(self) -> None: if self.resolver: self.resolver.shutdown() @@ -85,6 +98,11 @@ def get_metadata(self) -> Metadata: """Returns provider metadata""" return Metadata(name="FlagdProvider") + def flag_store_updated_callback(self, flag_keys: typing.List[str]) -> None: + self.emit_provider_configuration_changed( + ProviderEventDetails(flags_changed=flag_keys) + ) + def resolve_boolean_details( self, key: str, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py index 53e17938..f539de8f 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py @@ -1,51 +1,5 @@ -import typing - -from typing_extensions import Protocol - -from openfeature.evaluation_context import EvaluationContext -from openfeature.flag_evaluation import FlagResolutionDetails - from .grpc import GrpcResolver from .in_process import InProcessResolver - - -class AbstractResolver(Protocol): - def shutdown(self) -> None: ... - - def resolve_boolean_details( - self, - key: str, - default_value: bool, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[bool]: ... - - def resolve_string_details( - self, - key: str, - default_value: str, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[str]: ... - - def resolve_float_details( - self, - key: str, - default_value: float, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[float]: ... - - def resolve_integer_details( - self, - key: str, - default_value: int, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[int]: ... - - def resolve_object_details( - self, - key: str, - default_value: typing.Union[dict, list], - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[typing.Union[dict, list]]: ... - +from .protocol import AbstractResolver __all__ = ["AbstractResolver", "GrpcResolver", "InProcessResolver"] diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index caab101a..ca697cf3 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -16,11 +16,12 @@ from ..config import Config from ..flag_type import FlagType from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc +from .protocol import AbstractResolver T = typing.TypeVar("T") -class GrpcResolver: +class GrpcResolver(AbstractResolver): def __init__(self, config: Config): self.config = config channel_factory = ( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index 907a62d6..37c1608a 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -4,13 +4,16 @@ from json_logic import builtins, jsonLogic # type: ignore[import-untyped] from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails from openfeature.exception import FlagNotFoundError, ParseError from openfeature.flag_evaluation import FlagResolutionDetails, Reason -from openfeature.provider.provider import AbstractProvider from ..config import Config +from .process.connector import FlagStateConnector +from .process.connector.file_watcher import FileWatcher +from .process.connector.grpc_watcher import GrpcWatcher from .process.custom_ops import ends_with, fractional, sem_ver, starts_with -from .process.file_watcher import FileWatcherFlagStore +from .process.flags import FlagStore T = typing.TypeVar("T") @@ -24,21 +27,39 @@ class InProcessResolver: "sem_ver": sem_ver, } - def __init__(self, config: Config, provider: AbstractProvider): + def __init__( + self, + config: Config, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_configuration_changed: typing.Callable[ + [ProviderEventDetails], None + ], + ): self.config = config - self.provider = provider - if not self.config.offline_flag_source_path: - raise ValueError( - "offline_flag_source_path must be provided when using in-process resolver" + self.flag_store = FlagStore(emit_provider_configuration_changed) + self.connector: FlagStateConnector = ( + FileWatcher( + self.config.offline_flag_source_path, + self.flag_store, + emit_provider_ready, + emit_provider_error, + self.config.offline_poll_interval_seconds, + ) + if self.config.offline_flag_source_path + else GrpcWatcher( + self.config, + self.flag_store, + emit_provider_ready, + emit_provider_error, ) - self.flag_store = FileWatcherFlagStore( - self.config.offline_flag_source_path, - self.provider, - self.config.offline_poll_interval_seconds, ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.connector.initialize(evaluation_context) + def shutdown(self) -> None: - self.flag_store.shutdown() + self.connector.shutdown() def resolve_boolean_details( self, @@ -62,7 +83,10 @@ def resolve_float_details( default_value: float, evaluation_context: typing.Optional[EvaluationContext] = None, ) -> FlagResolutionDetails[float]: - return self._resolve(key, default_value, evaluation_context) + result = self._resolve(key, default_value, evaluation_context) + if not isinstance(result.value, float): + result.value = float(result.value) + return result def resolve_integer_details( self, @@ -70,7 +94,10 @@ def resolve_integer_details( default_value: int, evaluation_context: typing.Optional[EvaluationContext] = None, ) -> FlagResolutionDetails[int]: - return self._resolve(key, default_value, evaluation_context) + result = self._resolve(key, default_value, evaluation_context) + if not isinstance(result.value, int): + result.value = int(result.value) + return result def resolve_object_details( self, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py new file mode 100644 index 00000000..07d49241 --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py @@ -0,0 +1,11 @@ +import typing + +from openfeature.evaluation_context import EvaluationContext + + +class FlagStateConnector(typing.Protocol): + def initialize( + self, evaluation_context: EvaluationContext + ) -> None: ... # pragma: no cover + + def shutdown(self) -> None: ... # pragma: no cover diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py new file mode 100644 index 00000000..934c4332 --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -0,0 +1,99 @@ +import json +import logging +import os +import threading +import time +import typing + +import yaml + +from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails +from openfeature.exception import ParseError, ProviderNotReadyError + +from ..connector import FlagStateConnector +from ..flags import FlagStore + +logger = logging.getLogger("openfeature.contrib") + + +class FileWatcher(FlagStateConnector): + def __init__( + self, + file_path: str, + flag_store: FlagStore, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], + poll_interval_seconds: float = 1.0, + ): + self.file_path = file_path + self.emit_provider_ready = emit_provider_ready + self.emit_provider_error = emit_provider_error + self.poll_interval_seconds = poll_interval_seconds + + self.last_modified = 0.0 + self.flag_store = flag_store + self.should_emit_ready_on_success = False + + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.active = True + self.thread = threading.Thread( + target=self.refresh_file, daemon=True, name="FlagdFileWatcherWorkerThread" + ) + self.thread.start() + + # Let this throw exceptions so that provider status is set correctly + try: + self._load_data() + self.should_emit_ready_on_success = True + except Exception as err: + raise ProviderNotReadyError from err + + def shutdown(self) -> None: + self.active = False + + def refresh_file(self) -> None: + while self.active: + time.sleep(self.poll_interval_seconds) + logger.debug("checking for new flag store contents from file") + self.safe_load_data() + + def safe_load_data(self) -> None: + try: + last_modified = os.path.getmtime(self.file_path) + if last_modified > self.last_modified: + self._load_data(last_modified) + except FileNotFoundError: + self.handle_error("Provided file path not valid") + except json.JSONDecodeError: + self.handle_error("Could not parse JSON flag data from file") + except yaml.error.YAMLError: + self.handle_error("Could not parse YAML flag data from file") + except ParseError: + self.handle_error("Could not parse flag data using flagd syntax") + except Exception: + self.handle_error("Could not read flags from file") + + def _load_data(self, modified_time: typing.Optional[float] = None) -> None: + with open(self.file_path) as file: + if self.file_path.endswith(".yaml"): + data = yaml.safe_load(file) + else: + data = json.load(file) + + self.flag_store.update(data) + + if self.should_emit_ready_on_success: + self.emit_provider_ready( + ProviderEventDetails( + message="Reloading file contents recovered from error state" + ) + ) + self.should_emit_ready_on_success = False + + self.last_modified = modified_time or os.path.getmtime(self.file_path) + + def handle_error(self, error_message: str) -> None: + logger.exception(error_message) + self.should_emit_ready_on_success = True + self.emit_provider_error(ProviderEventDetails(message=error_message)) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py new file mode 100644 index 00000000..f6ea3ecf --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -0,0 +1,110 @@ +import json +import logging +import threading +import time +import typing + +import grpc + +from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails +from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError + +from ....config import Config +from ....proto.flagd.sync.v1 import sync_pb2, sync_pb2_grpc +from ..connector import FlagStateConnector +from ..flags import FlagStore + +logger = logging.getLogger("openfeature.contrib") + + +class GrpcWatcher(FlagStateConnector): + MAX_BACK_OFF = 120 + + def __init__( + self, + config: Config, + flag_store: FlagStore, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], + ): + self.flag_store = flag_store + channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel + self.channel = channel_factory(f"{config.host}:{config.port}") + self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel) + self.timeout = config.timeout + self.retry_backoff_seconds = config.retry_backoff_seconds + self.selector = config.selector + self.emit_provider_ready = emit_provider_ready + self.emit_provider_error = emit_provider_error + + self.connected = False + + def initialize(self, context: EvaluationContext) -> None: + self.active = True + self.thread = threading.Thread( + target=self.sync_flags, daemon=True, name="FlagdGrpcSyncWorkerThread" + ) + self.thread.start() + + ## block until ready or deadline reached + deadline = self.timeout + time.time() + while not self.connected and time.time() < deadline: + time.sleep(0.05) + logger.debug("Finished blocking gRPC state initialization") + + if not self.connected: + raise ProviderNotReadyError( + "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." + ) + + def shutdown(self) -> None: + self.active = False + + def sync_flags(self) -> None: + request = sync_pb2.SyncFlagsRequest(selector=self.selector) # type:ignore[attr-defined] + + retry_delay = self.retry_backoff_seconds + while self.active: + try: + logger.debug("Setting up gRPC sync flags connection") + for flag_rsp in self.stub.SyncFlags(request): + flag_str = flag_rsp.flag_configuration + logger.debug( + f"Received flag configuration - {abs(hash(flag_str)) % (10 ** 8)}" + ) + self.flag_store.update(json.loads(flag_str)) + + if not self.connected: + self.emit_provider_ready( + ProviderEventDetails( + message="gRPC sync connection established" + ) + ) + self.connected = True + # reset retry delay after successsful read + retry_delay = self.retry_backoff_seconds + if not self.active: + logger.info("Terminating gRPC sync thread") + return + except grpc.RpcError as e: + logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") + except json.JSONDecodeError: + logger.exception( + f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}" + ) + except ParseError: + logger.exception( + f"Could not parse flag data using flagd syntax: {flag_str=}" + ) + + self.connected = False + self.emit_provider_error( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + error_code=ErrorCode.GENERAL, + ) + ) + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py deleted file mode 100644 index 0918981f..00000000 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py +++ /dev/null @@ -1,89 +0,0 @@ -import json -import logging -import os -import re -import threading -import time -import typing - -import yaml - -from openfeature.event import ProviderEventDetails -from openfeature.exception import ParseError -from openfeature.provider.provider import AbstractProvider - -from .flags import Flag - -logger = logging.getLogger("openfeature.contrib") - - -class FileWatcherFlagStore: - def __init__( - self, - file_path: str, - provider: AbstractProvider, - poll_interval_seconds: float = 1.0, - ): - self.file_path = file_path - self.provider = provider - self.poll_interval_seconds = poll_interval_seconds - - self.last_modified = 0.0 - self.flag_data: typing.Mapping[str, Flag] = {} - self.load_data() - self.thread = threading.Thread(target=self.refresh_file, daemon=True) - self.thread.start() - - def shutdown(self) -> None: - pass - - def get_flag(self, key: str) -> typing.Optional[Flag]: - return self.flag_data.get(key) - - def refresh_file(self) -> None: - while True: - time.sleep(self.poll_interval_seconds) - logger.debug("checking for new flag store contents from file") - last_modified = os.path.getmtime(self.file_path) - if last_modified > self.last_modified: - self.load_data(last_modified) - - def load_data(self, modified_time: typing.Optional[float] = None) -> None: - try: - with open(self.file_path) as file: - if self.file_path.endswith(".yaml"): - data = yaml.safe_load(file) - else: - data = json.load(file) - - self.flag_data = self.parse_flags(data) - logger.debug(f"{self.flag_data=}") - self.provider.emit_provider_configuration_changed( - ProviderEventDetails(flags_changed=list(self.flag_data.keys())) - ) - self.last_modified = modified_time or os.path.getmtime(self.file_path) - except FileNotFoundError: - logger.exception("Provided file path not valid") - except json.JSONDecodeError: - logger.exception("Could not parse JSON flag data from file") - except yaml.error.YAMLError: - logger.exception("Could not parse YAML flag data from file") - except ParseError: - logger.exception("Could not parse flag data using flagd syntax") - except Exception: - logger.exception("Could not read flags from file") - - def parse_flags(self, flags_data: dict) -> dict: - flags = flags_data.get("flags", {}) - evaluators: typing.Optional[dict] = flags_data.get("$evaluators") - if evaluators: - transposed = json.dumps(flags) - for name, rule in evaluators.items(): - transposed = re.sub( - rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed - ) - flags = json.loads(transposed) - - if not isinstance(flags, dict): - raise ParseError("`flags` key of configuration must be a dictionary") - return {key: Flag.from_dict(key, data) for key, data in flags.items()} diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py index 0354ac42..889edac7 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py @@ -1,9 +1,45 @@ +import json +import re import typing from dataclasses import dataclass +from openfeature.event import ProviderEventDetails from openfeature.exception import ParseError +class FlagStore: + def __init__( + self, + emit_provider_configuration_changed: typing.Callable[ + [ProviderEventDetails], None + ], + ): + self.emit_provider_configuration_changed = emit_provider_configuration_changed + self.flags: typing.Mapping[str, "Flag"] = {} + + def get_flag(self, key: str) -> typing.Optional["Flag"]: + return self.flags.get(key) + + def update(self, flags_data: dict) -> None: + flags = flags_data.get("flags", {}) + evaluators: typing.Optional[dict] = flags_data.get("$evaluators") + if evaluators: + transposed = json.dumps(flags) + for name, rule in evaluators.items(): + transposed = re.sub( + rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed + ) + flags = json.loads(transposed) + + if not isinstance(flags, dict): + raise ParseError("`flags` key of configuration must be a dictionary") + self.flags = {key: Flag.from_dict(key, data) for key, data in flags.items()} + + self.emit_provider_configuration_changed( + ProviderEventDetails(flags_changed=list(self.flags.keys())) + ) + + @dataclass class Flag: key: str @@ -32,11 +68,15 @@ def __post_init__(self) -> None: @classmethod def from_dict(cls, key: str, data: dict) -> "Flag": - data["default_variant"] = data["defaultVariant"] - del data["defaultVariant"] - flag = cls(key=key, **data) + if "defaultVariant" in data: + data["default_variant"] = data["defaultVariant"] + del data["defaultVariant"] - return flag + try: + flag = cls(key=key, **data) + return flag + except Exception as err: + raise ParseError from err @property def default(self) -> typing.Tuple[str, typing.Any]: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/protocol.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/protocol.py new file mode 100644 index 00000000..a6d70fba --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/protocol.py @@ -0,0 +1,48 @@ +import typing + +from typing_extensions import Protocol + +from openfeature.evaluation_context import EvaluationContext +from openfeature.flag_evaluation import FlagResolutionDetails + + +class AbstractResolver(Protocol): + def initialize(self, evaluation_context: EvaluationContext) -> None: + return + + def shutdown(self) -> None: ... + + def resolve_boolean_details( + self, + key: str, + default_value: bool, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[bool]: ... + + def resolve_string_details( + self, + key: str, + default_value: str, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[str]: ... + + def resolve_float_details( + self, + key: str, + default_value: float, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[float]: ... + + def resolve_integer_details( + self, + key: str, + default_value: int, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[int]: ... + + def resolve_object_details( + self, + key: str, + default_value: typing.Union[dict, list], + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[typing.Union[dict, list]]: ... diff --git a/providers/openfeature-provider-flagd/test-harness b/providers/openfeature-provider-flagd/test-harness index 6197b3d9..c9e0be36 160000 --- a/providers/openfeature-provider-flagd/test-harness +++ b/providers/openfeature-provider-flagd/test-harness @@ -1 +1 @@ -Subproject commit 6197b3d956d358bf662e5b8e0aebdc4800480f6b +Subproject commit c9e0be36e89ad33aa99b8e32b40d67e9bf350f88 diff --git a/providers/openfeature-provider-flagd/tests/conftest.py b/providers/openfeature-provider-flagd/tests/conftest.py index 287f5240..692b84b2 100644 --- a/providers/openfeature-provider-flagd/tests/conftest.py +++ b/providers/openfeature-provider-flagd/tests/conftest.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import pytest @@ -8,13 +9,14 @@ @pytest.fixture() def flagd_provider_client(): - api.set_provider(FlagdProvider()) - return api.get_client() + provider = FlagdProvider() + api.set_provider(provider) + yield api.get_client() + provider.shutdown() def setup_flag_file(base_dir: str, flag_file: str) -> str: - with open(f"test-harness/flags/{flag_file}") as src_file: - contents = src_file.read() + contents = (Path(__file__).parent / "../test-harness/flags" / flag_file).read_text() dst_path = os.path.join(base_dir, flag_file) with open(dst_path, "w") as dst_file: dst_file.write(contents) diff --git a/providers/openfeature-provider-flagd/tests/e2e/__init__.py b/providers/openfeature-provider-flagd/tests/e2e/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 243f5724..5de9923f 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -1,35 +1,25 @@ +import logging +import time import typing import pytest -from pytest_bdd import given, parsers, then, when -from tests.e2e.parsers import to_bool +from pytest_bdd import parsers, then, when -from openfeature import api -from openfeature.client import OpenFeatureClient -from openfeature.contrib.provider.flagd import FlagdProvider -from openfeature.contrib.provider.flagd.config import ResolverType +from openfeature.client import OpenFeatureClient, ProviderEvent from openfeature.evaluation_context import EvaluationContext JsonPrimitive = typing.Union[str, bool, float, int] +def to_bool(s: str) -> bool: + return s.lower() == "true" + + @pytest.fixture def evaluation_context() -> EvaluationContext: return EvaluationContext() -@given("a flagd provider is set", target_fixture="client") -def setup_provider(flag_file) -> OpenFeatureClient: - api.set_provider( - FlagdProvider( - resolver_type=ResolverType.IN_PROCESS, - offline_flag_source_path=flag_file, - offline_poll_interval_seconds=0.1, - ) - ) - return api.get_client() - - @when( parsers.cfparse( 'a zero-value boolean flag with key "{key}" is evaluated with default value "{default:bool}"', @@ -206,3 +196,117 @@ def assert_reason( key, default = key_and_default evaluation_result = client.get_string_details(key, default, evaluation_context) assert evaluation_result.reason.value == reason + + +@pytest.fixture +def handles() -> list: + return [] + + +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler is added", + extra_types={"ProviderEvent": ProviderEvent}, + ), + target_fixture="handles", +) +def add_event_handler( + client: OpenFeatureClient, event_type: ProviderEvent, handles: list +): + def handler(event): + logging.info((event_type, event)) + handles.append( + { + "type": event_type, + "event": event, + } + ) + + client.add_handler(event_type, handler) + return handles + + +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler and a {event_type2:ProviderEvent} handler are added", + extra_types={"ProviderEvent": ProviderEvent}, + ), + target_fixture="handles", +) +def add_event_handlers( + client: OpenFeatureClient, + event_type: ProviderEvent, + event_type2: ProviderEvent, + handles: list, +): + add_event_handler(client, event_type, handles) + add_event_handler(client, event_type2, handles) + + +def assert_handlers( + handles, event_type: ProviderEvent, max_wait: int = 2, num_events: int = 1 +): + poll_interval = 0.05 + while max_wait > 0: + if sum([h["type"] == event_type for h in handles]) < num_events: + max_wait -= poll_interval + time.sleep(poll_interval) + continue + break + + logging.info(f"asserting num({event_type}) >= {num_events}: {handles}") + actual_num_events = sum([h["type"] == event_type for h in handles]) + assert ( + num_events <= actual_num_events + ), f"Expected {num_events} but got {actual_num_events}: {handles}" + + +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider connects", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_handler_run(handles, event_type: ProviderEvent): + assert_handlers(handles, event_type, max_wait=3) + + +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider's connection is lost", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_handler(handles, event_type: ProviderEvent): + # docker sync upstream restarts every 5s, waiting 2 cycles reduces test noise + assert_handlers(handles, event_type, max_wait=10) + + +@then( + parsers.cfparse( + "when the connection is reestablished the {event_type:ProviderEvent} handler must run again", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_error(client: OpenFeatureClient, event_type: ProviderEvent): + reconnect_handles = [] + add_event_handler(client, event_type, reconnect_handles) + assert_handlers(reconnect_handles, event_type, max_wait=6) + + +@then(parsers.cfparse('the event details must indicate "{key}" was altered')) +def assert_flag_changed(handles, key): + handle = None + for h in handles: + if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: + handle = h + break + + assert handle is not None + assert key in handle["event"].flags_changed diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py new file mode 100644 index 00000000..81831c69 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py @@ -0,0 +1,17 @@ +from pytest_bdd import given + +from openfeature import api +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType + + +@given("a flagd provider is set", target_fixture="client") +def setup_provider(flag_file): + provider = FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + offline_flag_source_path=flag_file, + offline_poll_interval_seconds=0.1, + ) + api.set_provider(provider) + yield api.get_client() + provider.shutdown() diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_custom_ops.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_custom_ops.py similarity index 68% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_custom_ops.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_custom_ops.py index 70ceb1aa..d4ee6db1 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_custom_ops.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_custom_ops.py @@ -2,6 +2,8 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): @@ -9,14 +11,15 @@ def flag_file(tmp_path): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", "Fractional operator" + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", + "Fractional operator", ) def test_fractional_operator(): """Fractional operator.""" @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Semantic version operator numeric comparison", ) def test_semantic_version_operator_numeric_comparison(): @@ -24,7 +27,7 @@ def test_semantic_version_operator_numeric_comparison(): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Semantic version operator semantic comparison", ) def test_semantic_version_operator_semantic_comparison(): @@ -32,7 +35,8 @@ def test_semantic_version_operator_semantic_comparison(): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", "Substring operators" + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", + "Substring operators", ) def test_substring_operators(): """Substring operators.""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_edge_cases.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_edge_cases.py similarity index 65% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_edge_cases.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_edge_cases.py index 0583d8e9..976a5107 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_edge_cases.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_edge_cases.py @@ -2,14 +2,14 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): return setup_flag_file(tmp_path, "edge-case-flags.json") -@scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", "Errors and edge cases" -) +@scenario(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Errors and edge cases") def test_errors_and_edge_cases(): """Errors and edge cases.""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_evaluator_reuse.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_evaluator_reuse.py similarity index 65% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_evaluator_reuse.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_evaluator_reuse.py index 5abcddb5..5bc802f7 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_evaluator_reuse.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_evaluator_reuse.py @@ -2,12 +2,14 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): return setup_flag_file(tmp_path, "evaluator-refs.json") -@scenario("../../test-harness/gherkin/flagd-json-evaluator.feature", "Evaluator reuse") +@scenario(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Evaluator reuse") def test_evaluator_reuse(): """Evaluator reuse.""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py new file mode 100644 index 00000000..58603dcc --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py @@ -0,0 +1,35 @@ +import logging +import time +from pathlib import Path + +import pytest +from pytest_bdd import parsers, scenario, when +from tests.conftest import setup_flag_file + +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + + +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Provider ready event") +def test_ready_event(caplog): + """Provider ready event""" + caplog.set_level(logging.DEBUG) + + +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Flag change event") +def test_change_event(): + """Flag change event""" + + +@pytest.fixture +def flag_file(tmp_path): + return setup_flag_file(tmp_path, "changing-flag-bar.json") + + +@when(parsers.cfparse('a flag with key "{key}" is modified')) +def modify_flag(flag_file, key): + time.sleep(0.1) # guard against race condition + contents = ( + Path(__file__).parent / "../../../../test-harness/flags/changing-flag-foo.json" + ).read_text() + with open(flag_file, "w") as f: + f.write(contents) diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_testing_flags.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_testing_flags.py similarity index 72% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_testing_flags.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_testing_flags.py index 4e3bd069..84e815a6 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_testing_flags.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_testing_flags.py @@ -2,6 +2,8 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): @@ -9,7 +11,7 @@ def flag_file(tmp_path): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Time-based operations", ) def test_timebased_operations(): @@ -17,7 +19,7 @@ def test_timebased_operations(): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Targeting by targeting key", ) def test_targeting_by_targeting_key(): diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_zero_evals.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_zero_evals.py similarity index 55% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_zero_evals.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_zero_evals.py index 30de0dc6..cebe6e10 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_zero_evals.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_zero_evals.py @@ -2,23 +2,25 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves boolean zero value") + +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves boolean zero value") def test_eval_boolean(): """Resolve boolean zero value""" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves string zero value") +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves string zero value") def test_eval_string(): """Resolve string zero value""" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves integer zero value") +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves integer zero value") def test_eval_integer(): """Resolve integer zero value""" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves float zero value") +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves float zero value") def test_eval_float(): """Resolve float zero value""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py new file mode 100644 index 00000000..3c15b0c8 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py @@ -0,0 +1,49 @@ +import pytest +from pytest_bdd import given, parsers, then, when +from tests.e2e.conftest import add_event_handler, assert_handlers + +from openfeature import api +from openfeature.client import OpenFeatureClient, ProviderEvent +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType + + +@pytest.fixture +def port(): + # Port for flagd-sync, override to 9091 to test unstable version + return 9090 + + +@given("a flagd provider is set", target_fixture="client") +def setup_provider(port: int) -> OpenFeatureClient: + api.set_provider( + FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + port=port, + timeout=0.5, + retry_backoff_seconds=0.1, + ) + ) + return api.get_client() + + +@when(parsers.cfparse('a flag with key "{key}" is modified')) +def modify_flag(key): + # sync service will flip flag contents regularly + pass + + +@given("flagd is unavailable", target_fixture="client") +def flagd_unavailable(): + return setup_provider(99999) + + +@when("a flagd provider is set and initialization is awaited") +def flagd_init(client: OpenFeatureClient, handles): + add_event_handler(client, ProviderEvent.PROVIDER_ERROR, handles) + add_event_handler(client, ProviderEvent.PROVIDER_READY, handles) + + +@then("an error should be indicated within the configured deadline") +def flagd_error(handles): + assert_handlers(handles, ProviderEvent.PROVIDER_ERROR) diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py new file mode 100644 index 00000000..926c2195 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py @@ -0,0 +1,6 @@ +from pytest_bdd import scenarios + +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + +scenarios(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature") +scenarios(f"{GHERKIN_FOLDER}flagd.feature") diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_reconnect.py new file mode 100644 index 00000000..e3e1b85d --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_reconnect.py @@ -0,0 +1,12 @@ +import pytest +from pytest_bdd import scenarios + +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + +scenarios(f"{GHERKIN_FOLDER}flagd-reconnect.feature") + + +@pytest.fixture +def port(): + # Port for flagd-sync-unstable, overrides main conftest port + return 9091 diff --git a/providers/openfeature-provider-flagd/tests/e2e/parsers.py b/providers/openfeature-provider-flagd/tests/e2e/parsers.py deleted file mode 100644 index 16e89d94..00000000 --- a/providers/openfeature-provider-flagd/tests/e2e/parsers.py +++ /dev/null @@ -1,2 +0,0 @@ -def to_bool(s: str) -> bool: - return s.lower() == "true" diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_events.py b/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_events.py deleted file mode 100644 index e00a4844..00000000 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_events.py +++ /dev/null @@ -1,91 +0,0 @@ -import logging -import time - -import pytest -from pytest_bdd import parsers, scenario, then, when -from tests.conftest import setup_flag_file - -from openfeature.client import OpenFeatureClient, ProviderEvent - - -@scenario("../../test-harness/gherkin/flagd.feature", "Provider ready event") -def test_ready_event(caplog): - """Provider ready event""" - caplog.set_level(logging.DEBUG) - - -@scenario("../../test-harness/gherkin/flagd.feature", "Flag change event") -def test_change_event(): - """Flag change event""" - - -@pytest.fixture -def flag_file(tmp_path): - return setup_flag_file(tmp_path, "changing-flag-bar.json") - - -@pytest.fixture -def handles() -> list: - return [] - - -@when( - parsers.cfparse( - "a {event_type:ProviderEvent} handler is added", - extra_types={"ProviderEvent": ProviderEvent}, - ), - target_fixture="handles", -) -def add_event_handler( - client: OpenFeatureClient, event_type: ProviderEvent, handles: list -): - def handler(event): - handles.append( - { - "type": event_type, - "event": event, - } - ) - - client.add_handler(event_type, handler) - return handles - - -@then( - parsers.cfparse( - "the {event_type:ProviderEvent} handler must run", - extra_types={"ProviderEvent": ProviderEvent}, - ) -) -def assert_handler_run(handles, event_type: ProviderEvent): - max_wait = 2 - poll_interval = 0.1 - while max_wait > 0: - if all(h["type"] != event_type for h in handles): - max_wait -= poll_interval - time.sleep(poll_interval) - continue - break - - assert any(h["type"] == event_type for h in handles) - - -@when(parsers.cfparse('a flag with key "{key}" is modified')) -def modify_flag(flag_file, key): - time.sleep(0.1) # guard against race condition - with open("test-harness/flags/changing-flag-foo.json") as src_file: - contents = src_file.read() - with open(flag_file, "w") as f: - f.write(contents) - - -@then(parsers.cfparse('the event details must indicate "{key}" was altered')) -def assert_flag_changed(handles, key): - handle = None - for h in handles: - if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: - handle = h - break - - assert handle is not None - assert key in handle["event"].flags_changed diff --git a/providers/openfeature-provider-flagd/tests/test_errors.py b/providers/openfeature-provider-flagd/tests/test_errors.py index 4adb332e..39ed91e2 100644 --- a/providers/openfeature-provider-flagd/tests/test_errors.py +++ b/providers/openfeature-provider-flagd/tests/test_errors.py @@ -1,9 +1,12 @@ +import time + import pytest from openfeature import api from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.config import ResolverType from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEvent from openfeature.exception import ErrorCode from openfeature.flag_evaluation import Reason @@ -77,3 +80,27 @@ def test_flag_disabled(): assert res.value == "fallback" assert res.reason == Reason.DISABLED + + +@pytest.mark.parametrize("wait", (0.5, 0.25)) +def test_grpc_sync_fail_deadline(wait: float): + init_failed = False + + def fail(*args, **kwargs): + nonlocal init_failed + init_failed = True + + api.get_client().add_handler(ProviderEvent.PROVIDER_ERROR, fail) + + t = time.time() + api.set_provider( + FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + port=99999, # dead port to test failure + timeout=wait, + ) + ) + + elapsed = time.time() - t + assert abs(elapsed - wait) < 0.1 + assert init_failed diff --git a/providers/openfeature-provider-flagd/tests/test_file_store.py b/providers/openfeature-provider-flagd/tests/test_file_store.py index 2ae98ffa..12a1d976 100644 --- a/providers/openfeature-provider-flagd/tests/test_file_store.py +++ b/providers/openfeature-provider-flagd/tests/test_file_store.py @@ -1,14 +1,13 @@ from unittest.mock import Mock import pytest -from src.openfeature.contrib.provider.flagd.resolvers.process.file_watcher import ( - FileWatcherFlagStore, -) -from src.openfeature.contrib.provider.flagd.resolvers.process.flags import Flag from openfeature import api from openfeature.contrib.provider.flagd import FlagdProvider -from openfeature.provider.provider import AbstractProvider +from openfeature.contrib.provider.flagd.resolvers.process.connector.file_watcher import ( + FileWatcher, +) +from openfeature.contrib.provider.flagd.resolvers.process.flags import Flag, FlagStore def create_client(provider: FlagdProvider): @@ -23,11 +22,17 @@ def create_client(provider: FlagdProvider): "basic-flag.yaml", ], ) -def test_file_load_errors(file_name: str): - provider = Mock(spec=AbstractProvider) - file_store = FileWatcherFlagStore(f"tests/flags/{file_name}", provider) - - flag = file_store.flag_data.get("basic-flag") +def test_file_load(file_name: str): + emit_provider_configuration_changed = Mock() + emit_provider_ready = Mock() + emit_provider_error = Mock() + flag_store = FlagStore(emit_provider_configuration_changed) + file_watcher = FileWatcher( + f"tests/flags/{file_name}", flag_store, emit_provider_ready, emit_provider_error + ) + file_watcher.initialize(None) + + flag = flag_store.get_flag("basic-flag") assert flag is not None assert isinstance(flag, Flag) diff --git a/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py b/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py new file mode 100644 index 00000000..e68e7178 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py @@ -0,0 +1,45 @@ +import time +from unittest.mock import MagicMock, patch + +import pytest + +from openfeature.contrib.provider.flagd.config import Config +from openfeature.contrib.provider.flagd.resolvers.process.connector.grpc_watcher import ( + GrpcWatcher, +) +from openfeature.contrib.provider.flagd.resolvers.process.flags import FlagStore +from openfeature.exception import ProviderNotReadyError + + +def fake_grpc_service(flag_config: str, keepalive: int = 100): + def sync(request): + mock_resp = MagicMock() + mock_resp.flag_configuration = flag_config + yield mock_resp + time.sleep(keepalive) + + return sync + + +@pytest.mark.parametrize( + "flag_configuration", + ( + """{"flags": {"a-flag": {"garbage": "can"}}}""", + """not even a JSON""", + """{"flags": {"no-default": {"state": "ENABLED", "variants": {}}}}""", + ), +) +def test_invalid_payload(flag_configuration: str): + emit_provider_configuration_changed = MagicMock() + emit_provider_ready = MagicMock() + emit_provider_error = MagicMock() + flag_store = FlagStore(emit_provider_configuration_changed) + watcher = GrpcWatcher( + Config(timeout=0.2), flag_store, emit_provider_ready, emit_provider_error + ) + + fake_sync_flags = fake_grpc_service(flag_configuration) + with patch.object(watcher.stub, "SyncFlags", fake_sync_flags), pytest.raises( + ProviderNotReadyError + ): + watcher.initialize(None)