Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1140 - List Files #1626

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions idmtools_core/idmtools/assets/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dataclasses import dataclass, field, InitVar
from io import BytesIO
from logging import getLogger, DEBUG
from pathlib import PurePosixPath
from pathlib import PurePosixPath, Path
from typing import TypeVar, Union, List, Callable, Any, Optional, Generator, BinaryIO
import backoff
import requests
Expand Down Expand Up @@ -361,28 +361,30 @@ def __write_download_generator_to_stream(self, stream: BinaryIO, progress: bool
if progress:
gen.close()

def download_to_path(self, dest: str, force: bool = False):
def download_to_path(self, dest: Union[str, Path], overwrite: bool = False, include_relative_path: bool = False):
"""
Download an asset to path. This requires loadings the object through the platform.

Args:
dest: Path to write to. If it is a directory, the asset filename will be added to it
force: Force download even if file exists
overwrite: Force download even if file exists
include_relative_path: When target is a directory, when choosing filename, should we use
{dest}/{asset.filename} or {dest}/{asset.short_remote_path()}

Returns:
None
"""
if os.path.isdir(dest):
path = os.path.join(dest, self.short_remote_path())
path = path.replace("\\", os.path.sep)
os.makedirs(os.path.dirname(path), exist_ok=True)
path = Path(dest)
if path.is_dir() or (isinstance(dest, str) and dest.endswith("/")):
path = path.joinpath(self.short_remote_path() if include_relative_path else self.filename)
path.parent.mkdir(exist_ok=True, parents=True)
else:
path = dest
path = Path(dest)

if not os.path.exists(path) or force:
if not os.path.exists(path) or overwrite:
if logger.isEnabledFor(DEBUG):
logger.debug(f"Download {self.filename} to {path}")
with open(path, 'wb') as out:
if logger.isEnabledFor(DEBUG):
logger.debug(f"Download {self.filename} to {path}")
self.__write_download_generator_to_stream(out)

def calculate_checksum(self) -> str:
Expand Down
21 changes: 19 additions & 2 deletions idmtools_core/idmtools/assets/asset_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from idmtools.core.interfaces.iitem import IItem
from idmtools.utils.entities import get_default_tags
from idmtools.utils.file import scan_directory
from idmtools.utils.filters.asset_filters import default_asset_file_filter
from idmtools.utils.filters.asset_filters import default_asset_file_filter, TFILE_FILTER_TYPE
from idmtools.utils.info import get_doc_base_url

IGNORE_DIRECTORIES = ['.git', '.svn', '.venv', '.idea', '.Rproj.user', '$RECYCLE.BIN', '__pycache__']
Expand Down Expand Up @@ -83,7 +83,7 @@ def from_id(cls, item_id: Union[str, UUID], platform: 'IPlatform' = None, as_cop
return AssetCollection(item) if as_copy else item

@classmethod
def from_directory(cls, assets_directory: str, recursive: bool = True, flatten: bool = False,
def from_directory(cls, assets_directory: Union[str, PathLike], recursive: bool = True, flatten: bool = False,
filters: 'TAssetFilterList' = None, filters_mode: FilterMode = FilterMode.OR, # noqa: F821
relative_path: str = None) -> 'TAssetCollection':
"""
Expand Down Expand Up @@ -511,5 +511,22 @@ def add_tags(self, tags: Dict[str, Any]):
"""
self.tags.update(tags)

def list_assets(self, platform: 'IPlatform' = None, filters: TFILE_FILTER_TYPE = None, **kwargs) -> List['Asset']:
"""
List assets for an asset collection.

Args:
platform: Platform
filters: Filters to use
**kwargs: Additional arguments

Returns:
List of filtered assets
"""
if self.id is None:
raise ValueError("You can only list static assets on an existing Asset Collections")
p = super()._check_for_platform_from_context(platform)
return p.list_assets(self, filters=filters, **kwargs)


TAssetCollection = TypeVar("TAssetCollection", bound=AssetCollection)
57 changes: 54 additions & 3 deletions idmtools_core/idmtools/entities/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""
import copy
import uuid
import warnings
from dataclasses import dataclass, field, InitVar, fields
from logging import getLogger, DEBUG
from types import GeneratorType
Expand All @@ -31,6 +32,7 @@
from idmtools.registry.plugin_specification import get_description_impl
from idmtools.utils.collections import ExperimentParentIterator
from idmtools.utils.entities import get_default_tags
from idmtools.utils.filters.asset_filters import TFILE_FILTER_TYPE

if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
Expand Down Expand Up @@ -79,6 +81,15 @@ class Experiment(IAssetsEnabled, INamedEntity, IRunnableEntity):
# simulations
__replace_task_with_proxy: bool = field(default=True, init=False, compare=False)

def __hash__(self):
"""
Hash of simulation(id).

Returns:
Hash of simulation
"""
return id(self.uid)

def __post_init__(self, simulations):
"""
Initialize Experiment.
Expand Down Expand Up @@ -485,22 +496,62 @@ def __deepcopy__(self, memo):
result._task_log = getLogger(__name__)
return result

def list_static_assets(self, children: bool = False, platform: 'IPlatform' = None, **kwargs) -> List[Asset]:
def list_assets(self, children: bool = False, platform: 'IPlatform' = None, filters: TFILE_FILTER_TYPE = None, **kwargs) -> List[Asset]:
"""
List assets that have been uploaded to a server already.
List assets(shared files) for the experiment.

Args:
children: When set to true, simulation assets will be loaded as well
platform: Optional platform to load assets list from
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs:
Returns:
List of assets
"""
if self.id is None:
raise ValueError("You can only list static assets on an existing experiment")
p = super()._check_for_platform_from_context(platform)
return p.list_assets(self, children=children, filters=filters, **kwargs)

def list_files(self, children: bool = False, platform: 'IPlatform' = None, filters: TFILE_FILTER_TYPE = None, **kwargs) -> List[Asset]:
"""
List files for item.

Args:
children: When set to true, simulation assets will be loaded as well
platform: Optional platform to load assets list from
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs:
Returns:
List of assets
"""
if self.id is None:
raise ValueError("You can only list static assets on an existing experiment")
p = super()._check_for_platform_from_context(platform)
return p._experiments.list_assets(self, children, **kwargs)
return p.list_files(self, children=children, filters=filters, **kwargs)

def list_children_files(self, platform: 'IPlatform' = None, filters: TFILE_FILTER_TYPE = None, **kwargs) -> Dict['Simulation', List[Asset]]:
"""
List Children Files.

Args:
platform: Optional platform to load assets list from
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs:
Returns:
Dictionary of Simulation -> List of Assets
"""
if self.id is None:
raise ValueError("You can only list static assets on an existing experiment")
p = super()._check_for_platform_from_context(platform)
return p.list_children_files(self, filters=filters, **kwargs)

def list_static_assets(self, children: bool = False, platform: 'IPlatform' = None, filters: TFILE_FILTER_TYPE = None, **kwargs) -> List[Asset]:
"""
Soon to be deprecated method in favor of list_assets.
"""
warnings.warn("list_static_assets will be removed in 1.7.0. Use list_assets instead")
return self.list_assets(children=children, platform=platform, filters=filters, **kwargs)

def run(self, wait_until_done: bool = False, platform: 'IPlatform' = None, regather_common_assets: bool = None,
wait_on_done_progress: bool = True, wait_on_done: bool = False,
Expand Down
94 changes: 88 additions & 6 deletions idmtools_core/idmtools/entities/iplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pathlib import PureWindowsPath, PurePath
from itertools import groupby
from logging import getLogger, DEBUG
from typing import Dict, List, NoReturn, Type, TypeVar, Any, Union, Tuple, Set, Iterator, Callable, Optional
from typing import Dict, List, NoReturn, Type, TypeVar, Any, Union, Tuple, Set, Iterator, Callable, Optional, TYPE_CHECKING
from uuid import UUID
from idmtools import IdmConfigParser
from idmtools.core import CacheEnabled, UnknownItemException, EntityContainer, UnsupportedPlatformType
Expand All @@ -40,14 +40,19 @@
from idmtools.assets.asset_collection import AssetCollection
from idmtools.services.platforms import PlatformPersistService
from idmtools.utils.entities import validate_user_inputs_against_dataclass
from idmtools.utils.filters.asset_filters import TFILE_FILTER_TYPE, normalize_filters

if TYPE_CHECKING:
from idmtools.assets.asset import Asset

logger = getLogger(__name__)
user_logger = getLogger('user')

CALLER_LIST = ['_create_from_block', # create platform through Platform Factory
'fetch', # create platform through un-pickle
'get', # create platform through platform spec' get method
'__newobj__', # create platform through copy.deepcopy
'_main'] # create platform through analyzer manager
CALLER_LIST = ['_create_from_block', # create platform through Platform Factory
'fetch', # create platform through un-pickle
'get', # create platform through platform spec' get method
'__newobj__', # create platform through copy.deepcopy
'_main'] # create platform through analyzer manager

# Maps an object type to a platform interface object. We use strings to use getattr. This also let's us also reduce
# all the if else crud
Expand All @@ -65,6 +70,11 @@
Suite: ItemType.SUITE
}

# Returns for different child file lists
TSIMULATION_CHILDREN_FILES = Dict[Simulation, List['Asset']]
TEXPERIMENT_CHILDREN_FILES = Dict[Experiment, TSIMULATION_CHILDREN_FILES]
TSUITE_CHILDREN_FILES = Dict[Suite, TEXPERIMENT_CHILDREN_FILES]


@dataclass(repr=False)
class IPlatform(IItem, CacheEnabled, metaclass=ABCMeta):
Expand Down Expand Up @@ -822,6 +832,78 @@ def wait_till_done_progress(self, item: IRunnableEntity, timeout: int = 60 * 60
refresh_interval
)

def list_assets(self, item: Union[Experiment, IWorkflowItem, Suite, Simulation, AssetCollection], filters: TFILE_FILTER_TYPE = None, **kwargs) -> List['Asset']:
"""
List assets(shared files) for the item.

Args:
item: Item to list shared files for
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs:

Returns:
List of assets
"""
if item.item_type not in self.platform_type_map.values():
raise UnsupportedPlatformType("The provided type is invalid or not supported by this platform...")

if item.item_type not in (ItemType.SIMULATION, ItemType.WORKFLOW_ITEM, ItemType.SUITE, ItemType.EXPERIMENT, ItemType.ASSETCOLLECTION):
raise ValueError("Only Simulation, Workflow Items, Suites, and Experiments can list files")

filters = normalize_filters(filters)
interface = ITEM_TYPE_TO_OBJECT_INTERFACE[item.item_type]
ret = getattr(self, interface).list_assets(item, filters=filters, **kwargs)
return ret

def list_files(self, item: Union[Experiment, IWorkflowItem, Suite, Simulation], filters: TFILE_FILTER_TYPE = None, **kwargs) -> List['Asset']:
"""
List files for an item.

Args:
item: Item to list files for.
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs:

Returns:
List of files for item
"""
if item.item_type not in self.platform_type_map.values():
raise UnsupportedPlatformType("The provided type is invalid or not supported by this platform...")

if item.item_type not in (ItemType.SIMULATION, ItemType.WORKFLOW_ITEM, ItemType.SUITE, ItemType.EXPERIMENT):
raise ValueError("Only Simulation, Workflow Items, Suites, and Experiments can list files")
filters = normalize_filters(filters)
interface = ITEM_TYPE_TO_OBJECT_INTERFACE[item.item_type]
ret = getattr(self, interface).list_files(item, filters=filters, **kwargs)
return ret

def list_children_files(self, item: Union[Experiment, Suite], filters: TFILE_FILTER_TYPE = None, **kwargs) -> Union[TEXPERIMENT_CHILDREN_FILES, TSIMULATION_CHILDREN_FILES]:
"""
Dict Children files for an item.

The return depends on item type.

For experiments, the return is in the form of Dict[Simulation, List[Asset]]
For suites, the return is in the form of Dict[Experiment, Dict[Simulation, List[Asset]]]

Args:
item: Item to list files for
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs:

Returns:
hierarchical dictionary of files
"""
if item.item_type not in self.platform_type_map.values():
raise UnsupportedPlatformType("The provided type is invalid or not supported by this platform...")

if item.item_type not in (ItemType.SUITE, ItemType.EXPERIMENT):
raise ValueError("Only Suites and Experiments can list children files")
filters = normalize_filters(filters)
interface = ITEM_TYPE_TO_OBJECT_INTERFACE[item.item_type]
ret = getattr(self, interface).list_children_files(item, filters=filters, **kwargs)
return ret

def get_related_items(self, item: IWorkflowItem, relation_type: RelationType) -> Dict[str, Dict[str, str]]:
"""
Retrieve all related objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
from idmtools.core import CacheEnabled
from idmtools.entities.iplatform_ops.utils import batch_create_items
from idmtools.registry.functions import FunctionPluginManager
from idmtools.utils.filters.asset_filters import TFILE_FILTER_TYPE

if TYPE_CHECKING: # pragma: no cover
from idmtools.entities.iplatform import IPlatform
from idmtools.assets.asset import Asset
logger = getLogger(__name__)


Expand Down Expand Up @@ -59,7 +61,7 @@ def post_create(self, asset_collection: AssetCollection, **kwargs) -> NoReturn:
logger.debug("Calling post_creation")
asset_collection.post_creation(self.platform)

def create(self, asset_collection: AssetCollection, do_pre: bool = True, do_post: bool = True, **kwargs) -> Any:
def create(self, asset_collection: AssetCollection, do_pre: bool = True, do_post: bool = True, **kwargs) -> AssetCollection:
"""
Creates an AssetCollection from an IDMTools AssetCollection object.

Expand All @@ -72,22 +74,22 @@ def create(self, asset_collection: AssetCollection, do_pre: bool = True, do_post
**kwargs: Optional arguments mainly for extensibility

Returns:
Created platform item and the UUID of said item
Created AssetCollection
"""
if asset_collection.status is not None:
return asset_collection._platform_object
return asset_collection
if do_pre:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling pre_create")
self.pre_create(asset_collection, **kwargs)
if logger.isEnabledFor(DEBUG):
logger.debug("Calling platform_create")
ret = self.platform_create(asset_collection, **kwargs)
asset_collection._platform_object = self.platform_create(asset_collection, **kwargs)
if do_post:
if logger.isEnabledFor(DEBUG):
logger.debug("Calling post_create")
self.post_create(asset_collection, **kwargs)
return ret
return asset_collection

@abstractmethod
def platform_create(self, asset_collection: AssetCollection, **kwargs) -> Any:
Expand Down Expand Up @@ -144,3 +146,35 @@ def to_entity(self, asset_collection: Any, **kwargs) -> AssetCollection:
IDMTools suite object
"""
return asset_collection

def platform_list_assets(self, asset_collection: AssetCollection, filters: TFILE_FILTER_TYPE = None, **kwargs) -> List['Asset']:
"""
List the assets on an asset collection.

Args:
asset_collection: Asset collection to list.
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs: Extra Arguments

Returns:
List of Assets
"""
return []

def list_assets(self, asset_collection: AssetCollection, filters: TFILE_FILTER_TYPE = None, **kwargs):
"""
List the assets for an asset collection.

Args:
asset_collection: Asset collection to list
filters: Filters to apply. These should be a function that takes a str and return true or false
**kwargs:

Returns:
List of assets

Notes:
At the moment, we just call platform list assets. In the future, we can add extra functionality here if needed
to apply to all platforms.
"""
return self.platform_list_assets(asset_collection, filters=filters, **kwargs)
Loading