diff --git a/anvio/utils.py b/anvio/utils.py index c0dd44a9d3..fd18ab8d59 100644 --- a/anvio/utils.py +++ b/anvio/utils.py @@ -452,6 +452,121 @@ def __init__(self, coverage, skip_outliers=False): self.is_outlier = get_list_of_outliers(coverage, median=self.median) # this is an array not a list +class DatabaseLockForBabies: + """A quick-and-dirty way to avoid the same anvi'o program to be run on the same database in parallel. + + Through an example via anvi-run-hmms we learned the hard way that not preventing parallel runs, + frequently lead to race conditions for small contigs databses that ruin database structure. Since + there is no way to stop users from doing it, this class helps with some explicit rules. + + The most straightforward way of using this class goes like this. Let's assume you have a `contigs_db` + and you are running the program `anvi-run-hmms` on it (the program name could be anything, so if + you fancy, you can put a general aim there rather than a program name such as `HMMs`). In this setting, + if you want your code to only run while there is no other instance of the same is running on that + database, you can do this: + + >>> lock = DatabaseLockForBabies() + >>> lock.wait() + >>> (DO YOUR STUFF) + >>> lock.release() + + If there is already a lock, `lock.wait()` will wait until there is none, and then create a lock for your + process before moving on so you can do your stuff before releasing it. If there is no lock, then the + # `lock.wait()` will simply serve as a way to generate a lock. + + Alternatively, you can create a lock and then release it yourself without waiting, in which case any other + program that also tries to create a lock without `wait` would get an error message: + + >>> lock = DatabaseLockForBabies() + >>> lock.set() + >>> (DO YOUR STUFF) + >>> lock.release() + + Simple stuff. + """ + + def __init__(self, db_path, program, run=Run(), progress=Progress()): + self.db_path = db_path + self.program = program + + self.run = run + self.progress = progress + + dir_path = os.path.dirname(os.path.abspath(self.db_path)) + file_name = os.path.basename(os.path.abspath(self.db_path)) + + # make sure we have write access to the directory + filesnpaths.is_output_dir_writable(dir_path) + + # this is the lock file we will use + self._lock = os.path.join(dir_path, f".{file_name}.{self.program}.lock") + + + def is_locked(self): + return True if os.path.exists(self._lock) else False + + + def set(self): + if self.is_locked(): + raise ConfigError(f"There is a lock for this database set by the program '{self.program}'. " + f"You can either wait for it to be released, or manually remove the lock " + f"before running the member function `set()` again. FYI, the lock file is " + f"here: {self._lock}") + else: + open(self._lock, 'w').write(f"{time.time()}\n") + + + def wait(self, set_lock_after_wait=True): + """Wait until an existing lock, if there is one, released. + + If you do not want this function to start a lock, set `set_lock_after_wait` to False + """ + + there_was_a_lock = False + + if self.is_locked(): + there_was_a_lock = True + self.run.warning(f"It seems an instance of '{self.program}' is already running on your database. " + f"Your current request will wait here until the other process releases the " + f"lock it set on this database. If this is taking forever and you are sure that " + f"nothing is running on the database anymore (which can happen if your " + f"program was killed by your HPC or something), you can manually remove " + f"the lock in a separete terminal, and this process will then continue. If you " + f"are unsure, please wait and hope that things will sort themselves out. Just FYI, " + f"the lock file is located here: {self._lock}", + header="🔒 WARNING: LOCK FOUND, NOT YET RUNNING 🔒", lc="yellow") + + self.progress.new("Lock") + self.progress.update('...') + + while self.is_locked(): + try: + delta = time.time() - float(open(self._lock).read()) + delta_txt = time.strftime("%H hours, %M mins, %S secs", time.gmtime(delta)) + self.progress.update(f"Has been there since {delta_txt} ago.") + except: + pass + + time.sleep(5) + + # no lock! + self.progress.end() + + if there_was_a_lock: + self.run.info_single(f"The lock that was put in {delta_txt} ago is now removed :)", nl_after=1) + + + if set_lock_after_wait: + self.set() + + + def release(self): + try: + os.remove(self._lock) + except: + pass + + class RunInDirectory(object): """ Run any block of code in a specified directory. Return to original directory diff --git a/bin/anvi-run-hmms b/bin/anvi-run-hmms index d46491c872..9d6387e279 100755 --- a/bin/anvi-run-hmms +++ b/bin/anvi-run-hmms @@ -3,10 +3,12 @@ import os import sys +import time import anvio import anvio.utils as utils import anvio.terminal as terminal +import anvio.filesnpaths as filesnpaths with terminal.SuppressAllOutput(): import anvio.data.hmm as hmm_data @@ -91,14 +93,30 @@ def main(args): "this output in a directory with --hmmer-output-dir. There is no point to requesting this output " "if you are never going to see it, so we figured we'd stop you right there. :)") - search_tables = TablesForHMMHits(args.contigs_db, num_threads_to_use=args.num_threads, just_do_it=args.just_do_it, - hmm_program_to_use=args.hmmer_program, hmmer_output_directory=args.hmmer_output_dir, - get_domain_table_output=args.domain_hits_table, add_to_functions_table=args.add_to_functions_table) - search_tables.populate_search_tables(sources) + # right before we start, we want to make sure there is no other program actually + # running on the same database, and we will not start until it is over: + lock = utils.DatabaseLockForBabies(args.contigs_db, 'HMMs') + lock.wait() - if not args.hmm_profile_dir and not args.installed_hmm_profile and args.also_scan_trnas: - tables_for_trna_hits = TablesForTransferRNAs(args) - tables_for_trna_hits.populate_search_tables(args.contigs_db) + # this try/except block is critical, because we don't want to find any lingering + # lock files around when a process is dead due to an exception. + try: + search_tables = TablesForHMMHits(args.contigs_db, num_threads_to_use=args.num_threads, just_do_it=args.just_do_it, + hmm_program_to_use=args.hmmer_program, hmmer_output_directory=args.hmmer_output_dir, + get_domain_table_output=args.domain_hits_table, add_to_functions_table=args.add_to_functions_table) + search_tables.populate_search_tables(sources) + + if not args.hmm_profile_dir and not args.installed_hmm_profile and args.also_scan_trnas: + tables_for_trna_hits = TablesForTransferRNAs(args) + tables_for_trna_hits.populate_search_tables(args.contigs_db) + except Exception as e: + # there was a snafu? get rid of your release first before passing the error to + # the user: + lock.release() + raise ConfigError(e.e) + + # all good + lock.release() if __name__ == '__main__': diff --git a/bin/anvi-scan-trnas b/bin/anvi-scan-trnas index c68a1ea427..675a32f8f9 100755 --- a/bin/anvi-scan-trnas +++ b/bin/anvi-scan-trnas @@ -4,6 +4,7 @@ import sys import anvio +import anvio.utils as utils import anvio.terminal as terminal from anvio.tables.trnahits import TablesForTransferRNAs @@ -28,8 +29,19 @@ progress = terminal.Progress() @time_program def main(args): - tables_for_trna_hits = TablesForTransferRNAs(args) - tables_for_trna_hits.populate_search_tables(args.contigs_db) + # right before we start, we want to make sure there is no other program actually + # running on the same database, and we will not start until it is over: + lock = utils.DatabaseLockForBabies(args.contigs_db, 'HMMs') + lock.wait() + + try: + tables_for_trna_hits = TablesForTransferRNAs(args) + tables_for_trna_hits.populate_search_tables(args.contigs_db) + except Exception as e: + lock.release() + raise ConfigError(e.e) + + lock.release() if __name__ == '__main__':