diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index 0ae5c8816b..fa6df1d325 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -66,6 +66,11 @@ class FileJobStore(AbstractJobStore): # 10Mb RAM chunks when reading/writing files BUFFER_SIZE = 10485760 # 10Mb + # When a log file is still being written, what will its name end with? + LOG_TEMP_SUFFIX = ".new" + # All log files start with this prefix + LOG_PREFIX = "stats" + def default_caching(self) -> bool: """ Jobstore's preference as to whether it likes caching or doesn't care about it. @@ -90,6 +95,9 @@ def __init__(self, path: str, fanOut: int = 1000) -> None: self.jobsDir = os.path.join(self.jobStoreDir, "jobs") # Directory where stats files go self.statsDir = os.path.join(self.jobStoreDir, "stats") + # Which has subdirectories for new and seen stats files + self.stats_inbox = os.path.join(self.statsDir, "inbox") + self.stats_archive = os.path.join(self.statsDir, "archive") # Directory where non-job-associated files for the file store go self.filesDir = os.path.join(self.jobStoreDir, "files/no-job") # Directory where job-associated files for the file store go. @@ -118,6 +126,8 @@ def initialize(self, config): raise os.makedirs(self.jobsDir, exist_ok=True) os.makedirs(self.statsDir, exist_ok=True) + os.makedirs(self.stats_inbox, exist_ok=True) + os.makedirs(self.stats_archive, exist_ok=True) os.makedirs(self.filesDir, exist_ok=True) os.makedirs(self.jobFilesDir, exist_ok=True) os.makedirs(self.sharedFilesDir, exist_ok=True) @@ -836,29 +846,66 @@ def list_all_file_names(self, for_job: Optional[str] = None) -> Iterable[str]: def write_logs(self, msg): # Temporary files are placed in the stats directory tree - tempStatsFileName = "stats" + str(uuid.uuid4().hex) + ".new" - tempStatsFile = os.path.join(self._get_arbitrary_stats_dir(), tempStatsFileName) + tempStatsFileName = self.LOG_PREFIX + str(uuid.uuid4().hex) + self.LOG_TEMP_SUFFIX + tempStatsFile = os.path.join(self._get_arbitrary_stats_inbox_dir(), tempStatsFileName) writeFormat = "w" if isinstance(msg, str) else "wb" with open(tempStatsFile, writeFormat) as f: f.write(msg) - os.rename(tempStatsFile, tempStatsFile[:-4]) # This operation is atomic + os.rename(tempStatsFile, tempStatsFile[:-len(self.LOG_TEMP_SUFFIX)]) # This operation is atomic def read_logs(self, callback, read_all=False): - numberOfFilesProcessed = 0 - for tempDir in self._stats_directories(): - for tempFile in os.listdir(tempDir): - if tempFile.startswith("stats"): - absTempFile = os.path.join(tempDir, tempFile) - if os.path.isfile(absTempFile): - if read_all or not tempFile.endswith(".new"): - with open(absTempFile, "rb") as fH: - callback(fH) - numberOfFilesProcessed += 1 - newName = tempFile.rsplit(".", 1)[0] + ".new" - newAbsTempFile = os.path.join(tempDir, newName) + files_processed = 0 + + # Holds pairs of a function to call to get directories to look at, and + # a flag for whether to archive the files found. + queries = [] + if read_all: + # If looking at all logs, check the archive + queries.append((self._stats_archive_directories, False)) + # Always check the inbox and archive from it. But do it after checking + # the archive to avoid duplicates in the same pass. + queries.append((self._stats_inbox_directories, True)) + + for to_call, should_archive in queries: + for log_dir in to_call(): + for log_file in os.listdir(log_dir): + if not log_file.startswith(self.LOG_PREFIX): + # Skip anything not a log file (like the other spray + # directories) + continue + if log_file.endswith(self.LOG_TEMP_SUFFIX): + # Skip partially-written files, always. + continue + + abs_log_file = os.path.join(log_dir, log_file) + if not os.path.isfile(abs_log_file): + # This can't be a log file. + continue + try: + opened_file = open(abs_log_file, "rb") + except FileNotFoundError: + # File disappeared before we could open it. + # Maybe someone else is reading logs? + continue + with opened_file as f: + callback(f) + files_processed += 1 + + if should_archive: + # We need to move the stats file to the archive. + # Since we have UUID stats file names we don't need + # to worry about collisions when it gets there. + new_dir = self._get_arbitrary_stats_archive_dir() + new_abs_log_file = os.path.join(new_dir, log_file) + try: # Mark this item as read - os.rename(absTempFile, newAbsTempFile) - return numberOfFilesProcessed + os.rename(abs_log_file, new_abs_log_file) + except FileNotFoundError: + # File we wanted to archive disappeared. + # Maybe someone else is reading logs? + # TODO: Raise ConcurrentFileModificationException? + continue + return files_processed ########################################## # Private methods @@ -1010,9 +1057,10 @@ def _get_arbitrary_jobs_dir_for_name(self, jobNameSlug): os.path.join(self.jobsDir, self.JOB_NAME_DIR_PREFIX + jobNameSlug) ) - def _get_arbitrary_stats_dir(self): + def _get_arbitrary_stats_inbox_dir(self): """ - Gets a temporary directory in a multi-level hierarchy in self.statsDir. + Gets a temporary directory in a multi-level hierarchy in + self.stats_inbox, where stats files not yet seen by the leader live. The directory is not unique and may already have other stats files in it. :rtype : string, path to temporary directory in which to place files/directories. @@ -1020,7 +1068,20 @@ def _get_arbitrary_stats_dir(self): """ - return self._get_dynamic_spray_dir(self.statsDir) + return self._get_dynamic_spray_dir(self.stats_inbox) + + def _get_arbitrary_stats_archive_dir(self): + """ + Gets a temporary directory in a multi-level hierarchy in + self.stats_archive, where stats files already seen by the leader live. + The directory is not unique and may already have other stats files in it. + + :rtype : string, path to temporary directory in which to place files/directories. + + + """ + + return self._get_dynamic_spray_dir(self.stats_archive) def _get_arbitrary_files_dir(self): """ @@ -1156,14 +1217,23 @@ def _job_directories(self): os.path.join(jobHoldingDir, jobNameDir) ) - def _stats_directories(self): + def _stats_inbox_directories(self): """ - :rtype : an iterator to the temporary directories containing stats - files. They may also contain directories containing more - stats files. + :returns: an iterator to the temporary directories containing new stats + files. They may also contain directories containing more stats + files. + """ + + return self._walk_dynamic_spray_dir(self.stats_inbox) + + def _stats_archive_directories(self): + """ + :returns: an iterator to the temporary directories containing + previously observed stats files. They may also contain directories + containing more stats files. """ - return self._walk_dynamic_spray_dir(self.statsDir) + return self._walk_dynamic_spray_dir(self.stats_archive) def _get_unique_file_path(self, fileName, jobStoreID=None, cleanup=False): """