Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [SVCS-325] Proof of concept: Notify users after returning a 202 Accepted? #226

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion waterbutler/server/api/v1/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from raven.contrib.tornado import SentryMixin

from waterbutler import tasks
from waterbutler.server import utils
from waterbutler.server import utils, settings
from waterbutler.core import exceptions


Expand All @@ -24,7 +24,16 @@ def write_error(self, status_code, exc_info):
finish_args = [exc.data] if exc.data else [{'code': exc.code, 'message': exc.message}]
elif issubclass(etype, tasks.WaitTimeOutError):
self.set_status(202)
pending_url = '{}/pending/{}/{}'.format(settings.DOMAIN, exc.args[1], exc.args[0])
self.add_header('Content-Location', pending_url)
exception_kwargs = {'data': {'level': 'info'}}
finish_args = [{'data': {'attributes': {'status': 'Accepted',
'id': exc.args[0],
'resource': exc.args[1]
},
'links': {'status': pending_url}
}
}]
else:
finish_args = [{'code': status_code, 'message': self._reason}]

Expand Down
6 changes: 4 additions & 2 deletions waterbutler/server/api/v1/provider/movecopy.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ async def move_or_copy(self):
if not getattr(self.provider, 'can_intra_' + action)(self.dest_provider, self.path):
# this weird signature syntax courtesy of py3.4 not liking trailing commas on kwargs
conflict = self.json.get('conflict', DEFAULT_CONFLICT)
task_args = self.build_args()
result = await getattr(tasks, action).adelay(
rename=self.json.get('rename'),
conflict=conflict,
request=remote_logging._serialize_request(self.request),
*self.build_args()
*task_args
)
metadata, created = await tasks.wait_on_celery(result)
metadata, created = await tasks.wait_on_celery(result,
result_resource=task_args[1]['nid'])
else:
metadata, created = (
await tasks.backgrounded(
Expand Down
3 changes: 2 additions & 1 deletion waterbutler/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def make_app(debug):
app = tornado.web.Application(
api_to_handlers(v0) +
api_to_handlers(v1) +
[(r'/status', handlers.StatusHandler)],
[(r'/status', handlers.StatusHandler),
(r'/pending/(?P<result_resource>(?:\w|\d)+)/(?P<task_id>(?:\w|\d|-)+)', handlers.PendingHandler)],
debug=debug,
)
app.sentry_client = AsyncSentryClient(settings.SENTRY_DSN, release=waterbutler.__version__)
Expand Down
40 changes: 40 additions & 0 deletions waterbutler/server/handlers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import tornado.web
from celery.result import AsyncResult
from celery import Celery

import waterbutler
from waterbutler.tasks import settings as tasks_settings

app = Celery()
app.config_from_object(tasks_settings)


class StatusHandler(tornado.web.RequestHandler):
Expand All @@ -11,3 +17,37 @@ def get(self):
'status': 'up',
'version': waterbutler.__version__
})


class PendingHandler(tornado.web.RequestHandler):

def get(self, task_id, result_resource):
# TKB need to cover all possible states
app = Celery()
app.config_from_object(tasks_settings)
result = AsyncResult(id=task_id, app=app)
if str(result.ready()) == 'True':
if str(result.state) == 'SUCCESS':
meta, created = result.get(timeout=3)
self.set_status(303)
self.add_header('Content-Location', '{}://{}/v1/resources/{}/providers/{}{}'.format(self.request.protocol, self.request.host, result_resource, meta.provider, meta.path))
self.write({'data':
{'task_id': task_id,
'state': str(result.state),
'ready': str(result.ready()),
}
})
elif str(result.state) == 'FAILURE':
self.set_status(200)
self.write({'errors':
{'status': result.error_code,
'source': {'pointer': self.url},
'title': result.error_msg,
'detail': result.error_dtl
}
})
else:
self.write({
'task_id': task_id,
'state': str(result.state),
})
4 changes: 2 additions & 2 deletions waterbutler/tasks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def celery_task(func, *args, **kwargs):


@backgroundify
async def wait_on_celery(result, interval=None, timeout=None, basepath=None):
async def wait_on_celery(result, result_resource=None, interval=None, timeout=None, basepath=None):
timeout = timeout or settings.WAIT_TIMEOUT
interval = interval or settings.WAIT_INTERVAL
basepath = basepath or settings.ADHOC_BACKEND_PATH
Expand All @@ -123,6 +123,6 @@ async def wait_on_celery(result, interval=None, timeout=None, basepath=None):
return result.result

if waited > timeout:
raise exceptions.WaitTimeOutError
raise exceptions.WaitTimeOutError(str(result.id), str(result_resource))
await asyncio.sleep(interval)
waited += interval
18 changes: 11 additions & 7 deletions waterbutler/tasks/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@
os.environ.get('RABBITMQ_PORT_5672_TCP_PORT', ''),
)
)
CELERY_RESULT_BACKEND = config.get(
'CELERY_RESULT_BACKEND',
'{}://{}:{}/{}'.format(
os.environ.get('CELERY_RESULT_BACKEND_PROTO', ''),
os.environ.get('CELERY_RESULT_BACKEND_TCP_ADDR', ''),
os.environ.get('CELERY_RESULT_BACKEND_TCP_PORT', ''),
os.environ.get('CELERY_RESULT_BACKEND_ID', ''),
)
)

WAIT_TIMEOUT = int(config.get('WAIT_TIMEOUT', 15))
# For testing 202 response
# WAIT_TIMEOUT = int(config.get('WAIT_TIMEOUT', 1))
WAIT_INTERVAL = float(config.get('WAIT_INTERVAL', 0.5))
ADHOC_BACKEND_PATH = config.get('ADHOC_BACKEND_PATH', '/tmp')

Expand All @@ -25,12 +36,6 @@
CELERY_QUEUES = (
Queue('waterbutler', Exchange('waterbutler'), routing_key='waterbutler'),
)
# CELERY_ALWAYS_EAGER = config.get('CELERY_ALWAYS_EAGER', True)
CELERY_ALWAYS_EAGER = config.get_bool('CELERY_ALWAYS_EAGER', False)
# CELERY_RESULT_BACKEND = config.get('CELERY_RESULT_BACKEND', 'redis://')
CELERY_RESULT_BACKEND = config.get_nullable('CELERY_RESULT_BACKEND', None)
CELERY_DISABLE_RATE_LIMITS = config.get_bool('CELERY_DISABLE_RATE_LIMITS', True)
CELERY_TASK_RESULT_EXPIRES = int(config.get('CELERY_TASK_RESULT_EXPIRES', 60))
CELERY_IMPORTS = [
entry.module_name
for entry in iter_entry_points(group='waterbutler.providers.tasks', name=None)
Expand All @@ -41,4 +46,3 @@

CELERY_ACKS_LATE = True
CELERYD_HIJACK_ROOT_LOGGER = False
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True