From 21056edcb1c78021a4c4e90b41f3922f35ecdab0 Mon Sep 17 00:00:00 2001 From: TomBaxter Date: Mon, 22 May 2017 16:08:30 -0400 Subject: [PATCH] Proof of concept: Notify users after returning a 202 Accepted? Task returns "Content-Location" header along with 202 Content-Location points to new /pending end-point Upon task completion /pending end-point return 303 and "Content-Location" header Pending Content-Location header returns uri of created/updated resource. --- waterbutler/server/api/v1/core.py | 11 ++++- .../server/api/v1/provider/movecopy.py | 6 ++- waterbutler/server/app.py | 3 +- waterbutler/server/handlers.py | 40 +++++++++++++++++++ waterbutler/tasks/core.py | 4 +- waterbutler/tasks/settings.py | 18 +++++---- 6 files changed, 69 insertions(+), 13 deletions(-) diff --git a/waterbutler/server/api/v1/core.py b/waterbutler/server/api/v1/core.py index e4980e2e1..7db982e98 100644 --- a/waterbutler/server/api/v1/core.py +++ b/waterbutler/server/api/v1/core.py @@ -4,7 +4,7 @@ from raven.contrib.tornado import SentryMixin from waterbutler import tasks -from waterbutler.server import utils +from waterbutler.server import utils, settings from waterbutler.core import exceptions @@ -24,7 +24,16 @@ def write_error(self, status_code, exc_info): finish_args = [exc.data] if exc.data else [{'code': exc.code, 'message': exc.message}] elif issubclass(etype, tasks.WaitTimeOutError): self.set_status(202) + pending_url = '{}/pending/{}/{}'.format(settings.DOMAIN, exc.args[1], exc.args[0]) + self.add_header('Content-Location', pending_url) exception_kwargs = {'data': {'level': 'info'}} + finish_args = [{'data': {'attributes': {'status': 'Accepted', + 'id': exc.args[0], + 'resource': exc.args[1] + }, + 'links': {'status': pending_url} + } + }] else: finish_args = [{'code': status_code, 'message': self._reason}] diff --git a/waterbutler/server/api/v1/provider/movecopy.py b/waterbutler/server/api/v1/provider/movecopy.py index ec3b8c563..18410fe01 100644 --- a/waterbutler/server/api/v1/provider/movecopy.py +++ b/waterbutler/server/api/v1/provider/movecopy.py @@ -111,13 +111,15 @@ async def move_or_copy(self): if not getattr(self.provider, 'can_intra_' + action)(self.dest_provider, self.path): # this weird signature syntax courtesy of py3.4 not liking trailing commas on kwargs conflict = self.json.get('conflict', DEFAULT_CONFLICT) + task_args = self.build_args() result = await getattr(tasks, action).adelay( rename=self.json.get('rename'), conflict=conflict, request=remote_logging._serialize_request(self.request), - *self.build_args() + *task_args ) - metadata, created = await tasks.wait_on_celery(result) + metadata, created = await tasks.wait_on_celery(result, + result_resource=task_args[1]['nid']) else: metadata, created = ( await tasks.backgrounded( diff --git a/waterbutler/server/app.py b/waterbutler/server/app.py index 92d7de2c4..5c7920a2c 100644 --- a/waterbutler/server/app.py +++ b/waterbutler/server/app.py @@ -42,7 +42,8 @@ def make_app(debug): app = tornado.web.Application( api_to_handlers(v0) + api_to_handlers(v1) + - [(r'/status', handlers.StatusHandler)], + [(r'/status', handlers.StatusHandler), + (r'/pending/(?P(?:\w|\d)+)/(?P(?:\w|\d|-)+)', handlers.PendingHandler)], debug=debug, ) app.sentry_client = AsyncSentryClient(settings.SENTRY_DSN, release=waterbutler.__version__) diff --git a/waterbutler/server/handlers.py b/waterbutler/server/handlers.py index 20a604be4..b172570af 100644 --- a/waterbutler/server/handlers.py +++ b/waterbutler/server/handlers.py @@ -1,6 +1,12 @@ import tornado.web +from celery.result import AsyncResult +from celery import Celery import waterbutler +from waterbutler.tasks import settings as tasks_settings + +app = Celery() +app.config_from_object(tasks_settings) class StatusHandler(tornado.web.RequestHandler): @@ -11,3 +17,37 @@ def get(self): 'status': 'up', 'version': waterbutler.__version__ }) + + +class PendingHandler(tornado.web.RequestHandler): + + def get(self, task_id, result_resource): + # TKB need to cover all possible states + app = Celery() + app.config_from_object(tasks_settings) + result = AsyncResult(id=task_id, app=app) + if str(result.ready()) == 'True': + if str(result.state) == 'SUCCESS': + meta, created = result.get(timeout=3) + self.set_status(303) + self.add_header('Content-Location', '{}://{}/v1/resources/{}/providers/{}{}'.format(self.request.protocol, self.request.host, result_resource, meta.provider, meta.path)) + self.write({'data': + {'task_id': task_id, + 'state': str(result.state), + 'ready': str(result.ready()), + } + }) + elif str(result.state) == 'FAILURE': + self.set_status(200) + self.write({'errors': + {'status': result.error_code, + 'source': {'pointer': self.url}, + 'title': result.error_msg, + 'detail': result.error_dtl + } + }) + else: + self.write({ + 'task_id': task_id, + 'state': str(result.state), + }) diff --git a/waterbutler/tasks/core.py b/waterbutler/tasks/core.py index 74930e80c..696d04c39 100644 --- a/waterbutler/tasks/core.py +++ b/waterbutler/tasks/core.py @@ -99,7 +99,7 @@ def celery_task(func, *args, **kwargs): @backgroundify -async def wait_on_celery(result, interval=None, timeout=None, basepath=None): +async def wait_on_celery(result, result_resource=None, interval=None, timeout=None, basepath=None): timeout = timeout or settings.WAIT_TIMEOUT interval = interval or settings.WAIT_INTERVAL basepath = basepath or settings.ADHOC_BACKEND_PATH @@ -123,6 +123,6 @@ async def wait_on_celery(result, interval=None, timeout=None, basepath=None): return result.result if waited > timeout: - raise exceptions.WaitTimeOutError + raise exceptions.WaitTimeOutError(str(result.id), str(result_resource)) await asyncio.sleep(interval) waited += interval diff --git a/waterbutler/tasks/settings.py b/waterbutler/tasks/settings.py index 36bf00386..7cdb6aa9d 100644 --- a/waterbutler/tasks/settings.py +++ b/waterbutler/tasks/settings.py @@ -15,8 +15,19 @@ os.environ.get('RABBITMQ_PORT_5672_TCP_PORT', ''), ) ) +CELERY_RESULT_BACKEND = config.get( + 'CELERY_RESULT_BACKEND', + '{}://{}:{}/{}'.format( + os.environ.get('CELERY_RESULT_BACKEND_PROTO', ''), + os.environ.get('CELERY_RESULT_BACKEND_TCP_ADDR', ''), + os.environ.get('CELERY_RESULT_BACKEND_TCP_PORT', ''), + os.environ.get('CELERY_RESULT_BACKEND_ID', ''), + ) +) WAIT_TIMEOUT = int(config.get('WAIT_TIMEOUT', 15)) +# For testing 202 response +# WAIT_TIMEOUT = int(config.get('WAIT_TIMEOUT', 1)) WAIT_INTERVAL = float(config.get('WAIT_INTERVAL', 0.5)) ADHOC_BACKEND_PATH = config.get('ADHOC_BACKEND_PATH', '/tmp') @@ -25,12 +36,6 @@ CELERY_QUEUES = ( Queue('waterbutler', Exchange('waterbutler'), routing_key='waterbutler'), ) -# CELERY_ALWAYS_EAGER = config.get('CELERY_ALWAYS_EAGER', True) -CELERY_ALWAYS_EAGER = config.get_bool('CELERY_ALWAYS_EAGER', False) -# CELERY_RESULT_BACKEND = config.get('CELERY_RESULT_BACKEND', 'redis://') -CELERY_RESULT_BACKEND = config.get_nullable('CELERY_RESULT_BACKEND', None) -CELERY_DISABLE_RATE_LIMITS = config.get_bool('CELERY_DISABLE_RATE_LIMITS', True) -CELERY_TASK_RESULT_EXPIRES = int(config.get('CELERY_TASK_RESULT_EXPIRES', 60)) CELERY_IMPORTS = [ entry.module_name for entry in iter_entry_points(group='waterbutler.providers.tasks', name=None) @@ -41,4 +46,3 @@ CELERY_ACKS_LATE = True CELERYD_HIJACK_ROOT_LOGGER = False -CELERY_EAGER_PROPAGATES_EXCEPTIONS = True