Skip to content

Commit

Permalink
pipeline manager improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
khoroshevskyi committed Nov 30, 2023
1 parent 694296b commit f5b82f4
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions bedboss/bedboss.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ def run_all(
output_folder_bedstat = os.path.join(outfolder, "output")
os.environ["BEDBOSS_OUTPUT_PATH"] = output_folder_bedstat

if not pm:
pm_out_folder = os.path.join(os.path.abspath(outfolder), "pipeline_manager")
pm = pypiper.PipelineManager(
name="bedboss-pipeline",
outfolder=pm_out_folder,
version=__version__,
recover=True,
)

BedMaker(
input_file=input_file,
input_type=input_type,
Expand Down Expand Up @@ -274,18 +283,7 @@ def insert_pep(
pep.samples[i].record_identifier = bed_id

if upload_s3:
command = f"aws s3 sync {os.path.join(output_folder, BED_FOLDER_NAME)} s3://bedbase/{BED_FOLDER_NAME} --size-only --exclude 'bed_qc/*'"
_LOGGER.info("Uploading to s3 bed files")
pm.run(cmd=command, lock_name="s3_sync_big")

command = f"aws s3 sync {os.path.join(output_folder, BIGBED_FOLDER_NAME)} s3://bedbase/{BIGBED_FOLDER_NAME} --size-only"
_LOGGER.info("Uploading to s3 bigbed files")
pm.run(cmd=command, lock_name="s3_sync_bigbed")

command = f"aws s3 sync {os.path.join(output_folder, OUTPUT_FOLDER_NAME)} s3://bedbase/{OUTPUT_FOLDER_NAME} --size-only"
_LOGGER.info("Uploading to s3 bed statistics files")
pm.run(cmd=command, lock_name="s3_sync_bedstat")

load_to_s3(output_folder, pm)
else:
_LOGGER.info("Skipping uploading to s3. Flag `upload_s3` is set to False")

Expand All @@ -302,6 +300,25 @@ def insert_pep(
)


def load_to_s3(output_folder: str, pm: pypiper.PipelineManager) -> None:
    """
    Load bedfiles, bigbed files and bed statistics to s3

    One `aws s3 sync` command is built for each output sub-folder and run
    through the pipeline manager under its own lock name.

    :param output_folder: base output folder; the synced sub-folders are
        resolved relative to it
    :param pm: pipelineManager object used to run the sync commands
    :return: None
    """
    # Each entry: (sub-folder name, extra aws options, log message, lock name).
    # Lock names and messages are kept exactly as they were so that existing
    # pipeline manager lock files and log output stay the same.
    sync_jobs = (
        (
            BED_FOLDER_NAME,
            " --exclude 'bed_qc/*'",
            "Uploading to s3 bed files",
            "s3_sync_big",
        ),
        (
            BIGBED_FOLDER_NAME,
            "",
            "Uploading to s3 bigbed files",
            "s3_sync_bigbed",
        ),
        (
            OUTPUT_FOLDER_NAME,
            "",
            "Uploading to s3 bed statistics files",
            "s3_sync_bedstat",
        ),
    )

    for folder_name, extra_options, message, lock_name in sync_jobs:
        local_path = os.path.join(output_folder, folder_name)
        command = (
            f"aws s3 sync {local_path} s3://bedbase/{folder_name}"
            f" --size-only{extra_options}"
        )
        _LOGGER.info(message)
        pm.run(cmd=command, lock_name=lock_name)


def main(test_args: dict = None) -> NoReturn:
"""
Run pipeline that was specified in as positional argument.
Expand Down

0 comments on commit f5b82f4

Please sign in to comment.