diff --git a/kobo/apps/hook/constants.py b/kobo/apps/hook/constants.py index 00998fc333..7b84fa03bb 100644 --- a/kobo/apps/hook/constants.py +++ b/kobo/apps/hook/constants.py @@ -1,5 +1,6 @@ # coding: utf-8 from enum import Enum +from rest_framework import status HOOK_LOG_FAILED = 0 @@ -16,3 +17,11 @@ class HookLogStatus(Enum): KOBO_INTERNAL_ERROR_STATUS_CODE = None SUBMISSION_PLACEHOLDER = '%SUBMISSION%' + +# Status codes that trigger a retry +RETRIABLE_STATUS_CODES = [ + # status.HTTP_429_TOO_MANY_REQUESTS, + status.HTTP_502_BAD_GATEWAY, + status.HTTP_503_SERVICE_UNAVAILABLE, + status.HTTP_504_GATEWAY_TIMEOUT, +] diff --git a/kobo/apps/hook/exceptions.py b/kobo/apps/hook/exceptions.py new file mode 100644 index 0000000000..1997f47c2e --- /dev/null +++ b/kobo/apps/hook/exceptions.py @@ -0,0 +1,3 @@ + +class HookRemoteServerDownError(Exception): + pass diff --git a/kobo/apps/hook/models/hook_log.py b/kobo/apps/hook/models/hook_log.py index 51c1a8a2f9..2cba30815f 100644 --- a/kobo/apps/hook/models/hook_log.py +++ b/kobo/apps/hook/models/hook_log.py @@ -1,4 +1,3 @@ -# coding: utf-8 from datetime import timedelta import constance @@ -17,39 +16,44 @@ class HookLog(models.Model): - hook = models.ForeignKey("Hook", related_name="logs", on_delete=models.CASCADE) + hook = models.ForeignKey( + "Hook", related_name="logs", on_delete=models.CASCADE + ) uid = KpiUidField(uid_prefix="hl") - submission_id = models.IntegerField(default=0, db_index=True) # `KoBoCAT.logger.Instance.id` + submission_id = models.IntegerField( # `KoboCAT.logger.Instance.id` + default=0, db_index=True + ) tries = models.PositiveSmallIntegerField(default=0) status = models.PositiveSmallIntegerField( choices=[[e.value, e.name.title()] for e in HookLogStatus], - default=HookLogStatus.PENDING.value + default=HookLogStatus.PENDING.value, ) # Could use status_code, but will speed-up queries - status_code = models.IntegerField(default=KOBO_INTERNAL_ERROR_STATUS_CODE, null=True, blank=True) + status_code = models.IntegerField( + default=KOBO_INTERNAL_ERROR_STATUS_CODE, null=True, blank=True + ) message = models.TextField(default="") date_created = models.DateTimeField(auto_now_add=True) date_modified = models.DateTimeField(auto_now_add=True) class Meta: - ordering = ["-date_created"] + ordering = ['-date_created'] + @property def can_retry(self) -> bool: """ Return whether instance can be resent to external endpoint. Notice: even if False is returned, `self.retry()` can be triggered. """ if self.hook.active: - seconds = HookLog.get_elapsed_seconds( - constance.config.HOOK_MAX_RETRIES - ) - threshold = timezone.now() - timedelta(seconds=seconds) - # We can retry only if system has already tried 3 times. - # If log is still pending after 3 times, there was an issue, - # we allow the retry - return ( - self.status == HOOK_LOG_FAILED - or (self.date_modified < threshold and self.status == HOOK_LOG_PENDING) - ) + if self.tries >= constance.config.HOOK_MAX_RETRIES: + # If log is still pending after `constance.config.HOOK_MAX_RETRIES` + # times, there was an issue, we allow the retry. + threshold = timezone.now() - timedelta(seconds=120) + + return self.status == HOOK_LOG_FAILED or ( + self.date_modified < threshold + and self.status == HOOK_LOG_PENDING + ) return False @@ -66,29 +70,6 @@ def change_status( self.save(reset_status=True) - @staticmethod - def get_elapsed_seconds(retries_count: int) -> int: - """ - Calculate number of elapsed seconds since first try. - Return the number of seconds. - """ - # We need to sum all seconds between each retry - seconds = 0 - for retries_count in range(retries_count): - # Range is zero-indexed - seconds += HookLog.get_remaining_seconds(retries_count) - - return seconds - - @staticmethod - def get_remaining_seconds(retries_count): - """ - Calculate number of remaining seconds before next retry - :param retries_count: int. - :return: int. Number of seconds - """ - return 60 * (10 ** retries_count) - def retry(self): """ Retries to send data to external service @@ -100,7 +81,7 @@ def retry(self): service_definition.send() self.refresh_from_db() except Exception as e: - logging.error("HookLog.retry - {}".format(str(e)), exc_info=True) + logging.error('HookLog.retry - {}'.format(str(e)), exc_info=True) self.change_status(HOOK_LOG_FAILED) return False @@ -110,7 +91,7 @@ def save(self, *args, **kwargs): # Update date_modified each time object is saved self.date_modified = timezone.now() # We don't want to alter tries when we only change the status - if kwargs.pop("reset_status", False) is False: + if kwargs.pop('reset_status', False) is False: self.tries += 1 self.hook.reset_totals() super().save(*args, **kwargs) diff --git a/kobo/apps/hook/models/service_definition_interface.py b/kobo/apps/hook/models/service_definition_interface.py index e721bf45b7..9b2bd1a095 100644 --- a/kobo/apps/hook/models/service_definition_interface.py +++ b/kobo/apps/hook/models/service_definition_interface.py @@ -15,7 +15,9 @@ HOOK_LOG_SUCCESS, HOOK_LOG_FAILED, KOBO_INTERNAL_ERROR_STATUS_CODE, + RETRIABLE_STATUS_CODES, ) +from ..exceptions import HookRemoteServerDownError class ServiceDefinitionInterface(metaclass=ABCMeta): @@ -41,7 +43,8 @@ def _get_data(self): 'service_json.ServiceDefinition._get_data: ' f'Hook #{self._hook.uid} - Data #{self._submission_id} - ' f'{str(e)}', - exc_info=True) + exc_info=True, + ) return None @abstractmethod @@ -71,106 +74,141 @@ def _prepare_request_kwargs(self): """ pass - def send(self): + def send(self) -> bool: """ - Sends data to external endpoint - :return: bool + Sends data to external endpoint. + + Raise an exception if something is wrong. Retries are only allowed + when `HookRemoteServerDownError` is raised. """ - success = False + if not self._data: + self.save_log( + KOBO_INTERNAL_ERROR_STATUS_CODE, 'Submission has been deleted', allow_retries=False + ) + return False + # Need to declare response before requests.post assignment in case of # RequestException response = None - if self._data: - try: - request_kwargs = self._prepare_request_kwargs() - - # Add custom headers - request_kwargs.get("headers").update( - self._hook.settings.get("custom_headers", {})) - - # Add user agent - public_domain = "- {} ".format(os.getenv("PUBLIC_DOMAIN_NAME")) \ - if os.getenv("PUBLIC_DOMAIN_NAME") else "" - request_kwargs.get("headers").update({ - "User-Agent": "KoboToolbox external service {}#{}".format( - public_domain, - self._hook.uid) - }) - - # If the request needs basic authentication with username and - # password, let's provide them - if self._hook.auth_level == Hook.BASIC_AUTH: - request_kwargs.update({ - "auth": (self._hook.settings.get("username"), - self._hook.settings.get("password")) - }) - - ssrf_protect_options = {} - if constance.config.SSRF_ALLOWED_IP_ADDRESS.strip(): - ssrf_protect_options['allowed_ip_addresses'] = constance.\ - config.SSRF_ALLOWED_IP_ADDRESS.strip().split('\r\n') - - if constance.config.SSRF_DENIED_IP_ADDRESS.strip(): - ssrf_protect_options['denied_ip_addresses'] = constance.\ - config.SSRF_DENIED_IP_ADDRESS.strip().split('\r\n') - - SSRFProtect.validate(self._hook.endpoint, - options=ssrf_protect_options) - - response = requests.post(self._hook.endpoint, timeout=30, - **request_kwargs) - response.raise_for_status() - self.save_log(response.status_code, response.text, True) - success = True - except requests.exceptions.RequestException as e: - # If request fails to communicate with remote server. - # Exception is raised before request.post can return something. - # Thus, response equals None - status_code = KOBO_INTERNAL_ERROR_STATUS_CODE - text = str(e) - if response is not None: - text = response.text - status_code = response.status_code - self.save_log(status_code, text) - except SSRFProtectException as e: - logging.error( - 'service_json.ServiceDefinition.send: ' - f'Hook #{self._hook.uid} - ' - f'Data #{self._submission_id} - ' - f'{str(e)}', - exc_info=True) - self.save_log( - KOBO_INTERNAL_ERROR_STATUS_CODE, - f'{self._hook.endpoint} is not allowed') - except Exception as e: - logging.error( - 'service_json.ServiceDefinition.send: ' - f'Hook #{self._hook.uid} - ' - f'Data #{self._submission_id} - ' - f'{str(e)}', - exc_info=True) - self.save_log( - KOBO_INTERNAL_ERROR_STATUS_CODE, - "An error occurred when sending data to external endpoint") - else: - self.save_log( - KOBO_INTERNAL_ERROR_STATUS_CODE, - 'Submission has been deleted' + try: + request_kwargs = self._prepare_request_kwargs() + + # Add custom headers + request_kwargs.get('headers').update( + self._hook.settings.get('custom_headers', {}) ) - return success + # Add user agent + public_domain = ( + '- {} '.format(os.getenv('PUBLIC_DOMAIN_NAME')) + if os.getenv('PUBLIC_DOMAIN_NAME') + else '' + ) + request_kwargs.get('headers').update( + { + 'User-Agent': 'KoboToolbox external service {}#{}'.format( + public_domain, self._hook.uid + ) + } + ) - def save_log(self, status_code: int, message: str, success: bool = False): + # If the request needs basic authentication with username and + # password, let's provide them + if self._hook.auth_level == Hook.BASIC_AUTH: + request_kwargs.update( + { + 'auth': ( + self._hook.settings.get('username'), + self._hook.settings.get('password'), + ) + } + ) + + ssrf_protect_options = {} + if constance.config.SSRF_ALLOWED_IP_ADDRESS.strip(): + ssrf_protect_options[ + 'allowed_ip_addresses' + ] = constance.config.SSRF_ALLOWED_IP_ADDRESS.strip().split( + '\r\n' + ) + + if constance.config.SSRF_DENIED_IP_ADDRESS.strip(): + ssrf_protect_options[ + 'denied_ip_addresses' + ] = constance.config.SSRF_DENIED_IP_ADDRESS.strip().split( + '\r\n' + ) + + SSRFProtect.validate( + self._hook.endpoint, options=ssrf_protect_options + ) + + response = requests.post( + self._hook.endpoint, timeout=30, **request_kwargs + ) + response.raise_for_status() + self.save_log(response.status_code, response.text, success=True) + + return True + + except requests.exceptions.RequestException as e: + # If request fails to communicate with remote server. + # Exception is raised before request.post can return something. + # Thus, response equals None + status_code = KOBO_INTERNAL_ERROR_STATUS_CODE + text = str(e) + if response is not None: + text = response.text + status_code = response.status_code + + if status_code in RETRIABLE_STATUS_CODES: + self.save_log(status_code, text, allow_retries=True) + raise HookRemoteServerDownError + + self.save_log(status_code, text) + raise + except SSRFProtectException as e: + logging.error( + 'service_json.ServiceDefinition.send: ' + f'Hook #{self._hook.uid} - ' + f'Data #{self._submission_id} - ' + f'{str(e)}', + exc_info=True, + ) + self.save_log( + KOBO_INTERNAL_ERROR_STATUS_CODE, + f'{self._hook.endpoint} is not allowed' + ) + raise + except Exception as e: + logging.error( + 'service_json.ServiceDefinition.send: ' + f'Hook #{self._hook.uid} - ' + f'Data #{self._submission_id} - ' + f'{str(e)}', + exc_info=True, + ) + self.save_log( + KOBO_INTERNAL_ERROR_STATUS_CODE, + 'An error occurred when sending ' + f'data to external endpoint: {str(e)}', + ) + raise + + def save_log( + self, + status_code: int, + message: str, + success: bool = False, + allow_retries: bool = False, + ): """ Updates/creates log entry with: - `status_code` as the HTTP status code of the remote server response - `message` as the content of the remote server response """ - fields = { - 'hook': self._hook, - 'submission_id': self._submission_id - } + fields = {'hook': self._hook, 'submission_id': self._submission_id} try: # Try to load the log with a multiple field FK because # we don't know the log `uid` in this context, but we do know @@ -181,7 +219,7 @@ def save_log(self, status_code: int, message: str, success: bool = False): if success: log.status = HOOK_LOG_SUCCESS - elif log.tries >= constance.config.HOOK_MAX_RETRIES: + elif not allow_retries or log.tries >= constance.config.HOOK_MAX_RETRIES: log.status = HOOK_LOG_FAILED log.status_code = status_code diff --git a/kobo/apps/hook/tasks.py b/kobo/apps/hook/tasks.py index b1dfbf7a96..c87cd21076 100644 --- a/kobo/apps/hook/tasks.py +++ b/kobo/apps/hook/tasks.py @@ -9,37 +9,33 @@ from django.utils import translation, timezone from django_celery_beat.models import PeriodicTask +from kobo.celery import celery_app from kpi.utils.log import logging from .constants import HOOK_LOG_FAILED +from .exceptions import HookRemoteServerDownError from .models import Hook, HookLog - - -@shared_task(bind=True) -def service_definition_task(self, hook_id, submission_id): +from .utils.lazy import LazyMaxRetriesInt + + +@celery_app.task( + autoretry_for=(HookRemoteServerDownError,), + retry_backoff=60, + retry_backoff_max=1200, + max_retries=LazyMaxRetriesInt(), + retry_jitter=True, + queue='kpi_low_priority_queue', +) +def service_definition_task(hook_id: int, submission_id: int) -> bool: """ Tries to send data to the endpoint of the hook It retries n times (n = `constance.config.HOOK_MAX_RETRIES`) - - - after 1 minutes, - - after 10 minutes, - - after 100 minutes - etc ... - - :param self: Celery.Task. - :param hook_id: int. Hook PK - :param submission_id: int. Instance PK """ hook = Hook.objects.get(id=hook_id) # Use camelcase (even if it's not PEP-8 compliant) # because variable represents the class, not the instance. ServiceDefinition = hook.get_service_definition() # noqa service_definition = ServiceDefinition(hook, submission_id) - if not service_definition.send(): - # Countdown is in seconds - countdown = HookLog.get_remaining_seconds(self.request.retries) - raise self.retry(countdown=countdown, max_retries=constance.config.HOOK_MAX_RETRIES) - - return True + return service_definition.send() @shared_task diff --git a/kobo/apps/hook/tests/test_utils.py b/kobo/apps/hook/tests/test_utils.py index ad198a7b7c..94167f1886 100644 --- a/kobo/apps/hook/tests/test_utils.py +++ b/kobo/apps/hook/tests/test_utils.py @@ -3,7 +3,7 @@ from rest_framework import status from .hook_test_case import HookTestCase, MockSSRFProtect -from ..utils import HookUtils +from ..utils.services import call_services class HookUtilsTestCase(HookTestCase): @@ -29,7 +29,7 @@ def test_data_submission(self): submissions = self.asset.deployment.get_submissions(self.asset.owner) submission_id = submissions[0]['_id'] - assert HookUtils.call_services(self.asset.uid, submission_id) is True + assert call_services(self.asset.uid, submission_id) is True # Create second hook second_hook = self._create_hook( @@ -45,8 +45,8 @@ def test_data_submission(self): ) # Since second hook hasn't received the submission, `call_services` # should still return True - assert HookUtils.call_services(self.asset.uid, submission_id) is True + assert call_services(self.asset.uid, submission_id) is True # But if we try again, it should return False (we cannot send the same # submission twice to the same external endpoint). - assert HookUtils.call_services(self.asset.uid, submission_id) is False + assert call_services(self.asset.uid, submission_id) is False diff --git a/kobo/apps/hook/utils.py b/kobo/apps/hook/utils.py deleted file mode 100644 index 5bb6e1391c..0000000000 --- a/kobo/apps/hook/utils.py +++ /dev/null @@ -1,33 +0,0 @@ -# coding: utf-8 -from .models.hook import Hook -from .models.hook_log import HookLog -from .tasks import service_definition_task - - -class HookUtils: - - @staticmethod - def call_services(asset_uid: str, submission_id: int) -> bool: - """ - Delegates to Celery data submission to remote servers - """ - # Retrieve `Hook` ids, to send data to their respective endpoint. - hooks_ids = ( - Hook.objects.filter(asset__uid=asset_uid, active=True) - .values_list('id', flat=True) - .distinct() - ) - # At least, one of the hooks must not have a log that corresponds to - # `submission_id` - # to make success equal True - success = False - for hook_id in hooks_ids: - if not HookLog.objects.filter( - submission_id=submission_id, hook_id=hook_id - ).exists(): - success = True - service_definition_task.apply_async( - queue='kpi_low_priority_queue', args=(hook_id, submission_id) - ) - - return success diff --git a/kobo/apps/hook/utils/__init__.py b/kobo/apps/hook/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/kobo/apps/hook/utils/lazy.py b/kobo/apps/hook/utils/lazy.py new file mode 100644 index 0000000000..58a64c9d1a --- /dev/null +++ b/kobo/apps/hook/utils/lazy.py @@ -0,0 +1,44 @@ +import constance + + +class LazyMaxRetriesInt: + """ + constance settings cannot be used as default parameters of a function. + This wrapper helps to return the value of `constance.config.HOOK_MAX_RETRIES` + on demand. + """ + def __call__(self, *args, **kwargs): + return constance.config.HOOK_MAX_RETRIES + + def __repr__(self): + return str(constance.config.HOOK_MAX_RETRIES) + + def __eq__(self, other): + if isinstance(other, int): + return self() == other + return NotImplemented + + def __ne__(self, other): + if isinstance(other, int): + return self() != other + return NotImplemented + + def __lt__(self, other): + if isinstance(other, int): + return self() < other + return NotImplemented + + def __le__(self, other): + if isinstance(other, int): + return self() <= other + return NotImplemented + + def __gt__(self, other): + if isinstance(other, int): + return self() > other + return NotImplemented + + def __ge__(self, other): + if isinstance(other, int): + return self() >= other + return NotImplemented diff --git a/kobo/apps/hook/utils/services.py b/kobo/apps/hook/utils/services.py new file mode 100644 index 0000000000..f0d05c7ad1 --- /dev/null +++ b/kobo/apps/hook/utils/services.py @@ -0,0 +1,27 @@ +from ..models.hook import Hook +from ..models.hook_log import HookLog +from ..tasks import service_definition_task + + +def call_services(asset_uid: str, submission_id: int) -> bool: + """ + Delegates to Celery data submission to remote servers + """ + # Retrieve `Hook` ids, to send data to their respective endpoint. + hooks_ids = ( + Hook.objects.filter(asset__uid=asset_uid, active=True) + .values_list('id', flat=True) + .distinct() + ) + # At least, one of the hooks must not have a log that corresponds to + # `submission_id` + # to make success equal True + success = False + + for hook_id in hooks_ids: + if not HookLog.objects.filter( + submission_id=submission_id, hook_id=hook_id + ).exists(): + success = True + service_definition_task.delay(hook_id, submission_id) + return success diff --git a/kobo/apps/hook/views/v2/hook.py b/kobo/apps/hook/views/v2/hook.py index 9c4f795b0d..1e2a975868 100644 --- a/kobo/apps/hook/views/v2/hook.py +++ b/kobo/apps/hook/views/v2/hook.py @@ -174,13 +174,20 @@ def retry(self, request, uid=None, *args, **kwargs): response = {"detail": t("Task successfully scheduled")} status_code = status.HTTP_200_OK if hook.active: - seconds = HookLog.get_elapsed_seconds(constance.config.HOOK_MAX_RETRIES) - threshold = timezone.now() - timedelta(seconds=seconds) - - records = hook.logs.filter(Q(date_modified__lte=threshold, - status=HOOK_LOG_PENDING) | - Q(status=HOOK_LOG_FAILED)). \ - values_list("id", "uid").distinct() + threshold = timezone.now() - timedelta(seconds=120) + + records = ( + hook.logs.filter( + Q( + date_modified__lte=threshold, + status=HOOK_LOG_PENDING, + tries__gte=constance.config.HOOK_MAX_RETRIES, + ) + | Q(status=HOOK_LOG_FAILED) + ) + .values_list('id', 'uid') + .distinct() + ) # Prepare lists of ids hooklogs_ids = [] hooklogs_uids = [] @@ -190,7 +197,9 @@ def retry(self, request, uid=None, *args, **kwargs): if len(records) > 0: # Mark all logs as PENDING - HookLog.objects.filter(id__in=hooklogs_ids).update(status=HOOK_LOG_PENDING) + HookLog.objects.filter(id__in=hooklogs_ids).update( + status=HOOK_LOG_PENDING + ) # Delegate to Celery retry_all_task.apply_async( queue='kpi_low_priority_queue', args=(hooklogs_ids,) diff --git a/kobo/apps/hook/views/v2/hook_log.py b/kobo/apps/hook/views/v2/hook_log.py index ab6e1e29c9..049047655c 100644 --- a/kobo/apps/hook/views/v2/hook_log.py +++ b/kobo/apps/hook/views/v2/hook_log.py @@ -108,7 +108,7 @@ def retry(self, request, uid=None, *args, **kwargs): status_code = status.HTTP_200_OK hook_log = self.get_object() - if hook_log.can_retry(): + if hook_log.can_retry: hook_log.change_status() success = hook_log.retry() if success: diff --git a/kobo/apps/openrosa/apps/viewer/models/parsed_instance.py b/kobo/apps/openrosa/apps/viewer/models/parsed_instance.py index 2427ab651e..4afd88a3f3 100644 --- a/kobo/apps/openrosa/apps/viewer/models/parsed_instance.py +++ b/kobo/apps/openrosa/apps/viewer/models/parsed_instance.py @@ -26,7 +26,7 @@ ) from kobo.apps.openrosa.libs.utils.decorators import apply_form_field_names from kobo.apps.openrosa.libs.utils.model_tools import queryset_iterator -from kobo.apps.hook.utils import HookUtils +from kobo.apps.hook.utils.services import call_services from kpi.utils.log import logging # this is Mongo Collection where we will store the parsed submissions @@ -380,7 +380,7 @@ def save(self, asynchronous=False, *args, **kwargs): f'ParsedInstance #: {self.pk} - XForm is not linked with Asset' ) else: - HookUtils.call_services(asset_uid, self.instance_id) + call_services(asset_uid, self.instance_id) return success diff --git a/kobo/settings/base.py b/kobo/settings/base.py index b0ba92ff27..715d2badc3 100644 --- a/kobo/settings/base.py +++ b/kobo/settings/base.py @@ -1198,7 +1198,7 @@ def dj_stripe_request_callback_method(): # http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#redis-visibility-timeout # TODO figure out how to pass `Constance.HOOK_MAX_RETRIES` or `HookLog.get_remaining_seconds() # Otherwise hardcode `HOOK_MAX_RETRIES` in Settings - "visibility_timeout": 60 * (10 ** 3) # Longest ETA for RestService (seconds) + "visibility_timeout": 60 * (10 ** 2) # Longest ETA for RestService (seconds) } CELERY_TASK_DEFAULT_QUEUE = "kpi_queue"