diff --git a/numaprom/metrics/__init__.py b/numaprom/metrics/__init__.py index 4c023b6..ebfa381 100644 --- a/numaprom/metrics/__init__.py +++ b/numaprom/metrics/__init__.py @@ -1,15 +1,11 @@ from numaprom.metrics._metrics import ( - increase_redis_conn_status, + increase_redis_conn_error, inc_inference_count, start_metrics_server, - inc_redis_conn_success, - inc_redis_conn_failed, ) __all__ = [ - "increase_redis_conn_status", + "increase_redis_conn_error", "inc_inference_count", "start_metrics_server", - "inc_redis_conn_success", - "inc_redis_conn_failed", ] diff --git a/numaprom/metrics/_metrics.py b/numaprom/metrics/_metrics.py index ded1228..e4938ba 100644 --- a/numaprom/metrics/_metrics.py +++ b/numaprom/metrics/_metrics.py @@ -6,19 +6,15 @@ from numaprom import LOGGER # Metrics -REDIS_CONN_STATUS_COUNT = Counter("numaprom_redis_conn_status_count", "", ["vertex", "status"]) +REDIS_CONN_ERROR_COUNT = Counter("numaprom_redis_conn_error_count", "", ["vertex"]) INFERENCE_COUNT = Counter( "numaprom_inference_count", "", ["model", "namespace", "app", "metric", "status"] ) -def increase_redis_conn_status(vertex: str, status: str) -> None: - global REDIS_CONN_STATUS_COUNT - REDIS_CONN_STATUS_COUNT.labels(vertex, status).inc() - - -inc_redis_conn_success = partial(increase_redis_conn_status, status="success") -inc_redis_conn_failed = partial(increase_redis_conn_status, status="failed") +def increase_redis_conn_error(vertex: str, status: str) -> None: + global REDIS_CONN_ERROR_COUNT + REDIS_CONN_ERROR_COUNT.labels(vertex).inc() def inc_inference_count(model: str, namespace: str, app: str, metric: str, status: str) -> None: diff --git a/numaprom/udf/inference.py b/numaprom/udf/inference.py index 08adee8..f4bc5c1 100644 --- a/numaprom/udf/inference.py +++ b/numaprom/udf/inference.py @@ -15,8 +15,8 @@ from numaprom.clients.sentinel import get_redis_client_from_conf from numaprom.entities import PayloadFactory from numaprom.entities import Status, StreamPayload, Header +from numaprom.metrics import increase_redis_conn_error, inc_inference_count from numaprom.tools import msg_forward -from numaprom.metrics import inc_redis_conn_success, inc_inference_count, inc_redis_conn_failed from numaprom.watcher import ConfigManager @@ -89,10 +89,9 @@ def inference(_: list[str], datum: Datum) -> bytes: ) payload.set_header(Header.STATIC_INFERENCE) payload.set_status(Status.RUNTIME_ERROR) - inc_redis_conn_failed(_VERTEX) + increase_redis_conn_error(_VERTEX) return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY) - inc_redis_conn_success(_VERTEX) if not artifact_data: LOGGER.info( "{uuid} - Inference artifact not found, " diff --git a/numaprom/udf/postprocess.py b/numaprom/udf/postprocess.py index 3672243..aa41573 100644 --- a/numaprom/udf/postprocess.py +++ b/numaprom/udf/postprocess.py @@ -11,8 +11,9 @@ from numaprom import LOGGER, UnifiedConf from numaprom.clients.sentinel import get_redis_client_from_conf from numaprom.entities import Status, PrometheusPayload, StreamPayload, Header +from numaprom.metrics import increase_redis_conn_error from numaprom.tools import msgs_forward, WindowScorer -from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed + from numaprom.watcher import ConfigManager @@ -152,12 +153,10 @@ def _publish(final_score: float, payload: StreamPayload) -> list[bytes]: uuid=payload.uuid, warn=warn, ) - inc_redis_conn_failed(_VERTEX) + increase_redis_conn_error(_VERTEX) unified_anomaly, anomalies = __save_to_redis( payload=payload, final_score=final_score, recreate=True, unified_config=unified_config ) - else: - inc_redis_conn_success(_VERTEX) # If the unified anomaly is -1, we don't want to publish it if unified_anomaly >= 0: diff --git a/numaprom/udf/preprocess.py b/numaprom/udf/preprocess.py index 376cdc5..824b38d 100644 --- a/numaprom/udf/preprocess.py +++ b/numaprom/udf/preprocess.py @@ -11,7 +11,7 @@ from numaprom.clients.sentinel import get_redis_client from numaprom.entities import Status, StreamPayload, Header from numaprom.tools import msg_forward -from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed +from numaprom.metrics import increase_redis_conn_error from numaprom.watcher import ConfigManager _VERTEX: Final[str] = "preprocess" @@ -58,7 +58,7 @@ def preprocess(_: list[str], datum: Datum) -> bytes: ) payload.set_header(Header.STATIC_INFERENCE) payload.set_status(Status.RUNTIME_ERROR) - inc_redis_conn_failed(_VERTEX) + increase_redis_conn_error(_VERTEX) return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY) except Exception as ex: LOGGER.exception( @@ -72,7 +72,6 @@ def preprocess(_: list[str], datum: Datum) -> bytes: payload.set_status(Status.RUNTIME_ERROR) return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY) - inc_redis_conn_success(_VERTEX) if not preproc_artifact: LOGGER.info( "{uuid} - Preprocess artifact not found, forwarding for static thresholding. " diff --git a/numaprom/udf/threshold.py b/numaprom/udf/threshold.py index d4bf6e1..13b9bfa 100644 --- a/numaprom/udf/threshold.py +++ b/numaprom/udf/threshold.py @@ -12,8 +12,8 @@ from numaprom._constants import TRAIN_VTX_KEY, POSTPROC_VTX_KEY from numaprom.clients.sentinel import get_redis_client_from_conf from numaprom.entities import Status, TrainerPayload, PayloadFactory, Header +from numaprom.metrics import increase_redis_conn_error from numaprom.tools import conditional_forward, calculate_static_thresh -from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed from numaprom.watcher import ConfigManager @@ -84,7 +84,7 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]: ) payload.set_header(Header.STATIC_INFERENCE) payload.set_status(Status.RUNTIME_ERROR) - inc_redis_conn_failed(_VERTEX) + increase_redis_conn_error(_VERTEX) return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY) except Exception as ex: LOGGER.exception( @@ -100,8 +100,6 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]: (TRAIN_VTX_KEY, orjson.dumps(train_payload)), (POSTPROC_VTX_KEY, _get_static_thresh_payload(payload, metric_config)), ] - else: - inc_redis_conn_success(_VERTEX) if not thresh_artifact: LOGGER.info( diff --git a/numaprom/udf/window.py b/numaprom/udf/window.py index d157429..fc4219d 100644 --- a/numaprom/udf/window.py +++ b/numaprom/udf/window.py @@ -12,8 +12,8 @@ from numaprom import LOGGER from numaprom.clients.sentinel import get_redis_client_from_conf from numaprom.entities import StreamPayload, Status, Header +from numaprom.metrics import increase_redis_conn_error from numaprom.tools import msg_forward, create_composite_keys -from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed from numaprom.watcher import ConfigManager _VERTEX: Final[str] = "window" @@ -91,9 +91,7 @@ def window(_: list[str], datum: Datum) -> bytes | None: elements = __aggregate_window( unique_key, msg["timestamp"], value, win_size, buff_size, recreate=True ) - inc_redis_conn_failed(_VERTEX) - else: - inc_redis_conn_success(_VERTEX) + increase_redis_conn_error(_VERTEX) # Drop message if no of elements is less than sequence length needed if len(elements) < win_size: