Skip to content

Commit

Permalink
[HWORKS-835][Append] Check if path is a dir on item attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
javierdlrm committed Nov 13, 2023
1 parent 4f17aad commit ecb4034
Showing 1 changed file with 68 additions and 39 deletions.
107 changes: 68 additions & 39 deletions python/hsml/engine/model_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ def _upload_additional_resources(self, model_instance):
model_instance.model_schema = None
return model_instance

def _copy_or_move_hopsfs_model_item(
self, item_attr, to_model_version_path, keep_original_files
):
"""Copy or move model item from a hdfs path to the model version folder in the Models dataset. It works with files and folders."""
path = item_attr["path"]
to_hdfs_path = os.path.join(to_model_version_path, os.path.basename(path))
if keep_original_files:
self._engine.copy(path, to_hdfs_path)
else:
self._engine.move(path, to_hdfs_path)

def _copy_or_move_hopsfs_model(
self,
from_hdfs_model_path,
Expand All @@ -102,19 +113,28 @@ def _copy_or_move_hopsfs_model(
from_hdfs_model_path = from_hdfs_model_path[projects_index:]

n_dirs, n_files = 0, 0
for entry in self._dataset_api.list(from_hdfs_model_path, sort_by="NAME:desc")[
"items"
]:
path = entry["attributes"]["path"]
_, file_name = os.path.split(path)
if keep_original_files:
self._engine.copy(path, to_model_version_path + "/" + file_name)
else:
self._engine.move(path, to_model_version_path + "/" + file_name)
if "." in path:
n_files += 1
else:
n_dirs += 1

model_path_attr = self._dataset_api.get(from_hdfs_model_path)["attributes"]
if model_path_attr.get("dir", False):
# if path is a directory, iterate of the directory content
for entry in self._dataset_api.list(
from_hdfs_model_path, sort_by="NAME:desc"
)["items"]:
path_attr = entry["attributes"]
self._copy_or_move_hopsfs_model_item(
path_attr, to_model_version_path, keep_original_files
)
if path_attr.get("dir", False):
n_dirs += 1
else:
n_files += 1
update_upload_progress(n_dirs=n_dirs, n_files=n_files)
else:
# if path is a file, copy/move it
self._copy_or_move_hopsfs_model_item(
model_path_attr, to_model_version_path, keep_original_files
)
n_files += 1
update_upload_progress(n_dirs=n_dirs, n_files=n_files)

def _download_model_from_hopsfs_recursive(
Expand All @@ -130,16 +150,12 @@ def _download_model_from_hopsfs_recursive(
for entry in self._dataset_api.list(from_hdfs_model_path, sort_by="NAME:desc")[
"items"
]:
path = entry["attributes"]["path"]
path_attr = entry["attributes"]
path = path_attr["path"]
basename = os.path.basename(path)
if "." in path:
# we assume that if a dot is contained in the path, it's the path to a file
local_file_path = os.path.join(to_local_path, basename)
self._engine.download(path, local_file_path)
n_files += 1
update_download_progress(n_dirs=n_dirs, n_files=n_files)
else:
# otherwise, it's a folder

if path_attr.get("dir", False):
# otherwise, make a recursive call for the folder
if basename == "Artifacts":
continue # skip Artifacts subfolder
local_folder_path = os.path.join(to_local_path, basename)
Expand All @@ -153,6 +169,12 @@ def _download_model_from_hopsfs_recursive(
)
n_dirs += 1
update_download_progress(n_dirs=n_dirs, n_files=n_files)
else:
# if it's a file, download it
local_file_path = os.path.join(to_local_path, basename)
self._engine.download(path, local_file_path)
n_files += 1
update_download_progress(n_dirs=n_dirs, n_files=n_files)

return n_dirs, n_files

Expand All @@ -178,23 +200,30 @@ def _upload_local_model(
):
"""Copy or upload model files from a local path to the model version folder in the Models dataset."""
n_dirs, n_files = 0, 0
for root, dirs, files in os.walk(from_local_model_path):
# os.walk(local_model_path), where local_model_path is expected to be an absolute path
# - root is the absolute path of the directory being walked
# - dirs is the list of directory names present in the root dir
# - files is the list of file names present in the root dir
# we need to replace the local path prefix with the hdfs path prefix (i.e., /srv/hops/....../root with /Projects/.../)
remote_base_path = root.replace(
from_local_model_path, to_model_version_path
)
for d_name in dirs:
self._engine.mkdir(remote_base_path + "/" + d_name)
n_dirs += 1
update_upload_progress(n_dirs, n_files)
for f_name in files:
self._engine.upload(root + "/" + f_name, remote_base_path)
n_files += 1
update_upload_progress(n_dirs, n_files)
if os.path.isdir(from_local_model_path):
# if path is a dir, upload files and folders iteratively
for root, dirs, files in os.walk(from_local_model_path):
# os.walk(local_model_path), where local_model_path is expected to be an absolute path
# - root is the absolute path of the directory being walked
# - dirs is the list of directory names present in the root dir
# - files is the list of file names present in the root dir
# we need to replace the local path prefix with the hdfs path prefix (i.e., /srv/hops/....../root with /Projects/.../)
remote_base_path = root.replace(
from_local_model_path, to_model_version_path
)
for d_name in dirs:
self._engine.mkdir(remote_base_path + "/" + d_name)
n_dirs += 1
update_upload_progress(n_dirs, n_files)
for f_name in files:
self._engine.upload(root + "/" + f_name, remote_base_path)
n_files += 1
update_upload_progress(n_dirs, n_files)
else:
# if path is a file, upload file
self._engine.upload(from_local_model_path, to_model_version_path)
n_files += 1
update_upload_progress(n_dirs, n_files)

def _save_model_from_local_or_hopsfs_mount(
self, model_instance, model_path, keep_original_files, update_upload_progress
Expand Down

0 comments on commit ecb4034

Please sign in to comment.