
Commit

Merge pull request #708 from hubmapconsortium/karlburke/IntroduceS3Responses

Introduce usage of S3 for large responses
yuanzhou authored Aug 8, 2024
2 parents 8de96cb + c87677a commit ba4051a
Showing 2 changed files with 122 additions and 26 deletions.
136 changes: 110 additions & 26 deletions src/app.py
@@ -46,7 +46,7 @@
from hubmap_commons import neo4j_driver
from hubmap_commons.hm_auth import AuthHelper
from hubmap_commons.exceptions import HTTPException

from hubmap_commons.S3_worker import S3Worker

# Root logger configuration
global logger
@@ -68,6 +68,13 @@
app.config['ONTOLOGY_API_URL'] = app.config['ONTOLOGY_API_URL'].strip('/')
app.config['SEARCH_API_URL_LIST'] = [url.strip('/') for url in app.config['SEARCH_API_URL_LIST']]

S3_settings_dict = {'large_response_threshold': app.config['LARGE_RESPONSE_THRESHOLD']
, 'aws_access_key_id': app.config['AWS_ACCESS_KEY_ID']
, 'aws_secret_access_key': app.config['AWS_SECRET_ACCESS_KEY']
, 'aws_s3_bucket_name': app.config['AWS_S3_BUCKET_NAME']
, 'aws_object_url_expiration_in_secs': app.config['AWS_OBJECT_URL_EXPIRATION_IN_SECS']
, 'service_configured_obj_prefix': app.config['AWS_S3_OBJECT_PREFIX']}

# This mode when set True disables the PUT and POST calls, used on STAGE to make entity-api READ-ONLY
# to prevent developers from creating new UUIDs and new entities or updating existing entities
READ_ONLY_MODE = app.config['READ_ONLY_MODE']
@@ -225,6 +232,20 @@ def http_internal_server_error(e):
# Log the full stack trace, prepend a line with our message
logger.exception(msg)

####################################################################################################
## Initialize an S3Worker from hubmap-commons
####################################################################################################

try:
anS3Worker = S3Worker(ACCESS_KEY_ID=S3_settings_dict['aws_access_key_id']
, SECRET_ACCESS_KEY=S3_settings_dict['aws_secret_access_key']
, S3_BUCKET_NAME=S3_settings_dict['aws_s3_bucket_name']
, S3_OBJECT_URL_EXPIRATION_IN_SECS=S3_settings_dict['aws_object_url_expiration_in_secs']
, LARGE_RESPONSE_THRESHOLD=S3_settings_dict['large_response_threshold']
, SERVICE_S3_OBJ_PREFIX=S3_settings_dict['service_configured_obj_prefix'])
logger.info("anS3Worker initialized")
except Exception as s3exception:
logger.critical(s3exception, exc_info=True)
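
For context, the stash-and-presign pattern that S3Worker encapsulates looks roughly like the sketch below. This is an illustration only, not the hubmap-commons implementation; the boto3 calls and the key-naming scheme are assumptions based on the configuration values above.

# Illustrative sketch only -- not the actual hubmap-commons S3Worker code.
# Assumes the worker uses boto3 put_object plus a presigned GET URL.
import uuid
import boto3

def stash_if_big(resp_body: bytes, settings: dict):
    """Return a presigned URL if resp_body exceeds the threshold, else None."""
    if len(resp_body) <= settings['large_response_threshold']:
        return None
    s3 = boto3.client('s3',
                      aws_access_key_id=settings['aws_access_key_id'],
                      aws_secret_access_key=settings['aws_secret_access_key'])
    # Object key gets the service-specific prefix configured in app.cfg
    object_key = f"{settings['service_configured_obj_prefix']}{uuid.uuid4()}"
    s3.put_object(Bucket=settings['aws_s3_bucket_name'],
                  Key=object_key,
                  Body=resp_body)
    # Presigned GET URL that expires after the configured number of seconds
    return s3.generate_presigned_url('get_object',
                                     Params={'Bucket': settings['aws_s3_bucket_name'],
                                             'Key': object_key},
                                     ExpiresIn=settings['aws_object_url_expiration_in_secs'])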

####################################################################################################
## REFERENCE DOI Redirection
@@ -3135,13 +3156,20 @@ def get_associated_donors_from_dataset(id):
Returns
-------
If the response is small enough to be returned directly through the gateway, an HTTP 200 response code will be
returned. If the response is too large to pass through the gateway, an HTTP 303 response code will be returned, and
the response body will contain a URL to an AWS S3 Object. The Object must be retrieved by following the URL before
it expires.
json
an array of each dataset's provenance info
tsv
a text file of tab separated values where each row is a dataset and the columns include all its prov info
"""
@app.route('/datasets/prov-info', methods=['GET'])
def get_prov_info():
global anS3Worker

# String constants
HEADER_DATASET_UUID = 'dataset_uuid'
HEADER_DATASET_HUBMAP_ID = 'dataset_hubmap_id'
@@ -3415,25 +3443,39 @@ def get_prov_info():
# Each dataset's dictionary is added to the list to be returned
dataset_prov_list.append(internal_dict)

# Determine whether the size of the returned data exceeds or nearly exceeds the AWS Gateway 10MB maximum size. If it
# is greater than 9437184 bytes Return a 400 and prompt the user to reduce the size of the output by applying optional
# argument filters.
dataset_prov_json_encode = json.dumps(dataset_prov_list).encode('utf-8')
if len(dataset_prov_json_encode) > 9437184:
bad_request_error(
"Request generated a response over the 10MB limit. Sub-select the results using a query parameter.")

# if return_json is true, this dictionary is ready to be returned already
# Establish a string for the Response which can be checked to
# see if it is small enough to return directly or must be stashed in S3.
if return_json:
return jsonify(dataset_prov_list)

# if return_json is false, the data must be converted to be returned as a tsv
resp_body = json.dumps(dataset_prov_list).encode('utf-8')
else:
# If return_json is false, convert the data to a TSV
new_tsv_file = StringIO()
writer = csv.DictWriter(new_tsv_file, fieldnames=headers, delimiter='\t')
writer.writeheader()
writer.writerows(dataset_prov_list)
new_tsv_file.seek(0)
resp_body = new_tsv_file.read()

# Check the size of what is to be returned through the AWS Gateway, and replace it with
# a response that links to an Object in the AWS S3 Bucket, if appropriate.
try:
s3_url = anS3Worker.stash_response_body_if_big(resp_body)
if s3_url is not None:
return Response(response=s3_url
, status=303) # See Other
except Exception as s3exception:
logger.error(f"Error using anS3Worker to handle len(resp_body)="
f"{len(resp_body)}.")
logger.error(s3exception, exc_info=True)
return Response(response=f"Unexpected error storing large results in S3. See logs."
, status=500)

# Return a regular response through the AWS Gateway
if return_json:
return jsonify(dataset_prov_list)
else:
# Return the TSV as an attachment, since it is small enough to fit through the AWS Gateway.
new_tsv_file.seek(0)
output = Response(new_tsv_file, mimetype='text/tsv')
output.headers['Content-Disposition'] = 'attachment; filename=prov-info.tsv'
return output
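
As a usage note (not part of this diff), a caller of /datasets/prov-info can handle both outcomes by checking the status code; the base URL, the format query parameter, and the use of the requests library below are assumptions for illustration.

# Illustrative client sketch; fetches the presigned S3 URL returned in the body
# of a 303 response when the payload was too large for the AWS Gateway.
import requests

BASE_URL = 'https://entity.api.hubmapconsortium.org'  # assumed for illustration

resp = requests.get(f'{BASE_URL}/datasets/prov-info',
                    params={'format': 'json'},  # assumed parameter name
                    allow_redirects=False)
if resp.status_code == 200:
    prov_info = resp.json()
elif resp.status_code == 303:
    # Body holds a presigned S3 Object URL; retrieve it before it expires.
    prov_info = requests.get(resp.text).json()
else:
    resp.raise_for_status()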
@@ -3460,10 +3502,15 @@ def get_prov_info():
Returns
-------
If the response is small enough to be returned directly through the gateway, an HTTP 200 response code will be
returned. If the response is too large to pass through the gateway, an HTTP 303 response code will be returned, and
the response body will contain a URL to an AWS S3 Object. The Object must be retrieved by following the URL before
it expires.
json
an array of each datatset's provenance info
A dictionary of the Dataset's provenance info
tsv
a text file of tab separated values where each row is a dataset and the columns include all its prov info
A text file of tab separated prov info values for the Dataset, including a row of column headings.
"""
@app.route('/datasets/<id>/prov-info', methods=['GET'])
def get_prov_info_for_dataset(id):
@@ -3716,19 +3763,43 @@ def get_prov_info_for_dataset(id):

dataset_prov_list.append(internal_dict)

# Establish a string for the Response which can be checked to
# see if it is small enough to return directly or must be stashed in S3.
if return_json:
return jsonify(dataset_prov_list[0])
resp_body = json.dumps(dataset_prov_list).encode('utf-8')
else:
# If return_json is false, convert the data to a TSV
new_tsv_file = StringIO()
writer = csv.DictWriter(new_tsv_file, fieldnames=headers, delimiter='\t')
writer.writeheader()
writer.writerows(dataset_prov_list)
new_tsv_file.seek(0)
resp_body = new_tsv_file.read()

# Check the size of what is to be returned through the AWS Gateway, and replace it with
# a response that links to an Object in the AWS S3 Bucket, if appropriate.
try:
s3_url = anS3Worker.stash_response_body_if_big(resp_body)
if s3_url is not None:
return Response(response=s3_url
, status=303) # See Other
except Exception as s3exception:
logger.error(f"Error using anS3Worker to handle len(resp_body)="
f"{len(resp_body)}.")
logger.error(s3exception, exc_info=True)
return Response(response=f"Unexpected error storing large results in S3. See logs."
, status=500)

# Return a regular response through the AWS Gateway
if return_json:
return jsonify(dataset_prov_list[0])
else:
# Return the TSV as an attachment, since it is small enough to fit through the AWS Gateway.
new_tsv_file.seek(0)
output = Response(new_tsv_file, mimetype='text/tsv')
output.headers['Content-Disposition'] = 'attachment; filename=prov-info.tsv'
return output


"""
Get the information needed to generate the sankey on software-docs as a json.
@@ -3824,11 +3895,18 @@ def sankey_data():
Returns
-------
If the response is small enough to be returned directly through the gateway, an HTTP 200 response code will be
returned. If the response is too large to pass through the gateway, an HTTP 303 response code will be returned, and
the response body will contain a URL to an AWS S3 Object. The Object must be retrieved by following the URL before
it expires.
json
an array of each dataset's provenance info
"""
@app.route('/samples/prov-info', methods=['GET'])
def get_sample_prov_info():
global anS3Worker

# String Constants
HEADER_SAMPLE_UUID = "sample_uuid"
HEADER_SAMPLE_LAB_ID = "lab_id_or_name"
@@ -3947,16 +4025,23 @@ def get_sample_prov_info():
# Each sample's dictionary is added to the list to be returned
sample_prov_list.append(internal_dict)

# Determine whether the size of the returned data exceeds or nearly exceeds the AWS Gateway 10MB maximum size. If it
# is greater than 9437184 bytes Return a 400 and prompt the user to reduce the size of the output by applying optional
# argument filters.
sample_prov_json_encode = json.dumps(sample_prov_list).encode('utf-8')
if len(sample_prov_json_encode) > 9437184:
bad_request_error(
"Request generated a response over the 10MB limit. Sub-select the results using a query parameter.")
# Check the size of what is to be returned through the AWS Gateway, and replace it with
# a response that links to an Object in the AWS S3 Bucket, if appropriate.
try:
s3_url = anS3Worker.stash_response_body_if_big(json.dumps(sample_prov_list).encode('utf-8'))
if s3_url is not None:
return Response(response=s3_url
, status=303) # See Other
except Exception as s3exception:
logger.error(f"Error using anS3Worker to handle len(json.dumps(sample_prov_list).encode('utf-8'))="
f"{len(json.dumps(sample_prov_list).encode('utf-8'))}.")
logger.error(s3exception, exc_info=True)
return Response(response=f"Unexpected error storing large results in S3. See logs."
, status=500)

# Return a regular response through the AWS Gateway
return jsonify(sample_prov_list)


"""
Retrieve all unpublished datasets (datasets with status value other than 'Published' or 'Hold')
@@ -5584,7 +5669,6 @@ def _get_metadata_by_id(entity_id:str=None, metadata_scope:MetadataScopeEnum=Met
# Response with the dict
return final_result


####################################################################################################
## For local development/testing
####################################################################################################
12 changes: 12 additions & 0 deletions src/instance/app.cfg.example
@@ -8,6 +8,18 @@ SCHEMA_YAML_FILE = '/usr/src/app/src/schema/provenance_schema.yaml'
APP_CLIENT_ID = ''
APP_CLIENT_SECRET = ''

# AWS credentials for S3 access and presigned URL generation
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''
AWS_S3_BUCKET_NAME = 'hm-api-responses' #_DevTest'
AWS_S3_OBJECT_PREFIX = 'Dev_entity-api_'
AWS_OBJECT_URL_EXPIRATION_IN_SECS = 60*60 # 1 hour
# Large response threshold, as determined by len() of the encoded response body, above
# which responses will be stashed in an S3 bucket and a pre-signed URL
# returned in the response to avoid the AWS Gateway 10MB constraint
LARGE_RESPONSE_THRESHOLD = 9*(2**20) + 900*(2**10)  # ~9.9MB
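
For reference, the threshold expression above works out just under the 10MB gateway limit cited in the code comments; a quick illustrative check (treating the limit as 10 MiB, an assumption) is:

# Illustrative arithmetic only
threshold = 9*(2**20) + 900*(2**10)   # 9,437,184 + 921,600 = 10,358,784 bytes (~9.88 MiB)
gateway_limit = 10*(2**20)            # 10,485,760 bytes, assuming the 10MB limit means 10 MiB
assert threshold < gateway_limit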

# Neo4j connection (default value used for docker localhost deployment)
# Point to remote neo4j for dev/test/stage/prod deployment
NEO4J_URI = 'bolt://hubmap-neo4j-localhost:7687'
