Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ipc compression option #137

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions materializationengine/blueprints/client/api2.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ def __schema__(self):
that are not valid at the timestamp that is queried. If True the filter will likely \
not be relevant and the user might not be getting data back that they expect, but it will not error.",
)
query_parser.add_argument(
"ipc_compress",
type=inputs.boolean,
default=True,
required=False,
location="args",
help="whether to have arrow compress the result when using \
return_pyarrow=True and arrow_format=True. \
If False, the result will not have it's internal data\
compressed (note that the entire response \
will be gzip compressed if accept-enconding includes gzip). \
If True, accept-encoding will determine what \
internal compression is used",
)


metadata_parser = reqparse.RequestParser()
Expand Down Expand Up @@ -1187,6 +1201,7 @@ def post(self, datastack_name: str):
desired_resolution=user_data["desired_resolution"],
return_pyarrow=args["return_pyarrow"],
arrow_format=args["arrow_format"],
ipc_compress=args["ipc_compress"],
)


Expand Down Expand Up @@ -1422,6 +1437,7 @@ def post(
desired_resolution=data["desired_resolution"],
return_pyarrow=args["return_pyarrow"],
arrow_format=args["arrow_format"],
ipc_compress=args["ipc_compress"],
)


Expand Down
3 changes: 2 additions & 1 deletion materializationengine/blueprints/client/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ def handle_simple_query(
desired_resolution=data["desired_resolution"],
return_pyarrow=args["return_pyarrow"],
arrow_format=args["arrow_format"],
ipc_compress=args["ipc_compress"],
)


Expand All @@ -293,7 +294,6 @@ def handle_complex_query(
data,
convert_desired_resolution=False,
):

aligned_volume_name, pcg_table_name = get_relevant_datastack_info(datastack_name)
db = dynamic_annotation_cache.get_db(aligned_volume_name)

Expand Down Expand Up @@ -447,4 +447,5 @@ def handle_complex_query(
desired_resolution=data["desired_resolution"],
return_pyarrow=args["return_pyarrow"],
arrow_format=args["arrow_format"],
ipc_compress=args["ipc_compress"],
)
14 changes: 8 additions & 6 deletions materializationengine/blueprints/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@


def collect_crud_columns(column_names):

crud_columns = []
created_columns = []
for table in column_names.keys():
Expand All @@ -20,7 +19,6 @@ def collect_crud_columns(column_names):


def after_request(response):

accept_encoding = request.headers.get("Accept-Encoding", "")

if "gzip" not in accept_encoding.lower():
Expand Down Expand Up @@ -68,6 +66,7 @@ def create_query_response(
column_names,
return_pyarrow=True,
arrow_format=False,
ipc_compress=True,
):
accept_encoding = request.headers.get("Accept-Encoding", "")

Expand All @@ -79,10 +78,13 @@ def create_query_response(
if arrow_format:
batch = pa.RecordBatch.from_pandas(df)
sink = pa.BufferOutputStream()
if "lz4" in accept_encoding:
compression = "LZ4_FRAME"
elif "zstd" in accept_encoding:
compression = "ZSTD"
if ipc_compress:
if "lz4" in accept_encoding:
compression = "LZ4_FRAME"
elif "zstd" in accept_encoding:
compression = "ZSTD"
else:
compression = None
else:
compression = None
opt = pa.ipc.IpcWriteOptions(compression=compression)
Expand Down
Loading