Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add grpc sync flag store #84

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions providers/openfeature-provider-flagd/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
services:
flagd:
build:
context: test-harness
dockerfile: flagd/Dockerfile
ports:
- 8013:8013
flagd-unstable:
build:
context: test-harness
dockerfile: flagd/Dockerfile.unstable
ports:
- 8014:8013
flagd-sync:
build:
context: test-harness
dockerfile: sync/Dockerfile
ports:
- 9090:9090
flagd-sync-unstable:
build:
context: test-harness
dockerfile: sync/Dockerfile.unstable
ports:
- 9091:9090
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure this file is excluded from published package. We can do so from the pyproject.toml

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Checking this now it also includes tests and test-harness etc. Should we simplify to only include src?

[tool.hatch.build.targets.sdist]
include = [
  "src",
]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd have to check the generated build to confirm. If you can please make this change in a separate PR.

5 changes: 5 additions & 0 deletions providers/openfeature-provider-flagd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ omit = [
"src/openfeature/contrib/provider/flagd/proto/*",
"tests/**",
]

[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
]
colebaileygit marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes automated after grpc code generation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, I actually didn't notice that script. When I try to run it I get this file not found error, does it need to be fixed and maybe documented in the contributing guide?

Failure: could not read file schemas/protobuf/buf.gen.python.yaml: open schemas/protobuf/buf.gen.python.yaml: no such file or directory



