forked from CenterForOpenScience/osf.io
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add scheduler task for monthly reporter tasks
- update monthly reporters with `iter_report_kwargs` (mostly affects `PublicItemUsageReporter`, which was badly optimized to generate many reports at once) - add `schedule_monthly_reporter` task that schedules tasks from `iter_report_kwargs` results - change `MonthlyReporter.followup_task()` to run per-report
- Loading branch information
Showing
16 changed files
with
418 additions
and
334 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,68 +1,125 @@ | ||
import logging | ||
|
||
from django.core.management.base import BaseCommand | ||
from django.db.utils import OperationalError | ||
from django.db import OperationalError as DjangoOperationalError | ||
from django.utils import timezone | ||
from elasticsearch.exceptions import ConnectionError as ElasticConnectionError | ||
from psycopg2 import OperationalError as PostgresOperationalError | ||
|
||
from framework.celery_tasks import app as celery_app | ||
from osf.metrics.reporters import AllMonthlyReporters | ||
from osf.metrics.utils import YearMonth | ||
from website.app import init_app | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
MAXMONTH = 12 | ||
|
||
_RETRY_ERRORS = ( | ||
DjangoOperationalError, | ||
ElasticConnectionError, | ||
PostgresOperationalError, | ||
) | ||
|
||
@celery_app.task(name='management.commands.monthly_reporters_go') | ||
def monthly_reporters_go(report_year=None, report_month=None): | ||
init_app() # OSF-specific setup | ||
|
||
if report_year and report_month: | ||
report_yearmonth = YearMonth(report_year, report_month) | ||
else: # default to last month if year and month not provided | ||
today = timezone.now().date() | ||
report_yearmonth = YearMonth( | ||
year=today.year if today.month > 1 else today.year - 1, | ||
month=today.month - 1 or MAXMONTH, | ||
) | ||
for _reporter_key in AllMonthlyReporters.__members__.keys(): | ||
monthly_reporter_go.apply_async(kwargs={ | ||
def monthly_reporters_go(yearmonth: str, reporter_key: str = ''): | ||
assert YearMonth.from_str(yearmonth) is not None | ||
_reporter_keys = ( | ||
[reporter_key] | ||
if reporter_key | ||
else _enum_names(AllMonthlyReporters) | ||
) | ||
for _reporter_key in _reporter_keys: | ||
schedule_monthly_reporter.apply_async(kwargs={ | ||
'reporter_key': _reporter_key, | ||
'yearmonth': str(report_yearmonth), | ||
'yearmonth': yearmonth, | ||
}) | ||
|
||
|
||
@celery_app.task(name='management.commands.schedule_monthly_reporter') | ||
def schedule_monthly_reporter( | ||
yearmonth: str, | ||
reporter_key: str, | ||
continue_after: dict | None = None, | ||
): | ||
_reporter = _get_reporter(reporter_key, yearmonth) | ||
_last_kwargs = None | ||
_done = False | ||
try: | ||
for _kwargs in _reporter.iter_report_kwargs(continue_after=continue_after): | ||
monthly_reporter_do.apply_async(kwargs={ | ||
'reporter_key': reporter_key, | ||
'yearmonth': yearmonth, | ||
'report_kwargs': _kwargs, | ||
}) | ||
_last_kwargs = _kwargs | ||
_done = True | ||
except _RETRY_ERRORS: | ||
if not _done and (_last_kwargs is not None): | ||
schedule_monthly_reporter.apply_async(kwargs={ | ||
'yearmonth': yearmonth, | ||
'reporter_key': reporter_key, | ||
'continue_after': _last_kwargs, | ||
}) | ||
|
||
|
||
@celery_app.task( | ||
name='management.commands.monthly_reporter_go', | ||
autoretry_for=(OperationalError,), | ||
name='management.commands.monthly_reporter_do', | ||
autoretry_for=( | ||
DjangoOperationalError, | ||
ElasticConnectionError, | ||
PostgresOperationalError, | ||
), | ||
max_retries=5, | ||
retry_backoff=True, | ||
bind=True, | ||
) | ||
def monthly_reporter_go(task, reporter_key: str, yearmonth: str): | ||
_reporter_class = AllMonthlyReporters[reporter_key].value | ||
_reporter = _reporter_class(YearMonth.from_str(yearmonth)) | ||
_reporter.run_and_record_for_month() | ||
_followup = _reporter.followup_task() | ||
if _followup is not None: | ||
_followup.apply_async() | ||
def monthly_reporter_do(reporter_key: str, yearmonth: str, report_kwargs: dict): | ||
_reporter = _get_reporter(reporter_key, yearmonth) | ||
_report = _reporter.report(**report_kwargs) | ||
if _report is not None: | ||
_report.report_yearmonth = _reporter.yearmonth | ||
_report.save() | ||
_followup_task = _reporter.followup_task(_report) | ||
if _followup_task is not None: | ||
_followup_task.apply_async() | ||
|
||
|
||
class Command(BaseCommand): | ||
def add_arguments(self, parser): | ||
parser.add_argument( | ||
'yearmonth', | ||
type=YearMonth.from_str, | ||
default={'year': None, 'month': None}, | ||
default=None, | ||
help='year and month (YYYY-MM)', | ||
) | ||
parser.add_argument( | ||
'-r', '--reporter', | ||
type=str, | ||
choices={_name.lower() for _name in _enum_names(AllMonthlyReporters)}, | ||
default='', | ||
help='name of the reporter to run (default all)', | ||
) | ||
|
||
def handle(self, *args, **options): | ||
monthly_reporters_go( | ||
report_year=getattr(options.get('yearmonth'), 'year', None), | ||
report_month=getattr(options.get('yearmonth'), 'month', None), | ||
def handle(self, *args, **kwargs): | ||
_yearmonth = ( | ||
kwargs.get('yearmonth') | ||
or YearMonth.from_date(timezone.now().date()).prior() # default last month | ||
) | ||
_reporter_key = kwargs.get('reporter', '').upper() | ||
monthly_reporters_go.delay( | ||
yearmonth=str(_yearmonth), | ||
reporter_key=_reporter_key, | ||
) | ||
self.stdout.write(self.style.SUCCESS('reporter tasks scheduled.')) | ||
self.stdout.write(self.style.SUCCESS( | ||
f'scheduling tasks for monthly reporter "{_reporter_key}"...' | ||
if _reporter_key | ||
else 'scheduling tasks for all monthly reporters...' | ||
)) | ||
|
||
|
||
def _get_reporter(reporter_key: str, yearmonth: str): | ||
_reporter_class = AllMonthlyReporters[reporter_key].value | ||
return _reporter_class(YearMonth.from_str(yearmonth)) | ||
|
||
|
||
def _enum_names(enum_cls) -> list[str]: | ||
return list(AllMonthlyReporters.__members__.keys()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.