Skip to content

Commit

Permalink
Don't leave behind job store if CWL options don't parse, and add more…
Browse files Browse the repository at this point in the history
… debugging to tests I can't replicate
  • Loading branch information
adamnovak committed Nov 6, 2024
1 parent d26b6d7 commit 93d8130
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 173 deletions.
332 changes: 170 additions & 162 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@

from toil.batchSystems.abstractBatchSystem import InsufficientSystemResources
from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM
from toil.common import Toil, addOptions
from toil.common import Config, Toil, addOptions
from toil.cwl import check_cwltool_version
from toil.lib.integration import resolve_workflow
from toil.lib.misc import call_command
Expand Down Expand Up @@ -3997,182 +3997,190 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:
runtime_context.research_obj = research_obj

try:
with Toil(options) as toil:
if options.restart:
outobj = toil.restart()
else:
# Before showing the options to any cwltool stuff that wants to
# load the workflow, transform options.cwltool, where our
# argument for what to run is, to handle Dockstore workflows.
options.cwltool = resolve_workflow(options.cwltool)

# TODO: why are we doing this? Does this get applied to all
# tools as a default or something?
loading_context.hints = [
{
"class": "ResourceRequirement",
"coresMin": toil.config.defaultCores,
# Don't include any RAM requirement because we want to
# know when tools don't manually ask for RAM.
"outdirMin": toil.config.defaultDisk / (2**20),
"tmpdirMin": 0,
}
]
loading_context.construct_tool_object = toil_make_tool
loading_context.strict = not options.not_strict
options.workflow = options.cwltool
options.job_order = options.cwljob

try:
uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri(
options.cwltool,
loading_context.resolver,
loading_context.fetcher_constructor,
)
except ValidationException:
print(
"\nYou may be getting this error because your arguments are incorrect or out of order."
+ usage_message,
file=sys.stderr,
)
raise
if not options.restart:
# Make a version of the config based on the initial options, for
# setting up CWL option stuff
expected_config = Config()
expected_config.setOptions(options)

# Attempt to prepull the containers
if not options.no_prepull:
if not options.enable_ext:
# The CWL utils parser does not support cwltool extensions and will crash if encountered, so don't prepull if extensions are enabled
# See https://github.com/common-workflow-language/cwl-utils/issues/309
try_prepull(uri, runtime_context, toil.config.batchSystem)
else:
logger.debug(
"Not prepulling containers as cwltool extensions are not supported."
)
# Before showing the options to any cwltool stuff that wants to
# load the workflow, transform options.cwltool, where our
# argument for what to run is, to handle Dockstore workflows.
options.cwltool = resolve_workflow(options.cwltool)