class ServiceStub(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from flagd.sync.v1 import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2
from . import sync_pb2 as flagd_dot_sync_dot_v1_dot_sync__pb2


class FlagSyncServiceStub(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from sync.v1 import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2
from . import sync_service_pb2 as sync_dot_v1_dot_sync__service__pb2


class FlagSyncServiceStub(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from openfeature.provider.provider import AbstractProvider

from ..config import Config
from .process.connector.file_watcher import FileWatcherFlagStore
from .process.connector.grpc_watcher import GrpcWatcherFlagStore
from .process.custom_ops import ends_with, fractional, sem_ver, starts_with
from .process.file_watcher import FileWatcherFlagStore
from .process.flags import FlagStore

T = typing.TypeVar("T")

Expand All @@ -27,14 +29,14 @@ class InProcessResolver:
def __init__(self, config: Config, provider: AbstractProvider):
self.config = config
self.provider = provider
if not self.config.offline_flag_source_path:
raise ValueError(
"offline_flag_source_path must be provided when using in-process resolver"
self.flag_store: FlagStore = (
FileWatcherFlagStore(
self.config.offline_flag_source_path,
self.provider,
self.config.offline_poll_interval_seconds,
)
self.flag_store = FileWatcherFlagStore(
self.config.offline_flag_source_path,
self.provider,
self.config.offline_poll_interval_seconds,
if self.config.offline_flag_source_path
else GrpcWatcherFlagStore(self.config, self.provider)
)

def shutdown(self) -> None:
Expand Down Expand Up @@ -62,15 +64,21 @@ def resolve_float_details(
default_value: float,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]:
return self._resolve(key, default_value, evaluation_context)
result = self._resolve(key, default_value, evaluation_context)
if not isinstance(result.value, float):
result.value = float(result.value)
return result

def resolve_integer_details(
self,
key: str,
default_value: int,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[int]:
return self._resolve(key, default_value, evaluation_context)
result = self._resolve(key, default_value, evaluation_context)
if not isinstance(result.value, int):
result.value = int(result.value)
return result

def resolve_object_details(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import logging
import os
import re
import threading
import time
import typing
Expand All @@ -12,12 +11,12 @@
from openfeature.exception import ParseError
from openfeature.provider.provider import AbstractProvider

from .flags import Flag
from ..flags import Flag, FlagStore

logger = logging.getLogger("openfeature.contrib")


class FileWatcherFlagStore:
class FileWatcherFlagStore(FlagStore):
def __init__(
self,
file_path: str,
Expand Down Expand Up @@ -56,7 +55,7 @@ def load_data(self, modified_time: typing.Optional[float] = None) -> None:
else:
data = json.load(file)

self.flag_data = self.parse_flags(data)
self.flag_data = Flag.parse_flags(data)
logger.debug(f"{self.flag_data=}")
self.provider.emit_provider_configuration_changed(
ProviderEventDetails(flags_changed=list(self.flag_data.keys()))
Expand All @@ -72,18 +71,3 @@ def load_data(self, modified_time: typing.Optional[float] = None) -> None:
logger.exception("Could not parse flag data using flagd syntax")
except Exception:
logger.exception("Could not read flags from file")

def parse_flags(self, flags_data: dict) -> dict:
flags = flags_data.get("flags", {})
evaluators: typing.Optional[dict] = flags_data.get("$evaluators")
if evaluators:
transposed = json.dumps(flags)
for name, rule in evaluators.items():
transposed = re.sub(
rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed
)
flags = json.loads(transposed)

if not isinstance(flags, dict):
raise ParseError("`flags` key of configuration must be a dictionary")
return {key: Flag.from_dict(key, data) for key, data in flags.items()}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import json
import logging
import threading
import time
import typing

import grpc

from openfeature.event import ProviderEventDetails
from openfeature.exception import ParseError
from openfeature.provider.provider import AbstractProvider

from ....config import Config
from ....proto.flagd.sync.v1 import sync_pb2, sync_pb2_grpc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For more than one level up, an absolute import is much easier to read and to update if necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was getting some mypy errors when using absolute import, need to check how to fix those:

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py:10: error:
Cannot find implementation or library stub for module named
"openfeature.contrib.provider.flagd.proto.flagd.sync.v1"  [import-not-found]
    from openfeature.contrib.provider.flagd.proto.flagd.sync.v1 import (
    ^

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't figure this out in a simple way, seems related to using pre-commit at root of a "mono-repo" but I don't see any clear resources of how to structure it better. Maybe this is a follow-up issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach that may work as long as the number of packages remains manageable.

from ..flags import Flag, FlagStore

logger = logging.getLogger("openfeature.contrib")


class GrpcWatcherFlagStore(FlagStore):
colebaileygit marked this conversation as resolved.
Show resolved Hide resolved
INIT_BACK_OFF = 2
MAX_BACK_OFF = 120

def __init__(self, config: Config, provider: AbstractProvider):
self.provider = provider
self.flag_data: typing.Mapping[str, Flag] = {}
channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
self.channel = channel_factory(f"{config.host}:{config.port}")
self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel)

self.connected = False

# TODO: Add selector

self.thread = threading.Thread(target=self.sync_flags, daemon=True)
colebaileygit marked this conversation as resolved.
Show resolved Hide resolved
self.thread.start()

## block until ready or deadline reached

# TODO: get deadline from user
deadline = 2 + time.time()
while not self.connected and time.time() < deadline:
logger.debug("blocking on init")
time.sleep(0.05)

if not self.connected:
logger.warning(
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
)

def shutdown(self) -> None:
pass

def get_flag(self, key: str) -> typing.Optional[Flag]:
return self.flag_data.get(key)

def sync_flags(self) -> None:
request = sync_pb2.SyncFlagsRequest() # type:ignore[attr-defined]

retry_delay = self.INIT_BACK_OFF
while True:
try:
logger.debug("Setting up gRPC sync flags connection")
for flag_rsp in self.stub.SyncFlags(request):
flag_str = flag_rsp.flag_configuration
logger.debug(
f"Received flag configuration - {abs(hash(flag_str)) % (10 ** 8)}"
)
self.flag_data = Flag.parse_flags(json.loads(flag_str))

self.connected = True
self.provider.emit_provider_configuration_changed(
ProviderEventDetails(flags_changed=list(self.flag_data.keys()))
)

# reset retry delay after successsful read
retry_delay = self.INIT_BACK_OFF
except grpc.RpcError as e: # noqa: PERF203
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
except json.JSONDecodeError:
logger.exception(
f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}"
)
except ParseError:
logger.exception(
f"Could not parse flag data using flagd syntax: {flag_str=}"
)
finally:
# self.connected = False
logger.info(f"Reconnecting in {retry_delay}s")
time.sleep(retry_delay)
retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF)
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import json
import re
import typing
from dataclasses import dataclass

from openfeature.exception import ParseError


class FlagStore(typing.Protocol):
def get_flag(self, key: str) -> typing.Optional["Flag"]:
pass

def shutdown(self) -> None:
pass


@dataclass
class Flag:
key: str
Expand Down Expand Up @@ -49,3 +59,19 @@ def get_variant(
variant_key = str(variant_key).lower()

return variant_key, self.variants.get(variant_key)

@classmethod
def parse_flags(cls, flags_data: dict) -> typing.Dict[str, "Flag"]:
flags = flags_data.get("flags", {})
evaluators: typing.Optional[dict] = flags_data.get("$evaluators")
if evaluators:
transposed = json.dumps(flags)
for name, rule in evaluators.items():
transposed = re.sub(
rf"{{\s*\"\$ref\":\s*\"{name}\"\s*}}", json.dumps(rule), transposed
)
flags = json.loads(transposed)

if not isinstance(flags, dict):
raise ParseError("`flags` key of configuration must be a dictionary")
return {key: Flag.from_dict(key, data) for key, data in flags.items()}
Empty file.
22 changes: 5 additions & 17 deletions providers/openfeature-provider-flagd/tests/e2e/conftest.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
import typing

import pytest
from pytest_bdd import given, parsers, then, when
from tests.e2e.parsers import to_bool
from pytest_bdd import parsers, then, when

from openfeature import api
from openfeature.client import OpenFeatureClient
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import ResolverType
from openfeature.evaluation_context import EvaluationContext

JsonPrimitive = typing.Union[str, bool, float, int]


def to_bool(s: str) -> bool:
return s.lower() == "true"


@pytest.fixture
def evaluation_context() -> EvaluationContext:
return EvaluationContext()


@given("a flagd provider is set", target_fixture="client")
def setup_provider(flag_file) -> OpenFeatureClient:
api.set_provider(
FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
offline_flag_source_path=flag_file,
offline_poll_interval_seconds=0.1,
)
)
return api.get_client()


@when(
parsers.cfparse(
'a zero-value boolean flag with key "{key}" is evaluated with default value "{default:bool}"',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pytest_bdd import given

from openfeature import api
from openfeature.client import OpenFeatureClient
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import ResolverType


@given("a flagd provider is set", target_fixture="client")
def setup_provider(flag_file) -> OpenFeatureClient:
api.set_provider(
FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
offline_flag_source_path=flag_file,
offline_poll_interval_seconds=0.1,
)
)
return api.get_client()
Loading
Loading