Skip to content

Commit

Permalink
add scheduler task for monthly reporter tasks
Browse files Browse the repository at this point in the history
- update monthly reporters with `iter_report_kwargs`
  (mostly affects `PublicItemUsageReporter`, which was
   badly optimized to generate many reports at once)
- add `schedule_monthly_reporter` task that schedules
  tasks from `iter_report_kwargs` results
- change `MonthlyReporter.followup_task()` to run per-report
  • Loading branch information
aaxelb committed Nov 22, 2024
1 parent 1ea9756 commit 65eec01
Show file tree
Hide file tree
Showing 16 changed files with 417 additions and 335 deletions.
8 changes: 6 additions & 2 deletions admin/management/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from scripts.find_spammy_content import manage_spammy_content
from django.urls import reverse
from django.shortcuts import redirect
from osf.metrics.utils import YearMonth
from osf.models import Preprint, Node, Registration


Expand Down Expand Up @@ -122,8 +123,11 @@ def post(self, request, *args, **kwargs):
report_date = None

errors = monthly_reporters_go(
report_month=getattr(report_date, 'month', None),
report_year=getattr(report_date, 'year', None)
yearmonth=(
str(YearMonth.from_date(report_date))
if report_date is not None
else ''
),
)

if errors:
Expand Down
5 changes: 5 additions & 0 deletions osf/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,8 @@ switches:
- flag_name: ENABLE_INACTIVE_SCHEMAS
name: enable_inactive_schemas
note: This is no longer used

- flag_name: COUNTEDUSAGE_UNIFIED_METRICS_2024
name: countedusage_unified_metrics_2024
note: use only `osf.metrics.counted_usage`-based metrics where possible; un-use PageCounter, PreprintView, PreprintDownload, etc
active: false
125 changes: 90 additions & 35 deletions osf/management/commands/monthly_reporters_go.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,123 @@
import datetime
import logging

from django.core.management.base import BaseCommand
from django.db.utils import OperationalError
from django.utils import timezone
from django.db import OperationalError as DjangoOperationalError
from elasticsearch.exceptions import ConnectionError as ElasticConnectionError
from psycopg2 import OperationalError as PostgresOperationalError

from framework.celery_tasks import app as celery_app
from osf.metrics.reporters import AllMonthlyReporters
from osf.metrics.utils import YearMonth
from website.app import init_app


logger = logging.getLogger(__name__)


MAXMONTH = 12

_RETRY_ERRORS = (
DjangoOperationalError,
ElasticConnectionError,
PostgresOperationalError,
)

@celery_app.task(name='management.commands.monthly_reporters_go')
def monthly_reporters_go(report_year=None, report_month=None):
init_app() # OSF-specific setup

