
Commit

Try and fix flaky parallel recursive service test by bounding failure probability
adamnovak committed Nov 8, 2024
1 parent f9df47c commit 5516a81
Showing 1 changed file with 44 additions and 15 deletions.
59 changes: 44 additions & 15 deletions src/toil/test/src/jobServiceTest.py
@@ -153,18 +153,41 @@ def testServiceRecursive(self, checkpoint=True):
SingleMachineBatchSystem.numCores < 4,
"Need at least four cores to run this test",
)
@retry_flaky_test(prepare=[ToilTest.tearDown, ToilTest.setUp])
@pytest.mark.timeout(1200)
def testServiceParallelRecursive(self, checkpoint=True):
"""
Tests the creation of a Job.Service, creating parallel chains of services and accessing jobs.
Randomly fails the worker.
"""

# This test needs to have something like 10 jobs succeed

BUNDLE_SIZE = 3
BUNDLE_COUNT = 2
RETRY_COUNT = 4
FAIL_FRACTION = 0.5
MAX_ATTEMPTS = 10

total_jobs = BUNDLE_SIZE * BUNDLE_COUNT * 2 + 1
p_complete_job_failure = FAIL_FRACTION ** (RETRY_COUNT + 1)
p_workflow_success = (1 - p_complete_job_failure) ** total_jobs
logger.info("Going to run %s total jobs, each of which completely fails %s of the time, so the workflow will succeed with probability %s", total_jobs, p_complete_job_failure, p_workflow_success)
p_test_failure = (1 - p_workflow_success) ** MAX_ATTEMPTS
logger.info("This test will fail spuriously with probability %s", p_test_failure)

# We want to run the workflow through several times to test restarting, so we need it to often fail but reliably sometimes succeed, and almost always succeed when repeated.

self.assertGreater(0.8, p_workflow_success)
self.assertGreater(p_workflow_success, 0.2)
self.assertGreater(0.001, p_test_failure)
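
(Aside, not part of the diff: plugging the constants above into these formulas gives concrete numbers, which is the quickest way to see why the three assertions hold. The per-message job accounting in the comment is an assumption taken from the test's own comments.)

BUNDLE_SIZE, BUNDLE_COUNT, RETRY_COUNT, FAIL_FRACTION, MAX_ATTEMPTS = 3, 2, 4, 0.5, 10
total_jobs = BUNDLE_SIZE * BUNDLE_COUNT * 2 + 1                   # 13: a service and a client per message, plus the root job
p_complete_job_failure = FAIL_FRACTION ** (RETRY_COUNT + 1)       # 0.5 ** 5 = 0.03125
p_workflow_success = (1 - p_complete_job_failure) ** total_jobs   # 0.96875 ** 13 ~ 0.66
p_test_failure = (1 - p_workflow_success) ** MAX_ATTEMPTS         # 0.34 ** 10 ~ 2e-5
# So 0.8 > 0.66 > 0.2 and 0.001 > 2e-5: the workflow fails often enough to
# exercise restarting, but the test itself almost never fails spuriously.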


for test in range(1):
# Temporary file
outFiles = [get_temp_file(rootDir=self._createTempDir()) for j in range(2)]
outFiles = [get_temp_file(rootDir=self._createTempDir()) for j in range(BUNDLE_COUNT)]
# We send BUNDLE_SIZE messages in each of BUNDLE_COUNT sets; each message needs a service and a client
messageBundles = [
[random.randint(1, sys.maxsize) for i in range(3)] for j in range(2)
[random.randint(1, sys.maxsize) for i in range(BUNDLE_SIZE)] for j in range(BUNDLE_COUNT)
]
try:
# Wire up the services/jobs
@@ -176,7 +199,7 @@ def testServiceParallelRecursive(self, checkpoint=True):
)

# Run the workflow repeatedly until success
self.runToil(t, retryCount=2)
self.runToil(t, retryCount=RETRY_COUNT, badWorker=FAIL_FRACTION, max_attempts=MAX_ATTEMPTS)

# Check output
for messages, outFile in zip(messageBundles, outFiles):
@@ -194,6 +217,7 @@ def runToil(
badWorkedFailInterval=0.1,
maxServiceJobs=sys.maxsize,
deadlockWait=60,
max_attempts=50
):
# Create the runner for the workflow.
options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
@@ -207,17 +231,19 @@ def runToil(
options.deadlockWait = deadlockWait

# Run the workflow
totalTrys = 0
total_tries = 1
while True:
try:
Job.Runner.startToil(rootJob, options)
break
except FailedJobsException as e:
i = e.numberOfFailedJobs
if totalTrys > 50: # p(fail after this many restarts) = 0.5**32
logger.info("Workflow attempt %s/%s failed with %s failed jobs", total_tries, max_attempts, i)
if total_tries == max_attempts:
self.fail() # Exceeded a reasonable number of restarts
totalTrys += 1
total_tries += 1
options.restart = True
logger.info("Succeeded after %s/%s attempts", total_tries, max_attempts)


class PerfectServiceTest(JobServiceTest):
@@ -250,13 +276,16 @@ def serviceTest(job, outFile, messageInt):
"""
# Clean out out-file
open(outFile, "w").close()
randInt = random.randint(
# We create a random number that is added to messageInt and subtracted by
# the serviceAccessor, to prove that when service test is checkpointed and
# restarted there is never a connection made between an earlier service and
# later serviceAccessor, or vice versa.
to_subtract = random.randint(
1, sys.maxsize
) # We create a random number that is added to messageInt and subtracted by the serviceAccessor, to prove that
# when service test is checkpointed and restarted there is never a connection made between an earlier service and later serviceAccessor, or vice versa.
)
job.addChildJobFn(
serviceAccessor,
job.addService(ToyService(messageInt + randInt)),
job.addService(ToyService(messageInt + to_subtract)),
outFile,
randInt,
)
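
(Aside, not part of the diff: the comment introduced above is the heart of the checkpointing check. The service advertises messageInt + to_subtract, and only the accessor wired up in the same pass knows to_subtract, so if a checkpoint restart ever paired an earlier service with a later accessor, the recovered value would be off by the difference of the two offsets. A minimal standalone sketch of that invariant, using hypothetical names outside Toil:)

import random
import sys

def wire_pair(message_int):
    # One wiring pass: the service and its accessor share one random offset.
    offset = random.randint(1, sys.maxsize)
    advertised = message_int + offset       # what the service hands out
    return advertised, offset

advertised, offset = wire_pair(42)
assert advertised - offset == 42            # matched pair recovers the message

_, stale_offset = wire_pair(42)             # offset from a different wiring pass
assert advertised - stale_offset != 42      # mismatch is detected (all but certainly)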
@@ -406,14 +435,14 @@ def serviceWorker(
raise


def serviceAccessor(job, communicationFiles, outFile, randInt):
def serviceAccessor(job, communicationFiles, outFile, to_subtract):
"""
Writes a random integer i into the inJobStoreFileID file, then tries 10 times reading
from outJobStoreFileID to get a pair of integers, the first equal to i, the second written into the outputFile.
"""
inJobStoreFileID, outJobStoreFileID = communicationFiles

# Get a random integer
# Get a random integer to advertise ourselves
key = random.randint(1, sys.maxsize)

# Write the integer into the file
@@ -438,10 +467,10 @@ def serviceAccessor(job, communicationFiles, outFile, randInt):

if int(key2) == key:
logger.debug(
f"Matched key's: {key}, writing message: {int(message) - randInt} with randInt: {randInt}"
f"Matched key's: {key}, writing message: {int(message) - to_subtract} with to_subtract: {to_subtract}"
)
with open(outFile, "a") as fH:
fH.write("%s\n" % (int(message) - randInt))
fH.write("%s\n" % (int(message) - to_subtract))
return

assert 0 # Job failed to get info from the service
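
(Aside, not part of the diff: the accessor's protocol is a small request/response handshake over the two shared job-store files — write a random key into one, poll the other for a (key, message) pair echoing that key, then record message - to_subtract. A stripped-down sketch with plain local files standing in for the job-store file IDs; the function name and file-path parameters are hypothetical:)

import os
import random
import sys
import time

def access_service(in_path, out_path, out_file, to_subtract, attempts=10):
    # Advertise ourselves with a random key the service will echo back.
    key = random.randint(1, sys.maxsize)
    with open(in_path, "w") as fh:
        fh.write("%s\n" % key)
    for _ in range(attempts):
        time.sleep(0.2)                     # give the service a moment to answer
        if not os.path.exists(out_path):
            continue
        with open(out_path) as fh:
            fields = fh.read().split()
        # Accept only a response addressed to our key, then undo the offset.
        if len(fields) == 2 and int(fields[0]) == key:
            with open(out_file, "a") as fh:
                fh.write("%s\n" % (int(fields[1]) - to_subtract))
            return
    raise RuntimeError("service never echoed our key")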
