Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure sibling files in toil-wdl-runner #4610

Merged
merged 5 commits into from
Oct 16, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 22 additions & 38 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,15 @@ def recursive_dependencies(root: WDL.Tree.WorkflowNode) -> Set[str]:

TOIL_URI_SCHEME = 'toilfile:'

def pack_toil_uri(file_id: FileID, file_dir: str, file_basename: str) -> str:
def pack_toil_uri(file_id: FileID, dir_id: uuid.UUID, file_basename: str) -> str:
"""
Encode a Toil file ID and its source path in a URI that starts with the scheme in TOIL_URI_SCHEME.
"""

# We urlencode everything, including any slashes. We need to use a slash to
# set off the actual filename, so the WDL standard library basename
# function works correctly.
return f"{TOIL_URI_SCHEME}{quote(file_id.pack(), safe='')}/{quote(file_dir)}/{quote(file_basename, safe='')}"
return f"{TOIL_URI_SCHEME}{quote(file_id.pack(), safe='')}/{quote(str(dir_id))}/{quote(file_basename, safe='')}"

def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str]:
"""
Expand Down Expand Up @@ -431,7 +431,7 @@ class ToilWDLStdLibBase(WDL.StdLib.Base):
"""
Standard library implementation for WDL as run on Toil.
"""
def __init__(self, file_store: AbstractFileStore, stdlib_id: Optional[Any] = None, execution_dir: Optional[str] = None):
def __init__(self, file_store: AbstractFileStore, execution_dir: Optional[str] = None):
"""
Set up the standard library.
"""
Expand All @@ -450,7 +450,7 @@ def __init__(self, file_store: AbstractFileStore, stdlib_id: Optional[Any] = Non
self._file_store = file_store

# UUID to differentiate which node files are virtualized from
self.stdlib_id = stdlib_id or uuid.uuid4()
self._parent_dir_to_ids: Dict[str, uuid.UUID] = dict()

self._execution_dir = execution_dir

Expand All @@ -463,16 +463,6 @@ def _is_url(self, filename: str, schemes: List[str] = ['http:', 'https:', 's3:',
return True
return False

def _is_uuid(self, id: str) -> bool:
"""
Test if a string is a valid UUID
"""
try:
uuid.UUID(id)
return True
except ValueError:
return False

@memoize
def _devirtualize_filename(self, filename: str) -> str:
"""
Expand All @@ -488,18 +478,13 @@ def _devirtualize_filename(self, filename: str) -> str:
file_id, parent_id, file_basename = unpack_toil_uri(filename)

# Decide where it should be put
if self._is_uuid(parent_id):
# This is a URI with the "parent" UUID attached to the filename
# Use UUID as folder name rather than a new temp folder to reduce internal clutter
if not os.path.exists(parent_id):
os.mkdir(parent_id)
# Put the UUID in the destination path in order for tasks to see where to put files depending on their parents
dest_path = os.path.join(self._file_store.localTempDir, parent_id, file_basename)
else:
# In case a file was not virtualized with a UUID
# But this shouldn't happen
file_tmp_dir = self._file_store.getLocalTempDir()
dest_path = os.path.join(file_tmp_dir, file_basename)
# This is a URI with the "parent" UUID attached to the filename
# Use UUID as folder name rather than a new temp folder to reduce internal clutter
dir_path = os.path.join(self._file_store.localTempDir, parent_id)
if not os.path.exists(parent_id):
os.mkdir(dir_path)
# Put the UUID in the destination path in order for tasks to see where to put files depending on their parents
dest_path = os.path.join(dir_path, file_basename)

# And get a local path to the file
result = self._file_store.readGlobalFile(file_id, dest_path)
Expand Down Expand Up @@ -531,8 +516,6 @@ def _virtualize_filename(self, filename: str) -> str:
from a local path in write_dir, 'virtualize' into the filename as it should present in a
File value
"""


if self._is_url(filename):
# Already virtual
logger.debug('Already virtualized %s as WDL file %s', filename, filename)
Expand All @@ -546,7 +529,9 @@ def _virtualize_filename(self, filename: str) -> str:
file_id = self._file_store.writeGlobalFile(os.path.join(self._execution_dir, filename))
else:
file_id = self._file_store.writeGlobalFile(filename)
result = pack_toil_uri(file_id, str(self.stdlib_id), os.path.basename(filename))
dir = os.path.dirname(os.path.abspath(filename)) # is filename always an abspath?
parent_id = self._parent_dir_to_ids.setdefault(dir, uuid.uuid4())
result = pack_toil_uri(file_id, parent_id, os.path.basename(filename))
logger.debug('Virtualized %s as WDL file %s', filename, result)
return result

Expand All @@ -559,13 +544,13 @@ class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase):
are host-side paths.
"""

