Skip to content

Commit

Permalink
Implement response heading parsing in rust
Browse files Browse the repository at this point in the history
## Motivation / Description
There was nothing else to optimize in python, but Rust can parse
the string and build a ResponseFlags object in 22-50ns depending
on the number of flags.

While the interface between rust and python will never reach ns
performance, it helps indeed finding the header in the
response buffer.

Initial (before all optimizations):
multithreaded: Overall: 110779.55 RPS / 9.03 us/req
singlethreaded: Overall: 111545.63 RPS / 8.96 us/req

Python optimized:
multithreaded: Overall: 193340.40 RPS / 5.17 us/req
singlethreaded: Overall: 193036.56 RPS / 5.18 us/req

Using rust for header parsing:
multithreaded: Overall: 245898.34 RPS / 4.07 us/req
singlethreaded: Overall: 246165.19 RPS / 4.06 us/req
  • Loading branch information
bisho committed Nov 7, 2023
1 parent d117d3f commit c4b711c
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 259 deletions.
1 change: 1 addition & 0 deletions src/meta_memcache/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
Miss,
NotStored,
ServerVersion,
ResponseFlags,
SetMode,
Success,
TokenFlag,
Expand Down
16 changes: 8 additions & 8 deletions src/meta_memcache/commands/high_level_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,23 +289,23 @@ def get_or_lease_cas(

if isinstance(result, Value):
# It is a hit.
if result.win:
if result.flags.win:
# Win flag present, meaning we got the lease to
# recache/cache the item. We need to mimic a miss.
return None, result.cas_token
if result.size == 0 and result.win is False:
return None, result.flags.cas_token
if result.size == 0 and result.flags.win is False:
# The value is empty, this is a miss lease,
# and we lost, so we must keep retrying and
# wait for the winner to populate the value.
# wait for the.flags.winner to populate the value.
if i < lease_policy.miss_retries:
continue
else:
# We run out of retries, behave as a miss
return None, result.cas_token
return None, result.flags.cas_token
else:
# There is data, either the is no lease or
# we lost and should use the stale value.
return result.value, result.cas_token
return result.value, result.flags.cas_token
else:
# With MISS_LEASE_TTL we should always get a value
# because on miss a lease empty value is generated
Expand Down Expand Up @@ -380,7 +380,7 @@ def get_cas(
if result is None:
return None, None
else:
return result.value, result.cas_token
return result.value, result.flags.cas_token

def _get(
self: HighLevelCommandMixinWithMetaCommands,
Expand Down Expand Up @@ -416,7 +416,7 @@ def _process_get_result(
) -> Optional[Value]:
if isinstance(result, Value):
# It is a hit
if result.win:
if result.flags.win:
# Win flag present, meaning we got the lease to
# recache the item. We need to mimic a miss, so
# we set the value to None.
Expand Down
65 changes: 38 additions & 27 deletions src/meta_memcache/connection/memcache_socket.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import logging
import socket
from typing import Union
from typing import Optional, Tuple, Union

import meta_socket

from meta_memcache.errors import MemcacheError
from meta_memcache.protocol import (
ENDL,
ENDL_LEN,
EMPTY_RESPONSE_FLAGS,
NOOP,
Conflict,
Miss,
NotStored,
ServerVersion,
Success,
ResponseFlags,
Value,
get_store_success_response_header,
)
Expand Down Expand Up @@ -119,30 +123,29 @@ def _reset_buffer(self) -> None:
self._pos = 0
self._read = remaining_data

def _get_single_header(self) -> memoryview:
def _get_single_header(
self,
) -> Tuple[int, Optional[int], Optional[int], Optional[ResponseFlags]]:
# Reset buffer for new data
if self._read == self._pos:
self._read = 0
self._pos = 0
elif self._pos > self._reset_buffer_size:
self._reset_buffer()

endl_pos = -1
while True:
if self._read - self._pos > ENDL_LEN:
endl_pos = self._buf.find(ENDL, self._pos, self._read)
if endl_pos >= 0:
break
if self._read != self._pos:
# We have data in the buffer: find the header
if header_data := meta_socket.parse_header(
self._buf_view, self._pos, self._read
):
self._pos = header_data[0]
return header_data
# Missing data, but still space in buffer, so read more
if self._recv_info_buffer() <= 0:
break

if endl_pos < 0:
raise MemcacheError("Bad response. Socket might have closed unexpectedly")

pos = self._pos
self._pos = endl_pos + ENDL_LEN
return self._buf_view[pos:endl_pos]
raise MemcacheError("Bad response. Socket might have closed unexpectedly")

def sendall(self, data: bytes, with_noop: bool = False) -> None:
if with_noop:
Expand All @@ -153,10 +156,12 @@ def sendall(self, data: bytes, with_noop: bool = False) -> None:
def _read_until_noop_header(self) -> None:
while self._noop_expected > 0:
header = self._get_single_header()
if header[0:2] == b"MN":
if header[1] == meta_socket.RESPONSE_NOOP:
self._noop_expected -= 1

def _get_header(self) -> memoryview:
def _get_header(
self,
) -> Tuple[int, Optional[int], Optional[int], Optional[ResponseFlags]]:
try:
if self._noop_expected > 0:
self._read_until_noop_header()
Expand All @@ -169,30 +174,36 @@ def _get_header(self) -> memoryview:
def get_response(
self,
) -> Union[Value, Success, NotStored, Conflict, Miss]:
header = self._get_header().tobytes()
response_code = header[0:2]
(_, response_code, size, flags) = self._get_header()
result: Union[Value, Success, NotStored, Conflict, Miss]
try:
if response_code == b"VA":
if response_code == meta_socket.RESPONSE_VALUE:
if size is None:
raise MemcacheError("Bad value response. Missing size")
# Value response
result = Value.from_header(header)
elif response_code == self._store_success_response_header:
result = Value(
size=size, flags=flags or EMPTY_RESPONSE_FLAGS, value=None
)
elif response_code == meta_socket.RESPONSE_SUCCESS:
# Stored or no value, return Success
result = Success.from_header(header)
elif response_code == b"NS":
result = Success(flags=flags or EMPTY_RESPONSE_FLAGS)
elif response_code == meta_socket.RESPONSE_NOT_STORED:
# Value response, parse size and flags
result = NOT_STORED
elif response_code == b"EX":
elif response_code == meta_socket.RESPONSE_CONFLICT:
# Already exists, not changed, CAS conflict
result = CONFLICT
elif response_code == b"EN" or response_code == b"NF":
elif response_code == meta_socket.RESPONSE_MISS:
# Not Found, Miss.
result = MISS
else:
raise MemcacheError(f"Unknown response: {bytes(response_code)!r}")
raise MemcacheError(f"Unknown response: {response_code}")
except Exception as e:
_log.warning(f"Error parsing response header in {self}: {header!r}")
raise MemcacheError(f"Error parsing response header {header!r}") from e
_log.warning(
f"Error parsing response header in {self}: "
f"Response: {response_code}, size {size}, flags: {flags}"
)
raise MemcacheError("Error parsing response header") from e

return result

Expand Down
5 changes: 3 additions & 2 deletions src/meta_memcache/executors/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
MetaCommand,
Miss,
NotStored,
ResponseFlags,
ServerVersion,
Success,
TokenFlag,
Expand Down Expand Up @@ -252,12 +253,12 @@ def _conn_recv_response(
Read response on a connection
"""
if flags and Flag.NOREPLY in flags:
return Success()
return Success(flags=ResponseFlags())
result = conn.get_response()
if isinstance(result, Value):
data = conn.get_value(result.size)
if result.size > 0:
encoding_id = result.client_flag or 0
encoding_id = result.flags.client_flag or 0
try:
result.value = self._serializer.unserialize(data, encoding_id)
except Exception:
Expand Down
6 changes: 5 additions & 1 deletion src/meta_memcache/extras/migrating_cache_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ def get_migration_mode(self) -> MigrationMode:
return current_mode

def _get_value_ttl(self, value: Value) -> int:
ttl = value.ttl if value.ttl is not None else self._default_read_backfill_ttl
ttl = (
value.flags.ttl
if value.flags.ttl is not None
else self._default_read_backfill_ttl
)
if ttl < 0:
# TTL for items marked to store forvered is returned as -1
ttl = 0
Expand Down
7 changes: 4 additions & 3 deletions src/meta_memcache/extras/probabilistic_hot_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ def _store_in_hot_cache_if_necessary(
allowed: bool,
) -> None:
if not is_hot:
hit_after_write = value.fetched or 0
last_read_age = value.last_access if value.last_access is not None else 9999
last_read_age = (
value.flags.last_access if value.flags.last_access is not None else 9999
)
if (
hit_after_write > 0
value.flags.fetched
and last_read_age <= self._max_last_access_age_seconds
):
# Is detected as hot
Expand Down
Loading

0 comments on commit c4b711c

Please sign in to comment.