Skip to content

Commit

Permalink
chore: use slave sentinel for readonly vertices (#161)
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 authored Jul 10, 2023
1 parent 620d1f3 commit 5e26f2b
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 238 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# These owners will be the default owners for everything in
# the repo. Unless a later match takes precedence
* @ab93 @nkoppisetty @vigith
* @ab93 @nkoppisetty @vigith @s0nicboOm
23 changes: 16 additions & 7 deletions numaprom/clients/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from numaprom.watcher import ConfigManager


SENTINEL_MASTER_CLIENT: redis_client_t | None = None
SENTINEL_CLIENT: redis_client_t | None = None


def get_redis_client(
Expand All @@ -20,6 +20,7 @@ def get_redis_client(
password: str,
mastername: str,
recreate: bool = False,
master_node: bool = True,
) -> redis_client_t:
"""Return a redis client (master or slave node) for sentinel connections, with retry.
Expand All @@ -31,15 +32,16 @@ def get_redis_client(
mastername: Redis sentinel master name
decode_responses: Whether to decode responses
recreate: Whether to flush and recreate the client
master_node: Whether to use the master node (True) or a slave node (False)
Returns
-------
Redis client instance
"""
global SENTINEL_MASTER_CLIENT
global SENTINEL_CLIENT

if not recreate and SENTINEL_MASTER_CLIENT:
return SENTINEL_MASTER_CLIENT
if not recreate and SENTINEL_CLIENT:
return SENTINEL_CLIENT

retry = Retry(
ExponentialBackoff(),
Expand Down Expand Up @@ -67,9 +69,16 @@ def get_redis_client(
password=password,
**conn_kwargs
)
SENTINEL_MASTER_CLIENT = sentinel.master_for(mastername)
LOGGER.info("Sentinel redis params: {args}", args=conn_kwargs)
return SENTINEL_MASTER_CLIENT
if master_node:
SENTINEL_CLIENT = sentinel.master_for(mastername)
else:
SENTINEL_CLIENT = sentinel.slave_for(mastername)
LOGGER.info(
"Sentinel redis params: {args}, master_node: {is_master}",
args=conn_kwargs,
is_master=master_node,
)
return SENTINEL_CLIENT


def get_redis_client_from_conf(redis_conf: RedisConf = None, **kwargs) -> redis_client_t:
Expand Down
4 changes: 3 additions & 1 deletion numaprom/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

Vector = list[float]
Matrix = Vector | list[Vector] | npt.NDArray[float]
np.set_printoptions(precision=3, suppress=True)


class Status(str, Enum):
Expand Down Expand Up @@ -135,7 +136,8 @@ def as_json(self) -> bytes:
"Type": self.type,
"Value": self.value,
"Labels": self.labels,
}
},
option=orjson.OPT_SERIALIZE_NUMPY,
)

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion numaprom/udf/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from numaprom.tools import msg_forward
from numaprom.watcher import ConfigManager

REDIS_CLIENT = get_redis_client_from_conf()
REDIS_CLIENT = get_redis_client_from_conf(master_node=False)
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600)) # default ttl set to 1 hour


Expand Down
16 changes: 9 additions & 7 deletions numaprom/udf/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
from numaprom.watcher import ConfigManager

AUTH = os.getenv("REDIS_AUTH")
SCORE_PRECISION = int(os.getenv("SCORE_PRECISION", 3))
UNDEFINED_SCORE = -1.0


def __save_to_redis(
payload: StreamPayload, final_score: float, recreate: bool, unified_config: UnifiedConf
):
) -> tuple[float, list[float]]:
r = get_redis_client_from_conf(recreate=recreate)

metric_name = payload.composite_keys["name"]
Expand All @@ -27,7 +29,7 @@ def __save_to_redis(
r_keys.pop("name")
r_key = f"{':'.join(r_keys.values())}:{payload.end_ts}"

final_score = -1 if np.isnan(final_score) else final_score
final_score = UNDEFINED_SCORE if np.isnan(final_score) else final_score
r.hset(r_key, mapping={metric_name: final_score})
LOGGER.info(
"{uuid} - Saved to redis, redis_key: {redis_key}, metric: {metric_name}, "
Expand All @@ -50,7 +52,7 @@ def __save_to_redis(
metric=m,
redis_key=r_key,
)
return -1, []
return UNDEFINED_SCORE, []

LOGGER.debug("{uuid} - Received all metrics, generating unified anomaly", uuid=payload.uuid)
unified_weights = unified_config.unified_weights
Expand Down Expand Up @@ -86,14 +88,13 @@ def __construct_publisher_payload(
for key in stream_payload.composite_keys:
if key != "name":
labels[key] = stream_payload.composite_keys[key]

return PrometheusPayload(
timestamp_ms=int(stream_payload.end_ts),
name=f"{metric_name}_anomaly",
namespace=namespace,
subsystem=None,
type="Gauge",
value=float(final_score),
value=round(final_score, SCORE_PRECISION),
labels=labels,
)

Expand All @@ -115,7 +116,7 @@ def __construct_unified_payload(
namespace=namespace,
subsystem=None,
type="Gauge",
value=max_anomaly,
value=round(max_anomaly, SCORE_PRECISION),
labels=labels,
)

Expand Down Expand Up @@ -151,7 +152,8 @@ def _publish(final_score: float, payload: StreamPayload) -> list[bytes]:
payload=payload, final_score=final_score, recreate=True, unified_config=unified_config
)

if unified_anomaly > -1:
# If the unified anomaly is negative (i.e. UNDEFINED_SCORE, -1), we don't want to publish it
if unified_anomaly >= 0:
unified_json = __construct_unified_payload(
payload, unified_anomaly, unified_config
).as_json()
Expand Down
1 change: 1 addition & 0 deletions numaprom/udf/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
password=AUTH,
mastername=REDIS_CONF.master_name,
recreate=False,
master_node=False,
)
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600)) # default ttl set to 1 hour

Expand Down
4 changes: 3 additions & 1 deletion numaprom/udf/threshold.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:

# load threshold artifact
local_cache = LocalLRUCache(ttl=LOCAL_CACHE_TTL)
model_registry = RedisRegistry(client=get_redis_client_from_conf(), cache_registry=local_cache)
model_registry = RedisRegistry(
client=get_redis_client_from_conf(master_node=False), cache_registry=local_cache
)
try:
thresh_artifact = model_registry.load(
skeys=[payload.composite_keys["namespace"], payload.composite_keys["name"]],
Expand Down
Loading

0 comments on commit 5e26f2b

Please sign in to comment.