Commit 4fcfa3b

use composite aggs for iterating all
aaxelb committed Nov 22, 2024
1 parent 65eec01 commit 4fcfa3b
Showing 1 changed file with 114 additions and 16 deletions.
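For context: a composite aggregation returns its buckets one page at a time, and each page's response includes an after_key to pass back in the next request to resume where the previous page left off (when no buckets remain, the response omits it). A minimal sketch of the request/response shape, with illustrative values:

    # request ('agg_osfid', 'osfid', and 'item_guid' match the code below; 'abcde' is a made-up after key)
    {'size': 0, 'aggs': {'agg_osfid': {'composite': {
        'size': 500,
        'sources': [{'osfid': {'terms': {'field': 'item_guid'}}}],
        'after': {'osfid': 'abcde'},
    }}}
    # response
    {'aggregations': {'agg_osfid': {
        'after_key': {'osfid': 'hijkl'},
        'buckets': [{'key': {'osfid': 'fghij'}, 'doc_count': 3}, ...],
    }}}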
osf/metrics/reporters/public_item_usage.py (114 additions, 16 deletions)
@@ -1,7 +1,10 @@
from __future__ import annotations
import datetime
import typing

import waffle
if typing.TYPE_CHECKING:
import elasticsearch_dsl as edsl

import osf.features
from osf.metadata.osf_gathering import OsfmapPartition
@@ -21,6 +24,8 @@
from ._base import MonthlyReporter


_CHUNK_SIZE = 500

_MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control


@@ -34,22 +39,13 @@ class PublicItemUsageReporter(MonthlyReporter):
includes projects, project components, registrations, registration components, and preprints
'''
def iter_report_kwargs(self, continue_after: dict | None = None):
-        _osfid_qs = (
-            osfdb.Guid.objects
-            # try to use the index on ('content_type', 'object_id', 'created')
-            # (will keep only the oldest-created for each referent, which is fine)
-            .order_by('content_type_id', 'object_id', 'created')
-            .distinct('content_type_id', 'object_id')
-        )
-        if continue_after:
-            _created_after = datetime.datetime.fromisoformat(continue_after['_osfid_created'])
-            _osfid_qs = (
-                _osfid_qs
-                .filter(created__gte=_created_after)
-                .exclude(_id=continue_after['osfid'])
-            )
-        for _osfid, _created in _osfid_qs.values_list('_id', 'created').iterator():
-            yield {'osfid': _osfid, '_osfid_created': _created.isoformat()}
+        _after_osfid = continue_after['osfid'] if continue_after else None
+        for _osfid in _zip_sorted(
+            self._countedusage_osfids(_after_osfid),
+            self._preprintview_osfids(_after_osfid),
+            self._preprintdownload_osfids(_after_osfid),
+        ):
+            yield {'osfid': _osfid}

def report(self, **report_kwargs):
_osfid = report_kwargs['osfid']
@@ -89,6 +85,50 @@ def followup_task(self, report):
countdown=30, # give index time to settle
)

def _countedusage_osfids(self, after_osfid: str | None) -> typing.Iterator[str]:
_search = self._base_usage_search()
_search.aggs.bucket(
'agg_osfid',
'composite',
sources=[{'osfid': {'terms': {'field': 'item_guid'}}}],
size=_CHUNK_SIZE,
)
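        # builds an aggregation clause roughly like (a sketch of the resulting DSL):
        #   {'agg_osfid': {'composite': {
        #       'size': 500,  # _CHUNK_SIZE
        #       'sources': [{'osfid': {'terms': {'field': 'item_guid'}}}]}}}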
return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)

def _preprintview_osfids(self, after_osfid: str | None) -> typing.Iterator[str]:
_search = (
PreprintView.search()
.filter('range', timestamp={
'gte': self.yearmonth.month_start(),
'lt': self.yearmonth.month_end(),
})
.extra(size=0) # only aggregations, no hits
)
_search.aggs.bucket(
'agg_osfid',
'composite',
sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}],
size=_CHUNK_SIZE,
)
return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)

def _preprintdownload_osfids(self, after_osfid: str | None) -> typing.Iterator[str]:
_search = (
PreprintDownload.search()
.filter('range', timestamp={
'gte': self.yearmonth.month_start(),
'lt': self.yearmonth.month_end(),
})
.extra(size=0) # only aggregations, no hits
)
_search.aggs.bucket(
'agg_osfid',
'composite',
sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}],
size=_CHUNK_SIZE,
)
return _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid)

def _init_report(self, osf_obj) -> PublicItemUsageReport:
if not _is_item_public(osf_obj):
raise _SkipItem
@@ -208,3 +248,61 @@ def _is_item_public(osfid_referent) -> bool:
if isinstance(osfid_referent, osfdb.Preprint):
return bool(osfid_referent.verified_publishable) # quacks like Preprint
return getattr(osfid_referent, 'is_public', False) # quacks like AbstractNode


def _zip_sorted(
*iterators: typing.Iterator[str],
) -> typing.Iterator[str]:
'''loop thru multiple iterators on sorted (ascending) sequences of strings
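    a value shared by several inputs is yielded once; e.g. (an illustrative
    sketch, assuming each input yields distinct ascending values, as
    composite-agg bucket keys do):
        >>> list(_zip_sorted(iter(['a', 'c']), iter(['b', 'c', 'd'])))
        ['a', 'b', 'c', 'd']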
'''
_nexts = { # holds the next value from each iterator, or None
_i: next(_iter, None)
for _i, _iter in enumerate(iterators)
}
while True:
_nonnull_nexts = [
_next
for _next in _nexts.values()
if _next is not None
]
if not _nonnull_nexts:
return # all done
_value = min(_nonnull_nexts)
yield _value
for _i, _iter in enumerate(iterators):
if _nexts[_i] == _value:
_nexts[_i] = next(_iter, None)


def _iter_composite_bucket_keys(
search: edsl.Search,
composite_agg_name: str,
composite_source_name: str,
after: str | None = None,
) -> typing.Iterator[str]:
'''iterate thru *all* buckets of a composite aggregation, requesting new pages as needed
assumes the given search has a composite aggregation of the given name
with a single value source of the given name
updates the search in-place for subsequent pages
'''
if after is not None:
search.aggs[composite_agg_name].after = {composite_source_name: after}
while True:
_page_response = search.execute(ignore_cache=True) # reused search object has the previous page cached
try:
_agg_result = _page_response.aggregations[composite_agg_name]
except KeyError:
return # no data; all done
for _bucket in _agg_result.buckets:
_key = _bucket.key.to_dict()
assert set(_key.keys()) == {composite_source_name}, f'expected only one key ("{composite_source_name}") in {_bucket.key}'
yield _key[composite_source_name]
# update the search for the next page
try:
_next_after = _agg_result.after_key
except AttributeError:
return # all done
else:
search.aggs[composite_agg_name].after = _next_after
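

# usage sketch (illustrative; mirrors the reporter methods above): attach a
# composite agg with a single-value source to a search, then drain every
# bucket key ('some_guid_field' is a stand-in for a real keyword field):
#   _search.aggs.bucket('agg_osfid', 'composite',
#       sources=[{'osfid': {'terms': {'field': 'some_guid_field'}}}],
#       size=_CHUNK_SIZE)
#   for _osfid in _iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid'):
#       ...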
