diff --git a/osf/metrics/reporters/public_item_usage.py b/osf/metrics/reporters/public_item_usage.py
index 46da74c9e3d..42c53099587 100644
--- a/osf/metrics/reporters/public_item_usage.py
+++ b/osf/metrics/reporters/public_item_usage.py
@@ -1,7 +1,10 @@
 from __future__ import annotations
 import datetime
+import typing
 
 import waffle
+if typing.TYPE_CHECKING:
+    import elasticsearch_dsl as edsl
 
 import osf.features
 from osf.metadata.osf_gathering import OsfmapPartition
@@ -21,6 +24,8 @@
 from ._base import MonthlyReporter
 
+_CHUNK_SIZE = 500
+
 _MAX_CARDINALITY_PRECISION = 40000  # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control
 
 
@@ -34,22 +39,13 @@ class PublicItemUsageReporter(MonthlyReporter):
     '''build a PublicItemUsageReport for each public item
 
    includes projects, project components, registrations, registration components, and preprints
    '''
    def iter_report_kwargs(self, continue_after: dict | None = None):
-        _osfid_qs = (
-            osfdb.Guid.objects
-            # try to use the index on ('content_type', 'object_id', 'created')
-            # (will keep only the oldest-created for each referent, which is fine)
-            .order_by('content_type_id', 'object_id', 'created')
-            .distinct('content_type_id', 'object_id')
-        )
-        if continue_after:
-            _created_after = datetime.datetime.fromisoformat(continue_after['_osfid_created'])
-            _osfid_qs = (
-                _osfid_qs
-                .filter(created__gte=_created_after)
-                .exclude(_id=continue_after['osfid'])
-            )
-        for _osfid, _created in _osfid_qs.values_list('_id', 'created').iterator():
-            yield {'osfid': _osfid, '_osfid_created': _created.isoformat()}
+        _after_osfid = continue_after['osfid'] if continue_after else None
+        for _osfid in _zip_sorted(
+            self._countedusage_osfids(_after_osfid),
+            self._preprintview_osfids(_after_osfid),
+            self._preprintdownload_osfids(_after_osfid),
+        ):
+            yield {'osfid': _osfid}
 
     def report(self, **report_kwargs):
         _osfid = report_kwargs['osfid']
@@ -89,6 +85,50 @@ def followup_task(self, report):
             countdown=30,  # give index time to settle
         )
 
+    def _countedusage_osfids(self, after_osfid: str | None) -> typing.Iterator[str]:
+        _search = self._base_usage_search()
+        _search.aggs.bucket(
+            'agg_osfid',
+            'composite',
+            sources=[{'osfid': {'terms': {'field': 'item_guid'}}}],
+            size=_CHUNK_SIZE,
+        )
+        return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)
+
+    def _preprintview_osfids(self, after_osfid: str | None) -> typing.Iterator[str]:
+        _search = (
+            PreprintView.search()
+            .filter('range', timestamp={
+                'gte': self.yearmonth.month_start(),
+                'lt': self.yearmonth.month_end(),
+            })
+            .extra(size=0)  # only aggregations, no hits
+        )
+        _search.aggs.bucket(
+            'agg_osfid',
+            'composite',
+            sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}],
+            size=_CHUNK_SIZE,
+        )
+        return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)
+
+    def _preprintdownload_osfids(self, after_osfid: str | None) -> typing.Iterator[str]:
+        _search = (
+            PreprintDownload.search()
+            .filter('range', timestamp={
+                'gte': self.yearmonth.month_start(),
+                'lt': self.yearmonth.month_end(),
+            })
+            .extra(size=0)  # only aggregations, no hits
+        )
+        _search.aggs.bucket(
+            'agg_osfid',
+            'composite',
+            sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}],
+            size=_CHUNK_SIZE,
+        )
+        return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)
+
     def _init_report(self, osf_obj) -> PublicItemUsageReport:
         if not _is_item_public(osf_obj):
             raise _SkipItem
@@ -208,3 +248,61 @@ def _is_item_public(osfid_referent) -> bool:
     if isinstance(osfid_referent, osfdb.Preprint):
         return bool(osfid_referent.verified_publishable)  # quacks like Preprint
     return getattr(osfid_referent, 'is_public', False)  # quacks like AbstractNode
+
+
+def _zip_sorted(
+    *iterators: typing.Iterator[str],
+) -> typing.Iterator[str]:
+    '''loop thru multiple iterators on sorted (ascending) sequences of strings
+    '''
+    _nexts = {  # holds the next value from each iterator, or None
+        _i: next(_iter, None)
+        for _i, _iter in enumerate(iterators)
+    }
+    while True:
+        _nonnull_nexts = [
+            _next
+            for _next in _nexts.values()
+            if _next is not None
+        ]
+        if not _nonnull_nexts:
+            return  # all done
+        _value = min(_nonnull_nexts)
+        yield _value
+        for _i, _iter in enumerate(iterators):
+            if _nexts[_i] == _value:
+                _nexts[_i] = next(_iter, None)
+
+
+def _iter_composite_bucket_keys(
+    search: edsl.Search,
+    composite_agg_name: str,
+    composite_source_name: str,
+    after: str | None = None,
+) -> typing.Iterator[str]:
+    '''iterate thru *all* buckets of a composite aggregation, requesting new pages as needed
+
+    assumes the given search has a composite aggregation of the given name
+    with a single value source of the given name
+
+    updates the search in-place for subsequent pages
+    '''
+    if after is not None:
+        search.aggs[composite_agg_name].after = {composite_source_name: after}
+    while True:
+        _page_response = search.execute(ignore_cache=True)  # reused search object has the previous page cached
+        try:
+            _agg_result = _page_response.aggregations[composite_agg_name]
+        except KeyError:
+            return  # no data; all done
+        for _bucket in _agg_result.buckets:
+            _key = _bucket.key.to_dict()
+            assert set(_key.keys()) == {composite_source_name}, f'expected only one key ("{composite_source_name}") in {_bucket.key}'
+            yield _key[composite_source_name]
+        # update the search for the next page
+        try:
+            _next_after = _agg_result.after_key
+        except AttributeError:
+            return  # all done
+        else:
+            search.aggs[composite_agg_name].after = _next_after
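
Note on _zip_sorted: it both merges and de-duplicates, because every iterator whose head equals the yielded value is advanced, so an osfid appearing in more than one source sequence (e.g. a preprint with both views and downloads) comes out once, giving iter_report_kwargs one report per item. A minimal sketch of that behavior, using the function as defined in this diff and hypothetical osfids:

    # hypothetical osfids; each input must already be ascending,
    # which composite-aggregation buckets guarantee
    _merged = list(_zip_sorted(
        iter(['abc12', 'def34']),  # e.g. counted-usage osfids
        iter(['abc12', 'zyx98']),  # e.g. preprint-view osfids
        iter([]),                  # an exhausted source is fine
    ))
    assert _merged == ['abc12', 'def34', 'zyx98']  # 'abc12' yielded only once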
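
Note on the paging contract _iter_composite_bucket_keys relies on: elasticsearch composite aggregations return buckets in ascending key order along with an after_key for requesting the next page, and resuming from continue_after reuses that same mechanism by seeding 'after'. An illustrative sketch of the request/response shapes involved (hypothetical values; field names taken from this diff):

    _request_aggs = {  # what the three *_osfids helpers build
        'agg_osfid': {'composite': {
            'sources': [{'osfid': {'terms': {'field': 'item_guid'}}}],
            'size': 500,                   # _CHUNK_SIZE buckets per page
            'after': {'osfid': 'abc12'},   # set only when resuming
        }},
    }
    _response_aggs = {  # what _iter_composite_bucket_keys reads back
        'agg_osfid': {
            'buckets': [{'key': {'osfid': 'def34'}, 'doc_count': 7}],
            'after_key': {'osfid': 'def34'},  # absent once pages run out
        },
    }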