Skip to content

Commit

Permalink
Improve celery retries
Browse files Browse the repository at this point in the history
  • Loading branch information
noliveleger committed Aug 1, 2024
1 parent 11a0322 commit 5943b24
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 198 deletions.
9 changes: 9 additions & 0 deletions kobo/apps/hook/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# coding: utf-8
from enum import Enum
from rest_framework import status


HOOK_LOG_FAILED = 0
Expand All @@ -16,3 +17,11 @@ class HookLogStatus(Enum):
KOBO_INTERNAL_ERROR_STATUS_CODE = None

SUBMISSION_PLACEHOLDER = '%SUBMISSION%'

# Status codes that trigger a retry.
# When the remote endpoint answers with one of these codes, the hook log is
# saved with retries allowed and `HookRemoteServerDownError` is raised by
# `ServiceDefinitionInterface.send()`.
RETRIABLE_STATUS_CODES = [
    # NOTE(review): 429 is listed but disabled; confirm whether rate-limited
    # responses should be retried before enabling it.
    # status.HTTP_429_TOO_MANY_REQUESTS,
    status.HTTP_502_BAD_GATEWAY,
    status.HTTP_503_SERVICE_UNAVAILABLE,
    status.HTTP_504_GATEWAY_TIMEOUT,
]
3 changes: 3 additions & 0 deletions kobo/apps/hook/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

class HookRemoteServerDownError(Exception):
    """
    Signal that the remote server of a hook could not handle the request.

    Raised by `ServiceDefinitionInterface.send()` when the remote server
    answers with a status code listed in `RETRIABLE_STATUS_CODES`. According
    to the `send()` docstring, it is the only exception for which retries
    are allowed.
    """

    pass
65 changes: 23 additions & 42 deletions kobo/apps/hook/models/hook_log.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# coding: utf-8
from datetime import timedelta

import constance
Expand All @@ -17,39 +16,44 @@

class HookLog(models.Model):

hook = models.ForeignKey("Hook", related_name="logs", on_delete=models.CASCADE)
hook = models.ForeignKey(
"Hook", related_name="logs", on_delete=models.CASCADE
)
uid = KpiUidField(uid_prefix="hl")
submission_id = models.IntegerField(default=0, db_index=True) # `KoBoCAT.logger.Instance.id`
submission_id = models.IntegerField( # `KoboCAT.logger.Instance.id`
default=0, db_index=True
)
tries = models.PositiveSmallIntegerField(default=0)
status = models.PositiveSmallIntegerField(
choices=[[e.value, e.name.title()] for e in HookLogStatus],
default=HookLogStatus.PENDING.value
default=HookLogStatus.PENDING.value,
) # Could use status_code, but will speed-up queries
status_code = models.IntegerField(default=KOBO_INTERNAL_ERROR_STATUS_CODE, null=True, blank=True)
status_code = models.IntegerField(
default=KOBO_INTERNAL_ERROR_STATUS_CODE, null=True, blank=True
)
message = models.TextField(default="")
date_created = models.DateTimeField(auto_now_add=True)
date_modified = models.DateTimeField(auto_now_add=True)

class Meta:
ordering = ["-date_created"]
ordering = ['-date_created']

@property
def can_retry(self) -> bool:
"""
Return whether instance can be resent to external endpoint.
Notice: even if False is returned, `self.retry()` can be triggered.
"""
if self.hook.active:
seconds = HookLog.get_elapsed_seconds(
constance.config.HOOK_MAX_RETRIES
)
threshold = timezone.now() - timedelta(seconds=seconds)
# We can retry only if system has already tried 3 times.
# If log is still pending after 3 times, there was an issue,
# we allow the retry
return (
self.status == HOOK_LOG_FAILED
or (self.date_modified < threshold and self.status == HOOK_LOG_PENDING)
)
if self.tries >= constance.config.HOOK_MAX_RETRIES:
# If log is still pending after `constance.config.HOOK_MAX_RETRIES`
# times, there was an issue, we allow the retry.
threshold = timezone.now() - timedelta(seconds=120)

return self.status == HOOK_LOG_FAILED or (
self.date_modified < threshold
and self.status == HOOK_LOG_PENDING
)

return False

