Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always stream out blocks in dods_encode #10

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions opendap_protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import re
from dataclasses import dataclass
from dask.cache import Cache

import dask.array as da
import numpy as np
Expand All @@ -53,6 +54,13 @@
@dataclass
class Config:
DASK_ENCODE_CHUNK_SIZE: int = 20e6
DASK_CACHE_SIZE: int = 120 * 1024 * 1024 # 120MB

# we load one `DASK_ENCODE_CHUNK_SIZE`-sized block of linearized data
# in to memory at one go. This may overlap with multiple dask chunks
# so lets cache those chunks since we might come back to them.
cache = Cache(Config.DASK_CACHE_SIZE)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be configured at the server level? That is what we do

Copy link
Author

@dcherian dcherian Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I think we should apply the cache more locally in that loop in dods_encode. We want to cache aggressively when we have multiple batches to stream out for a single request from a single array. This is because the order in which we yield bytes can be orthogonal to chunking, and we can visit the same chunk multiple times.

I think the more global server cache is appropriate for a less aggressive cache across multiple requests.

Perhaps we can pair at some point and just iterate through some options with a benchmark problem.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also seems like a good place to stick in a bit of async: compute the next iteration while streaming out the current iteration.

cache.register()


class DAPError(Exception):
Expand Down Expand Up @@ -491,8 +499,9 @@ def dods_encode(data, dtype):
if isinstance(data, da.Array):
# Encode in chunks of a defined size if we work with dask.Array
chunk_size = int(Config.DASK_ENCODE_CHUNK_SIZE / data.dtype.itemsize)
serialize_data = data.ravel().rechunk(chunk_size)
for block in serialize_data.blocks:
flat = data.ravel()
for start in range(0, data.size, chunk_size):
block = flat[slice(start, chunk_size)]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be

Suggested change
block = flat[slice(start, chunk_size)]
block = flat[slice(start, start+chunk_size)]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clearly this needs tests!

yield block.astype(dtype.str).compute().tobytes()
else:
# Make sure we always encode an array or we will get wrong results
Expand Down