From 189957161412add8d6105feaa68be5fa0986d17b Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Sat, 27 Apr 2024 11:57:12 +0200 Subject: [PATCH 01/17] feat: add grpc sync flag store Signed-off-by: Cole Bailey --- .../docker-compose.yaml | 25 +++++ .../openfeature-provider-flagd/pyproject.toml | 5 + .../evaluation/v1/evaluation_pb2_grpc.py | 2 +- .../proto/flagd/sync/v1/sync_pb2_grpc.py | 2 +- .../proto/sync/v1/sync_service_pb2_grpc.py | 2 +- .../provider/flagd/resolvers/in_process.py | 18 ++-- .../process/{ => connector}/file_watcher.py | 22 +---- .../process/connector/grpc_watcher.py | 92 +++++++++++++++++++ .../provider/flagd/resolvers/process/flags.py | 26 ++++++ .../tests/e2e/__init__.py | 0 .../tests/e2e/conftest.py | 22 +---- .../tests/e2e/inprocess/file/conftest.py | 18 ++++ .../file}/test_inprocess_custom_ops.py | 12 ++- .../file}/test_inprocess_edge_cases.py | 6 +- .../file}/test_inprocess_evaluator_reuse.py | 4 +- .../file}/test_inprocess_events.py | 6 +- .../file}/test_inprocess_testing_flags.py | 6 +- .../file}/test_inprocess_zero_evals.py | 10 +- .../tests/e2e/inprocess/grpc/conftest.py | 24 +++++ .../grpc/test_inprocess_grpc_evals.py | 6 ++ .../grpc/test_inprocess_grpc_events.py | 91 ++++++++++++++++++ .../grpc/test_inprocess_grpc_zero_evals.py | 23 +++++ .../tests/e2e/parsers.py | 2 - .../tests/test_file_store.py | 8 +- 24 files changed, 363 insertions(+), 69 deletions(-) create mode 100644 providers/openfeature-provider-flagd/docker-compose.yaml rename providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/{ => connector}/file_watcher.py (75%) create mode 100644 providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py create mode 100644 providers/openfeature-provider-flagd/tests/e2e/__init__.py create mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py rename providers/openfeature-provider-flagd/tests/e2e/{ => inprocess/file}/test_inprocess_custom_ops.py (68%) rename providers/openfeature-provider-flagd/tests/e2e/{ => inprocess/file}/test_inprocess_edge_cases.py (65%) rename providers/openfeature-provider-flagd/tests/e2e/{ => inprocess/file}/test_inprocess_evaluator_reuse.py (65%) rename providers/openfeature-provider-flagd/tests/e2e/{ => inprocess/file}/test_inprocess_events.py (92%) rename providers/openfeature-provider-flagd/tests/e2e/{ => inprocess/file}/test_inprocess_testing_flags.py (72%) rename providers/openfeature-provider-flagd/tests/e2e/{ => inprocess/file}/test_inprocess_zero_evals.py (55%) create mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py create mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py create mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_events.py create mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py delete mode 100644 providers/openfeature-provider-flagd/tests/e2e/parsers.py diff --git a/providers/openfeature-provider-flagd/docker-compose.yaml b/providers/openfeature-provider-flagd/docker-compose.yaml new file mode 100644 index 00000000..71a14c02 --- /dev/null +++ b/providers/openfeature-provider-flagd/docker-compose.yaml @@ -0,0 +1,25 @@ +services: + flagd: + build: + context: test-harness + dockerfile: flagd/Dockerfile + ports: + - 8013:8013 + flagd-unstable: + build: + context: test-harness + dockerfile: flagd/Dockerfile.unstable + ports: + - 8014:8013 + flagd-sync: + build: + context: test-harness + dockerfile: sync/Dockerfile + ports: + - 9090:9090 + flagd-sync-unstable: + build: + context: test-harness + dockerfile: sync/Dockerfile.unstable + ports: + - 9091:9090 \ No newline at end of file diff --git a/providers/openfeature-provider-flagd/pyproject.toml b/providers/openfeature-provider-flagd/pyproject.toml index ff3daaea..84a49d44 100644 --- a/providers/openfeature-provider-flagd/pyproject.toml +++ b/providers/openfeature-provider-flagd/pyproject.toml @@ -69,3 +69,8 @@ omit = [ "src/openfeature/contrib/provider/flagd/proto/*", "tests/**", ] + +[tool.pytest.ini_options] +addopts = [ + "--import-mode=importlib", +] \ No newline at end of file diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py index 299a004b..16c958c5 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2 +from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2 class ServiceStub(object): diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py index ce040715..16bed235 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/sync/v1/sync_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from flagd.sync.v1 import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2 +from . import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2 class FlagSyncServiceStub(object): diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py index fa38ac67..2afaf263 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/sync/v1/sync_service_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from sync.v1 import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2 +from . import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2 class FlagSyncServiceStub(object): diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index 907a62d6..e292dfa3 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -9,8 +9,10 @@ from openfeature.provider.provider import AbstractProvider from ..config import Config +from .process.connector.file_watcher import FileWatcherFlagStore +from .process.connector.grpc_watcher import GrpcWatcherFlagStore from .process.custom_ops import ends_with, fractional, sem_ver, starts_with -from .process.file_watcher import FileWatcherFlagStore +from .process.flags import FlagStore T = typing.TypeVar("T") @@ -27,14 +29,14 @@ class InProcessResolver: def __init__(self, config: Config, provider: AbstractProvider): self.config = config self.provider = provider - if not self.config.offline_flag_source_path: - raise ValueError( - "offline_flag_source_path must be provided when using in-process resolver" + self.flag_store: FlagStore = ( + FileWatcherFlagStore( + self.config.offline_flag_source_path, + self.provider, + self.config.offline_poll_interval_seconds, ) - self.flag_store = FileWatcherFlagStore( - self.config.offline_flag_source_path, - self.provider, - self.config.offline_poll_interval_seconds, + if self.config.offline_flag_source_path + else GrpcWatcherFlagStore(self.config, self.provider) ) def shutdown(self) -> None: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py similarity index 75% rename from providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py rename to providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py index 0918981f..d440f17d 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -1,7 +1,6 @@ import json import logging import os -import re import threading import time import typing @@ -12,12 +11,12 @@ from openfeature.exception import ParseError from openfeature.provider.provider import AbstractProvider -from .flags import Flag +from ..flags import Flag, FlagStore logger = logging.getLogger("openfeature.contrib") -class FileWatcherFlagStore: +class FileWatcherFlagStore(FlagStore): def __init__( self, file_path: str, @@ -56,7 +55,7 @@ def load_data(self, modified_time: typing.Optional[float] = None) -> None: else: data = json.load(file) - self.flag_data = self.parse_flags(data) + self.flag_data = Flag.parse_flags(data) logger.debug(f"{self.flag_data=}") self.provider.emit_provider_configuration_changed( ProviderEventDetails(flags_changed=list(self.flag_data.keys())) @@ -72,18 +71,3 @@ def load_data(self, modified_time: typing.Optional[float] = None) -> None: logger.exception("Could not parse flag data using flagd syntax") except Exception: logger.exception("Could not read flags from file") - - def parse_flags(self, flags_data: dict) -> dict: - flags = flags_data.get("flags", {}) - evaluators: typing.Optional[dict] = flags_data.get("$evaluators") - if evaluators: - transposed = json.dumps(flags) - for name, rule in evaluators.items(): - transposed = re.sub( - rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed - ) - flags = json.loads(transposed) - - if not isinstance(flags, dict): - raise ParseError("`flags` key of configuration must be a dictionary") - return {key: Flag.from_dict(key, data) for key, data in flags.items()} diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py new file mode 100644 index 00000000..50a3690c --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -0,0 +1,92 @@ +import json +import logging +import threading +import time +import typing + +import grpc + +from openfeature.event import ProviderEventDetails +from openfeature.exception import ParseError +from openfeature.provider.provider import AbstractProvider + +from ....config import Config +from ....proto.flagd.sync.v1 import sync_pb2, sync_pb2_grpc +from ..flags import Flag, FlagStore + +logger = logging.getLogger("openfeature.contrib") + + +class GrpcWatcherFlagStore(FlagStore): + INIT_BACK_OFF = 2 + MAX_BACK_OFF = 120 + + def __init__(self, config: Config, provider: AbstractProvider): + self.provider = provider + self.flag_data: typing.Mapping[str, Flag] = {} + channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel + self.channel = channel_factory(f"{config.host}:{config.port}") + self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel) + + self.connected = False + + # TODO: Add selector + + self.thread = threading.Thread(target=self.sync_flags, daemon=True) + self.thread.start() + + ## block until ready or deadline reached + + # TODO: get deadline from user + deadline = 2 + time.time() + while not self.connected and time.time() < deadline: + logger.debug("blocking on init") + time.sleep(0.05) + + if not self.connected: + logger.warning( + "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." + ) + + def shutdown(self) -> None: + pass + + def get_flag(self, key: str) -> typing.Optional[Flag]: + return self.flag_data.get(key) + + def sync_flags(self) -> None: + request = sync_pb2.SyncFlagsRequest() # type:ignore[attr-defined] + + retry_delay = self.INIT_BACK_OFF + while True: + try: + logger.debug("Setting up gRPC sync flags connection") + for flag_rsp in self.stub.SyncFlags(request): + flag_str = flag_rsp.flag_configuration + logger.debug( + f"Received flag configuration - {abs(hash(flag_str)) % (10 ** 8)}" + ) + self.flag_data = Flag.parse_flags(json.loads(flag_str)) + + self.connected = True + self.provider.emit_provider_configuration_changed( + ProviderEventDetails(flags_changed=list(self.flag_data.keys())) + ) + + # reset retry delay after successsful read + retry_delay = self.INIT_BACK_OFF + except grpc.RpcError as e: # noqa: PERF203 + logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") + except json.JSONDecodeError: + logger.exception( + f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}" + ) + except ParseError: + logger.exception( + f"Could not parse flag data using flagd syntax: {flag_str=}" + ) + finally: + # self.connected = False + logger.info(f"Reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py index 0354ac42..a0e08e05 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py @@ -1,9 +1,19 @@ +import json +import re import typing from dataclasses import dataclass from openfeature.exception import ParseError +class FlagStore(typing.Protocol): + def get_flag(self, key: str) -> typing.Optional["Flag"]: + pass + + def shutdown(self) -> None: + pass + + @dataclass class Flag: key: str @@ -49,3 +59,19 @@ def get_variant( variant_key = str(variant_key).lower() return variant_key, self.variants.get(variant_key) + + @classmethod + def parse_flags(cls, flags_data: dict) -> typing.Dict[str, "Flag"]: + flags = flags_data.get("flags", {}) + evaluators: typing.Optional[dict] = flags_data.get("$evaluators") + if evaluators: + transposed = json.dumps(flags) + for name, rule in evaluators.items(): + transposed = re.sub( + rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed + ) + flags = json.loads(transposed) + + if not isinstance(flags, dict): + raise ParseError("`flags` key of configuration must be a dictionary") + return {key: Flag.from_dict(key, data) for key, data in flags.items()} diff --git a/providers/openfeature-provider-flagd/tests/e2e/__init__.py b/providers/openfeature-provider-flagd/tests/e2e/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 243f5724..e829c465 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -1,35 +1,23 @@ import typing import pytest -from pytest_bdd import given, parsers, then, when -from tests.e2e.parsers import to_bool +from pytest_bdd import parsers, then, when -from openfeature import api from openfeature.client import OpenFeatureClient -from openfeature.contrib.provider.flagd import FlagdProvider -from openfeature.contrib.provider.flagd.config import ResolverType from openfeature.evaluation_context import EvaluationContext JsonPrimitive = typing.Union[str, bool, float, int] +def to_bool(s: str) -> bool: + return s.lower() == "true" + + @pytest.fixture def evaluation_context() -> EvaluationContext: return EvaluationContext() -@given("a flagd provider is set", target_fixture="client") -def setup_provider(flag_file) -> OpenFeatureClient: - api.set_provider( - FlagdProvider( - resolver_type=ResolverType.IN_PROCESS, - offline_flag_source_path=flag_file, - offline_poll_interval_seconds=0.1, - ) - ) - return api.get_client() - - @when( parsers.cfparse( 'a zero-value boolean flag with key "{key}" is evaluated with default value "{default:bool}"', diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py new file mode 100644 index 00000000..9cf5be0c --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py @@ -0,0 +1,18 @@ +from pytest_bdd import given + +from openfeature import api +from openfeature.client import OpenFeatureClient +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType + + +@given("a flagd provider is set", target_fixture="client") +def setup_provider(flag_file) -> OpenFeatureClient: + api.set_provider( + FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + offline_flag_source_path=flag_file, + offline_poll_interval_seconds=0.1, + ) + ) + return api.get_client() diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_custom_ops.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_custom_ops.py similarity index 68% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_custom_ops.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_custom_ops.py index 70ceb1aa..d4ee6db1 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_custom_ops.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_custom_ops.py @@ -2,6 +2,8 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): @@ -9,14 +11,15 @@ def flag_file(tmp_path): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", "Fractional operator" + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", + "Fractional operator", ) def test_fractional_operator(): """Fractional operator.""" @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Semantic version operator numeric comparison", ) def test_semantic_version_operator_numeric_comparison(): @@ -24,7 +27,7 @@ def test_semantic_version_operator_numeric_comparison(): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Semantic version operator semantic comparison", ) def test_semantic_version_operator_semantic_comparison(): @@ -32,7 +35,8 @@ def test_semantic_version_operator_semantic_comparison(): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", "Substring operators" + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", + "Substring operators", ) def test_substring_operators(): """Substring operators.""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_edge_cases.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_edge_cases.py similarity index 65% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_edge_cases.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_edge_cases.py index 0583d8e9..976a5107 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_edge_cases.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_edge_cases.py @@ -2,14 +2,14 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): return setup_flag_file(tmp_path, "edge-case-flags.json") -@scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", "Errors and edge cases" -) +@scenario(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Errors and edge cases") def test_errors_and_edge_cases(): """Errors and edge cases.""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_evaluator_reuse.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_evaluator_reuse.py similarity index 65% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_evaluator_reuse.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_evaluator_reuse.py index 5abcddb5..5bc802f7 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_evaluator_reuse.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_evaluator_reuse.py @@ -2,12 +2,14 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): return setup_flag_file(tmp_path, "evaluator-refs.json") -@scenario("../../test-harness/gherkin/flagd-json-evaluator.feature", "Evaluator reuse") +@scenario(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Evaluator reuse") def test_evaluator_reuse(): """Evaluator reuse.""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_events.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py similarity index 92% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_events.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py index e00a4844..a5ff523c 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_events.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py @@ -7,14 +7,16 @@ from openfeature.client import OpenFeatureClient, ProviderEvent +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" -@scenario("../../test-harness/gherkin/flagd.feature", "Provider ready event") + +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Provider ready event") def test_ready_event(caplog): """Provider ready event""" caplog.set_level(logging.DEBUG) -@scenario("../../test-harness/gherkin/flagd.feature", "Flag change event") +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Flag change event") def test_change_event(): """Flag change event""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_testing_flags.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_testing_flags.py similarity index 72% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_testing_flags.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_testing_flags.py index 4e3bd069..84e815a6 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_testing_flags.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_testing_flags.py @@ -2,6 +2,8 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + @pytest.fixture def flag_file(tmp_path): @@ -9,7 +11,7 @@ def flag_file(tmp_path): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Time-based operations", ) def test_timebased_operations(): @@ -17,7 +19,7 @@ def test_timebased_operations(): @scenario( - "../../test-harness/gherkin/flagd-json-evaluator.feature", + f"{GHERKIN_FOLDER}flagd-json-evaluator.feature", "Targeting by targeting key", ) def test_targeting_by_targeting_key(): diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_zero_evals.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_zero_evals.py similarity index 55% rename from providers/openfeature-provider-flagd/tests/e2e/test_inprocess_zero_evals.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_zero_evals.py index 30de0dc6..cebe6e10 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_inprocess_zero_evals.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_zero_evals.py @@ -2,23 +2,25 @@ from pytest_bdd import scenario from tests.conftest import setup_flag_file +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves boolean zero value") + +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves boolean zero value") def test_eval_boolean(): """Resolve boolean zero value""" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves string zero value") +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves string zero value") def test_eval_string(): """Resolve string zero value""" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves integer zero value") +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves integer zero value") def test_eval_integer(): """Resolve integer zero value""" -@scenario("../../test-harness/gherkin/flagd.feature", "Resolves float zero value") +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves float zero value") def test_eval_float(): """Resolve float zero value""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py new file mode 100644 index 00000000..61fda032 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py @@ -0,0 +1,24 @@ +import pytest +from pytest_bdd import given + +from openfeature import api +from openfeature.client import OpenFeatureClient +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType + + +@pytest.fixture +def port(): + # Port for flagd-sync, override to 9091 to test unstable version + return 9090 + + +@given("a flagd provider is set", target_fixture="client") +def setup_provider(port: int) -> OpenFeatureClient: + api.set_provider( + FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + port=port, + ) + ) + return api.get_client() diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py new file mode 100644 index 00000000..926c2195 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py @@ -0,0 +1,6 @@ +from pytest_bdd import scenarios + +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + +scenarios(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature") +scenarios(f"{GHERKIN_FOLDER}flagd.feature") diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_events.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_events.py new file mode 100644 index 00000000..5f038da9 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_events.py @@ -0,0 +1,91 @@ +import logging +import time + +import pytest +from pytest_bdd import parsers, scenario, then, when + +from openfeature.client import OpenFeatureClient, ProviderEvent + +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + + +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Provider ready event") +def test_ready_event(caplog): + """Provider ready event""" + caplog.set_level(logging.DEBUG) + + +@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Flag change event") +def test_change_event(): + """Flag change event""" + + +@pytest.fixture +def handles() -> list: + return [] + + +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler is added", + extra_types={"ProviderEvent": ProviderEvent}, + ), + target_fixture="handles", +) +def add_event_handler( + client: OpenFeatureClient, event_type: ProviderEvent, handles: list +): + def handler(event): + handles.append( + { + "type": event_type, + "event": event, + } + ) + + client.add_handler(event_type, handler) + return handles + + +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_handler_run(handles, event_type: ProviderEvent): + max_wait = 2 + poll_interval = 0.1 + while max_wait > 0: + if all(h["type"] != event_type for h in handles): + max_wait -= poll_interval + time.sleep(poll_interval) + continue + break + + assert any(h["type"] == event_type for h in handles) + + +@when(parsers.cfparse('a flag with key "{key}" is modified')) +def modify_flag(key): + pass + + +# def modify_flag(flag_file, key): +# time.sleep(0.1) # guard against race condition +# with open("test-harness/flags/changing-flag-foo.json") as src_file: +# contents = src_file.read() +# with open(flag_file, "w") as f: +# f.write(contents) + + +@then(parsers.cfparse('the event details must indicate "{key}" was altered')) +def assert_flag_changed(handles, key): + handle = None + for h in handles: + if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: + handle = h + break + + assert handle is not None + assert key in handle["event"].flags_changed diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py new file mode 100644 index 00000000..444548d8 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py @@ -0,0 +1,23 @@ +# from pytest_bdd import scenario + +# GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + + +# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves boolean zero value") +# def test_eval_boolean(): +# """Resolve boolean zero value""" + + +# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves string zero value") +# def test_eval_string(): +# """Resolve string zero value""" + + +# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves integer zero value") +# def test_eval_integer(): +# """Resolve integer zero value""" + + +# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves float zero value") +# def test_eval_float(): +# """Resolve float zero value""" diff --git a/providers/openfeature-provider-flagd/tests/e2e/parsers.py b/providers/openfeature-provider-flagd/tests/e2e/parsers.py deleted file mode 100644 index 16e89d94..00000000 --- a/providers/openfeature-provider-flagd/tests/e2e/parsers.py +++ /dev/null @@ -1,2 +0,0 @@ -def to_bool(s: str) -> bool: - return s.lower() == "true" diff --git a/providers/openfeature-provider-flagd/tests/test_file_store.py b/providers/openfeature-provider-flagd/tests/test_file_store.py index 2ae98ffa..b91f1628 100644 --- a/providers/openfeature-provider-flagd/tests/test_file_store.py +++ b/providers/openfeature-provider-flagd/tests/test_file_store.py @@ -1,13 +1,13 @@ from unittest.mock import Mock import pytest -from src.openfeature.contrib.provider.flagd.resolvers.process.file_watcher import ( - FileWatcherFlagStore, -) -from src.openfeature.contrib.provider.flagd.resolvers.process.flags import Flag from openfeature import api from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.resolvers.process.connector.file_watcher import ( + FileWatcherFlagStore, +) +from openfeature.contrib.provider.flagd.resolvers.process.flags import Flag from openfeature.provider.provider import AbstractProvider From 2bfffe67e60e73480b4e0ad996a546d069896a70 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Sat, 27 Apr 2024 19:20:40 +0200 Subject: [PATCH 02/17] fix: float/int type casting Signed-off-by: Cole Bailey --- .../provider/flagd/resolvers/in_process.py | 10 ++++++-- ..._grpc_events.py => test_inprocess_grpc.py} | 23 +++---------------- .../grpc/test_inprocess_grpc_evals.py | 6 ----- .../grpc/test_inprocess_grpc_zero_evals.py | 23 ------------------- 4 files changed, 11 insertions(+), 51 deletions(-) rename providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/{test_inprocess_grpc_events.py => test_inprocess_grpc.py} (72%) delete mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py delete mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index e292dfa3..b3094b57 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -64,7 +64,10 @@ def resolve_float_details( default_value: float, evaluation_context: typing.Optional[EvaluationContext] = None, ) -> FlagResolutionDetails[float]: - return self._resolve(key, default_value, evaluation_context) + result = self._resolve(key, default_value, evaluation_context) + if not isinstance(result.value, float): + result.value = float(result.value) + return result def resolve_integer_details( self, @@ -72,7 +75,10 @@ def resolve_integer_details( default_value: int, evaluation_context: typing.Optional[EvaluationContext] = None, ) -> FlagResolutionDetails[int]: - return self._resolve(key, default_value, evaluation_context) + result = self._resolve(key, default_value, evaluation_context) + if not isinstance(result.value, int): + result.value = int(result.value) + return result def resolve_object_details( self, diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_events.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py similarity index 72% rename from providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_events.py rename to providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py index 5f038da9..96a33f06 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_events.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py @@ -1,23 +1,14 @@ -import logging import time import pytest -from pytest_bdd import parsers, scenario, then, when +from pytest_bdd import parsers, scenarios, then, when from openfeature.client import OpenFeatureClient, ProviderEvent GHERKIN_FOLDER = "../../../../test-harness/gherkin/" - -@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Provider ready event") -def test_ready_event(caplog): - """Provider ready event""" - caplog.set_level(logging.DEBUG) - - -@scenario(f"{GHERKIN_FOLDER}flagd.feature", "Flag change event") -def test_change_event(): - """Flag change event""" +scenarios(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature") +scenarios(f"{GHERKIN_FOLDER}flagd.feature") @pytest.fixture @@ -71,14 +62,6 @@ def modify_flag(key): pass -# def modify_flag(flag_file, key): -# time.sleep(0.1) # guard against race condition -# with open("test-harness/flags/changing-flag-foo.json") as src_file: -# contents = src_file.read() -# with open(flag_file, "w") as f: -# f.write(contents) - - @then(parsers.cfparse('the event details must indicate "{key}" was altered')) def assert_flag_changed(handles, key): handle = None diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py deleted file mode 100644 index 926c2195..00000000 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_evals.py +++ /dev/null @@ -1,6 +0,0 @@ -from pytest_bdd import scenarios - -GHERKIN_FOLDER = "../../../../test-harness/gherkin/" - -scenarios(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature") -scenarios(f"{GHERKIN_FOLDER}flagd.feature") diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py deleted file mode 100644 index 444548d8..00000000 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_zero_evals.py +++ /dev/null @@ -1,23 +0,0 @@ -# from pytest_bdd import scenario - -# GHERKIN_FOLDER = "../../../../test-harness/gherkin/" - - -# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves boolean zero value") -# def test_eval_boolean(): -# """Resolve boolean zero value""" - - -# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves string zero value") -# def test_eval_string(): -# """Resolve string zero value""" - - -# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves integer zero value") -# def test_eval_integer(): -# """Resolve integer zero value""" - - -# @scenario(f"{GHERKIN_FOLDER}flagd.feature", "Resolves float zero value") -# def test_eval_float(): -# """Resolve float zero value""" From fd8fe11021c391f7ebed01341b04cef8a236469a Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 10:28:48 +0200 Subject: [PATCH 03/17] add: test cases and reconnect functionality Signed-off-by: Cole Bailey --- .github/workflows/build.yml | 22 ++++ .../process/connector/file_watcher.py | 25 +++- .../process/connector/grpc_watcher.py | 24 +++- .../tests/e2e/conftest.py | 112 +++++++++++++++++- .../inprocess/file/test_inprocess_events.py | 62 +--------- .../tests/e2e/inprocess/grpc/conftest.py | 27 ++++- .../e2e/inprocess/grpc/test_inprocess_grpc.py | 70 +---------- .../grpc/test_inprocess_grpc_reconnect.py | 12 ++ 8 files changed, 210 insertions(+), 144 deletions(-) create mode 100644 providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_reconnect.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 165018d1..f06a977d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -27,6 +27,28 @@ jobs: - "hooks/openfeature-hooks-opentelemetry" - "providers/openfeature-provider-flagd" + services: + # flagd-testbed for flagd RPC provider e2e tests + flagd: + image: ghcr.io/open-feature/flagd-testbed:v0.5.4 + ports: + - 8013:8013 + # flagd-testbed for flagd RPC provider reconnect e2e tests + flagd-unstable: + image: ghcr.io/open-feature/flagd-testbed-unstable:v0.5.4 + ports: + - 8014:8013 + # sync-testbed for flagd in-process provider e2e tests + sync: + image: ghcr.io/open-feature/sync-testbed:v0.5.4 + ports: + - 9090:9090 + # sync-testbed for flagd in-process provider reconnect e2e tests + sync-unstable: + image: ghcr.io/open-feature/sync-testbed-unstable:v0.5.4 + ports: + - 9091:9090 + steps: - uses: actions/checkout@v4 with: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py index d440f17d..6ecdb715 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -30,6 +30,7 @@ def __init__( self.last_modified = 0.0 self.flag_data: typing.Mapping[str, Flag] = {} self.load_data() + self.has_error = False self.thread = threading.Thread(target=self.refresh_file, daemon=True) self.thread.start() @@ -57,17 +58,31 @@ def load_data(self, modified_time: typing.Optional[float] = None) -> None: self.flag_data = Flag.parse_flags(data) logger.debug(f"{self.flag_data=}") + + if self.has_error: + self.provider.emit_provider_ready( + ProviderEventDetails( + message="Reloading file contents recovered from error state" + ) + ) + self.has_error = False + self.provider.emit_provider_configuration_changed( ProviderEventDetails(flags_changed=list(self.flag_data.keys())) ) self.last_modified = modified_time or os.path.getmtime(self.file_path) except FileNotFoundError: - logger.exception("Provided file path not valid") + self.handle_error("Provided file path not valid") except json.JSONDecodeError: - logger.exception("Could not parse JSON flag data from file") + self.handle_error("Could not parse JSON flag data from file") except yaml.error.YAMLError: - logger.exception("Could not parse YAML flag data from file") + self.handle_error("Could not parse YAML flag data from file") except ParseError: - logger.exception("Could not parse flag data using flagd syntax") + self.handle_error("Could not parse flag data using flagd syntax") except Exception: - logger.exception("Could not read flags from file") + self.handle_error("Could not read flags from file") + + def handle_error(self, error_message: str) -> None: + logger.exception(error_message) + self.has_error = True + self.provider.emit_provider_error(ProviderEventDetails(message=error_message)) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index 50a3690c..06252bcb 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -7,7 +7,7 @@ import grpc from openfeature.event import ProviderEventDetails -from openfeature.exception import ParseError +from openfeature.exception import ErrorCode, ParseError from openfeature.provider.provider import AbstractProvider from ....config import Config @@ -68,13 +68,19 @@ def sync_flags(self) -> None: ) self.flag_data = Flag.parse_flags(json.loads(flag_str)) - self.connected = True + if not self.connected: + self.provider.emit_provider_ready( + ProviderEventDetails( + message="gRPC sync connection established" + ) + ) + self.connected = True + # reset retry delay after successsful read + retry_delay = self.INIT_BACK_OFF + self.provider.emit_provider_configuration_changed( ProviderEventDetails(flags_changed=list(self.flag_data.keys())) ) - - # reset retry delay after successsful read - retry_delay = self.INIT_BACK_OFF except grpc.RpcError as e: # noqa: PERF203 logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") except json.JSONDecodeError: @@ -86,7 +92,13 @@ def sync_flags(self) -> None: f"Could not parse flag data using flagd syntax: {flag_str=}" ) finally: - # self.connected = False + self.connected = False + self.provider.emit_provider_error( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + error_code=ErrorCode.GENERAL, + ) + ) logger.info(f"Reconnecting in {retry_delay}s") time.sleep(retry_delay) retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF) diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index e829c465..d3427ed8 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -1,9 +1,10 @@ +import time import typing import pytest from pytest_bdd import parsers, then, when -from openfeature.client import OpenFeatureClient +from openfeature.client import OpenFeatureClient, ProviderEvent from openfeature.evaluation_context import EvaluationContext JsonPrimitive = typing.Union[str, bool, float, int] @@ -194,3 +195,112 @@ def assert_reason( key, default = key_and_default evaluation_result = client.get_string_details(key, default, evaluation_context) assert evaluation_result.reason.value == reason + + +@pytest.fixture +def handles() -> list: + return [] + + +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler is added", + extra_types={"ProviderEvent": ProviderEvent}, + ), + target_fixture="handles", +) +def add_event_handler( + client: OpenFeatureClient, event_type: ProviderEvent, handles: list +): + def handler(event): + handles.append( + { + "type": event_type, + "event": event, + } + ) + + client.add_handler(event_type, handler) + return handles + + +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler and a {event_type2:ProviderEvent} handler are added", + extra_types={"ProviderEvent": ProviderEvent}, + ), + target_fixture="handles", +) +def add_event_handlers( + client: OpenFeatureClient, + event_type: ProviderEvent, + event_type2: ProviderEvent, + handles: list, +): + add_event_handler(client, event_type, handles) + add_event_handler(client, event_type2, handles) + + +def assert_handlers( + handles, event_type: ProviderEvent, max_wait: int = 2, num_events: int = 1 +): + poll_interval = 0.05 + while max_wait > 0: + if len([h["type"] == event_type for h in handles]) < num_events: + max_wait -= poll_interval + time.sleep(poll_interval) + continue + break + + actual_num_events = len([h["type"] == event_type for h in handles]) + assert ( + num_events <= actual_num_events + ), f"Expected {num_events} but got {actual_num_events}: {handles}" + + +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider connects", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_handler_run(handles, event_type: ProviderEvent): + assert_handlers(handles, event_type, max_wait=3) + + +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider's connection is lost", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_handler(handles, event_type: ProviderEvent): + assert_handlers(handles, event_type, max_wait=6) + + +@then( + parsers.cfparse( + "when the connection is reestablished the {event_type:ProviderEvent} handler must run again", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_error(handles, event_type: ProviderEvent): + assert_handlers(handles, event_type, max_wait=6, num_events=2) + + +@then(parsers.cfparse('the event details must indicate "{key}" was altered')) +def assert_flag_changed(handles, key): + handle = None + for h in handles: + if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: + handle = h + break + + assert handle is not None + assert key in handle["event"].flags_changed diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py index a5ff523c..2c78f882 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py @@ -2,11 +2,9 @@ import time import pytest -from pytest_bdd import parsers, scenario, then, when +from pytest_bdd import parsers, scenario, when from tests.conftest import setup_flag_file -from openfeature.client import OpenFeatureClient, ProviderEvent - GHERKIN_FOLDER = "../../../../test-harness/gherkin/" @@ -26,52 +24,6 @@ def flag_file(tmp_path): return setup_flag_file(tmp_path, "changing-flag-bar.json") -@pytest.fixture -def handles() -> list: - return [] - - -@when( - parsers.cfparse( - "a {event_type:ProviderEvent} handler is added", - extra_types={"ProviderEvent": ProviderEvent}, - ), - target_fixture="handles", -) -def add_event_handler( - client: OpenFeatureClient, event_type: ProviderEvent, handles: list -): - def handler(event): - handles.append( - { - "type": event_type, - "event": event, - } - ) - - client.add_handler(event_type, handler) - return handles - - -@then( - parsers.cfparse( - "the {event_type:ProviderEvent} handler must run", - extra_types={"ProviderEvent": ProviderEvent}, - ) -) -def assert_handler_run(handles, event_type: ProviderEvent): - max_wait = 2 - poll_interval = 0.1 - while max_wait > 0: - if all(h["type"] != event_type for h in handles): - max_wait -= poll_interval - time.sleep(poll_interval) - continue - break - - assert any(h["type"] == event_type for h in handles) - - @when(parsers.cfparse('a flag with key "{key}" is modified')) def modify_flag(flag_file, key): time.sleep(0.1) # guard against race condition @@ -79,15 +31,3 @@ def modify_flag(flag_file, key): contents = src_file.read() with open(flag_file, "w") as f: f.write(contents) - - -@then(parsers.cfparse('the event details must indicate "{key}" was altered')) -def assert_flag_changed(handles, key): - handle = None - for h in handles: - if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: - handle = h - break - - assert handle is not None - assert key in handle["event"].flags_changed diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py index 61fda032..ef6c4638 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py @@ -1,8 +1,9 @@ import pytest -from pytest_bdd import given +from pytest_bdd import given, parsers, then, when +from tests.e2e.conftest import add_event_handler, assert_handlers from openfeature import api -from openfeature.client import OpenFeatureClient +from openfeature.client import OpenFeatureClient, ProviderEvent from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.config import ResolverType @@ -22,3 +23,25 @@ def setup_provider(port: int) -> OpenFeatureClient: ) ) return api.get_client() + + +@when(parsers.cfparse('a flag with key "{key}" is modified')) +def modify_flag(key): + # sync service will flip flag contents regularly + pass + + +@given("flagd is unavailable", target_fixture="client") +def flagd_unavailable(): + return setup_provider(99999) + + +@when("a flagd provider is set and initialization is awaited") +def flagd_init(client: OpenFeatureClient, handles): + add_event_handler(client, ProviderEvent.PROVIDER_ERROR, handles) + add_event_handler(client, ProviderEvent.PROVIDER_READY, handles) + + +@then("an error should be indicated within the configured deadline") +def flagd_error(handles): + assert_handlers(handles, ProviderEvent.PROVIDER_ERROR) diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py index 96a33f06..926c2195 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc.py @@ -1,74 +1,6 @@ -import time - -import pytest -from pytest_bdd import parsers, scenarios, then, when - -from openfeature.client import OpenFeatureClient, ProviderEvent +from pytest_bdd import scenarios GHERKIN_FOLDER = "../../../../test-harness/gherkin/" scenarios(f"{GHERKIN_FOLDER}flagd-json-evaluator.feature") scenarios(f"{GHERKIN_FOLDER}flagd.feature") - - -@pytest.fixture -def handles() -> list: - return [] - - -@when( - parsers.cfparse( - "a {event_type:ProviderEvent} handler is added", - extra_types={"ProviderEvent": ProviderEvent}, - ), - target_fixture="handles", -) -def add_event_handler( - client: OpenFeatureClient, event_type: ProviderEvent, handles: list -): - def handler(event): - handles.append( - { - "type": event_type, - "event": event, - } - ) - - client.add_handler(event_type, handler) - return handles - - -@then( - parsers.cfparse( - "the {event_type:ProviderEvent} handler must run", - extra_types={"ProviderEvent": ProviderEvent}, - ) -) -def assert_handler_run(handles, event_type: ProviderEvent): - max_wait = 2 - poll_interval = 0.1 - while max_wait > 0: - if all(h["type"] != event_type for h in handles): - max_wait -= poll_interval - time.sleep(poll_interval) - continue - break - - assert any(h["type"] == event_type for h in handles) - - -@when(parsers.cfparse('a flag with key "{key}" is modified')) -def modify_flag(key): - pass - - -@then(parsers.cfparse('the event details must indicate "{key}" was altered')) -def assert_flag_changed(handles, key): - handle = None - for h in handles: - if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: - handle = h - break - - assert handle is not None - assert key in handle["event"].flags_changed diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_reconnect.py new file mode 100644 index 00000000..e3e1b85d --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/test_inprocess_grpc_reconnect.py @@ -0,0 +1,12 @@ +import pytest +from pytest_bdd import scenarios + +GHERKIN_FOLDER = "../../../../test-harness/gherkin/" + +scenarios(f"{GHERKIN_FOLDER}flagd-reconnect.feature") + + +@pytest.fixture +def port(): + # Port for flagd-sync-unstable, overrides main conftest port + return 9091 From 17e8ce8d6c30a68a231b3578f35298d8b232c2e8 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 11:04:10 +0200 Subject: [PATCH 04/17] fix: pyproject settings Signed-off-by: Cole Bailey --- providers/openfeature-provider-flagd/pyproject.toml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/providers/openfeature-provider-flagd/pyproject.toml b/providers/openfeature-provider-flagd/pyproject.toml index 84a49d44..5f7e64d0 100644 --- a/providers/openfeature-provider-flagd/pyproject.toml +++ b/providers/openfeature-provider-flagd/pyproject.toml @@ -58,6 +58,7 @@ cov = [ exclude = [ ".gitignore", "schemas", + "docker-compose.yaml", ] [tool.hatch.build.targets.wheel] @@ -69,8 +70,3 @@ omit = [ "src/openfeature/contrib/provider/flagd/proto/*", "tests/**", ] - -[tool.pytest.ini_options] -addopts = [ - "--import-mode=importlib", -] \ No newline at end of file From 378ccc9ead552c047f7ec75653afb54c1f96868d Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 12:05:04 +0200 Subject: [PATCH 05/17] fix: event assertions Signed-off-by: Cole Bailey --- .../tests/e2e/conftest.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index d3427ed8..7a85c2f0 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -1,3 +1,4 @@ +import logging import time import typing @@ -213,6 +214,7 @@ def add_event_handler( client: OpenFeatureClient, event_type: ProviderEvent, handles: list ): def handler(event): + logging.info((event_type, event)) handles.append( { "type": event_type, @@ -246,13 +248,14 @@ def assert_handlers( ): poll_interval = 0.05 while max_wait > 0: - if len([h["type"] == event_type for h in handles]) < num_events: + if sum([h["type"] == event_type for h in handles]) < num_events: max_wait -= poll_interval time.sleep(poll_interval) continue break - actual_num_events = len([h["type"] == event_type for h in handles]) + logging.info(f"asserting num({event_type}) >= {num_events}: {handles}") + actual_num_events = sum([h["type"] == event_type for h in handles]) assert ( num_events <= actual_num_events ), f"Expected {num_events} but got {actual_num_events}: {handles}" @@ -290,8 +293,10 @@ def assert_disconnect_handler(handles, event_type: ProviderEvent): extra_types={"ProviderEvent": ProviderEvent}, ) ) -def assert_disconnect_error(handles, event_type: ProviderEvent): - assert_handlers(handles, event_type, max_wait=6, num_events=2) +def assert_disconnect_error(client: OpenFeatureClient, event_type: ProviderEvent): + reconnect_handles = [] + add_event_handler(client, event_type, reconnect_handles) + assert_handlers(reconnect_handles, event_type, max_wait=6) @then(parsers.cfparse('the event details must indicate "{key}" was altered')) From 054e5afdcd55df3f457e69547e227daa340ea2a4 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 15:30:03 +0200 Subject: [PATCH 06/17] add: thread shutdown and cleaner flag store / connector logic Signed-off-by: Cole Bailey --- .../contrib/provider/flagd/provider.py | 4 + .../provider/flagd/resolvers/in_process.py | 18 +++-- .../resolvers/process/connector/__init__.py | 11 +++ .../process/connector/file_watcher.py | 78 ++++++++++--------- .../process/connector/grpc_watcher.py | 59 +++++++------- .../provider/flagd/resolvers/process/flags.py | 49 +++++++----- .../tests/conftest.py | 10 ++- .../tests/e2e/inprocess/file/conftest.py | 17 ++-- .../inprocess/file/test_inprocess_events.py | 6 +- .../tests/test_file_store.py | 12 +-- 10 files changed, 154 insertions(+), 110 deletions(-) create mode 100644 providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 76307475..da80b821 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -77,6 +77,10 @@ def setup_resolver(self) -> AbstractResolver: f"`resolver_type` parameter invalid: {self.config.resolver_type}" ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + if hasattr(self.resolver, "initialize"): + self.resolver.initialize(evaluation_context) + def shutdown(self) -> None: if self.resolver: self.resolver.shutdown() diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index b3094b57..e0ea96ba 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -9,8 +9,9 @@ from openfeature.provider.provider import AbstractProvider from ..config import Config -from .process.connector.file_watcher import FileWatcherFlagStore -from .process.connector.grpc_watcher import GrpcWatcherFlagStore +from .process.connector import FlagStateConnector +from .process.connector.file_watcher import FileWatcher +from .process.connector.grpc_watcher import GrpcWatcher from .process.custom_ops import ends_with, fractional, sem_ver, starts_with from .process.flags import FlagStore @@ -29,18 +30,23 @@ class InProcessResolver: def __init__(self, config: Config, provider: AbstractProvider): self.config = config self.provider = provider - self.flag_store: FlagStore = ( - FileWatcherFlagStore( + self.flag_store = FlagStore(provider) + self.connector: FlagStateConnector = ( + FileWatcher( self.config.offline_flag_source_path, self.provider, + self.flag_store, self.config.offline_poll_interval_seconds, ) if self.config.offline_flag_source_path - else GrpcWatcherFlagStore(self.config, self.provider) + else GrpcWatcher(self.config, self.provider, self.flag_store) ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.connector.initialize(evaluation_context) + def shutdown(self) -> None: - self.flag_store.shutdown() + self.connector.shutdown() def resolve_boolean_details( self, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py new file mode 100644 index 00000000..98e6b5df --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py @@ -0,0 +1,11 @@ +import typing + +from openfeature.evaluation_context import EvaluationContext + + +class FlagStateConnector(typing.Protocol): + def initialize(self, evaluation_context: EvaluationContext) -> None: + pass + + def shutdown(self) -> None: + pass diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py index 6ecdb715..7fccee91 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -7,20 +7,23 @@ import yaml +from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails -from openfeature.exception import ParseError +from openfeature.exception import ParseError, ProviderNotReadyError from openfeature.provider.provider import AbstractProvider -from ..flags import Flag, FlagStore +from ..connector import FlagStateConnector +from ..flags import FlagStore logger = logging.getLogger("openfeature.contrib") -class FileWatcherFlagStore(FlagStore): +class FileWatcher(FlagStateConnector): def __init__( self, file_path: str, provider: AbstractProvider, + flag_store: FlagStore, poll_interval_seconds: float = 1.0, ): self.file_path = file_path @@ -28,49 +31,35 @@ def __init__( self.poll_interval_seconds = poll_interval_seconds self.last_modified = 0.0 - self.flag_data: typing.Mapping[str, Flag] = {} - self.load_data() - self.has_error = False + self.flag_store = flag_store + self.emit_ready = False + + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.active = True self.thread = threading.Thread(target=self.refresh_file, daemon=True) self.thread.start() - def shutdown(self) -> None: - pass + # Let this throw exceptions so that provider status is set correctly + try: + self._load_data() + self.emit_ready = True + except Exception as err: + raise ProviderNotReadyError from err - def get_flag(self, key: str) -> typing.Optional[Flag]: - return self.flag_data.get(key) + def shutdown(self) -> None: + self.active = False def refresh_file(self) -> None: - while True: + while self.active: time.sleep(self.poll_interval_seconds) logger.debug("checking for new flag store contents from file") last_modified = os.path.getmtime(self.file_path) if last_modified > self.last_modified: - self.load_data(last_modified) + self.safe_load_data(last_modified) - def load_data(self, modified_time: typing.Optional[float] = None) -> None: + def safe_load_data(self, modified_time: typing.Optional[float] = None) -> None: try: - with open(self.file_path) as file: - if self.file_path.endswith(".yaml"): - data = yaml.safe_load(file) - else: - data = json.load(file) - - self.flag_data = Flag.parse_flags(data) - logger.debug(f"{self.flag_data=}") - - if self.has_error: - self.provider.emit_provider_ready( - ProviderEventDetails( - message="Reloading file contents recovered from error state" - ) - ) - self.has_error = False - - self.provider.emit_provider_configuration_changed( - ProviderEventDetails(flags_changed=list(self.flag_data.keys())) - ) - self.last_modified = modified_time or os.path.getmtime(self.file_path) + self._load_data(modified_time) except FileNotFoundError: self.handle_error("Provided file path not valid") except json.JSONDecodeError: @@ -82,7 +71,26 @@ def load_data(self, modified_time: typing.Optional[float] = None) -> None: except Exception: self.handle_error("Could not read flags from file") + def _load_data(self, modified_time: typing.Optional[float] = None) -> None: + with open(self.file_path) as file: + if self.file_path.endswith(".yaml"): + data = yaml.safe_load(file) + else: + data = json.load(file) + + self.flag_store.update(data) + + if self.emit_ready: + self.provider.emit_provider_ready( + ProviderEventDetails( + message="Reloading file contents recovered from error state" + ) + ) + self.emit_ready = False + + self.last_modified = modified_time or os.path.getmtime(self.file_path) + def handle_error(self, error_message: str) -> None: logger.exception(error_message) - self.has_error = True + self.emit_ready = True self.provider.emit_provider_error(ProviderEventDetails(message=error_message)) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index 06252bcb..9f4f4bb2 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -2,28 +2,31 @@ import logging import threading import time -import typing import grpc +from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails -from openfeature.exception import ErrorCode, ParseError +from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError from openfeature.provider.provider import AbstractProvider from ....config import Config from ....proto.flagd.sync.v1 import sync_pb2, sync_pb2_grpc -from ..flags import Flag, FlagStore +from ..connector import FlagStateConnector +from ..flags import FlagStore logger = logging.getLogger("openfeature.contrib") -class GrpcWatcherFlagStore(FlagStore): +class GrpcWatcher(FlagStateConnector): INIT_BACK_OFF = 2 MAX_BACK_OFF = 120 - def __init__(self, config: Config, provider: AbstractProvider): + def __init__( + self, config: Config, provider: AbstractProvider, flag_store: FlagStore + ): self.provider = provider - self.flag_data: typing.Mapping[str, Flag] = {} + self.flag_store = flag_store channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel self.channel = channel_factory(f"{config.host}:{config.port}") self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel) @@ -32,6 +35,8 @@ def __init__(self, config: Config, provider: AbstractProvider): # TODO: Add selector + def initialize(self, context: EvaluationContext) -> None: + self.active = True self.thread = threading.Thread(target=self.sync_flags, daemon=True) self.thread.start() @@ -40,25 +45,22 @@ def __init__(self, config: Config, provider: AbstractProvider): # TODO: get deadline from user deadline = 2 + time.time() while not self.connected and time.time() < deadline: - logger.debug("blocking on init") time.sleep(0.05) + logger.debug("Finished blocking gRPC state initialization") if not self.connected: - logger.warning( + raise ProviderNotReadyError( "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations." ) def shutdown(self) -> None: - pass - - def get_flag(self, key: str) -> typing.Optional[Flag]: - return self.flag_data.get(key) + self.active = False def sync_flags(self) -> None: request = sync_pb2.SyncFlagsRequest() # type:ignore[attr-defined] retry_delay = self.INIT_BACK_OFF - while True: + while self.active: try: logger.debug("Setting up gRPC sync flags connection") for flag_rsp in self.stub.SyncFlags(request): @@ -66,7 +68,7 @@ def sync_flags(self) -> None: logger.debug( f"Received flag configuration - {abs(hash(flag_str)) % (10 ** 8)}" ) - self.flag_data = Flag.parse_flags(json.loads(flag_str)) + self.flag_store.update(json.loads(flag_str)) if not self.connected: self.provider.emit_provider_ready( @@ -77,11 +79,10 @@ def sync_flags(self) -> None: self.connected = True # reset retry delay after successsful read retry_delay = self.INIT_BACK_OFF - - self.provider.emit_provider_configuration_changed( - ProviderEventDetails(flags_changed=list(self.flag_data.keys())) - ) - except grpc.RpcError as e: # noqa: PERF203 + if not self.active: + logger.info("Terminating gRPC sync thread") + return + except grpc.RpcError as e: logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") except json.JSONDecodeError: logger.exception( @@ -91,14 +92,14 @@ def sync_flags(self) -> None: logger.exception( f"Could not parse flag data using flagd syntax: {flag_str=}" ) - finally: - self.connected = False - self.provider.emit_provider_error( - ProviderEventDetails( - message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", - error_code=ErrorCode.GENERAL, - ) + + self.connected = False + self.provider.emit_provider_error( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + error_code=ErrorCode.GENERAL, ) - logger.info(f"Reconnecting in {retry_delay}s") - time.sleep(retry_delay) - retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF) + ) + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py index a0e08e05..ffb76b3c 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py @@ -3,15 +3,40 @@ import typing from dataclasses import dataclass +from openfeature.event import ProviderEventDetails from openfeature.exception import ParseError +from openfeature.provider.provider import AbstractProvider -class FlagStore(typing.Protocol): +class FlagStore: + def __init__( + self, + provider: AbstractProvider, + ): + self.provider = provider + self.flags: typing.Mapping[str, "Flag"] = {} + def get_flag(self, key: str) -> typing.Optional["Flag"]: - pass + return self.flags.get(key) + + def update(self, flags_data: dict) -> None: + flags = flags_data.get("flags", {}) + evaluators: typing.Optional[dict] = flags_data.get("$evaluators") + if evaluators: + transposed = json.dumps(flags) + for name, rule in evaluators.items(): + transposed = re.sub( + rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed + ) + flags = json.loads(transposed) + + if not isinstance(flags, dict): + raise ParseError("`flags` key of configuration must be a dictionary") + self.flags = {key: Flag.from_dict(key, data) for key, data in flags.items()} - def shutdown(self) -> None: - pass + self.provider.emit_provider_configuration_changed( + ProviderEventDetails(flags_changed=list(self.flags.keys())) + ) @dataclass @@ -59,19 +84,3 @@ def get_variant( variant_key = str(variant_key).lower() return variant_key, self.variants.get(variant_key) - - @classmethod - def parse_flags(cls, flags_data: dict) -> typing.Dict[str, "Flag"]: - flags = flags_data.get("flags", {}) - evaluators: typing.Optional[dict] = flags_data.get("$evaluators") - if evaluators: - transposed = json.dumps(flags) - for name, rule in evaluators.items(): - transposed = re.sub( - rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed - ) - flags = json.loads(transposed) - - if not isinstance(flags, dict): - raise ParseError("`flags` key of configuration must be a dictionary") - return {key: Flag.from_dict(key, data) for key, data in flags.items()} diff --git a/providers/openfeature-provider-flagd/tests/conftest.py b/providers/openfeature-provider-flagd/tests/conftest.py index 287f5240..692b84b2 100644 --- a/providers/openfeature-provider-flagd/tests/conftest.py +++ b/providers/openfeature-provider-flagd/tests/conftest.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import pytest @@ -8,13 +9,14 @@ @pytest.fixture() def flagd_provider_client(): - api.set_provider(FlagdProvider()) - return api.get_client() + provider = FlagdProvider() + api.set_provider(provider) + yield api.get_client() + provider.shutdown() def setup_flag_file(base_dir: str, flag_file: str) -> str: - with open(f"test-harness/flags/{flag_file}") as src_file: - contents = src_file.read() + contents = (Path(__file__).parent / "../test-harness/flags" / flag_file).read_text() dst_path = os.path.join(base_dir, flag_file) with open(dst_path, "w") as dst_file: dst_file.write(contents) diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py index 9cf5be0c..81831c69 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/conftest.py @@ -1,18 +1,17 @@ from pytest_bdd import given from openfeature import api -from openfeature.client import OpenFeatureClient from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.config import ResolverType @given("a flagd provider is set", target_fixture="client") -def setup_provider(flag_file) -> OpenFeatureClient: - api.set_provider( - FlagdProvider( - resolver_type=ResolverType.IN_PROCESS, - offline_flag_source_path=flag_file, - offline_poll_interval_seconds=0.1, - ) +def setup_provider(flag_file): + provider = FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + offline_flag_source_path=flag_file, + offline_poll_interval_seconds=0.1, ) - return api.get_client() + api.set_provider(provider) + yield api.get_client() + provider.shutdown() diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py index 2c78f882..58603dcc 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/file/test_inprocess_events.py @@ -1,5 +1,6 @@ import logging import time +from pathlib import Path import pytest from pytest_bdd import parsers, scenario, when @@ -27,7 +28,8 @@ def flag_file(tmp_path): @when(parsers.cfparse('a flag with key "{key}" is modified')) def modify_flag(flag_file, key): time.sleep(0.1) # guard against race condition - with open("test-harness/flags/changing-flag-foo.json") as src_file: - contents = src_file.read() + contents = ( + Path(__file__).parent / "../../../../test-harness/flags/changing-flag-foo.json" + ).read_text() with open(flag_file, "w") as f: f.write(contents) diff --git a/providers/openfeature-provider-flagd/tests/test_file_store.py b/providers/openfeature-provider-flagd/tests/test_file_store.py index b91f1628..f44c9b8c 100644 --- a/providers/openfeature-provider-flagd/tests/test_file_store.py +++ b/providers/openfeature-provider-flagd/tests/test_file_store.py @@ -5,9 +5,9 @@ from openfeature import api from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.resolvers.process.connector.file_watcher import ( - FileWatcherFlagStore, + FileWatcher, ) -from openfeature.contrib.provider.flagd.resolvers.process.flags import Flag +from openfeature.contrib.provider.flagd.resolvers.process.flags import Flag, FlagStore from openfeature.provider.provider import AbstractProvider @@ -23,11 +23,13 @@ def create_client(provider: FlagdProvider): "basic-flag.yaml", ], ) -def test_file_load_errors(file_name: str): +def test_file_load(file_name: str): provider = Mock(spec=AbstractProvider) - file_store = FileWatcherFlagStore(f"tests/flags/{file_name}", provider) + flag_store = FlagStore(provider) + file_watcher = FileWatcher(f"tests/flags/{file_name}", provider, flag_store) + file_watcher.initialize(None) - flag = file_store.flag_data.get("basic-flag") + flag = flag_store.get_flag("basic-flag") assert flag is not None assert isinstance(flag, Flag) From 19a29e5d8f64872c3142fd80168ea5421ff8b796 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 16:05:47 +0200 Subject: [PATCH 07/17] add: selector and sync init timeout Signed-off-by: Cole Bailey --- .../contrib/provider/flagd/config.py | 10 +++++++ .../contrib/provider/flagd/provider.py | 4 +++ .../process/connector/grpc_watcher.py | 16 +++++------ .../tests/test_errors.py | 27 +++++++++++++++++++ 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index a95c3153..326b10b6 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -30,6 +30,8 @@ def __init__( # noqa: PLR0913 port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, timeout: typing.Optional[int] = None, + retry_backoff_seconds: typing.Optional[float] = None, + selector: typing.Optional[str] = None, resolver_type: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, offline_poll_interval_seconds: typing.Optional[float] = None, @@ -42,6 +44,14 @@ def __init__( # noqa: PLR0913 env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls ) self.timeout = 5 if timeout is None else timeout + self.retry_backoff_seconds: float = ( + float(env_or_default("FLAGD_RETRY_BACKOFF_SECONDS", 2.0)) + if retry_backoff_seconds is None + else retry_backoff_seconds + ) + self.selector = ( + env_or_default("FLAGD_SELECTOR", None) if selector is None else selector + ) self.resolver_type = ( ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc")) if resolver_type is None diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index da80b821..2ba4945a 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -43,6 +43,8 @@ def __init__( # noqa: PLR0913 port: typing.Optional[int] = None, tls: typing.Optional[bool] = None, timeout: typing.Optional[int] = None, + retry_backoff_seconds: typing.Optional[float] = None, + selector: typing.Optional[str] = None, resolver_type: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, offline_poll_interval_seconds: typing.Optional[float] = None, @@ -60,6 +62,8 @@ def __init__( # noqa: PLR0913 port=port, tls=tls, timeout=timeout, + retry_backoff_seconds=retry_backoff_seconds, + selector=selector, resolver_type=resolver_type, offline_flag_source_path=offline_flag_source_path, offline_poll_interval_seconds=offline_poll_interval_seconds, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index 9f4f4bb2..debc69ac 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -19,7 +19,6 @@ class GrpcWatcher(FlagStateConnector): - INIT_BACK_OFF = 2 MAX_BACK_OFF = 120 def __init__( @@ -30,20 +29,19 @@ def __init__( channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel self.channel = channel_factory(f"{config.host}:{config.port}") self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel) + self.timeout = config.timeout + self.retry_backoff_seconds = config.retry_backoff_seconds + self.selector = config.selector self.connected = False - # TODO: Add selector - def initialize(self, context: EvaluationContext) -> None: self.active = True self.thread = threading.Thread(target=self.sync_flags, daemon=True) self.thread.start() ## block until ready or deadline reached - - # TODO: get deadline from user - deadline = 2 + time.time() + deadline = self.timeout + time.time() while not self.connected and time.time() < deadline: time.sleep(0.05) logger.debug("Finished blocking gRPC state initialization") @@ -57,9 +55,9 @@ def shutdown(self) -> None: self.active = False def sync_flags(self) -> None: - request = sync_pb2.SyncFlagsRequest() # type:ignore[attr-defined] + request = sync_pb2.SyncFlagsRequest(selector=self.selector) # type:ignore[attr-defined] - retry_delay = self.INIT_BACK_OFF + retry_delay = self.retry_backoff_seconds while self.active: try: logger.debug("Setting up gRPC sync flags connection") @@ -78,7 +76,7 @@ def sync_flags(self) -> None: ) self.connected = True # reset retry delay after successsful read - retry_delay = self.INIT_BACK_OFF + retry_delay = self.retry_backoff_seconds if not self.active: logger.info("Terminating gRPC sync thread") return diff --git a/providers/openfeature-provider-flagd/tests/test_errors.py b/providers/openfeature-provider-flagd/tests/test_errors.py index 4adb332e..39ed91e2 100644 --- a/providers/openfeature-provider-flagd/tests/test_errors.py +++ b/providers/openfeature-provider-flagd/tests/test_errors.py @@ -1,9 +1,12 @@ +import time + import pytest from openfeature import api from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.config import ResolverType from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEvent from openfeature.exception import ErrorCode from openfeature.flag_evaluation import Reason @@ -77,3 +80,27 @@ def test_flag_disabled(): assert res.value == "fallback" assert res.reason == Reason.DISABLED + + +@pytest.mark.parametrize("wait", (0.5, 0.25)) +def test_grpc_sync_fail_deadline(wait: float): + init_failed = False + + def fail(*args, **kwargs): + nonlocal init_failed + init_failed = True + + api.get_client().add_handler(ProviderEvent.PROVIDER_ERROR, fail) + + t = time.time() + api.set_provider( + FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, + port=99999, # dead port to test failure + timeout=wait, + ) + ) + + elapsed = time.time() - t + assert abs(elapsed - wait) < 0.1 + assert init_failed From cc14759772b1f41c3c971e3249826185849fbc1c Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 16:20:40 +0200 Subject: [PATCH 08/17] fix: noisy test due to insufficient wait time Signed-off-by: Cole Bailey --- providers/openfeature-provider-flagd/tests/e2e/conftest.py | 3 ++- .../tests/e2e/inprocess/grpc/conftest.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 7a85c2f0..5de9923f 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -284,7 +284,8 @@ def assert_handler_run(handles, event_type: ProviderEvent): ) ) def assert_disconnect_handler(handles, event_type: ProviderEvent): - assert_handlers(handles, event_type, max_wait=6) + # docker sync upstream restarts every 5s, waiting 2 cycles reduces test noise + assert_handlers(handles, event_type, max_wait=10) @then( diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py index ef6c4638..4ddd18a0 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py @@ -20,6 +20,8 @@ def setup_provider(port: int) -> OpenFeatureClient: FlagdProvider( resolver_type=ResolverType.IN_PROCESS, port=port, + timeout=0.1, + retry_backoff_seconds=0.1, ) ) return api.get_client() From 8088a6f16459a3947b9e57745a3c59e4bc78bc2e Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 16:29:19 +0200 Subject: [PATCH 09/17] docs: configuration options in readme Signed-off-by: Cole Bailey --- .../openfeature-provider-flagd/README.md | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/providers/openfeature-provider-flagd/README.md b/providers/openfeature-provider-flagd/README.md index aa63c55c..0782ceb5 100644 --- a/providers/openfeature-provider-flagd/README.md +++ b/providers/openfeature-provider-flagd/README.md @@ -19,6 +19,19 @@ from openfeature.contrib.provider.flagd import FlagdProvider api.set_provider(FlagdProvider()) ``` + +To use in-process evaluation with flagd gRPC sync service: + +```python +from openfeature import api +from openfeature.contrib.provider.flagd import FlagdProvider +from openfeature.contrib.provider.flagd.config import ResolverType + +api.set_provider(FlagdProvider( + resolver_type=ResolverType.IN_PROCESS, +)) +``` + To use in-process evaluation in offline mode with a file as source: ```python @@ -36,12 +49,17 @@ api.set_provider(FlagdProvider( The default options can be defined in the FlagdProvider constructor. -| Option name | Type & Values | Default | -|----------------|---------------|-----------| -| host | str | localhost | -| port | int | 8013 | -| schema | str | http | -| timeout | int | 2 | +| Option name | Type & Values | Default | +|-------------------------------|---------------|-----------| +| resolver_type | enum | grpc | +| host | str | localhost | +| port | int | 8013 | +| tls | bool | false | +| timeout | int | 5 | +| retry_backoff_seconds | float | 2.0 | +| selector | str | None | +| offline_flag_source_path | str | None | +| offline_poll_interval_seconds | float | 1.0 | ## License From e86984bf9071b5b7b92dccc85de1b632af07341a Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Wed, 1 May 2024 20:52:04 +0200 Subject: [PATCH 10/17] tests: edge cases for grpc response parsing Signed-off-by: Cole Bailey --- .../resolvers/process/connector/__init__.py | 8 ++-- .../process/connector/file_watcher.py | 10 ++--- .../provider/flagd/resolvers/process/flags.py | 14 ++++--- .../tests/e2e/inprocess/grpc/conftest.py | 2 +- .../tests/test_grpc_sync_connector.py | 41 +++++++++++++++++++ 5 files changed, 60 insertions(+), 15 deletions(-) create mode 100644 providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py index 98e6b5df..07d49241 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/__init__.py @@ -4,8 +4,8 @@ class FlagStateConnector(typing.Protocol): - def initialize(self, evaluation_context: EvaluationContext) -> None: - pass + def initialize( + self, evaluation_context: EvaluationContext + ) -> None: ... # pragma: no cover - def shutdown(self) -> None: - pass + def shutdown(self) -> None: ... # pragma: no cover diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py index 7fccee91..22c3004e 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -53,13 +53,13 @@ def refresh_file(self) -> None: while self.active: time.sleep(self.poll_interval_seconds) logger.debug("checking for new flag store contents from file") - last_modified = os.path.getmtime(self.file_path) - if last_modified > self.last_modified: - self.safe_load_data(last_modified) + self.safe_load_data() - def safe_load_data(self, modified_time: typing.Optional[float] = None) -> None: + def safe_load_data(self) -> None: try: - self._load_data(modified_time) + last_modified = os.path.getmtime(self.file_path) + if last_modified > self.last_modified: + self._load_data(last_modified) except FileNotFoundError: self.handle_error("Provided file path not valid") except json.JSONDecodeError: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py index ffb76b3c..c8fb6181 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py @@ -67,11 +67,15 @@ def __post_init__(self) -> None: @classmethod def from_dict(cls, key: str, data: dict) -> "Flag": - data["default_variant"] = data["defaultVariant"] - del data["defaultVariant"] - flag = cls(key=key, **data) - - return flag + if "defaultVariant" in data: + data["default_variant"] = data["defaultVariant"] + del data["defaultVariant"] + + try: + flag = cls(key=key, **data) + return flag + except Exception as err: + raise ParseError from err @property def default(self) -> typing.Tuple[str, typing.Any]: diff --git a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py index 4ddd18a0..3c15b0c8 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/inprocess/grpc/conftest.py @@ -20,7 +20,7 @@ def setup_provider(port: int) -> OpenFeatureClient: FlagdProvider( resolver_type=ResolverType.IN_PROCESS, port=port, - timeout=0.1, + timeout=0.5, retry_backoff_seconds=0.1, ) ) diff --git a/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py b/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py new file mode 100644 index 00000000..00d08a20 --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py @@ -0,0 +1,41 @@ +import time +from unittest.mock import MagicMock, patch + +import pytest + +from openfeature.contrib.provider.flagd.config import Config +from openfeature.contrib.provider.flagd.resolvers.process.connector.grpc_watcher import ( + GrpcWatcher, +) +from openfeature.contrib.provider.flagd.resolvers.process.flags import FlagStore +from openfeature.exception import ProviderNotReadyError + + +def fake_grpc_service(flag_config: str, keepalive: int = 100): + def sync(request): + mock_resp = MagicMock() + mock_resp.flag_configuration = flag_config + yield mock_resp + time.sleep(keepalive) + + return sync + + +@pytest.mark.parametrize( + "flag_configuration", + ( + """{"flags": {"a-flag": {"garbage": "can"}}}""", + """not even a JSON""", + """{"flags": {"no-default": {"state": "ENABLED", "variants": {}}}}""", + ), +) +def test_invalid_payload(flag_configuration: str): + fake_provider = MagicMock() + flag_store = FlagStore(fake_provider) + watcher = GrpcWatcher(Config(timeout=0.2), fake_provider, flag_store) + + fake_sync_flags = fake_grpc_service(flag_configuration) + with patch.object(watcher.stub, "SyncFlags", fake_sync_flags), pytest.raises( + ProviderNotReadyError + ): + watcher.initialize(None) From b8fa382b9e0d0ced1841cf37e041b787174318ba Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Fri, 3 May 2024 09:16:06 +0200 Subject: [PATCH 11/17] Add env var configs to readme Signed-off-by: Cole Bailey --- .../openfeature-provider-flagd/README.md | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/providers/openfeature-provider-flagd/README.md b/providers/openfeature-provider-flagd/README.md index 0782ceb5..43271f55 100644 --- a/providers/openfeature-provider-flagd/README.md +++ b/providers/openfeature-provider-flagd/README.md @@ -49,17 +49,17 @@ api.set_provider(FlagdProvider( The default options can be defined in the FlagdProvider constructor. -| Option name | Type & Values | Default | -|-------------------------------|---------------|-----------| -| resolver_type | enum | grpc | -| host | str | localhost | -| port | int | 8013 | -| tls | bool | false | -| timeout | int | 5 | -| retry_backoff_seconds | float | 2.0 | -| selector | str | None | -| offline_flag_source_path | str | None | -| offline_poll_interval_seconds | float | 1.0 | +| Option name | Environment Variable Name | Type & Values | Default | +|-------------------------------|-------------------------------------|----------------|-----------| +| resolver_type | FLAGD_RESOLVER_TYPE | enum | grpc | +| host | FLAGD_HOST | str | localhost | +| port | FLAGD_PORT | int | 8013 | +| tls | FLAGD_TLS | bool | false | +| timeout | | int | 5 | +| retry_backoff_seconds | FLAGD_RETRY_BACKOFF_SECONDS | float | 2.0 | +| selector | FLAGD_SELECTOR | str | None | +| offline_flag_source_path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | str | None | +| offline_poll_interval_seconds | FLAGD_OFFLINE_POLL_INTERVAL_SECONDS | float | 1.0 | ## License From 80419a7a701e9b2f51486f956057a412a6f93793 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Sat, 4 May 2024 14:25:42 +0200 Subject: [PATCH 12/17] fix: improve naming of emit flag Signed-off-by: Cole Bailey --- .../flagd/resolvers/process/connector/file_watcher.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py index 22c3004e..db0e41f2 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -32,7 +32,7 @@ def __init__( self.last_modified = 0.0 self.flag_store = flag_store - self.emit_ready = False + self.should_emit_ready_on_success = False def initialize(self, evaluation_context: EvaluationContext) -> None: self.active = True @@ -42,7 +42,7 @@ def initialize(self, evaluation_context: EvaluationContext) -> None: # Let this throw exceptions so that provider status is set correctly try: self._load_data() - self.emit_ready = True + self.should_emit_ready_on_success = True except Exception as err: raise ProviderNotReadyError from err @@ -80,17 +80,17 @@ def _load_data(self, modified_time: typing.Optional[float] = None) -> None: self.flag_store.update(data) - if self.emit_ready: + if self.should_emit_ready_on_success: self.provider.emit_provider_ready( ProviderEventDetails( message="Reloading file contents recovered from error state" ) ) - self.emit_ready = False + self.should_emit_ready_on_success = False self.last_modified = modified_time or os.path.getmtime(self.file_path) def handle_error(self, error_message: str) -> None: logger.exception(error_message) - self.emit_ready = True + self.should_emit_ready_on_success = True self.provider.emit_provider_error(ProviderEventDetails(message=error_message)) From 5782c098f53c0971b378a008579b83c6d1b27cd6 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Sat, 4 May 2024 14:31:17 +0200 Subject: [PATCH 13/17] docs: update CONTRIBUTING details Signed-off-by: Cole Bailey --- CONTRIBUTING.md | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3f9c9096..a6188125 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -26,7 +26,15 @@ We use `pytest` for our unit testing, making use of `parametrized` to inject cas ### Integration tests -These are planned once the SDK has been stabilized and a Flagd provider implemented. At that point, we will utilize the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance. +The Flagd provider utilizes the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance. + +To run the integration tests: + +```bash +cd providers/openfeature-provider-flagd +docker-compose up -d # this runs the flagd sidecars +hatch run test +``` ## Pull Request @@ -62,7 +70,7 @@ To start working on a new feature or bugfix, create a new branch and start worki ```bash git checkout -b feat/NAME_OF_FEATURE # Make your changes -git commit +git commit -s -m "feat: my feature" git push fork feat/NAME_OF_FEATURE ``` From 1b25540d4f876acc7959f23c369a0dcc32e3a092 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Thu, 20 Jun 2024 20:46:34 +0200 Subject: [PATCH 14/17] add: worker thread names Signed-off-by: Cole Bailey --- .../flagd/resolvers/process/connector/file_watcher.py | 4 +++- .../flagd/resolvers/process/connector/grpc_watcher.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py index db0e41f2..a33601ce 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -36,7 +36,9 @@ def __init__( def initialize(self, evaluation_context: EvaluationContext) -> None: self.active = True - self.thread = threading.Thread(target=self.refresh_file, daemon=True) + self.thread = threading.Thread( + target=self.refresh_file, daemon=True, name="FlagdFileWatcherWorkerThread" + ) self.thread.start() # Let this throw exceptions so that provider status is set correctly diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index debc69ac..9afbd6ab 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -37,7 +37,9 @@ def __init__( def initialize(self, context: EvaluationContext) -> None: self.active = True - self.thread = threading.Thread(target=self.sync_flags, daemon=True) + self.thread = threading.Thread( + target=self.sync_flags, daemon=True, name="FlagdGrpcSyncWorkerThread" + ) self.thread.start() ## block until ready or deadline reached From f5e89c08e01eca8f298e138b773e51562768dffa Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Thu, 20 Jun 2024 21:06:38 +0200 Subject: [PATCH 15/17] fix: remove hasattr usage Signed-off-by: Cole Bailey --- .../src/openfeature/contrib/provider/flagd/provider.py | 3 +-- .../openfeature/contrib/provider/flagd/resolvers/__init__.py | 3 +++ .../src/openfeature/contrib/provider/flagd/resolvers/grpc.py | 3 ++- providers/openfeature-provider-flagd/test-harness | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 2ba4945a..15c86f67 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -82,8 +82,7 @@ def setup_resolver(self) -> AbstractResolver: ) def initialize(self, evaluation_context: EvaluationContext) -> None: - if hasattr(self.resolver, "initialize"): - self.resolver.initialize(evaluation_context) + self.resolver.initialize(evaluation_context) def shutdown(self) -> None: if self.resolver: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py index 53e17938..1b77c0c8 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py @@ -10,6 +10,9 @@ class AbstractResolver(Protocol): + def initialize(self, evaluation_context: EvaluationContext) -> None: + return + def shutdown(self) -> None: ... def resolve_boolean_details( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index caab101a..c2f98522 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -16,11 +16,12 @@ from ..config import Config from ..flag_type import FlagType from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc +from . import AbstractResolver T = typing.TypeVar("T") -class GrpcResolver: +class GrpcResolver(AbstractResolver): def __init__(self, config: Config): self.config = config channel_factory = ( diff --git a/providers/openfeature-provider-flagd/test-harness b/providers/openfeature-provider-flagd/test-harness index 6197b3d9..c9e0be36 160000 --- a/providers/openfeature-provider-flagd/test-harness +++ b/providers/openfeature-provider-flagd/test-harness @@ -1 +1 @@ -Subproject commit 6197b3d956d358bf662e5b8e0aebdc4800480f6b +Subproject commit c9e0be36e89ad33aa99b8e32b40d67e9bf350f88 From 45e37391fcf182910153b149501f4cb0079cb751 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Thu, 20 Jun 2024 21:40:41 +0200 Subject: [PATCH 16/17] fix: circular dependency Signed-off-by: Cole Bailey --- .../provider/flagd/resolvers/__init__.py | 51 +------------------ .../contrib/provider/flagd/resolvers/grpc.py | 2 +- .../provider/flagd/resolvers/protocol.py | 48 +++++++++++++++++ 3 files changed, 50 insertions(+), 51 deletions(-) create mode 100644 providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/protocol.py diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py index 1b77c0c8..f539de8f 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py @@ -1,54 +1,5 @@ -import typing - -from typing_extensions import Protocol - -from openfeature.evaluation_context import EvaluationContext -from openfeature.flag_evaluation import FlagResolutionDetails - from .grpc import GrpcResolver from .in_process import InProcessResolver - - -class AbstractResolver(Protocol): - def initialize(self, evaluation_context: EvaluationContext) -> None: - return - - def shutdown(self) -> None: ... - - def resolve_boolean_details( - self, - key: str, - default_value: bool, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[bool]: ... - - def resolve_string_details( - self, - key: str, - default_value: str, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[str]: ... - - def resolve_float_details( - self, - key: str, - default_value: float, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[float]: ... - - def resolve_integer_details( - self, - key: str, - default_value: int, - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[int]: ... - - def resolve_object_details( - self, - key: str, - default_value: typing.Union[dict, list], - evaluation_context: typing.Optional[EvaluationContext] = None, - ) -> FlagResolutionDetails[typing.Union[dict, list]]: ... - +from .protocol import AbstractResolver __all__ = ["AbstractResolver", "GrpcResolver", "InProcessResolver"] diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index c2f98522..ca697cf3 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -16,7 +16,7 @@ from ..config import Config from ..flag_type import FlagType from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc -from . import AbstractResolver +from .protocol import AbstractResolver T = typing.TypeVar("T") diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/protocol.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/protocol.py new file mode 100644 index 00000000..a6d70fba --- /dev/null +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/protocol.py @@ -0,0 +1,48 @@ +import typing + +from typing_extensions import Protocol + +from openfeature.evaluation_context import EvaluationContext +from openfeature.flag_evaluation import FlagResolutionDetails + + +class AbstractResolver(Protocol): + def initialize(self, evaluation_context: EvaluationContext) -> None: + return + + def shutdown(self) -> None: ... + + def resolve_boolean_details( + self, + key: str, + default_value: bool, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[bool]: ... + + def resolve_string_details( + self, + key: str, + default_value: str, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[str]: ... + + def resolve_float_details( + self, + key: str, + default_value: float, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[float]: ... + + def resolve_integer_details( + self, + key: str, + default_value: int, + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[int]: ... + + def resolve_object_details( + self, + key: str, + default_value: typing.Union[dict, list], + evaluation_context: typing.Optional[EvaluationContext] = None, + ) -> FlagResolutionDetails[typing.Union[dict, list]]: ... From e9cd2ed3a96d6533d88a893deb6a511eb45fe546 Mon Sep 17 00:00:00 2001 From: Cole Bailey Date: Thu, 20 Jun 2024 22:03:08 +0200 Subject: [PATCH 17/17] fix: AbstractProvider dependency everywhere Signed-off-by: Cole Bailey --- .../contrib/provider/flagd/provider.py | 13 +++++++++- .../provider/flagd/resolvers/in_process.py | 25 ++++++++++++++----- .../process/connector/file_watcher.py | 11 ++++---- .../process/connector/grpc_watcher.py | 15 +++++++---- .../provider/flagd/resolvers/process/flags.py | 9 ++++--- .../tests/test_file_store.py | 11 +++++--- .../tests/test_grpc_sync_connector.py | 10 +++++--- 7 files changed, 66 insertions(+), 28 deletions(-) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 15c86f67..dca91821 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -24,6 +24,7 @@ import typing from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails from openfeature.flag_evaluation import FlagResolutionDetails from openfeature.provider.metadata import Metadata from openfeature.provider.provider import AbstractProvider @@ -75,7 +76,12 @@ def setup_resolver(self) -> AbstractResolver: if self.config.resolver_type == ResolverType.GRPC: return GrpcResolver(self.config) elif self.config.resolver_type == ResolverType.IN_PROCESS: - return InProcessResolver(self.config, self) + return InProcessResolver( + self.config, + self.emit_provider_ready, + self.emit_provider_error, + self.emit_provider_configuration_changed, + ) else: raise ValueError( f"`resolver_type` parameter invalid: {self.config.resolver_type}" @@ -92,6 +98,11 @@ def get_metadata(self) -> Metadata: """Returns provider metadata""" return Metadata(name="FlagdProvider") + def flag_store_updated_callback(self, flag_keys: typing.List[str]) -> None: + self.emit_provider_configuration_changed( + ProviderEventDetails(flags_changed=flag_keys) + ) + def resolve_boolean_details( self, key: str, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index e0ea96ba..37c1608a 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -4,9 +4,9 @@ from json_logic import builtins, jsonLogic # type: ignore[import-untyped] from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails from openfeature.exception import FlagNotFoundError, ParseError from openfeature.flag_evaluation import FlagResolutionDetails, Reason -from openfeature.provider.provider import AbstractProvider from ..config import Config from .process.connector import FlagStateConnector @@ -27,19 +27,32 @@ class InProcessResolver: "sem_ver": sem_ver, } - def __init__(self, config: Config, provider: AbstractProvider): + def __init__( + self, + config: Config, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_configuration_changed: typing.Callable[ + [ProviderEventDetails], None + ], + ): self.config = config - self.provider = provider - self.flag_store = FlagStore(provider) + self.flag_store = FlagStore(emit_provider_configuration_changed) self.connector: FlagStateConnector = ( FileWatcher( self.config.offline_flag_source_path, - self.provider, self.flag_store, + emit_provider_ready, + emit_provider_error, self.config.offline_poll_interval_seconds, ) if self.config.offline_flag_source_path - else GrpcWatcher(self.config, self.provider, self.flag_store) + else GrpcWatcher( + self.config, + self.flag_store, + emit_provider_ready, + emit_provider_error, + ) ) def initialize(self, evaluation_context: EvaluationContext) -> None: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py index a33601ce..934c4332 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/file_watcher.py @@ -10,7 +10,6 @@ from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails from openfeature.exception import ParseError, ProviderNotReadyError -from openfeature.provider.provider import AbstractProvider from ..connector import FlagStateConnector from ..flags import FlagStore @@ -22,12 +21,14 @@ class FileWatcher(FlagStateConnector): def __init__( self, file_path: str, - provider: AbstractProvider, flag_store: FlagStore, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], poll_interval_seconds: float = 1.0, ): self.file_path = file_path - self.provider = provider + self.emit_provider_ready = emit_provider_ready + self.emit_provider_error = emit_provider_error self.poll_interval_seconds = poll_interval_seconds self.last_modified = 0.0 @@ -83,7 +84,7 @@ def _load_data(self, modified_time: typing.Optional[float] = None) -> None: self.flag_store.update(data) if self.should_emit_ready_on_success: - self.provider.emit_provider_ready( + self.emit_provider_ready( ProviderEventDetails( message="Reloading file contents recovered from error state" ) @@ -95,4 +96,4 @@ def _load_data(self, modified_time: typing.Optional[float] = None) -> None: def handle_error(self, error_message: str) -> None: logger.exception(error_message) self.should_emit_ready_on_success = True - self.provider.emit_provider_error(ProviderEventDetails(message=error_message)) + self.emit_provider_error(ProviderEventDetails(message=error_message)) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index 9afbd6ab..f6ea3ecf 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -2,13 +2,13 @@ import logging import threading import time +import typing import grpc from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError -from openfeature.provider.provider import AbstractProvider from ....config import Config from ....proto.flagd.sync.v1 import sync_pb2, sync_pb2_grpc @@ -22,9 +22,12 @@ class GrpcWatcher(FlagStateConnector): MAX_BACK_OFF = 120 def __init__( - self, config: Config, provider: AbstractProvider, flag_store: FlagStore + self, + config: Config, + flag_store: FlagStore, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], ): - self.provider = provider self.flag_store = flag_store channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel self.channel = channel_factory(f"{config.host}:{config.port}") @@ -32,6 +35,8 @@ def __init__( self.timeout = config.timeout self.retry_backoff_seconds = config.retry_backoff_seconds self.selector = config.selector + self.emit_provider_ready = emit_provider_ready + self.emit_provider_error = emit_provider_error self.connected = False @@ -71,7 +76,7 @@ def sync_flags(self) -> None: self.flag_store.update(json.loads(flag_str)) if not self.connected: - self.provider.emit_provider_ready( + self.emit_provider_ready( ProviderEventDetails( message="gRPC sync connection established" ) @@ -94,7 +99,7 @@ def sync_flags(self) -> None: ) self.connected = False - self.provider.emit_provider_error( + self.emit_provider_error( ProviderEventDetails( message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", error_code=ErrorCode.GENERAL, diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py index c8fb6181..889edac7 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/flags.py @@ -5,15 +5,16 @@ from openfeature.event import ProviderEventDetails from openfeature.exception import ParseError -from openfeature.provider.provider import AbstractProvider class FlagStore: def __init__( self, - provider: AbstractProvider, + emit_provider_configuration_changed: typing.Callable[ + [ProviderEventDetails], None + ], ): - self.provider = provider + self.emit_provider_configuration_changed = emit_provider_configuration_changed self.flags: typing.Mapping[str, "Flag"] = {} def get_flag(self, key: str) -> typing.Optional["Flag"]: @@ -34,7 +35,7 @@ def update(self, flags_data: dict) -> None: raise ParseError("`flags` key of configuration must be a dictionary") self.flags = {key: Flag.from_dict(key, data) for key, data in flags.items()} - self.provider.emit_provider_configuration_changed( + self.emit_provider_configuration_changed( ProviderEventDetails(flags_changed=list(self.flags.keys())) ) diff --git a/providers/openfeature-provider-flagd/tests/test_file_store.py b/providers/openfeature-provider-flagd/tests/test_file_store.py index f44c9b8c..12a1d976 100644 --- a/providers/openfeature-provider-flagd/tests/test_file_store.py +++ b/providers/openfeature-provider-flagd/tests/test_file_store.py @@ -8,7 +8,6 @@ FileWatcher, ) from openfeature.contrib.provider.flagd.resolvers.process.flags import Flag, FlagStore -from openfeature.provider.provider import AbstractProvider def create_client(provider: FlagdProvider): @@ -24,9 +23,13 @@ def create_client(provider: FlagdProvider): ], ) def test_file_load(file_name: str): - provider = Mock(spec=AbstractProvider) - flag_store = FlagStore(provider) - file_watcher = FileWatcher(f"tests/flags/{file_name}", provider, flag_store) + emit_provider_configuration_changed = Mock() + emit_provider_ready = Mock() + emit_provider_error = Mock() + flag_store = FlagStore(emit_provider_configuration_changed) + file_watcher = FileWatcher( + f"tests/flags/{file_name}", flag_store, emit_provider_ready, emit_provider_error + ) file_watcher.initialize(None) flag = flag_store.get_flag("basic-flag") diff --git a/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py b/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py index 00d08a20..e68e7178 100644 --- a/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py +++ b/providers/openfeature-provider-flagd/tests/test_grpc_sync_connector.py @@ -30,9 +30,13 @@ def sync(request): ), ) def test_invalid_payload(flag_configuration: str): - fake_provider = MagicMock() - flag_store = FlagStore(fake_provider) - watcher = GrpcWatcher(Config(timeout=0.2), fake_provider, flag_store) + emit_provider_configuration_changed = MagicMock() + emit_provider_ready = MagicMock() + emit_provider_error = MagicMock() + flag_store = FlagStore(emit_provider_configuration_changed) + watcher = GrpcWatcher( + Config(timeout=0.2), flag_store, emit_provider_ready, emit_provider_error + ) fake_sync_flags = fake_grpc_service(flag_configuration) with patch.object(watcher.stub, "SyncFlags", fake_sync_flags), pytest.raises(