diff --git a/numalogic/config/factory.py b/numalogic/config/factory.py index 2ad4e483..961f33ea 100644 --- a/numalogic/config/factory.py +++ b/numalogic/config/factory.py @@ -43,7 +43,12 @@ class PreprocessFactory(_ObjectFactory): """Factory class to create preprocess instances.""" from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler - from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler + from numalogic.transforms import ( + LogTransformer, + StaticPowerTransformer, + TanhScaler, + FlattenVector, + ) _CLS_MAP: ClassVar[dict] = { "StandardScaler": StandardScaler, @@ -53,6 +58,7 @@ class PreprocessFactory(_ObjectFactory): "LogTransformer": LogTransformer, "StaticPowerTransformer": StaticPowerTransformer, "TanhScaler": TanhScaler, + "FlattenVector": FlattenVector, } def get_pipeline_instance(self, objs_info: list[ModelInfo]): diff --git a/numalogic/tools/types.py b/numalogic/tools/types.py index 6b7dcf5b..b7b45ab1 100644 --- a/numalogic/tools/types.py +++ b/numalogic/tools/types.py @@ -43,6 +43,7 @@ class KeyedArtifact(NamedTuple): dkeys: KEYS artifact: artifact_t + stateful: bool = True class Singleton(type): diff --git a/numalogic/transforms/__init__.py b/numalogic/transforms/__init__.py index 41c10090..530ace8a 100644 --- a/numalogic/transforms/__init__.py +++ b/numalogic/transforms/__init__.py @@ -15,7 +15,7 @@ """ from numalogic.transforms._scaler import TanhScaler -from numalogic.transforms._stateless import LogTransformer, StaticPowerTransformer +from numalogic.transforms._stateless import LogTransformer, StaticPowerTransformer, FlattenVector from numalogic.transforms._movavg import ExpMovingAverage, expmov_avg_aggregator from numalogic.transforms._postprocess import TanhNorm, tanh_norm @@ -27,4 +27,5 @@ "expmov_avg_aggregator", "TanhNorm", "tanh_norm", + "FlattenVector", ] diff --git a/numalogic/transforms/_stateless.py b/numalogic/transforms/_stateless.py index 456bb0f3..02ec3911 100644 --- a/numalogic/transforms/_stateless.py +++ b/numalogic/transforms/_stateless.py @@ -53,3 +53,22 @@ def transform(self, X, **__): def inverse_transform(self, X) -> npt.NDArray[float]: return np.power(X, 1.0 / self.n) - self.add_factor + + +class FlattenVector(StatelessTransformer): + """A stateless transformer that flattens a vector. + + Args: + ____ + n_features: number of features + + """ + + def __init__(self, n_features: int): + self.n_features = n_features + + def transform(self, X: npt.NDArray[float], **__) -> npt.NDArray[float]: + return X.flatten().reshape(-1, 1) + + def inverse_transform(self, X: npt.NDArray[float]) -> npt.NDArray[float]: + return X.reshape(-1, self.n_features) diff --git a/numalogic/udfs/preprocess.py b/numalogic/udfs/preprocess.py index 59b2c86f..0c0c67c8 100644 --- a/numalogic/udfs/preprocess.py +++ b/numalogic/udfs/preprocess.py @@ -37,6 +37,17 @@ _LOGGER = logging.getLogger(__name__) +def _get_updated_metrics(uuid: str, metrics: list, shape: tuple) -> list[str]: + if shape[1] != len(metrics) and shape[1] == 1: + metrics = ["-".join(metrics)] + _LOGGER.debug( + "%s - Metrics used: %s", + uuid, + metrics, + ) + return metrics + + class PreprocessUDF(NumalogicUDF): """ Preprocess UDF for Numalogic. @@ -174,6 +185,12 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: payload = replace(payload, status=Status.ARTIFACT_FOUND) try: x_scaled = self.compute(model=preproc_clf, input_=payload.get_data()) + + # make metrics list matching same shape as data + payload = replace( + payload, metrics=_get_updated_metrics(payload.uuid, payload.metrics, x_scaled.shape) + ) + _update_info_metric(x_scaled, payload.metrics, _metric_label_values) payload = replace( payload, diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py index cf5aa5bc..a12640d7 100644 --- a/numalogic/udfs/trainer/_base.py +++ b/numalogic/udfs/trainer/_base.py @@ -11,7 +11,6 @@ from sklearn.pipeline import make_pipeline from torch.utils.data import DataLoader -from numalogic.base import StatelessTransformer from numalogic.config import PreprocessFactory, ModelFactory, ThresholdFactory, RegistryFactory from numalogic.config._config import NumalogicConf from numalogic.models.autoencoder import TimeseriesTrainer @@ -105,7 +104,9 @@ def compute( if preproc_clf: input_ = preproc_clf.fit_transform(input_) dict_artifacts["preproc_clf"] = KeyedArtifact( - dkeys=[_conf.name for _conf in numalogic_cfg.preprocess], artifact=preproc_clf + dkeys=[_conf.name for _conf in numalogic_cfg.preprocess], + artifact=preproc_clf, + stateful=any(_conf.stateful for _conf in numalogic_cfg.preprocess), ) train_ds = StreamingDataset(input_, model.seq_len) @@ -117,13 +118,15 @@ def compute( model, dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size) ).numpy() dict_artifacts["inference"] = KeyedArtifact( - dkeys=[numalogic_cfg.model.name], artifact=model + dkeys=[numalogic_cfg.model.name], artifact=model, stateful=numalogic_cfg.model.stateful ) if threshold_clf: threshold_clf.fit(train_reconerr) dict_artifacts["threshold_clf"] = KeyedArtifact( - dkeys=[numalogic_cfg.threshold.name], artifact=threshold_clf + dkeys=[numalogic_cfg.threshold.name], + artifact=threshold_clf, + stateful=numalogic_cfg.threshold.stateful, ) return dict_artifacts @@ -242,7 +245,6 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: threshold_clf=thresh_clf, numalogic_cfg=_conf.numalogic_conf, ) - # Save artifacts self.artifacts_to_save( @@ -306,11 +308,10 @@ def artifacts_to_save( """ dict_artifacts = { - k: KeyedArtifact([payload.pipeline_id, *v.dkeys], v.artifact) + k: KeyedArtifact([payload.pipeline_id, *v.dkeys], v.artifact, v.stateful) for k, v in dict_artifacts.items() - if not isinstance(v.artifact, StatelessTransformer) + if v.stateful } - try: ver_dict = model_registry.save_multiple( skeys=skeys, diff --git a/pyproject.toml b/pyproject.toml index 89bd2416..32bd3346 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "numalogic" -version = "0.6.1" +version = "0.6.2" description = "Collection of operational Machine Learning models and tools." authors = ["Numalogic Developers"] packages = [{ include = "numalogic" }] diff --git a/tests/transforms/test_transforms.py b/tests/transforms/test_transforms.py index 70c95eaa..ffa353f6 100644 --- a/tests/transforms/test_transforms.py +++ b/tests/transforms/test_transforms.py @@ -6,8 +6,7 @@ from sklearn.pipeline import make_pipeline from numalogic.base import StatelessTransformer -from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler - +from numalogic.transforms import LogTransformer, StaticPowerTransformer, TanhScaler, FlattenVector RNG = np.random.default_rng(42) @@ -77,6 +76,13 @@ def test_base_transform(self): self.assertRaises(NotImplementedError, trfr.fit_transform, x) self.assertEqual(trfr.fit(x), trfr) + def test_FlattenVector(self): + x = RNG.random((5, 2)) + clf = FlattenVector(n_features=2) + data = clf.transform(x) + self.assertEqual(data.shape[1], 1) + self.assertEqual(clf.inverse_transform(data).shape[1], 2) + if __name__ == "__main__": unittest.main() diff --git a/tests/udfs/resources/_config.yaml b/tests/udfs/resources/_config.yaml index 04f9964c..30ca63be 100644 --- a/tests/udfs/resources/_config.yaml +++ b/tests/udfs/resources/_config.yaml @@ -3,7 +3,7 @@ stream_confs: config_id: "druid-config" source: "druid" composite_keys: [ 'service-mesh', '1', '2' ] - window_size: 10 + window_size: 20 ml_pipelines: pipeline1: pipeline_id: "pipeline1" @@ -14,9 +14,13 @@ stream_confs: model: name: "VanillaAE" conf: - seq_len: 10 - n_features: 2 + seq_len: 20 + n_features: 1 preprocess: + - name: "FlattenVector" + stateful: false + conf: + n_features: 2 - name: "LogTransformer" stateful: false conf: diff --git a/tests/udfs/test_trainer.py b/tests/udfs/test_trainer.py index 7a715443..195ab042 100644 --- a/tests/udfs/test_trainer.py +++ b/tests/udfs/test_trainer.py @@ -105,30 +105,12 @@ def test_trainer_01(self): @patch.object(DruidFetcher, "fetch", Mock(return_value=mock_druid_fetch_data())) def test_trainer_02(self): - self.udf1.register_conf( - "druid-config", - StreamConf( - ml_pipelines={ - "pipeline1": MLPipelineConf( - pipeline_id="pipeline1", - numalogic_conf=NumalogicConf( - model=ModelInfo( - name="VanillaAE", conf={"seq_len": 12, "n_features": 2} - ), - preprocess=[ModelInfo(name="StandardScaler", conf={})], - trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)), - ), - ) - } - ), - ) self.udf1(self.keys, self.datum) self.assertEqual( - 3, + 2, REDIS_CLIENT.exists( b"5984175597303660107::pipeline1:VanillaAE::LATEST", b"5984175597303660107::pipeline1:StdDevThreshold::LATEST", - b"5984175597303660107::pipeline1:StandardScaler::LATEST", ), ) diff --git a/tests/udfs/utility.py b/tests/udfs/utility.py index 8ffa16c6..69cb0b02 100644 --- a/tests/udfs/utility.py +++ b/tests/udfs/utility.py @@ -48,13 +48,16 @@ def store_in_redis(pl_conf, registry): registry.save_multiple( skeys=[*pl_conf.stream_confs["druid-config"].composite_keys], dict_artifacts={ - "inference": KeyedArtifact(dkeys=[_pipeline_id, "AE"], artifact=VanillaAE(10)), + "inference": KeyedArtifact( + dkeys=[_pipeline_id, "AE"], artifact=VanillaAE(10), stateful=True + ), "preproc": KeyedArtifact( dkeys=[ _pipeline_id, *[_conf.name for _conf in _ml_conf.numalogic_conf.preprocess], ], artifact=preproc_clf, + stateful=True, ), }, )