def __init__(self, file_store: AbstractFileStore, container: TaskContainer, stdlib_id: Optional[Any] = None):
def __init__(self, file_store: AbstractFileStore, container: TaskContainer):
"""
Set up the standard library for the task command section.
"""

# TODO: Don't we want to make sure we don't actually use the file store?
super().__init__(file_store, stdlib_id)
super().__init__(file_store)
self.container = container

@memoize
Expand Down Expand Up @@ -614,7 +599,7 @@ class ToilWDLStdLibTaskOutputs(ToilWDLStdLibBase, WDL.StdLib.TaskOutputs):
functions only allowed in task output sections.
"""

def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: str, current_directory_override: Optional[str] = None, stdlib_id: Optional[Any] = None):
def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path: str, current_directory_override: Optional[str] = None):
"""
Set up the standard library for a task output section. Needs to know
where standard output and error from the task have been stored.
Expand All @@ -625,7 +610,7 @@ def __init__(self, file_store: AbstractFileStore, stdout_path: str, stderr_path:

# Just set up as ToilWDLStdLibBase, but it will call into
# WDL.StdLib.TaskOutputs next.
super().__init__(file_store, stdlib_id)
super().__init__(file_store)

# Remember task putput files
self._stdout_path = stdout_path
Expand Down Expand Up @@ -919,7 +904,7 @@ def import_file_from_uri(uri: str) -> str:
# Pack a UUID of the parent directory
dir_id = path_to_id.setdefault(os.path.dirname(candidate_uri), uuid.uuid4())

return pack_toil_uri(imported, str(dir_id), file_basename)
return pack_toil_uri(imported, dir_id, file_basename)

# If we get here we tried all the candidates
raise RuntimeError(f"Could not find {uri} at any of: {tried}")
Expand Down Expand Up @@ -1235,8 +1220,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
bindings = combine_bindings(unwrap_all(self._prev_node_results))
# Set up the WDL standard library
# UUID to use for virtualizing files
stdlib_id = uuid.uuid4()
standard_library = ToilWDLStdLibBase(file_store, stdlib_id=stdlib_id)
standard_library = ToilWDLStdLibBase(file_store)

if self._task.inputs:
logger.debug("Evaluating task inputs")
Expand Down Expand Up @@ -1501,7 +1485,7 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]:
contained_bindings = map_over_files_in_bindings(bindings, lambda path: task_container.input_path_map[path])

# Make a new standard library for evaluating the command specifically, which only deals with in-container paths and out-of-container paths.
command_library = ToilWDLStdLibTaskCommand(file_store, task_container, stdlib_id)
command_library = ToilWDLStdLibTaskCommand(file_store, task_container)

# Work out the command string, and unwrap it
command_string: str = evaluate_named_expression(self._task, "command", WDL.Type.String(), self._task.command, contained_bindings, command_library).coerce(WDL.Type.String()).value
Expand Down Expand Up @@ -1548,7 +1532,7 @@ def patched_run_invocation(*args: Any, **kwargs: Any) -> List[str]:
# container-determined strings that are absolute paths to WDL File
# objects, and like MiniWDL we can say we only support
# working-directory-based relative paths for globs.
outputs_library = ToilWDLStdLibTaskOutputs(file_store, host_stdout_txt, host_stderr_txt, current_directory_override=workdir_in_container, stdlib_id=stdlib_id)
outputs_library = ToilWDLStdLibTaskOutputs(file_store, host_stdout_txt, host_stderr_txt, current_directory_override=workdir_in_container)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stxue1 If we don't pass the directory to ID mapping between the different standard library instantiations, is it possible to get two different standard library instances to upload two sibling files, but with different source directory UUIDs? Maybe by abusing the string-to-File logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think string-to-file logic will probably not support sibling files since the standard library instances might be different. The directory to ID mappings would probably have to be passed around in order to support this.

output_bindings = evaluate_output_decls(self._task.outputs, bindings, outputs_library)

# Drop any files from the output which don't actually exist
Expand Down