Review of the ZEP2 spec - Sharding storage transformer #152

``docs/extensions/storage-transformers/sharding/v1.0.rst`` (293 additions, 0 deletions)

.. _sharding-storage-transformer-v1:

==========================================
Sharding storage transformer (version 1.0)
==========================================
-----------------------------
Editor's draft 18 02 2022
-----------------------------

Specification URI:
@@TODO
http://purl.org/zarr/spec/storage-transformers/sharding/1.0

Issue tracking:
`GitHub issues <https://github.com/zarr-developers/zarr-specs/labels/storage-transformers-sharding-v1.0>`_

Suggest an edit for this spec:
`GitHub editor <https://github.com/zarr-developers/zarr-specs/blob/main/docs/storage-transformers/sharding/v1.0.rst>`_

Copyright 2022-Present `Zarr core development team
<https://github.com/orgs/zarr-developers/teams/core-devs>`_. This work
is licensed under a `Creative Commons Attribution 3.0 Unported License
<https://creativecommons.org/licenses/by/3.0/>`_.

----


Abstract
========

This specification defines an implementation of the Zarr storage transformer
specification for sharding.

Sharding co-locates multiple chunks within a storage object, bundling them into
shards.


Motivation
==========

In many cases, it becomes inefficient or impractical to store a large number of
chunks as individual files or objects due to the design constraints of the
underlying storage. For example, the file block size and the maximum number of
inodes restrict the use of numerous small files on typical file systems, and
cloud object stores such as S3 and GCS, as well as various distributed file
systems, do not efficiently handle large numbers of small files or objects.

Increasing the chunk size works only up to a certain point, as chunks need to
stay small to meet read and write efficiency requirements, for example to
stream data in browser-based visualization software.

Therefore, chunks may need to be smaller than the minimum size at which a
single storage object can be handled efficiently. In those cases, it is
efficient to store data at a coarser granularity than the chunks in which it is
read.

**Sharding solves this by allowing multiple chunks to be stored in one storage
key, which is called a shard**:

.. image:: sharding.png


Document conventions
====================

Conformance requirements are expressed with a combination of descriptive
assertions and [RFC2119]_ terminology. The key words "MUST", "MUST NOT",
"REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY",
and "OPTIONAL" in the normative parts of this document are to be interpreted as
described in [RFC2119]_. However, for readability, these words do not appear in
all uppercase letters in this specification.

All of the text of this specification is normative except sections explicitly
marked as non-normative, examples, and notes. Examples in this specification are
introduced with the words "for example".


Configuration
=============

Sharding can be configured per array in the :ref:`array-metadata` as follows:

.. code-block::

   {
       "storage_transformers": [
           {
               "extension": "https://purl.org/zarr/spec/storage_transformers/sharding/1.0",
               "type": "indexed",
               "configuration": {
                   "chunks_per_shard": [
                       2,
                       2
                   ]
               }
           }
       ]
   }

``type``

Specifies a `Binary shard format`_. In this version, the only binary format
is the ``indexed`` format.

``chunks_per_shard``

An array of integers providing the number of chunks that are combined in a
shard for each dimension of the Zarr array, where each shard may only start
at a chunk-grid position that is divisible by ``chunks_per_shard`` per
dimension, i.e. aligned with the zero-origin. The length of the array must
match the length of the array metadata ``shape`` entry. For example, a value
``[32, 2]`` indicates that 64 chunks are combined in one shard, 32 along the
first dimension, and for each of those 2 along the second dimension. Some valid
starting positions for a shard in the chunk-grid therefore are ``[0, 0]``,
``[32, 2]``, ``[32, 4]``, ``[64, 2]`` or ``[96, 18]``.
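
A short, non-normative Python sketch of the alignment rule above, using the
``[32, 2]`` example; the helper name ``is_valid_shard_start`` is hypothetical
and only used for illustration:

.. code-block:: python

   import math

   chunks_per_shard = [32, 2]  # the example value described above
   chunks_in_shard = math.prod(chunks_per_shard)  # 32 * 2 = 64 chunks per shard

   def is_valid_shard_start(chunk_grid_position):
       """Check that a chunk-grid position is aligned with the shard grid."""
       return all(
           p % c == 0 for p, c in zip(chunk_grid_position, chunks_per_shard)
       )

   assert chunks_in_shard == 64
   assert is_valid_shard_start([96, 18])      # multiples of 32 and 2
   assert not is_valid_shard_start([96, 19])  # 19 is not a multiple of 2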


Storage transformer implementation
==================================

Key & value transformation
--------------------------

The storage transformer specification defines the abstract interface to be the
same as the :ref:`abstract-store-interface`.

The Zarr store interface is defined as a mapping of `keys` and `values`, where a
`key` is a sequence of characters and a `value` is a sequence of bytes. A
key-value pair is called `entry` in the following part.

This sharding transformer only adapts entries where the key starts with
``data/root``, as they indicate data keys for array chunks, see
:ref:`storage-keys`. All other entries are simply passed on.

Entries starting with ``data/root`` are grouped by their common shard, assuming
storage keys from a regular chunk grid which may use a custom configured
``chunk separator``: For all entries that are part of the same shard the key is
changed to the shard key and the values are combined in the
`Binary shard format`_ as described below. The new shard key is derived from
the chunk's grid position, divided by ``chunks_per_shard`` and floored per
dimension. For example, for ``chunks_per_shard=[32, 2]``, the chunk grid
position ``[96, 18]`` (e.g. key "data/root/foo/baz/c96/18") is transformed to
the shard grid position ``[3, 9]`` and reassigned to the respective new key,
honoring the original chunk separator (e.g. "data/root/foo/baz/c3/9"). Chunk
grid positions ``[96, 19]``, ``[97, 18]``, …, up to ``[127, 19]`` map to the
same shard grid position ``[3, 9]``.
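
A non-normative Python sketch of this key transformation, assuming the ``/``
chunk separator from the example; the helper name is hypothetical:

.. code-block:: python

   def chunk_key_to_shard_key(chunk_key, chunks_per_shard, separator="/"):
       """Map a chunk data key to the key of the shard containing it.

       The chunk grid position is divided by ``chunks_per_shard`` and
       floored per dimension to obtain the shard grid position.
       """
       prefix, _, chunk_part = chunk_key.rpartition("/c")
       chunk_position = [int(p) for p in chunk_part.split(separator)]
       shard_position = [
           c // s for c, s in zip(chunk_position, chunks_per_shard)
       ]
       return prefix + "/c" + separator.join(str(p) for p in shard_position)

   # The example from the text: chunk [96, 18] lands in shard [3, 9].
   assert chunk_key_to_shard_key(
       "data/root/foo/baz/c96/18", [32, 2]
   ) == "data/root/foo/baz/c3/9"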


Binary shard format
-------------------

The only binary format is the ``indexed`` format, as specified by the ``type``
configuration key. Other binary formats might be added in future versions.

In the indexed binary format, chunks are written successively in a shard, where
unused space between them is allowed, followed by an index referencing them. The
index is placed at the end of the file and has a size of 16 bytes multiplied by
the number of chunks in a shard, for example ``16 bytes * 64 = 1024 bytes`` for
``chunks_per_shard=[32, 2]``. The index holds an `offset, nbytes` pair of
little-endian uint64 per chunk; the chunks are ordered in the index in
row-major (C) order. For example, for ``chunks_per_shard=[2, 2]`` an index
would look like:

.. code-block::

| chunk (0, 0) | chunk (0, 1) | chunk (1, 0) | chunk (1, 1) |
| offset | nbytes | offset | nbytes | offset | nbytes | offset | nbytes |
| uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 |


Empty chunks are denoted by setting both offset and nbytes to ``2^64 - 1``. The
index always has the full shape of all possible chunks per shard, even if they
are outside of the array size.
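
A non-normative sketch of reading such an index with NumPy, given the raw bytes
of a shard; the function names are hypothetical:

.. code-block:: python

   import numpy as np

   MAX_UINT64 = 2**64 - 1  # marker for empty chunks

   def read_shard_index(shard_bytes, chunks_per_shard):
       """Parse the index at the end of a shard into an array of
       (offset, nbytes) pairs, one per chunk, in row-major order."""
       num_chunks = int(np.prod(chunks_per_shard))
       index_bytes = shard_bytes[-16 * num_chunks:]
       index = np.frombuffer(index_bytes, dtype="<u8")
       return index.reshape(tuple(chunks_per_shard) + (2,))

   def read_chunk(shard_bytes, index, chunk_position):
       """Return the encoded bytes of one chunk within the shard,
       or None if the chunk is empty."""
       offset, nbytes = index[tuple(chunk_position)]
       if offset == MAX_UINT64 and nbytes == MAX_UINT64:
           return None
       return shard_bytes[int(offset):int(offset) + int(nbytes)]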

The actual order of the chunk content is not fixed and may be chosen by the
implementation. All possible write orders are valid according to this
specification and therefore can be read by any other implementation. When
writing partial chunks into an existing shard, no specific order of the existing
chunks may be expected. Some writing strategies might be:

* **Fixed order**: Specify a fixed order (e.g. row-, column-major, or Morton
order). When replacing existing chunks, larger or equal-sized chunks may be
replaced in-place, leaving unused space up to an upper limit that might
possibly be specified. Please note that, for regular-sized uncompressed data,
all chunks have the same size and can therefore be replaced in-place. A
non-normative sketch of this strategy is shown after this list.
* **Append-only**: Any chunk to write is appended to the existing shard,
followed by an updated index. If previous chunks are updated, their storage
space becomes unused, as well as the previous index. This might be useful for
storage that only allows append-only updates.
* **Other formats**: Other formats that accept additional bytes at the end of
the file (such as HDF) could be used for storing shards, by writing the chunks
in the order the format prescribes and appending a binary index derived from
the byte offsets and lengths at the end of the file.
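
A non-normative sketch of the fixed-order strategy referenced above, assembling
a complete shard in row-major chunk order; the helper name ``build_shard`` is
hypothetical:

.. code-block:: python

   import numpy as np

   MAX_UINT64 = 2**64 - 1  # marker for empty chunks

   def build_shard(chunks, chunks_per_shard):
       """Assemble a shard from a dict mapping chunk positions (tuples,
       relative to the shard) to encoded chunk bytes; positions missing
       from the dict become empty chunks in the index."""
       num_chunks = int(np.prod(chunks_per_shard))
       index = np.full((num_chunks, 2), MAX_UINT64, dtype="<u8")
       payload = bytearray()
       for flat, position in enumerate(np.ndindex(*chunks_per_shard)):
           chunk_bytes = chunks.get(position)
           if chunk_bytes is None:
               continue  # leave (2^64 - 1, 2^64 - 1) for the empty chunk
           index[flat] = (len(payload), len(chunk_bytes))
           payload += chunk_bytes
       # The index is appended after the chunk payloads, as required by the
       # indexed binary shard format.
       return bytes(payload) + index.tobytes()

Under these assumptions, ``build_shard({(0, 0): b"x" * 10, (1, 1): b"y" * 20},
[2, 2])`` would yield a 94-byte shard: 30 payload bytes followed by the 64-byte
index (``16 bytes * 4`` chunks).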

Any configuration parameters for the write strategy must not be part of the
metadata document; instead they need to be configured at runtime, as this is
implementation specific.


API implementation
------------------

The section below defines a non-normative implementation of the
:ref:`abstract-store-interface` in terms of the operations of this storage
transformer as a ``StoreWithPartialAccess``. The term `underlying store`
references either the next storage transformer in the stack or the actual store
if this transformer is the last one in the stack. Any operations with keys not
starting with ``data/root`` are simply relayed to the underlying store and not
described explicitly.

* ``get_partial_values(key_ranges) -> values``: For each referenced key, request
the indices from the underlying store using ``get_partial_values``. For each
`key`, `range` pair in `key_ranges`, check whether the chunk exists, i.e.
whether the index offset and nbytes are not both ``2^64 - 1``. For existing
chunks, request the actual chunk data by their ranges as read from the index
using ``get_partial_values``. This operation should be implemented using two
``get_partial_values`` operations on the underlying store, one for retrieving
the indices and one for retrieving existing chunks (see the sketch after this
list).

* ``set_partial_values(key_start_values)``: For each referenced key, check if
all available chunks in a shard are referenced. In this case, a shard can be
constructed according to the `Binary shard format`_ directly. For all other
keys, request the indices from the underlying store using
``get_partial_values``. All chunks that are not updated completely and exist
according to the index (index offset and nbytes are not both ``2^64 - 1``) need
to be read via ``get_partial_values`` from the underlying store. For
simplification purposes a shard may also be read completely, combining the
previous two `get` operations into one. Based on the existing chunks and the
value ranges that need to be updated, new shards are constructed according to
the `Binary shard format`_. All shards that need to be updated must then be set
via ``set`` or ``set_partial_values(key_start_values)``, depending on the
chosen writing strategy provided by the implementation. Specialized store
implementations that allow appending to a storage object may only need to read
the index to update it.

* ``erase_values(keys)``: For each referenced key, check if all available chunks
in a shard are referenced. In this case, the full shard is removed using
``erase_values`` on the underlying store. For all other keys, request the
indices from the underlying store using ``get_partial_values``. Update the
index using an offset and nbytes of ``2^64 - 1`` to mark missing chunks. The
updated index may be written in-place using
``set_partial_values(key_start_values)``, or a larger rewrite of the shard may
be done including the index update, but also removing value ranges
corresponding to the erased chunks.

* ``erase_prefix(prefix)``: If the prefix contains a part of the chunk-grid key,
this part is translated to the referenced shard and contained chunks. For
affected shards where all contained chunks are erased, the prefix is rewritten
to the corresponding shard key and the operation is relayed to the underlying
store. For all shards where only some chunks are erased, the affected chunks
are removed by invoking the operation ``erase_values`` on this storage
transformer with the respective chunk keys.

* ``list()``: See ``list_prefix`` with the prefix ``/``.

* ``list_prefix(prefix)``: If the prefix contains a part of the chunk-grid key,
this part is translated to the referenced shard and contained chunks. Then,
``list_prefix`` is called on the underlying store with the translated prefix.
For all listed shards request the indices from the underlying store using
``get_partial_values``. Existing chunks, where the index offset or nbytes are
not ``2^64 - 1``, are then listed by their original key.

* ``list_dir(prefix)``: If the prefix contains a part of the chunk-grid key,
this part is translated to the referenced shard and contained chunks. Then,
``list_dir`` is called on the underlying store with the translated prefix. For
all *retrieved prefixes* (not full keys) with partial shard keys, the
corresponding original prefixes covering all possible chunks in the shard are
listed, which might include non-existing prefixes. For *retrieved full keys*
the indices from the underlying store are requested using
``get_partial_values``. Existing chunks, where the index offset or nbytes are
not ``2^64 - 1``, are then listed by their original key.
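
As referenced in the ``get_partial_values`` item above, a non-normative sketch
of reading one chunk range through the transformer with two partial reads. The
``store.get_partial_values`` signature used here, including negative offsets
addressing bytes from the end of a value, is an assumption of this sketch
rather than part of the specification:

.. code-block:: python

   import numpy as np

   MAX_UINT64 = 2**64 - 1  # marker for empty chunks

   def read_chunk_range(store, shard_key, flat_chunk_index, num_chunks,
                        start, length):
       """Read ``length`` bytes at ``start`` within one chunk of a shard.

       ``flat_chunk_index`` is the chunk's row-major position within the
       shard; ``num_chunks`` is the product of ``chunks_per_shard``.
       """
       index_size = 16 * num_chunks
       # First request: the shard index from the end of the shard object.
       (index_bytes,) = store.get_partial_values(
           [(shard_key, -index_size, index_size)]
       )
       index = np.frombuffer(index_bytes, dtype="<u8").reshape(num_chunks, 2)
       offset, nbytes = index[flat_chunk_index]
       if offset == MAX_UINT64 and nbytes == MAX_UINT64:
           return None  # the chunk is empty
       # Second request: only the needed byte range of the chunk.
       (chunk_bytes,) = store.get_partial_values(
           [(shard_key, int(offset) + start, min(length, int(nbytes) - start))]
       )
       return chunk_bytes

In a full implementation, the per-chunk requests would be batched so that all
indices are fetched in one underlying ``get_partial_values`` call and all
existing chunk ranges in a second one, as described above.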

.. note::

For performance reasons, one might also implement ``list_prefix`` and
``list_dir`` without any index lookups, always assuming that all possible
chunks in a shard are available. Alternatively, for many use-cases,
implementations of these operations might also be omitted with sharding,
since array access does not use them. Global operations like copying can
operate directly on the underlying store, not on the sharded transformed
store.


References
==========

.. [RFC2119] S. Bradner. Key words for use in RFCs to Indicate
Requirement Levels. March 1997. Best Current Practice. URL:
https://tools.ietf.org/html/rfc2119


Change log
==========

This section is a placeholder for keeping a log of the snapshots of this
document that are tagged in GitHub and what changed between them.