Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error saving TensorFlowModelDataset to Azure Blob Storage #839

Closed
gitgud5000 opened this issue Sep 17, 2024 · 3 comments
Closed

Error saving TensorFlowModelDataset to Azure Blob Storage #839

gitgud5000 opened this issue Sep 17, 2024 · 3 comments

Comments

@gitgud5000
Copy link
Contributor

gitgud5000 commented Sep 17, 2024

Description

I get the following error when trying to save a TensorFlowModelDataset to Azure Blob Storage. The issue occurs only in Azure Blob Storage, not in the local filesystem:

DatasetError: Cannot save versioned dataset 'model_prestamos.keras' to 
'azureml/modelling/reprecios/data/06_models/run_2024-09-09' because a file with the same name already exists in the
directory. This is likely because versioning was enabled on a dataset already saved previously. Either remove 
'model_prestamos.keras' from the directory or manually convert it into a versioned dataset by placing it in a 
versioned directory (e.g. with default versioning format 
'azureml/modelling/reprecios/data/06_models/run_2024-09-09/model_prestamos.keras/YYYY-MM-DDThh.mm.ss.sssZ/model_pre
stamos.keras').

The error indicates that versioning is enabled despite the versioned: False setting in the dataset catalog. The file is never created or exists at any point in the Azure Blob Storage.

Context

Dataset Catalog Definition

_azure_base_path: 'abfs://azureml/modelling/reprecios/data'
"{namespace}.model":
  type: ${_datasets.tensormodel}
  credentials: azure_credentials
  versioned: False
  filepath: ${_azure_base_path}/06_models/run_${runtime_params:run_date}/model_{namespace}.keras
  save_args:
    overwrite: True 

Steps to Reproduce

Edit: Included code below to reproduce. (credit to @merelcht, @astrojuanlu)

from kedro_datasets.tensorflow import TensorFlowModelDataset
import tensorflow as tf
import numpy as np

credentials = {
    'account_name': '',
    'account_key': ''
}

data_set_local = TensorFlowModelDataset(filepath="data/tensorflow_model.keras")
data_set = TensorFlowModelDataset(filepath="abfs://azureml/modelling/data/tensorflow_model.keras",
                                  credentials=credentials)

inputs = tf.keras.Input(shape=(3,))
x = tf.keras.layers.Dense(4, activation=tf.nn.relu)(inputs)
outputs = tf.keras.layers.Dense(5, activation=tf.nn.softmax)(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

input_data = np.array([[0.5, 0.3, 0.2]])
predictions = model.predict(input_data)

# This works
data_set_local.save(model)

# This Fails
try:
    data_set.save(model)
except Exception as e:
    print("🥲", e)
  1. Define a TensorFlowModelDataset in the Kedro catalog with the configuration above 👆
  2. Attempt to save a model using the .save() method targeting Azure Blob Storage.

I suspect this issue might be related to the following issue: kedro-plugins/issues/359, as the behavior appears to be similar to what is described there.

Environment

  • Kedro=0.19.8
  • kedro-datasets=4.1.0
  • TensorFlow=2.17.0
  • fsspec=2024.9.0
  • adlfs=2024.7.0
  • OS=Ubuntu 20.04

Full Error Traceback

Shorten for readability

INFO     Saving data to prestamos.model (TensorFlowModelDataset)...         [data_catalog.py]
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:727 in save               │
│                                                                                                  │
│   724 │   │   │   │   self.resolve_save_version()                                                │
│   725 │   │   │   )  # Make sure last save version is set                                        │
│   726 │   │   │   try:                                                                           │
│ ❱ 727 │   │   │   │   super()._save_wrapper(save_func)(self, data)                               │
│   728 │   │   │   except (FileNotFoundError, NotADirectoryError) as err:                         │
│   729 │   │   │   │   # FileNotFoundError raised in Win, NotADirectoryError raised in Unix       │
│   730 │   │   │   │   _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"                              │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:249 in save               │
│                                                                                                  │
│   246 │   │   │                                                                                  │
│   247 │   │   │   try:                                                                           │
│   248 │   │   │   │   self._logger.debug("Saving %s", str(self))                                 │
│ ❱ 249 │   │   │   │   save_func(self, data)                                                      │
│   250 │   │   │   except (DatasetError, FileNotFoundError, NotADirectoryError):                  │
│   251 │   │   │   │   raise                                                                      │
│   252 │   │   │   except Exception as exc:                                                       │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro_datasets/tensorflow/tensorflow_model │
│ _dataset.py:176 in _save                                                                         │
│                                                                                                  │
│   173 │   │   │                                                                                  │
│   174 │   │   │   # Use fsspec to take from local tempfile directory/file and                    │
│   175 │   │   │   # put in ArbitraryFileSystem                                                   │
│ ❱ 176 │   │   │   self._fs.copy(path, save_path)                                                 │
│   177 │                                                                                          │
│   178def _exists(self) -> bool:                                                             │
│   179 │   │   try:                                                                               │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:118 in wrapper              │
│                                                                                                  │
│    115 │   @functools.wraps(func)                                                                │
│    116def wrapper(*args, **kwargs):                                                         │
│    117 │   │   self = obj or args[0]                                                             │
│ ❱  118 │   │   return sync(self.loop, func, *args, **kwargs)                                     │
│    119 │                                                                                         │
│    120return wrapper                                                                        │
│    121                                                                                           │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:103 in sync                 │
│                                                                                                  │
│    100 │   │   # suppress asyncio.TimeoutError, raise FSTimeoutError                             │
│    101 │   │   raise FSTimeoutError from return_result                                           │
│    102elif isinstance(return_result, BaseException):                                        │
│ ❱  103 │   │   raise return_result                                                               │
│    104else:                                                                                 │
│    105 │   │   return return_result                                                              │
│    106                                                                                           │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:56 in _runner               │
│                                                                                                  │
│     53if timeout is not None:                                                               │
│     54 │   │   coro = asyncio.wait_for(coro, timeout=timeout)                                    │
│     55try:                                                                                  │
│ ❱   56 │   │   result[0] = await coro                                                            │
│     57except Exception as ex:                                                               │
│     58 │   │   result[0] = ex                                                                    │
│     59finally:                                                                              │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:369 in _copy                │
│                                                                                                  │
│    366 │   │   │   paths2 = path2                                                                │
│    367 │   │   else:                                                                             │
│    368 │   │   │   source_is_str = isinstance(path1, str)                                        │
│ ❱  369 │   │   │   paths1 = await self._expand_path(                                             │
│    370 │   │   │   │   path1, maxdepth=maxdepth, recursive=recursive                             │
│    371 │   │   │   )                                                                             │
│    372 │   │   │   if source_is_str and (not recursive or maxdepth is not None):                 │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/adlfs/spec.py:1595 in _expand_path         │
│                                                                                                  │
│   1592 │   │   │   if not path.endswith("*"):                                                    │
│   1593 │   │   │   │   path = f"{path.strip('/')}"                                               │
│   1594 │   │   if isinstance(path, str):                                                         │
│ ❱ 1595 │   │   │   out = await self._expand_path(                                                │
│   1596 │   │   │   │   [path],                                                                   │
│   1597 │   │   │   │   recursive,                                                                │
│   1598 │   │   │   │   maxdepth,                                                                 │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/adlfs/spec.py:1640 in _expand_path         │
│                                                                                                  │
│   1637 │   │   │   │   │   out.add(fullpath)                                                     │
│   1638 │   │                                                                                     │
│   1639 │   │   if not out:                                                                       │
│ ❱ 1640 │   │   │   raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)        │
│   1641 │   │   return list(sorted(out))                                                          │
│   1642 │                                                                                         │
│   1643async def _put_file(                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
FileNotFoundError: [Errno 2] No such file or directory: 
['tmp/kedro_tensorflow_tmpqic1m6vw/tmp_tensorflow_model.keras']

The above exception was the direct cause of the following exception:

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮

│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/runner/runner.py:530 in              │
│ _run_node_sequential                                                                             │
│                                                                                                  │
│   527 │                                                                                          │
│   528for name, data in items:                                                               │
│   529 │   │   hook_manager.hook.before_dataset_saved(dataset_name=name, data=data, node=node)    │
│ ❱ 530 │   │   catalog.save(name, data)                                                           │
│   531 │   │   hook_manager.hook.after_dataset_saved(dataset_name=name, data=data, node=node)     │
│   532return node                                                                            │
│   533                                                                                            │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/data_catalog.py:588 in save       │
│                                                                                                  │
│   585 │   │   │   extra={"markup": True},                                                        │
│   586 │   │   )                                                                                  │
│   587 │   │                                                                                      │
│ ❱ 588 │   │   dataset.save(data)                                                                 │
│   589 │                                                                                          │
│   590def exists(self, name: str) -> bool:                                                   │
│   591 │   │   """Checks whether registered data set exists by calling its `exists()`             │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:731 in save               │
│                                                                                                  │
│   728 │   │   │   except (FileNotFoundError, NotADirectoryError) as err:                         │
│   729 │   │   │   │   # FileNotFoundError raised in Win, NotADirectoryError raised in Unix       │
│   730 │   │   │   │   _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"                              │
│ ❱ 731 │   │   │   │   raise DatasetError(                                                        │
│   732 │   │   │   │   │   f"Cannot save versioned dataset '{self._filepath.name}' to "           │
│   733 │   │   │   │   │   f"'{self._filepath.parent.as_posix()}' because a file with the same    │734 │   │   │   │   │   f"name already exists in the directory. This is likely because "       │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
@gitgud5000
Copy link
Contributor Author

gitgud5000 commented Sep 22, 2024

I've identified the issue within the _save method of TensorFlowModelDataset. Specifically, at line 172, it calls .copy() method. ( PR #608)

tf.keras.models.save_model(data, path, **self._save_args)
# Use fsspec to take from local tempfile directory/file and
# put in ArbitraryFileSystem
self._fs.copy(path, save_path)

According to the fsspec documentation, the .copy() method is designed for copying files between two remote locations. However, in this case, since we're copying from a local fs to ABS, the correct method would be .put() rather than .copy().

Determine if the target is remote and switching to .put() accordingly should resolve the issue!

edit: something similar should be done with the _load method as the appropriate method in this case would be .get()

@gitgud5000
Copy link
Contributor Author

Ran some test and it seems is not necessary to use .copy() at all. .get() and put() will work with local-to-local copying as well.

@gitgud5000
Copy link
Contributor Author

Closing this issue as it was addressed in PR #844

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant