From 5490922051899b83664cee0e32dfe434cf87c9cf Mon Sep 17 00:00:00 2001 From: Guillermo Perez Date: Tue, 7 Nov 2023 22:58:19 +0100 Subject: [PATCH] Build cmd in rust ## Motivation / Description The second cpu-intensive part of the request processing is building the cmd. Also instead of building dicts of flags we can use a single flags object, which also simplifies the API of the lower commands. I chose to still built a single flags object, but we could explore building one flags object per meta-command, as the flags that they support differ, and it could lead to a more type-safe low-level implementation. ## Performance: * Initial: multithreaded: Overall: 110779.55 RPS / 9.03 us/req singlethreaded: Overall: 111545.63 RPS / 8.96 us/req * Rust only for response parsing multithreaded: Overall: 245898.34 RPS / 4.07 us/req singlethreaded: Overall: 246165.19 RPS / 4.06 us/req * Now (rust also for build_cmd) multithreaded: Overall: 319587.03 RPS / 3.13 us/req singlethreaded: Overall: 323101.77 RPS / 3.10 us/req --- src/meta_memcache/__init__.py | 4 +- src/meta_memcache/cache_client.py | 8 +- .../commands/high_level_commands.py | 204 +++++++----------- src/meta_memcache/commands/meta_commands.py | 36 +--- src/meta_memcache/configuration.py | 20 +- .../connection/memcache_socket.py | 14 +- src/meta_memcache/executors/default.py | 111 ++++------ src/meta_memcache/extras/client_wrapper.py | 36 +--- .../extras/migrating_cache_client.py | 50 +---- src/meta_memcache/interfaces/executor.py | 15 +- src/meta_memcache/interfaces/meta_commands.py | 26 +-- src/meta_memcache/interfaces/router.py | 15 +- src/meta_memcache/protocol.py | 82 ++----- src/meta_memcache/routers/default.py | 19 +- src/meta_memcache/routers/ephemeral.py | 24 +-- src/meta_memcache/routers/gutter.py | 28 +-- src/meta_memcache/routers/helpers.py | 31 +-- src/meta_memcache/settings.py | 5 +- tests/cache_client_test.py | 26 +-- tests/commands_test.py | 23 +- tests/ephemeral_cache_client_test.py | 10 +- tests/migrating_cache_client_test.py | 106 +++------ tests/probabilistic_hot_cache_test.py | 30 ++- 23 files changed, 311 insertions(+), 612 deletions(-) diff --git a/src/meta_memcache/__init__.py b/src/meta_memcache/__init__.py index e2609f5..cb8d14b 100644 --- a/src/meta_memcache/__init__.py +++ b/src/meta_memcache/__init__.py @@ -33,17 +33,15 @@ ) from meta_memcache.protocol import ( Conflict, - Flag, - IntFlag, Key, MetaCommand, Miss, NotStored, ServerVersion, ResponseFlags, + RequestFlags, SetMode, Success, - TokenFlag, Value, ) from meta_memcache.routers.default import DefaultRouter diff --git a/src/meta_memcache/cache_client.py b/src/meta_memcache/cache_client.py index b09f8ae..f6601f0 100644 --- a/src/meta_memcache/cache_client.py +++ b/src/meta_memcache/cache_client.py @@ -1,4 +1,4 @@ -from typing import Callable, Iterable, Optional, Tuple +from typing import Callable, Iterable, Optional from meta_memcache.base.base_cache_client import BaseCacheClient from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin @@ -25,7 +25,7 @@ def cache_client_from_servers( servers: Iterable[ServerAddress], connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( @@ -48,7 +48,7 @@ def cache_client_with_gutter_from_servers( gutter_ttl: int, connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( @@ -76,7 +76,7 @@ def ephemeral_cache_client_from_servers( max_ttl: int, connection_pool_factory_fn: Callable[[ServerAddress], ConnectionPool], serializer: Optional[BaseSerializer] = None, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, ) -> CacheApi: executor = DefaultExecutor( diff --git a/src/meta_memcache/commands/high_level_commands.py b/src/meta_memcache/commands/high_level_commands.py index dfac617..a9ec8a5 100644 --- a/src/meta_memcache/commands/high_level_commands.py +++ b/src/meta_memcache/commands/high_level_commands.py @@ -5,7 +5,6 @@ Iterable, Optional, Protocol, - Set, Tuple, Type, TypeVar, @@ -18,35 +17,33 @@ from meta_memcache.interfaces.meta_commands import MetaCommandsProtocol from meta_memcache.interfaces.router import FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, Miss, ReadResponse, + RequestFlags, SetMode, Success, - TokenFlag, Value, + MA_MODE_DEC, ) T = TypeVar("T") _REFILL_FAILURE_HANDLING = FailureHandling(track_write_failures=False) - -DEFAULT_FLAGS: Set[Flag] = { - Flag.RETURN_VALUE, - Flag.RETURN_TTL, - Flag.RETURN_CLIENT_FLAG, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_FETCHED, -} -DEFAULT_CAS_FLAGS: Set[Flag] = { - Flag.RETURN_VALUE, - Flag.RETURN_TTL, - Flag.RETURN_CLIENT_FLAG, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_FETCHED, - Flag.RETURN_CAS_TOKEN, -} +DEFAULT_GET_FLAGS = RequestFlags( + return_value=True, + return_ttl=True, + return_client_flag=True, + return_last_access=True, + return_fetched=True, +) +DEFAULT_GET_CAS_FLAGS = RequestFlags( + return_value=True, + return_ttl=True, + return_client_flag=True, + return_last_access=True, + return_fetched=True, + return_cas_token=True, +) class HighLevelCommandMixinWithMetaCommands( @@ -83,7 +80,7 @@ def _get_delta_flags( no_reply: bool = False, cas_token: Optional[int] = None, return_value: bool = False, - ) -> Tuple[Set[Flag], Dict[IntFlag, int], Dict[TokenFlag, bytes]]: + ) -> RequestFlags: ... # pragma: no cover @@ -99,28 +96,21 @@ def set( set_mode: SetMode = SetMode.SET, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() + flags = RequestFlags(cache_ttl=ttl) if no_reply: - flags.add(Flag.NOREPLY) - int_flags: Dict[IntFlag, int] = { - IntFlag.CACHE_TTL: ttl, - } + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_cas_mismatch: - flags.add(Flag.MARK_STALE) - if set_mode == SetMode.SET: - token_flags = None - else: - token_flags = {TokenFlag.MODE: set_mode.value} + flags.mark_stale = True + if set_mode != SetMode.SET: + flags.mode = set_mode.value result = self.meta_set( key=key, value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) return isinstance(result, Success) @@ -149,17 +139,18 @@ def refill( there is no need to track failures. """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() + flags = RequestFlags( + cache_ttl=ttl, + mode=SetMode.ADD.value, + ) if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True result = self.meta_set( key=key, value=value, ttl=ttl, flags=flags, - int_flags={IntFlag.CACHE_TTL: ttl}, - token_flags={TokenFlag.MODE: SetMode.ADD.value}, failure_handling=_REFILL_FAILURE_HANDLING, ) @@ -179,21 +170,16 @@ def delete( it exists or not, use invalidate() instead. """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = {} + flags = RequestFlags() if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_deletion_ttl > 0: - flags.add(Flag.MARK_STALE) - int_flags[IntFlag.CACHE_TTL] = stale_policy.mark_stale_on_deletion_ttl + flags.mark_stale = True + flags.cache_ttl = stale_policy.mark_stale_on_deletion_ttl - result = self.meta_delete( - key=key, - flags=flags, - int_flags=int_flags, - ) + result = self.meta_delete(key=key, flags=flags) return isinstance(result, Success) @@ -208,21 +194,16 @@ def invalidate( Returns true of the key deleted or it didn't exist anyway """ key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = {} + flags = RequestFlags() if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if stale_policy and stale_policy.mark_stale_on_deletion_ttl > 0: - flags.add(Flag.MARK_STALE) - int_flags[IntFlag.CACHE_TTL] = stale_policy.mark_stale_on_deletion_ttl + flags.mark_stale = True + flags.cache_ttl = stale_policy.mark_stale_on_deletion_ttl - result = self.meta_delete( - key=key, - flags=flags, - int_flags=int_flags, - ) + result = self.meta_delete(key=key, flags=flags) return isinstance(result, (Success, Miss)) @@ -233,11 +214,10 @@ def touch( no_reply: bool = False, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags: Set[Flag] = set() - int_flags = {IntFlag.CACHE_TTL: ttl} + flags = RequestFlags(cache_ttl=ttl) if no_reply: - flags.add(Flag.NOREPLY) - result = self.meta_get(key, flags=flags, int_flags=int_flags) + flags.no_reply = True + result = self.meta_get(key, flags=flags) return isinstance(result, Success) @@ -346,22 +326,17 @@ def _multi_get( return_cas_token: bool = False, ) -> Dict[Key, Optional[Value]]: if return_cas_token: - flags = DEFAULT_CAS_FLAGS.copy() - else: - flags = DEFAULT_FLAGS.copy() - if recache_policy is None and touch_ttl is None: - int_flags = None + flags = DEFAULT_GET_CAS_FLAGS.copy() else: - int_flags = {} - if recache_policy: - int_flags[IntFlag.RECACHE_TTL] = recache_policy.ttl - if touch_ttl is not None and touch_ttl >= 0: - int_flags[IntFlag.CACHE_TTL] = touch_ttl + flags = DEFAULT_GET_FLAGS.copy() + if recache_policy: + flags.recache_ttl = recache_policy.ttl + if touch_ttl is not None and touch_ttl >= 0: + flags.cache_ttl = touch_ttl results = self.meta_multiget( keys=[key if isinstance(key, Key) else Key(key) for key in keys], flags=flags, - int_flags=int_flags, ) return {k: self._process_get_result(k, v) for k, v in results.items()} @@ -392,21 +367,17 @@ def _get( ) -> Optional[Value]: key = key if isinstance(key, Key) else Key(key) if return_cas_token: - flags = DEFAULT_CAS_FLAGS.copy() - else: - flags = DEFAULT_FLAGS.copy() - if lease_policy is None and recache_policy is None and touch_ttl is None: - int_flags = None + flags = DEFAULT_GET_CAS_FLAGS.copy() else: - int_flags = {} - if lease_policy: - int_flags[IntFlag.MISS_LEASE_TTL] = lease_policy.ttl - if recache_policy: - int_flags[IntFlag.RECACHE_TTL] = recache_policy.ttl - if touch_ttl is not None and touch_ttl >= 0: - int_flags[IntFlag.CACHE_TTL] = touch_ttl - - result = self.meta_get(key, flags=flags, int_flags=int_flags) + flags = DEFAULT_GET_FLAGS.copy() + if lease_policy: + flags.vivify_on_miss_ttl = lease_policy.ttl + if recache_policy: + flags.recache_ttl = recache_policy.ttl + if touch_ttl is not None and touch_ttl >= 0: + flags.cache_ttl = touch_ttl + + result = self.meta_get(key, flags=flags) return self._process_get_result(key, result) def _process_get_result( @@ -477,25 +448,22 @@ def _get_delta_flags( no_reply: bool = False, cas_token: Optional[int] = None, return_value: bool = False, - ) -> Tuple[Set[Flag], Dict[IntFlag, int], Dict[TokenFlag, bytes]]: - flags: Set[Flag] = set() - int_flags: Dict[IntFlag, int] = { - IntFlag.MA_DELTA_VALUE: abs(delta), - } - token_flags: Dict[TokenFlag, bytes] = {} - + ) -> RequestFlags: + flags = RequestFlags( + ma_delta_value=abs(delta), + ) if return_value: - flags.add(Flag.RETURN_VALUE) + flags.return_value = True if no_reply: - flags.add(Flag.NOREPLY) + flags.no_reply = True if refresh_ttl is not None: - int_flags[IntFlag.CACHE_TTL] = refresh_ttl + flags.cache_ttl = refresh_ttl if cas_token is not None: - int_flags[IntFlag.CAS_TOKEN] = cas_token + flags.cas_token = cas_token if delta < 0: - token_flags[TokenFlag.MODE] = b"-" + flags.mode = MA_MODE_DEC - return flags, int_flags, token_flags + return flags def delta( self: HighLevelCommandMixinWithMetaCommands, @@ -506,15 +474,13 @@ def delta( cas_token: Optional[int] = None, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( delta=delta, refresh_ttl=refresh_ttl, no_reply=no_reply, cas_token=cas_token, ) - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + result = self.meta_arithmetic(key=key, flags=flags) return isinstance(result, Success) def delta_initialize( @@ -528,17 +494,15 @@ def delta_initialize( cas_token: Optional[int] = None, ) -> bool: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( delta=delta, refresh_ttl=refresh_ttl, no_reply=no_reply, cas_token=cas_token, ) - int_flags[IntFlag.MA_INITIAL_VALUE] = abs(initial_value) - int_flags[IntFlag.MISS_LEASE_TTL] = initial_ttl - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + flags.ma_initial_value = abs(initial_value) + flags.vivify_on_miss_ttl = initial_ttl + result = self.meta_arithmetic(key=key, flags=flags) return isinstance(result, Success) def delta_and_get( @@ -549,15 +513,13 @@ def delta_and_get( cas_token: Optional[int] = None, ) -> Optional[int]: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( return_value=True, delta=delta, refresh_ttl=refresh_ttl, cas_token=cas_token, ) - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + result = self.meta_arithmetic(key=key, flags=flags) if isinstance(result, Value): if isinstance(result.value, str) and result.value.isnumeric(): return int(result.value) @@ -577,17 +539,15 @@ def delta_initialize_and_get( cas_token: Optional[int] = None, ) -> Optional[int]: key = key if isinstance(key, Key) else Key(key) - flags, int_flags, token_flags = self._get_delta_flags( + flags = self._get_delta_flags( return_value=True, delta=delta, refresh_ttl=refresh_ttl, cas_token=cas_token, ) - int_flags[IntFlag.MA_INITIAL_VALUE] = abs(initial_value) - int_flags[IntFlag.MISS_LEASE_TTL] = initial_ttl - result = self.meta_arithmetic( - key=key, flags=flags, int_flags=int_flags, token_flags=token_flags - ) + flags.ma_initial_value = abs(initial_value) + flags.vivify_on_miss_ttl = initial_ttl + result = self.meta_arithmetic(key=key, flags=flags) if isinstance(result, Value): if isinstance(result.value, str) and result.value.isnumeric(): return int(result.value) diff --git a/src/meta_memcache/commands/meta_commands.py b/src/meta_memcache/commands/meta_commands.py index 6d1901e..647f95e 100644 --- a/src/meta_memcache/commands/meta_commands.py +++ b/src/meta_memcache/commands/meta_commands.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from meta_memcache.errors import MemcacheError from meta_memcache.interfaces.router import ( @@ -8,15 +8,13 @@ ) from meta_memcache.protocol import ( Conflict, - Flag, - IntFlag, Key, MetaCommand, Miss, NotStored, ReadResponse, + RequestFlags, Success, - TokenFlag, Value, ValueContainer, WriteResponse, @@ -27,9 +25,7 @@ class MetaCommandsMixin: def meta_multiget( self: HasRouter, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: results: Dict[Key, ReadResponse] = {} @@ -37,8 +33,6 @@ def meta_multiget( command=MetaCommand.META_GET, keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ).items(): if not isinstance(result, (Miss, Value, Success)): @@ -51,17 +45,13 @@ def meta_multiget( def meta_get( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: result = self.router.exec( command=MetaCommand.META_GET, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Miss, Value, Success)): @@ -73,9 +63,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( @@ -83,8 +71,6 @@ def meta_set( key=key, value=ValueContainer(value), flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss)): @@ -94,17 +80,13 @@ def meta_set( def meta_delete( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( command=MetaCommand.META_DELETE, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss)): @@ -116,17 +98,13 @@ def meta_delete( def meta_arithmetic( self: HasRouter, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: result = self.router.exec( command=MetaCommand.META_ARITHMETIC, key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) if not isinstance(result, (Success, NotStored, Conflict, Miss, Value)): diff --git a/src/meta_memcache/configuration.py b/src/meta_memcache/configuration.py index bc92ffa..ab2911e 100644 --- a/src/meta_memcache/configuration.py +++ b/src/meta_memcache/configuration.py @@ -1,8 +1,7 @@ -import base64 import hashlib import socket from enum import IntEnum -from typing import Callable, Dict, Iterable, NamedTuple, Optional, Tuple +from typing import Callable, Dict, Iterable, NamedTuple, Optional from meta_memcache.connection.pool import ConnectionPool from meta_memcache.protocol import Key, ServerVersion @@ -157,17 +156,18 @@ class StalePolicy(NamedTuple): mark_stale_on_cas_mismatch: bool = False -def default_key_encoder(key: Key) -> Tuple[bytes, bool]: +def default_key_encoder(key: Key) -> bytes: """ Generate valid memcache key as bytes for given Key. """ - if key.is_unicode or len(key.key) > MAX_KEY_SIZE: - key_hash = hashlib.blake2b(key.key.encode(), digest_size=18).digest() - return base64.b64encode(key_hash), True - elif " " in key.key: - raise ValueError(f"Invalid key {key}") - else: - return key.key.encode("ascii"), False + # TODO: Support + # if isinstance(key.key, bytes): + # data = key.key + # else: + data = key.key.encode() + if len(data) >= MAX_KEY_SIZE: + data = hashlib.blake2b(data, digest_size=18).digest() + return data class MigrationMode(IntEnum): diff --git a/src/meta_memcache/connection/memcache_socket.py b/src/meta_memcache/connection/memcache_socket.py index 66505c1..73ad81f 100644 --- a/src/meta_memcache/connection/memcache_socket.py +++ b/src/meta_memcache/connection/memcache_socket.py @@ -8,7 +8,6 @@ from meta_memcache.protocol import ( ENDL, ENDL_LEN, - EMPTY_RESPONSE_FLAGS, NOOP, Conflict, Miss, @@ -135,13 +134,12 @@ def _get_single_header( while True: if self._read != self._pos: - # We have data in the buffer: find the header + # We have data in the buffer: Try to find the header if header_data := meta_socket.parse_header( self._buf_view, self._pos, self._read ): self._pos = header_data[0] return header_data - # Missing data, but still space in buffer, so read more if self._recv_info_buffer() <= 0: break @@ -178,15 +176,13 @@ def get_response( result: Union[Value, Success, NotStored, Conflict, Miss] try: if response_code == meta_socket.RESPONSE_VALUE: - if size is None: - raise MemcacheError("Bad value response. Missing size") # Value response - result = Value( - size=size, flags=flags or EMPTY_RESPONSE_FLAGS, value=None - ) + assert size is not None and flags is not None + result = Value(size=size, flags=flags, value=None) elif response_code == meta_socket.RESPONSE_SUCCESS: # Stored or no value, return Success - result = Success(flags=flags or EMPTY_RESPONSE_FLAGS) + assert flags is not None + result = Success(flags=flags) elif response_code == meta_socket.RESPONSE_NOT_STORED: # Value response, parse size and flags result = NOT_STORED diff --git a/src/meta_memcache/executors/default.py b/src/meta_memcache/executors/default.py index 0383aa6..a0954bd 100644 --- a/src/meta_memcache/executors/default.py +++ b/src/meta_memcache/executors/default.py @@ -1,5 +1,7 @@ import logging -from typing import Callable, Dict, List, Optional, Set, Tuple +from typing import Callable, Dict, List, Optional, Tuple + +from meta_socket import RequestFlags, build_cmd from meta_memcache.base.base_serializer import BaseSerializer from meta_memcache.configuration import default_key_encoder @@ -9,8 +11,6 @@ from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.protocol import ( ENDL, - Flag, - IntFlag, Key, MaybeValue, MemcacheResponse, @@ -20,20 +20,21 @@ ResponseFlags, ServerVersion, Success, - TokenFlag, Value, ValueContainer, - encode_size, ) _log: logging.Logger = logging.getLogger(__name__) +meta_commands_values = {cmd: cmd.value for cmd in MetaCommand} + + class DefaultExecutor: def __init__( self, serializer: BaseSerializer, - key_encoder_fn: Callable[[Key], Tuple[bytes, bool]] = default_key_encoder, + key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, touch_ttl_to_consider_write_failure: Optional[int] = 50, ) -> None: @@ -48,41 +49,39 @@ def _build_cmd( command: MetaCommand, key: Key, size: Optional[int] = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, version: ServerVersion = ServerVersion.STABLE, ) -> bytes: - encoded_key, is_binary = self._key_encoder_fn(key) - cmd = [command.value, encoded_key] - if size is not None: - cmd.append(encode_size(size, version=version)) - cmd_flags: List[bytes] = [] - if is_binary: - cmd_flags.append(Flag.BINARY.value) - if flags: - cmd_flags.extend(flag.value for flag in flags) - if int_flags: - for int_flag, int_value in int_flags.items(): - cmd_flags.append(int_flag.value + str(int_value).encode("ascii")) - if token_flags: - for token_flag, bytes_value in token_flags.items(): - cmd_flags.append(token_flag.value + bytes_value) - cmd.extend(cmd_flags) - return b" ".join(cmd) + ENDL + encoded_key = self._key_encoder_fn(key) + cmd = meta_commands_values[command] + if version == ServerVersion.STABLE: + return build_cmd( + cmd, + encoded_key, + size, + flags, + ) + else: + return build_cmd( + cmd, + encoded_key, + size, + flags, + legacy_size_format=True, + ) - def _prepare_serialized_value_and_int_flags( + def _prepare_serialized_value_and_flags( self, value: ValueContainer, - int_flags: Optional[Dict[IntFlag, int]], - ) -> Tuple[Optional[bytes], Optional[Dict[IntFlag, int]]]: + flags: Optional[RequestFlags], + ) -> Tuple[Optional[bytes], RequestFlags]: encoded_value = self._serializer.serialize(value.value) - int_flags = int_flags if int_flags is not None else {} - int_flags[IntFlag.SET_CLIENT_FLAG] = encoded_value.encoding_id - return encoded_value.data, int_flags + flags = flags if flags is not None else RequestFlags() + flags.client_flag = encoded_value.encoding_id + return encoded_value.data, flags def _is_a_write_failure( - self, command: MetaCommand, int_flags: Optional[Dict[IntFlag, int]] + self, command: MetaCommand, flags: Optional[RequestFlags] ) -> bool: if command in ( MetaCommand.META_DELETE, @@ -92,7 +91,7 @@ def _is_a_write_failure( if ( self._touch_ttl_to_consider_write_failure is not None and command == MetaCommand.META_GET - and (touch_ttl := (int_flags or {}).get(IntFlag.CACHE_TTL, None)) + and (touch_ttl := (flags.cache_ttl if flags else None)) and 0 < touch_ttl <= self._touch_ttl_to_consider_write_failure ): return True @@ -104,16 +103,14 @@ def exec_on_pool( command: MetaCommand, key: Key, value: MaybeValue, - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> MemcacheResponse: - cmd_value, int_flags = ( - (None, int_flags) + cmd_value, flags = ( + (None, flags) if value is None - else self._prepare_serialized_value_and_int_flags(value, int_flags) + else self._prepare_serialized_value_and_flags(value, flags) ) try: conn = pool.pop_connection() @@ -125,8 +122,6 @@ def exec_on_pool( key=key, value=cmd_value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) return self._conn_recv_response(conn, flags=flags) except Exception as e: @@ -135,7 +130,7 @@ def exec_on_pool( finally: pool.release_connection(conn, error=error) except MemcacheServerError: - if track_write_failures and self._is_a_write_failure(command, int_flags): + if track_write_failures and self._is_a_write_failure(command, flags): self.on_write_failure(key) raise_on_server_error = ( raise_on_server_error @@ -154,9 +149,7 @@ def exec_multi_on_pool( # noqa: C901 pool: ConnectionPool, command: MetaCommand, key_values: List[Tuple[Key, MaybeValue]], - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> Dict[Key, MemcacheResponse]: @@ -167,12 +160,10 @@ def exec_multi_on_pool( # noqa: C901 try: # with pool.get_connection() as conn: for key, value in key_values: - cmd_value, int_flags = ( - (None, int_flags) + cmd_value, flags = ( + (None, flags) if value is None - else self._prepare_serialized_value_and_int_flags( - value, int_flags - ) + else self._prepare_serialized_value_and_flags(value, flags) ) self._conn_send_cmd( @@ -181,8 +172,6 @@ def exec_multi_on_pool( # noqa: C901 key=key, value=cmd_value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) for key, _ in key_values: results[key] = self._conn_recv_response(conn, flags=flags) @@ -192,7 +181,7 @@ def exec_multi_on_pool( # noqa: C901 finally: pool.release_connection(conn, error=error) except MemcacheServerError: - if track_write_failures and self._is_a_write_failure(command, int_flags): + if track_write_failures and self._is_a_write_failure(command, flags): for key, _ in key_values: self.on_write_failure(key) raise_on_server_error = ( @@ -215,9 +204,7 @@ def _conn_send_cmd( command: MetaCommand, key: Key, value: Optional[bytes] = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> None: """ Execute command on a connection @@ -227,16 +214,12 @@ def _conn_send_cmd( key, size=len(value) if value is not None else None, flags=flags, - int_flags=int_flags, - token_flags=token_flags, version=conn.get_version(), ) # write meta commands with NOREPLY can potentially return errors # they are not fully silent, so we need to add a no-op to the wire. with_noop = ( - command != MetaCommand.META_GET - and flags is not None - and Flag.NOREPLY in flags + command != MetaCommand.META_GET and flags is not None and flags.no_reply ) if value: @@ -247,12 +230,12 @@ def _conn_send_cmd( def _conn_recv_response( self, conn: MemcacheSocket, - flags: Optional[Set[Flag]] = None, + flags: Optional[RequestFlags] = None, ) -> MemcacheResponse: """ Read response on a connection """ - if flags and Flag.NOREPLY in flags: + if flags and flags.no_reply: return Success(flags=ResponseFlags()) result = conn.get_response() if isinstance(result, Value): diff --git a/src/meta_memcache/extras/client_wrapper.py b/src/meta_memcache/extras/client_wrapper.py index 61f6678..6a1f6be 100644 --- a/src/meta_memcache/extras/client_wrapper.py +++ b/src/meta_memcache/extras/client_wrapper.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin from meta_memcache.configuration import ServerAddress @@ -6,11 +6,9 @@ from meta_memcache.interfaces.router import FailureHandling, DEFAULT_FAILURE_HANDLING from meta_memcache.interfaces.cache_api import CacheApi from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, - TokenFlag, + RequestFlags, WriteResponse, ) @@ -33,32 +31,24 @@ def __init__( def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: return self.client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: return self.client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) @@ -67,9 +57,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_set( @@ -77,40 +65,30 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: return self.client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, failure_handling=failure_handling, ) diff --git a/src/meta_memcache/extras/migrating_cache_client.py b/src/meta_memcache/extras/migrating_cache_client.py index 9d6c442..9bb5386 100644 --- a/src/meta_memcache/extras/migrating_cache_client.py +++ b/src/meta_memcache/extras/migrating_cache_client.py @@ -1,6 +1,6 @@ import random import time -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Dict, List, Optional, Union from meta_memcache.commands.high_level_commands import HighLevelCommandsMixin from meta_memcache.configuration import MigrationMode, ServerAddress @@ -8,12 +8,10 @@ from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.interfaces.cache_api import CacheApi from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, + RequestFlags, SetMode, - TokenFlag, Value, WriteResponse, ) @@ -94,17 +92,13 @@ def _should_populate_read(self, migration_mode: MigrationMode) -> bool: def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> Dict[Key, ReadResponse]: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: return self._destination_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -113,8 +107,6 @@ def meta_multiget( results = self._origin_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if self._should_populate_read(migration_mode): for key, result in results.items(): @@ -131,24 +123,18 @@ def meta_multiget( return self._origin_client.meta_multiget( keys=keys, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> ReadResponse: migration_mode = self.get_migration_mode() if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: return self._destination_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) elif migration_mode in ( MigrationMode.POPULATE_WRITES_AND_READS_1PCT, @@ -157,8 +143,6 @@ def meta_get( result = self._origin_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if isinstance(result, Value) and self._should_populate_read(migration_mode): self._destination_client.set( @@ -173,8 +157,6 @@ def meta_get( return self._origin_client.meta_get( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def meta_set( @@ -182,9 +164,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -194,8 +174,6 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_set( @@ -203,8 +181,6 @@ def meta_set( value=value, ttl=ttl, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -216,9 +192,7 @@ def meta_set( def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: origin_response = destination_response = None migration_mode = self.get_migration_mode() @@ -226,15 +200,11 @@ def meta_delete( origin_response = self._origin_client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode > MigrationMode.ONLY_ORIGIN: destination_response = self._destination_client.meta_delete( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) if migration_mode >= MigrationMode.USE_DESTINATION_UPDATE_ORIGIN: assert destination_response is not None # noqa: S101 @@ -246,9 +216,7 @@ def meta_delete( def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, ) -> WriteResponse: """ We can't reliably migrate cache data modified by meta-arithmetic, @@ -261,15 +229,11 @@ def meta_arithmetic( return self._destination_client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) else: return self._origin_client.meta_arithmetic( key=key, flags=flags, - int_flags=int_flags, - token_flags=token_flags, ) def touch( diff --git a/src/meta_memcache/interfaces/executor.py b/src/meta_memcache/interfaces/executor.py index f211b1a..c3ae584 100644 --- a/src/meta_memcache/interfaces/executor.py +++ b/src/meta_memcache/interfaces/executor.py @@ -1,15 +1,14 @@ -from typing import Dict, List, Optional, Protocol, Set, Tuple +from typing import Dict, List, Optional, Protocol, Tuple + from meta_memcache.connection.pool import ConnectionPool from meta_memcache.events.write_failure_event import WriteFailureEvent from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -20,9 +19,7 @@ def exec_on_pool( command: MetaCommand, key: Key, value: MaybeValue, - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> MemcacheResponse: @@ -38,9 +35,7 @@ def exec_multi_on_pool( pool: ConnectionPool, command: MetaCommand, key_values: List[Tuple[Key, MaybeValue]], - flags: Optional[Set[Flag]], - int_flags: Optional[Dict[IntFlag, int]], - token_flags: Optional[Dict[TokenFlag, bytes]], + flags: Optional[RequestFlags], track_write_failures: bool, raise_on_server_error: Optional[bool] = None, ) -> Dict[Key, MemcacheResponse]: diff --git a/src/meta_memcache/interfaces/meta_commands.py b/src/meta_memcache/interfaces/meta_commands.py index ed6fa23..8850394 100644 --- a/src/meta_memcache/interfaces/meta_commands.py +++ b/src/meta_memcache/interfaces/meta_commands.py @@ -1,13 +1,11 @@ -from typing import Any, Dict, List, Optional, Protocol, Set +from typing import Any, Dict, List, Optional, Protocol from meta_memcache.interfaces.router import FailureHandling, DEFAULT_FAILURE_HANDLING from meta_memcache.protocol import ( - Flag, - IntFlag, Key, ReadResponse, - TokenFlag, WriteResponse, + RequestFlags, ) @@ -15,9 +13,7 @@ class MetaCommandsProtocol(Protocol): def meta_multiget( self, keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: ... # pragma: no cover @@ -25,9 +21,7 @@ def meta_multiget( def meta_get( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: ... # pragma: no cover @@ -37,9 +31,7 @@ def meta_set( key: Key, value: Any, ttl: int, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover @@ -47,9 +39,7 @@ def meta_set( def meta_delete( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover @@ -57,9 +47,7 @@ def meta_delete( def meta_arithmetic( self, key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> WriteResponse: ... # pragma: no cover diff --git a/src/meta_memcache/interfaces/router.py b/src/meta_memcache/interfaces/router.py index 2e212d4..4373d9b 100644 --- a/src/meta_memcache/interfaces/router.py +++ b/src/meta_memcache/interfaces/router.py @@ -1,17 +1,16 @@ -from typing import Dict, List, NamedTuple, Optional, Protocol, Set +from typing import Dict, List, NamedTuple, Optional, Protocol + from meta_memcache.configuration import ServerAddress from meta_memcache.connection.pool import PoolCounters from meta_memcache.interfaces.executor import Executor from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -34,9 +33,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -52,9 +49,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ diff --git a/src/meta_memcache/protocol.py b/src/meta_memcache/protocol.py index fe76688..76cfa31 100644 --- a/src/meta_memcache/protocol.py +++ b/src/meta_memcache/protocol.py @@ -1,8 +1,19 @@ from dataclasses import dataclass from enum import Enum, IntEnum -from typing import Any, Dict, List, Optional, Union +from typing import Any, List, Optional, Union + +from meta_socket import ( # noqa: F401 + ResponseFlags, + RequestFlags as RequestFlags, + SET_MODE_ADD, + SET_MODE_APPEND, + SET_MODE_PREPEND, + SET_MODE_REPLACE, + SET_MODE_SET, + MA_MODE_INC as MA_MODE_INC, + MA_MODE_DEC as MA_MODE_DEC, +) -from meta_socket import ResponseFlags ENDL = b"\r\n" NOOP: bytes = b"mn" + ENDL @@ -39,57 +50,11 @@ class MetaCommand(Enum): class SetMode(Enum): - SET = b"S" # Default - ADD = b"E" # Add if item does NOT EXIST, else LRU bump and return NS - APPEND = b"A" # If item exists, append the new value to its data. - PREPEND = b"P" # If item exists, prepend the new value to its data. - REPLACE = b"R" # Set only if item already exists. - - -class Flag(Enum): - BINARY = b"b" - NOREPLY = b"q" - RETURN_CLIENT_FLAG = b"f" - RETURN_CAS_TOKEN = b"c" - RETURN_VALUE = b"v" - RETURN_TTL = b"t" - RETURN_SIZE = b"s" - RETURN_LAST_ACCESS = b"l" - RETURN_FETCHED = b"h" - RETURN_KEY = b"k" - NO_UPDATE_LRU = b"u" - MARK_STALE = b"I" - - -class IntFlag(Enum): - CACHE_TTL = b"T" - RECACHE_TTL = b"R" - MISS_LEASE_TTL = b"N" - SET_CLIENT_FLAG = b"F" - MA_INITIAL_VALUE = b"J" - MA_DELTA_VALUE = b"D" - CAS_TOKEN = b"C" - - -class TokenFlag(Enum): - OPAQUE = b"O" - # 'M' (mode switch): - # * Meta Arithmetic: - # - I or +: increment - # - D or -: decrement - # * Meta Set: See SetMode Enum above - # - E: "add" command. LRU bump and return NS if item exists. Else add. - # - A: "append" command. If item exists, append the new value to its data. - # - P: "prepend" command. If item exists, prepend the new value to its data. - # - R: "replace" command. Set only if item already exists. - # - S: "set" command. The default mode, added for completeness. - MODE = b"M" - - -# Store maps of byte values (int) to enum value -flag_values: Dict[int, Flag] = {f.value[0]: f for f in Flag} -int_flags_values: Dict[int, IntFlag] = {f.value[0]: f for f in IntFlag} -token_flags_values: Dict[int, TokenFlag] = {f.value[0]: f for f in TokenFlag} + SET = SET_MODE_SET # Default + ADD = SET_MODE_ADD # Add if item does NOT EXIST, else LRU bump and return NS + APPEND = SET_MODE_APPEND # If item exists, append the new value to its data. + PREPEND = SET_MODE_PREPEND # If item exists, prepend the new value to its data. + REPLACE = SET_MODE_REPLACE # Set only if item already exists. @dataclass @@ -113,10 +78,6 @@ class Success(MemcacheResponse): __slots__ = ("flags",) flags: ResponseFlags - @classmethod - def default(cls) -> "Success": - return cls(flags=ResponseFlags()) - @dataclass class Value(Success): @@ -168,10 +129,3 @@ def get_store_success_response_header(version: ServerVersion) -> bytes: if version == ServerVersion.AWS_1_6_6: return b"OK" return b"HD" - - -def encode_size(size: int, version: ServerVersion) -> bytes: - if version == ServerVersion.AWS_1_6_6: - return b"S" + str(size).encode("ascii") - else: - return str(size).encode("ascii") diff --git a/src/meta_memcache/routers/default.py b/src/meta_memcache/routers/default.py index 37ad312..0ebcef2 100644 --- a/src/meta_memcache/routers/default.py +++ b/src/meta_memcache/routers/default.py @@ -1,5 +1,6 @@ from collections import defaultdict -from typing import Callable, DefaultDict, Dict, List, Optional, Set, Tuple +from typing import Callable, DefaultDict, Dict, List, Optional, Tuple + from meta_memcache.configuration import ServerAddress from meta_memcache.connection.pool import ConnectionPool, PoolCounters @@ -7,14 +8,12 @@ from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) @@ -32,9 +31,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -49,8 +46,6 @@ def exec( key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, track_write_failures=failure_handling.track_write_failures, raise_on_server_error=failure_handling.raise_on_server_error, ) @@ -60,9 +55,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ @@ -78,8 +71,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, track_write_failures=failure_handling.track_write_failures, raise_on_server_error=failure_handling.raise_on_server_error, ) diff --git a/src/meta_memcache/routers/ephemeral.py b/src/meta_memcache/routers/ephemeral.py index e13fde0..291ec4a 100644 --- a/src/meta_memcache/routers/ephemeral.py +++ b/src/meta_memcache/routers/ephemeral.py @@ -1,20 +1,18 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from meta_memcache.connection.providers import ConnectionPoolProvider from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) from meta_memcache.routers.default import DefaultRouter -from meta_memcache.routers.helpers import adjust_int_flags_for_max_ttl +from meta_memcache.routers.helpers import adjust_flags_for_max_ttl class EphemeralRouter(DefaultRouter): @@ -52,18 +50,14 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: return super().exec( command=command, key=key, value=value, - flags=flags, - int_flags=adjust_int_flags_for_max_ttl(int_flags, self._max_ttl), - token_flags=token_flags, + flags=adjust_flags_for_max_ttl(flags, self._max_ttl), failure_handling=failure_handling, ) @@ -72,17 +66,13 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: return super().exec_multi( command=command, keys=keys, values=values, - flags=flags, - int_flags=adjust_int_flags_for_max_ttl(int_flags, self._max_ttl), - token_flags=token_flags, + flags=adjust_flags_for_max_ttl(flags, self._max_ttl), failure_handling=failure_handling, ) diff --git a/src/meta_memcache/routers/gutter.py b/src/meta_memcache/routers/gutter.py index 91e8eb0..0f32511 100644 --- a/src/meta_memcache/routers/gutter.py +++ b/src/meta_memcache/routers/gutter.py @@ -1,21 +1,19 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from meta_memcache.connection.providers import ConnectionPoolProvider from meta_memcache.errors import MemcacheServerError from meta_memcache.interfaces.executor import Executor from meta_memcache.interfaces.router import DEFAULT_FAILURE_HANDLING, FailureHandling from meta_memcache.protocol import ( - Flag, - IntFlag, Key, MaybeValue, MaybeValues, MemcacheResponse, MetaCommand, - TokenFlag, + RequestFlags, ) from meta_memcache.routers.default import DefaultRouter -from meta_memcache.routers.helpers import adjust_int_flags_for_max_ttl +from meta_memcache.routers.helpers import adjust_flags_for_max_ttl class GutterRouter(DefaultRouter): @@ -38,9 +36,7 @@ def exec( command: MetaCommand, key: Key, value: MaybeValue = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> MemcacheResponse: """ @@ -57,8 +53,6 @@ def exec( key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # We always want to raise on server errors so we can # try the gutter pool raise_on_server_error=True, @@ -67,15 +61,13 @@ def exec( ) except MemcacheServerError: # Override TTLs > than gutter TTL - int_flags = adjust_int_flags_for_max_ttl(int_flags, self._gutter_ttl) + flags = adjust_flags_for_max_ttl(flags, self._gutter_ttl) return self.executor.exec_on_pool( pool=self.gutter_pool_provider.get_pool(key), command=command, key=key, value=value, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # Respect the raise_on_server_error flag if the gutter pool also # fails raise_on_server_error=failure_handling.raise_on_server_error, @@ -89,9 +81,7 @@ def exec_multi( command: MetaCommand, keys: List[Key], values: MaybeValues = None, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, MemcacheResponse]: """ @@ -110,8 +100,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # We always want to raise on server errors so we can # try the gutter pool raise_on_server_error=True, @@ -126,7 +114,7 @@ def exec_multi( gutter_values.append(value) if gutter_keys: # Override TTLs > than gutter TTL - int_flags = adjust_int_flags_for_max_ttl(int_flags, self._gutter_ttl) + flags = adjust_flags_for_max_ttl(flags, self._gutter_ttl) for pool, key_values in self._exec_multi_prepare_pool_map( self.gutter_pool_provider.get_pool, gutter_keys, gutter_values ).items(): @@ -136,8 +124,6 @@ def exec_multi( command=command, key_values=key_values, flags=flags, - int_flags=int_flags, - token_flags=token_flags, # Respect the raise_on_server_error flag if the gutter pool also # fails raise_on_server_error=failure_handling.raise_on_server_error, diff --git a/src/meta_memcache/routers/helpers.py b/src/meta_memcache/routers/helpers.py index f8f1bf3..f9ebc29 100644 --- a/src/meta_memcache/routers/helpers.py +++ b/src/meta_memcache/routers/helpers.py @@ -1,23 +1,24 @@ -from typing import Dict, Optional +from typing import Optional -from meta_memcache.protocol import IntFlag +from meta_memcache.protocol import RequestFlags -def adjust_int_flags_for_max_ttl( - int_flags: Optional[Dict[IntFlag, int]], +def _adjust_ttl(ttl: Optional[int], max_ttl: int) -> Optional[int]: + if ttl is not None and (ttl == 0 or ttl > max_ttl): + return max_ttl + return ttl + + +def adjust_flags_for_max_ttl( + flags: Optional[RequestFlags], max_ttl: int, -) -> Optional[Dict[IntFlag, int]]: +) -> Optional[RequestFlags]: """ Override TTLs > than `max_ttl` """ - if int_flags: - for flag in ( - IntFlag.CACHE_TTL, - IntFlag.RECACHE_TTL, - IntFlag.MISS_LEASE_TTL, - ): - ttl = int_flags.get(flag) - if ttl is not None and (ttl == 0 or ttl > max_ttl): - int_flags[flag] = max_ttl + if flags: + flags.cache_ttl = _adjust_ttl(flags.cache_ttl, max_ttl) + flags.recache_ttl = _adjust_ttl(flags.recache_ttl, max_ttl) + flags.vivify_on_miss_ttl = _adjust_ttl(flags.vivify_on_miss_ttl, max_ttl) - return int_flags + return flags diff --git a/src/meta_memcache/settings.py b/src/meta_memcache/settings.py index 9e9a207..cecbbfd 100644 --- a/src/meta_memcache/settings.py +++ b/src/meta_memcache/settings.py @@ -5,4 +5,7 @@ DEFAULT_READ_BUFFER_SIZE = 4096 -MAX_KEY_SIZE = 250 +# Max key is 250, but when using binary keys will be b64 encoded +# so take more space. Keys longer than this will be hashed, so +# it's not a problem. +MAX_KEY_SIZE = 187 diff --git a/tests/cache_client_test.py b/tests/cache_client_test.py index 9e376e9..e224f6f 100644 --- a/tests/cache_client_test.py +++ b/tests/cache_client_test.py @@ -12,7 +12,7 @@ ) from meta_memcache.connection.pool import ConnectionPool, PoolCounters from meta_memcache.errors import MemcacheServerError -from meta_memcache.protocol import NOOP, Flag, IntFlag +from meta_memcache.protocol import NOOP, RequestFlags from meta_memcache.settings import DEFAULT_MARK_DOWN_PERIOD_S @@ -167,10 +167,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_called_once_with(Key("foo")) @@ -194,10 +194,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_called_once_with(Key("foo")) @@ -220,10 +220,10 @@ def connect(server_address: Tuple[str, int]) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) get_pool.assert_has_calls([call(Key(key="bar")), call(Key(key="foo"))]) get_gutter_pool.assert_not_called() diff --git a/tests/commands_test.py b/tests/commands_test.py index 3bd2241..cafb033 100644 --- a/tests/commands_test.py +++ b/tests/commands_test.py @@ -28,10 +28,9 @@ Miss, NotStored, ResponseFlags, + RequestFlags, ServerVersion, Success, - IntFlag, - TokenFlag, Value, ) from meta_memcache.routers.default import DefaultRouter @@ -243,7 +242,7 @@ def test_set_cmd( cache_client.set(key=Key("foo"), value=b"123", ttl=300, cas_token=666) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -253,7 +252,7 @@ def test_set_cmd( key=Key("foo"), value=b"123", ttl=300, cas_token=666, stale_policy=StalePolicy() ) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -267,7 +266,7 @@ def test_set_cmd( stale_policy=StalePolicy(mark_stale_on_cas_mismatch=True), ) memcache_socket.sendall.assert_called_once_with( - b"ms foo 3 I T300 C666 F16\r\n123\r\n", with_noop=False + b"ms foo 3 I T300 F16 C666\r\n123\r\n", with_noop=False ) memcache_socket.get_response.assert_called_once_with() memcache_socket.sendall.reset_mock() @@ -309,9 +308,7 @@ def test_refill( key=Key(key="foo"), value="bar", ttl=300, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 300}, - token_flags={TokenFlag.MODE: SetMode.ADD.value}, + flags=RequestFlags(cache_ttl=300, mode=SetMode.ADD.value), failure_handling=FailureHandling(track_write_failures=False), ) @@ -560,7 +557,7 @@ def test_get_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) -> ) assert_called_once_with_command( memcache_socket.sendall, - b"mg lCV3WxKxtWrdY4s1+R710+9J b t l v h f R30 T300\r\n", + b"mg w7puw63Dp29k4o23 b t l v h f R30 T300\r\n", with_noop=False, ) memcache_socket.sendall.reset_mock() @@ -1186,7 +1183,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - cache_client.delta(key=Key("foo"), delta=1, refresh_ttl=60, no_reply=True) memcache_socket.sendall.assert_called_once_with( - b"ma foo q D1 T60\r\n", with_noop=True + b"ma foo q T60 D1\r\n", with_noop=True ) memcache_socket.sendall.reset_mock() @@ -1205,7 +1202,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - cas_token=123, ) memcache_socket.sendall.assert_called_once_with( - b"ma foo q D1 C123 J10 N60\r\n", with_noop=True + b"ma foo q N60 J10 D1 C123\r\n", with_noop=True ) memcache_socket.sendall.reset_mock() @@ -1228,7 +1225,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - ) assert result is None memcache_socket.sendall.assert_called_once_with( - b"ma foo v D1 J0 N60\r\n", with_noop=False + b"ma foo v N60 J0 D1\r\n", with_noop=False ) memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() @@ -1258,7 +1255,7 @@ def test_delta_cmd(memcache_socket: MemcacheSocket, cache_client: CacheClient) - ) assert result == 10 memcache_socket.sendall.assert_called_once_with( - b"ma foo v D1 J0 N60\r\n", with_noop=False + b"ma foo v N60 J0 D1\r\n", with_noop=False ) memcache_socket.sendall.reset_mock() memcache_socket.get_response.reset_mock() diff --git a/tests/ephemeral_cache_client_test.py b/tests/ephemeral_cache_client_test.py index ddc2a30..e1249a3 100644 --- a/tests/ephemeral_cache_client_test.py +++ b/tests/ephemeral_cache_client_test.py @@ -1,5 +1,5 @@ from unittest.mock import call -from meta_memcache.protocol import NOOP, Flag, IntFlag +from meta_memcache.protocol import NOOP, RequestFlags from pytest_mock import MockerFixture @@ -29,10 +29,10 @@ def test_ephemeral_cache_client(mocker: MockerFixture) -> None: cache_client.meta_multiget( keys=[Key("bar"), Key("foo")], - flags={ - Flag.NOREPLY, - }, - int_flags={IntFlag.CACHE_TTL: 1000}, + flags=RequestFlags( + no_reply=True, + cache_ttl=1000, + ), ) c.sendall.assert_has_calls([call(b"mg bar q T60\r\n"), call(b"mg foo q T60\r\n")]) c.sendall.reset_mock() diff --git a/tests/migrating_cache_client_test.py b/tests/migrating_cache_client_test.py index ddede09..0a96457 100644 --- a/tests/migrating_cache_client_test.py +++ b/tests/migrating_cache_client_test.py @@ -3,12 +3,12 @@ import pytest from meta_memcache import ( CacheClient, - IntFlag, Key, SetMode, Value, WriteFailureEvent, ResponseFlags, + RequestFlags, ) from meta_memcache.extras.migrating_cache_client import ( MigratingCacheClient, @@ -167,9 +167,7 @@ def test_migration_mode_origin_only( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_not_called() @@ -177,9 +175,7 @@ def test_migration_mode_origin_only( migration_client_origin_only.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_not_called() @@ -187,9 +183,7 @@ def test_migration_mode_origin_only( migration_client_origin_only.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -227,9 +221,7 @@ def test_migration_mode_destination_only( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes @@ -237,9 +229,7 @@ def test_migration_mode_destination_only( origin_client.meta_delete.assert_not_called() destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic @@ -247,9 +237,7 @@ def test_migration_mode_destination_only( origin_client.meta_arithmetic.assert_not_called() destination_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) # Touch @@ -289,41 +277,31 @@ def test_migration_mode_populate_writes( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -451,41 +429,31 @@ def test_migration_mode_populate_writes_and_reads_1pct( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -566,41 +534,31 @@ def test_migration_mode_populate_writes_and_reads_10pct( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic migration_client.delta(key="foo", delta=1) origin_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) destination_client.meta_arithmetic.assert_not_called() @@ -644,32 +602,24 @@ def test_migration_mode_use_destination_update_origin( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) destination_client.meta_set.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), value="bar", ttl=10, - flags=set(), - int_flags={IntFlag.CACHE_TTL: 10}, - token_flags=None, + flags=RequestFlags(cache_ttl=10), ) # Deletes (both receive writes) migration_client.delete(key="foo") origin_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) destination_client.meta_delete.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={}, - token_flags=None, + flags=RequestFlags(), ) # Arithmetic @@ -677,9 +627,7 @@ def test_migration_mode_use_destination_update_origin( origin_client.meta_arithmetic.assert_not_called() destination_client.meta_arithmetic.assert_called_once_with( key=Key(key="foo", routing_key=None, is_unicode=False), - flags=set(), - int_flags={IntFlag.MA_DELTA_VALUE: 1}, - token_flags={}, + flags=RequestFlags(ma_delta_value=1), ) # Touch diff --git a/tests/probabilistic_hot_cache_test.py b/tests/probabilistic_hot_cache_test.py index 4e02fcf..afb5496 100644 --- a/tests/probabilistic_hot_cache_test.py +++ b/tests/probabilistic_hot_cache_test.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from unittest.mock import Mock from prometheus_client import CollectorRegistry @@ -6,23 +6,21 @@ import pytest -from meta_memcache import CacheClient, IntFlag, Key, Value +from meta_memcache import CacheClient, Key, Value from meta_memcache.errors import MemcacheError from meta_memcache.extras.probabilistic_hot_cache import ( CachedValue, ProbabilisticHotCache, ) from meta_memcache.metrics.prometheus import PrometheusMetricsCollector -from meta_memcache.protocol import Flag, Miss, ReadResponse, TokenFlag, ResponseFlags +from meta_memcache.protocol import Miss, ReadResponse, ResponseFlags, RequestFlags @pytest.fixture def client() -> Mock: def meta_get( key: Key, - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> ReadResponse: if key.key.endswith("hot"): @@ -48,9 +46,7 @@ def meta_get( def meta_multiget( keys: List[Key], - flags: Optional[Set[Flag]] = None, - int_flags: Optional[Dict[IntFlag, int]] = None, - token_flags: Optional[Dict[TokenFlag, bytes]] = None, + flags: Optional[RequestFlags] = None, failure_handling: FailureHandling = DEFAULT_FAILURE_HANDLING, ) -> Dict[Key, ReadResponse]: return {key: meta_get(key=key) for key in keys} @@ -78,15 +74,13 @@ def random(monkeypatch) -> Mock: DEFAULT_FLAGS = { - "flags": { - Flag.RETURN_TTL, - Flag.RETURN_LAST_ACCESS, - Flag.RETURN_VALUE, - Flag.RETURN_FETCHED, - Flag.RETURN_CLIENT_FLAG, - }, - "int_flags": None, - "token_flags": None, + "flags": RequestFlags( + return_ttl=True, + return_last_access=True, + return_value=True, + return_fetched=True, + return_client_flag=True, + ), "failure_handling": DEFAULT_FAILURE_HANDLING, }