Expand All @@ -66,29 +70,6 @@ def change_status(

self.save(reset_status=True)

@staticmethod
def get_elapsed_seconds(retries_count: int) -> int:
"""
Calculate number of elapsed seconds since first try.
Return the number of seconds.
"""
# We need to sum all seconds between each retry
seconds = 0
for retries_count in range(retries_count):
# Range is zero-indexed
seconds += HookLog.get_remaining_seconds(retries_count)

return seconds

@staticmethod
def get_remaining_seconds(retries_count):
"""
Calculate number of remaining seconds before next retry
:param retries_count: int.
:return: int. Number of seconds
"""
return 60 * (10 ** retries_count)

def retry(self):
"""
Retries to send data to external service
Expand All @@ -100,7 +81,7 @@ def retry(self):
service_definition.send()
self.refresh_from_db()
except Exception as e:
logging.error("HookLog.retry - {}".format(str(e)), exc_info=True)
logging.error('HookLog.retry - {}'.format(str(e)), exc_info=True)
self.change_status(HOOK_LOG_FAILED)
return False

Expand All @@ -110,7 +91,7 @@ def save(self, *args, **kwargs):
# Update date_modified each time object is saved
self.date_modified = timezone.now()
# We don't want to alter tries when we only change the status
if kwargs.pop("reset_status", False) is False:
if kwargs.pop('reset_status', False) is False:
self.tries += 1
self.hook.reset_totals()
super().save(*args, **kwargs)
Expand Down
214 changes: 126 additions & 88 deletions kobo/apps/hook/models/service_definition_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
HOOK_LOG_SUCCESS,
HOOK_LOG_FAILED,
KOBO_INTERNAL_ERROR_STATUS_CODE,
RETRIABLE_STATUS_CODES,
)
from ..exceptions import HookRemoteServerDownError


class ServiceDefinitionInterface(metaclass=ABCMeta):
Expand All @@ -41,7 +43,8 @@ def _get_data(self):
'service_json.ServiceDefinition._get_data: '
f'Hook #{self._hook.uid} - Data #{self._submission_id} - '
f'{str(e)}',
exc_info=True)
exc_info=True,
)
return None

@abstractmethod
Expand Down Expand Up @@ -71,106 +74,141 @@ def _prepare_request_kwargs(self):
"""
pass

def send(self):
def send(self) -> bool:
"""
Sends data to external endpoint
:return: bool
Sends data to external endpoint.
Raise an exception if something is wrong. Retries are only allowed
when `HookRemoteServerDownError` is raised.
"""

success = False
if not self._data:
self.save_log(
KOBO_INTERNAL_ERROR_STATUS_CODE, 'Submission has been deleted', allow_retries=False
)
return False

# Need to declare response before requests.post assignment in case of
# RequestException
response = None
if self._data:
try:
request_kwargs = self._prepare_request_kwargs()

# Add custom headers
request_kwargs.get("headers").update(
self._hook.settings.get("custom_headers", {}))

# Add user agent
public_domain = "- {} ".format(os.getenv("PUBLIC_DOMAIN_NAME")) \
if os.getenv("PUBLIC_DOMAIN_NAME") else ""
request_kwargs.get("headers").update({
"User-Agent": "KoboToolbox external service {}#{}".format(
public_domain,
self._hook.uid)
})

# If the request needs basic authentication with username and
# password, let's provide them
if self._hook.auth_level == Hook.BASIC_AUTH:
request_kwargs.update({
"auth": (self._hook.settings.get("username"),
self._hook.settings.get("password"))
})

ssrf_protect_options = {}
if constance.config.SSRF_ALLOWED_IP_ADDRESS.strip():
ssrf_protect_options['allowed_ip_addresses'] = constance.\
config.SSRF_ALLOWED_IP_ADDRESS.strip().split('\r\n')

if constance.config.SSRF_DENIED_IP_ADDRESS.strip():
ssrf_protect_options['denied_ip_addresses'] = constance.\
config.SSRF_DENIED_IP_ADDRESS.strip().split('\r\n')

SSRFProtect.validate(self._hook.endpoint,
options=ssrf_protect_options)

response = requests.post(self._hook.endpoint, timeout=30,
**request_kwargs)
response.raise_for_status()
self.save_log(response.status_code, response.text, True)
success = True
except requests.exceptions.RequestException as e:
# If request fails to communicate with remote server.
# Exception is raised before request.post can return something.
# Thus, response equals None
status_code = KOBO_INTERNAL_ERROR_STATUS_CODE
text = str(e)
if response is not None:
text = response.text
status_code = response.status_code
self.save_log(status_code, text)
except SSRFProtectException as e:
logging.error(
'service_json.ServiceDefinition.send: '
f'Hook #{self._hook.uid} - '
f'Data #{self._submission_id} - '
f'{str(e)}',
exc_info=True)
self.save_log(
KOBO_INTERNAL_ERROR_STATUS_CODE,
f'{self._hook.endpoint} is not allowed')
except Exception as e:
logging.error(
'service_json.ServiceDefinition.send: '
f'Hook #{self._hook.uid} - '
f'Data #{self._submission_id} - '
f'{str(e)}',
exc_info=True)
self.save_log(
KOBO_INTERNAL_ERROR_STATUS_CODE,
"An error occurred when sending data to external endpoint")
else:
self.save_log(
KOBO_INTERNAL_ERROR_STATUS_CODE,
'Submission has been deleted'
try:
request_kwargs = self._prepare_request_kwargs()

# Add custom headers
request_kwargs.get('headers').update(
self._hook.settings.get('custom_headers', {})
)

return success
# Add user agent
public_domain = (
'- {} '.format(os.getenv('PUBLIC_DOMAIN_NAME'))
if os.getenv('PUBLIC_DOMAIN_NAME')
else ''
)
request_kwargs.get('headers').update(
{
'User-Agent': 'KoboToolbox external service {}#{}'.format(
public_domain, self._hook.uid
)
}
)

def save_log(self, status_code: int, message: str, success: bool = False):
# If the request needs basic authentication with username and
# password, let's provide them
if self._hook.auth_level == Hook.BASIC_AUTH:
request_kwargs.update(
{
'auth': (
self._hook.settings.get('username'),
self._hook.settings.get('password'),
)
}
)

ssrf_protect_options = {}
if constance.config.SSRF_ALLOWED_IP_ADDRESS.strip():
ssrf_protect_options[
'allowed_ip_addresses'
] = constance.config.SSRF_ALLOWED_IP_ADDRESS.strip().split(
'\r\n'
)

if constance.config.SSRF_DENIED_IP_ADDRESS.strip():
ssrf_protect_options[
'denied_ip_addresses'
] = constance.config.SSRF_DENIED_IP_ADDRESS.strip().split(
'\r\n'
)

SSRFProtect.validate(
self._hook.endpoint, options=ssrf_protect_options
)

response = requests.post(
self._hook.endpoint, timeout=30, **request_kwargs
)
response.raise_for_status()
self.save_log(response.status_code, response.text, success=True)

return True

except requests.exceptions.RequestException as e:
# If the request fails to communicate with the remote server, the
# exception is raised before `requests.post` can return anything.
# In that case, `response` is still None.
status_code = KOBO_INTERNAL_ERROR_STATUS_CODE
text = str(e)
if response is not None:
text = response.text
status_code = response.status_code

if status_code in RETRIABLE_STATUS_CODES:
self.save_log(status_code, text, allow_retries=True)
raise HookRemoteServerDownError

self.save_log(status_code, text)
raise
except SSRFProtectException as e:
logging.error(
'service_json.ServiceDefinition.send: '
f'Hook #{self._hook.uid} - '
f'Data #{self._submission_id} - '
f'{str(e)}',
exc_info=True,
)
self.save_log(
KOBO_INTERNAL_ERROR_STATUS_CODE,
f'{self._hook.endpoint} is not allowed'
)
raise
except Exception as e:
logging.error(
'service_json.ServiceDefinition.send: '
f'Hook #{self._hook.uid} - '
f'Data #{self._submission_id} - '
f'{str(e)}',
exc_info=True,
)
self.save_log(
KOBO_INTERNAL_ERROR_STATUS_CODE,
'An error occurred when sending '
f'data to external endpoint: {str(e)}',
)
raise

def save_log(
self,
status_code: int,
message: str,
success: bool = False,
allow_retries: bool = False,
):
"""
Updates/creates log entry with:
- `status_code` as the HTTP status code of the remote server response
- `message` as the content of the remote server response
"""
fields = {
'hook': self._hook,
'submission_id': self._submission_id
}
fields = {'hook': self._hook, 'submission_id': self._submission_id}
try:
# Try to load the log with a multiple field FK because
# we don't know the log `uid` in this context, but we do know
Expand All @@ -181,7 +219,7 @@ def save_log(self, status_code: int, message: str, success: bool = False):

if success:
log.status = HOOK_LOG_SUCCESS
elif log.tries >= constance.config.HOOK_MAX_RETRIES:
elif not allow_retries or log.tries >= constance.config.HOOK_MAX_RETRIES:
log.status = HOOK_LOG_FAILED

log.status_code = status_code
Expand Down
Loading

0 comments on commit 5943b24

Please sign in to comment.