if report_year and report_month:
report_yearmonth = YearMonth(report_year, report_month)
else: # default to last month if year and month not provided
today = timezone.now().date()
report_yearmonth = YearMonth(
year=today.year if today.month > 1 else today.year - 1,
month=today.month - 1 or MAXMONTH,
)
for _reporter_key in AllMonthlyReporters.__members__.keys():
monthly_reporter_go.apply_async(kwargs={
def monthly_reporters_go(yearmonth: str = '', reporter_key: str = ''):
_yearmonth = (
YearMonth.from_str(yearmonth)
if yearmonth
else YearMonth.from_date(datetime.date.today()).prior() # default last month
)
_reporter_keys = (
[reporter_key]
if reporter_key
else _enum_names(AllMonthlyReporters)
)
for _reporter_key in _reporter_keys:
schedule_monthly_reporter.apply_async(kwargs={
'yearmonth': str(_yearmonth),
'reporter_key': _reporter_key,
'yearmonth': str(report_yearmonth),
})


@celery_app.task(name='management.commands.schedule_monthly_reporter')
def schedule_monthly_reporter(
yearmonth: str,
reporter_key: str,
continue_after: dict | None = None,
):
_reporter = _get_reporter(reporter_key, yearmonth)
_last_kwargs = None
_done = False
try:
for _kwargs in _reporter.iter_report_kwargs(continue_after=continue_after):
monthly_reporter_do.apply_async(kwargs={
'yearmonth': yearmonth,
'reporter_key': reporter_key,
'report_kwargs': _kwargs,
})
_last_kwargs = _kwargs
_done = True
except _RETRY_ERRORS:
if not _done and (_last_kwargs is not None):
schedule_monthly_reporter.apply_async(kwargs={
'yearmonth': yearmonth,
'reporter_key': reporter_key,
'continue_after': _last_kwargs,
})


@celery_app.task(
name='management.commands.monthly_reporter_go',
autoretry_for=(OperationalError,),
name='management.commands.monthly_reporter_do',
autoretry_for=(
DjangoOperationalError,
ElasticConnectionError,
PostgresOperationalError,
),
max_retries=5,
retry_backoff=True,
bind=True,
)
def monthly_reporter_go(task, reporter_key: str, yearmonth: str):
_reporter_class = AllMonthlyReporters[reporter_key].value
_reporter = _reporter_class(YearMonth.from_str(yearmonth))
_reporter.run_and_record_for_month()
_followup = _reporter.followup_task()
if _followup is not None:
_followup.apply_async()
def monthly_reporter_do(reporter_key: str, yearmonth: str, report_kwargs: dict):
_reporter = _get_reporter(reporter_key, yearmonth)
_report = _reporter.report(**report_kwargs)
if _report is not None:
_report.report_yearmonth = _reporter.yearmonth
_report.save()
_followup_task = _reporter.followup_task(_report)
if _followup_task is not None:
_followup_task.apply_async()


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
'yearmonth',
type=YearMonth.from_str,
default={'year': None, 'month': None},
type=str,
help='year and month (YYYY-MM)',
)
parser.add_argument(
'-r', '--reporter',
type=str,
choices={_name.lower() for _name in _enum_names(AllMonthlyReporters)},
default='',
help='name of the reporter to run (default all)',
)

def handle(self, *args, **options):
def handle(self, *args, **kwargs):
monthly_reporters_go(
report_year=getattr(options.get('yearmonth'), 'year', None),
report_month=getattr(options.get('yearmonth'), 'month', None),
yearmonth=kwargs['yearmonth'].upper(),
reporter_key=kwargs['reporter'].upper(),
)
self.stdout.write(self.style.SUCCESS('reporter tasks scheduled.'))
self.stdout.write(self.style.SUCCESS(
f'scheduling tasks for monthly reporter "{kwargs['reporter']}"...'
if kwargs['reporter']
else 'scheduling tasks for all monthly reporters...'
))


def _get_reporter(reporter_key: str, yearmonth: str):
_reporter_class = AllMonthlyReporters[reporter_key].value
return _reporter_class(YearMonth.from_str(yearmonth))


def _enum_names(enum_cls) -> list[str]:
return list(AllMonthlyReporters.__members__.keys())
4 changes: 2 additions & 2 deletions osf/metrics/preprint_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def record_for_preprint(cls, preprint, user=None, **kwargs):
)

@classmethod
def get_count_for_preprint(cls, preprint, after=None, before=None, index=None):
search = cls.search(after=after, before=before, index=index).filter('match', preprint_id=preprint._id)
def get_count_for_preprint(cls, preprint, after=None, before=None, index=None) -> int:
search = cls.search(index=index).filter('term', preprint_id=preprint._id)
timestamp = {}
if after:
timestamp['gte'] = after
Expand Down
19 changes: 9 additions & 10 deletions osf/metrics/reporters/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,17 @@
class MonthlyReporter:
yearmonth: YearMonth

def report(self) -> abc.Iterable[MonthlyReport] | abc.Iterator[MonthlyReport]:
def iter_report_kwargs(self, continue_after: dict | None = None) -> abc.Iterator[dict]:
# override for multiple reports per month
if continue_after is None:
yield {} # by default, calls `.report()` once with no kwargs

def report(self, **report_kwargs) -> MonthlyReport | None:
"""build a report for the given month
"""
raise NotImplementedError(f'{self.__name__} must implement `report`')

