Add table dump to a csv feature #157

Merged (21 commits) on Aug 25, 2024
8 changes: 7 additions & 1 deletion Dockerfile
@@ -7,10 +7,16 @@ RUN mkdir -p /home/nginx/.cloudvolume/secrets \
COPY requirements.txt /app/.
RUN python -m pip install --upgrade pip
RUN pip install -r requirements.txt
# Install gcloud SDK as root and set permissions
# Install gcloud SDK as root
COPY . /app
COPY override/timeout.conf /etc/nginx/conf.d/timeout.conf
COPY gracefully_shutdown_celery.sh /home/nginx
RUN chmod +x /home/nginx/gracefully_shutdown_celery.sh
RUN mkdir -p /home/nginx/tmp/shutdown
RUN chmod +x /entrypoint.sh
WORKDIR /app
WORKDIR /app
USER nginx
RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH /home/nginx/google-cloud-sdk/bin:/root/google-cloud-sdk/bin:$PATH
USER root
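
Note on the Dockerfile change: the image now installs the Google Cloud SDK for the `nginx` user and adds it to `PATH`, so the new dump endpoint below can shell out to `gcloud`. A minimal sketch of a startup check that could confirm the CLI is actually reachable inside the container — the helper name is hypothetical and not part of this PR:

```python
# Hypothetical startup check (not part of this PR): confirm the gcloud CLI
# installed in the Dockerfile is reachable on PATH inside the container.
import shutil
import subprocess


def assert_gcloud_available() -> str:
    """Return the gcloud version banner, or raise if the SDK is missing."""
    gcloud = shutil.which("gcloud")
    if gcloud is None:
        raise RuntimeError(
            "gcloud not found on PATH; check the google-cloud-sdk install step"
        )
    out = subprocess.run(
        [gcloud, "--version"], capture_output=True, text=True, check=True
    )
    return out.stdout.splitlines()[0]


if __name__ == "__main__":
    print(assert_gcloud_available())
```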
55 changes: 26 additions & 29 deletions materializationengine/blueprints/client/api2.py
@@ -32,6 +32,7 @@
from materializationengine.blueprints.client.utils import (
create_query_response,
collect_crud_columns,
get_latest_version,
)
from materializationengine.blueprints.client.schemas import (
ComplexQuerySchema,
@@ -573,7 +574,9 @@ def get(self, datastack_name: str):
return versions, 200


@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>"
)
class DatastackVersion(Resource):
method_decorators = [
limit_by_category("fast_query"),
@@ -610,7 +613,7 @@ def get(self, datastack_name: str, version: int):


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/count"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/count"
)
class FrozenTableCount(Resource):
method_decorators = [
@@ -707,7 +710,9 @@ def get(self, datastack_name: str):
return schema.dump(response, many=True), 200


@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/tables")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/tables"
)
class FrozenTableVersions(Resource):
method_decorators = [
limit_by_category("fast_query"),
@@ -752,7 +757,7 @@ def get(self, datastack_name: str, version: int):


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/tables/metadata"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/tables/metadata"
)
class FrozenTablesMetadata(Resource):
method_decorators = [
@@ -811,7 +816,7 @@ def get(


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/metadata"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/metadata"
)
class FrozenTableMetadata(Resource):
method_decorators = [
@@ -868,7 +873,7 @@ def get(

@client_bp.expect(query_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/query"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/query"
)
class FrozenTableQuery(Resource):
method_decorators = [
@@ -1144,7 +1149,7 @@ def preprocess_view_dataframe(df, view_name, db_name, column_names):

@client_bp.expect(query_seg_prop_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/info"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/info"
)
class MatTableSegmentInfo(Resource):
method_decorators = [
Expand Down Expand Up @@ -1330,7 +1335,9 @@ def get(


@client_bp.expect(query_parser)
@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/query")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/query"
)
class FrozenQuery(Resource):
method_decorators = [
validate_datastack,
@@ -1623,7 +1630,9 @@ def post(self, datastack_name: str):


@client_bp.expect(query_parser)
@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/views")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views"
)
class AvailableViews(Resource):
method_decorators = [
validate_datastack,
@@ -1668,7 +1677,7 @@ def get(

@client_bp.expect(query_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<version>/views/<string:view_name>/metadata"
"/datastack/<string:datastack_name>/version/<int(signed+True):version>/views/<string:view_name>/metadata"
)
class ViewMetadata(Resource):
method_decorators = [
Expand Down Expand Up @@ -1868,24 +1877,12 @@ def get(
datastack_name
)

if version == -1:
version = get_latest_version(datastack_name)
print(f"using version {version}")
mat_db_name = f"{datastack_name}__mat{version}"
if version == 0:
mat_db_name = f"{aligned_volume_name}"
elif version == -1:
mat_db_name = f"{aligned_volume_name}"
session = sqlalchemy_cache.get(mat_db_name)
# query the database for the latest valid version
response = (
session.query(AnalysisVersion)
.filter(AnalysisVersion.datastack == datastack_name)
.filter(AnalysisVersion.valid)
.order_by(AnalysisVersion.time_stamp.desc())
.first()
)
version = response.version
print(f"using version {version}")
mat_db_name = f"{datastack_name}__mat{version}"
else:
mat_db_name = f"{datastack_name}__mat{version}"

df, column_names, warnings = assemble_view_dataframe(
datastack_name, version, view_name, {}, {}
@@ -1920,7 +1917,7 @@ def get(

@client_bp.expect(query_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/views/<string:view_name>/query"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views/<string:view_name>/query"
)
class ViewQuery(Resource):
method_decorators = [
@@ -2040,7 +2037,7 @@ def get_table_schema(table):


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/views/<string:view_name>/schema"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views/<string:view_name>/schema"
)
class ViewSchema(Resource):
method_decorators = [
@@ -2086,7 +2083,7 @@ def get(


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/views/schemas"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views/schemas"
)
class ViewSchemas(Resource):
method_decorators = [
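
Summary of the route changes above: the `version` converter becomes `int(signed=True)` so clients can pass sentinel values, where `-1` resolves to the latest valid materialized version via the new `get_latest_version` helper (added in `utils.py` below) and `0` targets the live aligned-volume database. A minimal sketch of that mapping, assuming those semantics; the function and argument names here are illustrative, not part of the PR:

```python
from typing import Optional, Tuple


def resolve_mat_db_name(
    datastack_name: str,
    version: int,
    aligned_volume_name: str,
    latest_version: Optional[int],
) -> Tuple[int, str]:
    """Map a possibly-sentinel version to the database name to query.

    -1 -> latest valid materialized version; 0 -> live aligned-volume database.
    `latest_version` stands in for get_latest_version(datastack_name).
    """
    if version == -1:
        if latest_version is None:
            raise ValueError(f"{datastack_name} has no valid materialized versions")
        version = latest_version
    if version == 0:
        return version, aligned_volume_name
    return version, f"{datastack_name}__mat{version}"


# Example: -1 with latest valid version 921 maps to my_datastack__mat921.
assert resolve_mat_db_name("my_datastack", -1, "my_volume", 921) == (
    921,
    "my_datastack__mat921",
)
```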
2 changes: 2 additions & 0 deletions materializationengine/blueprints/client/datastack.py
@@ -37,6 +37,8 @@ def wrapper(*args, **kwargs):
AnalysisVersion.datastack == target_datastack
)
if target_version:
if target_version == 0:
return f(*args, **kwargs)
if target_version == -1:
return f(*args, **kwargs)
version_query = version_query.filter(
21 changes: 21 additions & 0 deletions materializationengine/blueprints/client/utils.py
@@ -3,6 +3,10 @@
from cloudfiles import compression
from io import BytesIO

from materializationengine.info_client import get_datastack_info
from materializationengine.database import sqlalchemy_cache
from dynamicannotationdb.models import AnalysisVersion


def collect_crud_columns(column_names):
crud_columns = []
@@ -59,6 +63,23 @@ def update_notice_text_warnings(ann_md, warnings, table_name):
return warnings


def get_latest_version(datastack_name):
aligned_volume_name = get_datastack_info(datastack_name)["aligned_volume"]["name"]
session = sqlalchemy_cache.get(aligned_volume_name)
# query the database for the latest valid version
response = (
session.query(AnalysisVersion)
.filter(AnalysisVersion.datastack == datastack_name)
.filter(AnalysisVersion.valid)
.order_by(AnalysisVersion.time_stamp.desc())
.first()
)
if response is None:
return None
else:
return response.version


def create_query_response(
df,
warnings,
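
A hypothetical unit-test sketch (not included in this PR) that pins down the contract of `get_latest_version`: the newest `AnalysisVersion` row marked valid wins, and `None` is returned when a datastack has no valid versions, so callers must handle both outcomes:

```python
# Hypothetical test; patches the collaborators imported into utils.py.
from unittest import mock

from materializationengine.blueprints.client import utils


def test_get_latest_version_returns_none_without_valid_versions():
    fake_session = mock.MagicMock()
    # query(...).filter(...).filter(...).order_by(...).first() -> None
    chain = fake_session.query.return_value.filter.return_value
    chain = chain.filter.return_value.order_by.return_value
    chain.first.return_value = None

    with mock.patch.object(
        utils,
        "get_datastack_info",
        return_value={"aligned_volume": {"name": "my_volume"}},
    ), mock.patch.object(utils.sqlalchemy_cache, "get", return_value=fake_session):
        assert utils.get_latest_version("my_datastack") is None
```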
141 changes: 140 additions & 1 deletion materializationengine/blueprints/materialize/api.py
@@ -4,7 +4,8 @@
from dynamicannotationdb.models import AnalysisTable, Base
from flask import abort, current_app, request
from flask_accepts import accepts
from flask_restx import Namespace, Resource, inputs, reqparse
from flask_restx import Namespace, Resource, inputs, reqparse, fields
from materializationengine.blueprints.client.utils import get_latest_version
from materializationengine.blueprints.reset_auth import reset_auth
from materializationengine.database import (
create_session,
@@ -24,6 +25,9 @@
from sqlalchemy.engine.url import make_url
from sqlalchemy.exc import NoSuchTableError
from materializationengine.utils import check_write_permission
import os
import subprocess
import cloudfiles


from materializationengine.blueprints.materialize.schemas import (
@@ -396,6 +400,141 @@ def post(self, datastack_name: str):
return 200


response_model = mat_bp.model(
"Response",
{
"message": fields.String(description="Response message"),
"csv_path": fields.String(description="Path to csv file", required=False),
"header_path": fields.String(description="Path to header file", required=False),
},
)


@mat_bp.route(
"/materialize/run/dump_csv_table/datastack/<string:datastack_name>/version/<int(signed=True):version>/table_name/<string:table_name>/"
)
class DumpTableToBucketAsCSV(Resource):
@reset_auth
@auth_requires_admin
@mat_bp.doc("Take table or view and dump it to a bucket as csv", security="apikey")
@mat_bp.response(200, "Success", response_model)
@mat_bp.response(500, "Internal Server Error", response_model)
def post(self, datastack_name: str, version: int, table_name: str):
"""Dump table to bucket as csv

Args:
datastack_name (str): name of datastack from infoservice
version (int): version of datastack
table_name (str): name of table or view to dump
"""
mat_db_name = f"{datastack_name}__mat{version}"

# TODO: add validation of parameters
sql_instance_name = current_app.config.get("SQL_INSTANCE_NAME", None)
if not sql_instance_name:
return {"message": "SQL_INSTANCE_NAME not set in app config"}, 500

bucket = current_app.config.get("MATERIALIZATION_DUMP_BUCKET", None)
if not bucket:
return {"message": "MATERIALIZATION_DUMP_BUCKET not set in app config"}, 500

if version == -1:
version = get_latest_version(datastack_name)
mat_db_name = f"{datastack_name}__mat{version}"

cf = cloudfiles.CloudFiles(bucket)
filename = f"{datastack_name}/v{version}/{table_name}.csv.gz"

cloudpath = os.path.join(bucket, filename)
header_file = f"{datastack_name}/v{version}/{table_name}_header.csv"
header_cloudpath = os.path.join(bucket, header_file)

# check if the file already exists
if cf.exists(filename):
# return a flask response 200 message saying the file already exists
return {
"message": "file already created",
"csv_path": cloudpath,
"header_path": header_cloudpath,
}, 200

else:
# run a gcloud command to activate the service account for gcloud
activate_command = [
"gcloud",
"auth",
"activate-service-account",
"--key-file",
os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
]
process = subprocess.Popen(
activate_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
# run this command and capture the stdout and return code
return_code = process.returncode
if return_code != 0:
return {
"message": f"failed to activate service account using {activate_command}. Error: {stderr.decode()} stdout: {stdout.decode()}"
}, 500

header_command = [
"gcloud",
"sql",
"export",
"csv",
sql_instance_name,
header_cloudpath,
"--database",
mat_db_name,
"--query",
f"SELECT column_name, data_type from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME = '{table_name}'",
]
process = subprocess.Popen(
header_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
# run this command and capture the stdout and return code
return_code = process.returncode
if return_code != 0:
return {
"message": f"header file failed to create using:\
{header_command}. Error: {stderr.decode()} stdout: {stdout.decode()}"
}, 500

# run a gcloud command to select * from table and write it to disk as a csv
export_command = [
"gcloud",
"sql",
"export",
"csv",
sql_instance_name,
cloudpath,
"--database",
mat_db_name,
"--async",
"--query",
f"SELECT * from {table_name}",
]

process = subprocess.Popen(
export_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
# run this command and capture the stdout and return code
return_code = process.returncode
if return_code != 0:
return {
"message": f"file failed to create using: {export_command}. Error: {stderr.decode()} stdout: {stdout.decode()}"
}, 500

else:
return {
"message": "file created sucessefully",
"csv_path": cloudpath,
"header_path": header_cloudpath,
}, 200


@mat_bp.route("/materialize/run/update_database/datastack/<string:datastack_name>")
class UpdateLiveDatabaseResource(Resource):
@reset_auth
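
A hedged sketch of how a client might drive the new dump endpoint and read the result. The host, URL prefix, token, and datastack/table names are placeholders (the exact prefix depends on how the blueprint is mounted), and because the data export runs with `--async`, the gzipped CSV may land in the bucket some time after the 200 response:

```python
# Illustrative client flow; host, prefix, token, and names are placeholders.
import pandas as pd
import requests

BASE = "https://materialize.example.org"  # placeholder deployment URL
url = (
    f"{BASE}/materialize/run/dump_csv_table/datastack/my_datastack"
    "/version/-1/table_name/my_table/"
)
resp = requests.post(url, headers={"Authorization": "Bearer <my_token>"})
resp.raise_for_status()
info = resp.json()
print(info["message"])

# The header export lists (column_name, data_type) pairs with no header row;
# the data export is a gzipped CSV, also without a header row. Reading gs://
# paths with pandas requires gcsfs to be installed.
header = pd.read_csv(
    info["header_path"], header=None, names=["column_name", "data_type"]
)
df = pd.read_csv(info["csv_path"], header=None, names=header["column_name"].tolist())
print(df.head())
```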