
Commit

Merge pull request #466 from martindurant/complex_cdf3_dt
Explicitly calculate dtype element size in netCDF3 records
martindurant authored Jun 24, 2024
2 parents f2d9252 + f5f073a commit ae692fe
Showing 5 changed files with 56 additions and 32 deletions.
1 change: 0 additions & 1 deletion kerchunk/combine.py
@@ -412,7 +412,6 @@ def store_coords(self):
             elif k in z:
                 # Fall back to existing fill value
                 kw["fill_value"] = z[k].fill_value
-
             arr = group.create_dataset(
                 name=k,
                 data=data,
32 changes: 14 additions & 18 deletions kerchunk/netCDF3.py
@@ -1,12 +1,11 @@
-import base64
 from functools import reduce
 from operator import mul
 
 import numpy as np
 from fsspec.implementations.reference import LazyReferenceMapper
 import fsspec
 
-from kerchunk.utils import _encode_for_JSON
+from kerchunk.utils import _encode_for_JSON, inline_array
 
 try:
     from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
@@ -202,21 +201,11 @@ def translate(self):
                 )
                 part = ".".join(["0"] * len(shape)) or "0"
                 k = f"{dim}/{part}"
-                if self.threshold and int(self.chunks[dim][1]) < self.threshold:
-                    self.fp.seek(int(self.chunks[dim][0]))
-                    data = self.fp.read(int(self.chunks[dim][1]))
-                    try:
-                        # easiest way to test if data is ascii
-                        data.decode("ascii")
-                    except UnicodeDecodeError:
-                        data = b"base64:" + base64.b64encode(data)
-                    out[k] = data
-                else:
-                    out[k] = [
-                        self.filename,
-                        int(self.chunks[dim][0]),
-                        int(self.chunks[dim][1]),
-                    ]
+                out[k] = [
+                    self.filename,
+                    int(self.chunks[dim][0]),
+                    int(self.chunks[dim][1]),
+                ]
                 arr.attrs.update(
                     {
                         k: v.decode() if isinstance(v, bytes) else str(v)
@@ -232,7 +221,8 @@ def translate(self):
         # native chunks version (no codec, no options)
         start, size, dt = self.chunks["record_array"][0]
         dt = np.dtype(dt)
-        outer_shape = size // dt.itemsize
+        itemsize = sum(dt[_].itemsize for _ in dt.names)
+        outer_shape = size // itemsize
         offset = start
         for name in dt.names:
             dtype = dt[name]
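
Note on the hunk above: for a structured dtype, `dt.itemsize` is the size of the whole record and can include padding, so it may differ from the sum of the individual fields' sizes; the patch derives the record count from the per-field sizes instead. A minimal, generic NumPy illustration of the difference (not kerchunk's own dtype or data):

import numpy as np

# An aligned structured dtype: .itemsize includes padding between fields.
dt = np.dtype({"names": ["flag", "value"], "formats": ["i1", ">f8"]}, align=True)
print(dt.itemsize)  # 16: 1 + 8 data bytes plus 7 bytes of padding

# Packed element size, computed the way the patched code does.
itemsize = sum(dt[name].itemsize for name in dt.names)
print(itemsize)  # 9

# With a hypothetical record block of `size` bytes, the number of records
# (the outer shape) follows from the packed element size.
size = 9 * 4
print(size // itemsize)  # 4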
@@ -294,6 +284,12 @@ def translate(self):
                     if k != "filename"  # special "attribute"
                 }
             )
+        if self.threshold:
+            out = inline_array(
+                out,
+                self.threshold,
+                remote_options=dict(remote_options=self.storage_options),
+            )
 
         if isinstance(out, LazyReferenceMapper):
             out.flush()
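
The per-coordinate inlining removed in the earlier hunk is now applied once, after scanning, via `kerchunk.utils.inline_array`, which rewrites sufficiently small arrays as single inlined chunks. A rough sketch of the underlying idea only, not the actual implementation (the helper name and structure below are illustrative): references below a size threshold are replaced by the bytes themselves, base64-prefixed when they are not ASCII-safe.

import base64
import fsspec

def inline_small_refs(refs, threshold, storage_options=None):
    # Toy stand-in for the idea behind kerchunk.utils.inline_array:
    # swap small [url, offset, size] references for their literal bytes.
    out = dict(refs)
    fs = fsspec.filesystem("reference", fo=refs, remote_options=storage_options)
    for key, ref in refs.items():
        if key.endswith((".zarray", ".zattrs", ".zgroup")) or not isinstance(ref, list):
            continue  # skip metadata keys and already-inlined values
        if len(ref) == 3 and ref[2] < threshold:
            data = fs.cat_file(key)  # fetch the chunk's bytes once
            try:
                data.decode("ascii")  # cheap test for JSON-safe payloads
                out[key] = data
            except UnicodeDecodeError:
                out[key] = b"base64:" + base64.b64encode(data)
    return out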
2 changes: 1 addition & 1 deletion kerchunk/tests/test_grib.py
@@ -123,7 +123,7 @@ def test_grib_tree():
         "atmosphere latitude longitude step time valid_time".split()
     )
     # Assert that the fill value is set correctly
-    assert zg.refc.instant.atmosphere.step.fill_value is np.NaN
+    assert zg.refc.instant.atmosphere.step.fill_value is np.nan


# The following two tests use json fixture data generated from calling scan grib
9 changes: 5 additions & 4 deletions kerchunk/tests/test_utils.py
@@ -1,6 +1,7 @@
 import io
 
 import fsspec
+import json
 import kerchunk.utils
 import kerchunk.zarr
 import numpy as np
@@ -72,16 +73,16 @@ def test_inline_array():
         "data/.zattrs": '{"foo": "bar"}',
     }
     fs = fsspec.filesystem("reference", fo=refs)
-    out1 = kerchunk.utils.inline_array(refs, threshold=1000)  # does nothing
+    out1 = kerchunk.utils.inline_array(refs, threshold=1)  # does nothing
     assert out1 == refs
-    out2 = kerchunk.utils.inline_array(refs, threshold=1000, names=["data"])  # explicit
+    out2 = kerchunk.utils.inline_array(refs, threshold=1, names=["data"])  # explicit
     assert "data/1" not in out2
-    assert out2["data/.zattrs"] == refs["data/.zattrs"]
+    assert json.loads(out2["data/.zattrs"]) == json.loads(refs["data/.zattrs"])
     fs = fsspec.filesystem("reference", fo=out2)
     g = zarr.open(fs.get_mapper())
     assert g.data[:].tolist() == [1, 2]
 
-    out3 = kerchunk.utils.inline_array(refs, threshold=1)  # inlines because of size
+    out3 = kerchunk.utils.inline_array(refs, threshold=1000)  # inlines because of size
     assert "data/1" not in out3
     fs = fsspec.filesystem("reference", fo=out3)
     g = zarr.open(fs.get_mapper())
44 changes: 36 additions & 8 deletions kerchunk/utils.py
@@ -182,7 +182,7 @@ def _inline_array(group, threshold, names, prefix=""):
         if isinstance(thing, zarr.Group):
             _inline_array(thing, threshold=threshold, prefix=prefix1, names=names)
         else:
-            cond1 = threshold and thing.nbytes < threshold and thing.nchunks > 1
+            cond1 = threshold and thing.nbytes < threshold
            cond2 = prefix1 in names
             if cond1 or cond2:
                 original_attrs = dict(thing.attrs)
@@ -194,6 +194,7 @@
                 chunks=thing.shape,
                 compression=None,
                 overwrite=True,
+                fill_value=thing.fill_value,
             )
             arr.attrs.update(original_attrs)

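The added `fill_value=thing.fill_value` carries the original array's fill value across the re-creation that `_inline_array` performs when it rebuilds an array as a single inlined chunk; without it, the rebuilt array would fall back to zarr's default rather than the original. A small sketch of that round trip using plain zarr (v2-style API, illustrative names and values):

import numpy as np
import zarr

g = zarr.group()
orig = g.create_dataset("x", data=np.arange(4.0), chunks=(2,), fill_value=np.nan)

# Re-create as one chunk, passing the original fill_value through,
# as the patched code now does.
rebuilt = g.create_dataset(
    "x",
    data=orig[:],
    chunks=orig.shape,
    compression=None,
    overwrite=True,
    fill_value=orig.fill_value,
)
assert np.isnan(rebuilt.fill_value)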
@@ -249,35 +250,62 @@ def subchunk(store, variable, factor):
     modified store
     """
     fs = fsspec.filesystem("reference", fo=store)
-    store = copy.deepcopy(store)
+    store = fs.references
     meta_file = f"{variable}/.zarray"
     meta = ujson.loads(fs.cat(meta_file))
     if meta["compressor"] is not None:
         raise ValueError("Can only subchunk an uncompressed array")
     chunks_orig = meta["chunks"]
-    if chunks_orig[0] % factor == 0:
-        chunk_new = [chunks_orig[0] // factor] + chunks_orig[1:]
-    else:
-        raise ValueError("Must subchunk by exact integer factor")
+    chunk_new = []
+
+    # plan
+    multi = None
+    for ind, this_chunk in enumerate(chunks_orig):
+        if this_chunk == 1:
+            chunk_new.append(1)
+            continue
+        elif this_chunk % factor == 0:
+            chunk_new.extend([this_chunk // factor] + chunks_orig[ind + 1 :])
+            break
+        elif factor % this_chunk == 0:
+            # if factor // chunks_orig[0] > 1:
+            chunk_new.append(1)
+            if multi is None:
+                multi = this_chunk
+            factor //= this_chunk
+        else:
+            raise ValueError("Must subchunk by exact integer factor")
+
+    if multi:
+        # TODO: this reloads the referenceFS; *maybe* reuses it
+        return subchunk(store, variable, multi)
+
+    # execute
     meta["chunks"] = chunk_new
+    store = copy.deepcopy(store)
     store[meta_file] = ujson.dumps(meta)
 
     for k, v in store.copy().items():
         if k.startswith(f"{variable}/"):
             kpart = k[len(variable) + 1 :]
             if kpart.startswith(".z"):
                 continue
-            sep = "." if "." in k else "/"
+            sep = "." if "." in kpart else "/"
             chunk_index = [int(_) for _ in kpart.split(sep)]
             if isinstance(v, (str, bytes)):
                 # TODO: check this early, as some refs might have been edited already
                 raise ValueError("Refusing to sub-chunk inlined data")
             if len(v) > 1:
                 url, offset, size = v
             else:
                 (url,) = v
                 offset = 0
                 size = fs.size(k)
             for subpart in range(factor):
-                new_index = [chunk_index[0] * factor + subpart] + chunk_index[1:]
+                new_index = (
+                    chunk_index[:ind]
+                    + [chunk_index[ind] * factor + subpart]
+                    + chunk_index[ind + 1 :]
+                )
                 newpart = sep.join(str(_) for _ in new_index)
                 newv = [url, offset + subpart * size // factor, size // factor]
                 store[f"{variable}/{newpart}"] = newv
