Skip to content

Commit

Permalink
updated
Browse files Browse the repository at this point in the history
Signed-off-by: Saravanan Balasubramanian <[email protected]>
  • Loading branch information
sarabala1979 committed Jul 31, 2023
1 parent 50fcf44 commit 38629a8
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 32 deletions.
8 changes: 2 additions & 6 deletions numaprom/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
from numaprom.metrics._metrics import (
increase_redis_conn_status,
increase_redis_conn_error,
inc_inference_count,
start_metrics_server,
inc_redis_conn_success,
inc_redis_conn_failed,
)

__all__ = [
"increase_redis_conn_status",
"increase_redis_conn_error",
"inc_inference_count",
"start_metrics_server",
"inc_redis_conn_success",
"inc_redis_conn_failed",
]
12 changes: 4 additions & 8 deletions numaprom/metrics/_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@
from numaprom import LOGGER

# Metrics
REDIS_CONN_STATUS_COUNT = Counter("numaprom_redis_conn_status_count", "", ["vertex", "status"])
REDIS_CONN_ERROR_COUNT = Counter("numaprom_redis_conn_error_count", "", ["vertex"])
INFERENCE_COUNT = Counter(
"numaprom_inference_count", "", ["model", "namespace", "app", "metric", "status"]
)


def increase_redis_conn_status(vertex: str, status: str) -> None:
global REDIS_CONN_STATUS_COUNT
REDIS_CONN_STATUS_COUNT.labels(vertex, status).inc()


inc_redis_conn_success = partial(increase_redis_conn_status, status="success")
inc_redis_conn_failed = partial(increase_redis_conn_status, status="failed")
def increase_redis_conn_error(vertex: str, status: str) -> None:
global REDIS_CONN_ERROR_COUNT
REDIS_CONN_ERROR_COUNT.labels(vertex).inc()


def inc_inference_count(model: str, namespace: str, app: str, metric: str, status: str) -> None:
Expand Down
5 changes: 2 additions & 3 deletions numaprom/udf/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from numaprom.clients.sentinel import get_redis_client_from_conf
from numaprom.entities import PayloadFactory
from numaprom.entities import Status, StreamPayload, Header
from numaprom.metrics import increase_redis_conn_error, inc_inference_count
from numaprom.tools import msg_forward
from numaprom.metrics import inc_redis_conn_success, inc_inference_count, inc_redis_conn_failed
from numaprom.watcher import ConfigManager


Expand Down Expand Up @@ -89,10 +89,9 @@ def inference(_: list[str], datum: Datum) -> bytes:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
inc_redis_conn_failed(_VERTEX)
increase_redis_conn_error(_VERTEX)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)

inc_redis_conn_success(_VERTEX)
if not artifact_data:
LOGGER.info(
"{uuid} - Inference artifact not found, "
Expand Down
7 changes: 3 additions & 4 deletions numaprom/udf/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
from numaprom import LOGGER, UnifiedConf
from numaprom.clients.sentinel import get_redis_client_from_conf
from numaprom.entities import Status, PrometheusPayload, StreamPayload, Header
from numaprom.metrics import increase_redis_conn_error
from numaprom.tools import msgs_forward, WindowScorer
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed

from numaprom.watcher import ConfigManager


Expand Down Expand Up @@ -152,12 +153,10 @@ def _publish(final_score: float, payload: StreamPayload) -> list[bytes]:
uuid=payload.uuid,
warn=warn,
)
inc_redis_conn_failed(_VERTEX)
increase_redis_conn_error(_VERTEX)
unified_anomaly, anomalies = __save_to_redis(
payload=payload, final_score=final_score, recreate=True, unified_config=unified_config
)
else:
inc_redis_conn_success(_VERTEX)

# If the unified anomaly is -1, we don't want to publish it
if unified_anomaly >= 0:
Expand Down
5 changes: 2 additions & 3 deletions numaprom/udf/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from numaprom.clients.sentinel import get_redis_client
from numaprom.entities import Status, StreamPayload, Header
from numaprom.tools import msg_forward
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
from numaprom.metrics import increase_redis_conn_error
from numaprom.watcher import ConfigManager

_VERTEX: Final[str] = "preprocess"
Expand Down Expand Up @@ -58,7 +58,7 @@ def preprocess(_: list[str], datum: Datum) -> bytes:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
inc_redis_conn_failed(_VERTEX)
increase_redis_conn_error(_VERTEX)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
except Exception as ex:
LOGGER.exception(
Expand All @@ -72,7 +72,6 @@ def preprocess(_: list[str], datum: Datum) -> bytes:
payload.set_status(Status.RUNTIME_ERROR)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)

inc_redis_conn_success(_VERTEX)
if not preproc_artifact:
LOGGER.info(
"{uuid} - Preprocess artifact not found, forwarding for static thresholding. "
Expand Down
6 changes: 2 additions & 4 deletions numaprom/udf/threshold.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from numaprom._constants import TRAIN_VTX_KEY, POSTPROC_VTX_KEY
from numaprom.clients.sentinel import get_redis_client_from_conf
from numaprom.entities import Status, TrainerPayload, PayloadFactory, Header
from numaprom.metrics import increase_redis_conn_error
from numaprom.tools import conditional_forward, calculate_static_thresh
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
from numaprom.watcher import ConfigManager


Expand Down Expand Up @@ -84,7 +84,7 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
inc_redis_conn_failed(_VERTEX)
increase_redis_conn_error(_VERTEX)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
except Exception as ex:
LOGGER.exception(
Expand All @@ -100,8 +100,6 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:
(TRAIN_VTX_KEY, orjson.dumps(train_payload)),
(POSTPROC_VTX_KEY, _get_static_thresh_payload(payload, metric_config)),
]
else:
inc_redis_conn_success(_VERTEX)

if not thresh_artifact:
LOGGER.info(
Expand Down
6 changes: 2 additions & 4 deletions numaprom/udf/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from numaprom import LOGGER
from numaprom.clients.sentinel import get_redis_client_from_conf
from numaprom.entities import StreamPayload, Status, Header
from numaprom.metrics import increase_redis_conn_error
from numaprom.tools import msg_forward, create_composite_keys
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
from numaprom.watcher import ConfigManager

_VERTEX: Final[str] = "window"
Expand Down Expand Up @@ -91,9 +91,7 @@ def window(_: list[str], datum: Datum) -> bytes | None:
elements = __aggregate_window(
unique_key, msg["timestamp"], value, win_size, buff_size, recreate=True
)
inc_redis_conn_failed(_VERTEX)
else:
inc_redis_conn_success(_VERTEX)
increase_redis_conn_error(_VERTEX)

# Drop message if no of elements is less than sequence length needed
if len(elements) < win_size:
Expand Down

0 comments on commit 38629a8

Please sign in to comment.