diff --git a/docs/quick-start.md b/docs/quick-start.md
index 4a7725e9..ceb0ab42 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -34,51 +34,51 @@ from numalogic.models.threshold import StdDevThreshold
 from numalogic.postprocess import TanhNorm
 from numalogic.tools.data import StreamingDataset
 
-if __name__ == "__main__":
-    X_train = np.array([1, 3, 5, 2, 5, 1, 4, 5, 1, 4, 5, 8, 9, 1, 2, 4, 5, 1, 3]).reshape(-1, 1)
-    X_test = np.array([-20, 3, 5, 60, 5, 10, 4, 5, 200]).reshape(-1, 1)
-
-    # Preprocess step
-    clf = StandardScaler()
-    train_data = clf.fit_transform(X_train)
-    test_data = clf.transform(X_test)
-    print(train_data)
-    print(test_data)
-
-    # Set a sequence length.
-    SEQ_LEN = 8
-
-    # Define the model. We are using a simple fully connected autoencoder here.
-    model = VanillaAE(seq_len=SEQ_LEN, n_features=1)
-
-    # Create a torch dataset
-    train_dataset = StreamingDataset(train_data, seq_len=SEQ_LEN)
-
-    # Define the trainer, and fit the model.
-    trainer = AutoencoderTrainer(max_epochs=30, enable_progress_bar=True)
-    trainer.fit(model, train_dataloaders=DataLoader(train_dataset))
-
-    # Get the training reconstruction error from the model.
-    train_reconerr = trainer.predict(model, dataloaders=DataLoader(train_dataset, batch_size=2))
-    print(train_reconerr)
-
-    # Define threshold estimator, and find a threshold on the training reconstruction error.
-    thresh_clf = StdDevThreshold()
-    thresh_clf.fit(train_reconerr.numpy())
-
-    # Now it is time for inference on the test data.
-    # First, let's get the reconstruction error on the test set.
-    test_dataset = StreamingDataset(test_data, seq_len=SEQ_LEN)
-    test_recon_err = trainer.predict(model, dataloaders=DataLoader(test_dataset, batch_size=2))
-    print(test_recon_err)
-
-    # The trained threshold estimator can give us the anomaly score
-    anomaly_score = thresh_clf.score(test_recon_err.numpy())
-
-    # Optionally, we can normalize scores to range between 0-10 to make it more readable
-    postproc_clf = TanhNorm()
-    anomaly_score_norm = postproc_clf.fit_transform(anomaly_score)
-    print("Anomaly Scores:\n", str(anomaly_score_norm))
+# Create some synthetic data
+X_train = np.array([1, 3, 5, 2, 5, 1, 4, 5, 1, 4, 5, 8, 9, 1, 2, 4, 5, 1, 3]).reshape(-1, 1)
+X_test = np.array([-20, 3, 5, 60, 5, 10, 4, 5, 200]).reshape(-1, 1)
+
+# Preprocess step
+clf = StandardScaler()
+train_data = clf.fit_transform(X_train)
+test_data = clf.transform(X_test)
+print(train_data)
+print(test_data)
+
+# Set a sequence length.
+SEQ_LEN = 8
+
+# Define the model. We are using a simple fully connected autoencoder here.
+model = VanillaAE(seq_len=SEQ_LEN, n_features=1)
+
+# Create a torch dataset
+train_dataset = StreamingDataset(train_data, seq_len=SEQ_LEN)
+
+# Define the trainer, and fit the model.
+trainer = AutoencoderTrainer(max_epochs=30, enable_progress_bar=True)
+trainer.fit(model, train_dataloaders=DataLoader(train_dataset))
+
+# Get the training reconstruction error from the model.
+train_reconerr = trainer.predict(model, dataloaders=DataLoader(train_dataset, batch_size=2))
+print(train_reconerr)
+
+# Define threshold estimator, and find a threshold on the training reconstruction error.
+thresh_clf = StdDevThreshold()
+thresh_clf.fit(train_reconerr.numpy())
+
+# Now it is time for inference on the test data.
+# First, let's get the reconstruction error on the test set.
+test_dataset = StreamingDataset(test_data, seq_len=SEQ_LEN)
+test_recon_err = trainer.predict(model, dataloaders=DataLoader(test_dataset, batch_size=2))
+print(test_recon_err)
+
+# The trained threshold estimator can give us the anomaly score
+anomaly_score = thresh_clf.score_samples(test_recon_err.numpy())
+
+# Optionally, we can normalize scores to range between 0-10 to make it more readable
+postproc_clf = TanhNorm()
+anomaly_score_norm = postproc_clf.fit_transform(anomaly_score)
+print("Anomaly Scores:\n", str(anomaly_score_norm))
 ```
 
 
diff --git a/numalogic/models/threshold/_static.py b/numalogic/models/threshold/_static.py
index 14b8c461..68217ca5 100644
--- a/numalogic/models/threshold/_static.py
+++ b/numalogic/models/threshold/_static.py
@@ -34,9 +34,9 @@ class StaticThreshold(BaseEstimator):
     __slots__ = ("upper_limit", "outlier_score", "inlier_score")
 
     def __init__(self, upper_limit: float, outlier_score: float = 10.0, inlier_score: float = 0.5):
-        self.upper_limit = upper_limit
-        self.outlier_score = outlier_score
-        self.inlier_score = inlier_score
+        self.upper_limit = float(upper_limit)
+        self.outlier_score = float(outlier_score)
+        self.inlier_score = float(inlier_score)
 
         assert (
             self.outlier_score > self.inlier_score
@@ -46,15 +46,15 @@ def fit(self, _: npt.NDArray[float]) -> Self:
         """Does not do anything. Only for API compatibility"""
         return self
 
-    def predict(self, x_test: npt.NDArray[float]) -> npt.NDArray[float]:
+    def predict(self, x_test: npt.NDArray[float]) -> npt.NDArray[int]:
         """
-        Returns an array of same shape as input.
+        Returns an integer array of same shape as input.
         1 denotes anomaly.
         """
-        x_test = x_test.copy()
-        x_test[x_test < self.upper_limit] = 0.0
-        x_test[x_test >= self.upper_limit] = 1.0
-        return x_test
+        # Cast the boolean mask so the result has an integer dtype rather
+        # than inheriting the float dtype of x_test; NaN compares False -> 0.
+        y_test = (x_test >= self.upper_limit).astype(int)
+        return y_test
 
     def score_samples(self, x_test: npt.NDArray[float]) -> npt.NDArray[float]:
         """
diff --git a/numalogic/models/threshold/_std.py b/numalogic/models/threshold/_std.py
index d70e5997..7c09287e 100644
--- a/numalogic/models/threshold/_std.py
+++ b/numalogic/models/threshold/_std.py
@@ -1,6 +1,19 @@
+# Copyright 2022 The Numaproj Authors.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
 import numpy as np
 from numpy.typing import NDArray
 from sklearn.base import BaseEstimator
+from typing_extensions import Self
 
 
 class StdDevThreshold(BaseEstimator):
@@ -11,6 +24,10 @@ class StdDevThreshold(BaseEstimator):
 
     Generates anomaly score as the ratio
     between the input data and threshold generated.
+
+    Args:
+        std_factor: scalar factor for std to be added to mean
+        min_threshold: clip the threshold value to be above this value
     """
 
     def __init__(self, std_factor: float = 3.0, min_threshold: float = 0.1):
@@ -33,7 +50,10 @@ def std(self):
     def threshold(self):
         return self._threshold
 
-    def fit(self, x_train: NDArray[float], y=None) -> "StdDevThreshold":
+    def fit(self, x_train: NDArray[float], y=None) -> Self:
+        """
+        Fit the estimator on the training set.
+        """
         self._std = np.std(x_train, axis=0)
         self._mean = np.mean(x_train, axis=0)
         self._threshold = self._mean + (self.std_factor * self._std)
@@ -41,9 +61,19 @@ def fit(self, x_train: NDArray[float], y=None) -> "StdDevThreshold":
 
         return self
 
-    def predict(self, x_test: NDArray[float]) -> NDArray[float]:
-        anomaly_scores = x_test / self.threshold
-        return anomaly_scores
+    def predict(self, x_test: NDArray[float]) -> NDArray[int]:
+        """
+        Returns an integer array of same shape as input.
+        1 denotes outlier, 0 denotes inlier
+        """
+        # Cast the boolean mask so the result has an integer dtype rather
+        # than inheriting the float dtype of x_test; NaN compares False -> 0.
+        y_pred = (x_test >= self._threshold).astype(int)
+        return y_pred
 
-    def score(self, x_test: NDArray[float]) -> NDArray[float]:
-        return self.predict(x_test)
+    def score_samples(self, x_test: NDArray[float]) -> NDArray[float]:
+        """
+        Returns an anomaly score array with the same shape as input.
+        """
+        y_scores = x_test / self.threshold
+        return y_scores
diff --git a/tests/models/test_threshold.py b/tests/models/test_threshold.py
index 9bc8f693..896c20ca 100644
--- a/tests/models/test_threshold.py
+++ b/tests/models/test_threshold.py
@@ -13,13 +13,14 @@ def setUp(self) -> None:
     def test_estimator_predict(self):
         clf = StdDevThreshold()
         clf.fit(self.x_train)
-        score = clf.predict(self.x_test)
-        self.assertAlmostEqual(0.93317, np.mean(score), places=2)
+        y = clf.predict(self.x_test)
+        self.assertTrue(np.issubdtype(y.dtype, np.integer))
+        self.assertAlmostEqual(0.4, np.mean(y), places=1)
 
     def test_estimator_score(self):
         clf = StdDevThreshold()
         clf.fit(self.x_train)
-        score = clf.score(self.x_test)
+        score = clf.score_samples(self.x_test)
         self.assertAlmostEqual(0.93317, np.mean(score), places=2)
 
 