-
Notifications
You must be signed in to change notification settings - Fork 117
/
app_utils.py
25 lines (22 loc) · 932 Bytes
/
app_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from flask import request, jsonify, current_app
from functools import wraps
import jsonschema
def validate_payload(schema):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not request.json:
return jsonify({"message": "Missing JSON in request"}), 400
try:
jsonschema.validate(instance=request.json, schema=schema)
except jsonschema.exceptions.ValidationError as validation_error:
return jsonify({"message": f"Invalid payload: {validation_error.message}"}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
def queue_task_wrapper(bypass_queue=False):
def decorator(f):
def wrapper(*args, **kwargs):
return current_app.queue_task(bypass_queue=bypass_queue)(f)(*args, **kwargs)
return wrapper
return decorator