Skip to content

Commit

Permalink
Issue #112 Add AggregatorBackendConfig.zookeeper_prefix
Browse files Browse the repository at this point in the history
 and deprecate `AggregatorConfig.zookeeper_prefix`
  • Loading branch information
soxofaan committed Feb 21, 2024
1 parent 32cda56 commit 185aa3e
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.21.0]

- Add `AggregatorBackendConfig.zookeeper_prefix` and deprecate `AggregatorConfig.zookeeper_prefix` ([#112](https://github.com/Open-EO/openeo-aggregator/issues/112))

## [0.20.0]

- Replace `kazoo_client_factory` config with `AggregatorBackendConfig.zk_memoizer_tracking` ([#112](https://github.com/Open-EO/openeo-aggregator/issues/112))
Expand Down
2 changes: 1 addition & 1 deletion src/openeo_aggregator/about.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
from typing import Optional

__version__ = "0.20.0a1"
__version__ = "0.21.0a1"


def log_version_info(logger: Optional[logging.Logger] = None):
Expand Down
3 changes: 2 additions & 1 deletion src/openeo_aggregator/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,10 @@ def get_memoizer(memoizer_type: str, memoizer_conf: dict) -> Memoizer:
# TODO: better solution than using a module level global here?
stats=zk_memoizer_stats,
)
zookeeper_prefix = get_backend_config().zookeeper_prefix or config.zookeeper_prefix
return ZkMemoizer(
client=kazoo_client,
path_prefix=f"{config.zookeeper_prefix}/cache/{namespace}",
path_prefix=f"{zookeeper_prefix}/cache/{namespace}",
namespace=namespace,
default_ttl=memoizer_conf.get("default_ttl"),
zk_timeout=memoizer_conf.get("zk_timeout"),
Expand Down
5 changes: 5 additions & 0 deletions src/openeo_aggregator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class AggregatorConfig(dict):
configured_oidc_providers: List[OidcProvider] = dict_item(default=[])

partitioned_job_tracking = dict_item(default=None)
# TODO #112 Deprecated, use AggregatorBackendConfig.zookeeper_prefix instead
zookeeper_prefix = dict_item(default="/openeo-aggregator/")

# See `memoizer_from_config` for details.
Expand Down Expand Up @@ -138,6 +139,10 @@ class AggregatorBackendConfig(OpenEoBackendConfig):
# List of collection ids to cover with the aggregator (when None: support union of all upstream collections)
collection_whitelist: Optional[List[Union[str, re.Pattern]]] = None

# TODO #112: empty default is to allow config migration from AggregatorConfig to AggregatorBackendConfig.
# To be replaced eventually with "/openeo-aggregator/"
zookeeper_prefix: str = ""

zk_memoizer_tracking: bool = smart_bool(os.environ.get("OPENEO_AGGREGATOR_ZK_MEMOIZER_TRACKING"))


Expand Down
8 changes: 6 additions & 2 deletions src/openeo_aggregator/partitionedjobs/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from kazoo.exceptions import NodeExistsError, NoNodeError
from openeo_driver.errors import JobNotFoundException

from openeo_aggregator.config import AggregatorConfig, ConfigException
from openeo_aggregator.config import (
AggregatorConfig,
ConfigException,
get_backend_config,
)
from openeo_aggregator.partitionedjobs import (
STATUS_INSERTED,
PartitionedJob,
Expand Down Expand Up @@ -46,7 +50,7 @@ def from_config(cls, config: AggregatorConfig) -> "ZooKeeperPartitionedJobDB":
else:
raise ConfigException("Failed to construct zk_client")
# Determine ZooKeeper prefix
base_prefix = config.zookeeper_prefix
base_prefix = get_backend_config().zookeeper_prefix or config.zookeeper_prefix
assert len(base_prefix.replace("/", "")) >= 3
partitioned_jobs_prefix = config.partitioned_job_tracking.get("zookeeper_prefix", cls.NAMESPACE)
prefix = strip_join("/", base_prefix, partitioned_jobs_prefix)
Expand Down
1 change: 1 addition & 0 deletions tests/backend_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
OidcProvider(id="z-agg", issuer="https://z.test", title="Z (agg)"),
],
connections_cache_ttl=1.0,
zookeeper_prefix="/o-a/",
)
13 changes: 6 additions & 7 deletions tests/background/test_prime_caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def config(backend1, backend2, backend1_id, backend2_id, zk_client) -> Aggregato
backend1_id: backend1,
backend2_id: backend2,
}
conf.zookeeper_prefix = "/oa/"
conf.memoizer = {
"type": "zookeeper",
"config": {
Expand Down Expand Up @@ -65,15 +64,15 @@ def test_prime_caches_basic(config, upstream_request_mocks, zk_client):

assert zk_client.get_data_deserialized() == DictSubSet(
{
"/oa/cache/CollectionCatalog/all": [
"/o-a/cache/CollectionCatalog/all": [
[DictSubSet({"id": "S2"})],
DictSubSet({"_jsonserde": DictSubSet()}),
],
"/oa/cache/CollectionCatalog/collection/S2": DictSubSet({"id": "S2"}),
"/oa/cache/Processing/all/1.1.0": DictSubSet({"load_collection": DictSubSet({"id": "load_collection"})}),
"/oa/cache/general/file_formats": FILE_FORMATS_JUST_GEOTIFF,
"/oa/cache/mbcon/api_versions": ["1.1.0"],
"/oa/cache/SecondaryServices/service_types": {
"/o-a/cache/CollectionCatalog/collection/S2": DictSubSet({"id": "S2"}),
"/o-a/cache/Processing/all/1.1.0": DictSubSet({"load_collection": DictSubSet({"id": "load_collection"})}),
"/o-a/cache/general/file_formats": FILE_FORMATS_JUST_GEOTIFF,
"/o-a/cache/mbcon/api_versions": ["1.1.0"],
"/o-a/cache/SecondaryServices/service_types": {
"service_types": {},
"supporting_backend_ids": [],
},
Expand Down
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def base_config(configured_oidc_providers, zk_client, memoizer_config) -> Aggreg

conf.memoizer = memoizer_config

conf.zookeeper_prefix = "/o-a/"
conf.partitioned_job_tracking = {
"zk_client": zk_client,
}
Expand Down
7 changes: 3 additions & 4 deletions tests/test_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
memoizer_from_config,
)
from openeo_aggregator.config import AggregatorConfig
from openeo_aggregator.testing import DummyZnodeStat, clock_mock
from openeo_aggregator.testing import DummyZnodeStat, clock_mock, config_overrides
from openeo_aggregator.utils import Clock


Expand Down Expand Up @@ -723,8 +723,8 @@ def test_zookeeper_memoizer(self):
"type": "zookeeper",
"config": {"zk_hosts": "zk.test:2181", "default_ttl": 99, "zk_timeout": 88}
}
config.zookeeper_prefix = "/oea/test"
memoizer = memoizer_from_config(config, namespace="test-ns")
with config_overrides(zookeeper_prefix="/oea/test"):
memoizer = memoizer_from_config(config, namespace="test-ns")
assert isinstance(memoizer, ZkMemoizer)
assert memoizer._default_ttl == 99
assert memoizer._prefix == "/oea/test/cache/test-ns"
Expand All @@ -739,7 +739,6 @@ def test_chained_memoizer(self):
{"type": "dict", "config": {"default_ttl": 333}},
]}
}
config.zookeeper_prefix = "/oea/test"
memoizer = memoizer_from_config(config, namespace="test-ns")
assert isinstance(memoizer, ChainedMemoizer)
assert len(memoizer._memoizers) == 2
Expand Down

0 comments on commit 185aa3e

Please sign in to comment.