options.tool_help = None
options.debug = options.logLevel == "DEBUG"
job_order_object, options.basedir, jobloader = (
cwltool.main.load_job_order(
options,
sys.stdin,
loading_context.fetcher_constructor,
loading_context.overrides_list,
tool_file_uri,
)
# TODO: why are we doing this? Does this get applied to all
# tools as a default or something?
loading_context.hints = [
{
"class": "ResourceRequirement",
"coresMin": expected_config.defaultCores,
# Don't include any RAM requirement because we want to
# know when tools don't manually ask for RAM.
"outdirMin": expected_config.defaultDisk / (2**20),
"tmpdirMin": 0,
}
]
loading_context.construct_tool_object = toil_make_tool
loading_context.strict = not options.not_strict
options.workflow = options.cwltool
options.job_order = options.cwljob

try:
uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri(
options.cwltool,
loading_context.resolver,
loading_context.fetcher_constructor,
)
if options.overrides:
loading_context.overrides_list.extend(
cwltool.load_tool.load_overrides(
schema_salad.ref_resolver.file_uri(
os.path.abspath(options.overrides)
),
tool_file_uri,
)
except ValidationException:
print(
"\nYou may be getting this error because your arguments are incorrect or out of order."
+ usage_message,
file=sys.stderr,
)
raise

# Attempt to prepull the containers
if not options.no_prepull:
if not options.enable_ext:
# The CWL utils parser does not support cwltool extensions and will crash if encountered, so don't prepull if extensions are enabled
# See https://github.com/common-workflow-language/cwl-utils/issues/309
try_prepull(uri, runtime_context, expected_config.batchSystem)
else:
logger.debug(
"Not prepulling containers as cwltool extensions are not supported."
)

loading_context, workflowobj, uri = cwltool.load_tool.fetch_document(
uri, loading_context
options.tool_help = None
options.debug = options.logLevel == "DEBUG"
job_order_object, options.basedir, jobloader = (
cwltool.main.load_job_order(
options,
sys.stdin,
loading_context.fetcher_constructor,
loading_context.overrides_list,
tool_file_uri,
)
loading_context, uri = cwltool.load_tool.resolve_and_validate_document(
loading_context, workflowobj, uri
)
if options.overrides:
loading_context.overrides_list.extend(
cwltool.load_tool.load_overrides(
schema_salad.ref_resolver.file_uri(
os.path.abspath(options.overrides)
),
tool_file_uri,
)
)
if not loading_context.loader:
raise RuntimeError("cwltool loader is not set.")
processobj, metadata = loading_context.loader.resolve_ref(uri)
processobj = cast(Union[CommentedMap, CommentedSeq], processobj)

document_loader = loading_context.loader
loading_context, workflowobj, uri = cwltool.load_tool.fetch_document(
uri, loading_context
)
loading_context, uri = cwltool.load_tool.resolve_and_validate_document(
loading_context, workflowobj, uri
)
if not loading_context.loader:
raise RuntimeError("cwltool loader is not set.")
processobj, metadata = loading_context.loader.resolve_ref(uri)
processobj = cast(Union[CommentedMap, CommentedSeq], processobj)

if options.provenance and runtime_context.research_obj:
cwltool.cwlprov.writablebagfile.packed_workflow(
runtime_context.research_obj,
cwltool.main.print_pack(loading_context, uri),
)
document_loader = loading_context.loader

try:
tool = cwltool.load_tool.make_tool(uri, loading_context)
scan_for_unsupported_requirements(
tool, bypass_file_store=options.bypass_file_store
)
except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err:
logging.error(err)
return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
runtime_context.secret_store = SecretStore()
if options.provenance and runtime_context.research_obj:
cwltool.cwlprov.writablebagfile.packed_workflow(
runtime_context.research_obj,
cwltool.main.print_pack(loading_context, uri),
)

try:
# Get the "order" for the execution of the root job. CWLTool
# doesn't document this much, but this is an "order" in the
# sense of a "specification" for running a single job. It
# describes the inputs to the workflow.
initialized_job_order = cwltool.main.init_job_order(
job_order_object,
options,
tool,
jobloader,
sys.stdout,
make_fs_access=runtime_context.make_fs_access,
input_basedir=options.basedir,
secret_store=runtime_context.secret_store,
input_required=True,
)
except SystemExit as err:
if err.code == 2: # raised by argparse's parse_args() function
print(
"\nIf both a CWL file and an input object (YAML/JSON) file were "
"provided, this may be the argument order." + usage_message,
file=sys.stderr,
)
raise
try:
tool = cwltool.load_tool.make_tool(uri, loading_context)
scan_for_unsupported_requirements(
tool, bypass_file_store=options.bypass_file_store
)
except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err:
logging.error(err)
return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
runtime_context.secret_store = SecretStore()

# Leave the defaults un-filled in the top-level order. The tool or
# workflow will fill them when it runs

for inp in tool.tool["inputs"]:
if (
shortname(inp["id"]) in initialized_job_order
and inp["type"] == "File"
):
cast(
CWLObjectType, initialized_job_order[shortname(inp["id"])]
)["streamable"] = inp.get("streamable", False)
# TODO also for nested types that contain streamable Files

runtime_context.use_container = not options.no_container
runtime_context.tmp_outdir_prefix = os.path.realpath(tmp_outdir_prefix)
runtime_context.job_script_provider = job_script_provider
runtime_context.force_docker_pull = options.force_docker_pull
runtime_context.no_match_user = options.no_match_user
runtime_context.no_read_only = options.no_read_only
runtime_context.basedir = options.basedir
if not options.bypass_file_store:
# If we're using the file store we need to start moving output
# files now.
runtime_context.move_outputs = "move"

# We instantiate an early builder object here to populate indirect
# secondaryFile references using cwltool's library because we need
# to resolve them before toil imports them into the filestore.
# A second builder will be built in the job's run method when toil
# actually starts the cwl job.
# Note that this accesses input files for tools, so the
# ToilFsAccess needs to be set up if we want to be able to use
# URLs.
builder = tool._init_job(initialized_job_order, runtime_context)

# make sure this doesn't add listing items; if shallow_listing is
# selected, it will discover dirs one deep and then again later on
# (probably when the cwltool builder gets ahold of the job in the
# CWL job's run()), producing 2+ deep listings instead of only 1.
builder.loadListing = "no_listing"

builder.bind_input(
tool.inputs_record_schema,
initialized_job_order,
discover_secondaryFiles=True,
try:
# Get the "order" for the execution of the root job. CWLTool
# doesn't document this much, but this is an "order" in the
# sense of a "specification" for running a single job. It
# describes the inputs to the workflow.
initialized_job_order = cwltool.main.init_job_order(
job_order_object,
options,
tool,
jobloader,
sys.stdout,
make_fs_access=runtime_context.make_fs_access,
input_basedir=options.basedir,
secret_store=runtime_context.secret_store,
input_required=True,
)
except SystemExit as err:
if err.code == 2: # raised by argparse's parse_args() function
print(
"\nIf both a CWL file and an input object (YAML/JSON) file were "
"provided, this may be the argument order." + usage_message,
file=sys.stderr,
)
raise

# Leave the defaults un-filled in the top-level order. The tool or
# workflow will fill them when it runs

logger.info("Creating root job")
logger.debug("Root tool: %s", tool)
tool = remove_pickle_problems(tool)
for inp in tool.tool["inputs"]:
if (
shortname(inp["id"]) in initialized_job_order
and inp["type"] == "File"
):
cast(
CWLObjectType, initialized_job_order[shortname(inp["id"])]
)["streamable"] = inp.get("streamable", False)
# TODO also for nested types that contain streamable Files

runtime_context.use_container = not options.no_container
runtime_context.tmp_outdir_prefix = os.path.realpath(tmp_outdir_prefix)
runtime_context.job_script_provider = job_script_provider
runtime_context.force_docker_pull = options.force_docker_pull
runtime_context.no_match_user = options.no_match_user
runtime_context.no_read_only = options.no_read_only
runtime_context.basedir = options.basedir
if not options.bypass_file_store:
# If we're using the file store we need to start moving output
# files now.
runtime_context.move_outputs = "move"

# We instantiate an early builder object here to populate indirect
# secondaryFile references using cwltool's library because we need
# to resolve them before toil imports them into the filestore.
# A second builder will be built in the job's run method when toil
# actually starts the cwl job.
# Note that this accesses input files for tools, so the
# ToilFsAccess needs to be set up if we want to be able to use
# URLs.
builder = tool._init_job(initialized_job_order, runtime_context)

# make sure this doesn't add listing items; if shallow_listing is
# selected, it will discover dirs one deep and then again later on
# (probably when the cwltool builder gets ahold of the job in the
# CWL job's run()), producing 2+ deep listings instead of only 1.
builder.loadListing = "no_listing"

builder.bind_input(
tool.inputs_record_schema,
initialized_job_order,
discover_secondaryFiles=True,
)

logger.info("Creating root job")
logger.debug("Root tool: %s", tool)
tool = remove_pickle_problems(tool)

with Toil(options) as toil:
if options.restart:
outobj = toil.restart()
else:
try:
wf1 = makeRootJob(
tool=tool,
Expand Down
Loading

0 comments on commit 93d8130

Please sign in to comment.