diff --git a/README.md b/README.md index 3f40754..da829d6 100644 --- a/README.md +++ b/README.md @@ -33,9 +33,10 @@ A special effort was made to make a *recsys for humans*, which stresses on the e from polara.recommender.data import RecommenderData from polara.recommender.models import SVDModel from polara.tools.movielens import get_movielens_data - +# get data and convert it into appropriate format ml_data = get_movielens_data(get_genres=False) data_model = RecommenderData(ml_data, 'userid', 'movieid', 'rating') +# build PureSVD model and evaluate it svd = SVDModel(data_model) svd.build() svd.evaluate() @@ -44,44 +45,33 @@ svd.evaluate() ## Creating new recommender models Basic models can be extended by subclassing `RecommenderModel` class and defining two required methods: `self.build()` and `self.get_recommendations()`. Here's an example of a simple item-to-item recommender model: ```python -import scipy as sp -import scipy.sparse -from scipy.sparse import csr_matrix -import numpy as np from polara.recommender.models import RecommenderModel class CooccurrenceModel(RecommenderModel): def __init__(self, *args, **kwargs): super(CooccurrenceModel, self).__init__(*args, **kwargs) - self.method = 'item-to-item' #pick some meaningful name - self.implicit = True # will convert feedback values to all ones + self.method = 'item-to-item' # pick some meaningful name def build(self): - self._recommendations = None - idx, val, shp = self.data.to_coo() - if self.implicit: - val = np.ones_like(val) - user_item_matrix = csr_matrix((val, (idx[:, 0], idx[:, 1])), - shape=shp, dtype=np.float64) - - i2i_matrix = user_item_matrix.T.dot(user_item_matrix) - #exclude "self-links" - diag_vals = i2i_matrix.diagonal() - i2i_matrix -= sp.sparse.dia_matrix((diag_vals, 0), shape=i2i_matrix.shape) + # build model - calculate item-to-item matrix + user_item_matrix = self.get_training_matrix() + # rating matrix product R^T R gives cooccurrences count + i2i_matrix = user_item_matrix.T.dot(user_item_matrix) # gives CSC format + i2i_matrix.setdiag(0) # exclude "self-links" + i2i_matrix.eliminate_zeros() # ensure only non-zero elements are stored + # store matrix for generating recommendations self._i2i_matrix = i2i_matrix def get_recommendations(self): - test_data = self.data.test_to_coo() - test_shape = self.data.get_test_shape() + # get test users information and generate top-k recommendations + test_data, test_shape = self._get_test_data() test_matrix, _ = self.get_test_matrix(test_data, test_shape) - if self.implicit: - test_matrix.data = np.ones_like(test_matrix.data) - + # calculate predicted scores i2i_scores = test_matrix.dot(self._i2i_matrix) if self.filter_seen: # prevent seen items from appearing in recommendations self.downvote_seen_items(i2i_scores, test_data) - + # generate top-k recommendations for every test user top_recs = self.get_topk_items(i2i_scores) return top_recs ``` diff --git a/polara/evaluation/evaluation_engine.py b/polara/evaluation/evaluation_engine.py index c1c2ee0..37c7e4a 100644 --- a/polara/evaluation/evaluation_engine.py +++ b/polara/evaluation/evaluation_engine.py @@ -47,11 +47,6 @@ def build_models(models): model.build() -def refresh_models(models): - for model in models: - model._recommendations = None - - def consolidate(scores, params, metrics): res = {} for i, metric in enumerate(metrics): @@ -72,45 +67,32 @@ def consolidate_folds(scores, folds, metrics, index_names = ['fold', 'top-n']): def holdout_test_pair(model1, model2, holdout_sizes=[1], metrics=['hits']): holdout_scores = [] 
models = [model1, model2] - check_updates(models) data1 = model1.data data2 = model2.data for i in holdout_sizes: print i, data1.holdout_size = i - data2.holdout_size = i - data1.update() data1.update() + data2.holdout_size = i + data2.update() - refresh_models(models) metric_scores = evaluate_models(models, metrics) holdout_scores.append(metric_scores) return consolidate(holdout_scores, holdout_sizes, metrics) -def check_updates(models): - data = models[0].data - if data.has_changed: #Rebuild models entirely - print 'Data has been changed. Rebuiding the models.' - build_models(models) - elif data.has_updated: #just force recommendations renewal - print 'Test data has been updated. Refreshing the models.' - refresh_models(models) - - def holdout_test(models, holdout_sizes=[1], metrics=['hits']): - #check_updates(models) #will rebuild or renew models if data was manipulated in previous experiments holdout_scores = [] data = models[0].data assert all([model.data is data for model in models[1:]]) #check that data is shared across models + build_models(models) for i in holdout_sizes: print i, data.holdout_size = i - data.update() #can be omitted but it's more safe - refresh_models(models) #test data is updated - clear old recommendations + data.update() metric_scores = evaluate_models(models, metrics) holdout_scores.append(metric_scores) @@ -121,11 +103,12 @@ def holdout_test(models, holdout_sizes=[1], metrics=['hits']): def topk_test(models, topk_list=[10], metrics=['hits']): topk_scores = [] data = models[0].data - data.update() - check_updates(models) #will rebuild or renew models if data was manipulated in previous experiments assert all([model.data is data for model in models[1:]]) #check that data is shared across models + + data.update() topk_list = list(reversed(sorted(topk_list))) #start from max topk and rollback + build_models(models) for topk in topk_list: print topk, metric_scores = evaluate_models(models, metrics, topk) diff --git a/polara/evaluation/plotting.py b/polara/evaluation/plotting.py index edd0b27..d83847e 100644 --- a/polara/evaluation/plotting.py +++ b/polara/evaluation/plotting.py @@ -1,87 +1,31 @@ import matplotlib.pyplot as plt -def show_hits(all_scores, errors=None, err_alpha = 0.2, figsize=(16, 5), ax=None): +def _plot_pair(scores, keys, titles=None, errors=None, err_alpha = 0.2, figsize=(16, 5), ax=None): if not ax: fig, ax = plt.subplots(1, 2, figsize=figsize) - all_scores['hits']['true_positive'].plot(ax=ax[0], legend=False, title='True Positive Hits') - all_scores['hits']['false_positive'].plot(ax=ax[1], legend=False, title='False Positive Hits') - - if errors: - errx = errors['hits']['false_positive'] - erry = errors['hits']['true_positive'] - for method in errL.columns: - x = all_scores['hits']['false_positive'][method] - y = all_scores['hits']['true_positive'][method] - lower_boundx = x - errx - upper_boundx = x + errx - lower_boundy = y - erry - upper_boundy = y + erry - - ax.fill_between(x, lower_boundy, upper_boundy, alpha=err_alpha, label='std err') - ax.fill_betweenx(y, lower_boundx, upper_boundx, alpha=err_alpha, label='std err') - # plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) - - -def show_hit_rates(all_scores, errors=None, err_alpha = 0.2, ROC_middle=True, figsize=(16, 5), limit=True, ax=None): - if not ax: - fig = plt.figure() - ax = fig.gca() show_legend = True else: show_legend = False - max_val = 0 - for name in all_scores['relevance'].columns.levels[1]: - scores = all_scores['relevance'].xs(name, 1, 1).sort_values('fallout') - 
new_val = max(scores.recall.max(), scores.fallout.max()) - if (new_val > max_val) and name != 'mostpopular': - max_val = new_val - scores.plot.line(x='fallout', y='recall', label=name, ax=ax, legend=False) - if errors: - err = errors['relevance'].xs(name, 1, 1).sort_values('fallout') - x = scores.fallout - y = scores.recall - lower_bound = y - err.recall - upper_bound = y + err.recall - ax.fill_between(x, lower_bound, upper_bound, alpha=err_alpha, label='std err') - - if show_legend: - plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) - - if limit: - ax.set_xlim(0, max_val+0.01) - ax.set_ylim(0, max_val+0.01) - ax.set_ylabel("True Positive Rate") - ax.set_xlabel("False Positive Rate") + left, right = keys + left_title, right_title = titles or keys - if ROC_middle: - ax.plot([0, max_val], [0, max_val], linestyle='--', c='grey') - - - -def show_ranking(all_scores, errors=None, err_alpha = 0.2, figsize=(16, 10), limit=False, ax=None): - if not ax: - fig, ax = plt.subplots(1, 2, figsize=figsize) - show_legend = True - else: - show_legend = False + scores[left].plot(ax=ax[0], legend=False) + scores[right].plot(ax=ax[1], legend=False) - all_scores['ranking']['nDCG'].plot(ax=ax[0], legend=False) - all_scores['ranking']['nDCL'].plot(ax=ax[1], legend=False) - if show_legend: plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) - - if errors: - errG = errors['ranking']['nDCG'] - errL = errors['ranking']['nDCL'] + + if errors is not None: + errG = errors[left] + errL = errors[right] for method in errL.columns: x = errG.index err1 = errG[method] err2 = errL[method] - y1 = all_scores['ranking']['nDCG'][method] - y2 = all_scores['ranking']['nDCL'][method] + y1 = scores[left][method] + y2 = scores[right][method] lower_bound1 = y1 - err1 upper_bound1 = y1 + err1 lower_bound2 = y2 - err2 @@ -90,73 +34,99 @@ def show_ranking(all_scores, errors=None, err_alpha = 0.2, figsize=(16, 10), lim ax[0].fill_between(x, lower_bound1, upper_bound1, alpha=err_alpha, label='std err') ax[1].fill_between(x, lower_bound2, upper_bound2, alpha=err_alpha, label='std err') - ax[0].set_ylabel('nDCG@$N$') - ax[1].set_ylabel('nDCL@$N$') - #plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) + ax[0].set_ylabel(left_title) + ax[1].set_ylabel(right_title) +def show_hits(all_scores, **kwargs): + scores = all_scores['hits'] + keys = ['true_positive', 'false_positive'] + kwargs['titles'] = ['True Positive Hits @$n$', 'False Positive Hits @$n$'] + kwargs['errors'] = kwargs['errors']['hits'] if kwargs.get('errors', False) else None + _plot_pair(scores, keys, **kwargs) -def show_ranking_positivity(all_scores, ROC_middle=True, figsize=(16, 5), limit=True, ax=None): - if not ax: - fig = plt.figure() - ax = fig.gca() - - max_val = 0 - methods = all_scores['ranking'].columns.levels[1] - - for method in methods: - scores = all_scores['ranking'].xs(method, 1, 1).sort_values('nDCL') - new_val = max(scores.nDCG.max(), scores.nDCL.max()) - if (new_val > max_val) and method != 'mostpopular': - max_val = new_val - scores.plot.line(x='nDCL', y='nDCG', label=method, ax=ax, legend=False) - if limit: - ax.set_xlim(0, max_val+0.01) - ax.set_ylim(0, max_val+0.01) - - if ROC_middle: - ax.plot([0, max_val], [0, max_val], linestyle='--', c='grey') +def show_ranking(all_scores, **kwargs): + scores = all_scores['ranking'] + keys = ['nDCG', 'nDCL'] + kwargs['titles'] = ['nDCG@$n$', 'nDCL@$n$'] + kwargs['errors'] = kwargs['errors']['ranking'] if kwargs.get('errors', False) else None + _plot_pair(scores, keys, **kwargs) - ax.set_ylabel("Positive 
Ranking") - ax.set_xlabel("Negative Ranking") - -def show_precision_recall(all_scores, errors=None, err_alpha = 0.2, figsize=(16, 5), limit=False, ax=None): +def _cross_plot(scores, keys, titles=None, errors=None, err_alpha = 0.2, ROC_middle=False, figsize=(8, 5), limit=None, ax=None): if not ax: fig = plt.figure(figsize=figsize) - ax = plt.gca() + ax = fig.gca() show_legend = True else: show_legend = False - - for name in all_scores['relevance'].columns.levels[1]: - plot_data = all_scores['relevance'].xs(name, 1, 1).sort_values('recall') - plot_data.plot.line(x='recall', y='precision', label=name, ax=ax , legend=False) - - - if errors: - #errx = errors['relevance'].xs(name, 1, 1).sort_values('recall').recall - erry = errors['relevance'].xs(name, 1, 1).sort_values('recall').precision - x = all_scores['relevance'].xs(name, 1, 1).sort_values('recall').recall - y = all_scores['relevance'].xs(name, 1, 1).sort_values('recall').precision - #lower_boundx = x - errx - #upper_boundx = x + errx - lower_boundy = y - erry - upper_boundy = y + erry - ax.fill_between(x, lower_boundy, upper_boundy, alpha=err_alpha, label='std err') - #ax.fill_betweenx(y, lower_boundx, upper_boundx, alpha=err_alpha, label='std err') + + methods = scores.columns.levels[1] + x, y = keys + for method in methods: + plot_data = scores.xs(method, 1, 1).sort_values(x) + plot_data.plot.line(x=x, y=y, label=method, ax=ax, legend=False) if show_legend: - plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) + plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5)) + + if errors is not None: + for method in methods: + plot_data = scores.xs(method, 1, 1).sort_values(x) + error = errors.xs(method, 1, 1).sort_values(x) + x_val = plot_data[x] + y_val = plot_data[y] + lower_bound = y_val - error[y] + upper_bound = y_val + error[y] + ax.fill_between(x_val, lower_bound, upper_bound, alpha=err_alpha, label='std err') - ax.set_ylabel("Precision") - ax.set_xlabel("Recall") if limit: - lim = max(all_scores['relevance']['precision'].max().max(), - all_scores['relevance']['recall'].max().max()) + 0.05 - plt.xlim((0, lim)); - plt.ylim((0, lim)); + if not isinstance(limit, (tuple, list)): + limit = (0, limit) + ax.set_xlim(*limit) + ax.set_ylim(*limit) + + titles = titles or keys + ax.set_xlabel(titles[0]) + ax.set_ylabel(titles[1]) + + if ROC_middle: + lims = ax.get_xlim() + ax.plot(lims, lims, linestyle='--', c='grey') + + +def show_hit_rates(all_scores, **kwargs): + scores = all_scores['relevance'] + keys = ['fallout', 'recall'] + kwargs['titles'] = ['False Positive Rate', 'True Positive Rate'] + kwargs['errors'] = kwargs['errors']['relevance'] if kwargs.get('errors', False) else None + kwargs['ROC_middle'] = True + kwargs['limit'] = max(scores['fallout'].max().max(), scores['recall'].max().max()) + 0.01 + _cross_plot(scores, keys, **kwargs) + + +def show_ranking_positivity(all_scores, **kwargs): + scores = all_scores['ranking'] + keys = ['nDCL', 'nDCG'] + kwargs['titles'] = ['Negative Ranking', 'Positive Ranking'] + kwargs['errors'] = kwargs['errors']['ranking'] if kwargs.get('errors', False) else None + kwargs['ROC_middle'] = True + kwargs['limit'] = max(scores['nDCL'].max().max(), scores['nDCG'].max().max()) + 0.01 + _cross_plot(scores, keys, **kwargs) + + +def show_precision_recall(all_scores, limit=False, ignore_field_limit=None, **kwargs): + scores = all_scores['relevance'] + keys = ['recall', 'precision'] + kwargs['titles'] = ['Recall', 'Precision'] + kwargs['errors'] = kwargs['errors']['relevance'] if kwargs.get('errors', False) 
else None
+    kwargs['ROC_middle'] = False
+    if limit:
+        maxx = scores['recall'].drop(ignore_field_limit, axis=1, errors='ignore').max().max()
+        maxy = scores['precision'].drop(ignore_field_limit, axis=1, errors='ignore').max().max()
+        kwargs['limit'] = max(maxx, maxy) + 0.05
+    _cross_plot(scores, keys, **kwargs)
 
 
 def show_relevance(all_scores, figsize=(16, 10), ax=None):
@@ -167,4 +137,3 @@ def show_relevance(all_scores, figsize=(16, 10), ax=None):
     all_scores['relevance']['recall'].plot(ax=ax[0, 1], title='Recall@$N$', legend=False)
     all_scores['relevance']['fallout'].plot(ax=ax[1, 0], title='Fallout@$N$', legend=False)
     all_scores['relevance']['miss_rate'].plot(ax=ax[1, 1], title='Miss Rate@$N$', legend=False)
-    # plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5))
diff --git a/polara/lib/similarity.py b/polara/lib/similarity.py
new file mode 100644
index 0000000..991e3ce
--- /dev/null
+++ b/polara/lib/similarity.py
@@ -0,0 +1,370 @@
+from __future__ import division
+import math
+import types
+from collections import defaultdict, OrderedDict
+import numpy as np
+import pandas as pd
+from numba import jit
+import scipy as sp
+import scipy.sparse
+from scipy.sparse import csc_matrix, csr_matrix, coo_matrix, SparseEfficiencyWarning
+import warnings
+
+
+def _fix_empty_features(feature_mat):
+    if feature_mat.format != 'csc':
+        raise NotImplementedError
+    nf = feature_mat.getnnz(axis=1)
+    zf = (nf == 0)
+    if zf.any():
+        # add +1 dummy feature for every zero row
+        # to avoid zeros on diagonal of similarity matrix
+        nnz = zf.sum(dtype=long) #long is required to avoid overflow with int32 dtype
+        nnz_ind = np.where(zf)[0]
+        feature_mat.indptr = np.hstack([feature_mat.indptr[:-1], feature_mat.indptr[-1]+np.arange(nnz+1)])
+        feature_mat.indices = np.hstack([feature_mat.indices, nnz_ind])
+        feature_mat.data = np.hstack([feature_mat.data, np.ones((nnz,), dtype=feature_mat.data.dtype)])
+        feature_mat._shape = (feature_mat.shape[0], feature_mat.shape[1]+nnz)
+        nf[nnz_ind] = 1
+    return nf
+
+
+def set_diagonal_values(mat, val=1):
+    #disable warning when setting diagonal elements of sparse similarity matrix
+    with warnings.catch_warnings():
+        warnings.simplefilter('ignore', category=SparseEfficiencyWarning)
+        mat.setdiag(val)
+
+
+def safe_inverse_root(d):
+    if (d < 0).any():
+        raise ValueError
+    return np.piecewise(d, [d>0, d==0], [lambda x: x**(-0.5), lambda x: x])
+
+
+def normalize_binary_features(feature_mat):
+    sqsum = feature_mat.getnnz(axis=1)
+    if feature_mat.format == 'csc':
+        ind = feature_mat.indices.copy()
+        ptr = feature_mat.indptr.copy()
+        norm_data = 1 / np.sqrt(np.take(sqsum, ind))
+        normed = csc_matrix((norm_data, ind, ptr), shape=feature_mat.shape)
+    else:
+        norm_data = safe_inverse_root(sqsum.astype(np.float64))
+        normed = sp.sparse.diags(norm_data).dot(feature_mat)
+    return normed
+
+
+def normalize_features(feature_mat):
+    if feature_mat.format == 'csc':
+        ptr = feature_mat.indptr.copy()
+        ind = feature_mat.indices.copy()
+        sqsum = np.bincount(ind, weights=feature_mat.data**2)
+        # could do np.add.at(sqsum, indices, data**2), but bincount seems faster
+        norm_data = feature_mat.data / np.sqrt(np.take(sqsum, ind))
+        normed = csc_matrix((norm_data, ind, ptr), shape=feature_mat.shape)
+    else:
+        sqsum = feature_mat.power(2).sum(axis=1).view(type=np.ndarray).squeeze()
+        # avoid zero division
+        norm_data = safe_inverse_root(sqsum.astype(np.float64))
+        normed = sp.sparse.diags(norm_data).dot(feature_mat)
+    return normed
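This row normalization is what reduces the Gram product `F.dot(F.T)` used below to cosine similarity. A minimal dense sketch of the idea (hypothetical values, not part of the patch):

```python
import numpy as np

F = np.array([[3., 4., 0.],
              [1., 0., 1.]])
norms = np.sqrt((F**2).sum(axis=1, keepdims=True))  # row-wise L2 norms
F_normed = F / norms                                # unit-length rows
S = F_normed.dot(F_normed.T)                        # S[i, j] = cosine of rows i and j
```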
+
+
+def tfidf_transform(feature_mat):
+    itemfreq = 1 + feature_mat.getnnz(axis=0)
+    N = 1 + feature_mat.shape[0]
+    idf = np.log(N/itemfreq)
+
+    if feature_mat.format == 'csr':
+        ind = feature_mat.indices
+        ptr = feature_mat.indptr
+        data = np.take(idf, ind)
+        transformed = csr_matrix((data, ind, ptr), shape=feature_mat.shape)
+    else:
+        # safe multiplication in case data is not binary
+        transformed = (feature_mat!=0).dot(sp.sparse.diags(idf))
+    return transformed
+
+
+@jit(nopython=True)
+def _jaccard_similarity_inplace(sdata, indcs, pntrs, nf):
+    # assumes data is symmetric => format doesn't matter
+    # it doesn't enforce 1 on the diagonal!
+    # so the similarity data must be prepared accordingly
+    ncols = len(pntrs) - 1
+    for col in xrange(ncols):
+        lind = pntrs[col]
+        rind = pntrs[col+1]
+        nf_col = nf[col]
+        for j in xrange(lind, rind):
+            row = indcs[j]
+            denom = nf_col + nf[row] - sdata[j]
+            sdata[j] /= denom
+
+
+def jaccard_similarity(F, fill_diagonal=True):
+    F = (F!=0)
+    nf = F.getnnz(axis=1)
+
+    S = F.dot(F.T).astype(np.float64) # dtype needed for inplace jaccard index computation
+    _jaccard_similarity_inplace(S.data, S.indices, S.indptr, nf)
+    if fill_diagonal:
+        # eigenvalues computation is very sensitive to roundoff errors in
+        # similarity matrix; explicitly setting diagonal values is better
+        # than using _fix_empty_features
+        set_diagonal_values(S, 1)
+    return S
+
+
+def cosine_similarity(F, fill_diagonal=True, assume_binary=False):
+    normalize = normalize_binary_features if assume_binary else normalize_features
+    F = normalize(F)
+    S = F.dot(F.T)
+    if fill_diagonal:
+        # eigenvalues computation is very sensitive to roundoff errors in
+        # similarity matrix; explicitly setting diagonal values is better
+        # than using _fix_empty_features
+        set_diagonal_values(S, 1)
+    return S
+
+
+def cosine_tfidf_similarity(F, fill_diagonal=True):
+    F = tfidf_transform(F)
+    S = cosine_similarity(F, fill_diagonal=fill_diagonal, assume_binary=False)
+    return S
+
+
+@jit(nopython=True)
+def _jaccard_similarity_weighted_tri(dat, ind, ptr, shift):
+    z = dat[0] #np.float64
+    #need to initialize lists with certain dtype
+    data = [z,]
+    cols = [z,]
+    rows = [z,]
+
+    nrows = len(ptr) - 1
+    for i in xrange(nrows):
+        lind_i = ptr[i]
+        rind_i = ptr[i+1]
+        if lind_i != rind_i:
+            ish = i + shift
+            for j in xrange(ish, nrows):
+                lind_j = ptr[j]
+                rind_j = ptr[j+1]
+                min_sum = 0
+                max_sum = 0
+                for k in xrange(lind_j, rind_j):
+                    for s in xrange(lind_i, rind_i):
+                        iind = ind[s]
+                        jind = ind[k]
+                        if iind == jind:
+                            min_val = min(dat[s], dat[k])
+                            max_val = max(dat[s], dat[k])
+                            min_sum += min_val
+                            max_sum += max_val
+                            break
+                    else:
+                        max_sum += dat[k]
+
+                for s in xrange(lind_i, rind_i):
+                    for k in xrange(lind_j, rind_j):
+                        iind = ind[s]
+                        jind = ind[k]
+                        if iind == jind:
+                            break
+                    else:
+                        max_sum += dat[s]
+
+                if min_sum:
+                    wjac = min_sum/max_sum
+                    data.append(wjac)
+                    cols.append(i)
+                    rows.append(j)
+
+    return data[1:], rows[1:], cols[1:] #ignore initialization element
+
+
+def jaccard_similarity_weighted(F, fill_diagonal=True):
+    assert F.format == 'csr'
+    if not F.has_sorted_indices:
+        F.sort_indices()
+
+    ind = F.indices
+    ptr = F.indptr
+    dat = F.data.astype(np.float64, copy=False) # dtype needed for jaccard computation
+
+    shift = 1 if fill_diagonal else 0
+    data, rows, cols = _jaccard_similarity_weighted_tri(dat, ind, ptr, shift)
+
+    S = sp.sparse.coo_matrix((data, (rows, cols)), shape=(F.shape[0],)*2).tocsc()
+    S += S.T # doubles diagonal values if fill_diagonal is False
+
+    if fill_diagonal:
+        set_diagonal_values(S, 1)
+    else:
+        set_diagonal_values(S, 
np.sign(S.diagonal())) # set to 1, preserve zeros + return S + + +def jaccard_similarity_weighted_dense(F, fill_diagonal=True): + if (F.data < 0).any(): + raise ValueError + + f = F.sum(axis=1).view(type=np.ndarray).squeeze() + fplus = f + f[:, None] + FA = F.A + fminus = np.abs(FA[None, :, :] - FA[:, None, :]).sum(axis=2) + + with np.errstate(invalid='ignore'): + res = np.nan_to_num((fplus - fminus) / (fplus + fminus)) + + if fill_diagonal: + np.fill_diagonal(res, 1) + return res + + +def uniquify_ordered(seq): + # order preserving + # https://www.peterbe.com/plog/uniqifiers-benchmark + # http://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-whilst-preserving-order + seen = set() + seen_add = seen.add + return [x for x in seq if not (x in seen or seen_add(x))] + + +def feature2sparse(feature_data, ranking=None, deduplicate=True): + if deduplicate: + feature_data = feature_data.apply(uniquify_ordered if ranking else set) + + feature_lbl = defaultdict(lambda: len(feature_lbl)) + indices = [feature_lbl[item] for items in feature_data for item in items] + indptr = np.r_[0, feature_data.apply(len).cumsum().values] + + if ranking: + if ranking is True: + ranking = 'linear' + + if isinstance(ranking, str): + if ranking.lower() == 'linear': + data = [1/(n+1) for items in feature_data for n, item in enumerate(items)] + elif ranking.lower() == 'exponential': + data = [math.exp(-n) for items in feature_data for n, item in enumerate(items)] + elif ranking.lower() == 'bag-of-features': + raise NotImplementedError + else: + raise ValueError + elif isinstance(ranking, types.FunctionType): + data = [ranking(n) for items in feature_data for n, item in enumerate(items)] + else: + data = np.ones_like(indices) + + feature_mat = csr_matrix((data, indices, indptr), + shape=(feature_data.shape[0], len(feature_lbl))) + if not ranking: + feature_mat = feature_mat.tocsc() + + return feature_mat, dict(feature_lbl) + + +def get_features_data(meta_data, ranking=None, deduplicate=True): + feature_mats = OrderedDict() + feature_lbls = OrderedDict() + features = meta_data.columns + + ranking = ranking or {} + + if ranking is True: + ranking = 'linear' + + if isinstance(ranking, str): + ranking = [ranking,] * len(features) + + if not isinstance(ranking, dict): + ranking = {k:v for k, v in zip(features, ranking)} + + for feature in features: + feature_data = meta_data[feature] + mat, lbl = feature2sparse(feature_data, ranking=ranking.get(feature, None)) + feature_mats[feature], feature_lbls[feature] = mat, lbl + return feature_mats, feature_lbls + + +def _sim_func(func_type): + if func_type.lower() == 'jaccard': + return jaccard_similarity + elif func_type.lower() == 'cosine': + return cosine_similarity + elif func_type.lower() == 'tfidf-cosine': + return cosine_tfidf_similarity + elif func_type.lower() == 'jaccard-weighted': + return jaccard_similarity_weighted + else: + raise NotImplementedError + + +def one_hot_similarity(meta_data): + raise NotImplementedError + + +def get_similarity_data(meta_data, similarity_type='jaccard'): + features = meta_data.columns + + if isinstance(similarity_type, str): + similarity_type = [similarity_type,] * len(features) + + if not isinstance(similarity_type, dict): + similarity_type = {k:v for k, v in zip(features, similarity_type)} + + similarity_mats = {} + for feature in features: + feature_data = meta_data[feature] + sim_type = similarity_type[feature] + ranking = sim_type == 'jaccard-weighted' + get_similarity = _sim_func(sim_type) + + feature_mat, 
feature_lbl = feature2sparse(feature_data, ranking=ranking)
+        similarity_mats[feature] = get_similarity(feature_mat)
+
+    return similarity_mats
+
+
+def combine_distribute_similarity_data(meta_data, similarity_type='jaccard', weights=None):
+    # similarity_mats = get_similarity_data(meta_data, similarity_type)
+    # iprob = {f: 1+np.prod(s.shape)/s.nnz for f, s in similarity_mats.iteritems()}
+    # wsum = np.log(iprob.values()).sum()
+    # weights = {f:w/wsum for f, w in iprob.iteritems()}
+    raise NotImplementedError
+
+
+def combine_similarity_data(meta_data, similarity_type='jaccard', weights=None):
+    features = meta_data.columns
+    num_feat = len(features)
+
+    if isinstance(similarity_type, str):
+        similarity_type = [similarity_type,] * num_feat
+
+    if not isinstance(similarity_type, dict):
+        similarity_type = {k:v for k, v in zip(features, similarity_type)}
+
+    if weights is None:
+        weights = [1.0/num_feat,] * num_feat
+
+    if not isinstance(weights, dict):
+        weights = {k:v for k, v in zip(features, weights)}
+
+    similarity = csc_matrix((meta_data.shape[0],)*2)
+    for feature in features:
+        feature_data = meta_data[feature]
+        sim_type = similarity_type[feature]
+        weight = weights[feature]
+        ranking = sim_type == 'jaccard-weighted'
+        get_similarity = _sim_func(sim_type)
+
+        feature_mat, feature_lbl = feature2sparse(feature_data, ranking=ranking)
+        similarity += weight * get_similarity(feature_mat)
+
+    # remove roundoff errors
+    similarity.setdiag(1)
+    similarity.data[similarity.data>1] = 1
+    return similarity
diff --git a/polara/lib/sparse.py b/polara/lib/sparse.py
new file mode 100644
index 0000000..c13aeba
--- /dev/null
+++ b/polara/lib/sparse.py
@@ -0,0 +1,78 @@
+import numpy as np
+import scipy as sp
+from scipy import sparse
+from numba import jit
+
+# matvec implementation is based on
+# http://stackoverflow.com/questions/18595981/improving-performance-of-multiplication-of-scipy-sparse-matrices
+
+@jit(nopython=True, nogil=True)
+def matvec2dense(m_ptr, m_ind, m_val, v_nnz, v_val, out):
+    l = len(v_nnz)
+    for j in xrange(l):
+        col_start = v_nnz[j]
+        col_end = col_start + 1
+        ind_start = m_ptr[col_start]
+        ind_end = m_ptr[col_end]
+        if ind_start != ind_end:
+            out[m_ind[ind_start:ind_end]] += m_val[ind_start:ind_end] * v_val[j]
+
+
+@jit(nopython=True, nogil=True)
+def matvec2sparse(m_ptr, m_ind, m_val, v_nnz, v_val, sizes, indices, data):
+    l = len(sizes) - 1
+    for j in xrange(l):
+        col_start = v_nnz[j]
+        col_end = col_start + 1
+        ind_start = m_ptr[col_start]
+        ind_end = m_ptr[col_end]
+        data_start = sizes[j]
+        data_end = sizes[j+1]
+        if ind_start != ind_end:
+            indices[data_start:data_end] = m_ind[ind_start:ind_end]
+            data[data_start:data_end] = m_val[ind_start:ind_end] * v_val[j]
+
+
+def csc_matvec(mat_csc, vec, dense_output=True, dtype=None):
+    v_nnz = vec.indices
+    v_val = vec.data
+
+    m_val = mat_csc.data
+    m_ind = mat_csc.indices
+    m_ptr = mat_csc.indptr
+
+    res_dtype = dtype or np.result_type(mat_csc.dtype, vec.dtype)
+    if dense_output:
+        res = np.zeros((mat_csc.shape[0],), dtype=res_dtype)
+        matvec2dense(m_ptr, m_ind, m_val, v_nnz, v_val, res)
+    else:
+        sizes = m_ptr.take(v_nnz+1) - m_ptr.take(v_nnz)
+        sizes = np.concatenate(([0], np.cumsum(sizes)))
+        n = sizes[-1]
+        data = np.empty((n,), dtype=res_dtype)
+        indices = np.empty((n,), dtype=np.intp)
+        indptr = np.array([0, n], dtype=np.intp)
+        matvec2sparse(m_ptr, m_ind, m_val, v_nnz, v_val, sizes, indices, data)
+        res = sp.sparse.csr_matrix((data, indices, indptr), shape=(1, mat_csc.shape[0]), dtype=res_dtype)
+        res.sum_duplicates() # expensive operation
+    return res
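For orientation, a small usage sketch of `csc_matvec` (hypothetical values, not part of the patch): the helper treats the nonzeros of the sparse vector as coefficients and accumulates the corresponding columns of the CSC matrix:

```python
import numpy as np
from scipy.sparse import csc_matrix, csr_matrix
# csc_matvec as defined above in polara.lib.sparse

M = csc_matrix(np.array([[1., 0., 2.],
                         [0., 3., 0.],
                         [4., 0., 5.]]))
v = csr_matrix(np.array([[1., 0., 2.]]))  # sparse vector, nonzeros at columns 0 and 2
res = csc_matvec(M, v)                    # dense output by default
print res                                 # [ 5.  0.  14.] == 1*M[:, 0] + 2*M[:, 2]
```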
+
+
+@jit(nopython=True)
+def _blockify(ind, ptr, major_dim):
+    # convenient function to compute only diagonal
+    # elements of the product of 2 matrices;
+    # indices must be intp in order to avoid overflow
+    # major_dim is shape[0] for csc format and shape[1] for csr format
+    n = len(ptr) - 1
+    for i in xrange(1, n): #first row/col is unchanged
+        lind = ptr[i]
+        rind = ptr[i+1]
+        for j in xrange(lind, rind):
+            shift_ind = i * major_dim
+            ind[j] += shift_ind
+
+
+def inverse_permutation(p):
+    s = np.empty(p.size, p.dtype)
+    s[p] = np.arange(p.size)
+    return s
diff --git a/polara/recommender/data.py b/polara/recommender/data.py
index 220e659..25bc7b0 100644
--- a/polara/recommender/data.py
+++ b/polara/recommender/data.py
@@ -4,10 +4,33 @@
 from collections import namedtuple
 
 
+def random_choice(df, num, random_state):
+    return df.iloc[random_state.choice(df.shape[0], num, replace=False)]
+
+
+def filter_by_length(data, userid='userid', min_session_length=3):
+    """Filters users with insufficient number of items"""
+    if data.duplicated().any():
+        raise NotImplementedError
+
+    sz = data[userid].value_counts(sort=False)
+    has_valid_session_length = sz >= min_session_length
+    if not has_valid_session_length.all():
+        valid_users = sz.index[has_valid_session_length]
+        new_data = data[data[userid].isin(valid_users)].copy()
+        print 'Sessions are filtered by length'
+    else:
+        new_data = data
+    return new_data
+
+
 class RecommenderData(object):
     _std_fields = ('userid', 'itemid', 'feedback')
+    # changing one of these params requires running the prepare() method:
     _datawise_properties = {'_shuffle_data', '_test_ratio', '_test_fold'}
-    _testwise_properties = {'_holdout_size', '_test_sample', '_permute_tops', '_random_holdout', '_negative_prediction'}
+    # changing one of these params only requires renewing the testset and holdout:
+    _testwise_properties = {'_holdout_size', '_test_sample', '_permute_tops',
+                            '_random_holdout', '_negative_prediction'}
     _config = _datawise_properties.union(_testwise_properties)
     # ('test_ratio', 'holdout_size', 'test_fold', 'shuffle_data',
     # 'test_sample', 'permute_tops', 'random_holdout', 'negative_prediction')
@@ -32,10 +55,26 @@ def __init__(self, data, userid, itemid, feedback, custom_order=None):
         self.index = self.index._make([None]*len(self._std_fields))
 
         self._set_defaults()
-        self._has_updated = False #indicated whether test data has been changed
-        self._has_changed = False #indicated whether full data has been changed
         self._change_properties = set() #container for changed properties
-        self.random_seed = None #use with shuffle_data and permute_tops property
+        self.random_state = None #use with shuffle_data, permute_tops, random_choice
+
+        self._attached_models = {'on_change': {}, 'on_update': {}}
+        # on_change indicates whether full data has been changed
+        # on_update indicates whether only test data has been changed
+
+
+    def _get_attached_models(self, event):
+        return self._attached_models[event]
+
+    def _attach_model(self, event, model, callback):
+        self._get_attached_models(event)[model] = callback
+
+    def _detach_model(self, event, model):
+        del self._get_attached_models(event)[model]
+
+    def _notify(self, event):
+        for model, callback in self._get_attached_models(event).iteritems():
+            getattr(model, callback)()
 
 
     def _set_defaults(self, params=None):
@@ -133,33 +172,14 @@ def test_fold(self, new_value):
 
     @property
     def test(self):
-        update_data, update_test = self._pending_change()
-        if update_data or not hasattr(self, '_test'):
-            self.prepare()
-
test_change_required = False #changes are already effective - if update_test: - self._split_eval_data() + self.update() return self._test @property def training(self): - update_data, _ = self._pending_change() - if update_data or not hasattr(self, '_training'): - self.prepare() + self.update() # both _test and _training attributes appear simultaneously return self._training - @property - def has_changed(self): - value = self._has_changed - self._has_changed = False #this is an indicator property, reset once read - return value - - @property - def has_updated(self): - value = self._has_updated - self._has_updated = False #this is an indicator property, reset once read - return value - def _lazy_data_update(self, data_property): self._change_properties.add(data_property) @@ -194,10 +214,11 @@ def _pending_change(self, data_properties=None): def prepare(self): print 'Preparing data' if self._shuffle_data: - self._data = self._data.sample(frac=1, random_state=self.random_seed) + self._data = self._data.sample(frac=1, random_state=self.random_state) elif '_shuffle_data' in self._change_properties: print 'Recovering original data state due to change in shuffle_data.' self._data = self._data.sort_index() + self._change_properties.clear() self._split_test_data() @@ -205,8 +226,7 @@ def prepare(self): self._align_test_items() self._split_eval_data() - self._has_changed = True - #TODO implement operations with this property container + self._notify('on_change') def update(self): @@ -329,61 +349,80 @@ def _align_test_items(self): new_test_idx = self.reindex(self._test, userid, sort=False, inplace=True) #update index info accordingly old_test_idx = self.index.userid.test - self.index.userid._replace(test=old_test_idx[old_test_idx['new'].isin(valid_users_idx)]) + self.index = self.index._replace(userid=self.index.userid._replace(test=old_test_idx[old_test_idx['new'].isin(valid_users_idx)])) self.index.userid.test.loc[new_test_idx['old'].values, 'new'] = new_test_idx['new'].values - def _split_eval_data(self, renew=False): - def random_choice(df, num): - # TODO add control with RandomState - return df.iloc[np.random.choice(df.shape[0], num, replace=False)] + def _split_eval_data(self): + userid, feedback = self.fields.userid, self.fields.feedback if self._change_properties: # print 'Updating test data.' - #print 'preparing new test and eval data' self._test = self._test_old - self._has_updated = True - self._change_properties.clear() - userid, itemid, feedback = self.fields - lastn = self._holdout_size - test_sample = self._test_sample + self._notify('on_update') + self._change_properties.clear() # data may have many items with top ratings and result depends on how # they are sorted. 
randomizing the data helps to avoid biases
         if self.permute_tops:
-            test_data = self._test.sample(frac=1, random_state=self.random_seed)
+            test_data = self._test.sample(frac=1, random_state=self.random_state)
         else:
             test_data = self._test
 
+        # split holdout items from the rest of the test data
+        holdout = self._sample_holdout(test_data)
+        evalidx = holdout.index.get_level_values(1)
+        evalset = test_data.loc[evalidx]
+
+        # get test users whose items were hidden
+        testset = test_data[~test_data.index.isin(evalidx)]
+        # leave at most self.test_sample items for every test user
+        testset = self._sample_testset(testset)
+
+        # ensure identical ordering of users in testset and holdout
+        testset = testset.sort_values(userid)
+        evalset = evalset.sort_values(userid)
+
+        self._test_old = self._test
+        #TODO make it computed from index data and _data
+        #self._test_idx = namedtuple('TestDataIndex', 'testset evalset')
+        #                   ._make([self._test.testset.index, self._test.evalset.index])
+        self._test = namedtuple('TestData', 'testset evalset')._make([testset, evalset])
+
+
+    def _sample_holdout(self, data):
+        userid, feedback = self.fields.userid, self.fields.feedback
         order_field = self._custom_order or feedback
-        eval_grouper = test_data.groupby(userid, sort=False)[order_field]
+        grouper = data.groupby(userid, sort=False)[order_field]
 
         if self.random_holdout: #randomly sample data for evaluation
-            eval_data = eval_grouper.apply(random_choice, lastn)
+            holdout = grouper.apply(random_choice, self._holdout_size, self.random_state or np.random)
         elif self.negative_prediction: #try to holdout negative only examples
-            eval_data = eval_grouper.nsmallest(lastn)
+            holdout = grouper.nsmallest(self._holdout_size)
         else: #standard top-score prediction mode
-            eval_data = eval_grouper.nlargest(lastn)
-
-        eval_idx = eval_data.index.get_level_values(1)
-        #ensure correct sorting of users in test and eval - order must be the same
-        evalset = test_data.loc[eval_idx].sort_values(userid)
-        testset = test_data[~test_data.index.isin(eval_idx)].sort_values(userid)
-
-        if isinstance(test_sample, int):
-            if test_sample > 0:
-                testset = (testset.groupby(userid, sort=False, group_keys=False)
-                            .apply(random_choice, test_sample))
-            elif test_sample < 0: #leave only the most negative feedback from user
-                test_idx = (testset.groupby(userid, sort=False)[feedback]
-                            .nsmallest(-test_sample).index.get_level_values(1))
-                testset = testset.loc[test_idx]
+            holdout = grouper.nlargest(self._holdout_size)
 
-        self._test_old = self._test
-        #TODO make it computed from index data and _data, instead of storing in memory
-        self._test = namedtuple('TestData', 'testset evalset')._make([testset, evalset])
-        #self._test_idx = namedtuple('TestDataIndex', 'testset evalset')._make([self._test.testset.index, self._test.evalset.index])
+        return holdout
+
+
+    def _sample_testset(self, data):
+        test_sample = self.test_sample
+        if not isinstance(test_sample, int):
+            return data
+
+        userid, feedback = self.fields.userid, self.fields.feedback
+        if test_sample > 0:
+            sampled = (data.groupby(userid, sort=False, group_keys=False)
+                        .apply(random_choice, test_sample, self.random_state or np.random))
+        elif test_sample < 0: #leave only the most negative feedback from user
+            idx = (data.groupby(userid, sort=False)[feedback]
+                    .nsmallest(-test_sample).index.get_level_values(1))
+            sampled = data.loc[idx]
+        else:
+            sampled = data
+
+        return sampled
 
 
     def to_coo(self, tensor_mode=False):
@@ -404,7 +443,7 @@ def to_coo(self, tensor_mode=False):
         val = self.training[feedback].values
         shp = 
tuple(idx.max(axis=0) + 1) - idx = idx.astype(np.int64) + idx = idx.astype(np.intp) val = np.ascontiguousarray(val) return idx, val, shp @@ -413,8 +452,8 @@ def test_to_coo(self, tensor_mode=False): userid, itemid, feedback = self.fields test_data = self.test.testset - user_idx = test_data[userid].values.astype(np.int64) - item_idx = test_data[itemid].values.astype(np.int64) + user_idx = test_data[userid].values.astype(np.intp) + item_idx = test_data[itemid].values.astype(np.intp) fdbk_val = test_data[feedback].values if tensor_mode: @@ -422,7 +461,7 @@ def test_to_coo(self, tensor_mode=False): if np.isnan(fdbk_idx).any(): raise NotImplementedError('Not all values of feedback are present in training data') else: - fdbk_idx = fdbk_idx.astype(np.int64) + fdbk_idx = fdbk_idx.astype(np.intp) test_coo = (user_idx, item_idx, fdbk_idx) else: test_coo = (user_idx, item_idx, fdbk_val) @@ -444,18 +483,96 @@ def get_test_shape(self, tensor_mode=False): return shape -class RecommenderDataPositive(RecommenderData): - def __init__(self, pos_value, *args, **kwargs): - super(RecommenderDataPositive, self).__init__(*args, **kwargs) - self.pos_value = pos_value +class BinaryDataMixin(object): + def __init__(self, *args, **kwargs): + self.binary_threshold = kwargs.pop('binary_threshold', None) + super(BinaryDataMixin, self).__init__(*args, **kwargs) + + def _binarize(self, data, return_filtered_users=False): + feedback = self.fields.feedback + data = data[data[feedback] >= self.binary_threshold].copy() + data[feedback] = np.ones_like(data[feedback]) + return data def _split_test_data(self): - super(RecommenderDataPositive, self)._split_test_data() - self._get_positive_only() + super(BinaryDataMixin, self)._split_test_data() + if self.binary_threshold is not None: + self._training = self._binarize(self._training) - def _get_positive_only(self): - userid, feedback = self.fields.userid, self.fields.feedback - pos_only_data = self._training.loc[self._training[feedback] >= self.pos_value] - valid_users = pos_only_data.groupby(userid, sort=False).size() > self.holdout_size - user_idx = valid_users.index[valid_users] - self._training = pos_only_data[pos_only_data.userid.isin(user_idx)].copy() + def _split_eval_data(self): + super(BinaryDataMixin, self)._split_eval_data() + if self.binary_threshold is not None: + userid = self.fields.userid + testset = self._binarize(self.test.testset) + test_users = testset[userid].unique() + user_sel = self.test.evalset[userid].isin(test_users) + evalset = self.test.evalset[user_sel].copy() + self._test = namedtuple('TestData', 'testset evalset')._make([testset, evalset]) + if len(test_users) != (testset[userid].max()+1): + # remove gaps in test user indices + self._update_test_user_index() + + def _update_test_user_index(self): + testset, evalset = self._test + userid = self.fields.userid + new_test_idx = self.reindex(testset, userid, sort=False, inplace=True) + evalset.loc[:, userid] = evalset[userid].map(new_test_idx.set_index('old').new) + new_test_idx.old = new_test_idx.old.map(self.index.userid.test.set_index('new').old) + self.index = self.index._replace(userid=self.index.userid._replace(test=new_test_idx)) + + +class LongTailMixin(object): + def __init__(self, *args, **kwargs): + self.long_tail_holdout = kwargs.pop('long_tail_holdout', False) + # use predefined list if defined + self.short_head_items = kwargs.pop('short_head_items', None) + # amount of feedback accumulated in short head + self.head_feedback_frac = kwargs.pop('head_feedback_frac', 0.33) + # fraction of 
popular items considered as short head
+        self.head_items_frac = kwargs.pop('head_items_frac', None)
+        self._long_tail_items = None
+        super(LongTailMixin, self).__init__(*args, **kwargs)
+
+    @property
+    def long_tail_items(self):
+        if self.short_head_items is not None:
+            short_head = self.short_head_items
+            long_tail = self.index.itemid.query('old not in @short_head').new.values
+        else:
+            long_tail = self._get_long_tail()
+        return long_tail
+
+    def _get_long_tail(self):
+        itemid = self.fields.itemid
+        popularity = self.training[itemid].value_counts(ascending=False, normalize=True)
+        tail_idx = None
+
+        if self.head_items_frac:
+            self.head_feedback_frac = None # could in principle calculate real value instead
+            items_frac = np.arange(1, len(popularity)+1) / len(popularity)
+            tail_idx = items_frac > self.head_items_frac
+
+        if self.head_feedback_frac:
+            tail_idx = popularity.cumsum().values > self.head_feedback_frac
+
+        if tail_idx is None:
+            long_tail = None
+            self.long_tail_holdout = False
+        else:
+            long_tail = popularity.index[tail_idx]
+
+        return long_tail
+
+    def _sample_holdout(self, data):
+        if self.long_tail_holdout:
+            itemid = self.fields.itemid
+            long_tail_sel = data[itemid].isin(self.long_tail_items)
+            self.__head_data = data[~long_tail_sel]
+            data = data[long_tail_sel]
+        return super(LongTailMixin, self)._sample_holdout(data)
+
+    def _sample_testset(self, data):
+        if self.long_tail_holdout:
+            data = pd.concat([self.__head_data, data], copy=True)
+            del self.__head_data
+        return super(LongTailMixin, self)._sample_testset(data)
diff --git a/polara/recommender/defaults.py b/polara/recommender/defaults.py
index e2a4e92..fc670fe 100644
--- a/polara/recommender/defaults.py
+++ b/polara/recommender/defaults.py
@@ -1,6 +1,6 @@
 import sys
 
-#INITIALIZATION
+#DATA
 #properties that require rebuilding test data:
 test_ratio = 0.2 #split 80% of users for training, 20% for test
 test_fold = 5 #which fold to use for test data
@@ -25,6 +25,7 @@
 show_output = False
 flattener = slice(0, None)
 
+
 #RECOMMENDATIONS
 topk = 10 #number of items to return
 filter_seen = True #prevent seen items from appearing in recommendations
diff --git a/polara/recommender/evaluation.py b/polara/recommender/evaluation.py
index 7bbeafc..7092b57 100644
--- a/polara/recommender/evaluation.py
+++ b/polara/recommender/evaluation.py
@@ -8,8 +8,8 @@ def unmask(x):
     return None if x is np.ma.masked else x
 
 
-def get_hits(matched_predictions, positive_feedback):
-    reldata = get_relevance_data(matched_predictions, positive_feedback)
+def get_hits(matched_predictions, positive_feedback, not_rated_penalty):
+    reldata = get_relevance_data(matched_predictions, positive_feedback, not_rated_penalty)
     true_pos, false_pos = reldata.tp, reldata.fp
     true_neg, false_neg = reldata.tn, reldata.fn
 
@@ -23,22 +23,23 @@
     return hits
 
 
-def get_relevance_scores(matched_predictions, positive_feedback):
+def get_relevance_scores(matched_predictions, positive_feedback, not_rated_penalty):
     users_num = matched_predictions.shape[0]
-    reldata = get_relevance_data(matched_predictions, positive_feedback)
+    reldata = get_relevance_data(matched_predictions, positive_feedback, not_rated_penalty)
     true_pos, false_pos = reldata.tp, reldata.fp
     true_neg, false_neg = reldata.tn, reldata.fn
 
-    # true positive rate
-    precision = true_pos / (true_pos + false_pos)
-    # sensitivity
-    recall = true_pos / (true_pos + false_neg)
-    # false positive rate
-    fallout = false_pos / (false_pos + true_neg)
-    # true negative rate
-
specifity = true_neg / (false_pos + true_neg) - # false negative rate - miss_rate = false_neg / (false_neg + true_pos) + with np.errstate(invalid='ignore'): + # true positive rate + precision = true_pos / (true_pos + false_pos) + # sensitivity + recall = true_pos / (true_pos + false_neg) + # false positive rate + fallout = false_pos / (false_pos + true_neg) + # true negative rate + specifity = true_neg / (false_pos + true_neg) + # false negative rate + miss_rate = false_neg / (false_neg + true_pos) #average over all users precision = unmask(np.nansum(precision) / users_num) @@ -81,13 +82,16 @@ def get_ranking_scores(matched_predictions, feedback_data, switch_positive, alte dcl = (relevance_scores_neg / -discount[:topk]).sum(axis=1) idcg = (ideal_scores_pos / discount[:holdout]).sum(axis=1) idcl = (ideal_scores_neg / -discount[:holdout]).sum(axis=1) - ndcg = unmask(np.nansum(dcg / idcg) / users_num) - ndcl = unmask(np.nansum(dcl / idcl) / users_num) + + with np.errstate(invalid='ignore'): + ndcg = unmask(np.nansum(dcg / idcg) / users_num) + ndcl = unmask(np.nansum(dcl / idcl) / users_num) + ranking_score = namedtuple('Ranking', ['nDCG', 'nDCL'])._make([ndcg, ndcl]) return ranking_score -def get_relevance_data(matched_items, positive_feedback, not_rated_penalty=0): +def get_relevance_data(matched_items, positive_feedback, not_rated_penalty): negative_feedback = ~positive_feedback missed_items = ~matched_items diff --git a/polara/recommender/models.py b/polara/recommender/models.py index d47dd09..31a7375 100644 --- a/polara/recommender/models.py +++ b/polara/recommender/models.py @@ -1,3 +1,4 @@ +from collections import namedtuple from timeit import default_timer as timer import pandas as pd import numpy as np @@ -7,15 +8,49 @@ from scipy.sparse.linalg import svds from polara.recommender import data, defaults from polara.recommender.evaluation import get_hits, get_relevance_scores, get_ranking_scores -from polara.recommender.utils import array_split +from polara.recommender.utils import array_split, NNZ_MAX from polara.lib.hosvd import tucker_als +from polara.lib.sparse import csc_matvec def get_default(name): return defaults.get_config([name])[name] +class Timer(): + def __init__(self, model_name='Model'): + self.model_name = model_name + self.message = '{} training time: {}s' + + def __enter__(self): + self.start = timer() + return None # could return anything, to be used like this: with Timer("Message") as value: + + def __exit__(self, type, value, traceback): + elapsed_time = timer() - self.start + print(self.message.format(self.model_name, elapsed_time)) + + +class MetaModel(type): + # this metaclass ensures that every time the build function is called, + # all cached recommendations are cleared + # key idea is borrowed from here: + # https://stackoverflow.com/questions/18858759/python-decorating-a-class-method-that-is-intended-to-be-overwritten-when-inheri + def __init__(cls, name, bases, clsdict): + super(MetaModel, cls).__init__(name, bases, clsdict) + if 'build' in clsdict: + def clean_build(self, *args, **kwargs): + self._is_ready = False + self._recommendations = None + clsdict['build'](self, *args, **kwargs) + self._is_ready = True + setattr(cls, 'build', clean_build) + + class RecommenderModel(object): + + __metaclass__ = MetaModel + _config = ('topk', 'filter_seen', 'switch_positive', 'verify_integrity') _pad_const = -1 # used for sparse data @@ -33,17 +68,20 @@ def __init__(self, recommender_data, switch_positive=None): # (in contrast to `on_feedback_level` argument of 
self.evaluate) self.switch_positive = switch_positive or get_default('switch_positive') self.verify_integrity = get_default('verify_integrity') + self.not_rated_penalty = 0 + + self._is_ready = False + self.data._attach_model('on_change', self, '_on_change') + self.data._attach_model('on_update', self, '_on_update') @property def recommendations(self): - if (self._recommendations is None): - try: - self._recommendations = self.get_recommendations() - except AttributeError: + if self._recommendations is None: + if not self._is_ready: print '{} model is not ready. Rebuilding.'.format(self.method) self.build() - self._recommendations = self.get_recommendations() + self._recommendations = self.get_recommendations() return self._recommendations @@ -63,6 +101,14 @@ def build(self): raise NotImplementedError('This must be implemented in subclasses') + def _on_change(self): + self._recommendations = None + self._is_ready = False + + def _on_update(self): + self._recommendations = None + + def _get_slices_idx(self, shape, result_width=None, scores_multiplier=None, dtypes=None): result_width = result_width or self.topk if scores_multiplier is None: @@ -99,20 +145,30 @@ def _slice_test_data(self, test_data, start, stop): return (user_slice_coo, item_slice_coo, fdbk_slice_coo) + def get_training_matrix(self, dtype=None): + idx, val, shp = self.data.to_coo(tensor_mode=False) + dtype = dtype or val.dtype + matrix = csr_matrix((val, (idx[:, 0], idx[:, 1])), + shape=shp, dtype=dtype) + return matrix + + def get_test_matrix(self, test_data, shape, user_slice=None): + num_users_all = shape[0] if user_slice: start, stop = user_slice + stop = min(stop, num_users_all) num_users = stop - start coo_data = self._slice_test_data(test_data, start, stop) else: - num_users = shape[0] + num_users = num_users_all coo_data = test_data user_coo, item_coo, fdbk_coo = coo_data num_items = shape[1] test_matrix = csr_matrix((fdbk_coo, (user_coo, item_coo)), shape=(num_users, num_items), - dtype=np.float64) + dtype=fdbk_coo.dtype) return test_matrix, coo_data @@ -120,10 +176,69 @@ def slice_recommendations(self, test_data, shape, start, end): raise NotImplementedError('This must be implemented in subclasses') - def user_recommendations(self, i): + def _user_scores(self, i): + # should not be exposed, designed for use within framework + # operates on internal itemid's test_data, test_shape = self._get_test_data() scores, seen_idx = self.slice_recommendations(test_data, test_shape, i, i+1) - return scores.squeeze(), seen_idx[1] + + if self.filter_seen: + self.downvote_seen_items(scores, seen_idx) + + return scores, seen_idx + + + def _make_user(self, user_info): + # converts external user info into internal representation + userid, itemid, feedback = self.data.fields + + if isinstance(user_info, dict): + user_info = user_info.items() + try: + items_data, feedback_data = zip(*user_info) + except TypeError: + items_data = user_info + feedback_val = self.data.training[feedback].max() + feedback_data = [feedback_val]*len(items_data) + + # need to convert itemid's to internal representation + # conversion is not required for feedback (it's made in *to_coo functions, if needed) + items_data = self.data.index.itemid.set_index('old').loc[items_data, 'new'].values + user_data = pd.DataFrame({userid:[0]*len(items_data), + itemid:items_data, + feedback:feedback_data}) + return user_data + + + def show_recommendations(self, user_info, topk=None): + # convenience function to model users and get recs + # operates on external itemid's + if 
isinstance(user_info, int):
+            scores, seen_idx = self._user_scores(user_info)
+        else:
+            testset = self.data.test.testset
+            evalset = self.data.test.evalset
+            user_data = self._make_user(user_info)
+            try:
+                # makes a "fake" test user
+                self.data._test = namedtuple('TestData', 'testset evalset')._make([user_data, None])
+                scores, seen_idx = self._user_scores(0)
+            finally:
+                # restore original data - prevent information loss
+                self.data._test = namedtuple('TestData', 'testset evalset')._make([testset, evalset])
+
+        _topk = self.topk
+        self.topk = topk or _topk
+        # takes care of both sparse and dense recommendation lists
+        top_recs = self.get_topk_items(scores).squeeze() #remove singleton
+        self.topk = _topk
+
+        seen_idx = seen_idx[1] # only items idx
+        # convert back to external representation
+        item_idx_map = self.data.index.itemid.set_index('new')
+        top_recs = item_idx_map.loc[top_recs, 'old'].values
+        seen_items = item_idx_map.loc[seen_idx, 'old'].values
+        return top_recs, seen_items
 
 
     def get_recommendations(self):
@@ -142,7 +257,12 @@ def get_recommendations(self):
             scores, slice_data = self.slice_recommendations(test_data, test_shape, start, stop)
 
             if self.filter_seen:
-                #prevent seen items from appearing in recommendations
+                # prevent seen items from appearing in recommendations
+                # NOTE: in case of sparse models (e.g. simple item-to-item)
+                # there's a risk of having seen items in the recommendations list
+                # (for topk < i2i_matrix.shape[1]-len(unseen))
+                # this is related to the low generalization ability
+                # of the naive cooccurrence method itself, not to the algorithm
                 self.downvote_seen_items(scores, slice_data)
 
             top_recs[start:stop, :] = self.get_topk_items(scores)
@@ -179,12 +299,13 @@ def get_feedback_data(self, on_level=None):
         if on_level is not None:
             try:
                 iter(on_level)
+            except TypeError:
+                feedback_data = np.ma.masked_not_equal(feedback_data, on_level)
+            else:
                 mask_level = np.in1d(feedback_data.ravel(), on_level, invert=True).reshape(feedback_data.shape)
                 feedback_data = np.ma.masked_where(mask_level, feedback_data)
-            except TypeError:
-                feedback_data = np.ma.masked_not_equal(feedback_data, on_level)
         return feedback_data
 
@@ -204,13 +325,13 @@ def evaluate(self, method='hits', topk=None, on_feedback_level=None):
         if method == 'relevance':
             positive_feedback = self.get_positive_feedback(on_feedback_level)
-            scores = get_relevance_scores(matched_predictions, positive_feedback)
+            scores = get_relevance_scores(matched_predictions, positive_feedback, self.not_rated_penalty)
         elif method == 'ranking':
             feedback = self.get_feedback_data(on_feedback_level)
             scores = get_ranking_scores(matched_predictions, feedback, self.switch_positive)
         elif method == 'hits':
             positive_feedback = self.get_positive_feedback(on_feedback_level)
-            scores = get_hits(matched_predictions, positive_feedback)
+            scores = get_hits(matched_predictions, positive_feedback, self.not_rated_penalty)
         else:
             raise NotImplementedError
         return scores
 
@@ -247,7 +368,11 @@ def downvote_seen_items(recs, idx_seen):
             lowered = recs.data.min() - (seen_data.max() - seen_data) - 1
             recs.data[idx_seen_bool] = lowered
         else:
-            idx_seen_flat = np.ravel_multi_index(idx_seen, recs.shape)
+            try:
+                idx_seen_flat = np.ravel_multi_index(idx_seen, recs.shape)
+            except ValueError:
+                # make compatible with single user recommendations
+                idx_seen_flat = idx_seen
             seen_data = recs.flat[idx_seen_flat]
             # move seen items scores below minimum value
             lowered = recs.min() - (seen_data.max() - seen_data) - 1
@@ -257,6 +382,7 @@ def 
get_topk_items(self, scores): topk = self.topk if sp.sparse.issparse(scores): + assert scores.format == 'csr' # there can be less then topk values in some rows # need to extend sorted scores to conform with evaluation matrix shape # can do this by adding -1's to the right, however: @@ -278,7 +404,18 @@ def topscore(x, k): idx = scores.nonzero() row_data = pd.DataFrame({'data': scores.data, 'cols': idx[1]}).groupby(idx[0], sort=True) - recs = np.asarray(row_data.apply(topscore, topk).tolist()) + nnz_users = row_data.grouper.levels[0] + num_users = scores.shape[0] + if len(nnz_users) < num_users: + # scores may have zero-valued rows, this breaks get_topk_items + # as scores.nonzero() will filter out indices of those rows. + # Need to restore full data with zeros in that case. + recs = np.empty((num_users, topk)) + zero_rows = np.in1d(np.arange(num_users), nnz_users, assume_unique=True, invert=True) + recs[zero_rows, :] = self._pad_const + recs[~zero_rows, :] = np.asarray(row_data.apply(topscore, topk).tolist()) + else: + recs = np.asarray(row_data.apply(topscore, topk).tolist()) else: # apply_along_axis is more memory efficient then argsort on full array recs = np.apply_along_axis(self.topsort, 1, scores, topk) @@ -286,12 +423,16 @@ def topscore(x, k): @staticmethod - def orthogonalize(u, v): + def orthogonalize(u, v, complete=False): Qu, Ru = np.linalg.qr(u) Qv, Rv = np.linalg.qr(v) - Ur, Sr, Vr = np.linalg.svd(Ru.dot(Rv.T)) - U = Qu.dot(Ur) - V = Qv.dot(Vr.T) + if complete: + # it's not needed for folding-in, as Ur and Vr will cancel out anyway + Ur, Sr, Vr = np.linalg.svd(Ru.dot(Rv.T)) + U = Qu.dot(Ur) + V = Qv.dot(Vr.T) + else: + U, V = Qu, Qv return U, V @@ -321,7 +462,7 @@ def __init__(self, kind, *args, **kwargs): def build(self): - self._recommendations = None + pass def get_recommendations(self): @@ -358,48 +499,54 @@ def __init__(self, *args, **kwargs): super(CooccurrenceModel, self).__init__(*args, **kwargs) self.method = 'item-to-item' #pick some meaningful name self.implicit = True + self.dense_output = False def build(self): - self._recommendations = None - idx, val, shp = self.data.to_coo() - + user_item_matrix = self.get_training_matrix() if self.implicit: - val = np.ones_like(val) + # np.sign allows for negative values as well + user_item_matrix.data = np.sign(user_item_matrix.data) - user_item_matrix = csr_matrix((val, (idx[:, 0], idx[:, 1])), - shape=shp, dtype=np.float64) - tik = timer() - i2i_matrix = user_item_matrix.T.dot(user_item_matrix) - - #exclude "self-links" - diag_vals = i2i_matrix.diagonal() - i2i_matrix -= sp.sparse.dia_matrix((diag_vals, 0), shape=i2i_matrix.shape) - tok = timer() - tik - print '{} model training time: {}s'.format(self.method, tok) + with Timer(self.method): + i2i_matrix = user_item_matrix.T.dot(user_item_matrix) # gives CSC format + i2i_matrix.setdiag(0) #exclude "self-links" + i2i_matrix.eliminate_zeros() self._i2i_matrix = i2i_matrix - def get_recommendations(self): - test_data = self.data.test_to_coo() - test_shape = self.data.get_test_shape() - test_matrix, _ = self.get_test_matrix(test_data, test_shape) - if self.implicit: - test_matrix.data = np.ones_like(test_matrix.data) + def _sparse_dot(self, tst_mat, i2i_mat): + # scipy always returns sparse result, even if dot product is actually dense + # this function offers solution to this problem + # it also takes care on sparse result w.r.t. 
@@ -321,7 +462,7 @@ def __init__(self, kind, *args, **kwargs):

     def build(self):
-        self._recommendations = None
+        pass

     def get_recommendations(self):
@@ -358,48 +499,54 @@ def __init__(self, *args, **kwargs):
         super(CooccurrenceModel, self).__init__(*args, **kwargs)
         self.method = 'item-to-item' # pick some meaningful name
         self.implicit = True
+        self.dense_output = False

     def build(self):
-        self._recommendations = None
-        idx, val, shp = self.data.to_coo()
-
+        user_item_matrix = self.get_training_matrix()
         if self.implicit:
-            val = np.ones_like(val)
+            # np.sign allows for negative values as well
+            user_item_matrix.data = np.sign(user_item_matrix.data)

-        user_item_matrix = csr_matrix((val, (idx[:, 0], idx[:, 1])),
-                                      shape=shp, dtype=np.float64)
-        tik = timer()
-        i2i_matrix = user_item_matrix.T.dot(user_item_matrix)
-
-        #exclude "self-links"
-        diag_vals = i2i_matrix.diagonal()
-        i2i_matrix -= sp.sparse.dia_matrix((diag_vals, 0), shape=i2i_matrix.shape)
-        tok = timer() - tik
-        print '{} model training time: {}s'.format(self.method, tok)
+        with Timer(self.method):
+            i2i_matrix = user_item_matrix.T.dot(user_item_matrix)  # gives CSC format
+            i2i_matrix.setdiag(0)  # exclude "self-links"
+            i2i_matrix.eliminate_zeros()
         self._i2i_matrix = i2i_matrix

-    def get_recommendations(self):
-        test_data = self.data.test_to_coo()
-        test_shape = self.data.get_test_shape()
-        test_matrix, _ = self.get_test_matrix(test_data, test_shape)
-        if self.implicit:
-            test_matrix.data = np.ones_like(test_matrix.data)
+    def _sparse_dot(self, tst_mat, i2i_mat):
+        # scipy always returns a sparse result, even if the dot product is actually dense;
+        # this function offers a solution to that problem.
+        # It also takes care of the sparse result w.r.t. further processing
+        if self.dense_output:  # calculate dense result directly
+            # TODO implement matmat multiplication instead of iteration with matvec
+            res_type = np.result_type(i2i_mat.dtype, tst_mat.dtype)
+            scores = np.empty((tst_mat.shape[0], i2i_mat.shape[1]), dtype=res_type)
+            for i in xrange(tst_mat.shape[0]):
+                v = tst_mat.getrow(i)
+                scores[i, :] = csc_matvec(i2i_mat, v, dense_output=True, dtype=res_type)
+        else:
+            scores = tst_mat.dot(i2i_mat.T)
+            # NOTE even though not necessary for a symmetric i2i matrix, the
+            # transpose helps to avoid expensive conversion to CSR (performed by scipy)
+            if scores.nnz > NNZ_MAX:
+                # too many non-zeros lead to undesired memory overhead in downvote_seen_items
+                scores = scores.toarray(order='C')
+        return scores

-        i2i_scores = test_matrix.dot(self._i2i_matrix)
-        if self.filter_seen:
-            # prevent seen items from appearing in recommendations;
-            # caution: there's a risk of having seen items in the list
-            # (for topk < i2i_matrix.shape[1]-len(unseen))
-            # this is related to low generalization ability
-            # of the naive cooccurrence method itself, not to the algorithm
-            self.downvote_seen_items(i2i_scores, test_data)
-
-        top_recs = self.get_topk_items(i2i_scores)
-        return top_recs
+    def slice_recommendations(self, test_data, shape, start, stop):
+        test_matrix, slice_data = self.get_test_matrix(test_data, shape, (start, stop))
+        # NOTE CSR format is mandatory for proper handling of single-user
+        # recommendations, as a vector of shape (1, N) in CSC format is inefficient
+
+        if self.implicit:
+            test_matrix.data = np.sign(test_matrix.data)
+
+        scores = self._sparse_dot(test_matrix, self._i2i_matrix)
+        return scores, slice_data


 class SVDModel(RecommenderModel):
@@ -408,18 +555,14 @@ def __init__(self, *args, **kwargs):
         super(SVDModel, self).__init__(*args, **kwargs)
         self.rank = defaults.svd_rank
         self.method = 'SVD'
+        self.operator = None

-    def build(self):
-        self._recommendations = None
-        idx, val, shp = self.data.to_coo(tensor_mode=False)
-        svd_matrix = csr_matrix((val, (idx[:, 0], idx[:, 1])),
-                                shape=shp, dtype=np.float64)
+    def build(self, operator=None):
+        svd_matrix = self.operator or operator or self.get_training_matrix(dtype=np.float64)

-        tik = timer()
-        _, _, items_factors = svds(svd_matrix, k=self.rank, return_singular_vectors='vh')
-        tok = timer() - tik
-        print '{} model training time: {}s'.format(self.method, tok)
+        with Timer(self.method):
+            _, _, items_factors = svds(svd_matrix, k=self.rank, return_singular_vectors='vh')

         self._items_factors = np.ascontiguousarray(items_factors[::-1, :]).T
@@ -483,16 +626,15 @@ def flatten_scores(tensor_scores, flattener=None):

     def build(self):
-        self._recommendations = None
         idx, val, shp = self.data.to_coo(tensor_mode=True)
-        tik = timer()
-        users_factors, items_factors, feedback_factors, core = \
-            tucker_als(idx, val, shp, self.mlrank,
-                       growth_tol=self.growth_tol,
-                       iters = self.num_iters,
-                       batch_run=not self.show_output)
-        tok = timer() - tik
-        print '{} model training time: {}s'.format(self.method, tok)
+        with Timer(self.method):
+            users_factors, items_factors, feedback_factors, core = \
+                tucker_als(idx, val, shp, self.mlrank,
+                           growth_tol=self.growth_tol,
+                           iters=self.num_iters,
+                           batch_run=not self.show_output)
+
         self._users_factors = users_factors
         self._items_factors = items_factors
         self._feedback_factors = feedback_factors
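The `_sparse_dot` change above encodes a practical rule: scipy keeps matrix products in sparse format even when they are effectively dense, so once the number of non-zeros crosses a budget it is cheaper to materialize a plain array before `downvote_seen_items` runs. A stripped-down sketch of that decision; `NNZ_BUDGET` is a stand-in for the library's `NNZ_MAX`, and the matrices are random toys:

```python
import scipy.sparse as sp

NNZ_BUDGET = 10  # hypothetical threshold standing in for NNZ_MAX
test_matrix = sp.random(5, 8, density=0.5, format='csr', random_state=0)
i2i_matrix = sp.random(8, 8, density=0.5, format='csc', random_state=1)

scores = test_matrix.dot(i2i_matrix.T)  # scipy keeps the result sparse
if scores.nnz > NNZ_BUDGET:
    # a nearly dense "sparse" matrix costs more than a plain array downstream
    scores = scores.toarray(order='C')
```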
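The `[::-1]` flip in `SVDModel.build` is easy to miss: `scipy.sparse.linalg.svds` returns its k singular triplets in ascending order of singular value, so the leading factors sit at the end. A toy demonstration with random data (shapes and rank are arbitrary):

```python
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import svds

rng = np.random.RandomState(0)             # arbitrary toy ratings
svd_matrix = csr_matrix(rng.rand(20, 10))

_, s, vh = svds(svd_matrix, k=3, return_singular_vectors='vh')
print(s)  # ascending: s[0] <= s[1] <= s[2]
# reverse the rows so that the leading factors come first
item_factors = np.ascontiguousarray(vh[::-1, :]).T  # shape (10, 3)
```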
diff --git a/polara/recommender/utils.py b/polara/recommender/utils.py
index ea3903d..a9805e9 100644
--- a/polara/recommender/utils.py
+++ b/polara/recommender/utils.py
@@ -1,14 +1,23 @@
 from __future__ import division
+import sys
 import numpy as np
 from polara.tools.systools import get_available_memory

-MEMORY_HARD_LIMIT = 1 # in gigbytes, default=1
+MEMORY_HARD_LIMIT = 1  # in gigabytes, default=1, depends on hardware
 # varying this value may significantly impact performance
 # setting it to None or a large value typically reduces performance,
 # as iterating over a smaller number of huge arrays takes longer
 # than over a larger number of smaller arrays

+tuplsize = sys.getsizeof(())
+itemsize = np.dtype(np.intp).itemsize
+pntrsize = sys.getsizeof(1.0)
+# size of a list of tuples of indices - used to estimate when to convert a sparse matrix to dense,
+# based on http://stackoverflow.com/questions/15641344/python-memory-consumption-dict-vs-list-of-tuples
+# and https://code.tutsplus.com/tutorials/understand-how-much-memory-your-python-objects-use--cms-25609
+NNZ_MAX = int(MEMORY_HARD_LIMIT * (1024**3) / (tuplsize + 2*(pntrsize + itemsize)))
+

 def range_division(length, fit_size):
     # based on np.array_split
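The `NNZ_MAX` constant above converts the memory hard limit into a cap on stored non-zeros, using the size of one `(row, col)` tuple of boxed indices as the unit cost. The exact byte counts depend on the platform and interpreter; a rough check on a 64-bit CPython build looks like this:

```python
import sys
import numpy as np

tuplsize = sys.getsizeof(())           # empty-tuple overhead, platform dependent
itemsize = np.dtype(np.intp).itemsize  # one raw index, 8 bytes on 64-bit builds
pntrsize = sys.getsizeof(1.0)          # one boxed Python number
bytes_per_nnz = tuplsize + 2 * (pntrsize + itemsize)  # one (row, col) tuple

nnz_max = int(1 * 1024**3 / bytes_per_nnz)  # with the 1 GB hard limit
print(bytes_per_nnz, nnz_max)  # on the order of 10**7 non-zeros per gigabyte
```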
diff --git a/polara/tools/movielens.py b/polara/tools/movielens.py
index 4111e4a..31eebc0 100644
--- a/polara/tools/movielens.py
+++ b/polara/tools/movielens.py
@@ -4,50 +4,55 @@ from pandas.io.common import ZipFile

-def get_movielens_data(local_file=None, get_genres=False):
+def get_movielens_data(local_file=None, get_ratings=True, get_genres=False,
+                       split_genres=True, db_mapping=False):
     '''Downloads movielens data and stores it in pandas dataframe.
     '''
     if not local_file:
-        #print 'Downloading data...'
+        # downloading data
         zip_file_url = 'http://files.grouplens.org/datasets/movielens/ml-1m.zip'
         zip_response = get(zip_file_url)
         zip_contents = StringIO(zip_response.content)
-        #print 'Done.'
     else:
         zip_contents = local_file

-    #print 'Loading data into memory...'
+    ml_data = ml_genres = mapping = None
+    # loading data into memory
     with ZipFile(zip_contents) as zfile:
         zip_files = pd.Series(zfile.namelist())
         zip_file = zip_files[zip_files.str.contains('ratings')].iat[0]
-        zdata = zfile.read(zip_file)
-        if 'latest' in zip_file:
-            header = 0
-        else:
-            header = None
-        delimiter = ','
-        zdata = zdata.replace('::', delimiter) # makes data compatible with pandas c-engine
-        ml_data = pd.read_csv(StringIO(zdata), sep=delimiter, header=header, engine='c',
-                              names=['userid', 'movieid', 'rating', 'timestamp'],
-                              usecols=['userid', 'movieid', 'rating'])
+        is_latest = 'latest' in zip_file
+        header = 0 if is_latest else None
+        if get_ratings:
+            zdata = zfile.read(zip_file)
+            delimiter = ','
+            zdata = zdata.replace('::', delimiter)  # makes data compatible with pandas c-engine
+            ml_data = pd.read_csv(StringIO(zdata), sep=delimiter, header=header, engine='c',
+                                  names=['userid', 'movieid', 'rating', 'timestamp'],
+                                  usecols=['userid', 'movieid', 'rating'])

         if get_genres:
             zip_file = zip_files[zip_files.str.contains('movies')].iat[0]
             with zfile.open(zip_file) as zdata:
-                if 'latest' in zip_file:
-                    delimiter = ','
-                else:
-                    delimiter = '::'
+                delimiter = ',' if is_latest else '::'
                 genres_data = pd.read_csv(zdata, sep=delimiter, header=header, engine='python',
                                           names=['movieid', 'movienm', 'genres'])
-            ml_genres = split_genres(genres_data)
-            ml_data = (ml_data, ml_genres)
+            ml_genres = get_split_genres(genres_data) if split_genres else genres_data

-    return ml_data
+        if is_latest and db_mapping:
+            # imdb and tmdb mapping - exists only in ml-latest datasets
+            zip_file = zip_files[zip_files.str.contains('links')].iat[0]
+            with zfile.open(zip_file) as zdata:
+                mapping = pd.read_csv(zdata, sep=',', header=0, engine='c',
+                                      names=['movieid', 'imdbid', 'tmdbid'])
+
+    res = [data for data in [ml_data, ml_genres, mapping] if data is not None]
+    if len(res) == 1: res = res[0]
+    return res


-def split_genres(genres_data):
+def get_split_genres(genres_data):
     genres_data.index.name = 'movie_idx'
     genres_stacked = genres_data.genres.str.split('|', expand=True).stack().to_frame('genreid')
     ml_genres = genres_data[['movieid', 'movienm']].join(genres_stacked).reset_index(drop=True)
diff --git a/polara/tools/printing.py b/polara/tools/printing.py
index 78933f0..3e86de3 100644
--- a/polara/tools/printing.py
+++ b/polara/tools/printing.py
@@ -1,4 +1,6 @@
 from IPython.display import HTML
+from contextlib import contextmanager
+import sys, os

 def print_frames(dataframes):
     if not isinstance(dataframes, tuple):
@@ -14,4 +16,15 @@ def print_frames(dataframes):
     '''
-    return HTML(table)
\ No newline at end of file
+    return HTML(table)
+
+# from http://thesmithfam.org/blog/2012/10/25/temporarily-suppress-console-output-in-python/#
+@contextmanager
+def suppress_stdout():
+    with open(os.devnull, "w") as devnull:
+        old_stdout = sys.stdout
+        sys.stdout = devnull
+        try:
+            yield
+        finally:
+            sys.stdout = old_stdout
diff --git a/polara/tools/systools.py b/polara/tools/systools.py
index ab6314a..06a48f2 100644
--- a/polara/tools/systools.py
+++ b/polara/tools/systools.py
@@ -1,4 +1,5 @@
 from __future__ import division
+import os
 import sys
 import ctypes