
OSError: {'We encountered an internal error. Please try again.'} during finalize_target. #406

Closed · alxmrs opened this issue Aug 29, 2022 · 16 comments · Fixed by #409

alxmrs commented Aug 29, 2022

A pretty significant error consistently occurs for datasets beyond a certain scale (>1 TiB) during the finalize_target step of the XarrayZarrRecipe. In the finalize step, during dimension consolidation, we try to overwrite a Zarr dimension. This triggers the underlying filesystem implementation, in this case gcsfs, to remove all of the files involved in that operation. During the removal of the directory, we hit an internal OSError.

So far, I've investigated a few possible causes. I thought this could be a Beam-specific race condition (I tried to fix it here). However, the error also occurs when the pure-Python finalize_target step runs on a single worker (a single VM), so it isn't specific to Beam.

I'll add more context to this issue later today, but this is a good-enough summary of what I'm experiencing.

Full trace
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/fsspec/mapping.py", line 167, in __delitem__
    self.fs.rm(self._key_to_str(key))
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 111, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 96, in sync
    raise return_result
  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 987, in _rm
    raise exs[0]
  File "/usr/local/lib/python3.8/site-packages/gcsfs/core.py", line 952, in _rm_files
    raise OSError(out)
OSError: {'We encountered an internal error. Please try again.'}

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/opt/homebrew/Caskroom/miniconda/base/envs/era5/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1843, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/executors/beam.py", line 14, in _no_arg_stage
    fun(config=config)
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 659, in finalize_target
    new = group.array(
  File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1091, in array
    return self._write_op(self._array_nosync, name, data, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 800, in _write_op
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1097, in _array_nosync
    return array(data, store=self._store, path=path, chunk_store=self._chunk_store,
  File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 377, in array
    z = create(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 161, in create
    init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor,
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 419, in init_array
    _init_array_metadata(store, shape=shape, chunks=chunks, dtype=dtype,
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 451, in _init_array_metadata
    rmdir(store, path)
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 181, in rmdir
    store.rmdir(path)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 154, in rmdir
    _rmdir_from_keys(self, path)
  File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 387, in _rmdir_from_keys
    del store[key]
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 723, in __delitem__
    del self._mutable_mapping[key]
  File "/usr/local/lib/python3.8/site-packages/fsspec/mapping.py", line 169, in __delitem__
    raise KeyError
KeyError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/opt/homebrew/Caskroom/miniconda/base/envs/era5/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1843, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/executors/beam.py", line 14, in _no_arg_stage
    fun(config=config)
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py", line 659, in finalize_target
    new = group.array(
  File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1091, in array
    return self._write_op(self._array_nosync, name, data, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 800, in _write_op
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/zarr/hierarchy.py", line 1097, in _array_nosync
    return array(data, store=self._store, path=path, chunk_store=self._chunk_store,
  File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 377, in array
    z = create(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/zarr/creation.py", line 161, in create
    init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor,
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 419, in init_array
    _init_array_metadata(store, shape=shape, chunks=chunks, dtype=dtype,
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 451, in _init_array_metadata
    rmdir(store, path)
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 181, in rmdir
    store.rmdir(path)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 154, in rmdir
    _rmdir_from_keys(self, path)
  File "/usr/local/lib/python3.8/site-packages/zarr/_storage/store.py", line 387, in _rmdir_from_keys
    del store[key]
  File "/usr/local/lib/python3.8/site-packages/zarr/storage.py", line 723, in __delitem__
    del self._mutable_mapping[key]
  File "/usr/local/lib/python3.8/site-packages/fsspec/mapping.py", line 169, in __delitem__
    raise KeyError
RuntimeError: KeyError [while running 'Start|cache_input|Reshuffle_000|prepare_target|Reshuffle_001|store_chunk|Reshuffle_002|finalize_target|Reshuffle_003/finalize_target-ptransform-56']

alxmrs commented Aug 29, 2022

CC: @rabernat


alxmrs commented Aug 29, 2022

I just met with an engineer at Google Cloud. Apparently, bulk deletion of a large number of files can cause problems in GCS! To perform this operation correctly, retry logic (with exponential backoff) is needed to handle these kinds of failures. See these docs for example.

I'm going to try an experiment where I separate the delete step from the write step in finalize_target and add some simple retry logic, to see if that solves the issue.
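
A minimal sketch of what that retry experiment might look like (this is not the actual finalize_target code; the fs/path arguments and backoff parameters are assumptions):

```python
import random
import time


def rm_with_backoff(fs, path, max_retries=5, base_delay=1.0):
    """Recursively delete `path`, retrying on transient GCS internal errors."""
    for attempt in range(max_retries):
        try:
            fs.rm(path, recursive=True)  # fsspec bulk delete
            return
        except OSError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter, per the GCS guidance above.
            time.sleep(base_delay * 2 ** attempt + random.uniform(0, 1))
```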


alxmrs commented Aug 29, 2022

I'm happy to implement this in Pangeo-Forge for now, but I see that this could be a better fit for gcsfs -- does that track with you @martindurant?

alxmrs added a commit to alxmrs/gcsfs that referenced this issue Aug 29, 2022
I've encountered a case where large bulk deletions throw an `OSError`. The GCS documentation points out that this is a general problem (https://cloud.google.com/storage/docs/deleting-objects#delete-objects-in-bulk). I did not expect, however, that this would surface as an OSError instead of a 500 error.

In any case, this PR extends `gcsfs`'s retry logic to include internal errors that throw `OSError`s. My hope is that this fixes an associated issue in Pangeo Forge Recipes (pangeo-forge/pangeo-forge-recipes#406).
@rabernat

Yes, this seems clearly like a gcsfs-level issue to me.

At the Pangeo Forge level, this would be a perfect use case for zarr-developers/zarr-specs#154. With an Iceberg-style data lake, you never actually have to delete anything, so the problem goes away.


alxmrs commented Aug 31, 2022

I think there is a subtle bug somewhere in gcsfs. GCS has a separate API for bulk deletes -- and gcsfs supports it. However, from the trace above, that code path isn't used: instead of deleting the directory, it deletes each file individually.

@martindurant @rabernat Any idea why that may be the case? At the Zarr storage level, it looks like rmdir will be used whenever it is available.

Either way, I'm happy to try to fix this issue on both sides: by making bulk file removes robust (via retries) and by using the better API.

@martindurant

Fixing both sounds good. Your PR is probably ready to go in for gcsfs.


alxmrs commented Aug 31, 2022

Ok, I know why the inefficient remove operation is chosen here. Zarr will use the rmdir op when it's available on the store (see link above). However, in this case the store is the FSMap, not the filesystem (fs). FSMap does not have an rmdir method; instead, it wraps a fs attribute that does.
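
A quick way to see the mismatch described above (the bucket path is made up):

```python
import gcsfs

fs = gcsfs.GCSFileSystem()
mapper = fs.get_mapper("my-bucket/my-dataset.zarr")  # an fsspec FSMap

hasattr(mapper, "rmdir")     # False -- FSMap is a plain MutableMapping
hasattr(mapper.fs, "rmdir")  # True  -- the wrapped filesystem has it
```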

I think the simplest place to fix this is in pangeo-forge-recipes. Does that sound right to you two?

@martindurant
Copy link
Contributor

Zarr has FSStore, which is a thin layer over fsspec, and has the rmdir implementation. That's what should be passed. If you pass a URL to zarr rather than an instantiated mapper, you will get an FSStore.
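
A minimal sketch of the difference (hypothetical bucket path; not tested against any specific zarr release):

```python
import gcsfs
import zarr

fs = gcsfs.GCSFileSystem()

# Passing an FSMap: zarr sees a bare MutableMapping with no rmdir, so
# overwriting an array falls back to deleting keys one at a time.
mapper = fs.get_mapper("my-bucket/my-dataset.zarr")
group_via_mapper = zarr.open_group(mapper, mode="a")

# Passing a URL (or an FSStore directly): zarr wraps it in an FSStore,
# whose rmdir delegates to a recursive (bulk) filesystem delete.
group_via_url = zarr.open_group("gs://my-bucket/my-dataset.zarr", mode="a")
```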


alxmrs commented Aug 31, 2022

That's really useful to know! Thanks Martin!

I just noticed something, though: the rmdir docs say that it will only delete the directory if it is empty. This suggests to me that this code path isn't the right one, since I specifically want to bulk-delete data.

@martindurant

It actually calls rmdir(path, recursive=True), which does delete everything.


alxmrs commented Sep 1, 2022

Ah, I didn't understand before what expand_path did (especially with the recursive=True argument)! I thought it was deleting files individually.


rabernat commented Sep 1, 2022

The FSMap vs FSStore problem is really confusing. As a first step towards resolving it, I made zarr-developers/zarr-python#911, which allows you to create an FSStore from an already-instantiated filesystem.

There is a potential follow-up PR to zarr-python that would automatically promote an FSMap to a proper FSStore. That feature would eliminate Alex's problem. We discussed this a bit (see zarr-developers/zarr-python#911 (review)), but I didn't have the patience to figure it out. Alex, maybe I can nerd-snipe you into giving it a shot? 🙃
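
For reference, a hypothetical sketch of what zarr-developers/zarr-python#911 enables (bucket path and project are made up):

```python
import gcsfs
import zarr

# An already-instantiated, pre-configured filesystem...
fs = gcsfs.GCSFileSystem(project="my-project")

# ...passed straight to FSStore via the fs= keyword, instead of letting
# FSStore construct its own filesystem from storage_options.
store = zarr.storage.FSStore("gs://my-bucket/my-dataset.zarr", fs=fs)
group = zarr.open_group(store, mode="r")
```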


alxmrs commented Sep 2, 2022

I'm happy to be nerd-sniped, in general. However, #409 provides a short-term fix that unblocks us. Do you think this follow-up is the best use of time, given how PGF intends to change in the near term?


alxmrs commented Sep 2, 2022

Hey @rabernat: Can we tag a release with this fix (0.9.1)? Is this something that I can do?


rabernat commented Sep 2, 2022

Can we tag a release with this fix (0.9.1)? Is this something that I can do?

Yes! Once you accept the invite to join the pangeo-forge org (already expired once; just resent 😉 ), you should be able to follow the very simple release procedure.

I would recommend cleaning up the release notes a bit first.


alxmrs commented Sep 4, 2022

already expired once; just resent 😉

Oh! Sorry, my GitHub notifications are a bit messed up. Thank you! I'll be happy to follow the process next week :)
