Skip to content

Commit

Permalink
Merge pull request #55 from genomic-medicine-sweden/doodads
Browse files Browse the repository at this point in the history
Search improvments and refactoring
  • Loading branch information
sylvinite authored Nov 23, 2022
2 parents 4bbdb8d + 7c8aa49 commit e27040e
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 153 deletions.
3 changes: 3 additions & 0 deletions NGPIris/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from NGPIris import log, logformat
from NGPIris.hcp import HCPManager
from NGPIris.hci import HCIManager
from NGPIris.hcp.interactive import HCPInteracter
from NGPIris.preproc import preproc
from NGPIris.cli.functions import delete, search, upload, download
from NGPIris.cli.utils import utils
Expand Down Expand Up @@ -33,6 +34,8 @@ def root(ctx, endpoint, access_key_id, access_key, bucket, credentials, password
hcpm.attach_bucket(bucket)
hcpm.test_connection()
ctx.obj["hcpm"] = hcpm
interact = HCPInteracter(hcpm)
ctx.obj['interactive'] = interact

if pw:
hcim = HCIManager(pw)
Expand Down
139 changes: 9 additions & 130 deletions NGPIris/cli/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@
# Downloads or deletes files from selected bucket on the HCP.

import click
import glob
import os
import json
import sys
import time

from pathlib import Path

from NGPIris import log, TIMESTAMP
from NGPIris.preproc import preproc
from NGPIris import log

##############################################

Expand Down Expand Up @@ -66,111 +60,28 @@ def search(ctx, query, index, output, mode):
@click.pass_obj
def delete(ctx,query,force):
"""Delete a file on the HCP"""

objs = ctx['hcpm'].search_objects(query) # Get object with query
if len(objs) < 1:
log.info(f"File: {query} does not exist on {ctx['hcpm'].bucket.name}")
else:
hits = list()
for obj in objs:
hits.append(obj.key)
log.info(f"Found {len(objs)} entries matching query '{query}':")
log.info(f"{hits}")
for obj in objs:
if not force:
sys.stdout.write(f"You are about to delete the file {obj.key} " \
f"on {ctx['hcpm'].bucket.name}, are you sure? [Y/N/Q]?\n")
sys.stdout.flush()
answer = sys.stdin.readline()
if answer[0].lower() == "y":
ctx['hcpm'].delete_object(obj) # Delete file.
time.sleep(2)
log.info(f"Deleted file {obj.key} ")
elif answer[0].lower() == "q":
break
else:
log.info(f"Skipped deleting {obj.key} ")
elif force:
ctx['hcpm'].delete_object(obj) # Delete file.
time.sleep(2)
log.info(f"Deleted file {obj.key} ")

ctx['interactive'].delete_interactive(query, force)

@click.command()
@click.argument("input")
@click.option('-o',"--output",help="Destination file name on HCP", default="")
@click.option('-t',"--tag", default="None", help="Tag for downstream pipeline execution")
@click.option('-m',"--meta",help="Local path for generated metadata file",default=f"")
@click.option('-m',"--meta",help="Local path for metadata file",default=f"")
@click.option('-s',"--silent",help="Suppresses file progress output",is_flag=True,default=False)
@click.option('-a',"--atypical",help="Allows upload of non-fastq file", is_flag=True,default=False)
@click.pass_obj
def upload(ctx, input, output, tag, meta,silent,atypical):
def upload(ctx, input, output, tag, meta, silent,atypical):
"""Upload fastq files / fastq folder structure"""
file_lst = []

#Defaults output to input name
if output == "":
output = os.path.basename(input)
#If output is folder. Default file name to input name
elif output[-1] in ["/","\\"]:
output = os.path.join(output, os.path.basename(input))

dstfld = Path(output)
dstfld = dstfld.parent
if dstfld.parts == ():
dstfld = ""

if os.path.isdir(input):
#Recursively loop over all folders
for root, dirs, files in os.walk(folder):
for f in files:
try:
if not atypical:
preproc.verify_fq_suffix(os.path.join(root,f))
preproc.verify_fq_content(os.path.join(root,f))
if meta != "":
preproc.generate_tagmap(os.path.join(root,f), tag, meta)
file_lst.append(os.path.join(root,f))
except Exception as e:
log.warning(f"{f} is not a valid upload file: {e}")
else:
input = os.path.abspath(input)
try:
if not atypical:
preproc.verify_fq_suffix(input)
preproc.verify_fq_content(input)
if meta != "":
preproc.generate_tagmap(input, tag, meta)
file_lst.append(input)
except Exception as e:
log.warning(f"{input} is not a valid upload file: {e}")
sys.exit(-1)

if file_lst == []:
log.error(f"{input} could not be uploaded to NGPr. Try using an atypical upload")
for file_pg in file_lst:
if silent:
ctx['hcpm'].upload_file(file_pg, output, callback=False)
else:
ctx['hcpm'].upload_file(file_pg, output)
#time.sleep(2)
log.info(f"Uploaded: {file_pg}")

if meta != "":
meta_fn = Path(meta).name
# Uploads associated json files.
if silent:
ctx['hcpm'].upload_file(meta, os.path.join(dstfld, meta_fn), callback=False)
else:
ctx['hcpm'].upload_file(meta, os.path.join(dstfld, meta_fn))
ctx['interactive'].upload_interactive(input, output, fastq_only=(not atypical), metadata=meta, silent=silent)

@click.command()
@click.argument("query")
@click.option('-o',"--output",help="Specify output file to write to",default="")
@click.option("-m", "--mode",help="Search mode", type=click.Choice(['ngpi','ngpr','none','legacy-ngpi'], case_sensitive=False),default='ngpr')
@click.option('-s',"--silent",help="Suppresses file progress output",is_flag=True,default=False)
@click.option('-f',"--force",help="Overwrites any local file with same name if present",is_flag=True, default=False)
@click.pass_obj
def download(ctx, query, output,mode, silent):
def download(ctx, query, output,mode, silent,force):
"""Download files using a given query"""

#Defaults output to input name
Expand Down Expand Up @@ -217,43 +128,11 @@ def download(ctx, query, output,mode, silent):
log.error(f"File: '{name}' does not exist in bucket '{bucket}' on the HCP")

elif mode == "ngpr":
found_objs = ctx['hcpm'].search_objects(query)
if len(found_objs) == 0:
log.info(f"File: {query} does not exist on {ctx['hcpm'].bucket.name}")
elif len(found_objs) > 1:
for obj in found_objs:
log.info(f"Found {len(found_objs)} files matching query")
log.info(f"Download {obj}? [Y/N]")
sys.stdout.write(f"[--] Do you wish to download {obj.key} on {ctx['hcpm'].bucket.name}? [Y/N]?\n")
sys.stdout.flush()
answer = sys.stdin.readline()
if answer[0].lower() == "y":
obj = ctx['hcpm'].get_object(query) # Get object with key.
#Default output name to key
if output == "":
output = obj.key
#If output is folder. Default file name to obj.key
elif output[-1] in ["/","\\"]:
output = os.path.join(output, obj.key)
if silent:
ctx['hcpm'].download_file(obj, output, force=True, callback=False) # Downloads file.
else:
ctx['hcpm'].download_file(obj, output, force=True) # Downloads file.
#log.info(f"Downloaded {obj.key}"

elif len(found_objs) == 1:
obj = ctx['hcpm'].get_object(query) # Get object with key.
if silent:
ctx['hcpm'].download_file(obj, output, force=True, callback=False) # Downloads file.
else:
ctx['hcpm'].download_file(obj, output, force=True) # Downloads file.
ctx['interactive'].download_interactive(query, destination=output, force=force, silent=silent)

elif mode =="none":
obj = ctx['hcpm'].get_object(query) # Get object with key.
if silent:
ctx['hcpm'].download_file(obj, output, force=True, callback=False) # Downloads file.
else:
ctx['hcpm'].download_file(obj, output, force=True) # Downloads file.
ctx['hcpm'].download_file(obj, output, force=force, callback=(not silent)) # Downloads file.

def main():
pass
Expand Down
15 changes: 15 additions & 0 deletions NGPIris/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ def indices(ctx, index):
pretty = json.dumps(index_list)
print(json.dumps(pretty, indent=4))

@utils.command()
@click.pass_obj
def list_buckets(ctx):
"""Lists all accessible buckets for the provided credentials"""
hcpm = ctx['hcpm']
ls = hcpm.list_buckets()
log.info(f"Buckets: {ls}")

@utils.command()
@click.pass_obj
def test_connection(ctx):
"""Tests credentials for validity"""
if not (40 in log._cache or 30 in log._cache):
log.info(f"A successful connection has been established!")

def main():
pass

Expand Down
4 changes: 3 additions & 1 deletion NGPIris/hcp/hcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ def search_objects(self, string, mode="all"):
"""Return all objects whose keys contain the given string."""
if not hasattr(self, 'objects'):
self.get_objects()
cstr = re.compile(string)
string = string.replace('.', '\.') #escape dots
string = string.replace('*','.*') #support normie asterix
cstr = re.compile(f'.*{string}.*') #pad string with 'any' on both sides
if mode == "all":
return [obj for obj in self.objects if re.search(cstr, obj.key) ]
elif mode == "file":
Expand Down
122 changes: 122 additions & 0 deletions NGPIris/hcp/interactive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python
"""
Module for interactive usage.
"""
import os
import sys
import time

from pathlib import Path

from NGPIris import log, TIMESTAMP
from NGPIris.preproc import preproc

class HCPInteracter:
def __init__(self, hcpmanager):
self.hcpm = hcpmanager

def delete_interactive(self,query,force=False):
objs = self.hcpm.search_objects(query) # Get object with query
if len(objs) < 1:
log.info(f"File: {query} does not exist on {self.hcpm.bucket.name}")
else:
hits = list()
for obj in objs:
hits.append(obj.key)
log.info(f"Found {len(objs)} entries matching query '{query}':")
log.info(f"{hits}")
for obj in objs:
if not force:
sys.stdout.write(f"You are about to delete the file {obj.key} " \
f"on {self.hcpm.bucket.name}, are you sure? [Y/N/Q]?\n")
sys.stdout.flush()
answer = sys.stdin.readline()
if answer[0].lower() == "y":
self.hcpm.delete_object(obj) # Delete file.
time.sleep(2)
log.info(f"Deleted file {obj.key} ")
elif answer[0].lower() == "q":
break
else:
log.info(f"Skipped deleting {obj.key} ")
elif force:
self.hcpm.delete_object(obj) # Delete file.
time.sleep(2)
log.info(f"Deleted file {obj.key} ")


def download_interactive(self, query, destination="", force=False, silent=False):
found_objs = self.hcpm.search_objects(query)

#Defaults destination to input name
if destination == "":
destination = os.path.basename(query)
#If destination is folder. Default file name to input name
elif destination[-1] in ["/","\\"]:
destination = os.path.join(destination, os.path.basename(query))


if found_objs is None or len(found_objs) == 0:
log.info(f"File: {query} does not exist on {self.hcpm.bucket.name}")

elif len(found_objs) == 1:
obj = self.hcpm.get_object(query) # Get object with key.
self.hcpm.download_file(obj, destination, force=force, callback=(not silent)) # Downloads file.

elif len(found_objs) > 1:
for obj in found_objs:
log.info(f"Found {len(found_objs)} files matching query")
log.info(f"Download {obj}? [Y/N]")
sys.stdout.write(f"[--] Do you wish to download {obj.key} on {self.hcpm.bucket.name}? [Y/N]?\n")
sys.stdout.flush()
answer = sys.stdin.readline()
if answer[0].lower() == "y":
obj = self.hcpm.get_object(query) # Get object with key.
#Default destination name to key
if destination == "":
destination = obj.key
#If destination is folder. Default file name to obj.key
elif destination[-1] in ["/","\\"]:
destination = os.path.join(destination, obj.key)

self.hcpm.download_file(obj, destination, force=force, callback=(not silent)) # Downloads file.
#log.info(f"Downloaded {obj.key}"

def upload_interactive(self, source, destination="", fastq_only=False, metadata="", silent=False):
file_lst = []
###Sets destinations
#Defaults destination to source name
if destination == "":
destination = os.path.basename(source)
#If destination is folder. Default file name to source name
elif destination[-1] in ["/","\\"]:
destination = os.path.join(destination, os.path.basename(source))

###Verify fastq contents and metadata existence
if os.path.isdir(source) and fastq_only:
[file_lst, mdfile]=preproc.verify_upload_folder(source, metadata=metadata)
elif os.path.isdir(source):
[file_lst, mdfile]=preproc.folder_to_list(source, metadata=metadata)
elif fastq_only:
[file_lst, mdfile]=preproc.verify_upload_file(source, metadata=metadata)
if fastq_only and file_lst == []:
log.error(f"{source} could not be uploaded to NGPr. Try using an atypical (-a) upload")

if os.path.isdir(source):
for file_pg in file_lst:
self.hcpm.upload_file(file_pg, destination, callback=(not silent))
#time.sleep(2)
log.info(f"Uploaded: {file_pg}")
else:
self.hcpm.upload_file(file_lst, destination, callback=(not silent))
#time.sleep(2)
log.info(f"Uploaded: {file_lst}")

#Upload metadata
#dstfld = Path(destination)
#dstfld = dstfld.parent
#if dstfld.parts == ():
# dstfld = ""
#meta_fn = Path(mdfile).name
#ctx['hcpm'].upload_file(mdfile, os.path.join(dstfld, meta_fn), callback=(not silent))
Loading

0 comments on commit e27040e

Please sign in to comment.