used_metrics.py
import numpy as np
import multiprocessing  # only used by the commented-out multiprocessing test harness below
def precision_at_k(r, k):
    """Score is precision @ k.

    Relevance is binary (nonzero is relevant).

    Returns:
        Precision @ k

    Raises:
        ValueError: k must be >= 1 and len(r) must be >= k
    """
    if k < 1 or len(r) < k:
        raise ValueError('k must be >= 1 and len(r) must be >= k')
    return np.mean(r[:k])
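# Example (hypothetical binary relevance list): for r = [1, 0, 1, 0],
# precision_at_k(r, 2) = mean([1, 0]) = 0.5.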

def dcg_at_k(r, k, method=1):
    """Score is discounted cumulative gain (DCG).

    Relevance is positive real values. Can use binary
    as in the previous methods.

    Returns:
        Discounted cumulative gain
    """
    r = np.asarray(r, dtype=float)[:k]
    if r.size:
        if method == 0:
            # First position is not discounted; positions 2..k are discounted by log2(rank).
            return r[0] + np.sum(r[1:] / np.log2(np.arange(2, r.size + 1)))
        elif method == 1:
            # Every position i (1-indexed) is discounted by log2(i + 1).
            return np.sum(r / np.log2(np.arange(2, r.size + 2)))
        else:
            raise ValueError('method must be 0 or 1.')
    return 0.
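# Worked example (hypothetical values): for r = [1, 0, 1] with method=1,
# dcg_at_k(r, 3) = 1/log2(2) + 0/log2(3) + 1/log2(4) = 1.0 + 0.0 + 0.5 = 1.5;
# with method=0 the first position is undiscounted, giving 1 + 0 + 1/log2(3) ≈ 1.63.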

def ndcg_at_k(r, k, maxlen, method=1):
    """Score is normalized discounted cumulative gain (NDCG).

    Relevance is positive real values. Can use binary
    as in the previous methods.

    Returns:
        Normalized discounted cumulative gain
    """
    # `method` is kept for interface symmetry with dcg_at_k; the discount below follows method=1.
    # Ideal DCG (binary relevance): the first min(maxlen, k) positions are all hits.
    tp = 1. / np.log2(np.arange(2, k + 2))
    dcg_max = tp[:min(maxlen, k)].sum()
    if not dcg_max:
        return 0.
    r_k = np.asarray(r, dtype=float)[:k]
    dcg_at_k_ = (r_k * tp[:r_k.size]).sum()
    return dcg_at_k_ / dcg_max
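# Worked example (hypothetical values): for r = [1, 0, 1], k = 3 and maxlen = 2
# (two relevant items in the test set), the ideal DCG is 1 + 1/log2(3) ≈ 1.63,
# the achieved DCG is 1.5, so ndcg_at_k(r, 3, 2) ≈ 1.5 / 1.63 ≈ 0.92.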

def recall_at_k(r, k, all_pos_num):
    """Recall @ k: fraction of all relevant items that appear in the top-k."""
    r = r[:k]
    return np.sum(r) / all_pos_num

def hit_at_k(r, k):
    """Hit ratio @ k: 1. if at least one relevant item is in the top-k, else 0."""
    r = r[:k]
    return min(1., np.sum(r))

def get_r(user_pos_test, r):
    """Convert a ranked list of item ids into a binary relevance vector."""
    r_new = np.isin(r, user_pos_test).astype(float)
    return r_new
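# Example (hypothetical ids): get_r([2, 5], [5, 1, 2, 7]) -> array([1., 0., 1., 0.]),
# marking which recommended positions hit the user's positive test set.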

def get_performance(user_pos_test, r, Ks):
    """Compute precision, recall, NDCG and hit ratio at every cutoff K in Ks."""
    precision, recall, ndcg, hit_ratio = [], [], [], []
    r = get_r(user_pos_test, r)
    for K in Ks:
        precision.append(precision_at_k(r, K))                # P = TP / (TP + FP)
        recall.append(recall_at_k(r, K, len(user_pos_test)))  # R = TP / (TP + FN)
        ndcg.append(ndcg_at_k(r, K, len(user_pos_test)))
        hit_ratio.append(hit_at_k(r, K))                      # HR = sum(TP) / sum(test_set)
    return {'recall': np.array(recall), 'precision': np.array(precision),
            'ndcg': np.array(ndcg), 'hit_ratio': np.array(hit_ratio)}

# def test_one_user(u):
#     # positive test items and recommended items for user u
#     try:
#         user_pos_test = test_user_list[u]
#     except Exception:
#         user_pos_test = []
#     r = rec_user_list[u]
#     return get_performance(user_pos_test, r, Ks)
#
# cores = 15
#
# def test():
#     pool = multiprocessing.Pool(cores)
#     test_user = list(rec_user_list.keys())
#     print('test_user number:', len(test_user))
#     bat_result = pool.map(test_one_user, test_user)
#     return bat_result
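
# Minimal usage sketch (hypothetical data, not part of the original pipeline):
# evaluate a single toy ranked list against a toy set of positive test items.
if __name__ == '__main__':
    demo_user_pos_test = [10, 42, 7]          # hypothetical relevant item ids
    demo_ranked_items = [42, 3, 10, 5, 7, 8]  # hypothetical ranked recommendations
    demo_Ks = [1, 3, 5]
    for name, values in get_performance(demo_user_pos_test, demo_ranked_items, demo_Ks).items():
        print(name, values)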