From fbf683e33da5ed0bc7bc361fa64e0a075a080a00 Mon Sep 17 00:00:00 2001 From: gchaitan <90992305+gskiiit@users.noreply.github.com> Date: Wed, 9 Mar 2022 11:18:32 -0800 Subject: [PATCH 1/2] Fix Olympic scoring array aggregator --- .../query/anomaly/BaseAnomalyNode.java | 9 ++- .../OlympicScoringBaseline.java | 9 ++- .../TestOlympicScoringBaseline.java | 62 +++++++++++++++++++ 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/net/opentsdb/query/anomaly/BaseAnomalyNode.java b/core/src/main/java/net/opentsdb/query/anomaly/BaseAnomalyNode.java index 922410bc4..054989321 100644 --- a/core/src/main/java/net/opentsdb/query/anomaly/BaseAnomalyNode.java +++ b/core/src/main/java/net/opentsdb/query/anomaly/BaseAnomalyNode.java @@ -91,7 +91,7 @@ public class BaseAnomalyNode extends AbstractQueryNode { protected volatile QueryResult[] predictions; protected volatile QueryResult current; protected volatile NumericArrayAggregatorConfig aggregatorConfig; - protected volatile NumericArrayAggregatorFactory aggregatorFactory; + private volatile NumericArrayAggregatorFactory aggregatorFactory; protected TrainingQuery training_query; protected volatile QueryResult training_data; @@ -140,7 +140,7 @@ public Deferred initialize(final Span span) { LOG.error("No auto intervals for the downsampler."); } ds_interval = DownsampleFactory.getAutoInterval(query_time_span, - ((DownsampleFactory) factory).intervals(), null); + ((DownsampleFactory) dsf).intervals(), null); } else { ds_interval = ds.getInterval(); } @@ -657,7 +657,6 @@ void evaluate(final TimeSeries cur, if (eval.alerts() != null && !eval.alerts().isEmpty()) { pred_ts.addAlerts(eval.alerts()); } - result.addPredictionsAndThresholds(pred_ts, predictions); if (config.getSerializeDeltas()) { @@ -916,4 +915,8 @@ public Deferred predictAndSet(final QueryResult result, return Deferred.fromError(new UnsupportedOperationException( "This method must be implemented.")); } + + public NumericArrayAggregatorFactory getAggregatorFactory() { + return aggregatorFactory; + } } diff --git a/implementation/egads/src/main/java/net/opentsdb/query/anomaly/egads/olympicscoring/OlympicScoringBaseline.java b/implementation/egads/src/main/java/net/opentsdb/query/anomaly/egads/olympicscoring/OlympicScoringBaseline.java index 98804abac..558a2097f 100644 --- a/implementation/egads/src/main/java/net/opentsdb/query/anomaly/egads/olympicscoring/OlympicScoringBaseline.java +++ b/implementation/egads/src/main/java/net/opentsdb/query/anomaly/egads/olympicscoring/OlympicScoringBaseline.java @@ -19,6 +19,8 @@ import java.util.Optional; import java.util.Properties; +import net.opentsdb.data.types.numeric.aggregators.BaseArrayAggregatorConfig; +import net.opentsdb.data.types.numeric.aggregators.DefaultArrayAggregatorConfig; import net.opentsdb.data.types.numeric.aggregators.NumericArrayAggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +118,6 @@ TimeSeries predict(final Properties properties, final long prediction_start) { final com.yahoo.egads.data.TimeSeries prediction = new com.yahoo.egads.data.TimeSeries(); //final double[] results = new double[(int) node.predictionIntervals()]; - NumericArrayAggregator agg = node.newAggregator(); // fill the prediction with nans at the proper timestamps long ts = prediction_start; @@ -142,6 +143,12 @@ TimeSeries predict(final Properties properties, final long prediction_start) { // trained, now populate the query result final Iterator it = prediction.data.iterator(); + BaseArrayAggregatorConfig aggConfig = DefaultArrayAggregatorConfig.newBuilder() + .setArraySize(prediction.data.size()) + .build(); + + NumericArrayAggregator agg = + (NumericArrayAggregator) node.getAggregatorFactory().newAggregator(aggConfig); int i = 0; ts = prediction_start; while (it.hasNext()) { diff --git a/implementation/egads/src/test/java/net/opentsdb/query/anomaly/egads/olympicscoring/TestOlympicScoringBaseline.java b/implementation/egads/src/test/java/net/opentsdb/query/anomaly/egads/olympicscoring/TestOlympicScoringBaseline.java index 5dd40f397..372533cec 100644 --- a/implementation/egads/src/test/java/net/opentsdb/query/anomaly/egads/olympicscoring/TestOlympicScoringBaseline.java +++ b/implementation/egads/src/test/java/net/opentsdb/query/anomaly/egads/olympicscoring/TestOlympicScoringBaseline.java @@ -64,6 +64,8 @@ public void before() throws Exception { .setArraySize(60) .build() )); + + when(node.getAggregatorFactory()).thenReturn(factory); } @Test @@ -344,6 +346,66 @@ public void predict() throws Exception { assertEquals(0, id.tags().size()); } + @Test + public void predictWithRightIndex() throws Exception { + when(node.predictionIntervals()).thenReturn(60L); + when(node.predictionInterval()).thenReturn(60L); + + TimeSpecification time_spec = mock(TimeSpecification.class); + when(time_spec.start()).thenReturn(new SecondTimeStamp(BASE_TIME)); + when(time_spec.end()).thenReturn(new SecondTimeStamp(BASE_TIME + (3600 * 3))); + when(time_spec.interval()).thenReturn(Duration.ofSeconds(60)); + when(result.timeSpecification()).thenReturn(time_spec); + + TimeSeries source = new NumericArrayTimeSeries(ID, time_spec.start()); + long ts = BASE_TIME; + for (int x = 0; x < 3; x++) { + for (int i = 0; i < 60; i++) { + double value = Math.sin((ts % 3600) / 100) + x; + ((NumericArrayTimeSeries) source).add(value); + ts += 60; + } + } + + OlympicScoringBaseline baseline = new OlympicScoringBaseline(node, ID); + baseline.append(source, result); + assertEquals(BASE_TIME, baseline.baseline.startTime()); + assertEquals(BASE_TIME + (3600 * 3) - 60, baseline.baseline.lastTime()); + assertEquals(180, baseline.baseline.size()); + + Properties properties = new Properties(); + properties.setProperty("TS_MODEL", "OlympicModel2"); + properties.setProperty("INTERVAL", "1"); + properties.setProperty("INTERVAL_UNITS", "MINUTES"); + properties.setProperty("WINDOW_SIZE", "1"); + properties.setProperty("WINDOW_SIZE_UNITS", "HOURS"); + properties.setProperty("WINDOW_DISTANCE", "1"); + properties.setProperty("WINDOW_DISTANCE_UNITS", "HOURS"); + properties.setProperty("HISTORICAL_WINDOWS", "3"); + properties.setProperty("WINDOW_AGGREGATOR", "AVG"); + properties.setProperty("MODEL_START", Long.toString(BASE_TIME + (3600 * 3))); + properties.setProperty("ENABLE_WEIGHTING", "TRUE"); + properties.setProperty("AGGREGATOR", "AVG"); + properties.setProperty("NUM_TO_DROP_LOWEST", "0"); + properties.setProperty("NUM_TO_DROP_HIGHEST","0"); + properties.setProperty("PERIOD", "3600"); + + TimeSeries result = baseline.predict(properties, BASE_TIME + (3600 * 3)); + + TypedTimeSeriesIterator iterator = result.iterator(NumericArrayType.TYPE).get(); + assertTrue(iterator.hasNext()); + TimeSeriesValue v = (TimeSeriesValue) iterator.next(); + assertEquals(60, v.value().end()); + ts = BASE_TIME + (3600 * 3); + for (int i = v.value().offset(); i < v.value().end(); i++) { + assertTrue(Double.isFinite(v.value().doubleArray()[i])); + } + + TimeSeriesStringId id = (TimeSeriesStringId) result.id(); + assertEquals(((TimeSeriesStringId) ID).metric(), id.metric()); + assertEquals(0, id.tags().size()); + } + @Test public void predictNoBaseline() throws Exception { when(node.predictionIntervals()).thenReturn(60L); From abe0dc4e6c8ca59dff8924bdd20d9cb40da3a6c6 Mon Sep 17 00:00:00 2001 From: gchaitan <90992305+gskiiit@users.noreply.github.com> Date: Wed, 9 Mar 2022 13:25:53 -0800 Subject: [PATCH 2/2] Fix Anomaly prediction aggregator issue --- .../aggregators/ArrayAggregatorUtils.java | 87 ++++++++++++++++++- .../anomaly/AnomalyPredictionTimeSeries.java | 2 +- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/net/opentsdb/data/types/numeric/aggregators/ArrayAggregatorUtils.java b/core/src/main/java/net/opentsdb/data/types/numeric/aggregators/ArrayAggregatorUtils.java index c768a6711..d3c711cd2 100644 --- a/core/src/main/java/net/opentsdb/data/types/numeric/aggregators/ArrayAggregatorUtils.java +++ b/core/src/main/java/net/opentsdb/data/types/numeric/aggregators/ArrayAggregatorUtils.java @@ -21,6 +21,7 @@ import net.opentsdb.data.TypedTimeSeriesIterator; import net.opentsdb.data.types.numeric.NumericArrayType; import net.opentsdb.data.types.numeric.NumericType; +import net.opentsdb.query.anomaly.BaseAnomalyNode; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAmount; @@ -70,6 +71,7 @@ public enum AccumulateState { * aggregator. Check that before using this method otherwise the aggregator * will have garbage. * + * @param node * @param agg The non-null aggregator to write into. * @param aggStart The non-null starting timestamp of the aggregator array. * @param aggEnd The non-null ending timestamp of the aggregator array, @@ -79,7 +81,7 @@ public enum AccumulateState { * @return A non-null state based on the aggregation. */ public static AccumulateState accumulateInAggregatorArray( - final NumericArrayAggregator agg, + BaseAnomalyNode node, final NumericArrayAggregator agg, final TimeStamp aggStart, final TimeStamp aggEnd, final TemporalAmount interval, @@ -121,17 +123,31 @@ public static AccumulateState accumulateInAggregatorArray( int wrote = 0; if (value.value().isInteger()) { long[] array = value.value().longArray(); + + BaseArrayAggregatorConfig aggConfig = DefaultArrayAggregatorConfig.newBuilder() + .setArraySize(aggIndex + array.length) + .build(); + + NumericArrayAggregator aggLocal = + (NumericArrayAggregator) node.getAggregatorFactory().newAggregator(aggConfig); + while (arrayIndex < value.value().end() && currentTs.compare(TimeStamp.Op.LT, aggEnd)) { - agg.accumulate(array[arrayIndex++], aggIndex++); + aggLocal.accumulate(array[arrayIndex++], aggIndex++); currentTs.add(interval); ++wrote; } } else { double[] array = value.value().doubleArray(); + BaseArrayAggregatorConfig aggConfig = DefaultArrayAggregatorConfig.newBuilder() + .setArraySize(aggIndex + array.length) + .build(); + + NumericArrayAggregator aggLocal = + (NumericArrayAggregator) node.getAggregatorFactory().newAggregator(aggConfig); while (arrayIndex < value.value().end() && currentTs.compare(TimeStamp.Op.LT, aggEnd)) { - agg.accumulate(array[arrayIndex++], aggIndex++); + aggLocal.accumulate(array[arrayIndex++], aggIndex++); currentTs.add(interval); ++wrote; } @@ -139,6 +155,71 @@ public static AccumulateState accumulateInAggregatorArray( return wrote > 0 ? AccumulateState.SUCCESS : AccumulateState.OUT_OF_BOUNDS; } + + public static AccumulateState accumulateInAggregatorArray( + final NumericArrayAggregator agg, + final TimeStamp aggStart, + final TimeStamp aggEnd, + final TemporalAmount interval, + final TimeSeries timeseries) { + + final Optional> op = + timeseries.iterator(NumericArrayAggregator.TYPE); + if (!op.isPresent()) { + return AccumulateState.NOT_PRESENT; + } + final TypedTimeSeriesIterator iterator = op.get(); + if (!iterator.hasNext()) { + return AccumulateState.NO_VALUE; + } + + final TimeSeriesValue value = + (TimeSeriesValue) iterator.next(); + if (value.timestamp().compare(TimeStamp.Op.GTE, aggEnd)) { + return AccumulateState.OUT_OF_BOUNDS; + } + + // TODO - only handling seconds for now. need ms and nanos someday. + TimeStamp currentTs = value.timestamp().getCopy(); + int aggIndex = (int) ((value.timestamp().epoch() - aggStart.epoch()) / + interval.get(ChronoUnit.SECONDS)); + int arrayIndex = value.value().offset(); + // make sure we move to the start of the agg index if we have data before + // the interval. + while (aggIndex < 0 && arrayIndex < value.value().end() && + currentTs.compare(TimeStamp.Op.LT, aggEnd)) { + currentTs.add(interval); + ++aggIndex; + ++arrayIndex; + } + if (aggIndex < 0) { + return AccumulateState.OUT_OF_BOUNDS; + } + + int wrote = 0; + if (value.value().isInteger()) { + long[] array = value.value().longArray(); + + + while (arrayIndex < value.value().end() && + currentTs.compare(TimeStamp.Op.LT, aggEnd)) { + agg.accumulate(array[arrayIndex++], aggIndex++); + currentTs.add(interval); + ++wrote; + } + } else { + double[] array = value.value().doubleArray(); + + while (arrayIndex < value.value().end() && + currentTs.compare(TimeStamp.Op.LT, aggEnd)) { + agg.accumulate(array[arrayIndex++], aggIndex++); + currentTs.add(interval); + ++wrote; + } + } + + return wrote > 0 ? AccumulateState.SUCCESS : AccumulateState.OUT_OF_BOUNDS; + } /** * Writes the given time series into the proper location of the aggregation diff --git a/core/src/main/java/net/opentsdb/query/anomaly/AnomalyPredictionTimeSeries.java b/core/src/main/java/net/opentsdb/query/anomaly/AnomalyPredictionTimeSeries.java index 7f167891a..866e5e6f6 100644 --- a/core/src/main/java/net/opentsdb/query/anomaly/AnomalyPredictionTimeSeries.java +++ b/core/src/main/java/net/opentsdb/query/anomaly/AnomalyPredictionTimeSeries.java @@ -104,7 +104,7 @@ public AnomalyPredictionTimeSeries(final TimeSeries[] sources, for (int i = 0; i < sources.length; i++) { final ArrayAggregatorUtils.AccumulateState state = - ArrayAggregatorUtils.accumulateInAggregatorArray(aggregator, + ArrayAggregatorUtils.accumulateInAggregatorArray(node, aggregator, result.timeSpecification().start(), result.timeSpecification().end(), result.timeSpecification().interval(),