Merge pull request #10 from CNES/fix/chunk_management
fbriol authored Jan 16, 2024
2 parents 3c3e3f6 + 2957a9a commit 087e603
Showing 3 changed files with 63 additions and 6 deletions.
42 changes: 42 additions & 0 deletions zcollection/collection/tests/test_collection.py
@@ -1040,3 +1040,45 @@ def test_invalid_partitions(
with pytest.warns(RuntimeWarning, match='Invalid partition'):
zcollection.validate_partitions(fix=True)
assert zcollection.load() is not None


# pylint: disable=too-many-statements
@pytest.mark.parametrize('fs', ['local_fs', 's3_fs'])
def test_insert_with_chunks(
dask_client, # pylint: disable=redefined-outer-name,unused-argument
fs,
request,
tmpdir,
) -> None:
"""Test the insertion of a dataset."""
tested_fs = request.getfixturevalue(fs)
datasets = list(create_test_dataset(delayed=False))[:2]

ds_meta = datasets[0].metadata()
chunk_size = 5
ds_meta.chunks = [meta.Dimension(name='num_pixels', value=chunk_size)]

zcollection = collection.Collection('time',
ds_meta,
partitioning.Date(('time', ), 'M'),
str(tested_fs.collection),
filesystem=tested_fs.fs,
synchronizer=sync.ProcessSync(
str(tmpdir / 'lock.lck')))

# First insertion
zcollection.insert(datasets[0], merge_callable=merging.merge_time_series)
data = zcollection.load()

assert data is not None
assert data.variables['var1'].data.chunksize[1] == chunk_size
assert data.variables['var2'].data.chunksize[1] == chunk_size

# Insertion with merge
zcollection.insert(datasets[1], merge_callable=merging.merge_time_series)
data = zcollection.load()

assert data is not None
# Insertion properties are kept
assert data.variables['var1'].data.chunksize[1] == chunk_size
assert data.variables['var2'].data.chunksize[1] == chunk_size
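
For context, the assertions above compare each variable's dask chunk size along the second axis against the declared metadata. A minimal, self-contained sketch of what .data.chunksize reports (illustrative only, not part of this commit; the array shape is an arbitrary assumption):

import dask.array as da
import numpy as np

# A 10 x 20 array chunked in blocks of 5 along axis 1, mirroring the
# 'num_pixels' chunking the test declares via meta.Dimension.
arr = da.from_array(np.zeros((10, 20)), chunks=(10, 5))
assert arr.chunksize == (10, 5)  # chunksize[1] == 5, as the test asserts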
14 changes: 12 additions & 2 deletions zcollection/dataset.py
@@ -237,8 +237,9 @@ def __init__(self,
#: <zcollection.variable.delayed_array.DelayedArray>` objects.
#: Otherwise, the values are :py:class:`Array
#: <zcollection.variable.array.Array>` objects.
-        self.variables = collections.OrderedDict(
-            (item.name, item) for item in variables)
+        self.variables: collections.OrderedDict[
+            str, Variable] = collections.OrderedDict(
+                (item.name, item) for item in variables)

#: A dictionary of dimension names and their index in the dataset
self.dimensions: DimensionType = {}
@@ -508,6 +509,15 @@ def set_for_insertion(self, mds: meta.Dataset) -> Dataset:
block_size_limit=mds.block_size_limit,
)

    def copy_properties(self, ds: Dataset) -> None:
        """Copy the chunks and block size properties from the provided
        dataset.

        Args:
            ds: Dataset from which to copy the properties.
        """
        self.chunks = ds.chunks
        self.block_size_limit = ds.block_size_limit

def fill_attrs(self, mds: meta.Dataset) -> None:
"""Fill the dataset and its variables attributes using the provided
metadata.
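
A short, self-contained sketch of the pattern the new copy_properties method implements (a toy stand-in, not the zcollection Dataset class; the property values below are arbitrary assumptions):

class _ToyDataset:
    """Stand-in carrying only the two insertion properties."""

    def __init__(self, chunks=None, block_size_limit=None):
        self.chunks = chunks
        self.block_size_limit = block_size_limit

    def copy_properties(self, ds):
        # Same two-line body as the real method in the diff above.
        self.chunks = ds.chunks
        self.block_size_limit = ds.block_size_limit


on_disk = _ToyDataset()
inserted = _ToyDataset(chunks=[('num_pixels', 5)], block_size_limit=128)
on_disk.copy_properties(ds=inserted)
assert on_disk.chunks == inserted.chunks
assert on_disk.block_size_limit == inserted.block_size_limit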
13 changes: 9 additions & 4 deletions zcollection/merging/__init__.py
@@ -154,8 +154,13 @@ def perform(
**kwargs: Additional keyword arguments are passed through to the merge
callable.
"""
-    zds: dataset.Dataset = merge_callable(
-        storage.open_zarr_group(
-            dirname, fs, delayed=delayed), ds_inserted, axis, partitioning_dim,
-        **kwargs) if merge_callable is not None else ds_inserted
+    if merge_callable is None:
+        zds = ds_inserted
+    else:
+        ds = storage.open_zarr_group(dirname, fs, delayed=delayed)
+        # The dataset read from storage does not carry the insertion
+        # properties, which might be lost in the merge_callable depending
+        # on which dataset it bases its result on.
+        ds.copy_properties(ds=ds_inserted)
+        zds = merge_callable(ds, ds_inserted, axis, partitioning_dim, **kwargs)
_update_fs(dirname, zds, fs, synchronizer=synchronizer)
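
To see why the copy matters, a hedged sketch of a merge callable that keeps the on-disk dataset (merge_keep_existing is a hypothetical name, not from the library):

def merge_keep_existing(existing, inserted, axis, partitioning_dim, **kwargs):
    """Hypothetical merge callable that keeps the data already on disk."""
    # Before this fix, 'existing' (read via storage.open_zarr_group) lacked
    # the chunks/block_size_limit set at insertion time, so returning it
    # dropped those properties. After the fix, perform() calls
    # copy_properties first, so the chunking survives either way.
    return existing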
