Commit 527ce6b

--wip-- [skip ci]

Signed-off-by: Simon Schrottner <[email protected]>
aepfli committed Dec 17, 2024
1 parent b6d767d commit 527ce6b
Showing 25 changed files with 744 additions and 331 deletions.
2 changes: 1 addition & 1 deletion providers/openfeature-provider-flagd/pyproject.toml
@@ -18,7 +18,7 @@ classifiers = [
keywords = []
dependencies = [
"openfeature-sdk>=0.6.0",
"grpcio>=1.68.0",
"grpcio>=1.68.1",
"protobuf>=4.25.2",
"mmh3>=4.1.0",
"panzi-json-logic>=1.0.1",
8 changes: 8 additions & 0 deletions providers/openfeature-provider-flagd/pytest.ini
@@ -4,7 +4,15 @@ markers =
in-process: tests for in-process mode.
customCert: Supports custom certs.
unixsocket: Supports unixsockets.
targetURI: Supports targetURI.
grace: Supports grace attempts.
targeting: Supports targeting.
fractional: Supports fractional.
string: Supports string.
semver: Supports semver.
reconnect: Supports reconnect.
events: Supports events.
sync: Supports sync.
caching: Supports caching.
offline: Supports offline.
bdd_features_base_dir = tests/features
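
The new markers mirror the capability tags used by the flagd test harness; tests carrying them can be excluded at collection time with pytest's `-m` expression (for example `pytest -m "not caching and not events"`). A minimal illustration, not part of this commit:

```python
import pytest


# Illustrative only: the markers registered in pytest.ini above can be
# attached to harness tests and filtered out with `pytest -m "not caching"`.
@pytest.mark.caching
@pytest.mark.events
def test_cached_flag_is_invalidated_on_change():
    ...
```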
@@ -43,14 +43,14 @@ def __init__( # noqa: PLR0913
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
deadline: typing.Optional[int] = None,
deadline_ms: typing.Optional[int] = None,
timeout: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
cache_type: typing.Optional[CacheType] = None,
cache: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_period: typing.Optional[int] = None,
@@ -61,16 +61,16 @@ def __init__( # noqa: PLR0913
:param host: the host to make requests to
:param port: the port the flagd service is available on
:param tls: enable/disable secure TLS connectivity
:param deadline: the maximum to wait before a request times out
:param deadline_ms: the maximum time to wait before a request times out, in milliseconds
:param timeout: the maximum time to wait before a request times out
:param retry_backoff_ms: the number of milliseconds to backoff
:param offline_flag_source_path: the path to the flag source file
:param stream_deadline_ms: the maximum time to wait before a request times out
:param keep_alive_time: the number of milliseconds to keep alive
:param resolver_type: the type of resolver to use
"""
if deadline is None and timeout is not None:
deadline = timeout * 1000
if deadline_ms is None and timeout is not None:
deadline_ms = timeout * 1000
warnings.warn(
"'timeout' property is deprecated, please use 'deadline' instead, be aware that 'deadline' is in milliseconds",
DeprecationWarning,
@@ -81,15 +81,15 @@ def __init__( # noqa: PLR0913
host=host,
port=port,
tls=tls,
deadline_ms=deadline,
deadline_ms=deadline_ms,
retry_backoff_ms=retry_backoff_ms,
retry_backoff_max_ms=retry_backoff_max_ms,
retry_grace_period=retry_grace_period,
resolver=resolver_type,
offline_flag_source_path=offline_flag_source_path,
stream_deadline_ms=stream_deadline_ms,
keep_alive_time=keep_alive_time,
cache=cache_type,
cache=cache,
max_cache_size=max_cache_size,
)

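
A minimal usage sketch for the renamed keyword arguments (`deadline_ms` instead of `deadline`, `cache` instead of `cache_type`). The values are illustrative, and `CacheType` is assumed to be importable from the same config module as `ResolverType`:

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import CacheType, ResolverType

# Illustrative values; keyword names follow the renamed parameters above.
api.set_provider(
    FlagdProvider(
        host="localhost",
        port=8013,
        resolver_type=ResolverType.RPC,
        deadline_ms=500,        # milliseconds, replacing the deprecated `timeout`
        cache=CacheType.LRU,    # formerly `cache_type`
        max_cache_size=1000,
    )
)
client = api.get_client()
```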
@@ -1,4 +1,3 @@
import json
import logging
import threading
import time
@@ -55,47 +54,22 @@ def __init__(
self.emit_provider_error = emit_provider_error
self.emit_provider_stale = emit_provider_stale
self.emit_provider_configuration_changed = emit_provider_configuration_changed
self.cache: typing.Optional[BaseCacheImpl] = (
LRUCache(maxsize=self.config.max_cache_size)
if self.config.cache == CacheType.LRU
else None
)
self.cache: typing.Optional[BaseCacheImpl] = self._create_cache()

retry_backoff_seconds = config.retry_backoff_ms * 0.001
retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
self.retry_grace_period = config.retry_grace_period
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.connected = False
channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
service_config = {
"methodConfig": [
{
"name": [
{
"service": "flagd.evaluation.v1.Service",
"method": "EventStream",
}
],
"retryPolicy": {
"maxAttempts": 50000, # Max value for a 32-bit integer
"initialBackoff": f"{retry_backoff_seconds}s", # Initial backoff delay
"maxBackoff": f"{retry_backoff_max_seconds}s", # Maximum backoff delay
"backoffMultiplier": 2, # Exponential backoff multiplier
"retryableStatusCodes": [
"UNAVAILABLE",
"UNKNOWN",
], # Retry on these statuses
},
}
],
}

# Create the channel with the service config
options = [
("grpc.service_config", json.dumps(service_config)),
("grpc.keepalive_time_ms", config.keep_alive_time),
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
("grpc.min_reconnect_backoff_ms", config.deadline_ms),
]

self.channel = channel_factory(
f"{config.host}:{config.port}",
options=options,
@@ -104,15 +78,21 @@ def __init__(

self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None
self.active = False

def _create_cache(self):
return (
LRUCache(maxsize=self.config.max_cache_size)
if self.config.cache == CacheType.LRU
else None
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.connect()

def shutdown(self) -> None:
self.active = False
self.channel.close()
if self.cache:
self.cache.clear()

def connect(self) -> None:
self.active = True
@@ -167,7 +147,7 @@ def state_change_callback(new_state: ChannelConnectivity) -> None:

def emit_error(self) -> None:
logger.debug("gRPC error emitted")
if self.cache:
if self.cache is not None:
self.cache.clear()
self.emit_provider_error(
ProviderEventDetails(
@@ -189,7 +169,9 @@ def listen(self) -> None:
while self.active:
try:
logger.info("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request, **call_args):
for message in self.stub.EventStream(
request, wait_for_ready=True, **call_args
):
if message.type == "provider_ready":
self.connected = True
self.emit_provider_ready(
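
With the per-method `retryPolicy` service config removed, reconnect behaviour is driven by the channel's backoff options and by passing `wait_for_ready=True` to the stream call, which waits for a usable channel instead of failing fast with UNAVAILABLE. A standalone sketch of that pattern (placeholder target and values, not the provider's exact code):

```python
import grpc

# Sketch only: reconnect/backoff tuned via channel options rather than a
# JSON service config; the option values here are placeholders.
options = [
    ("grpc.keepalive_time_ms", 30_000),
    ("grpc.initial_reconnect_backoff_ms", 1_000),
    ("grpc.max_reconnect_backoff_ms", 120_000),
]
channel = grpc.insecure_channel("localhost:8013", options=options)

# `stub` and `request` stand in for the generated flagd service stub and its
# stream request; wait_for_ready makes the call block until the channel is ready.
# for message in stub.EventStream(request, wait_for_ready=True):
#     handle(message)
```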
@@ -31,9 +31,9 @@ def __init__(
self.last_modified = 0.0
self.flag_data: typing.Mapping[str, Flag] = {}
self.load_data()
self.active = True
self.thread = threading.Thread(target=self.refresh_file, daemon=True)
self.thread.start()
self.active = True

def shutdown(self) -> None:
self.active = False
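
Moving `self.active = True` ahead of `thread.start()` closes a small race: a daemon loop that checks the flag on entry could otherwise see `False` and exit immediately. A generic sketch of the pattern (not the provider's file watcher):

```python
import threading
import time


class Watcher:
    def __init__(self) -> None:
        # Set the flag before start(); the loop below checks it on entry.
        self.active = True
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()

    def _loop(self) -> None:
        while self.active:
            time.sleep(1)  # poll the flag source here

    def shutdown(self) -> None:
        self.active = False
```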
16 changes: 6 additions & 10 deletions providers/openfeature-provider-flagd/tests/e2e/conftest.py
@@ -1,17 +1,13 @@
import typing

from tests.e2e.steps import * # noqa: F403
from tests.e2e.step.config_steps import * # noqa: F403
from tests.e2e.step.context_steps import * # noqa: F403
from tests.e2e.step.event_steps import * # noqa: F403
from tests.e2e.step.flag_step import * # noqa: F403
from tests.e2e.step.provider_steps import * # noqa: F403
from tests.e2e.steps import *  # noqa: F403

JsonPrimitive = typing.Union[str, bool, float, int]

TEST_HARNESS_PATH = "../../openfeature/test-harness"
SPEC_PATH = "../../openfeature/spec"


# run all gherkin tests except the ones not yet implemented
def pytest_collection_modifyitems(config):
marker = "not customCert and not unixsocket and not sync and not targetURI"

# this seems to not work with python 3.8
if hasattr(config.option, "markexpr") and config.option.markexpr == "":
config.option.markexpr = marker
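
The `TestFilter` used by the new per-resolver test modules is not part of this diff; a hypothetical marker-based filter that deselects items directly (rather than setting `markexpr`) could look like this:

```python
# Hypothetical alternative to the markexpr approach above; not the actual
# TestFilter implementation referenced by the new test modules.
EXCLUDED = {"customCert", "unixsocket", "sync", "targetURI"}


def pytest_collection_modifyitems(config, items):
    selected, deselected = [], []
    for item in items:
        markers = {marker.name for marker in item.iter_markers()}
        (deselected if markers & EXCLUDED else selected).append(item)
    if deselected:
        config.hook.pytest_deselected(items=deselected)
        items[:] = selected
```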
20 changes: 14 additions & 6 deletions providers/openfeature-provider-flagd/tests/e2e/flagd_container.py
@@ -5,19 +5,27 @@
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs

from openfeature.contrib.provider.flagd.config import ResolverType

HEALTH_CHECK = 8014


class FlagdContainer(DockerContainer):
def __init__(
self,
image: str = "ghcr.io/open-feature/flagd-testbed:v0.5.15",
port: int = 8013,
**kwargs,
self,
**kwargs,
) -> None:
image: str = "ghcr.io/open-feature/flagd-testbed:v0.5.15"
super().__init__(image, **kwargs)
self.port = port
self.with_exposed_ports(self.port, HEALTH_CHECK)
self.rpc = 8013
self.ipr = 8015
self.with_exposed_ports(self.rpc, self.ipr, HEALTH_CHECK)

def get_port(self, resolver_type: ResolverType):
if resolver_type == ResolverType.RPC:
return self.get_exposed_port(self.rpc)
else:
return self.get_exposed_port(self.ipr)

def start(self) -> "FlagdContainer":
super().start()
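
A short usage sketch for the reworked container, assuming `FlagdContainer` lives in `tests/e2e/flagd_container.py` as shown above: `get_port` returns the host-mapped port for container port 8013 (RPC) or 8015 (in-process sync).

```python
from openfeature.contrib.provider.flagd.config import ResolverType
from tests.e2e.flagd_container import FlagdContainer

# Sketch only: start the testbed container and resolve the mapped host ports.
container = FlagdContainer().start()

rpc_port = container.get_port(ResolverType.RPC)          # container port 8013
ipr_port = container.get_port(ResolverType.IN_PROCESS)   # container port 8015

print(f"rpc at localhost:{rpc_port}, sync at localhost:{ipr_port}")
```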
Empty file.
@@ -0,0 +1,134 @@
import json
import os
import tempfile

import pytest
import yaml
from pytest_bdd import given, parsers, when
from tests.e2e.conftest import TEST_HARNESS_PATH
from tests.e2e.step._utils import wait_for
from tests.e2e.testFilter import TestFilter

from openfeature import api
from openfeature.client import OpenFeatureClient
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import ResolverType
from openfeature.provider import ProviderStatus

# from tests.e2e.step.config_steps import *
# from tests.e2e.step.event_steps import *
# from tests.e2e.step.provider_steps import *

resolver = ResolverType.IN_PROCESS
feature_list = {
"~targetURI",
"~customCert",
"~unixsocket",
"~events",
"~sync",
"~caching",
"~reconnect",
"~grace",
}


def pytest_collection_modifyitems(config, items):
test_filter = TestFilter(
config, feature_list=feature_list, resolver=resolver.value, base_path=__file__
)
test_filter.filter_items(items)


KEY_EVALUATORS = "$evaluators"

KEY_FLAGS = "flags"

MERGED_FILE = "merged_file"


@pytest.fixture()
def resolver_type() -> ResolverType:
return resolver


@pytest.fixture(scope="module")
def all_flags(request):
result = {KEY_FLAGS: {}, KEY_EVALUATORS: {}}

path = os.path.abspath(
os.path.join(os.path.dirname(__file__), f"../{TEST_HARNESS_PATH}/flags/")
)

for f in os.listdir(path):
with open(path + "/" + f, "rb") as infile:
loaded_json = json.load(infile)
result[KEY_FLAGS] = {**result[KEY_FLAGS], **loaded_json[KEY_FLAGS]}
if loaded_json.get(KEY_EVALUATORS):
result[KEY_EVALUATORS] = {
**result[KEY_EVALUATORS],
**loaded_json[KEY_EVALUATORS],
}

return result


@pytest.fixture(params=["json", "yaml"], scope="module")
def file_name(request, all_flags):
extension = request.param
outfile = tempfile.NamedTemporaryFile("w", delete=False, suffix="." + extension)
write_test_file(outfile, all_flags)
yield outfile
return outfile


def write_test_file(outfile, all_flags):
with open(outfile.name, "w") as file:
if file.name.endswith("json"):
json.dump(all_flags, file)
else:
yaml.dump(all_flags, file)


@when(
parsers.cfparse('a flag with key "{flag_key}" is modified'),
target_fixture="changed_flag",
)
def changed_flag(
flag_key: str,
all_flags: dict,
file_name,
):
flag = all_flags[KEY_FLAGS][flag_key]

other_variant = [k for k in flag["variants"] if flag["defaultVariant"] in k].pop()

flag["defaultVariant"] = other_variant

all_flags[KEY_FLAGS][flag_key] = flag
write_test_file(file_name, all_flags)
return flag_key


@pytest.fixture(autouse=True)
def container(request, file_name, all_flags, option_values):
api.set_provider(
FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
deadline_ms=500,
stream_deadline_ms=0,
retry_backoff_ms=1000,
offline_flag_source_path=file_name.name,
**option_values,
),
)
pass


@given(parsers.cfparse("a {provider_type} flagd provider"), target_fixture="client")
def setup_provider(
resolver_type: ResolverType, provider_type: str, option_values: dict, file_name
) -> OpenFeatureClient:
client = api.get_client()

wait_for(lambda: client.get_provider_status() == ProviderStatus.READY)
return client
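
The `wait_for` helper imported from `tests.e2e.step._utils` is not included in this diff; a plausible minimal implementation (assumed, not the actual code) simply polls a predicate until it holds or a timeout elapses:

```python
import time


def wait_for(predicate, timeout: float = 5.0, interval: float = 0.05) -> None:
    # Poll the predicate until it returns a truthy value or the timeout hits.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return
        time.sleep(interval)
    raise AssertionError("condition not met within timeout")
```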
@@ -0,0 +1,5 @@
from tests.e2e.conftest import SPEC_PATH
from pytest_bdd import scenarios
from tests.e2e.conftest import TEST_HARNESS_PATH

scenarios(f"{TEST_HARNESS_PATH}/gherkin", f"{SPEC_PATH}/specification/assets/gherkin")
Empty file.