From 93d8130bd01707adbbd5b856d8f7c658bdf5e614 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 6 Nov 2024 14:35:35 -0500 Subject: [PATCH] Don't leave behind job store if CWL options don't parse, and add more debugging to tests I can't replicate --- src/toil/cwl/cwltoil.py | 332 ++++++++++++++++--------------- src/toil/test/utils/utilsTest.py | 24 ++- src/toil/utils/toilStatus.py | 16 +- 3 files changed, 199 insertions(+), 173 deletions(-) diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 1f2521b7a1..34e77a8149 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -90,7 +90,7 @@ from toil.batchSystems.abstractBatchSystem import InsufficientSystemResources from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM -from toil.common import Toil, addOptions +from toil.common import Config, Toil, addOptions from toil.cwl import check_cwltool_version from toil.lib.integration import resolve_workflow from toil.lib.misc import call_command @@ -3997,182 +3997,190 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: runtime_context.research_obj = research_obj try: - with Toil(options) as toil: - if options.restart: - outobj = toil.restart() - else: - # Before showing the options to any cwltool stuff that wants to - # load the workflow, transform options.cwltool, where our - # argument for what to run is, to handle Dockstore workflows. - options.cwltool = resolve_workflow(options.cwltool) - - # TODO: why are we doing this? Does this get applied to all - # tools as a default or something? - loading_context.hints = [ - { - "class": "ResourceRequirement", - "coresMin": toil.config.defaultCores, - # Don't include any RAM requirement because we want to - # know when tools don't manually ask for RAM. 
- "outdirMin": toil.config.defaultDisk / (2**20), - "tmpdirMin": 0, - } - ] - loading_context.construct_tool_object = toil_make_tool - loading_context.strict = not options.not_strict - options.workflow = options.cwltool - options.job_order = options.cwljob - try: - uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri( - options.cwltool, - loading_context.resolver, - loading_context.fetcher_constructor, - ) - except ValidationException: - print( - "\nYou may be getting this error because your arguments are incorrect or out of order." - + usage_message, - file=sys.stderr, - ) - raise + if not options.restart: + # Make a version of the config based on the initial options, for + # setting up CWL option stuff + expected_config = Config() + expected_config.setOptions(options) - # Attempt to prepull the containers - if not options.no_prepull: - if not options.enable_ext: - # The CWL utils parser does not support cwltool extensions and will crash if encountered, so don't prepull if extensions are enabled - # See https://github.com/common-workflow-language/cwl-utils/issues/309 - try_prepull(uri, runtime_context, toil.config.batchSystem) - else: - logger.debug( - "Not prepulling containers as cwltool extensions are not supported." - ) + # Before showing the options to any cwltool stuff that wants to + # load the workflow, transform options.cwltool, where our + # argument for what to run is, to handle Dockstore workflows. + options.cwltool = resolve_workflow(options.cwltool) - options.tool_help = None - options.debug = options.logLevel == "DEBUG" - job_order_object, options.basedir, jobloader = ( - cwltool.main.load_job_order( - options, - sys.stdin, - loading_context.fetcher_constructor, - loading_context.overrides_list, - tool_file_uri, - ) + # TODO: why are we doing this? Does this get applied to all + # tools as a default or something? 
+ loading_context.hints = [ + { + "class": "ResourceRequirement", + "coresMin": expected_config.defaultCores, + # Don't include any RAM requirement because we want to + # know when tools don't manually ask for RAM. + "outdirMin": expected_config.defaultDisk / (2**20), + "tmpdirMin": 0, + } + ] + loading_context.construct_tool_object = toil_make_tool + loading_context.strict = not options.not_strict + options.workflow = options.cwltool + options.job_order = options.cwljob + + try: + uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri( + options.cwltool, + loading_context.resolver, + loading_context.fetcher_constructor, ) - if options.overrides: - loading_context.overrides_list.extend( - cwltool.load_tool.load_overrides( - schema_salad.ref_resolver.file_uri( - os.path.abspath(options.overrides) - ), - tool_file_uri, - ) + except ValidationException: + print( + "\nYou may be getting this error because your arguments are incorrect or out of order." + + usage_message, + file=sys.stderr, + ) + raise + + # Attempt to prepull the containers + if not options.no_prepull: + if not options.enable_ext: + # The CWL utils parser does not support cwltool extensions and will crash if encountered, so don't prepull if extensions are enabled + # See https://github.com/common-workflow-language/cwl-utils/issues/309 + try_prepull(uri, runtime_context, expected_config.batchSystem) + else: + logger.debug( + "Not prepulling containers as cwltool extensions are not supported." 
) - loading_context, workflowobj, uri = cwltool.load_tool.fetch_document( - uri, loading_context + options.tool_help = None + options.debug = options.logLevel == "DEBUG" + job_order_object, options.basedir, jobloader = ( + cwltool.main.load_job_order( + options, + sys.stdin, + loading_context.fetcher_constructor, + loading_context.overrides_list, + tool_file_uri, ) - loading_context, uri = cwltool.load_tool.resolve_and_validate_document( - loading_context, workflowobj, uri + ) + if options.overrides: + loading_context.overrides_list.extend( + cwltool.load_tool.load_overrides( + schema_salad.ref_resolver.file_uri( + os.path.abspath(options.overrides) + ), + tool_file_uri, + ) ) - if not loading_context.loader: - raise RuntimeError("cwltool loader is not set.") - processobj, metadata = loading_context.loader.resolve_ref(uri) - processobj = cast(Union[CommentedMap, CommentedSeq], processobj) - document_loader = loading_context.loader + loading_context, workflowobj, uri = cwltool.load_tool.fetch_document( + uri, loading_context + ) + loading_context, uri = cwltool.load_tool.resolve_and_validate_document( + loading_context, workflowobj, uri + ) + if not loading_context.loader: + raise RuntimeError("cwltool loader is not set.") + processobj, metadata = loading_context.loader.resolve_ref(uri) + processobj = cast(Union[CommentedMap, CommentedSeq], processobj) - if options.provenance and runtime_context.research_obj: - cwltool.cwlprov.writablebagfile.packed_workflow( - runtime_context.research_obj, - cwltool.main.print_pack(loading_context, uri), - ) + document_loader = loading_context.loader - try: - tool = cwltool.load_tool.make_tool(uri, loading_context) - scan_for_unsupported_requirements( - tool, bypass_file_store=options.bypass_file_store - ) - except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err: - logging.error(err) - return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE - runtime_context.secret_store = SecretStore() + if options.provenance and runtime_context.research_obj: + 
cwltool.cwlprov.writablebagfile.packed_workflow( + runtime_context.research_obj, + cwltool.main.print_pack(loading_context, uri), + ) - try: - # Get the "order" for the execution of the root job. CWLTool - # doesn't document this much, but this is an "order" in the - # sense of a "specification" for running a single job. It - # describes the inputs to the workflow. - initialized_job_order = cwltool.main.init_job_order( - job_order_object, - options, - tool, - jobloader, - sys.stdout, - make_fs_access=runtime_context.make_fs_access, - input_basedir=options.basedir, - secret_store=runtime_context.secret_store, - input_required=True, - ) - except SystemExit as err: - if err.code == 2: # raised by argparse's parse_args() function - print( - "\nIf both a CWL file and an input object (YAML/JSON) file were " - "provided, this may be the argument order." + usage_message, - file=sys.stderr, - ) - raise + try: + tool = cwltool.load_tool.make_tool(uri, loading_context) + scan_for_unsupported_requirements( + tool, bypass_file_store=options.bypass_file_store + ) + except CWL_UNSUPPORTED_REQUIREMENT_EXCEPTION as err: + logging.error(err) + return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE + runtime_context.secret_store = SecretStore() - # Leave the defaults un-filled in the top-level order. 
The tool or - # workflow will fill them when it runs - - for inp in tool.tool["inputs"]: - if ( - shortname(inp["id"]) in initialized_job_order - and inp["type"] == "File" - ): - cast( - CWLObjectType, initialized_job_order[shortname(inp["id"])] - )["streamable"] = inp.get("streamable", False) - # TODO also for nested types that contain streamable Files - - runtime_context.use_container = not options.no_container - runtime_context.tmp_outdir_prefix = os.path.realpath(tmp_outdir_prefix) - runtime_context.job_script_provider = job_script_provider - runtime_context.force_docker_pull = options.force_docker_pull - runtime_context.no_match_user = options.no_match_user - runtime_context.no_read_only = options.no_read_only - runtime_context.basedir = options.basedir - if not options.bypass_file_store: - # If we're using the file store we need to start moving output - # files now. - runtime_context.move_outputs = "move" - - # We instantiate an early builder object here to populate indirect - # secondaryFile references using cwltool's library because we need - # to resolve them before toil imports them into the filestore. - # A second builder will be built in the job's run method when toil - # actually starts the cwl job. - # Note that this accesses input files for tools, so the - # ToilFsAccess needs to be set up if we want to be able to use - # URLs. - builder = tool._init_job(initialized_job_order, runtime_context) - - # make sure this doesn't add listing items; if shallow_listing is - # selected, it will discover dirs one deep and then again later on - # (probably when the cwltool builder gets ahold of the job in the - # CWL job's run()), producing 2+ deep listings instead of only 1. - builder.loadListing = "no_listing" - - builder.bind_input( - tool.inputs_record_schema, - initialized_job_order, - discover_secondaryFiles=True, + try: + # Get the "order" for the execution of the root job. 
CWLTool + # doesn't document this much, but this is an "order" in the + # sense of a "specification" for running a single job. It + # describes the inputs to the workflow. + initialized_job_order = cwltool.main.init_job_order( + job_order_object, + options, + tool, + jobloader, + sys.stdout, + make_fs_access=runtime_context.make_fs_access, + input_basedir=options.basedir, + secret_store=runtime_context.secret_store, + input_required=True, ) + except SystemExit as err: + if err.code == 2: # raised by argparse's parse_args() function + print( + "\nIf both a CWL file and an input object (YAML/JSON) file were " + "provided, this may be the argument order." + usage_message, + file=sys.stderr, + ) + raise + + # Leave the defaults un-filled in the top-level order. The tool or + # workflow will fill them when it runs - logger.info("Creating root job") - logger.debug("Root tool: %s", tool) - tool = remove_pickle_problems(tool) + for inp in tool.tool["inputs"]: + if ( + shortname(inp["id"]) in initialized_job_order + and inp["type"] == "File" + ): + cast( + CWLObjectType, initialized_job_order[shortname(inp["id"])] + )["streamable"] = inp.get("streamable", False) + # TODO also for nested types that contain streamable Files + + runtime_context.use_container = not options.no_container + runtime_context.tmp_outdir_prefix = os.path.realpath(tmp_outdir_prefix) + runtime_context.job_script_provider = job_script_provider + runtime_context.force_docker_pull = options.force_docker_pull + runtime_context.no_match_user = options.no_match_user + runtime_context.no_read_only = options.no_read_only + runtime_context.basedir = options.basedir + if not options.bypass_file_store: + # If we're using the file store we need to start moving output + # files now. 
+ runtime_context.move_outputs = "move" + + # We instantiate an early builder object here to populate indirect + # secondaryFile references using cwltool's library because we need + # to resolve them before toil imports them into the filestore. + # A second builder will be built in the job's run method when toil + # actually starts the cwl job. + # Note that this accesses input files for tools, so the + # ToilFsAccess needs to be set up if we want to be able to use + # URLs. + builder = tool._init_job(initialized_job_order, runtime_context) + + # make sure this doesn't add listing items; if shallow_listing is + # selected, it will discover dirs one deep and then again later on + # (probably when the cwltool builder gets ahold of the job in the + # CWL job's run()), producing 2+ deep listings instead of only 1. + builder.loadListing = "no_listing" + + builder.bind_input( + tool.inputs_record_schema, + initialized_job_order, + discover_secondaryFiles=True, + ) + + logger.info("Creating root job") + logger.debug("Root tool: %s", tool) + tool = remove_pickle_problems(tool) + + with Toil(options) as toil: + if options.restart: + outobj = toil.restart() + else: try: wf1 = makeRootJob( tool=tool, diff --git a/src/toil/test/utils/utilsTest.py b/src/toil/test/utils/utilsTest.py index ab831a72be..1f16f50da6 100644 --- a/src/toil/test/utils/utilsTest.py +++ b/src/toil/test/utils/utilsTest.py @@ -379,14 +379,22 @@ def testMultipleJobsPerWorkerStats(self): ) def check_status(self, status, status_fn, seconds=20): - i = 0.0 - while status_fn(self.toilDir) != status: + time_elapsed = 0.0 + current_status = status_fn(self.toilDir) + while current_status != status: + logger.debug( + "Workflow is %s; waiting for %s (%s/%s elapsed)", + current_status, + status, + time_elapsed, + seconds + ) time.sleep(0.5) - i += 0.5 - if i > seconds: - s = status_fn(self.toilDir) + time_elapsed += 0.5 + current_status = status_fn(self.toilDir) + if time_elapsed > seconds: self.assertEqual( - s, + 
current_status,
                     status,
-                    f"Waited {seconds} seconds without status reaching {status}; stuck at {s}",
+                    f"Waited {seconds} seconds without status reaching {status}; stuck at {current_status}",
                 )
 
@@ -422,6 +430,7 @@ def testGetStatusFailedCWLWF(self):
         # --badWorker is set to force failure.
         cmd = [
             "toil-cwl-runner",
+            "--logDebug",
             "--jobStore",
             self.toilDir,
             "--clean=never",
@@ -432,6 +441,7 @@ def testGetStatusFailedCWLWF(self):
             "src/toil/test/cwl/whale.txt",
             f"--outdir={self.tempDir}",
         ]
+        logger.info("Run command: %s", " ".join(cmd))
         wf = subprocess.Popen(cmd)
         self.check_status("RUNNING", status_fn=ToilStatus.getStatus, seconds=60)
         wf.wait()
@@ -464,6 +474,7 @@ def testPrintJobLog(self, mock_print):
         # Run a workflow that will always fail
         cmd = [
             "toil-cwl-runner",
+            "--logDebug",
             "--jobStore",
             self.toilDir,
             "--clean=never",
             "--retryCount=0",
             "--message",
             "Testing",
         ]
+        logger.info("Run command: %s", " ".join(cmd))
         wf = subprocess.Popen(cmd)
         wf.wait()
         # print log and check output
diff --git a/src/toil/utils/toilStatus.py b/src/toil/utils/toilStatus.py
index 9fc4da97cc..cd3e43c91d 100644
--- a/src/toil/utils/toilStatus.py
+++ b/src/toil/utils/toilStatus.py
@@ -34,11 +34,17 @@ def __init__(self, jobStoreName: str, specifiedJobs: Optional[list[str]] = None)
         self.jobStore = Toil.resumeJobStore(jobStoreName)
 
         if specifiedJobs is None:
-            rootJob = self.fetchRootJob()
-            logger.info(
-                "Traversing the job graph gathering jobs. This may take a couple of minutes."
-            )
-            self.jobsToReport = self.traverseJobGraph(rootJob)
+            try:
+                rootJob = self.fetchRootJob()
+                logger.info(
+                    "Traversing the job graph gathering jobs. This may take a couple of minutes."
+                )
+                self.jobsToReport = self.traverseJobGraph(rootJob)
+            except JobException:
+                # Root job isn't set.
+                logger.warning("Workflow does not have a root job (yet? anymore?). Cannot look for jobs.")
+                self.jobsToReport = []
+
         else:
             self.jobsToReport = self.fetchUserJobs(specifiedJobs)