Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always stream out blocks in dods_encode #10

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions opendap_protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,26 @@
clients using the netCDF4 library. PyDAP client libraries are not supported.
"""

import importlib
import re
from dataclasses import dataclass
from dask.cache import Cache

import dask.array as da
import numpy as np

has_xarray = bool(importlib.util.find_spec("xarray"))

if has_xarray:
from xarray import Variable

INDENT = ' '
SLICE_CONSTRAINT_RE = r'\[([\d,\W]+)\]$'


@dataclass
class Config:
DASK_ENCODE_CHUNK_SIZE: int = 20e6
DASK_CACHE_SIZE: int = 120 * 1024 * 1024 # 120MB

# we load one `DASK_ENCODE_CHUNK_SIZE`-sized block of linearized data
# in to memory at one go. This may overlap with multiple dask chunks
# so lets cache those chunks since we might come back to them.
cache = Cache(Config.DASK_CACHE_SIZE)
cache.register()
STREAMING_BLOCK_SIZE: int = 20e6


class DAPError(Exception):
Expand Down Expand Up @@ -496,17 +495,21 @@ def dods_encode(data, dtype):

yield packed_length

chunk_size = int(Config.STREAMING_BLOCK_SIZE / data.dtype.itemsize)
if isinstance(data, da.Array):
# Encode in chunks of a defined size if we work with dask.Array
chunk_size = int(Config.DASK_ENCODE_CHUNK_SIZE / data.dtype.itemsize)
flat = data.ravel()
for start in range(0, data.size, chunk_size):
block = flat[slice(start, chunk_size)]
yield block.astype(dtype.str).compute().tobytes()
else:
# Make sure we always encode an array or we will get wrong results
data = np.asarray(data)
yield data.astype(dtype.str).tobytes()
data = data.ravel()

for start in range(0, data.size, chunk_size):
end = start + chunk_size
if isinstance(data, da.Array):
block = data[slice(start, end)].compute()
elif has_xarray and isinstance(data, Variable):
npidxr = np.unravel_index(np.arange(start, min(end, data.size)), shape=data.shape)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image for 30MB blocks,

xridxr = tuple(Variable(dims="__points__", data=idxr) for idxr in npidxr)
block = data[xridxr].to_numpy()
else:
block = np.asarray(data).ravel()[slice(start, end)]
yield block.astype(dtype.str).tobytes()


def parse_slice_constraint(constraint):
Expand Down Expand Up @@ -571,6 +574,6 @@ def set_dask_encoding_chunk_size(chunk_size: int):
"""
chunk_size = int(chunk_size)
if chunk_size > 0:
Config.DASK_ENCODE_CHUNK_SIZE = chunk_size
Config.STREAMING_BLOCK_SIZE = chunk_size
else:
raise ValueError('Encoding chunk size needs to be greather than 0.')
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

test_requirements = [
'pytest',
'xarray',
]

extras = {
Expand Down
11 changes: 8 additions & 3 deletions tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import dask.array as da
import numpy as np
import opendap_protocol as dap
import xarray as xr
import pytest
from opendap_protocol.protocol import dods_encode

Expand Down Expand Up @@ -68,9 +69,13 @@ def test_dods_encode():
data_vals = da.from_array(np_data,
chunks=(14, y_dim, 1, vertical_dim, 1, 1))

variable = xr.Variable(dims=("x", "y", "time", "vertical", "real", "ref_time"),
data=np_data)

x = dap.dods_encode(data_vals, dap.Int32)
y = dap.dods_encode(np_data, dap.Int32)
assert b''.join(x) == b''.join(y)
z = dap.dods_encode(variable, dap.Int32)
assert b''.join(x) == b''.join(y) == b''.join(z)

int_arrdata = np.arange(0, 20, 2, dtype='<i4')
assert b''.join(dods_encode(int_arrdata,
Expand Down Expand Up @@ -290,11 +295,11 @@ def test_set_dask_encoding_chunk_size():
from opendap_protocol.protocol import set_dask_encoding_chunk_size
set_dask_encoding_chunk_size(chunk_size)

assert opendap_protocol.protocol.Config.DASK_ENCODE_CHUNK_SIZE == chunk_size
assert opendap_protocol.protocol.Config.STREAMING_BLOCK_SIZE == chunk_size

# Restore the default value
reload(opendap_protocol.protocol)
assert opendap_protocol.protocol.Config.DASK_ENCODE_CHUNK_SIZE == 20e6
assert opendap_protocol.protocol.Config.STREAMING_BLOCK_SIZE == 20e6

invalid_values = [ 0, -10, [], 'cloud', '', {}, ]
for val in invalid_values:
Expand Down