diff --git a/waterbutler/providers/dropbox/provider.py b/waterbutler/providers/dropbox/provider.py index 6d100fbcc..dcaa19633 100644 --- a/waterbutler/providers/dropbox/provider.py +++ b/waterbutler/providers/dropbox/provider.py @@ -1,6 +1,7 @@ import json import typing import logging +import tempfile from http import HTTPStatus from waterbutler.core import provider, streams @@ -60,6 +61,7 @@ class DropboxProvider(provider.BaseProvider): BASE_URL = pd_settings.BASE_URL CONTIGUOUS_UPLOAD_SIZE_LIMIT = pd_settings.CONTIGUOUS_UPLOAD_SIZE_LIMIT CHUNK_SIZE = pd_settings.CHUNK_SIZE + MAX_429_RETRIES = pd_settings.MAX_429_RETRIES def __init__(self, auth, credentials, settings): super().__init__(auth, credentials, settings) @@ -281,29 +283,62 @@ async def _contiguous_upload(self, :param conflict: whether to replace upon conflict :rtype: `dict` :return: A dictionary of the metadata about the file just uploaded + + Quirks: Dropbox Rate Limiting + + Making requests to Dropbox API via OAuth2 may be rate-limited with a 429 response. The error message can be + "too_many_requests" which literally means too many request, or "too_many_write_operations" which means namespace + lock contentions has occurred. Both can be solved by retry the request after the time indicated in the response + header "Retry-After". In addition, Dropbox's rate-limiting algorithm is a black box. They also keep trying + different ones. This makes balancing the requests less effective. + + References: https://www.dropbox.com/developers/reference/data-ingress-guide + + Quirks: Retry Upload Load Requests + + When an upload request finishes, the stream will have been consumed. In order to retry such a request, WB needs + to cache a temporary local file from the incoming stream so both the initial request and retries can stream from + this file. """ path_arg = {"path": path.full_path} if conflict == 'replace': path_arg['mode'] = 'overwrite' - resp = await self.make_request( - 'POST', - self._build_content_url('files', 'upload'), - headers={ - 'Content-Type': 'application/octet-stream', - 'Dropbox-API-Arg': json.dumps(path_arg), - 'Content-Length': str(stream.size), - }, - data=stream, - expects=(200, 409,), - throws=core_exceptions.UploadError, - ) - - data = await resp.json() - if resp.status == 409: - self.dropbox_conflict_error_handler(data, path.path) - return data + file_cache = tempfile.TemporaryFile() + chunk = await stream.read() + while chunk: + file_cache.write(chunk) + chunk = await stream.read() + + rate_limit_retry = 0 + while rate_limit_retry < self.MAX_429_RETRIES: + file_stream = streams.FileStreamReader(file_cache) + resp = await self.make_request( + 'POST', + self._build_content_url('files', 'upload'), + headers={ + 'Content-Type': 'application/octet-stream', + 'Dropbox-API-Arg': json.dumps(path_arg), + 'Content-Length': str(file_stream.size), + }, + data=file_stream, + expects=(200, 409, 429, ), + throws=core_exceptions.UploadError, + ) + data = await resp.json() + if resp.status == 429: + rate_limit_retry += 1 + logger.debug('Retry {} for {}'.format(rate_limit_retry, str(path))) + continue + elif resp.status == 409: + file_cache.close() + self.dropbox_conflict_error_handler(data, path.path) + else: + file_cache.close() + return data + file_cache.close() + raise core_exceptions.UploadError(message='Upload failed for {} due to rate limiting'.format(str(path))) async def _chunked_upload(self, stream: streams.BaseStream, path: WaterButlerPath, conflict: str='replace') -> dict: diff --git a/waterbutler/providers/dropbox/settings.py b/waterbutler/providers/dropbox/settings.py index 4978a170e..192dacab6 100644 --- a/waterbutler/providers/dropbox/settings.py +++ b/waterbutler/providers/dropbox/settings.py @@ -10,3 +10,5 @@ CONTIGUOUS_UPLOAD_SIZE_LIMIT = int(config.get('CONTIGUOUS_UPLOAD_SIZE_LIMIT', 150000000)) # 150 MB CHUNK_SIZE = int(config.get('CHUNK_SIZE', 4000000)) # 4 MB + +MAX_429_RETRIES = int(config.get('MAX_429_RETRIES', 2))