Consider tensorstore as backend #111

Open
ErlendHaa opened this issue Oct 17, 2022 · 5 comments
Labels
enhancement (New feature or request), performance (Performance)

Comments

@ErlendHaa

Hi,

I know that Google's tensorstore is rather fresh and not officially supported by Google (hopefully it will be), but I thought I'd make an issue / future feature request for it anyway, as it offers significant performance improvements over the other backends.

I did some measurements that might be of interest:

Cube dimensions: 6400x3200x1500
Environment: Cloud 2 cloud - client running in same datacenter (Azure) as where the data is stored.
Fetching a depth slice:

[mdio - zarr] Read slice of size 77.51423645019531 MB in 231.33055152895395 s
[mdio - dask] Read slice of size 77.51423645019531 MB in 16.46115321794059 s
[tensorstore] Read slice of size 77.51423645019531 MB in 5.266401566041168 s

For reference, here is the tensorstore script that I used to read the underlying zarr array from an mdio file:

import time
import tensorstore as ts


def fetch():
    # Open the zarr array inside the mdio file directly over HTTP
    dataset = ts.open({
        'driver': 'zarr',
        'kvstore': {
            'driver': 'http',
            'base_url': 'https://account.blob.core.windows.net?<sas>',
            'path': 'somefile.mdio/data/chunked_012'
        }
    }).result()

    start = time.perf_counter()
    zslice = dataset[:, :, 200]    # lazy view of a single depth slice
    data = zslice.read().result()  # triggers the actual read

    print(f'[tensorstore] Read slice of size {data.nbytes / (1024 * 1024)} MB in {time.perf_counter() - start} s')


if __name__ == '__main__':
    fetch()
@tasansal
Collaborator

tasansal commented Oct 17, 2022

@ErlendHaa, Thanks a lot for the benchmarks! I have been watching tensorstore closely and it certainly is an option to make that a backend.

It also has very good support for fancy indexing whereas the zarr backend isn't that great.
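
For illustration, something like this minimal sketch is the kind of NumPy-style integer-array indexing I mean (untested; it reuses the placeholder account / path from your script above, and the index values are made up):

import tensorstore as ts

# Open the same zarr array as in the script above (placeholder credentials / path)
dataset = ts.open({
    'driver': 'zarr',
    'kvstore': {
        'driver': 'http',
        'base_url': 'https://account.blob.core.windows.net?<sas>',
        'path': 'somefile.mdio/data/chunked_012'
    }
}).result()

# Pick an arbitrary set of inlines in one request with an index array
subset = dataset[[10, 250, 3100], :, :].read().result()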

The only challenge is that it reimplements a lot of core zarr + numcodecs + dask functionality, which would limit some use cases and rapid experimentation (since everything is C++). But I see no issue having that backend for reading / writing with default blosc compression.

Can you elaborate on the Dask configuration? It would be awesome if you could also run the same benchmark after creating a Dask client like below before you open and read. That may be a more apples-to-apples comparison with tensorstore, because it uses its own threading primitives etc.

from distributed import Client
client = Client()

Reading from the cloud usually scales up with multiprocessing in Python, so a Dask client always helps. The threads also speed up decompression and/or parallel fetching.

@tasansal tasansal added enhancement New feature or request performance Performance labels Oct 17, 2022
@ErlendHaa
Author

Yes, of course. I should have included the dask config in the first place. It took quite a bit of fiddling with the cluster setup to squeeze out more performance, and I have a feeling that the optimal cluster depends a bit on the query, but I haven't really measured much.

For tensorstore I get an IO throughput of ~2 GB/s, which is about half of my link. Dask, on the other hand (with the below configuration), maxes out at about 700 MB/s, while zarr can't muster more than 100 MB/s, with periods of 10 MB/s and even no I/O. This suggests to me that only tensorstore bottlenecks on decompression, while for the others the concurrency / threading implementation is the bottleneck.

I also made an uncompressed mdio file for comparison, as we've seen from other projects that with high IO throughput it can be faster not to compress. On that file tensorstore manages to utilise my entire link. The others still don't.

import time

from mdio import MDIOReader
from dask.distributed import Client, LocalCluster


def fetch(backend):
    mdio = MDIOReader(
        'abfs://somefile.mdio',
        backend=backend,
        return_metadata=True,
        new_chunks=(512, 512, 512),
        storage_options={
            'account_name': 'account',
            'credential': '<sas>'
        }
    )

    start_time = time.perf_counter()

    _, _, zslice = mdio[:, :, 200]
    if backend == 'dask':
        # the dask backend returns a lazy array; force the actual read
        _ = zslice.compute()

    print(f'[mdio - {backend}] Read slice of size {zslice.size * 4 / (1024 * 1024)} MB in {time.perf_counter() - start_time} s')


if __name__ == '__main__':
    cluster = LocalCluster(n_workers=4, threads_per_worker=16, processes=True)
    client = Client(cluster)

    fetch('zarr')
    fetch('dask')

    client.close()
    cluster.close()

If I run it without explicitly defining a cluster config (i.e. client = Client()) I get these numbers:

[mdio - dask] Read slice of size 77.51423645019531 MB in 42.75259730510879 s

@tasansal
Collaborator

tasansal commented Oct 17, 2022

That is a very interesting result; we can typically saturate a 40 Gbps link with a 48 vCPU machine.

(assuming dataset is chunked (128, 128, 128))
Since you are fetching time slices, can you try new_chunks=(256, 256, None) and n_workers=24 and threads_per_worker=2, which will make Dask only read multiple chunks spatially? This should force each worker to read 4 chunks (32MB) and it would have a total of ~72 chunks to read for a full time slice.

With 512x512 spatial chunks, you end up with 18 virtual chunks to read, so it won't saturate all the cores.
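
Concretely, the suggestion would look something like this in your benchmark script (an untested sketch, with the same placeholder file and credentials as before):

from dask.distributed import Client, LocalCluster
from mdio import MDIOReader

# More, smaller workers so chunk fetches overlap; the threads handle decompression
cluster = LocalCluster(n_workers=24, threads_per_worker=2, processes=True)
client = Client(cluster)

mdio = MDIOReader(
    'abfs://somefile.mdio',
    backend='dask',
    return_metadata=True,
    new_chunks=(256, 256, None),  # keep the depth axis whole, tile spatially
    storage_options={'account_name': 'account', 'credential': '<sas>'}
)

_, _, zslice = mdio[:, :, 200]
data = zslice.compute()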

If it still doesn't scale up, maybe it is an adlfs issue? I will try to reproduce your results when I have some time. tensorstore seems to have its own http-based key / value store driver. It may be faster than adlfs, but that's a big maybe :)

@ErlendHaa
Author

Ah, ok! I'm running on a potato compared to you; I have a 4 vCPU machine 😄 My thinking is that since tensorstore manages to saturate my link, I shouldn't really need to scale up CPUs, which are expensive to run on a cloud provider.

> (assuming dataset is chunked (128, 128, 128))
> Since you are fetching time slices, can you try new_chunks=(256, 256, None) and n_workers=24 and threads_per_worker=2, which will make Dask only read multiple chunks spatially? This should force each worker to read 4 chunks (32MB) and it would have a total of ~72 chunks to read for a full time slice.

I've played with different configurations, and as you point out, I've yet to find the ideal one. But I think my main concern is that these dask clusters require knowledge about 1) the current hardware, 2) the current dataset, and 3) the desired access pattern (e.g. optimising for reading in a specific direction) in order to get good performance. That means it's not really a plug-and-play solution for your average geophysicist who wants fast access to their seismic data. This is where tensorstore shines imo, as it seems to perform very well without any configuration.

@tasansal
Collaborator

Very fair. If it performs much better with the default configuration, we could replace the Zarr backend with it (unless it has some critical missing feature). However, tensorstore doesn't support ZFP; we would have to work with them to add it.
