diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index bb5d80f6f..3a3fa696e 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -100,27 +100,29 @@ def check_job_status(): select from jobs where job_status == 'in progress' - and processing started between 30 and 35 minutes ago + and processing started some time ago OR where the job_status == 'pending' - and the job scheduled_for timestamp is between 30 and 35 minutes ago. + and the job scheduled_for timestamp is some time ago. if any results then update the job_status to 'error' process the rows in the csv that are missing (in another task) just do the check here. """ - thirty_minutes_ago = utc_now() - timedelta(minutes=30) - thirty_five_minutes_ago = utc_now() - timedelta(minutes=35) + START_MINUTES = 245 + END_MINUTES = 240 + end_minutes_ago = utc_now() - timedelta(minutes=END_MINUTES) + start_minutes_ago = utc_now() - timedelta(minutes=START_MINUTES) incomplete_in_progress_jobs = Job.query.filter( Job.job_status == JobStatus.IN_PROGRESS, - between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago), + between(Job.processing_started, start_minutes_ago, end_minutes_ago), ) incomplete_pending_jobs = Job.query.filter( Job.job_status == JobStatus.PENDING, Job.scheduled_for.isnot(None), - between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago), + between(Job.scheduled_for, start_minutes_ago, end_minutes_ago), ) - jobs_not_complete_after_30_minutes = ( + jobs_not_complete_after_allotted_time = ( incomplete_in_progress_jobs.union(incomplete_pending_jobs) .order_by(Job.processing_started, Job.scheduled_for) .all() @@ -129,7 +131,7 @@ def check_job_status(): # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks # if they haven't been re-processed in time. job_ids = [] - for job in jobs_not_complete_after_30_minutes: + for job in jobs_not_complete_after_allotted_time: job.job_status = JobStatus.ERROR dao_update_job(job) job_ids.append(str(job.id)) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index ae9ea571d..f436aacf2 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -23,6 +23,8 @@ from tests.app import load_example_csv from tests.app.db import create_job, create_notification, create_template +CHECK_JOB_STATUS_TOO_OLD_MINUTES = 241 + def test_should_call_delete_codes_on_delete_verify_codes_task( notify_db_session, mocker @@ -108,8 +110,9 @@ def test_check_job_status_task_calls_process_incomplete_jobs(mocker, sample_temp job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) create_notification(template=sample_template, job=job) @@ -125,9 +128,10 @@ def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_ job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) check_job_status() @@ -142,8 +146,8 @@ def test_check_job_status_task_calls_process_incomplete_jobs_for_pending_schedul job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.PENDING, ) @@ -175,17 +179,19 @@ def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs( job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) job_2 = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) check_job_status() @@ -200,23 +206,24 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=29), + created_at=utc_now() - timedelta(minutes=300), + processing_started=utc_now() - timedelta(minutes=239), job_status=JobStatus.IN_PROGRESS, ) create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=50), - scheduled_for=utc_now() - timedelta(minutes=29), + created_at=utc_now() - timedelta(minutes=300), + scheduled_for=utc_now() - timedelta(minutes=239), job_status=JobStatus.PENDING, ) check_job_status() @@ -230,16 +237,17 @@ def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) job_2 = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=29), + created_at=utc_now() - timedelta(minutes=300), + processing_started=utc_now() - timedelta(minutes=239), job_status=JobStatus.IN_PROGRESS, ) check_job_status() @@ -311,16 +319,18 @@ def test_check_job_status_task_does_not_raise_error(sample_template): create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.FINISHED, ) create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.FINISHED, )