From 9499236149147647669b703e5e2bd4f22eb35599 Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Thu, 7 Sep 2023 16:31:18 -0500 Subject: [PATCH 1/6] Adjust slurm_bb_get_status() 2-arg API vs 4-arg API (#42) Adjust slurm_bb_get_status() to work with both the older 2-arg API and the newer 4-arg API that comes in with slurm 23.02.4.1. Signed-off-by: Dean Roehrich --- src/burst_buffer/burst_buffer.lua | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/burst_buffer/burst_buffer.lua b/src/burst_buffer/burst_buffer.lua index 492102a..e088c0a 100644 --- a/src/burst_buffer/burst_buffer.lua +++ b/src/burst_buffer/burst_buffer.lua @@ -844,10 +844,20 @@ function slurm_bb_get_status(...) local args = {...} args.n = select("#", ...) + local found_jid = false + local jid = 0 if args.n == 2 and args[1] == "workflow" then + -- Slurm 22.05 + jid = args[2] + found_jid = true + elseif args.n == 4 and args[3] == "workflow" then + -- Slurm 23.02 + jid = args[4] + found_jid = true + end + if found_jid == true then local done = false local status = {} - local jid = args[2] if string.find(jid, "^%d+$") == nil then msg = "A job ID must contain only digits." else From b017665f332820d79768293049d782808dd510bb Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Mon, 18 Sep 2023 13:01:38 -0500 Subject: [PATCH 2/6] Follow DWS from HewlettPackard/dws to DataWorkflowServices/dws (#43) Use DWS v0.0.13 and dws-test-driver v0.0.3. Signed-off-by: Dean Roehrich --- .gitmodules | 8 +++---- src/burst_buffer/burst_buffer.lua | 4 ++-- testsuite/integration/Makefile | 18 +++++++-------- .../src/tests/dws_bb_plugin/workflow.py | 6 ++--- .../src/tests/environment/test_environment.py | 8 +++---- testsuite/submodules/dws | 2 +- testsuite/submodules/dws-test-driver | 2 +- testsuite/unit/src/burst_buffer/dws-test.lua | 22 +++++++++---------- 8 files changed, 34 insertions(+), 36 deletions(-) diff --git a/.gitmodules b/.gitmodules index e6b3b60..2997528 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,11 +1,9 @@ -[submodule "testsuite/submodules/dws"] - path = testsuite/submodules/dws - url = https://github.com/HewlettPackard/dws.git - branch = releases/v0 [submodule "testsuite/submodules/slurm-docker-cluster"] path = testsuite/submodules/slurm-docker-cluster url = git@github.com:DataWorkflowServices/slurm-docker-cluster.git [submodule "testsuite/submodules/dws-test-driver"] path = testsuite/submodules/dws-test-driver url = git@github.com:DataWorkflowServices/dws-test-driver.git - branch = releases/v0 +[submodule "testsuite/submodules/dws"] + path = testsuite/submodules/dws + url = git@github.com:DataWorkflowServices/dws.git diff --git a/src/burst_buffer/burst_buffer.lua b/src/burst_buffer/burst_buffer.lua index e088c0a..be0a963 100644 --- a/src/burst_buffer/burst_buffer.lua +++ b/src/burst_buffer/burst_buffer.lua @@ -35,7 +35,7 @@ DEFAULT_LABEL_KEY = "origin" DEFAULT_LABEL_VAL = lua_script_name -- The fully-qualified name of the DWS Workflow CRD. -local WORKFLOW_CRD = "workflows.dws.cray.hpe.com" +local WORKFLOW_CRD = "workflows.dataworkflowservices.github.io" KUBECTL_CACHE_DIR = "/tmp/burst_buffer_kubectl_cache" @@ -118,7 +118,7 @@ end -- resource with keywords that must be replaced by the caller. function DWS:template() return [[ -apiVersion: dws.cray.hpe.com/v1alpha2 +apiVersion: dataworkflowservices.github.io/v1alpha2 kind: Workflow metadata: name: WF_NAME diff --git a/testsuite/integration/Makefile b/testsuite/integration/Makefile index 55c1696..3c7cb6b 100644 --- a/testsuite/integration/Makefile +++ b/testsuite/integration/Makefile @@ -39,10 +39,10 @@ setup-dws: @{\ set -e ; \ cd ../submodules/dws ; \ - docker buildx build -t local/dws-operator:test --load . ; \ - IMAGE_TAG_BASE=local/dws-operator VERSION=test KIND_CLUSTER=dws make kind-push deploy ; \ - kubectl wait deployment --timeout=120s -n dws-operator-system dws-operator-controller-manager --for condition=Available=True ; \ - kubectl wait deployment --timeout=120s -n dws-operator-system dws-operator-webhook --for condition=Available=True ; \ + docker buildx build -t local/dws:test --load . ; \ + IMAGE_TAG_BASE=local/dws VERSION=test KIND_CLUSTER=dws make kind-push deploy ; \ + kubectl wait deployment --timeout=120s -n dws-system dws-controller-manager --for condition=Available=True ; \ + kubectl wait deployment --timeout=120s -n dws-system dws-webhook --for condition=Available=True ; \ } .PHONY: setup-dws-test-driver @@ -50,9 +50,9 @@ setup-dws-test-driver: @{\ set -e ; \ cd ../submodules/dws-test-driver ; \ - docker buildx build -t local/dws-test-driver-operator:test --load . ; \ - IMAGE_TAG_BASE=local/dws-test-driver-operator VERSION=test KIND_CLUSTER=dws make kind-push deploy ; \ - kubectl wait deployment --timeout=60s -n dws-test-operator-system dws-test-driver-controller-manager --for condition=Available=True ; \ + docker buildx build -t local/dws-test-driver:test --load . ; \ + IMAGE_TAG_BASE=local/dws-test-driver VERSION=test KIND_CLUSTER=dws make kind-push deploy ; \ + kubectl wait deployment --timeout=60s -n dws-test-system dws-test-driver-controller-manager --for condition=Available=True ; \ } .PHONY: setup @@ -75,10 +75,10 @@ debug: kubectl describe node dws-control-plane dws-worker echo echo "***** DWS DEPLOYMENT *****" - kubectl describe deployment -n dws-operator-system dws-operator-controller-manager + kubectl describe deployment -n dws-system dws-controller-manager echo echo "***** DWS LOGS *****" - kubectl logs -n dws-operator-system deployment/dws-operator-controller-manager + kubectl logs -n dws-system deployment/dws-controller-manager .PHONY: reports reports: diff --git a/testsuite/integration/src/tests/dws_bb_plugin/workflow.py b/testsuite/integration/src/tests/dws_bb_plugin/workflow.py index 5373613..03bda1b 100644 --- a/testsuite/integration/src/tests/dws_bb_plugin/workflow.py +++ b/testsuite/integration/src/tests/dws_bb_plugin/workflow.py @@ -49,7 +49,7 @@ def data(self): ) def _get_data(self): workflowData = self.k8s.CustomObjectsApi().get_namespaced_custom_object( - "dws.cray.hpe.com", self._api_version, "slurm", "workflows", self.name + "dataworkflowservices.github.io", self._api_version, "slurm", "workflows", self.name ) return workflowData @@ -75,10 +75,10 @@ def save_driver_statuses(self): } } self.k8s.CustomObjectsApi().patch_namespaced_custom_object( - "dws.cray.hpe.com", self._api_version, "slurm", "workflows", self.name, patchData + "dataworkflowservices.github.io", self._api_version, "slurm", "workflows", self.name, patchData ) def delete(self): self.k8s.CustomObjectsApi().delete_namespaced_custom_object( - "dws.cray.hpe.com", self._api_version, "slurm", "workflows", self.name + "dataworkflowservices.github.io", self._api_version, "slurm", "workflows", self.name ) diff --git a/testsuite/integration/src/tests/environment/test_environment.py b/testsuite/integration/src/tests/environment/test_environment.py index 7c62728..fde5d6a 100644 --- a/testsuite/integration/src/tests/environment/test_environment.py +++ b/testsuite/integration/src/tests/environment/test_environment.py @@ -1,5 +1,5 @@ # -# Copyright 2022 Hewlett Packard Enterprise Development LP +# Copyright 2022-2023 Hewlett Packard Enterprise Development LP # Other additional copyright holders may be indicated within. # # The entirety of this work is licensed under the Apache License, @@ -47,8 +47,8 @@ def _(k8s): def _(k8s): """the DataWorkflowServices deployment is queried.""" return k8s.AppsV1Api().list_namespaced_deployment( - namespace="dws-operator-system", - field_selector="metadata.name=dws-operator-controller-manager" + namespace="dws-system", + field_selector="metadata.name=dws-controller-manager" ) @when('the kube-system UID is queried from slurmctld', target_fixture="kube_system_uid_from_slurmctld") @@ -72,4 +72,4 @@ def _(dws_deployment_list): @then('the UIDs match and the cluster is the same') def _(kube_system_uid, kube_system_uid_from_slurmctld): """the UIDs match and the cluster is the same.""" - assert kube_system_uid == kube_system_uid_from_slurmctld \ No newline at end of file + assert kube_system_uid == kube_system_uid_from_slurmctld diff --git a/testsuite/submodules/dws b/testsuite/submodules/dws index e0df046..8aa0d9d 160000 --- a/testsuite/submodules/dws +++ b/testsuite/submodules/dws @@ -1 +1 @@ -Subproject commit e0df04642b2285f0bd1fc80a43e683ee357bd164 +Subproject commit 8aa0d9dd2c6942f1c3588d5bb758358083886ca2 diff --git a/testsuite/submodules/dws-test-driver b/testsuite/submodules/dws-test-driver index 3adce62..96d2a0e 160000 --- a/testsuite/submodules/dws-test-driver +++ b/testsuite/submodules/dws-test-driver @@ -1 +1 @@ -Subproject commit 3adce6281b9243b19d7db32a2539769f3af5ba63 +Subproject commit 96d2a0e548efd40d63d0cfb6fe14c394863827b6 diff --git a/testsuite/unit/src/burst_buffer/dws-test.lua b/testsuite/unit/src/burst_buffer/dws-test.lua index b788dbe..85cd6ad 100644 --- a/testsuite/unit/src/burst_buffer/dws-test.lua +++ b/testsuite/unit/src/burst_buffer/dws-test.lua @@ -234,7 +234,7 @@ describe("The dws library", function() end local apply_workflow = function() - local result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " created\n" + local result_wanted = "workflow.dataworkflowservices.github.io/" .. workflow_name .. " created\n" dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) @@ -250,7 +250,7 @@ describe("The dws library", function() local delete_workflow = function() -- Delete the resource. - local result_wanted = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' + local result_wanted = 'workflow.dataworkflowservices.github.io "' .. workflow_name .. '" deleted\n' dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) @@ -363,7 +363,7 @@ describe("The dws library", function() -- the hurry flag on the state, if indicated. local set_desired_state = function(new_state, hurry) local ret_wanted = true - local result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " patched\n" + local result_wanted = "workflow.dataworkflowservices.github.io/" .. workflow_name .. " patched\n" if skip_state == true then result_wanted = 'Error from server (Spec.DesiredState: Invalid value: "' .. new_state .. '": states cannot be skipped): admission webhook "vworkflow.kb.io" denied the request: Spec.DesiredState: Invalid value: "' .. new_state .. '": states cannot be skipped\n' @@ -477,7 +477,7 @@ describe("The dws library", function() local new_state = "Setup" - local set_result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " patched\n" + local set_result_wanted = "workflow.dataworkflowservices.github.io/" .. workflow_name .. " patched\n" local wait_result_wanted = "desiredState=" .. new_state .. "\ncurrentState=" .. new_state .. "\nstatus=Completed\n" dwsmq_enqueue(true, "") -- kubectl_cache_home @@ -654,7 +654,7 @@ describe("Burst buffer helpers", function() after_each(function() if resource_exists and expect_exists then - local result_wanted = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' + local result_wanted = 'workflow.dataworkflowservices.github.io "' .. workflow_name .. '" deleted\n' dwsmq_reset() dwsmq_enqueue(true, "") -- kubectl_cache_home @@ -674,7 +674,7 @@ describe("Burst buffer helpers", function() end) local create_workflow = function(labels) - local result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " created\n" + local result_wanted = "workflow.dataworkflowservices.github.io/" .. workflow_name .. " created\n" dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) @@ -823,7 +823,7 @@ describe("Slurm API", function() end) local mock_popen_calls = function(state, status, k8s_cmd_result) - local k8s_cmd_result = k8s_cmd_result or "workflow.dws.cray.hpe.com/" .. workflow_name .. " patched\n" + local k8s_cmd_result = k8s_cmd_result or "workflow.dataworkflowservices.github.io/" .. workflow_name .. " patched\n" local state_result = "desiredState=".. state .."\ncurrentState=".. state .."\nstatus=".. status .."\n" dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, k8s_cmd_result) @@ -844,7 +844,7 @@ describe("Slurm API", function() local job_script = "#!/bin/bash\nsrun application.sh\n" write_job_script(job_script_name, job_script) - local apply_result = "workflow.dws.cray.hpe.com/" .. workflow_name .. " created\n" + local apply_result = "workflow.dataworkflowservices.github.io/" .. workflow_name .. " created\n" local popen_count = mock_popen_calls("Proposal", "Completed", apply_result) popen_count = popen_count + mock_popen_calls("Setup", "Completed") @@ -855,7 +855,7 @@ describe("Slurm API", function() end local call_bb_teardown = function(hurry) - local delete_result = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' + local delete_result = 'workflow.dataworkflowservices.github.io "' .. workflow_name .. '" deleted\n' local popen_count = mock_popen_calls("Teardown", "Completed") dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, delete_result) @@ -927,7 +927,7 @@ describe("Slurm API", function() local job_script = "#!/bin/bash\nsrun application.sh\n" write_job_script(job_script_name, job_script) - local apply_result = "workflow.dws.cray.hpe.com/" .. workflow_name .. " created\n" + local apply_result = "workflow.dataworkflowservices.github.io/" .. workflow_name .. " created\n" local popen_count = mock_popen_calls("Proposal", "Error", apply_result) local driver_id_1 = "driver1" @@ -1000,7 +1000,7 @@ describe("Slurm API", function() -- Call the appropriate slurm_bb_* function to induce an -- error condition. local call_bb_state_negative = function(new_state) - local set_state_result_wanted = 'Error from server (NotFound): workflows.dws.cray.hpe.com "' .. workflow_name .. '" not found\n' + local set_state_result_wanted = 'Error from server (NotFound): workflows.dataworkflowservices.github.io "' .. workflow_name .. '" not found\n' dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(false, set_state_result_wanted) From bb4fe6ca7377a93df2fa99e89ece9c1d5d831bab Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Fri, 22 Sep 2023 13:50:03 -0500 Subject: [PATCH 3/6] Handle DWS fatal errors (#44) Set Flags=TeardownFailure in burst_buffer.conf. This tells Slurm to go to Teardown if there are errors during stage_in or stage_out. In the burst_buffer.lua plugin, adjust slurm_bb_job_teardown() to check the workflow for any fatal errors recorded in the drivers array. If any are found, then we know we were called in a fatal error situation and we not only delete the workflow, per usual, but we also call the Slurm `scancel` command to cancel the Slurm job. Signed-off-by: Dean Roehrich --- src/burst_buffer/burst_buffer.conf | 6 ++ src/burst_buffer/burst_buffer.lua | 62 +++++++++++-- .../src/features/test_dws_states.feature | 65 +++++++------- .../src/features/test_environment.feature | 2 +- testsuite/integration/src/pytest.ini | 5 +- testsuite/integration/src/tests/conftest.py | 18 +--- .../tests/dws_bb_plugin/test_dws_states.py | 86 +++++-------------- testsuite/integration/src/tests/slurmctld.py | 45 +++++----- testsuite/unit/src/burst_buffer/dws-test.lua | 4 +- 9 files changed, 149 insertions(+), 144 deletions(-) diff --git a/src/burst_buffer/burst_buffer.conf b/src/burst_buffer/burst_buffer.conf index 67d41cc..04427d2 100644 --- a/src/burst_buffer/burst_buffer.conf +++ b/src/burst_buffer/burst_buffer.conf @@ -3,3 +3,9 @@ # See https://slurm.schedmd.com/burst_buffer.conf.html Directive=DW +# If set, then teardown a burst buffer after file staging error. Otherwise +# preserve the burst buffer for analysis and manual teardown. +# See https://slurm.schedmd.com/burst_buffer.conf.html +# and https://slurm.schedmd.com/burst_buffer.html#states +Flags=TeardownFailure + diff --git a/src/burst_buffer/burst_buffer.lua b/src/burst_buffer/burst_buffer.lua index be0a963..b1c80a8 100644 --- a/src/burst_buffer/burst_buffer.lua +++ b/src/burst_buffer/burst_buffer.lua @@ -280,9 +280,16 @@ end -- DWS:get_driver_errors will collect driver errors from the Workflow resource -- with respect to the given state. -function DWS:get_driver_errors(state) - local error_list = {} - local jsonpath = [[{range .status.drivers[?(@.watchState=="]].. state ..[[")]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}]] +-- If all_errors=true then collect all errors from all states in all drivers. +-- On success this returns true and a string with all of the errors. +-- On failure this returns false, an empty string for the errors, and a string +-- explaining why it couldn't collect the errors. +function DWS:get_driver_errors(state, all_errors) + local driver_index = [[?(@.watchState=="]].. state ..[[")]] + if all_errors == true then + driver_index = "*" + end + local jsonpath = [[{range .status.drivers[]] .. driver_index .. [[]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}]] local ret, output = self:get_jsonpath(jsonpath) if ret == false then return ret, "", "could not get driver errors: " .. output @@ -442,6 +449,18 @@ function DWS:kubectl(cmd) return self:io_popen(kcmd) end +-- DWS:scancel will run the Slurm scancel command and collect its output. +-- On success this returns true and the output of the command. +-- On failure this returns false and the output of the command. +function DWS:scancel(jobId, hurry) + local hurry_opt = "" + if hurry == true then + hurry_opt = "--hurry " + end + local scmd = "scancel " .. hurry_opt .. jobId + return self:io_popen(scmd) +end + -- DWS:io_popen will run the given command and collect its output. -- On success this returns true and the output of the command. -- On failure this returns false and the output of the command. @@ -627,24 +646,51 @@ function slurm_bb_job_teardown(job_id, job_script, hurry) hurry_flag = true end local workflow = DWS(make_workflow_name(job_id)) - local done, err = workflow:set_workflow_state_and_wait("Teardown", hurry_flag) + + local ret = slurm.SUCCESS + -- Does the workflow have a fatal error in it? + -- If so, we'll call scancel as well. + local done, state_errors, err = workflow:get_driver_errors("", true) if done == false then if string.find(err, [["]] .. workflow.name .. [[" not found]]) then -- It's already gone, and that's what we wanted anyway. return slurm.SUCCESS else - slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: %s", lua_script_name, workflow.name, err) - return slurm.ERROR, err + slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: unable to check driver errors: %s", lua_script_name, workflow.name, err) + ret = slurm.ERROR + -- fall-through, let the Workflow delete happen. end end + done, err = workflow:set_workflow_state_and_wait("Teardown", hurry_flag) + if done == false then + slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: %s", lua_script_name, workflow.name, err) + ret = slurm.ERROR + -- fall-through, let the Workflow delete happen. + end + done, err = workflow:delete() if done == false then slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s, delete: %s", lua_script_name, workflow.name, err) - return slurm.ERROR, err + ret = slurm.ERROR + -- fall-through, let any necessary scancel happen. + end + + if state_errors ~= "" then + -- Now do the scancel. This will terminate this Lua script and will + -- trigger slurm to call our teardown again, but that'll be a no-op + -- when it comes back here. + slurm.log_info("%s: slurm_bb_job_teardown(), workflow=%s: executing scancel --hurry %s, found driver errors: %s", lua_script_name, workflow.name, job_id, state_errors) + _, err = workflow:scancel(job_id, true) + if err == "" then + err = "(no output)" + end end - return slurm.SUCCESS + if ret == slurm.SUCCESS then + err = nil + end + return ret, err end --[[ diff --git a/testsuite/integration/src/features/test_dws_states.feature b/testsuite/integration/src/features/test_dws_states.feature index 92b921b..4510826 100644 --- a/testsuite/integration/src/features/test_dws_states.feature +++ b/testsuite/integration/src/features/test_dws_states.feature @@ -22,6 +22,7 @@ Feature: Data Workflow Services State Progression Verify that the DWS-Slurm Burst Buffer Plugin progresses through Data Workflow Services states + @happy_one Scenario: The DWS-BB Plugin progresses through DWS states Given a job script: #!/bin/bash @@ -44,13 +45,15 @@ Feature: Data Workflow Services State Progression And the Workflow and job progress to the PostRun state And the Workflow and job progress to the DataOut state And the Workflow and job progress to the Teardown state - And the job state is COMPLETED + And the job has eventually been COMPLETED # DWS does not allow spaces in key/value pairs in directives. To skirt around this # constraint, the dws-test-driver replaces underscores ("_") in the message value with # spaces. This ensures that the dws-slurm-plugin can handle whitespace in error messages # It also makes it easier to check that the error is included in scontrol output. - Scenario Outline: The DWS-BB Plugin can handle fatal driver errors before being canceled + # This scenario assumes that "Flags=TeardownFailure" is set in burst_buffer.conf. + @fatal_one + Scenario Outline: Report fatal errors from Proposal, Setup, DataIn, PreRun Given a job script: #!/bin/bash @@ -59,12 +62,13 @@ Feature: Data Workflow Services State Progression /bin/hostname When the job is run - Then a Workflow has been created for the job - And the Workflow and job report fatal errors at the state - And the job is canceled - And the Workflow and job progress to the Teardown state - And the job's final system comment contains the following: + And some Workflow has been created for the job + And the Workflow reports fatal errors at the state + Then the job's system comment eventually contains the following: TEST FATAL ERROR + And the Workflow and job progress to the Teardown state + And the Workflow has eventually been deleted + And the job has eventually been CANCELLED Examples: # *** HEADER *** @@ -73,14 +77,15 @@ Feature: Data Workflow Services State Progression | Proposal | | Setup | | DataIn | - | PostRun | - | DataOut | + | PreRun | - # With the exception of PreRun, states will need to be canceled with the - # "--hurry" flag to transition to the Teardown state. If - # "Flags=TeardownFailure" is set in burst_buffer.conf, then all states will - # transition to Teardown without needing to be canceled - Scenario Outline: The DWS-BB Plugin can handle fatal driver errors for PreRun + # DWS does not allow spaces in key/value pairs in directives. To skirt around this + # constraint, the dws-test-driver replaces underscores ("_") in the message value with + # spaces. This ensures that the dws-slurm-plugin can handle whitespace in error messages + # It also makes it easier to check that the error is included in scontrol output. + # This scenario assumes that "Flags=TeardownFailure" is set in burst_buffer.conf. + @fatal_two + Scenario Outline: Report fatal errors from PostRun and DataOut Given a job script: #!/bin/bash @@ -89,22 +94,23 @@ Feature: Data Workflow Services State Progression /bin/hostname When the job is run - Then a Workflow has been created for the job - And the Workflow reports a fatal error in the state - And the Workflow and job progress to the Teardown state - # Slurm moved it from PreRun/Error to Teardown without canceling - # the job. So the driver (this test) must cancel it. - And the job is canceled - And the job's final system comment contains the following: + And some Workflow has been created for the job + And the Workflow reports fatal errors at the state + Then the job's system comment eventually contains the following: TEST FATAL ERROR + And the Workflow and job progress to the Teardown state + And the Workflow has eventually been deleted + And the job has eventually been COMPLETED Examples: # *** HEADER *** | workflowState | # *** VALUES *** - | PreRun | + | PostRun | + | DataOut | - Scenario: The DWS-BB Plugin can handle fatal driver errors during Teardown + @fatal_three + Scenario: Report fatal errors from Teardown Given a job script: #!/bin/bash @@ -112,12 +118,7 @@ Feature: Data Workflow Services State Progression /bin/hostname When the job is run - Then a Workflow has been created for the job - And the Workflow reports a fatal error in the Teardown state - And the job's intermediate system comment contains the following: - TEST FATAL ERROR - # Eventually the driver (this test) must work through the Teardown - # issues and complete that step. Slurm has already marked the job - # as completed and is now looping over slurm_bb_job_teardown() in - # burst_buffer.lua. - And the Workflow error is cleared from the Teardown state + And some Workflow has been created for the job + And the Workflow reports fatal errors at the Teardown state + Then the Workflow has eventually been deleted + And the job has eventually been COMPLETED diff --git a/testsuite/integration/src/features/test_environment.feature b/testsuite/integration/src/features/test_environment.feature index dfbd412..4e2f657 100644 --- a/testsuite/integration/src/features/test_environment.feature +++ b/testsuite/integration/src/features/test_environment.feature @@ -36,7 +36,7 @@ Feature: Integration test environment srun -l /bin/hostname srun -l /bin/pwd When the job is run - Then the job state is COMPLETED + Then the job has eventually been COMPLETED Scenario: Kubernetes and slurm are connected Given the kubernetes cluster kube-system UID diff --git a/testsuite/integration/src/pytest.ini b/testsuite/integration/src/pytest.ini index 5399438..f58024e 100644 --- a/testsuite/integration/src/pytest.ini +++ b/testsuite/integration/src/pytest.ini @@ -22,4 +22,7 @@ bdd_features_base_dir = features markers = environment dws_states - + happy_one + fatal_one + fatal_two + fatal_three diff --git a/testsuite/integration/src/tests/conftest.py b/testsuite/integration/src/tests/conftest.py index 255d323..126d9c2 100644 --- a/testsuite/integration/src/tests/conftest.py +++ b/testsuite/integration/src/tests/conftest.py @@ -72,17 +72,7 @@ def _(slurmctld, script_path): # remove the slurm output from the jobs folder slurmctld.remove_job_output(jobId, outputFilePath, errorFilePath) -@then(parsers.parse('the job state is {expectedJobState}')) -def _(slurmctld, jobId, expectedJobState): - """the job state is """ - jobState, out = slurmctld.get_final_job_state(jobId) - - if expectedJobState == "COMPLETED" and jobState == "FAILED": - warnings.warn(ResourceWarning((f"Job {jobId} failed unexpectedly.\n") + \ - "This may happen if Slurm doesn't have enough resources to schedule the job.\n" + \ - "This is not considered a test failure, in this context, since DWS isn't\n" + \ - "dependent on the job's failure or success." - )) - return - - assert jobState == expectedJobState, "Unexpected Job State: " + jobState + "\n" + out +@then(parsers.parse('the job has eventually been {job_state:l}')) +def _(slurmctld, jobId, job_state): + """the job has eventually been """ + slurmctld.wait_until_job_has_been_x(jobId, job_state) diff --git a/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py b/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py index ff0277e..c5433ba 100644 --- a/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py +++ b/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py @@ -46,21 +46,19 @@ def _(k8s, jobId): except k8sclient.exceptions.ApiException: pass -@when(parsers.parse('the Workflow status becomes {status:l}')) -def _(slurmctld, jobId, status): - """the Workflow status becomes """ - workflowStatus = slurmctld.get_workflow_status(jobId) - assert workflowStatus["status"] == status +@when('some Workflow has been created for the job') +def _(k8s, jobId): + """some Workflow has been created for the job.""" + workflow = Workflow(k8s, jobId) + assert workflow.data is not None, "Workflow for Job: " + str(jobId) + " not found" -@then('the job is canceled') +@then('the Workflow has eventually been deleted') def _(slurmctld, jobId): - """the job is canceled""" - time.sleep(2) # Sleep long enough for bb plugin to poll workflow once or twice - slurmctld.cancel_job(jobId, False) - time.sleep(2) # Sleep long enough for the workflow to be deleted + """the Workflow has eventually been deleted""" + slurmctld.wait_until_workflow_is_gone(jobId) -def verify_job_status(slurmctld, jobId, state, status): - jobStatus = slurmctld.get_workflow_status(jobId) +def verify_job_bbstat(slurmctld, jobId, state, status): + jobStatus = slurmctld.scontrol_show_bbstat(jobId) assert jobStatus["desiredState"] == state, "Incorrect desired state: " + str(jobStatus) assert jobStatus["currentState"] == state, "Incorrect current state: " + str(jobStatus) assert jobStatus["status"] == status, "Incorrect status: " + str(jobStatus) @@ -73,18 +71,18 @@ def _(k8s, slurmctld, jobId, state): workflow = Workflow(k8s, jobId) workflow.wait_until( - f"the workflow transitions to {state}/{expectedStatus}", + f"the workflow transitioned to {state}/{expectedStatus}", lambda wf: wf.data["status"]["state"] == state and wf.data["status"]["status"] == expectedStatus ) - print("job %s progressed to state %s" % (str(jobId),state)) + print(f"job {jobId} progressed to state {state}") - verify_job_status(slurmctld, jobId, state, expectedStatus) + verify_job_bbstat(slurmctld, jobId, state, expectedStatus) # Set driver status to completed so the workflow can progress to the next state foundPendingDriverStatus = False for driverStatus in workflow.data["status"]["drivers"]: if driverStatus["driverID"] == "tester" and driverStatus["watchState"] == state and driverStatus["status"] == "Pending": - print("updating workflow %s to complete state %s" % (str(jobId), state)) + print(f"updating workflow {jobId} to complete state {state}") driverStatus["completed"] = True driverStatus["status"] = "Completed" foundPendingDriverStatus = True @@ -93,27 +91,6 @@ def _(k8s, slurmctld, jobId, state): assert foundPendingDriverStatus, "Driver not found with \"Pending\" status" workflow.save_driver_statuses() -@then(parsers.parse('the Workflow error is cleared from the {state:l} state')) -def _(k8s, slurmctld, jobId, state): - """the Workflow error is cleared from the state.""" - - workflow = Workflow(k8s, jobId) - - # Set driver status to completed so the workflow can progress to the next state - foundPendingDriverStatus = False - for driverStatus in workflow.data["status"]["drivers"]: - if driverStatus["driverID"] == "tester" and driverStatus["watchState"] == state and driverStatus["status"] == "Error": - print(f"updating workflow %s to complete state %s" % (str(jobId), state)) - driverStatus["completed"] = True - driverStatus["status"] = "Completed" - # The DWS webhook requires that the error message be cleared as well. - del driverStatus["error"] - foundPendingDriverStatus = True - break - - assert foundPendingDriverStatus, "Driver not found with \"Error\" status" - workflow.save_driver_statuses() - def driver_state_check(workflow, state, expected_status): found_it = False print(f"check drivers for state {state} with status {expected_status}") @@ -127,26 +104,9 @@ def driver_state_check(workflow, state, expected_status): break return found_it -@then(parsers.parse('the Workflow and job report fatal errors at the {state:l} state')) -def _(k8s, slurmctld, jobId, state): - """the Workflow and job report errors at the state.""" - - expected_status = "Error" - - def driver_check(workflow): - return driver_state_check(workflow, state, expected_status) - - workflow = Workflow(k8s, jobId) - workflow.wait_until( - f"the workflow {state} state shows a status of {expected_status}", - lambda wf: driver_check(wf) is True - ) - - verify_job_status(slurmctld, jobId, state, expected_status) - -@then(parsers.parse('the Workflow reports a fatal error in the {state:l} state')) +@when(parsers.parse('the Workflow reports fatal errors at the {state:l} state')) def _(k8s, slurmctld, jobId, state): - """the Workflow reports a fatal error in the state.""" + """the Workflow reports fatal errors at the state.""" expected_status = "Error" @@ -159,13 +119,7 @@ def driver_check(workflow): lambda wf: driver_check(wf) is True ) -@then(parsers.parse("the job's {disposition:l} system comment contains the following:\n{message}")) -def _(slurmctld, jobId, disposition, message): - assert disposition in ["final", "intermediate"], f"unknown disposition: {disposition}" - must_be_gone = True if disposition == "final" else False - _,out = slurmctld.get_final_job_state(jobId, must_be_gone) - m = re.search(r'\n\s+SystemComment=(.*)\n\s+StdErr=', out, re.DOTALL) - assert m is not None, f"Could not find SystemComment in job state from Slurm\n{out}" - if message in m.group(1): - print(f"Found \"{message}\" in SystemComment") - assert message in m.group(1) +@then(parsers.parse("the job's system comment eventually contains the following:\n{message}")) +def _(slurmctld, jobId, message): + print(f"looking for system comment with: {message}") + slurmctld.wait_until_job_system_comment(jobId, message) diff --git a/testsuite/integration/src/tests/slurmctld.py b/testsuite/integration/src/tests/slurmctld.py index f7869d1..4903d86 100644 --- a/testsuite/integration/src/tests/slurmctld.py +++ b/testsuite/integration/src/tests/slurmctld.py @@ -20,6 +20,7 @@ import os import time import docker +import re from tenacity import * # Submitting jobs can fail, occasionally, when the DWS webhook rejects the @@ -59,13 +60,6 @@ def submit_job(self, scriptPath): jobId = int(out.split()[-1]) return jobId, scriptPath + ".out", scriptPath + ".error.out" - def cancel_job(self, jobId, hurry_flag=False): - print("cancel job" + str(jobId)) - cmd = "scancel --hurry %s" % str(jobId) - rc, out = self.exec_run(cmd) - if rc != 0: - raise JobCancelError(out) - def remove_job_output(self, jobId, outputFilePath, errorFilePath): """ The creation of the job's output file will sometimes lag behind the @@ -92,19 +86,7 @@ def wait_until_workflow_is_gone(self, jobId): print(f"Workflow {jobId} still exists: " + out) raise JobNotCompleteError() - @retry( - wait=wait_fixed(2), - stop=stop_after_attempt(30), - retry=retry_if_result(lambda state: state[0] not in ["COMPLETED", "FAILED", "CANCELLED"]), - retry_error_callback=lambda retry_state: retry_state.outcome.result() - ) - def get_final_job_state(self, jobId, must_be_gone=True): - # When the job is finished, the workflow should not exist. - if must_be_gone: - self.wait_until_workflow_is_gone(jobId) - else: - time.sleep(5) # wait for workflow info to be transferred to the job - + def scontrol_show_job(self, jobId): rc, out = self.exec_run("scontrol show job " + str(jobId)) assert rc==0, "Could not get job state from Slurm:\n" + out @@ -119,8 +101,29 @@ def get_final_job_state(self, jobId, must_be_gone=True): print("JobState=" + keyVal[1]) return keyVal[1], out assert False, "Could not parse state from: " + out + + @retry( + wait=wait_fixed(2), + stop=stop_after_attempt(5) + ) + def wait_until_job_has_been_x(self, jobId, job_state): + job_state, _ = self.scontrol_show_job(jobId) + print(f"Found \"{job_state}\" in JobState") + assert job_state == job_state - def get_workflow_status(self, jobId): + @retry( + wait=wait_fixed(2), + stop=stop_after_attempt(5) + ) + def wait_until_job_system_comment(self, jobId, message): + _,out = self.scontrol_show_job(jobId) + m = re.search(r'\n\s+SystemComment=(.*)\n\s+StdErr=', out, re.DOTALL) + assert m is not None, f"Could not find SystemComment in job state from Slurm\n{out}" + if message in m.group(1): + print(f"Found \"{message}\" in SystemComment") + assert message in m.group(1) + + def scontrol_show_bbstat(self, jobId): rc, out = self.exec_run("scontrol show bbstat workflow " + str(jobId)) assert rc == 0, "Could not get job status from Slurm:\n" + out # This next check is because the scontrol command does not exit non-zero diff --git a/testsuite/unit/src/burst_buffer/dws-test.lua b/testsuite/unit/src/burst_buffer/dws-test.lua index 85cd6ad..2be76de 100644 --- a/testsuite/unit/src/burst_buffer/dws-test.lua +++ b/testsuite/unit/src/burst_buffer/dws-test.lua @@ -855,11 +855,13 @@ describe("Slurm API", function() end local call_bb_teardown = function(hurry) + dwsmq_enqueue(true, "") -- get_driver_errors + dwsmq_enqueue(true, "") -- kubectl_cache_home local delete_result = 'workflow.dataworkflowservices.github.io "' .. workflow_name .. '" deleted\n' local popen_count = mock_popen_calls("Teardown", "Completed") dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, delete_result) - popen_count = popen_count + 2 + popen_count = popen_count + 4 io.popen:clear() local ret, err = slurm_bb_job_teardown(jobID, job_script_name, hurry) From 13e5e9a6f9cb6b41d3542f20258f473457e7a3b9 Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Mon, 25 Sep 2023 10:33:09 -0500 Subject: [PATCH 4/6] Remove the fatal-eror-during-teardown test (#45) The feature code appears to be ok. The test has a race; work it as a separate effort. Signed-off-by: Dean Roehrich --- .../src/features/test_dws_states.feature | 14 -------------- testsuite/integration/src/pytest.ini | 1 - 2 files changed, 15 deletions(-) diff --git a/testsuite/integration/src/features/test_dws_states.feature b/testsuite/integration/src/features/test_dws_states.feature index 4510826..ced55ff 100644 --- a/testsuite/integration/src/features/test_dws_states.feature +++ b/testsuite/integration/src/features/test_dws_states.feature @@ -108,17 +108,3 @@ Feature: Data Workflow Services State Progression # *** VALUES *** | PostRun | | DataOut | - - @fatal_three - Scenario: Report fatal errors from Teardown - Given a job script: - #!/bin/bash - - #DW Teardown action=error message=TEST_FATAL_ERROR severity=Fatal - /bin/hostname - - When the job is run - And some Workflow has been created for the job - And the Workflow reports fatal errors at the Teardown state - Then the Workflow has eventually been deleted - And the job has eventually been COMPLETED diff --git a/testsuite/integration/src/pytest.ini b/testsuite/integration/src/pytest.ini index f58024e..799830c 100644 --- a/testsuite/integration/src/pytest.ini +++ b/testsuite/integration/src/pytest.ini @@ -25,4 +25,3 @@ markers = happy_one fatal_one fatal_two - fatal_three From 09530d63f1769bc9187d5a4c908cba08217d75da Mon Sep 17 00:00:00 2001 From: Kevin Pelzel Date: Tue, 26 Sep 2023 13:45:04 -0600 Subject: [PATCH 5/6] return string as second return value --- src/burst_buffer/burst_buffer.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/burst_buffer/burst_buffer.lua b/src/burst_buffer/burst_buffer.lua index b1c80a8..3c3de7e 100644 --- a/src/burst_buffer/burst_buffer.lua +++ b/src/burst_buffer/burst_buffer.lua @@ -688,7 +688,7 @@ function slurm_bb_job_teardown(job_id, job_script, hurry) end if ret == slurm.SUCCESS then - err = nil + err = "" end return ret, err end From daaa147ce205553af4d3fe4cbc6245f9e8726ccc Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Thu, 28 Sep 2023 09:59:45 -0500 Subject: [PATCH 6/6] Fix unit test Signed-off-by: Dean Roehrich --- testsuite/unit/src/burst_buffer/dws-test.lua | 1 - 1 file changed, 1 deletion(-) diff --git a/testsuite/unit/src/burst_buffer/dws-test.lua b/testsuite/unit/src/burst_buffer/dws-test.lua index 2be76de..ec3b8cd 100644 --- a/testsuite/unit/src/burst_buffer/dws-test.lua +++ b/testsuite/unit/src/burst_buffer/dws-test.lua @@ -837,7 +837,6 @@ describe("Slurm API", function() assert.stub(io.popen).was_called(popen_calls) io.popen:clear() assert.is_equal(ret, slurm.SUCCESS) - assert.is_nil(err, err) end local call_bb_setup = function()