def run_and_record_for_month(self) -> None:
reports = self.report()
for report in reports:
report.report_yearmonth = self.yearmonth
report.save()
raise NotImplementedError(f'{self.__class__.__name__} must implement `report`')

def followup_task(self) -> celery.Signature | None:
def followup_task(self, report) -> celery.Signature | None:
return None


Expand All @@ -36,7 +35,7 @@ def report(self, report_date):
return an iterable of DailyReport (unsaved)
"""
raise NotImplementedError(f'{self.__name__} must implement `report`')
raise NotImplementedError(f'{self.__class__.__name__} must implement `report`')

def run_and_record_for_date(self, report_date):
reports = self.report(report_date)
Expand Down
13 changes: 10 additions & 3 deletions osf/metrics/reporters/institution_summary_monthly.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@
class InstitutionalSummaryMonthlyReporter(MonthlyReporter):
"""Generate an InstitutionMonthlySummaryReport for each institution."""

def report(self):
for institution in Institution.objects.all():
yield self.generate_report(institution)
def iter_report_kwargs(self, continue_after: dict | None = None):
_inst_qs = Institution.objects.order_by('pk')
if continue_after:
_inst_qs = _inst_qs.filter(pk__gt=continue_after['institution_pk'])
for _pk in _inst_qs.values_list('pk', flat=True):
yield {'institution_pk': _pk}

def report(self, **report_kwargs):
_institution = Institution.objects.get(pk=report_kwargs['institution_pk'])
return self.generate_report(_institution)

def generate_report(self, institution):
node_queryset = institution.nodes.filter(
Expand Down
30 changes: 23 additions & 7 deletions osf/metrics/reporters/institutional_users.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import dataclasses
import datetime

from django.contrib.contenttypes.models import ContentType
from django.db.models import Q, F, Sum
Expand All @@ -22,13 +21,27 @@ class InstitutionalUsersReporter(MonthlyReporter):
which offers institutional admins insight into how people at their institution are
using osf, based on their explicitly-affiliated osf objects
'''
def report(self):
def iter_report_kwargs(self, continue_after: dict | None = None):
_before_datetime = self.yearmonth.month_end()
for _institution in osfdb.Institution.objects.filter(created__lt=_before_datetime):
_inst_qs = (
osfdb.Institution.objects
.filter(created__lt=_before_datetime)
.order_by('pk')
)
if continue_after:
_inst_qs = _inst_qs.filter(pk__gte=continue_after['institution_pk'])
for _institution in _inst_qs:
_user_qs = _institution.get_institution_users().filter(created__lt=_before_datetime)
for _user in _user_qs.iterator(chunk_size=_CHUNK_SIZE):
_helper = _InstiUserReportHelper(_institution, _user, self.yearmonth, _before_datetime)
yield _helper.report
if continue_after and (_institution.pk == continue_after['institution_pk']):
_user_qs = _user_qs.filter(pk__gt=continue_after['user_pk'])
for _user_pk in _user_qs.values_list('pk', flat=True).iterator(chunk_size=_CHUNK_SIZE):
yield {'institution_pk': _institution.pk, 'user_pk': _user_pk}

def report(self, **report_kwargs):
_institution = osfdb.Institution.objects.get(pk=report_kwargs['institution_pk'])
_user = osfdb.OSFUser.objects.get(pk=report_kwargs['user_pk'])
_helper = _InstiUserReportHelper(_institution, _user, self.yearmonth)
return _helper.report


# helper
Expand All @@ -37,7 +50,6 @@ class _InstiUserReportHelper:
institution: osfdb.Institution
user: osfdb.OSFUser
yearmonth: YearMonth
before_datetime: datetime.datetime
report: InstitutionalUserReport = dataclasses.field(init=False)

def __post_init__(self):
Expand All @@ -64,6 +76,10 @@ def __post_init__(self):
storage_byte_count=self._storage_byte_count(),
)

@property
def before_datetime(self):
return self.yearmonth.month_end()

def _node_queryset(self):
_institution_node_qs = self.institution.nodes.filter(
created__lt=self.before_datetime,
Expand Down
Loading

0 comments on commit 65eec01

Please sign in to comment.