Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB lock for babies #1974

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions anvio/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,121 @@ def __init__(self, coverage, skip_outliers=False):
self.is_outlier = get_list_of_outliers(coverage, median=self.median) # this is an array not a list


class DatabaseLockForBabies:
    """A quick-and-dirty way to avoid the same anvi'o program to be run on the same database in parallel.

    Through an example via anvi-run-hmms we learned the hard way that not preventing parallel runs
    frequently leads to race conditions for small contigs databases that ruin database structure.
    Since there is no way to stop users from doing it, this class helps with some explicit rules.

    The lock is a hidden file that lives next to the database and is named after both the database
    file and the `program` (`.<db file name>.<program>.lock`). Its content is the UNIX timestamp of
    the moment the lock was set.

    The most straightforward way of using this class goes like this. Let's assume you have a `contigs_db`
    and you are running the program `anvi-run-hmms` on it (the program name could be anything, so if
    you fancy, you can put a general aim there rather than a program name such as `HMMs`). In this setting,
    if you want your code to only run while no other instance of the same is running on that
    database, you can do this:

        >>> lock = DatabaseLockForBabies(contigs_db_path, 'HMMs')
        >>> lock.wait()
        >>> try:
        ...     (DO YOUR STUFF)
        ... finally:
        ...     lock.release()

    or, equivalently, as a context manager:

        >>> with DatabaseLockForBabies(contigs_db_path, 'HMMs'):
        ...     (DO YOUR STUFF)

    If there is already a lock, `lock.wait()` will wait until there is none, and then create a lock for your
    process before moving on so you can do your stuff before releasing it. If there is no lock, then the
    `lock.wait()` will simply serve as a way to generate a lock.

    Alternatively, you can create a lock and then release it yourself without waiting, in which case any other
    program that also tries to create a lock without `wait` would get an error message:

        >>> lock = DatabaseLockForBabies(contigs_db_path, 'HMMs')
        >>> lock.set()
        >>> (DO YOUR STUFF)
        >>> lock.release()

    Simple stuff.

    Parameters
    ==========
    db_path : str
        Path to the database to protect. The lock file is created in the same directory.
    program : str
        A label for the kind of work being serialized. Instances that share both `db_path`
        and `program` share the same lock file.
    run, progress : optional
        Terminal reporting objects. When omitted, a fresh `Run()` / `Progress()` is created
        for this instance.
    """

    # number of seconds to sleep between two checks of an existing lock in `wait()`
    POLL_INTERVAL_SECS = 5

    def __init__(self, db_path, program, run=None, progress=None):
        self.db_path = db_path
        self.program = program

        # build the reporters per instance rather than once at definition time, so
        # instances never share one by accident
        self.run = run if run is not None else Run()
        self.progress = progress if progress is not None else Progress()

        abs_db_path = os.path.abspath(self.db_path)
        dir_path = os.path.dirname(abs_db_path)
        file_name = os.path.basename(abs_db_path)

        # make sure we have write access to the directory
        filesnpaths.is_output_dir_writable(dir_path)

        # this is the lock file we will use
        self._lock = os.path.join(dir_path, f".{file_name}.{self.program}.lock")


    def __enter__(self):
        """Wait for any existing lock to go away, set our own, and return this instance."""

        self.wait()
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """Release the lock no matter how the `with` block ended; never suppresses exceptions."""

        self.release()
        return False


    def is_locked(self):
        """Return True if the lock file currently exists."""

        return os.path.exists(self._lock)


    def _try_to_acquire(self):
        """Atomically create the lock file. Returns True on success, False if it already exists.

        O_CREAT | O_EXCL makes the existence check and the file creation a single operation,
        so two processes can never both believe they created the lock.
        """

        try:
            fd = os.open(self._lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            return False

        with os.fdopen(fd, 'w') as lock_file:
            lock_file.write(f"{time.time()}\n")

        return True


    def _lock_age_text(self):
        """Return a human-readable age of the current lock, or None if it cannot be determined.

        The lock may disappear or be only partially written while we are reading it, which is
        why failures to read or parse it are not treated as errors.
        """

        try:
            with open(self._lock) as lock_file:
                delta = time.time() - float(lock_file.read())
        except (OSError, ValueError):
            return None

        # computed by hand rather than through `time.strftime`, since the latter
        # wraps the hours back to zero for locks older than a day
        hours, remainder = divmod(max(0, int(delta)), 3600)
        mins, secs = divmod(remainder, 60)

        return f"{hours:02d} hours, {mins:02d} mins, {secs:02d} secs"


    def _block_until_released(self):
        """Poll the lock file until it is gone, reporting its age through `self.progress`."""

        self.progress.new("Lock")
        self.progress.update('...')

        age_txt = None
        while self.is_locked():
            current_age_txt = self._lock_age_text()
            if current_age_txt is not None:
                age_txt = current_age_txt
                self.progress.update(f"Has been there since {age_txt} ago.")

            time.sleep(self.POLL_INTERVAL_SECS)

        # no lock!
        self.progress.end()

        if age_txt is not None:
            self.run.info_single(f"The lock that was put in {age_txt} ago is now removed :)", nl_after=1)
        else:
            # we never managed to read the timestamp, so there is no age to report
            self.run.info_single("The lock is now removed :)", nl_after=1)


    def set(self):
        """Set the lock without waiting.

        Raises
        ======
        ConfigError
            If the lock file already exists.
        """

        if not self._try_to_acquire():
            raise ConfigError(f"There is a lock for this database set by the program '{self.program}'. "
                              f"You can either wait for it to be released, or manually remove the lock "
                              f"before running the member function `set()` again. FYI, the lock file is "
                              f"here: {self._lock}")


    def wait(self, set_lock_after_wait=True):
        """Wait until an existing lock, if there is one, is released.

        If you do not want this function to start a lock, set `set_lock_after_wait` to False.

        When `set_lock_after_wait` is True and another process grabs the lock right after it
        was released, this function goes back to waiting rather than raising an error.
        """

        user_was_warned = False

        while True:
            if set_lock_after_wait:
                if self._try_to_acquire():
                    return
            elif not self.is_locked():
                return

            if not user_was_warned:
                self.run.warning(f"It seems an instance of '{self.program}' is already running on your database. "
                                 f"Your current request will wait here until the other process releases the "
                                 f"lock it set on this database. If this is taking forever and you are sure that "
                                 f"nothing is running on the database anymore (which can happen if your "
                                 f"program was killed by your HPC or something), you can manually remove "
                                 f"the lock in a separete terminal, and this process will then continue. If you "
                                 f"are unsure, please wait and hope that things will sort themselves out. Just FYI, "
                                 f"the lock file is located here: {self._lock}",
                                 header="🔒 WARNING: LOCK FOUND, NOT YET RUNNING 🔒", lc="yellow")
                user_was_warned = True

            self._block_until_released()


    def release(self):
        """Remove the lock file. A lock that is already gone is not an error.

        This is typically called while cleaning up after a failure, so a lock that cannot
        be removed is reported as a warning rather than raised, which would otherwise mask
        the error that is actually being handled.
        """

        try:
            os.remove(self._lock)
        except FileNotFoundError:
            pass
        except OSError as e:
            self.run.warning(f"The lock file at '{self._lock}' could not be removed ({e}). You may need "
                             f"to remove it manually before the next run on this database.",
                             header="LOCK COULD NOT BE RELEASED", lc="yellow")


class RunInDirectory(object):
""" Run any block of code in a specified directory. Return to original directory

Expand Down
32 changes: 25 additions & 7 deletions bin/anvi-run-hmms
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

import os
import sys
import time

import anvio
import anvio.utils as utils
import anvio.terminal as terminal
import anvio.filesnpaths as filesnpaths

with terminal.SuppressAllOutput():
import anvio.data.hmm as hmm_data
Expand Down Expand Up @@ -91,14 +93,30 @@ def main(args):
"this output in a directory with --hmmer-output-dir. There is no point to requesting this output "
"if you are never going to see it, so we figured we'd stop you right there. :)")

search_tables = TablesForHMMHits(args.contigs_db, num_threads_to_use=args.num_threads, just_do_it=args.just_do_it,
hmm_program_to_use=args.hmmer_program, hmmer_output_directory=args.hmmer_output_dir,
get_domain_table_output=args.domain_hits_table, add_to_functions_table=args.add_to_functions_table)
search_tables.populate_search_tables(sources)
# right before we start, we want to make sure there is no other program actually
# running on the same database, and we will not start until it is over:
lock = utils.DatabaseLockForBabies(args.contigs_db, 'HMMs')
lock.wait()

if not args.hmm_profile_dir and not args.installed_hmm_profile and args.also_scan_trnas:
tables_for_trna_hits = TablesForTransferRNAs(args)
tables_for_trna_hits.populate_search_tables(args.contigs_db)
# this try/except block is critical, because we don't want to find any lingering
# lock files around when a process is dead due to an exception.
try:
search_tables = TablesForHMMHits(args.contigs_db, num_threads_to_use=args.num_threads, just_do_it=args.just_do_it,
hmm_program_to_use=args.hmmer_program, hmmer_output_directory=args.hmmer_output_dir,
get_domain_table_output=args.domain_hits_table, add_to_functions_table=args.add_to_functions_table)
search_tables.populate_search_tables(sources)

if not args.hmm_profile_dir and not args.installed_hmm_profile and args.also_scan_trnas:
tables_for_trna_hits = TablesForTransferRNAs(args)
tables_for_trna_hits.populate_search_tables(args.contigs_db)
except Exception as e:
# there was a snafu? get rid of your release first before passing the error to
# the user:
lock.release()
raise ConfigError(e.e)

# all good
lock.release()


if __name__ == '__main__':
Expand Down
16 changes: 14 additions & 2 deletions bin/anvi-scan-trnas
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys

import anvio
import anvio.utils as utils
import anvio.terminal as terminal

from anvio.tables.trnahits import TablesForTransferRNAs
Expand All @@ -28,8 +29,19 @@ progress = terminal.Progress()

@time_program
def main(args):
    """Populate the tRNA search tables of `args.contigs_db` while holding a database lock.

    The lock is requested under the name 'HMMs', which is the same name `anvi-run-hmms`
    uses, so the two programs will not run on the same database at the same time.
    """

    # right before we start, we want to make sure there is no other program actually
    # running on the same database, and we will not start until it is over:
    lock = utils.DatabaseLockForBabies(args.contigs_db, 'HMMs')
    lock.wait()

    # `finally` guarantees the lock is released on success, on any error, and on
    # interruptions such as Ctrl-C, so no lingering lock file is left behind. The
    # original exception propagates untouched to the caller.
    # NOTE(review): this assumes the caller handles the error types raised here
    # directly, rather than relying on everything being wrapped in a ConfigError -- confirm.
    try:
        tables_for_trna_hits = TablesForTransferRNAs(args)
        tables_for_trna_hits.populate_search_tables(args.contigs_db)
    finally:
        lock.release()


if __name__ == '__main__':
Expand Down