Skip to content

Commit

Permalink
Make file job store stats and logging files flow one way, and never s…
Browse files Browse the repository at this point in the history
…how partially written logs (#5141)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
adamnovak and github-actions[bot] authored Nov 20, 2024
1 parent 42c7caa commit a5657cf
Showing 1 changed file with 95 additions and 25 deletions.
120 changes: 95 additions & 25 deletions src/toil/jobStores/fileJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class FileJobStore(AbstractJobStore):
# 10Mb RAM chunks when reading/writing files
BUFFER_SIZE = 10485760 # 10Mb

# When a log file is still being written, what will its name end with?
LOG_TEMP_SUFFIX = ".new"
# All log files start with this prefix
LOG_PREFIX = "stats"

def default_caching(self) -> bool:
"""
Jobstore's preference as to whether it likes caching or doesn't care about it.
Expand All @@ -90,6 +95,9 @@ def __init__(self, path: str, fanOut: int = 1000) -> None:
self.jobsDir = os.path.join(self.jobStoreDir, "jobs")
# Directory where stats files go
self.statsDir = os.path.join(self.jobStoreDir, "stats")
# Which has subdirectories for new and seen stats files
self.stats_inbox = os.path.join(self.statsDir, "inbox")
self.stats_archive = os.path.join(self.statsDir, "archive")
# Directory where non-job-associated files for the file store go
self.filesDir = os.path.join(self.jobStoreDir, "files/no-job")
# Directory where job-associated files for the file store go.
Expand Down Expand Up @@ -118,6 +126,8 @@ def initialize(self, config):
raise
os.makedirs(self.jobsDir, exist_ok=True)
os.makedirs(self.statsDir, exist_ok=True)
os.makedirs(self.stats_inbox, exist_ok=True)
os.makedirs(self.stats_archive, exist_ok=True)
os.makedirs(self.filesDir, exist_ok=True)
os.makedirs(self.jobFilesDir, exist_ok=True)
os.makedirs(self.sharedFilesDir, exist_ok=True)
Expand Down Expand Up @@ -836,29 +846,66 @@ def list_all_file_names(self, for_job: Optional[str] = None) -> Iterable[str]:

def write_logs(self, msg):
# Temporary files are placed in the stats directory tree
tempStatsFileName = "stats" + str(uuid.uuid4().hex) + ".new"
tempStatsFile = os.path.join(self._get_arbitrary_stats_dir(), tempStatsFileName)
tempStatsFileName = self.LOG_PREFIX + str(uuid.uuid4().hex) + self.LOG_TEMP_SUFFIX
tempStatsFile = os.path.join(self._get_arbitrary_stats_inbox_dir(), tempStatsFileName)
writeFormat = "w" if isinstance(msg, str) else "wb"
with open(tempStatsFile, writeFormat) as f:
f.write(msg)
os.rename(tempStatsFile, tempStatsFile[:-4]) # This operation is atomic
os.rename(tempStatsFile, tempStatsFile[:-len(self.LOG_TEMP_SUFFIX)]) # This operation is atomic

def read_logs(self, callback, read_all=False):
numberOfFilesProcessed = 0
for tempDir in self._stats_directories():
for tempFile in os.listdir(tempDir):
if tempFile.startswith("stats"):
absTempFile = os.path.join(tempDir, tempFile)
if os.path.isfile(absTempFile):
if read_all or not tempFile.endswith(".new"):
with open(absTempFile, "rb") as fH:
callback(fH)
numberOfFilesProcessed += 1
newName = tempFile.rsplit(".", 1)[0] + ".new"
newAbsTempFile = os.path.join(tempDir, newName)
files_processed = 0

# Holds pairs of a function to call to get directories to look at, and
# a flag for whether to archive the files found.
queries = []
if read_all:
# If looking at all logs, check the archive
queries.append((self._stats_archive_directories, False))
# Always check the inbox and archive from it. But do it after checking
# the archive to avoid duplicates in the same pass.
queries.append((self._stats_inbox_directories, True))

for to_call, should_archive in queries:
for log_dir in to_call():
for log_file in os.listdir(log_dir):
if not log_file.startswith(self.LOG_PREFIX):
# Skip anything not a log file (like the other spray
# directories)
continue
if log_file.endswith(self.LOG_TEMP_SUFFIX):
# Skip partially-written files, always.
continue

abs_log_file = os.path.join(log_dir, log_file)
if not os.path.isfile(abs_log_file):
# This can't be a log file.
continue
try:
opened_file = open(abs_log_file, "rb")
except FileNotFoundError:
# File disappeared before we could open it.
# Maybe someone else is reading logs?
continue
with opened_file as f:
callback(f)
files_processed += 1

if should_archive:
# We need to move the stats file to the archive.
# Since we have UUID stats file names we don't need
# to worry about collisions when it gets there.
new_dir = self._get_arbitrary_stats_archive_dir()
new_abs_log_file = os.path.join(new_dir, log_file)
try:
# Mark this item as read
os.rename(absTempFile, newAbsTempFile)
return numberOfFilesProcessed
os.rename(abs_log_file, new_abs_log_file)
except FileNotFoundError:
# File we wanted to archive disappeared.
# Maybe someone else is reading logs?
# TODO: Raise ConcurrentFileModificationException?
continue
return files_processed

##########################################
# Private methods
Expand Down Expand Up @@ -1010,17 +1057,31 @@ def _get_arbitrary_jobs_dir_for_name(self, jobNameSlug):
os.path.join(self.jobsDir, self.JOB_NAME_DIR_PREFIX + jobNameSlug)
)

def _get_arbitrary_stats_dir(self):
def _get_arbitrary_stats_inbox_dir(self):
"""
Gets a temporary directory in a multi-level hierarchy in self.statsDir.
Gets a temporary directory in a multi-level hierarchy in
self.stats_inbox, where stats files not yet seen by the leader live.
The directory is not unique and may already have other stats files in it.
:rtype : string, path to temporary directory in which to place files/directories.
"""

return self._get_dynamic_spray_dir(self.statsDir)
return self._get_dynamic_spray_dir(self.stats_inbox)

def _get_arbitrary_stats_archive_dir(self):
"""
Gets a temporary directory in a multi-level hierarchy in
self.stats_archive, where stats files already seen by the leader live.
The directory is not unique and may already have other stats files in it.
:rtype : string, path to temporary directory in which to place files/directories.
"""

return self._get_dynamic_spray_dir(self.stats_archive)

def _get_arbitrary_files_dir(self):
"""
Expand Down Expand Up @@ -1156,14 +1217,23 @@ def _job_directories(self):
os.path.join(jobHoldingDir, jobNameDir)
)

def _stats_directories(self):
def _stats_inbox_directories(self):
"""
:rtype : an iterator to the temporary directories containing stats
files. They may also contain directories containing more
stats files.
:returns: an iterator to the temporary directories containing new stats
files. They may also contain directories containing more stats
files.
"""

return self._walk_dynamic_spray_dir(self.stats_inbox)

def _stats_archive_directories(self):
"""
:returns: an iterator to the temporary directories containing
previously observed stats files. They may also contain directories
containing more stats files.
"""

return self._walk_dynamic_spray_dir(self.statsDir)
return self._walk_dynamic_spray_dir(self.stats_archive)

def _get_unique_file_path(self, fileName, jobStoreID=None, cleanup=False):
"""
Expand Down

0 comments on commit a5657cf

Please sign in to comment.