From 8cfeb380066730479635db691c62f6bbcd48afc7 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Fri, 22 Nov 2024 06:54:43 -0500 Subject: [PATCH] add scheduler task for monthly reporter tasks - update monthly reporters with `iter_report_kwargs` (mostly affects `PublicItemUsageReporter`, which was badly optimized to generate many reports at once) - add `schedule_monthly_reporter` task that schedules tasks from `iter_report_kwargs` results - change `MonthlyReporter.followup_task()` to run per-report --- admin/management/views.py | 8 +- osf/features.yaml | 5 + .../commands/monthly_reporters_go.py | 125 +++++-- osf/metrics/preprint_metrics.py | 4 +- osf/metrics/reporters/_base.py | 19 +- .../reporters/institution_summary_monthly.py | 13 +- osf/metrics/reporters/institutional_users.py | 30 +- osf/metrics/reporters/public_item_usage.py | 346 +++++++----------- osf/metrics/reporters/spam_count.py | 7 +- osf/metrics/utils.py | 8 + osf_tests/metrics/reporters/__init__.py | 0 osf_tests/metrics/reporters/_testutils.py | 10 + .../test_institutional_summary_reporter.py | 7 +- .../test_institutional_users_reporter.py | 15 +- .../test_public_item_usage_reporter.py | 148 +++++--- osf_tests/metrics/test_yearmonth.txt | 7 + 16 files changed, 418 insertions(+), 334 deletions(-) create mode 100644 osf_tests/metrics/reporters/__init__.py create mode 100644 osf_tests/metrics/reporters/_testutils.py diff --git a/admin/management/views.py b/admin/management/views.py index 88548a518d1..bb7065c1062 100644 --- a/admin/management/views.py +++ b/admin/management/views.py @@ -12,6 +12,7 @@ from scripts.find_spammy_content import manage_spammy_content from django.urls import reverse from django.shortcuts import redirect +from osf.metrics.utils import YearMonth from osf.models import Preprint, Node, Registration @@ -122,8 +123,11 @@ def post(self, request, *args, **kwargs): report_date = None errors = monthly_reporters_go( - report_month=getattr(report_date, 'month', None), - report_year=getattr(report_date, 'year', None) + yearmonth=( + str(YearMonth.from_date(report_date)) + if report_date is not None + else '' + ), ) if errors: diff --git a/osf/features.yaml b/osf/features.yaml index a3f0fcc1f14..1b41e4b2cdc 100644 --- a/osf/features.yaml +++ b/osf/features.yaml @@ -221,3 +221,8 @@ switches: - flag_name: ENABLE_INACTIVE_SCHEMAS name: enable_inactive_schemas note: This is no longer used + + - flag_name: COUNTEDUSAGE_UNIFIED_METRICS_2024 + name: countedusage_unified_metrics_2024 + note: use only `osf.metrics.counted_usage`-based metrics where possible; un-use PageCounter, PreprintView, PreprintDownload, etc + active: false diff --git a/osf/management/commands/monthly_reporters_go.py b/osf/management/commands/monthly_reporters_go.py index c467640cd15..aeb9435b27e 100644 --- a/osf/management/commands/monthly_reporters_go.py +++ b/osf/management/commands/monthly_reporters_go.py @@ -1,54 +1,86 @@ import logging from django.core.management.base import BaseCommand -from django.db.utils import OperationalError +from django.db import OperationalError as DjangoOperationalError from django.utils import timezone +from elasticsearch.exceptions import ConnectionError as ElasticConnectionError +from psycopg2 import OperationalError as PostgresOperationalError from framework.celery_tasks import app as celery_app from osf.metrics.reporters import AllMonthlyReporters from osf.metrics.utils import YearMonth -from website.app import init_app logger = logging.getLogger(__name__) -MAXMONTH = 12 - +_RETRY_ERRORS = ( + DjangoOperationalError, + ElasticConnectionError, + PostgresOperationalError, +) @celery_app.task(name='management.commands.monthly_reporters_go') -def monthly_reporters_go(report_year=None, report_month=None): - init_app() # OSF-specific setup - - if report_year and report_month: - report_yearmonth = YearMonth(report_year, report_month) - else: # default to last month if year and month not provided - today = timezone.now().date() - report_yearmonth = YearMonth( - year=today.year if today.month > 1 else today.year - 1, - month=today.month - 1 or MAXMONTH, - ) - for _reporter_key in AllMonthlyReporters.__members__.keys(): - monthly_reporter_go.apply_async(kwargs={ +def monthly_reporters_go(yearmonth: str, reporter_key: str = ''): + assert YearMonth.from_str(yearmonth) is not None + _reporter_keys = ( + [reporter_key] + if reporter_key + else _enum_names(AllMonthlyReporters) + ) + for _reporter_key in _reporter_keys: + schedule_monthly_reporter.apply_async(kwargs={ 'reporter_key': _reporter_key, - 'yearmonth': str(report_yearmonth), + 'yearmonth': yearmonth, }) +@celery_app.task(name='management.commands.schedule_monthly_reporter') +def schedule_monthly_reporter( + yearmonth: str, + reporter_key: str, + continue_after: dict | None = None, +): + _reporter = _get_reporter(reporter_key, yearmonth) + _last_kwargs = None + _done = False + try: + for _kwargs in _reporter.iter_report_kwargs(continue_after=continue_after): + monthly_reporter_do.apply_async(kwargs={ + 'reporter_key': reporter_key, + 'yearmonth': yearmonth, + 'report_kwargs': _kwargs, + }) + _last_kwargs = _kwargs + _done = True + except _RETRY_ERRORS: + if not _done and (_last_kwargs is not None): + schedule_monthly_reporter.apply_async(kwargs={ + 'yearmonth': yearmonth, + 'reporter_key': reporter_key, + 'continue_after': _last_kwargs, + }) + + @celery_app.task( - name='management.commands.monthly_reporter_go', - autoretry_for=(OperationalError,), + name='management.commands.monthly_reporter_do', + autoretry_for=( + DjangoOperationalError, + ElasticConnectionError, + PostgresOperationalError, + ), max_retries=5, retry_backoff=True, - bind=True, ) -def monthly_reporter_go(task, reporter_key: str, yearmonth: str): - _reporter_class = AllMonthlyReporters[reporter_key].value - _reporter = _reporter_class(YearMonth.from_str(yearmonth)) - _reporter.run_and_record_for_month() - _followup = _reporter.followup_task() - if _followup is not None: - _followup.apply_async() +def monthly_reporter_do(reporter_key: str, yearmonth: str, report_kwargs: dict): + _reporter = _get_reporter(reporter_key, yearmonth) + _report = _reporter.report(**report_kwargs) + if _report is not None: + _report.report_yearmonth = _reporter.yearmonth + _report.save() + _followup_task = _reporter.followup_task(_report) + if _followup_task is not None: + _followup_task.apply_async() class Command(BaseCommand): @@ -56,13 +88,38 @@ def add_arguments(self, parser): parser.add_argument( 'yearmonth', type=YearMonth.from_str, - default={'year': None, 'month': None}, + default=None, help='year and month (YYYY-MM)', ) + parser.add_argument( + '-r', '--reporter', + type=str, + choices={_name.lower() for _name in _enum_names(AllMonthlyReporters)}, + default='', + help='name of the reporter to run (default all)', + ) - def handle(self, *args, **options): - monthly_reporters_go( - report_year=getattr(options.get('yearmonth'), 'year', None), - report_month=getattr(options.get('yearmonth'), 'month', None), + def handle(self, *args, **kwargs): + _yearmonth = ( + kwargs.get('yearmonth') + or YearMonth.from_date(timezone.now().date()).prior() # default last month + ) + _reporter_key = kwargs.get('reporter', '').upper() + monthly_reporters_go.delay( + yearmonth=str(_yearmonth), + reporter_key=_reporter_key, ) - self.stdout.write(self.style.SUCCESS('reporter tasks scheduled.')) + self.stdout.write(self.style.SUCCESS( + f'scheduling tasks for monthly reporter "{_reporter_key}"...' + if _reporter_key + else 'scheduling tasks for all monthly reporters...' + )) + + +def _get_reporter(reporter_key: str, yearmonth: str): + _reporter_class = AllMonthlyReporters[reporter_key].value + return _reporter_class(YearMonth.from_str(yearmonth)) + + +def _enum_names(enum_cls) -> list[str]: + return list(AllMonthlyReporters.__members__.keys()) diff --git a/osf/metrics/preprint_metrics.py b/osf/metrics/preprint_metrics.py index 472cd01f698..4b64398a5c6 100644 --- a/osf/metrics/preprint_metrics.py +++ b/osf/metrics/preprint_metrics.py @@ -37,8 +37,8 @@ def record_for_preprint(cls, preprint, user=None, **kwargs): ) @classmethod - def get_count_for_preprint(cls, preprint, after=None, before=None, index=None): - search = cls.search(after=after, before=before, index=index).filter('match', preprint_id=preprint._id) + def get_count_for_preprint(cls, preprint, after=None, before=None, index=None) -> int: + search = cls.search(index=index).filter('term', preprint_id=preprint._id) timestamp = {} if after: timestamp['gte'] = after diff --git a/osf/metrics/reporters/_base.py b/osf/metrics/reporters/_base.py index 931afe23fd0..707e869522b 100644 --- a/osf/metrics/reporters/_base.py +++ b/osf/metrics/reporters/_base.py @@ -15,18 +15,17 @@ class MonthlyReporter: yearmonth: YearMonth - def report(self) -> abc.Iterable[MonthlyReport] | abc.Iterator[MonthlyReport]: + def iter_report_kwargs(self, continue_after: dict | None = None) -> abc.Iterator[dict]: + # override for multiple reports per month + if continue_after is None: + yield {} # by default, calls `.report()` once with no kwargs + + def report(self, **report_kwargs) -> MonthlyReport | None: """build a report for the given month """ - raise NotImplementedError(f'{self.__name__} must implement `report`') - - def run_and_record_for_month(self) -> None: - reports = self.report() - for report in reports: - report.report_yearmonth = self.yearmonth - report.save() + raise NotImplementedError(f'{self.__class__.__name__} must implement `report`') - def followup_task(self) -> celery.Signature | None: + def followup_task(self, report) -> celery.Signature | None: return None @@ -36,7 +35,7 @@ def report(self, report_date): return an iterable of DailyReport (unsaved) """ - raise NotImplementedError(f'{self.__name__} must implement `report`') + raise NotImplementedError(f'{self.__class__.__name__} must implement `report`') def run_and_record_for_date(self, report_date): reports = self.report(report_date) diff --git a/osf/metrics/reporters/institution_summary_monthly.py b/osf/metrics/reporters/institution_summary_monthly.py index 998cc056298..4748860db32 100644 --- a/osf/metrics/reporters/institution_summary_monthly.py +++ b/osf/metrics/reporters/institution_summary_monthly.py @@ -11,9 +11,16 @@ class InstitutionalSummaryMonthlyReporter(MonthlyReporter): """Generate an InstitutionMonthlySummaryReport for each institution.""" - def report(self): - for institution in Institution.objects.all(): - yield self.generate_report(institution) + def iter_report_kwargs(self, continue_after: dict | None = None): + _inst_qs = Institution.objects.order_by('pk') + if continue_after: + _inst_qs = _inst_qs.filter(pk__gt=continue_after['institution_pk']) + for _pk in _inst_qs.values_list('pk', flat=True): + yield {'institution_pk': _pk} + + def report(self, **report_kwargs): + _institution = Institution.objects.get(pk=report_kwargs['institution_pk']) + return self.generate_report(_institution) def generate_report(self, institution): node_queryset = institution.nodes.filter( diff --git a/osf/metrics/reporters/institutional_users.py b/osf/metrics/reporters/institutional_users.py index e0f7f42a156..876d675b610 100644 --- a/osf/metrics/reporters/institutional_users.py +++ b/osf/metrics/reporters/institutional_users.py @@ -1,5 +1,4 @@ import dataclasses -import datetime from django.contrib.contenttypes.models import ContentType from django.db.models import Q, F, Sum @@ -22,13 +21,27 @@ class InstitutionalUsersReporter(MonthlyReporter): which offers institutional admins insight into how people at their institution are using osf, based on their explicitly-affiliated osf objects ''' - def report(self): + def iter_report_kwargs(self, continue_after: dict | None = None): _before_datetime = self.yearmonth.month_end() - for _institution in osfdb.Institution.objects.filter(created__lt=_before_datetime): + _inst_qs = ( + osfdb.Institution.objects + .filter(created__lt=_before_datetime) + .order_by('pk') + ) + if continue_after: + _inst_qs = _inst_qs.filter(pk__gte=continue_after['institution_pk']) + for _institution in _inst_qs: _user_qs = _institution.get_institution_users().filter(created__lt=_before_datetime) - for _user in _user_qs.iterator(chunk_size=_CHUNK_SIZE): - _helper = _InstiUserReportHelper(_institution, _user, self.yearmonth, _before_datetime) - yield _helper.report + if continue_after and (_institution.pk == continue_after['institution_pk']): + _user_qs = _user_qs.filter(pk__gt=continue_after['user_pk']) + for _user_pk in _user_qs.values_list('pk', flat=True).iterator(chunk_size=_CHUNK_SIZE): + yield {'institution_pk': _institution.pk, 'user_pk': _user_pk} + + def report(self, **report_kwargs): + _institution = osfdb.Institution.objects.get(pk=report_kwargs['institution_pk']) + _user = osfdb.OSFUser.objects.get(pk=report_kwargs['user_pk']) + _helper = _InstiUserReportHelper(_institution, _user, self.yearmonth) + return _helper.report # helper @@ -37,7 +50,6 @@ class _InstiUserReportHelper: institution: osfdb.Institution user: osfdb.OSFUser yearmonth: YearMonth - before_datetime: datetime.datetime report: InstitutionalUserReport = dataclasses.field(init=False) def __post_init__(self): @@ -64,6 +76,10 @@ def __post_init__(self): storage_byte_count=self._storage_byte_count(), ) + @property + def before_datetime(self): + return self.yearmonth.month_end() + def _node_queryset(self): _institution_node_qs = self.institution.nodes.filter( created__lt=self.before_datetime, diff --git a/osf/metrics/reporters/public_item_usage.py b/osf/metrics/reporters/public_item_usage.py index ecc34a5d9c7..46da74c9e3d 100644 --- a/osf/metrics/reporters/public_item_usage.py +++ b/osf/metrics/reporters/public_item_usage.py @@ -1,24 +1,26 @@ from __future__ import annotations -import typing +import datetime -import celery -if typing.TYPE_CHECKING: - import elasticsearch_dsl as edsl +import waffle +import osf.features from osf.metadata.osf_gathering import OsfmapPartition from osf.metrics.counted_usage import ( CountedAuthUsage, get_item_type, get_provider_id, ) +from osf.metrics.preprint_metrics import ( + PreprintDownload, + PreprintView, +) from osf.metrics.reports import PublicItemUsageReport +from osf.metrics.utils import YearMonth from osf import models as osfdb from website import settings as website_settings from ._base import MonthlyReporter -_CHUNK_SIZE = 500 - _MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control @@ -31,80 +33,91 @@ class PublicItemUsageReporter(MonthlyReporter): includes projects, project components, registrations, registration components, and preprints ''' - - def report(self): - # use two composite aggregations in parallel to page thru every - # public item viewed or downloaded this month, counting: - # - views and downloads for each item (using `CountedAuthUsage.item_guid`) - # - views for each item's components and files (using `CountedAuthUsage.surrounding_guids`) - for _exact_bucket, _contained_views_bucket in _zip_composite_aggs( - self._exact_item_search(), 'agg_osfid', - self._contained_item_views_search(), 'agg_surrounding_osfid', - ): - try: - _report = self._report_from_buckets(_exact_bucket, _contained_views_bucket) - yield _report - except _SkipItem: - pass - - def followup_task(self): - return task__update_monthly_metadatas.signature( - args=[str(self.yearmonth)], - countdown=30, # give index time to settle - ) - - def _report_from_buckets(self, exact_bucket, contained_views_bucket): - # either exact_bucket or contained_views_bucket may be None, but not both - assert (exact_bucket is not None) or (contained_views_bucket is not None) - _report = ( - self._init_report_from_exact_bucket(exact_bucket) - if exact_bucket is not None - else self._init_report_from_osfid(contained_views_bucket.key.osfid) + def iter_report_kwargs(self, continue_after: dict | None = None): + _osfid_qs = ( + osfdb.Guid.objects + # try to use the index on ('content_type', 'object_id', 'created') + # (will keep only the oldest-created for each referent, which is fine) + .order_by('content_type_id', 'object_id', 'created') + .distinct('content_type_id', 'object_id') ) - # view counts include views on contained items (components, files) - _report.view_count, _report.view_session_count = self._get_view_counts(_report.item_osfid) - return _report - - def _init_report_from_exact_bucket(self, exact_bucket) -> PublicItemUsageReport: - # in the (should-be common) case of an item that has been directly viewed in - # this month, the stored metrics already have the data required - _report = PublicItemUsageReport( - item_osfid=exact_bucket.key.osfid, - item_type=_agg_keys(exact_bucket.agg_item_type), - provider_id=_agg_keys(exact_bucket.agg_provider_id), - platform_iri=_agg_keys(exact_bucket.agg_platform_iri), - # default counts to zero, will be updated if non-zero - view_count=0, - view_session_count=0, - download_count=0, - download_session_count=0, - ) - for _actionbucket in exact_bucket.agg_action: - # note: view counts computed separately to avoid double-counting - if _actionbucket.key == CountedAuthUsage.ActionLabel.DOWNLOAD.value: - _report.download_count = _actionbucket.doc_count - _report.download_session_count = _actionbucket.agg_session_count.value - return _report + if continue_after: + _created_after = datetime.datetime.fromisoformat(continue_after['_osfid_created']) + _osfid_qs = ( + _osfid_qs + .filter(created__gte=_created_after) + .exclude(_id=continue_after['osfid']) + ) + for _osfid, _created in _osfid_qs.values_list('_id', 'created').iterator(): + yield {'osfid': _osfid, '_osfid_created': _created.isoformat()} + + def report(self, **report_kwargs): + _osfid = report_kwargs['osfid'] + # get usage metrics from several sources: + # - osf.metrics.counted_usage: + # - views and downloads for each item (using `CountedAuthUsage.item_guid`) + # - views for each item's components and files (using `CountedAuthUsage.surrounding_guids`) + # - osf.metrics.preprint_metrics: + # - preprint views and downloads + # - PageCounter? (no) + try: + _guid = osfdb.Guid.load(_osfid) + if _guid is None or _guid.referent is None: + raise _SkipItem + _obj = _guid.referent + _report = self._init_report(_obj) + self._fill_report_counts(_report, _obj) + if not any(( + _report.view_count, + _report.view_session_count, + _report.download_count, + _report.download_session_count, + )): + raise _SkipItem + return _report + except _SkipItem: + return None + + def followup_task(self, report): + _is_last_month = report.yearmonth.next() == YearMonth.from_date(datetime.date.today()) + if _is_last_month: + from api.share.utils import task__update_share + return task__update_share.s( + report.item_osfid, + is_backfill=True, + osfmap_partition_name=OsfmapPartition.MONTHLY_SUPPLEMENT.name, + countdown=30, # give index time to settle + ) - def _init_report_from_osfid(self, osfid: str) -> PublicItemUsageReport: - # for the (should-be unusual) case where the components/files contained by - # an item have views in this month, but the item itself does not -- - # load necessary info via django models, instead - _osfguid = osfdb.Guid.load(osfid) - if _osfguid is None or not getattr(_osfguid.referent, 'is_public', False): + def _init_report(self, osf_obj) -> PublicItemUsageReport: + if not _is_item_public(osf_obj): raise _SkipItem return PublicItemUsageReport( - item_osfid=osfid, - item_type=[get_item_type(_osfguid.referent)], - provider_id=[get_provider_id(_osfguid.referent)], + item_osfid=osf_obj._id, + item_type=[get_item_type(osf_obj)], + provider_id=[get_provider_id(osf_obj)], platform_iri=[website_settings.DOMAIN], - # default counts to zero, will be updated if non-zero - view_count=0, - view_session_count=0, - download_count=0, - download_session_count=0, + # leave counts null; will be set if there's data ) + def _fill_report_counts(self, report, osf_obj): + if ( + isinstance(osf_obj, osfdb.Preprint) + and not waffle.switch_is_active(osf.features.COUNTEDUSAGE_UNIFIED_METRICS_2024) # type: ignore[attr-defined] + ): + # note: no session-count info in preprint metrics + report.view_count = self._preprint_views(osf_obj) + report.download_count = self._preprint_downloads(osf_obj) + else: + ( + report.view_count, + report.view_session_count, + ) = self._countedusage_view_counts(osf_obj) + ( + report.download_count, + report.download_session_count, + ) = self._countedusage_download_counts(osf_obj) + def _base_usage_search(self): return ( CountedAuthUsage.search() @@ -113,59 +126,10 @@ def _base_usage_search(self): 'gte': self.yearmonth.month_start(), 'lt': self.yearmonth.month_end(), }) - .update_from_dict({'size': 0}) # only aggregations, no hits + .extra(size=0) # only aggregations, no hits ) - def _exact_item_search(self) -> edsl.Search: - '''aggregate views and downloads on each osfid (not including components/files)''' - _search = self._base_usage_search() - # the main agg: use a composite aggregation to page thru *every* item - _agg_osfid = _search.aggs.bucket( - 'agg_osfid', - 'composite', - sources=[{'osfid': {'terms': {'field': 'item_guid'}}}], - size=_CHUNK_SIZE, - ) - # nested agg: for each item, get platform_iri values - _agg_osfid.bucket('agg_platform_iri', 'terms', field='platform_iri') - # nested agg: for each item, get provider_id values - _agg_osfid.bucket('agg_provider_id', 'terms', field='provider_id') - # nested agg: for each item, get item_type values - _agg_osfid.bucket('agg_item_type', 'terms', field='item_type') - # nested agg: for each item, get download count - _agg_action = _agg_osfid.bucket( - 'agg_action', - 'terms', - field='action_labels', - include=[ - CountedAuthUsage.ActionLabel.DOWNLOAD.value, - ], - ) - # nested nested agg: get download session count - _agg_action.metric( - 'agg_session_count', - 'cardinality', - field='session_id', - precision_threshold=_MAX_CARDINALITY_PRECISION, - ) - return _search - - def _contained_item_views_search(self) -> edsl.Search: - '''iterate osfids with views on contained components and files''' - _search = ( - self._base_usage_search() - .filter('term', action_labels=CountedAuthUsage.ActionLabel.VIEW.value) - ) - # the main agg: use a composite aggregation to page thru *every* item - _search.aggs.bucket( - 'agg_surrounding_osfid', - 'composite', - sources=[{'osfid': {'terms': {'field': 'surrounding_guids'}}}], - size=_CHUNK_SIZE, - ) - return _search - - def _get_view_counts(self, osfid: str) -> tuple[int, int]: + def _countedusage_view_counts(self, osf_obj) -> tuple[int, int]: '''compute view_session_count separately to avoid double-counting (the same session may be represented in both the composite agg on `item_guid` @@ -179,8 +143,8 @@ def _get_view_counts(self, osfid: str) -> tuple[int, int]: {'term': {'action_labels': CountedAuthUsage.ActionLabel.VIEW.value}}, ], should=[ - {'term': {'item_guid': osfid}}, - {'term': {'surrounding_guids': osfid}}, + {'term': {'item_guid': osf_obj._id}}, + {'term': {'surrounding_guids': osf_obj._id}}, ], minimum_should_match=1, ) @@ -193,94 +157,54 @@ def _get_view_counts(self, osfid: str) -> tuple[int, int]: ) _response = _search.execute() _view_count = _response.hits.total - _view_session_count = _response.aggregations.agg_session_count.value + _view_session_count = ( + _response.aggregations.agg_session_count.value + if 'agg_session_count' in _response.aggregations + else 0 + ) return (_view_count, _view_session_count) - -### -# followup celery task -@celery.shared_task -def task__update_monthly_metadatas(yearmonth: str): - from api.share.utils import task__update_share - _report_search = ( - PublicItemUsageReport.search() - .filter('term', report_yearmonth=yearmonth) - .source(['item_osfid']) # return only the 'item_osfid' field - ) - for _hit in _report_search.scan(): - task__update_share.delay( - _hit.item_osfid, - is_backfill=True, - osfmap_partition_name=OsfmapPartition.MONTHLY_SUPPLEMENT.name, + def _countedusage_download_counts(self, osf_obj) -> tuple[int, int]: + '''aggregate downloads on each osfid (not including components/files)''' + _search = ( + self._base_usage_search() + .filter('term', item_guid=osf_obj._id) + .filter('term', action_labels=CountedAuthUsage.ActionLabel.DOWNLOAD.value) + ) + # agg: get download session count + _search.aggs.metric( + 'agg_session_count', + 'cardinality', + field='session_id', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + _download_count = _response.hits.total + _download_session_count = ( + _response.aggregations.agg_session_count.value + if 'agg_session_count' in _response.aggregations + else 0 + ) + return (_download_count, _download_session_count) + + def _preprint_views(self, preprint: osfdb.Preprint) -> int: + '''aggregate views on each preprint''' + return PreprintView.get_count_for_preprint( + preprint=preprint, + after=self.yearmonth.month_start(), + before=self.yearmonth.month_end(), ) - -### -# local helpers - -def _agg_keys(bucket_agg_result) -> list: - return [_bucket.key for _bucket in bucket_agg_result] - - -def _zip_composite_aggs( - search_a: edsl.Search, - composite_agg_name_a: str, - search_b: edsl.Search, - composite_agg_name_b: str, -): - '''iterate thru two composite aggregations, yielding pairs of buckets matched by key - - the composite aggregations must have matching names in `sources` so their keys can be compared - ''' - _iter_a = _iter_composite_buckets(search_a, composite_agg_name_a) - _iter_b = _iter_composite_buckets(search_b, composite_agg_name_b) - _next_a = next(_iter_a, None) - _next_b = next(_iter_b, None) - while True: - if _next_a is None and _next_b is None: - return # both done - elif _next_a is None or _next_b is None: - # one is done but not the other -- no matching needed - yield (_next_a, _next_b) - _next_a = next(_iter_a, None) - _next_b = next(_iter_b, None) - elif _next_a.key == _next_b.key: - # match -- yield and increment both - yield (_next_a, _next_b) - _next_a = next(_iter_a, None) - _next_b = next(_iter_b, None) - elif _orderable_key(_next_a) < _orderable_key(_next_b): - # mismatch -- yield and increment a (but not b) - yield (_next_a, None) - _next_a = next(_iter_a, None) - else: - # mismatch -- yield and increment b (but not a) - yield (None, _next_b) - _next_b = next(_iter_b, None) - - -def _iter_composite_buckets(search: edsl.Search, composite_agg_name: str): - '''iterate thru *all* buckets of a composite aggregation, requesting new pages as needed - - assumes the given search has a composite aggregation of the given name - - updates the search in-place for subsequent pages - ''' - while True: - _page_response = search.execute(ignore_cache=True) # reused search object has the previous page cached - try: - _agg_result = _page_response.aggregations[composite_agg_name] - except KeyError: - return # no data; all done - yield from _agg_result.buckets - # update the search for the next page - try: - _next_after = _agg_result.after_key - except AttributeError: - return # all done - else: - search.aggs[composite_agg_name].after = _next_after + def _preprint_downloads(self, preprint: osfdb.Preprint) -> int: + '''aggregate downloads on each preprint''' + return PreprintDownload.get_count_for_preprint( + preprint=preprint, + after=self.yearmonth.month_start(), + before=self.yearmonth.month_end(), + ) -def _orderable_key(composite_bucket) -> list: - return sorted(composite_bucket.key.to_dict().items()) +def _is_item_public(osfid_referent) -> bool: + if isinstance(osfid_referent, osfdb.Preprint): + return bool(osfid_referent.verified_publishable) # quacks like Preprint + return getattr(osfid_referent, 'is_public', False) # quacks like AbstractNode diff --git a/osf/metrics/reporters/spam_count.py b/osf/metrics/reporters/spam_count.py index 94290f96203..cb1c3eeb641 100644 --- a/osf/metrics/reporters/spam_count.py +++ b/osf/metrics/reporters/spam_count.py @@ -8,11 +8,12 @@ class SpamCountReporter(MonthlyReporter): - def report(self): + def report(self, **report_kwargs): + assert not report_kwargs target_month = self.yearmonth.month_start() next_month = self.yearmonth.month_end() - report = SpamSummaryReport( + return SpamSummaryReport( # Node Log entries node_confirmed_spam=NodeLog.objects.filter( action=NodeLog.CONFIRM_SPAM, @@ -79,5 +80,3 @@ def report(self): created__lt=next_month, ).count() ) - - return [report] diff --git a/osf/metrics/utils.py b/osf/metrics/utils.py index 910b1f3104c..febfd24d6d2 100644 --- a/osf/metrics/utils.py +++ b/osf/metrics/utils.py @@ -58,6 +58,14 @@ def next(self) -> YearMonth: else YearMonth(self.year, self.month + 1) ) + def prior(self) -> YearMonth: + """get a new YearMonth for the month before this one""" + return ( + YearMonth(self.year - 1, int(calendar.DECEMBER)) + if self.month == calendar.JANUARY + else YearMonth(self.year, self.month - 1) + ) + def month_start(self) -> datetime.datetime: """get a datetime (in UTC timezone) when this YearMonth starts""" return datetime.datetime(self.year, self.month, 1, tzinfo=datetime.UTC) diff --git a/osf_tests/metrics/reporters/__init__.py b/osf_tests/metrics/reporters/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/osf_tests/metrics/reporters/_testutils.py b/osf_tests/metrics/reporters/_testutils.py new file mode 100644 index 00000000000..0d18f3bcac9 --- /dev/null +++ b/osf_tests/metrics/reporters/_testutils.py @@ -0,0 +1,10 @@ +from osf.metrics.reporters._base import MonthlyReporter +from osf.metrics.reports import MonthlyReport + + +def list_monthly_reports(reporter: MonthlyReporter) -> list[MonthlyReport]: + _reports = ( + reporter.report(**_kwargs) + for _kwargs in reporter.iter_report_kwargs() + ) + return [_report for _report in _reports if (_report is not None)] diff --git a/osf_tests/metrics/reporters/test_institutional_summary_reporter.py b/osf_tests/metrics/reporters/test_institutional_summary_reporter.py index 715a2cd1553..05baa4d38e7 100644 --- a/osf_tests/metrics/reporters/test_institutional_summary_reporter.py +++ b/osf_tests/metrics/reporters/test_institutional_summary_reporter.py @@ -11,6 +11,7 @@ PreprintFactory, AuthUserFactory, ) +from ._testutils import list_monthly_reports class TestInstiSummaryMonthlyReporter(TestCase): @@ -78,7 +79,7 @@ def _create_active_user(cls, institution, date_confirmed): def test_report_generation(self): reporter = InstitutionalSummaryMonthlyReporter(self._yearmonth) - reports = list(reporter.report()) + reports = list_monthly_reports(reporter) self.assertEqual(len(reports), 1) report = reports[0] @@ -114,7 +115,7 @@ def test_report_generation_multiple_institutions(self): # Run the reporter for the current month (February 2018) reporter = InstitutionalSummaryMonthlyReporter(self._yearmonth) - reports = list(reporter.report()) + reports = list_monthly_reports(reporter) self.assertEqual(len(reports), 3) # Reports for self._institution, institution2, institution3 # Extract reports by institution @@ -263,7 +264,7 @@ def test_high_counts_multiple_institutions(self): if enable_benchmarking: reporter_start_time = time.time() reporter = InstitutionalSummaryMonthlyReporter(self._yearmonth) - reports = list(reporter.report()) + reports = list_monthly_reports(reporter) assert len(reports) == additional_institution_count + 1 if enable_benchmarking: diff --git a/osf_tests/metrics/reporters/test_institutional_users_reporter.py b/osf_tests/metrics/reporters/test_institutional_users_reporter.py index 876fd08cf9b..275fcb1e8a1 100644 --- a/osf_tests/metrics/reporters/test_institutional_users_reporter.py +++ b/osf_tests/metrics/reporters/test_institutional_users_reporter.py @@ -18,6 +18,7 @@ UserFactory, EmbargoFactory, ) +from ._testutils import list_monthly_reports def _patch_now(fakenow: datetime.datetime): @@ -67,24 +68,24 @@ def _assert_report_matches_setup(self, report: InstitutionalUserReport, setup: _ self.assertEqual(report.published_preprint_count, setup.published_preprint_count) def test_no_users(self): - _actual_reports = list(InstitutionalUsersReporter(self._yearmonth).report()) + _actual_reports = list_monthly_reports(InstitutionalUsersReporter(self._yearmonth)) self.assertEqual(_actual_reports, []) def test_one_user_with_nothing(self): self._user_setup_with_nothing.affiliate_user() - _reports = list(InstitutionalUsersReporter(self._yearmonth).report()) + _reports = list_monthly_reports(InstitutionalUsersReporter(self._yearmonth)) self.assertEqual(len(_reports), 1) self._assert_report_matches_setup(_reports[0], self._user_setup_with_nothing) def test_one_user_with_ones(self): self._user_setup_with_ones.affiliate_user() - _reports = list(InstitutionalUsersReporter(self._yearmonth).report()) + _reports = list_monthly_reports(InstitutionalUsersReporter(self._yearmonth)) self.assertEqual(len(_reports), 1) self._assert_report_matches_setup(_reports[0], self._user_setup_with_ones) def test_one_user_with_stuff_and_no_files(self): self._user_setup_with_stuff.affiliate_user() - _reports = list(InstitutionalUsersReporter(self._yearmonth).report()) + _reports = list_monthly_reports(InstitutionalUsersReporter(self._yearmonth)) self.assertEqual(len(_reports), 1) self._assert_report_matches_setup(_reports[0], self._user_setup_with_stuff) self.assertEqual(_reports[0].public_file_count, 2) # preprint 2 files @@ -96,7 +97,7 @@ def test_one_user_with_stuff_and_a_file(self): _project = _user.nodes.first() with _patch_now(self._now): create_test_file(target=_project, user=_user, size=37) - (_report,) = InstitutionalUsersReporter(self._yearmonth).report() + (_report,) = list_monthly_reports(InstitutionalUsersReporter(self._yearmonth)) self._assert_report_matches_setup(_report, self._user_setup_with_stuff) self.assertEqual(_report.public_file_count, 3) # 2 preprint files self.assertEqual(_report.storage_byte_count, 2711) # 2 preprint files @@ -113,7 +114,7 @@ def test_one_user_with_stuff_and_multiple_files(self): create_test_file(target=_component, user=_user, size=53, filename='bla') create_test_file(target=_component, user=_user, size=51, filename='blar') create_test_file(target=_component, user=_user, size=47, filename='blarg') - (_report,) = InstitutionalUsersReporter(self._yearmonth).report() + (_report,) = list_monthly_reports(InstitutionalUsersReporter(self._yearmonth)) self._assert_report_matches_setup(_report, self._user_setup_with_stuff) self.assertEqual(_report.public_file_count, 7) # 2 preprint files self.assertEqual(_report.storage_byte_count, 2935) # 2 preprint files + 37 + 73 + 53 + 51 + 47 @@ -130,7 +131,7 @@ def test_several_users(self): _setup.user._id: _setup for _setup in _setups } - _reports = list(InstitutionalUsersReporter(self._yearmonth).report()) + _reports = list_monthly_reports(InstitutionalUsersReporter(self._yearmonth)) self.assertEqual(len(_reports), len(_setup_by_userid)) for _actual_report in _reports: _setup = _setup_by_userid[_actual_report.user_id] diff --git a/osf_tests/metrics/reporters/test_public_item_usage_reporter.py b/osf_tests/metrics/reporters/test_public_item_usage_reporter.py index 454b8d6700d..b75c420b1a2 100644 --- a/osf_tests/metrics/reporters/test_public_item_usage_reporter.py +++ b/osf_tests/metrics/reporters/test_public_item_usage_reporter.py @@ -1,27 +1,48 @@ -from datetime import timedelta +from datetime import datetime, timedelta from operator import attrgetter from unittest import mock import pytest from osf.metrics.counted_usage import CountedAuthUsage +from osf.metrics.preprint_metrics import ( + PreprintDownload, + PreprintView, +) from osf.metrics.reporters.public_item_usage import PublicItemUsageReporter from osf.metrics.reports import PublicItemUsageReport from osf.metrics.utils import YearMonth +from osf import models as osfdb +from osf_tests import factories +from ._testutils import list_monthly_reports @pytest.mark.es_metrics +@pytest.mark.django_db class TestPublicItemUsageReporter: @pytest.fixture(autouse=True) - def _mocks(self): - with ( - # set a tiny page size to force aggregation pagination: - mock.patch('osf.metrics.reporters.public_item_usage._CHUNK_SIZE', 1), - # HACK: skip auto-filling fields from the database: - mock.patch('osf.models.base.Guid.load', return_value=None), - ): + def _patch_settings(self): + with mock.patch('website.settings.DOMAIN', 'http://osf.example'): yield + @pytest.fixture + def item0(self): + _item0 = factories.PreprintFactory(is_public=True) + _item0._id = 'item0' + return _item0 + + @pytest.fixture + def item1(self): + _item1 = factories.ProjectFactory(is_public=True) + _item1._id = 'item1' + return _item1 + + @pytest.fixture + def item2(self, item1): + _item2 = factories.ProjectFactory(is_public=True, parent=item1) + _item2._id = 'item2' + return _item2 + @pytest.fixture def ym_empty(self) -> YearMonth: return YearMonth(2012, 7) @@ -35,89 +56,87 @@ def ym_busy(self) -> YearMonth: return YearMonth(2023, 7) @pytest.fixture - def sparse_month_usage(self, ym_sparse): + def sparse_month_usage(self, ym_sparse, item0, item1, item2): # "sparse" month: # item0: 3 views, 0 downloads, 2 sessions # item1: 1 views, 1 download, 1 session (plus 1 view from child item2) # item2: 1 views, 0 downloads, 1 session _month_start = ym_sparse.month_start() _save_usage( + item0, timestamp=_month_start, - item_guid='item0', session_id='sesh0', action_labels=['view'], ) _save_usage( + item0, timestamp=_month_start + timedelta(minutes=2), - item_guid='item0', session_id='sesh0', action_labels=['view'], ) _save_usage( + item1, timestamp=_month_start + timedelta(minutes=3), - item_guid='item1', session_id='sesh0', action_labels=['download'], ) _save_usage( + item0, timestamp=_month_start + timedelta(days=17), - item_guid='item0', session_id='sesh1', action_labels=['view'], ) _save_usage( + item1, timestamp=_month_start + timedelta(days=17, minutes=3), - item_guid='item1', session_id='sesh1', action_labels=['view'], ) _save_usage( + item2, timestamp=_month_start + timedelta(days=17, minutes=5), - item_guid='item2', - surrounding_guids=['item1'], session_id='sesh1', action_labels=['view'], ) _save_usage( + item2, timestamp=_month_start + timedelta(days=17, minutes=11), - item_guid='item2', - surrounding_guids=['item1'], session_id='sesh1', action_labels=['download'], ) @pytest.fixture - def busy_month_item0(self, ym_busy): + def busy_month_item0(self, ym_busy, item0): # item0: 4 sessions, 4*7 views, 4*5 downloads _month_start = ym_busy.month_start() for _sesh in range(0, 4): _sesh_start = _month_start + timedelta(days=_sesh) for _minute in range(0, 7): _save_usage( + item0, timestamp=_sesh_start + timedelta(minutes=_minute), - item_guid='item0', session_id=f'sesh0{_sesh}', action_labels=['view'], ) for _minute in range(10, 15): _save_usage( + item0, timestamp=_sesh_start + timedelta(minutes=_minute), - item_guid='item0', session_id=f'sesh0{_sesh}', action_labels=['download'], ) @pytest.fixture - def busy_month_item1(self, ym_busy): - # item1: 10 sessions, 6*9 views, 5*7 downloads, 2 providers + def busy_month_item1(self, ym_busy, item1): + # item1: 10 sessions, 6*9 views, 5*7 downloads # (plus 11 views in 11 sessions from child item2) _month_start = ym_busy.month_start() for _sesh in range(0, 6): _sesh_start = _month_start + timedelta(days=_sesh) for _minute in range(0, 9): _save_usage( + item1, timestamp=_sesh_start + timedelta(minutes=_minute), - item_guid='item1', session_id=f'sesh1{_sesh}', action_labels=['view'], ) @@ -125,42 +144,39 @@ def busy_month_item1(self, ym_busy): _sesh_start = _month_start + timedelta(days=_sesh) for _minute in range(10, 17): _save_usage( + item1, timestamp=_sesh_start + timedelta(minutes=_minute), - item_guid='item1', session_id=f'sesh1{_sesh}', action_labels=['download'], - provider_id='prov1', # additional provider_id ) @pytest.fixture - def busy_month_item2(self, ym_busy): + def busy_month_item2(self, ym_busy, item2): # item2: 11 sessions, 11 views, 11 downloads (child of item1) _month_start = ym_busy.month_start() for _sesh in range(1, 12): _save_usage( + item2, timestamp=_month_start + timedelta(days=_sesh), - item_guid='item2', - surrounding_guids=['item1'], session_id=f'sesh2{_sesh}', action_labels=['view'], ) _save_usage( + item2, timestamp=_month_start + timedelta(days=_sesh, hours=_sesh), - item_guid='item2', - surrounding_guids=['item1'], session_id=f'sesh2{_sesh}', action_labels=['download'], ) def test_no_data(self, ym_empty): _reporter = PublicItemUsageReporter(ym_empty) - _empty = list(_reporter.report()) + _empty = list_monthly_reports(_reporter) assert _empty == [] - def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_month_item0, busy_month_item1, busy_month_item2): - _empty = list(PublicItemUsageReporter(ym_empty).report()) - _sparse = list(PublicItemUsageReporter(ym_sparse).report()) - _busy = list(PublicItemUsageReporter(ym_busy).report()) + def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_month_item0, busy_month_item1, busy_month_item2, item0): + _empty = list_monthly_reports(PublicItemUsageReporter(ym_empty)) + _sparse = list_monthly_reports(PublicItemUsageReporter(ym_sparse)) + _busy = list_monthly_reports(PublicItemUsageReporter(ym_busy)) # empty month: assert _empty == [] @@ -171,16 +187,16 @@ def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_m # sparse-month item0 assert isinstance(_sparse_item0, PublicItemUsageReport) assert _sparse_item0.item_osfid == 'item0' - assert _sparse_item0.provider_id == ['prov0'] + assert _sparse_item0.provider_id == [item0.provider._id] assert _sparse_item0.platform_iri == ['http://osf.example'] assert _sparse_item0.view_count == 3 - assert _sparse_item0.view_session_count == 2 + assert _sparse_item0.view_session_count is None # no session count for preprints assert _sparse_item0.download_count == 0 - assert _sparse_item0.download_session_count == 0 + assert _sparse_item0.download_session_count is None # no session count for preprints # sparse-month item1 assert isinstance(_sparse_item1, PublicItemUsageReport) assert _sparse_item1.item_osfid == 'item1' - assert _sparse_item1.provider_id == ['prov0'] + assert _sparse_item1.provider_id == ['osf'] assert _sparse_item1.platform_iri == ['http://osf.example'] assert _sparse_item1.view_count == 2 # including item2 assert _sparse_item1.view_session_count == 1 # including item2 @@ -189,7 +205,7 @@ def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_m # sparse-month item2 assert isinstance(_sparse_item1, PublicItemUsageReport) assert _sparse_item2.item_osfid == 'item2' - assert _sparse_item2.provider_id == ['prov0'] + assert _sparse_item2.provider_id == ['osf'] assert _sparse_item2.platform_iri == ['http://osf.example'] assert _sparse_item2.view_count == 1 assert _sparse_item2.view_session_count == 1 @@ -202,16 +218,16 @@ def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_m # busy-month item0 assert isinstance(_busy_item0, PublicItemUsageReport) assert _busy_item0.item_osfid == 'item0' - assert _busy_item0.provider_id == ['prov0'] + assert _busy_item0.provider_id == [item0.provider._id] assert _busy_item0.platform_iri == ['http://osf.example'] assert _busy_item0.view_count == 4 * 7 - assert _busy_item0.view_session_count == 4 + assert _busy_item0.view_session_count is None # no session count for preprints assert _busy_item0.download_count == 4 * 5 - assert _busy_item0.download_session_count == 4 + assert _busy_item0.download_session_count is None # no session count for preprints # busy-month item1 assert isinstance(_busy_item1, PublicItemUsageReport) assert _busy_item1.item_osfid == 'item1' - assert _busy_item1.provider_id == ['prov0', 'prov1'] + assert _busy_item1.provider_id == ['osf'] assert _busy_item1.platform_iri == ['http://osf.example'] assert _busy_item1.view_count == 6 * 9 + 11 assert _busy_item1.view_session_count == 6 + 11 @@ -220,7 +236,7 @@ def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_m # busy-month item2 assert isinstance(_busy_item2, PublicItemUsageReport) assert _busy_item2.item_osfid == 'item2' - assert _busy_item2.provider_id == ['prov0'] + assert _busy_item2.provider_id == ['osf'] assert _busy_item2.platform_iri == ['http://osf.example'] assert _busy_item2.view_count == 11 assert _busy_item2.view_session_count == 11 @@ -228,11 +244,41 @@ def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_m assert _busy_item2.download_session_count == 11 -def _save_usage(**kwargs): - _kwargs = { # overridable defaults: +def _save_usage( + item, + *, + timestamp: datetime, + action_labels: list[str], + **kwargs, +): + _countedusage_kwargs = { + 'timestamp': timestamp, + 'item_guid': item._id, + 'action_labels': action_labels, 'platform_iri': 'http://osf.example', - 'item_public': True, - 'provider_id': 'prov0', **kwargs, } - CountedAuthUsage(**_kwargs).save(refresh=True) + CountedAuthUsage(**_countedusage_kwargs).save(refresh=True) + if isinstance(item, osfdb.Preprint): + if 'view' in action_labels: + _save_preprint_view(item, timestamp) + if 'download' in action_labels: + _save_preprint_download(item, timestamp) + + +def _save_preprint_view(preprint, timestamp): + PreprintView( + timestamp=timestamp, + count=1, + preprint_id=preprint._id, + provider_id=preprint.provider._id, + ).save(refresh=True) + + +def _save_preprint_download(preprint, timestamp): + PreprintDownload( + timestamp=timestamp, + count=1, + preprint_id=preprint._id, + provider_id=preprint.provider._id, + ).save(refresh=True) diff --git a/osf_tests/metrics/test_yearmonth.txt b/osf_tests/metrics/test_yearmonth.txt index 646c73c42f9..fae6b990c36 100644 --- a/osf_tests/metrics/test_yearmonth.txt +++ b/osf_tests/metrics/test_yearmonth.txt @@ -35,6 +35,13 @@ YearMonth(year=1491, month=12) >>> ym.next().next() YearMonth(year=1492, month=1) +`prior` method gives the prior year-month: +>>> ym = YearMonth(1492, 2) +>>> ym.prior() +YearMonth(year=1492, month=1) +>>> ym.prior().prior() +YearMonth(year=1491, month=12) + `month_start` method: >>> YearMonth(3333, 3).month_start() datetime.datetime(3333, 3, 1, 0, 0, tzinfo=datetime.timezone.utc)