diff --git a/src/toil/test/src/jobServiceTest.py b/src/toil/test/src/jobServiceTest.py index 732b591444..0fec4a1b04 100644 --- a/src/toil/test/src/jobServiceTest.py +++ b/src/toil/test/src/jobServiceTest.py @@ -153,18 +153,41 @@ def testServiceRecursive(self, checkpoint=True): SingleMachineBatchSystem.numCores < 4, "Need at least four cores to run this test", ) - @retry_flaky_test(prepare=[ToilTest.tearDown, ToilTest.setUp]) @pytest.mark.timeout(1200) def testServiceParallelRecursive(self, checkpoint=True): """ Tests the creation of a Job.Service, creating parallel chains of services and accessing jobs. Randomly fails the worker. """ + + # This test needs to have something like 10 jobs succeed + + BUNDLE_SIZE = 3 + BUNDLE_COUNT = 2 + RETRY_COUNT = 4 + FAIL_FRACTION = 0.5 + MAX_ATTEMPTS = 10 + + total_jobs = BUNDLE_SIZE * BUNDLE_COUNT * 2 + 1 + p_complete_job_failure = FAIL_FRACTION ** (RETRY_COUNT + 1) + p_workflow_success = (1 - p_complete_job_failure) ** total_jobs + logger.info("Going to run %s total jobs, each of which completely fails %s of the time, so the workflow will succeed with probability %s", total_jobs, p_complete_job_failure, p_workflow_success) + p_test_failure = (1 - p_workflow_success) ** MAX_ATTEMPTS + logger.info("This test will fail spuriously with probability %s", p_test_failure) + + # We want to run the workflow through several times to test restarting, so we need it to often fail but reliably sometimes succeed, and almost always succeed when repeated. + + self.assertGreater(0.8, p_workflow_success) + self.assertGreater(p_workflow_success, 0.2) + self.assertGreater(0.001, p_test_failure) + + for test in range(1): # Temporary file - outFiles = [get_temp_file(rootDir=self._createTempDir()) for j in range(2)] + outFiles = [get_temp_file(rootDir=self._createTempDir()) for j in range(BUNDLE_COUNT)] + # We send 3 messages each in 2 sets, each of which needs a service and a client messageBundles = [ - [random.randint(1, sys.maxsize) for i in range(3)] for j in range(2) + [random.randint(1, sys.maxsize) for i in range(BUNDLE_SIZE)] for j in range(BUNDLE_COUNT) ] try: # Wire up the services/jobs @@ -176,7 +199,7 @@ def testServiceParallelRecursive(self, checkpoint=True): ) # Run the workflow repeatedly until success - self.runToil(t, retryCount=2) + self.runToil(t, retryCount=RETRY_COUNT, badWorker=FAIL_FRACTION, max_attempts=MAX_ATTEMPTS) # Check output for messages, outFile in zip(messageBundles, outFiles): @@ -194,6 +217,7 @@ def runToil( badWorkedFailInterval=0.1, maxServiceJobs=sys.maxsize, deadlockWait=60, + max_attempts=50 ): # Create the runner for the workflow. options = Job.Runner.getDefaultOptions(self._getTestJobStorePath()) @@ -207,17 +231,19 @@ def runToil( options.deadlockWait = deadlockWait # Run the workflow - totalTrys = 0 + total_tries = 1 while True: try: Job.Runner.startToil(rootJob, options) break except FailedJobsException as e: i = e.numberOfFailedJobs - if totalTrys > 50: # p(fail after this many restarts) = 0.5**32 + logger.info("Workflow attempt %s/%s failed with %s failed jobs", total_tries, max_attempts, i) + if total_tries == max_attempts: self.fail() # Exceeded a reasonable number of restarts - totalTrys += 1 + total_tries += 1 options.restart = True + logger.info("Succeeded after %s/%s attempts", total_tries, max_attempts) class PerfectServiceTest(JobServiceTest): @@ -250,13 +276,16 @@ def serviceTest(job, outFile, messageInt): """ # Clean out out-file open(outFile, "w").close() - randInt = random.randint( + # We create a random number that is added to messageInt and subtracted by + # the serviceAccessor, to prove that when service test is checkpointed and + # restarted there is never a connection made between an earlier service and + # later serviceAccessor, or vice versa. + to_subtract = random.randint( 1, sys.maxsize - ) # We create a random number that is added to messageInt and subtracted by the serviceAccessor, to prove that - # when service test is checkpointed and restarted there is never a connection made between an earlier service and later serviceAccessor, or vice versa. + ) job.addChildJobFn( serviceAccessor, - job.addService(ToyService(messageInt + randInt)), + job.addService(ToyService(messageInt + to_subtract)), outFile, randInt, ) @@ -406,14 +435,14 @@ def serviceWorker( raise -def serviceAccessor(job, communicationFiles, outFile, randInt): +def serviceAccessor(job, communicationFiles, outFile, to_subtract): """ Writes a random integer iinto the inJobStoreFileID file, then tries 10 times reading from outJobStoreFileID to get a pair of integers, the first equal to i the second written into the outputFile. """ inJobStoreFileID, outJobStoreFileID = communicationFiles - # Get a random integer + # Get a random integer to advertise ourselves key = random.randint(1, sys.maxsize) # Write the integer into the file @@ -438,10 +467,10 @@ def serviceAccessor(job, communicationFiles, outFile, randInt): if int(key2) == key: logger.debug( - f"Matched key's: {key}, writing message: {int(message) - randInt} with randInt: {randInt}" + f"Matched key's: {key}, writing message: {int(message) - to_subtract} with to_subtract: {to_subtract}" ) with open(outFile, "a") as fH: - fH.write("%s\n" % (int(message) - randInt)) + fH.write("%s\n" % (int(message) - to_subtract)) return assert 0 # Job failed to get info from the service