diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index bb5d80f6f..3a3fa696e 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -100,27 +100,29 @@ def check_job_status(): select from jobs where job_status == 'in progress' - and processing started between 30 and 35 minutes ago + and processing started some time ago OR where the job_status == 'pending' - and the job scheduled_for timestamp is between 30 and 35 minutes ago. + and the job scheduled_for timestamp is some time ago. if any results then update the job_status to 'error' process the rows in the csv that are missing (in another task) just do the check here. """ - thirty_minutes_ago = utc_now() - timedelta(minutes=30) - thirty_five_minutes_ago = utc_now() - timedelta(minutes=35) + START_MINUTES = 245 + END_MINUTES = 240 + end_minutes_ago = utc_now() - timedelta(minutes=END_MINUTES) + start_minutes_ago = utc_now() - timedelta(minutes=START_MINUTES) incomplete_in_progress_jobs = Job.query.filter( Job.job_status == JobStatus.IN_PROGRESS, - between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago), + between(Job.processing_started, start_minutes_ago, end_minutes_ago), ) incomplete_pending_jobs = Job.query.filter( Job.job_status == JobStatus.PENDING, Job.scheduled_for.isnot(None), - between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago), + between(Job.scheduled_for, start_minutes_ago, end_minutes_ago), ) - jobs_not_complete_after_30_minutes = ( + jobs_not_complete_after_allotted_time = ( incomplete_in_progress_jobs.union(incomplete_pending_jobs) .order_by(Job.processing_started, Job.scheduled_for) .all() @@ -129,7 +131,7 @@ def check_job_status(): # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks # if they haven't been re-processed in time. job_ids = [] - for job in jobs_not_complete_after_30_minutes: + for job in jobs_not_complete_after_allotted_time: job.job_status = JobStatus.ERROR dao_update_job(job) job_ids.append(str(job.id)) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index cbb9e22b6..3743aa294 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -24,7 +24,6 @@ from app.errors import TotalRequestsError from app.notifications.process_notifications import ( get_notification, - notification_exists, persist_notification, ) from app.notifications.validators import check_service_over_total_message_limit @@ -214,9 +213,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i f"service not allowed to send for job_id {notification.get('job', None)}, aborting" ) ) - current_app.logger.debug( - "SMS {} failed as restricted service".format(notification_id) - ) + current_app.logger.debug(f"SMS {notification_id} failed as restricted service") return try: @@ -244,11 +241,12 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i reply_to_text=reply_to_text, ) except IntegrityError: - if notification_exists(notification_id): - saved_notification = get_notification(notification_id) - - else: - raise + current_app.logger.warning( + f"{NotificationType.SMS}: {notification_id} already exists." + ) + # If we don't have the return statement here, we will fall through and end + # up retrying because IntegrityError is a subclass of SQLAlchemyError + return # Kick off sns process in provider_tasks.py sn = saved_notification diff --git a/app/service_invite/rest.py b/app/service_invite/rest.py index 38bc1c404..e375b93a5 100644 --- a/app/service_invite/rest.py +++ b/app/service_invite/rest.py @@ -54,7 +54,7 @@ def _create_service_invite(invited_user, nonce, state): data["invited_user_email"] = invited_user.email_address invite_redis_key = f"invite-data-{unquote(state)}" - redis_store.set(invite_redis_key, get_user_data_url_safe(data)) + redis_store.set(invite_redis_key, get_user_data_url_safe(data), ex=2 * 24 * 60 * 60) url = os.environ["LOGIN_DOT_GOV_REGISTRATION_URL"] diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index ae9ea571d..f436aacf2 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -23,6 +23,8 @@ from tests.app import load_example_csv from tests.app.db import create_job, create_notification, create_template +CHECK_JOB_STATUS_TOO_OLD_MINUTES = 241 + def test_should_call_delete_codes_on_delete_verify_codes_task( notify_db_session, mocker @@ -108,8 +110,9 @@ def test_check_job_status_task_calls_process_incomplete_jobs(mocker, sample_temp job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) create_notification(template=sample_template, job=job) @@ -125,9 +128,10 @@ def test_check_job_status_task_calls_process_incomplete_jobs_when_scheduled_job_ job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) check_job_status() @@ -142,8 +146,8 @@ def test_check_job_status_task_calls_process_incomplete_jobs_for_pending_schedul job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.PENDING, ) @@ -175,17 +179,19 @@ def test_check_job_status_task_calls_process_incomplete_jobs_for_multiple_jobs( job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) job_2 = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) check_job_status() @@ -200,23 +206,24 @@ def test_check_job_status_task_only_sends_old_tasks(mocker, sample_template): job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=29), + created_at=utc_now() - timedelta(minutes=300), + processing_started=utc_now() - timedelta(minutes=239), job_status=JobStatus.IN_PROGRESS, ) create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=50), - scheduled_for=utc_now() - timedelta(minutes=29), + created_at=utc_now() - timedelta(minutes=300), + scheduled_for=utc_now() - timedelta(minutes=239), job_status=JobStatus.PENDING, ) check_job_status() @@ -230,16 +237,17 @@ def test_check_job_status_task_sets_jobs_to_error(mocker, sample_template): job = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.IN_PROGRESS, ) job_2 = create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=29), + created_at=utc_now() - timedelta(minutes=300), + processing_started=utc_now() - timedelta(minutes=239), job_status=JobStatus.IN_PROGRESS, ) check_job_status() @@ -311,16 +319,18 @@ def test_check_job_status_task_does_not_raise_error(sample_template): create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(hours=2), - scheduled_for=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(hours=5), + scheduled_for=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.FINISHED, ) create_job( template=sample_template, notification_count=3, - created_at=utc_now() - timedelta(minutes=31), - processing_started=utc_now() - timedelta(minutes=31), + created_at=utc_now() - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), + processing_started=utc_now() + - timedelta(minutes=CHECK_JOB_STATUS_TOO_OLD_MINUTES), job_status=JobStatus.FINISHED, )