diff --git a/analytics/pattern_detection_model.py b/analytics/pattern_detection_model.py index 2a556ff4f..2357d41b4 100644 --- a/analytics/pattern_detection_model.py +++ b/analytics/pattern_detection_model.py @@ -53,24 +53,12 @@ def learn(self, segments): window_size = 200 dataframe = self.data_prov.get_dataframe() - start_index, stop_index = 0, len(dataframe) - if len(segments) > 0: - min_time, max_time = segments_box(segments) - try: - start_index = dataframe[dataframe['timestamp'] >= min_time].index[0] - stop_index = dataframe[dataframe['timestamp'] > max_time].index[0] - start_index = max(start_index - window_size, 0) - stop_index = min(stop_index + window_size, len(dataframe)) - except IndexError: - pass - - dataframe = dataframe[start_index:stop_index] segments = self.data_prov.transform_anomalies(segments) + # TODO: pass only part of dataframe that has segments self.model.fit(dataframe, segments) self.__save_model() return 0 - # return last_prediction_time def predict(self, last_prediction_time): if self.model is None: diff --git a/analytics/step_detector.py b/analytics/step_detector.py index d2ce68520..3831c67b5 100644 --- a/analytics/step_detector.py +++ b/analytics/step_detector.py @@ -1,17 +1,6 @@ import numpy as np import pickle - - -def find_segments(array, threshold): - segments = [] - above_points = np.where(array > threshold, 1, 0) - ap_dif = np.diff(above_points) - cross_ups = np.where(ap_dif == 1)[0] - cross_dns = np.where(ap_dif == -1)[0] - for upi, dni in zip(cross_ups,cross_dns): - segments.append((upi, dni)) - return segments - +from scipy.signal import argrelextrema def is_intersect(target_segment, segments): for segment in segments: @@ -21,168 +10,67 @@ def is_intersect(target_segment, segments): return True return False - -def calc_intersections(segments, finded_segments): - intersections = 0 - labeled = 0 - for segment in segments: - if not segment['labeled']: - continue - - labeled += 1 - intersect = False - for finded_segment in finded_segments: - start = max(segment['start'], finded_segment[0]) - finish = min(segment['finish'], finded_segment[1]) - if start <= finish: - intersect = True - break - if intersect: - intersections += 1 - return intersections, labeled - - -def cost_function(segments, finded_segments): - intersections, labeled = calc_intersections(segments, finded_segments) - return intersections == labeled - - -def compress_segments(segments): - result = [] - for segment in segments: - if len(result) == 0 or result[len(result) - 1][1] < segment[0]: - result.append(segment) - else: - result[len(result) - 1] = (result[len(result) - 1][0], segment[1]) +def exponential_smoothing(series, alpha): + result = [series[0]] + for n in range(1, len(series)): + result.append(alpha * series[n] + (1 - alpha) * result[n-1]) return result - class StepDetector: + def __init__(self, pattern): self.pattern = pattern - self.mean = None - self.window_size = None - self.corr_max = None - self.threshold = None self.segments = [] + self.confidence = 1.5 + + def fit(self, dataframe, segments): + data = dataframe['value'] + confidences = [] + for segment in segments: + if segment['labeled']: + segment_data = data[segment['start'] : segment['finish'] + 1] + segment_min = min(segment_data) + segment_max = max(segment_data) + confidences.append(0.24 * (segment_max - segment_min)) + if len(confidences) > 0: + self.confidence = min(confidences) + else: + self.confidence = 1.5 - def fit(self, dataframe, segments, contamination=0.01): - array = dataframe['value'].as_matrix() - self.mean = array.mean() - self.segments = segments - - norm_data = (array - self.mean) - - self.__optimize(norm_data, segments, contamination) - - # print(self.threshold) - - # import matplotlib.pyplot as plt - # fig, ax = plt.subplots(figsize=[18, 16]) - # ax = fig.add_subplot(2, 1, 1) - # ax.plot(array) - # ax = fig.add_subplot(2, 1, 2, sharex=ax) - # ax.plot(corr_res) - # plt.show() - - # #print(R.size) - # # Nw = 20 - # # result = R[Nw,Nw:-1] - # # result[0] = 0 - # #ax.plot(result) - # #print(len(data)) - # #print(len(R)) - # - # print(self.window_size) - # print(self.threshold) def predict(self, dataframe): - array = dataframe['value'].as_matrix() - - norm_data = (array - self.mean) - - step_size = self.window_size // 2 - pattern = np.concatenate([[-1] * step_size, [1] * step_size]) - corr_res = np.correlate(norm_data, pattern, mode='valid') / self.window_size - corr_res = np.concatenate((np.zeros(step_size), corr_res, np.zeros(step_size))) - - corr_res /= self.corr_max - - result = self.__predict(corr_res, self.threshold) - - # import matplotlib.pyplot as plt - # fig, ax = plt.subplots(figsize=[18, 16]) - # ax = fig.add_subplot(2, 1, 1) - # ax.plot(array[:70000]) - # ax = fig.add_subplot(2, 1, 2, sharex=ax) - # ax.plot(corr_res[:70000]) - # plt.show() + data = dataframe['value'] + result = self.__predict(data) result.sort() - result = compress_segments(result) if len(self.segments) > 0: result = [segment for segment in result if not is_intersect(segment, self.segments)] return result - def __optimize(self, data, segments, contamination): - window_size = 10 - mincost = None - while window_size < 100: - # print(window_size) - cost = self.__optimize_threshold(data, window_size, segments, contamination) - if mincost is None or cost < mincost: - mincost = cost - self.window_size = window_size - window_size = int(window_size * 1.2) - self.__optimize_threshold(data, self.window_size, segments, contamination) - - def __optimize_threshold(self, data, window_size, segments, contamination): - step_size = window_size // 2 - pattern = np.concatenate([[-1] * step_size, [1] * step_size]) - corr_res = np.correlate(data, pattern, mode='same') / window_size - corr_res = np.concatenate((np.zeros(step_size), corr_res, np.zeros(step_size))) - self.corr_max = corr_res.max() - corr_res /= self.corr_max - N = 20 - lower = 0. - upper = 1. - cost = 0 - for i in range(0, N): - self.threshold = 0.5 * (lower + upper) - result = self.__predict(corr_res, self.threshold) - - if len(segments) > 0: - intersections, labeled = calc_intersections(segments, result) - good = intersections == labeled - cost = len(result) - else: - total_sum = 0 - for segment in result: - total_sum += (segment[1] - segment[0]) - good = total_sum > len(data) * contamination - cost = -self.threshold + def __predict(self, data): + all_normal_flatten_data = data.rolling(window=10).mean() + all_max_flatten_data = data.rolling(window=24).mean() + all_mins = argrelextrema(np.array(all_max_flatten_data), np.less)[0] + extrema_list = [] - if good: - lower = self.threshold - else: - upper = self.threshold + for i in exponential_smoothing(data - self.confidence, 0.03): + extrema_list.append(i) - return cost + segments = [] + for i in all_mins: + if all_max_flatten_data[i] < extrema_list[i]: + segments.append(i - 20) - def __predict(self, data, threshold): - segments = find_segments(data, threshold) - segments += find_segments(data * -1, threshold) - #segments -= 1 - return [(x - 1, y - 1) for (x, y) in segments] + return [(x - 1, x + 1) for x in segments] def save(self, model_filename): with open(model_filename, 'wb') as file: - pickle.dump((self.mean, self.window_size, self.corr_max, self.threshold), file) + pickle.dump((self.confidence), file) def load(self, model_filename): try: with open(model_filename, 'rb') as file: - self.mean, self.window_size, self.corr_max, self.threshold = pickle.load(file) + self.confidence = pickle.load(file) except: pass diff --git a/analytics/worker.py b/analytics/worker.py index 134e64822..76e805af8 100644 --- a/analytics/worker.py +++ b/analytics/worker.py @@ -72,7 +72,17 @@ def do_learn(self, anomaly_id, segments, pattern): model = self.get_model(anomaly_id, pattern) model.synchronize_data() last_prediction_time = model.learn(segments) - result = self.do_predict(anomaly_id, last_prediction_time, pattern) + # TODO: we should not do predict before labeling in all models, not just in drops + if pattern == 'drops' and len(segments) == 0: + result = { + 'status': 'success', + 'anomaly_id': anomaly_id, + 'segments': [], + 'last_prediction_time': last_prediction_time + } + else: + result = self.do_predict(anomaly_id, last_prediction_time, pattern) + result['task'] = 'learn' return result