###############################################
############ Evaluation Metrics ###############
###############################################
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pulp import LpProblem, LpMinimize, LpMaximize, LpVariable, LpBinary, lpSum, PULP_CBC_CMD
from cdrift.utils.helpers import calcAvgDuration, convertToTimedelta
def getTP_FP(detected:List[int], known:List[int], lag:int, count_duplicate_detections:bool = True)-> Tuple[int,int]:
"""Returns the number of true and false positives, using assign_changepoints to calculate the assignments of detected change point to actual change point.
Args:
detected (List[int]): List of indices of detected change point locations.
known (List[int]): The ground truth; List of indices of actual change points.
lag (int): The maximal distance a detected change point can have to an actual change point, whilst still counting as a true positive.
count_duplicate_detections (bool, optional): If a detected change point is not assigned to a ground truth change point, but lies within the lag window of some ground truth change point, should it be counted as a false positive (True if yes, False if no). Defaults to True.
Returns:
Tuple[int,int]: Tuple of: (true positives, false positives)
    Examples:
        >>> getTP_FP([1000,1001,2000], [1000,2000], 200, True)
        (2, 1)
        >>> getTP_FP([1000,1001,2000], [1000,2000], 200, False)
        (2, 0)
"""
assignments = assign_changepoints(detected, known, lag_window=lag)
TP = len(assignments) # Every assignment is a True Positive, and every detected point is assigned at most once
if count_duplicate_detections:
FP = len(detected) - TP
else:
        true_positive_candidates = [d for d in detected if any(k - lag <= d <= k + lag for k in known)] # Detections within the lag window of at least one ground truth change point
FP = len(detected) - len(true_positive_candidates)
return (TP,FP)
def calcPrecisionRecall(detected:List[int], known:List[int], lag:int, zero_division=np.nan, count_duplicate_detections:bool = True)->Tuple[float, float]:
    """Calculates the precision and recall, using `getTP_FP` for the true and false positives, which in turn uses assign_changepoints to compute the assignment of detected change points to actual change points.
Args:
detected (List[int]): A list of indices of detected change point locations.
known (List[int]): The ground truth; List of indices of actual change points.
lag (int): The maximal distance a detected change point can have to an actual change point, whilst still counting as a true positive.
        zero_division (Any, optional): The value to yield for precision/recall when a division by zero is encountered. Defaults to np.nan.
count_duplicate_detections (bool, optional): If a detected change point is not assigned to a ground truth change point, but lies within the lag window of some ground truth change point, should it be counted as a false positive (True if yes, False if no). Defaults to True.
Returns:
        Tuple[float, float]: A tuple (precision, recall); each entry takes the value of `zero_division` where its computation would divide by zero.
"""
TP, FP = getTP_FP(detected, known, lag, count_duplicate_detections)
if(TP+FP > 0):
precision = TP/(TP+FP)
else:
precision = zero_division
if(len(known) > 0):
recall = TP/len(known)
else:
recall = zero_division
return (precision, recall)
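# Illustrative sketch (not part of the original module): a tiny worked example of
# calcPrecisionRecall. With lag=200, both ground truth points are matched and the
# duplicate detection at 1001 is counted as a false positive, giving a precision
# of 2/3 and a recall of 1.0. The values are made up for demonstration.
def _example_precision_recall() -> Tuple[float, float]:
    detected = [1000, 1001, 2000]
    known = [1000, 2000]
    return calcPrecisionRecall(detected, known, lag=200) # -> (0.666..., 1.0)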
def F1_Score(detected:List[int], known: List[int], lag:int, zero_division=np.nan, verbose:bool=False, count_duplicate_detections:bool=True) -> float:
""" Calculates the F1 Score for a Changepoint Detection Result
- Considering a known changepoint at timepoint t:
- A True Positive is when a changepoint is detected within [t-`lag`, t+`lag`]
- A False Negative is when no changepoint is detected in this window around a known changepoint
- A False Positive is when there is no known changepoint in a window of `lag` around the detected one
- Note: Only one detected change point can be a TP for a given known changepoint, and vice versa. The assignment of detected change points to actual change points is done using a Linear Program (see assign_changepoints)
- From this the F1-Score is calculated as (2·precision·recall) / (precision+recall)
Args:
detected (List[int]) : A list of indices of detected change point locations.
known (List[int]): The ground truth; List of indices of actual change points.
lag (int): The maximal distance a detected change point can have to an actual change point, whilst still counting as a true positive.
        zero_division (Any, optional): The value to return if the calculation of precision/recall/F1 divides by zero. Defaults to np.nan.
verbose (bool, optional): If verbose, warning messages are printed when a zero-division is encountered. Defaults to False.
count_duplicate_detections (bool, optional): If a detected change point is not assigned to a ground truth change point, but lies within the lag window of some ground truth change point, should it be counted as a false positive (True if yes, False if no). Defaults to True.
Returns:
float: The F1-Score corresponding to the given prediction.
"""
TP, FP = getTP_FP(detected, known, lag, count_duplicate_detections)
try:
precision = TP / (TP+FP)
recall = TP / len(known)
f1_score = (2*precision*recall)/(precision+recall)
return f1_score
except ZeroDivisionError:
if verbose:
print("Calculation of F1-Score resulted in division by 0.")
return zero_division
# Alias for F1_Score
f1 = F1_Score
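# Illustrative sketch (not part of the original module): continuing the example
# above, F1_Score combines precision (2/3) and recall (1.0) into
# (2 * (2/3) * 1.0) / ((2/3) + 1.0) = 0.8.
def _example_f1_score() -> float:
    return F1_Score([1000, 1001, 2000], [1000, 2000], lag=200) # -> 0.8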
def calcTPR_FPR(detected:List[int], known:List[int], lag:int, num_possible_negatives:Optional[int]=None)->Tuple[float, float]:
"""Calculates the True-Positive-Rate and the False-Positive-Rate for a given detection.
Args:
detected (List[int]): A list of indices of detected change point locations.
known (List[int]): The ground truth; List of indices of actual change points.
lag (int): The maximal distance a detected change point can have to an actual change point, whilst still counting as a true positive.
        num_possible_negatives (Optional[int], optional): The number of possible negatives. In theory this is `len(log)-len(known)`, but that value is far too large to be meaningful. Defaults to None.
Returns:
        Tuple[float, float]: A tuple of: (True-Positive-Rate, False-Positive-Rate). The False-Positive-Rate is np.nan if num_possible_negatives is None.
"""
    TP, FP = getTP_FP(detected, known, lag)
    P = len(known)
    TPR = TP/P if P > 0 else np.nan
    # The number of negatives is so large that the FPR is meaningless without an explicit estimate
    FPR = FP/num_possible_negatives if num_possible_negatives is not None else np.nan
    return (TPR, FPR)
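# Illustrative sketch (not part of the original module): without an estimate for
# num_possible_negatives only the TPR is defined and the FPR is np.nan; supplying
# one (here a made-up value of 100) yields a concrete FPR of FP/100.
def _example_tpr_fpr() -> Tuple[float, float]:
    detected = [1000, 1001, 2000]
    known = [1000, 2000]
    return calcTPR_FPR(detected, known, lag=200, num_possible_negatives=100) # -> (1.0, 0.01)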
def assign_changepoints(detected_changepoints: List[int], actual_changepoints:List[int], lag_window:int=200) -> List[Tuple[int,int]]:
"""Assigns detected changepoints to actual changepoints using a LP.
With restrictions:
- Detected point must be within `lag_window` of actual point.
- Detected point can only be assigned to one actual point.
- Every actual point can have at most one detected point assigned.
This is done by first optimizing for the number of assignments, finding how many detected change points could be assigned, without minimizing the \
total lag. Then, the LP is solved again, minimizing the sum of squared lags, while keeping the number of assignments as high as possible.
Args:
detected_changepoints (List[int]): List of locations of detected changepoints.
actual_changepoints (List[int]): List of locations of actual changepoints.
lag_window (int, optional): How close must a detected change point be to an actual changepoint to be a true positive. Defaults to 200.
    Examples:
        >>> # The actual changepoint 1000 is assigned the farther detection (934) so that 1149 can also receive an assignment:
        >>> assign_changepoints([1050, 934, 2100], [1000, 1149, 2000], lag_window=200)
        [(1050, 1149), (934, 1000), (2100, 2000)]
Returns:
List[Tuple[int,int]]: List of tuples of (detected_changepoint, actual_changepoint) assignments
"""
def buildProb_NoObjective(sense):
"""
Builds the optimization problem, minus the Objective function. Makes multi-objective optimization simple
"""
prob = LpProblem("Changepoint_Assignment", sense)
        # Create a binary variable for each pair of detected and actual changepoints
        assignment_vars = LpVariable.dicts("x", (detected_changepoints, actual_changepoints), 0, 1, LpBinary) # Assign detected changepoint dp to actual changepoint ap?
        # Flatten the nested dict into a dict keyed by (detected, actual) tuples
        x = {
            (dc, ap): assignment_vars[dc][ap] for dc in detected_changepoints for ap in actual_changepoints
        }
####### Constraints #########
        # Assign at most one detected changepoint to each actual changepoint
for ap in actual_changepoints:
prob += (
lpSum(x[dp, ap] for dp in detected_changepoints) <= 1,
f"Only_One_Changepoint_Per_Actual_Changepoint : {ap}"
)
# Each detected changepoint is assigned to at most one actual changepoint
for dp in detected_changepoints:
prob += (
lpSum(x[dp, ap] for ap in actual_changepoints) <= 1,
f"Only_One_Actual_Changepoint_Per_Detected_Changepoint : {dp}"
)
# Distance between chosen changepoints must be within lag window
for dp in detected_changepoints:
for ap in actual_changepoints:
prob += (
x[dp, ap] * abs(dp - ap) <= lag_window,
f"Distance_Within_Lag_Window : {dp}_{ap}"
)
return prob, x
solver = PULP_CBC_CMD(msg=0)
### Multi-Objective Optimization: First maximize number of assignments to find out the best True Positive number that can be achieved
# Find the largest number of change points:
prob1, prob1_vars = buildProb_NoObjective(LpMaximize)
    prob1 += (
        lpSum(
            # Count the number of assignments made
            prob1_vars[dp, ap]
            for dp in detected_changepoints for ap in actual_changepoints
        ),
        "Maximize_Number_of_Assignments"
    )
prob1.solve(solver)
# Calculate number of TP
num_tp = len([
(dp, ap)
for dp in detected_changepoints for ap in actual_changepoints
if prob1_vars[dp, ap].varValue == 1
])
### Multi-Objective Optimization: Now minimize the squared distance between assigned changepoints, using this maximal number of assignments
# Use this number as the number of assignments for second optimization
prob2, prob2_vars = buildProb_NoObjective(LpMinimize)
prob2 += (
lpSum(
# Minimize the squared distance between assigned changepoints
prob2_vars[dp, ap] * pow(dp - ap,2)
for dp in detected_changepoints for ap in actual_changepoints
),
"Squared_Distances"
)
    # Constrain the number of assignments to the number of true positives found in the first optimization
    prob2 += (
        lpSum(
            prob2_vars[dp, ap]
            for dp in detected_changepoints for ap in actual_changepoints
        ) == num_tp,
        "Fix_Number_of_Assignments"
    )
prob2.solve(solver)
return [
(dp, ap)
for dp in detected_changepoints for ap in actual_changepoints
if prob2_vars[dp, ap].varValue == 1
]
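# Illustrative sketch (not part of the original module): shrinking the lag window
# changes both LP stages. With a made-up lag_window of 80, neither 1149 nor 2000
# can receive any detection, so the first stage finds only one feasible
# assignment; the second stage then picks the closer candidate for 1000
# (1050, squared lag 2500, instead of 934, squared lag 4356).
def _example_assignment_lag_window() -> List[Tuple[int, int]]:
    return assign_changepoints([1050, 934, 2100], [1000, 1149, 2000], lag_window=80) # -> [(1050, 1000)]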
def get_avg_lag(detected_changepoints:List[int], actual_changepoints:List[int], lag:int=200)->float:
"""Calculates the average lag between detected and actual changepoints (Caution: false positives do not affect this metric!)
Args:
detected_changepoints (List[int]): Locations of detected changepoints
actual_changepoints (List[int]): Locations of actual (known) changepoints
lag (int, optional): How close must a detected change point be to an actual changepoint to be a true positive. Defaults to 200.
    Examples:
        >>> detected_changepoints = [1050, 934, 2100]
        >>> actual_changepoints = [1000, 1149, 2000]
        >>> get_avg_lag(detected_changepoints, actual_changepoints, lag=200)
        88.33333333333333
Returns:
float: the average distance between detected changepoints and the actual changepoint they get assigned to
"""
assignments = assign_changepoints(detected_changepoints, actual_changepoints, lag_window=lag)
avg_lag = 0
for (dc,ap) in assignments:
avg_lag += abs(dc-ap)
try:
return avg_lag/len(assignments)
except ZeroDivisionError:
        return np.nan
def getROCData(lag:int, df:pd.DataFrame, undefined_equals=0)->List[Tuple[float,float]]:
    """Returns a list of points, as tuples of Recall (TPR) and Precision. (The FPR cannot be used, since negatives are not well-defined for concept drift detection; practically, the entire log is a negative, i.e. `len(log)-len(detected)`.)
    Args:
        lag (int): The maximal distance a detected change point can have to an actual change point, whilst still counting as a true positive.
        df (pd.DataFrame): The Dataframe containing the detection results of the approach
        undefined_equals (int, optional): The value to assign to undefined precision/recall values. Defaults to 0.
    Returns:
        List[Tuple[float,float]]: A list of the mean recall and precision values for each Window Size found in the dataframe.
    """
    groups = df.groupby("Window Size")
    points = []
    for win, win_df in groups:
        precisions = []
        recalls = []
        for idx, row in win_df.iterrows():
            prec, rec = calcPrecisionRecall(row["Detected Changepoints"], row["Actual Changepoints for Log"], lag, zero_division=undefined_equals)
            precisions.append(prec)
            recalls.append(rec)
        # Average precision and recall over all logs for this window size
        points.append( (np.mean(recalls), np.mean(precisions)) )
    return points
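# Illustrative sketch (not part of the original module): getROCData expects a
# DataFrame with "Window Size", "Detected Changepoints", and
# "Actual Changepoints for Log" columns; every window size is reduced to a single
# (mean recall, mean precision) point. The rows below are made up.
def _example_roc_data() -> List[Tuple[float, float]]:
    df = pd.DataFrame({
        "Window Size": [100, 100, 200],
        "Detected Changepoints": [[1000], [1000, 1500], [990, 2100]],
        "Actual Changepoints for Log": [[1000], [1000], [1000, 2000]],
    })
    return getROCData(lag=200, df=df) # -> [(1.0, 0.75), (1.0, 1.0)]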
def plotROC(lag:int, df:pd.DataFrame, undefined_equals=0)->None:
    """Plot an ROC-style curve (using precision and recall) for the given dataframe, with the given lag value used for the precision and recall evaluation.
Args:
lag (int): The maximal distance a detected change point can have to an actual change point, whilst still counting as a true positive.
df (pd.DataFrame): The Dataframe containing the detection results of the approach
        undefined_equals (int, optional): The value to assign to undefined precision/recall values. Defaults to 0.
"""
dat = getROCData(lag,df,undefined_equals)
recalls, precisions = list(zip(*dat))
plt.plot(precisions, recalls) # x is precisions, y is recalls
plt.ylim(-0.01,1.01)
plt.xlim(-0.01,1.01)
plt.xlabel("Precision")
plt.ylabel("Recall")
plt.show()
def scatter_f1_duration(dfs:List[pd.DataFrame]):
"""Make a Scatterplot of the F1 Score versus the Duration of the approaches with the pareto front. Very specific method to be used on the results from running `testAll_MP.py`.
Args:
        dfs (List[pd.DataFrame]): A list of DataFrames of evaluation results. Approaches such as Bose that contain two different "algorithms" (J Measure and Window Count) should be split into two DataFrames.
Returns:
plt.figure: The figure object containing the plot.
"""
# Marker + Color Mapping for algorithms (and "synonymous" algorithm names)
markers = {
"Bose J": "^", # triangle_up
"Bose WC": "^", # triangle_up
"Martjushev J": "s", # square
"Martjushev WC": "s", #square
"Martjushev ADWIN J": "s", # square
"Martjushev ADWIN WC": "s", #square
}
colors = {
"Bose J": "#add8e6", # Light Blue
"Bose WC": "#000080", # Navy Blue
"Martjushev J": "#32cd32", # Navy Green
"Martjushev WC": "#006400", # Dark Green
"Martjushev ADWIN J": "#32cd32", # Navy Green
"Martjushev ADWIN WC": "#006400", # Dark Green
"ProDrift": "#ff0000", # Red
"Maaradji Runs": "#ff0000", # Red
"Earth Mover's Distance": "#ffa500", # Orange
"Process Graphs": "#ddcd10", # Some dark, rich yellow
"Process Graph Metrics": "#ddcd10", # Some dark, rich yellow
"Zheng": "#800080", # Purple
"Zheng DBSCAN": "#800080" # Purple
        # Any other approach gets Matplotlib's default colors
}
handled_dfs = []
for df in dfs:
        # Convert the Duration column from a timedelta to minutes
        df_copy = df.copy(deep=True)
        df_copy["Duration"] = df_copy["Duration"].apply(lambda x: x.total_seconds()/60) # total_seconds() so durations longer than a day are not truncated
handled_dfs.append(
df_copy.rename(columns={"Duration": "Duration (Minutes)"}, inplace=False)
)
fig = scatter_pareto_front(handled_dfs, x="Duration (Minutes)", y="F1-Score", figsize=(8,5), lower_is_better_dimensions = ["Duration (Minutes)"], colors=colors, markers=markers)
plt.ylim(-0.02, 1)
return fig
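# Illustrative usage sketch (not part of the original module): scatter_f1_duration
# expects result frames with a timedelta "Duration" column and a numeric
# "F1-Score" column, as produced by `testAll_MP.py`. The values are made up.
def _example_scatter_f1_duration() -> plt.figure:
    df = pd.DataFrame({
        "Algorithm/Options": ["Zheng DBSCAN"] * 2,
        "F1-Score": [0.7, 0.5],
        "Duration": [pd.Timedelta(minutes=3), pd.Timedelta(minutes=5)],
    })
    return scatter_f1_duration([df])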
def _getNameFromDataframe(df, name_column="Algorithm/Options"):
    # Return the name column (default "Algorithm/Options") of the last entry
if name_column in df.columns:
return df.iloc[-1][name_column]
elif "Algorithm/Options" in df:
return df.iloc[-1]["Algorithm/Options"]
else:
return df.iloc[-1]["Algorithm"]
def _column_is_time(column, df):
# Check if the column has a time data type
return pd.core.dtypes.common.is_datetime_or_timedelta_dtype(df[column])
def calculate_scatter_data(dfs: List[pd.DataFrame], dimensions:List[str]=["F1-Score","Duration"], handle_nan_as=0)->List[Tuple[str, Dict[str, Any]]]:
"""Calculate the points for a Scatterplot. Uses the mean value of each column as the corresponding value.
Args:
dfs (List[pd.DataFrame]): List of Dataframes for the Scatterplot. The "Algorithm/Options" column will be used to infer the name of the respective approach.
        dimensions (List[str], optional): List of Dimensions to consider when extracting the points. Defaults to ["F1-Score","Duration"].
        handle_nan_as (Any, optional): The value used to replace NaN entries before averaging. Defaults to 0.
Returns:
List[Tuple[str, Dict[str, Any]]]: List of points for the scatter plot. Points represented as a tuple of the *name* of the algorithm and a dictionary with a value for each dimension.
"""
    # Each approach corresponds to a point that is the mean of each dimension
points = []
for df in dfs:
name = _getNameFromDataframe(df)
        point = {dim: df[dim].fillna(handle_nan_as, inplace=False).mean() if not _column_is_time(dim, df) else calcAvgDuration(df, dim).total_seconds() for dim in dimensions} # total_seconds() so durations longer than a day are not truncated
points.append((name, point))
return points
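# Illustrative sketch (not part of the original module): each DataFrame collapses
# to one labeled point by averaging the requested dimensions; NaN entries are
# replaced by handle_nan_as (default 0) before averaging. The frames are made up.
def _example_scatter_data() -> List[Tuple[str, Dict[str, Any]]]:
    df_a = pd.DataFrame({"Algorithm/Options": ["A", "A"], "F1-Score": [0.8, 0.6]})
    df_b = pd.DataFrame({"Algorithm/Options": ["B", "B"], "F1-Score": [0.9, np.nan]})
    return calculate_scatter_data([df_a, df_b], dimensions=["F1-Score"])
    # -> approximately [('A', {'F1-Score': 0.7}), ('B', {'F1-Score': 0.45})]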
def get_pareto_optimal_points(points: List[Dict[str, Any]], lower_is_better_dimensions: List[str] = [])->List[Tuple[str, Dict[str, Any]]]:
"""Get the pareto optimal points from a list of points. A pareto optimal point is a point for which no other exists that is better in every dimension.
Args:
points (List[Dict[str, Any]]): The list of points, formatted as tuples: name of the algorithm, and dictionary of a value for each dimension.
        lower_is_better_dimensions (List[str], optional): A List containing all Dimensions where a *lower* value is better. Examples: Duration, Memory Consumption. Defaults to [].
Returns:
List[Tuple[str, Dict[str, Any]]]: A list of the points which are pareto optimal.
"""
return [
(name, point)
for idx, (name, point) in enumerate(points)
# If pareto-optimal
if all( # For every other point:
            any( # This point is at least as good in some dimension --> the other point is not better in *every* dimension
other_point[dim] <= point[dim] if not dim in lower_is_better_dimensions else other_point[dim] >= point[dim]
for dim in point.keys()
)
for _,other_point in [p for i,p in enumerate(points) if i != idx] # All other points
)
]
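# Illustrative sketch (not part of the original module): point "B" is dominated
# ("A" has a higher F1-Score and a lower Duration), so only "A" and "C" remain
# on the Pareto front. The points are made up.
def _example_pareto_points() -> List[Tuple[str, Dict[str, Any]]]:
    points = [
        ("A", {"F1-Score": 0.90, "Duration": 10.0}),
        ("B", {"F1-Score": 0.80, "Duration": 20.0}),
        ("C", {"F1-Score": 0.95, "Duration": 30.0}),
    ]
    return get_pareto_optimal_points(points, lower_is_better_dimensions=["Duration"]) # -> points "A" and "C"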
def scatter_pareto_front(dfs:List[pd.DataFrame], x:str="Duration", y:str="F1-Score", handle_nan_as:Any=0, figsize:Tuple[int,int]=(8,5), lower_is_better_dimensions: List[str] = [], markers:Dict={}, colors:Dict={})->plt.figure:
"""Plot the approaches as a scatterplot, considering dimensions `x` and `y`. Additionally, the Pareto front is indicated in the plot. Only two dimensions are supported.
Args:
dfs (List[pd.DataFrame]): List of Dataframes for the Scatterplot. The "Algorithm/Options" column will be used to infer the name of the respective approach.
x (str, optional): The Dimension to consider for the X-Axis. Defaults to "Duration".
y (str, optional): The Dimension to consider for the Y-Axis. Defaults to "F1-Score".
handle_nan_as (Any, optional): How to handle NaN values in computing the mean. If NaN is chosen, the NaN values will be ignored. Defaults to 0.
        figsize (Tuple[int,int], optional): The figsize parameter to be used for Figure Creation. Defaults to (8,5).
        lower_is_better_dimensions (List[str], optional): A List containing all Dimensions where a *lower* value is better. Examples: Duration, Memory Consumption. Defaults to [].
        markers (Dict, optional): A dictionary mapping Approach names to markers to be used for the Matplotlib scatterplot. If not specified, Matplotlib will decide. Defaults to {}.
        colors (Dict, optional): A dictionary mapping Approach names to colors to be used for the Matplotlib scatterplot. If not specified, Matplotlib will decide. Defaults to {}.
Returns:
plt.figure: The figure object containing the plot.
"""
# First get the pareto points
data = calculate_scatter_data(dfs, [x,y], handle_nan_as)
pareto_points = get_pareto_optimal_points(data, lower_is_better_dimensions=lower_is_better_dimensions)
fig = plt.figure(figsize=figsize)
# Plot the scatterplot
for name, point in data:
color = colors.get(name, None)
marker = markers.get(name, None)
plt.scatter(y=point[y], x=point[x], label=name, color=color, marker=marker)
# Plot the pareto front
pareto_points = sorted(pareto_points, key=lambda point: point[1][x])
    plt.plot([0,pareto_points[0][1][x]], [0, pareto_points[0][1][y]], "red", linestyle=":", zorder=1) # Connect the pareto front to the origin
# Plot a line between the neighboring pareto optimal points
for idx, (_, point) in enumerate(pareto_points[:-1]):
_,point2 = pareto_points[idx+1]
xs = [point[x], point2[x]]
ys = [point[y], point2[y]]
plt.plot(xs, ys, "red", linestyle=":", zorder=1)
# Plot a line from the last point horizontally off the canvas
xlim_before = plt.xlim()
ub = plt.xlim()[1]*2
plt.plot([pareto_points[-1][1][x], ub], [pareto_points[-1][1][y],pareto_points[-1][1][y]], "red", linestyle=":", zorder=1)
plt.xlim(xlim_before)
plt.xlabel(f"Mean {x}")
plt.ylabel(f"Mean {y}")
plt.grid(True)
plt.legend(bbox_to_anchor=(1,1), loc="upper left", title="Algorithm")
return fig
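# Illustrative usage sketch (not part of the original module): builds two made-up
# single-row result frames and renders the scatterplot with its Pareto front.
# With "Duration" marked as lower-is-better, neither point dominates the other,
# so both lie on the front.
def _example_scatter_pareto_front() -> plt.figure:
    df_a = pd.DataFrame({"Algorithm/Options": ["A"], "F1-Score": [0.8], "Duration": [12.0]})
    df_b = pd.DataFrame({"Algorithm/Options": ["B"], "F1-Score": [0.6], "Duration": [5.0]})
    return scatter_pareto_front([df_a, df_b], x="Duration", y="F1-Score", lower_is_better_dimensions=["Duration"])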