From 09803c129fb96796c48701c3861b582169ad1115 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Fri, 28 Apr 2023 10:03:25 -0400 Subject: [PATCH 01/25] added fetch wording to logs in README.md --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index c3b49a1f..b2aea8cd 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,7 @@ functions as Cromshell 1 but has been rebuilt in python with many added benefits #### Logs * `logs [workflow-id] [[workflow-id]...]` * List the log files produced by a workflow. - * [COMING SOON] `fetch-logs [workflow-id] [[workflow-id]...]` - * Download all logs produced by a workflow. + * *`-f`* Download the log files produced by a workflow. #### Job Outputs * [COMING SOON] `list-outputs [workflow-id] [[workflow-id]...]` From a2f86e308f64a84dfc59b3c89f38a2c28b03a877 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 8 May 2023 16:36:13 -0400 Subject: [PATCH 02/25] first draft of logs refactoring --- src/cromshell/logs/command.py | 482 +++++++++++++++++----------- src/cromshell/utilities/io_utils.py | 107 +++++- 2 files changed, 397 insertions(+), 192 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index b2187803..994bb87b 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -1,27 +1,21 @@ +import json import logging import os import click -import gcsfs from termcolor import colored +import requests +import cromshell.utilities.http_utils as http_utils +import cromshell.utilities.io_utils as io_utils from cromshell.metadata import command as metadata_command -from cromshell.utilities import command_setup_utils, http_utils -from cromshell.utilities.io_utils import get_color_for_status_key +from cromshell.utilities import command_setup_utils LOGGER = logging.getLogger(__name__) @click.command(name="logs") -@click.argument("workflow_id") -@click.option( - "-s", - "--status", - default="Failed", - help="Return a list with links to the logs with the indicated status. " - "Separate multiple keys by comma or use 'ALL' to print all logs. " - "Some standard Cromwell status options are 'ALL', 'Done', 'RetryableFailure', 'Running', and 'Failed'.", -) +@click.argument("workflow_ids", required=True, nargs=-1) @click.option( "-p", "--print-logs", @@ -30,6 +24,13 @@ help="Print the contents of the logs to stdout if true. " "Note: This assumes GCS bucket logs with default permissions otherwise this may not work", ) +@click.option( + "-f", + "--fetch-logs", + is_flag=True, + default=False, + help="Download the logs to the current directory if true. " +) @click.option( "-des", "--dont-expand-subworkflows", @@ -37,18 +38,37 @@ default=False, help="Do not expand subworkflow info in metadata", ) +@click.option( + "-j", + "--json-summary", + is_flag=True, + default=False, + help="Print a json summary of logs, including non-file types.", +) +@click.option( + "-s", + "--status", + default="Failed", + help="Return a list with links to the task logs with the indicated status. " + "Separate multiple keys by comma or use 'ALL' to print all logs. 
" + "Some standard Cromwell status options are 'ALL', 'Done', 'RetryableFailure', 'Running', and 'Failed'.", +) @click.pass_obj def main( config, - workflow_id: str, + workflow_ids: list, + json_summary: bool, status: list, dont_expand_subworkflows: bool, print_logs: bool, + fetch_logs: bool, ): """Get a subset of the workflow metadata.""" LOGGER.info("logs") + return_code = 0 + # If no keys were provided then set key_param to empty else # strip trailing comma from keys and split keys by comma status_param = ( @@ -57,79 +77,82 @@ def main( else str(status).strip(",").split(",") ) - command_setup_utils.resolve_workflow_id_and_server( - workflow_id=workflow_id, cromshell_config=config - ) - LOGGER.info("Status keys set to %s", status_param) - # To grab the logs we only need a subset of the metadata from the server - obtain_and_print_logs( - config=config, - metadata_param=[ - "id", - "executionStatus", - "backendLogs", - "subWorkflowMetadata", - "subWorkflowId", - "failures", - ], - status_params=status_param, - dont_expand_subworkflows=dont_expand_subworkflows, - print_logs=print_logs, - ) - - return 0 + for workflow_id in workflow_ids: + command_setup_utils.resolve_workflow_id_and_server( + workflow_id=workflow_id, cromshell_config=config + ) -def check_workflow_for_calls(workflow_status_json: dict) -> None: - """Check if the workflow has any calls""" + # if not detailed: + # workflow_logs = get_workflow_level_logs(config).get("calls") + # + # if json_summary: + # io_utils.pretty_print_json(format_json=workflow_logs) + # else: + task_logs = get_task_level_outputs( + config, + requested_status=status_param, + expand_subworkflows=not dont_expand_subworkflows + ) - if not workflow_status_json.get("calls"): - if workflow_status_json.get("failures"): - LOGGER.error( - "Empty 'calls' key found in workflow metadata. " - "Workflow failed with the following error(s): %s" - % workflow_status_json["failures"], - ) - raise KeyError( - "Empty 'calls' key found in workflow metadata. " - "Workflow failed with the following error(s): %s" - % workflow_status_json["failures"], - ) + if fetch_logs: + download_task_level_logs(all_task_log_metadata=task_logs) else: - LOGGER.error( - "Empty 'calls' key found in workflow metadata. " - "This may indicate no tasks were run by the workflow, " - "the workflow has yet to run any tasks, or " - "a failure occurred before the workflow started." - ) - raise KeyError( - "Empty 'calls' key found in workflow metadata." - "This may indicate no tasks were run by the workflow, " - "the workflow has yet to run any tasks, or " - "a failure occurred before the workflow started." 
- ) + if json_summary: + io_utils.pretty_print_json(format_json=task_logs) + else: + print_task_level_logs( + all_task_log_metadata=task_logs, cat_logs=print_logs + ) + return return_code -def obtain_and_print_logs( - config, - metadata_param: list, - print_logs: bool, - status_params: list, - dont_expand_subworkflows: bool, -) -> None: - """Format metadata parameters and obtains metadata from cromwell server""" - # Combine keys and flags into a dictionary +def get_workflow_level_logs(config) -> dict: + """Get the workflow level logs from the workflow metadata + + Args: + config (dict): The cromshell config object + """ + + requests_out = requests.get( + f"{config.cromwell_api_workflow_id}/logs", + timeout=config.requests_connect_timeout, + verify=config.requests_verify_certs, + headers=http_utils.generate_headers(config), + ) + + if requests_out.ok: + # check_for_empty_logs(requests_out.json().get("outputs"), config.workflow_id) + return requests_out.json() + else: + http_utils.check_http_request_status_code( + short_error_message="Failed to retrieve logs for " + f"workflow: {config.workflow_id}", + response=requests_out, + # Raising exception is set false to allow + # command to retrieve outputs of remaining workflows. + raise_exception=False, + ) + + +def get_task_level_outputs(config, expand_subworkflows, requested_status) -> dict: + """Get the task level outputs from the workflow metadata + + Args: + config (dict): The cromshell config object + :param expand_subworkflows: Whether to expand subworkflows + """ + # Get metadata formatted_metadata_parameter = metadata_command.format_metadata_params( - list_of_keys=metadata_param, - exclude_keys=False, - expand_subworkflows=not dont_expand_subworkflows, # Invert variable + list_of_keys=config.METADATA_KEYS_TO_OMIT, + exclude_keys=True, + expand_subworkflows=expand_subworkflows, ) - # Request workflow metadata - workflow_status_json = metadata_command.get_workflow_metadata( + workflow_metadata = metadata_command.get_workflow_metadata( meta_params=formatted_metadata_parameter, api_workflow_id=config.cromwell_api_workflow_id, timeout=config.requests_connect_timeout, @@ -137,142 +160,186 @@ def obtain_and_print_logs( headers=http_utils.generate_headers(config), ) - check_workflow_for_calls(workflow_status_json) + return filter_task_logs_from_workflow_metadata(workflow_metadata=workflow_metadata, requested_status=requested_status) - # Parse the metadata for logs and print them to the output - found_logs = print_workflow_logs( - workflow_metadata=workflow_status_json, - indent="", - expand_sub_workflows=not dont_expand_subworkflows, - status_keys=status_params, - cat_logs=print_logs, + +def filter_task_logs_from_workflow_metadata( + workflow_metadata: dict, requested_status: list +) -> dict: + """Get the logs from the workflow metadata + + Args: + workflow_metadata (dict): The workflow metadata + requested_status (list): The list of requested status + """ + calls_metadata = workflow_metadata["calls"] + all_task_logs = {} + + for call, index_list in calls_metadata.items(): + if "subWorkflowMetadata" in calls_metadata[call][0]: + all_task_logs[call] = [] + for scatter in calls_metadata[call]: + all_task_logs[call].append( + filter_task_logs_from_workflow_metadata( + scatter["subWorkflowMetadata"], requested_status=requested_status + ) + ) + else: + all_task_logs[call] = [] + for index in index_list: + if "ALL" in requested_status or index.get("executionStatus") in requested_status: + all_task_logs[call].append( + { + "attempt": 
index.get("attempt"), + "backendLogs": get_backend_logs(task_instance=index), + "backend": index.get("backend"), + "executionStatus": index.get("executionStatus"), + "shardIndex": index.get("shardIndex"), + "stderr": index.get("stderr"), + "stdout": index.get("stdout") + }, + ) + + check_for_empty_logs( + workflow_logs=all_task_logs, + workflow_id=workflow_metadata["id"], + requested_status=requested_status ) - if not found_logs: - print( - f"No logs with status {status_params} found for workflow, try adding " - f"the argument '-s ALL' to list logs with any status" - ) + return all_task_logs -def print_workflow_logs( - workflow_metadata: dict, - indent: str, - expand_sub_workflows: bool, - status_keys: list, - cat_logs: bool, -) -> bool: - """ - Recursively runs through each task of a workflow metadata and calls function to - call out to the helper in order to print found logs - :param workflow_metadata: Metadata of the workflow to process - :param indent: Indent string given as "\t", used to indent print out - :param expand_sub_workflows: Boolean, whether to print subworkflows - :return: true if any logs matching the parameters were found +def print_task_level_logs(all_task_log_metadata: dict, cat_logs: bool) -> None: + """Print the logs from the workflow metadata + task_logs_metadata: {call_name:[index1{task_log_name: taskvalue}, index2{...}, ...], call_name:[], ...} + + Args: + all_task_log_metadata (dict): All task logs metadata from the workflow """ - did_print = False - - tasks = list(workflow_metadata["calls"].keys()) - - for task in tasks: # For each task in given metadata - # If task has a key called 'subworkflowMetadata' in its first (zero) element - # (shard) and expand_sub_workflow parameter is set to true then rerun this - # function on that subworkflow - if ( - "subWorkflowMetadata" in workflow_metadata["calls"][task][0] - and expand_sub_workflows - ): - sub_workflow_name = task - task_shards = workflow_metadata["calls"][sub_workflow_name] - print(f"{indent}SubWorkflow {sub_workflow_name}") - - # For each element in total number of subworkflow calls get the subworkflow - # metadata. 
This loop will go through each shard if task is scattered - for i in range(len(task_shards) - 1): - sub_workflow_metadata = task_shards[i]["subWorkflowMetadata"] - - print_workflow_logs( - workflow_metadata=sub_workflow_metadata, - indent=indent + "\t", - expand_sub_workflows=expand_sub_workflows, - status_keys=status_keys, - cat_logs=cat_logs, - ) - # If no subworkflow is found then print status summary for the task - else: - did_print = ( - print_task_logs( - task=task, - indent=indent, - workflow_metadata=workflow_metadata, - status_keys=status_keys, - cat_logs=cat_logs, + for call, index_list in all_task_log_metadata.items(): + + print(f"{call}:") + for call_index in index_list: + if call_index is not None: + print_file_like_value_in_dict( + task_log_metadata=call_index, indent=1, cat_logs=cat_logs ) - or did_print - ) - return did_print +def print_file_like_value_in_dict(task_log_metadata: dict, indent: int, cat_logs: bool) -> None: + """Print the file like values in the output metadata dictionary -def print_task_logs( - task: str, - indent: str, - workflow_metadata: dict, - status_keys: list, - cat_logs: bool, -) -> bool: + Args: + task_log_metadata (dict): The output metadata + indent (int): The number of tabs to indent the output """ - Prints the backend logs from the workflow - :param task: Name of the task - :param indent: Indent string given as a string of "\t" characters, - used to indent print out - :param workflow_metadata: Metadata of the workflow to process - :param status_keys: Determines what logs to show based on call status - :param cat_logs: Will use GCS to attempt to print the logs - :return: true if any logs were printed - """ - - did_print = False - shard_list = workflow_metadata["calls"][task] + i = "\t" * indent - sharded = workflow_metadata["calls"][task][0]["shardIndex"] != -1 + task_status_font = io_utils.get_color_for_status_key( + task_log_metadata.get("executionStatus")) if task_log_metadata.get('executionStatus') else None - for i in range(len(shard_list)): - status = shard_list[i]["executionStatus"] - if "ALL" in status_keys or status in status_keys: - task_status_font = get_color_for_status_key(status) + print( + colored( + f"{i}status: {task_log_metadata.get('executionStatus')}", + color=task_status_font + ) + ) - shardstring = ( - "" if not sharded else "-shard-" + str(shard_list[i]["shardIndex"]) + for log_name, log_value in task_log_metadata.items(): + if isinstance(log_value, str): + print_output_name_and_file( + output_name=log_name, + output_value=log_value, + indent=indent, + txt_color=task_status_font, + cat_logs=cat_logs, ) + elif isinstance(log_value, dict): + print_file_like_value_in_dict(log_value, indent=indent) + elif isinstance(log_value, list): # Lists are subworkflows, an item is a task + print(f"{i}{log_name}:\t") # Print the subworkflow task name + for output_value_item in log_value: + print_file_like_value_in_dict( + task_log_metadata=output_value_item, + indent=indent+1, + cat_logs=cat_logs, + ) - logs = get_backend_logs(shard_list[i]) +def print_output_name_and_file( + output_name: str, + output_value: str, + indent: int = 0, + txt_color: str = None, + cat_logs: bool = False +) -> None: + """Print the task name and the file name + + Args: + output_name (str): The task output name + output_value (str): The task output value + indent (bool): Whether to indent the output + cat_logs (bool): Whether to cat the log file + txt_color (str): The color to use for printing the output. Default is None. 
""" + + i = "\t" * indent + + if isinstance(output_value, str): + if io_utils.is_path_or_url_like(output_value): if cat_logs: - print( - colored( - f"\n\n\n{'=' * os.get_terminal_size().columns}\n{indent}{task}{shardstring}:\t{status}\t {logs}\n{'=' * os.get_terminal_size().columns}", - color=task_status_font, - ) + print_log_file_content( + output_name=output_name, + output_value=output_value, + txt_color=txt_color, ) - fs = gcsfs.GCSFileSystem() - if fs.exists(logs): - with fs.open(logs, "r") as f: - print(f.read()) - else: - print(f"Unable to locate logs at {logs}.") - else: - print( - colored( - f"{indent}{task}{shardstring}:\t{status}\t {logs}", - color=task_status_font, - ) - ) - did_print = True - return did_print + print(colored(f"{i}{output_name}: {output_value}", color=txt_color)) + + +def print_log_file_content(output_name: str, output_value: str, txt_color: str = "blue") -> None: + """Prints output logs and cat the file if possible. + + Args: + output_name (str): The name of the output log. + output_value (str): The value of the output log. + txt_color (str): The color to use for printing the output. Default is "blue". + """ + term_size = os.get_terminal_size().columns + print(colored( + f"{'=' * term_size}\n{output_name}: {output_value}\n{'=' * term_size}", + color=txt_color, + ) + ) + + file_contents = io_utils.cat_file(output_value) + if file_contents: + print(file_contents) + else: + print(f"Unable to locate logs at {output_value}.") + print("\n\n\n") # Add some space between logs + + +def check_for_empty_logs(workflow_logs: dict, workflow_id: str, requested_status) -> None: + """Check if the workflow logs are empty + + Args: + :param requested_status: The status requested to be filtered + :param workflow_logs: The workflow logs + :param workflow_id: The workflow id + """ + if not workflow_logs: + LOGGER.error(f"No calls found for workflow: {workflow_id}") + raise Exception(f"No calls found for workflow: {workflow_id}") + + if "log" not in json.dumps(workflow_logs): + LOGGER.error( + f"No log found for workflow: {workflow_id} with status: {requested_status}" + ) + raise Exception( + f"No logs found for workflow: {workflow_id} with status: {requested_status}" + ) def get_backend_logs(task_instance: dict) -> str: @@ -294,5 +361,40 @@ def get_backend_logs(task_instance: dict) -> str: return backend_logs.get("log") -if __name__ == "__main__": - main() +def download_file_like_value_in_dict(task_log_metadata): + """Download the file like values in the output metadata dictionary""" + + files_to_download = [] + + for log_name, log_value in task_log_metadata.items(): + if isinstance(log_value, str): + if io_utils.is_path_or_url_like(log_value): + files_to_download.append(log_value) + elif isinstance(log_value, dict): + download_file_like_value_in_dict(log_value) + elif isinstance(log_value, list): # Lists are subworkflows, an item is a task + for output_value_item in log_value: + download_file_like_value_in_dict( + task_log_metadata=output_value_item + ) + + flattened_list = [x for sublist in files_to_download for x in sublist] + io_utils.download_gcs_files(files_to_download, local_dir=os.getcwd()) + # print(files_to_download) + + +def download_task_level_logs(all_task_log_metadata): + """Download the logs from the workflow metadata + task_logs_metadata: {call_name:[index1{task_log_name: taskvalue}, index2{...}, ...], call_name:[], ...} + + Args: + all_task_log_metadata (dict): All task logs metadata from the workflow + """ + + for call, index_list in all_task_log_metadata.items(): 
+ + for call_index in index_list: + if call_index is not None: + download_file_like_value_in_dict( + task_log_metadata=call_index + ) diff --git a/src/cromshell/utilities/io_utils.py b/src/cromshell/utilities/io_utils.py index 022380aa..44eba806 100644 --- a/src/cromshell/utilities/io_utils.py +++ b/src/cromshell/utilities/io_utils.py @@ -8,6 +8,10 @@ from typing import BinaryIO, List, Union from zipfile import ZIP_DEFLATED, ZipFile +from azure.storage.blob import BlobServiceClient +from google.cloud import storage +import boto3 + from pygments import formatters, highlight, lexers from termcolor import colored @@ -230,6 +234,103 @@ def copy_files_to_directory( shutil.copy(inputs, directory) +def cat_file(file_path: str or Path) -> str: + """Prints the contents of a file to stdout. The path can either be a local file path, + GCP file path, Azure file path, or AWS file path.""" + + # Check if the file path is a local path + if Path(file_path).is_file(): + with open(file_path, 'r') as file: + file_contents = file.read() + # Check if the file path is a GCP path + elif file_path.startswith('gs://'): + file_contents = get_gcp_file_content(file_path) + # Check if the file path is an Azure path + elif file_path.startswith('https://'): + file_contents = get_azure_file_content(file_path) + # Check if the file path is an AWS path + elif file_path.startswith('s3://'): + file_contents = get_aws_file_content(file_path) + else: + raise ValueError('Invalid file path') + return file_contents + + +def get_gcp_file_content(file_path: str) -> str or None: + """Returns the contents of a file located on GCP""" + + bucket_name, blob_path = file_path.split('//')[-1].split('/', 1) + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_path) + + return blob.download_as_string().decode('utf-8') if blob.exists() else None + + + +def get_azure_file_content(file_path: str) -> str: + """Returns the contents of a file located on Azure""" + + container_url, blob_path = file_path.rsplit('/', 1) + account_url = '/'.join(container_url.split('/')[:3]) + container_name = container_url.split('/')[-1] + blob_service_client = BlobServiceClient(account_url=account_url) + blob_client = blob_service_client.get_blob_client(container=container_name, + blob=blob_path) + return blob_client.download_blob().content_as_text() + + +def get_aws_file_content(file_path: str) -> str: + """Returns the contents of a file located on AWS""" + + bucket_name, blob_path = file_path.split('//')[-1].split('/', 1) + s3 = boto3.resource('s3') + obj = s3.Object(bucket_name, blob_path) + + return obj.get()['Body'].read().decode('utf-8') + + +def is_path_or_url_like(in_string: str) -> bool: + """Check if the string is a path or url + + Args: + in_string (str): The string to check for path or url like-ness + """ + if ( + in_string.startswith("gs://") + or in_string.startswith("/") + or in_string.startswith("http://") + or in_string.startswith("https://") + or in_string.startswith("s3://") + ): + return True + else: + return False + + +def download_gcs_files(file_paths, local_dir): + """Downloads GCS files to local_dir while preserving directory structure""" + storage_client = storage.Client() + + for file_path in file_paths: + # Extract bucket and blob path from file path + print(file_path) + bucket_name, blob_path = file_path.split('//')[-1].split('/', 1) + + # Create local subdirectory if it doesn't exist + local_subdir = Path(local_dir) / Path(blob_path).parent + Path.mkdir(local_subdir, exist_ok=True, 
parents=True) + + # Download file to local subdirectory + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_path) + if blob.exists(): + local_path = Path(local_subdir).join(Path(blob_path).name) + blob.download_to_filename(local_path) + + print(f"Downloaded {file_path} to {local_path}") + + class TextStatusesColor: """Holds stdout formatting per workflow status""" @@ -247,14 +348,16 @@ class TextStatusesColor: TASK_COLOR_FAILED = "red" -def get_color_for_status_key(status: str) -> str: +def get_color_for_status_key(status: str) -> str or None: """ Helper method for getting the correct font color for a given execution status for a job (or none for unrecognized statuses) """ - task_status_font = None + from cromshell.utilities.cromshellconfig import color_output + if not color_output: + return None if "Done" in status: task_status_font = TextStatusesColor.TASK_COLOR_SUCCEEDED elif "Running" in status: From 1ec2a856f31445f3082f18cd009bb29fa0c6cf68 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 9 May 2023 10:29:05 -0400 Subject: [PATCH 03/25] Added 'azure_storage_account' to config options --- src/cromshell/utilities/config_options_file_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cromshell/utilities/config_options_file_utils.py b/src/cromshell/utilities/config_options_file_utils.py index 85a3d4e0..23aa888d 100644 --- a/src/cromshell/utilities/config_options_file_utils.py +++ b/src/cromshell/utilities/config_options_file_utils.py @@ -12,6 +12,7 @@ "gcloud_token_email": "str", "referer_header_url": "str", "bq_cost_table": "str", + "azure_storage_account": "str", } From aad6a58581522da7a532f86a3edf64d706cfd291 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 9 May 2023 10:38:24 -0400 Subject: [PATCH 04/25] Add option to download from azure --- src/cromshell/logs/command.py | 98 +++++++++++++++++------ src/cromshell/utilities/io_utils.py | 120 +++++++++++++++++++++------- 2 files changed, 163 insertions(+), 55 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index 994bb87b..e3fb716a 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -145,10 +145,25 @@ def get_task_level_outputs(config, expand_subworkflows, requested_status) -> dic config (dict): The cromshell config object :param expand_subworkflows: Whether to expand subworkflows """ + + metadata_keys = [ + "id", + "executionStatus", + "subWorkflowMetadata", + "subWorkflowId", + "failures", + "attempt", + "backendLogs", + "backend", + "shardIndex", + "stderr", + "stdout", + ] + # Get metadata formatted_metadata_parameter = metadata_command.format_metadata_params( - list_of_keys=config.METADATA_KEYS_TO_OMIT, - exclude_keys=True, + list_of_keys=metadata_keys, + exclude_keys=False, expand_subworkflows=expand_subworkflows, ) @@ -160,7 +175,9 @@ def get_task_level_outputs(config, expand_subworkflows, requested_status) -> dic headers=http_utils.generate_headers(config), ) - return filter_task_logs_from_workflow_metadata(workflow_metadata=workflow_metadata, requested_status=requested_status) + return filter_task_logs_from_workflow_metadata( + workflow_metadata=workflow_metadata, requested_status=requested_status + ) def filter_task_logs_from_workflow_metadata( @@ -227,18 +244,22 @@ def print_task_level_logs(all_task_log_metadata: dict, cat_logs: bool) -> None: ) -def print_file_like_value_in_dict(task_log_metadata: dict, indent: int, cat_logs: bool) -> None: +def print_file_like_value_in_dict( + task_log_metadata: dict, indent: int, cat_logs: 
bool +) -> None: """Print the file like values in the output metadata dictionary Args: task_log_metadata (dict): The output metadata indent (int): The number of tabs to indent the output + cat_logs (bool): Whether to print the logs """ i = "\t" * indent - task_status_font = io_utils.get_color_for_status_key( - task_log_metadata.get("executionStatus")) if task_log_metadata.get('executionStatus') else None + task_status_font = ( + io_utils.get_color_for_status_key(task_log_metadata.get("executionStatus")) if task_log_metadata.get('executionStatus') else None + ) print( colored( @@ -255,9 +276,10 @@ def print_file_like_value_in_dict(task_log_metadata: dict, indent: int, cat_logs indent=indent, txt_color=task_status_font, cat_logs=cat_logs, + backend=task_log_metadata.get('backend') ) elif isinstance(log_value, dict): - print_file_like_value_in_dict(log_value, indent=indent) + print_file_like_value_in_dict(log_value, indent=indent, cat_logs=cat_logs) elif isinstance(log_value, list): # Lists are subworkflows, an item is a task print(f"{i}{log_name}:\t") # Print the subworkflow task name for output_value_item in log_value: @@ -273,7 +295,8 @@ def print_output_name_and_file( output_value: str, indent: int = 0, txt_color: str = None, - cat_logs: bool = False + cat_logs: bool = False, + backend: str = None, ) -> None: """Print the task name and the file name @@ -282,7 +305,9 @@ def print_output_name_and_file( output_value (str): The task output value indent (bool): Whether to indent the output cat_logs (bool): Whether to cat the log file - txt_color (str): The color to use for printing the output. Default is None. """ + txt_color (str): The color to use for printing the output. Default is None. + backend: The backend to use for printing the output. Default is None. + """ i = "\t" * indent @@ -293,18 +318,22 @@ def print_output_name_and_file( output_name=output_name, output_value=output_value, txt_color=txt_color, + backend=backend, ) else: print(colored(f"{i}{output_name}: {output_value}", color=txt_color)) -def print_log_file_content(output_name: str, output_value: str, txt_color: str = "blue") -> None: +def print_log_file_content( + output_name: str, output_value: str, txt_color: str = "blue", backend: str = None +) -> None: """Prints output logs and cat the file if possible. Args: output_name (str): The name of the output log. output_value (str): The value of the output log. txt_color (str): The color to use for printing the output. Default is "blue". + backend (str): The backend to used to run workflow. Default is None. 
""" term_size = os.get_terminal_size().columns print(colored( @@ -313,15 +342,18 @@ def print_log_file_content(output_name: str, output_value: str, txt_color: str = ) ) - file_contents = io_utils.cat_file(output_value) - if file_contents: - print(file_contents) - else: + file_contents = io_utils.cat_file(output_value, backend=backend) + if file_contents is None: print(f"Unable to locate logs at {output_value}.") + else: + print(file_contents) + print("\n\n\n") # Add some space between logs -def check_for_empty_logs(workflow_logs: dict, workflow_id: str, requested_status) -> None: +def check_for_empty_logs( + workflow_logs: dict, workflow_id: str, requested_status +) -> None: """Check if the workflow logs are empty Args: @@ -334,12 +366,19 @@ def check_for_empty_logs(workflow_logs: dict, workflow_id: str, requested_status raise Exception(f"No calls found for workflow: {workflow_id}") if "log" not in json.dumps(workflow_logs): - LOGGER.error( - f"No log found for workflow: {workflow_id} with status: {requested_status}" - ) - raise Exception( - f"No logs found for workflow: {workflow_id} with status: {requested_status}" - ) + if "TES" in json.dumps(workflow_logs): + # Cromwell does not return backendlogs for TES backend at the moment. + pass + else: + print(json.dumps(workflow_logs)) + LOGGER.error( + f"No log found for workflow: {workflow_id} with status: " + f"{requested_status}" + ) + raise Exception( + f"No logs found for workflow: {workflow_id} with status: " + f"{requested_status}" + ) def get_backend_logs(task_instance: dict) -> str: @@ -378,9 +417,20 @@ def download_file_like_value_in_dict(task_log_metadata): task_log_metadata=output_value_item ) - flattened_list = [x for sublist in files_to_download for x in sublist] - io_utils.download_gcs_files(files_to_download, local_dir=os.getcwd()) - # print(files_to_download) + path_to_downloaded_files = os.getcwd() + if task_log_metadata.get("backend") == "PAPIv2": + io_utils.download_gcs_files( + files_to_download, local_dir=os.getcwd() + ) + print(f"Downloaded files to: {path_to_downloaded_files}") + elif task_log_metadata.get("backend") == "TES": + io_utils.download_azure_files( + files_to_download, local_dir=os.getcwd() + ) + print(f"Downloaded files to: {path_to_downloaded_files}") + else: + print(f"Unsupported backend : {task_log_metadata.get('backend')}") + def download_task_level_logs(all_task_log_metadata): diff --git a/src/cromshell/utilities/io_utils.py b/src/cromshell/utilities/io_utils.py index 44eba806..d9bb6b93 100644 --- a/src/cromshell/utilities/io_utils.py +++ b/src/cromshell/utilities/io_utils.py @@ -8,7 +8,9 @@ from typing import BinaryIO, List, Union from zipfile import ZIP_DEFLATED, ZipFile +import azure.core.exceptions from azure.storage.blob import BlobServiceClient +from azure.identity import DefaultAzureCredential from google.cloud import storage import boto3 @@ -234,23 +236,20 @@ def copy_files_to_directory( shutil.copy(inputs, directory) -def cat_file(file_path: str or Path) -> str: - """Prints the contents of a file to stdout. The path can either be a local file path, - GCP file path, Azure file path, or AWS file path.""" +def cat_file(file_path: str or Path, backend: str = None) -> str: + """Prints the contents of a file to stdout. 
The path can either be a + local file path, GCP file path, Azure file path, or AWS file path.""" # Check if the file path is a local path - if Path(file_path).is_file(): + if backend == "Local": with open(file_path, 'r') as file: file_contents = file.read() # Check if the file path is a GCP path - elif file_path.startswith('gs://'): + elif backend == "PAPIv2": file_contents = get_gcp_file_content(file_path) # Check if the file path is an Azure path - elif file_path.startswith('https://'): + elif backend == "TES": file_contents = get_azure_file_content(file_path) - # Check if the file path is an AWS path - elif file_path.startswith('s3://'): - file_contents = get_aws_file_content(file_path) else: raise ValueError('Invalid file path') return file_contents @@ -267,27 +266,45 @@ def get_gcp_file_content(file_path: str) -> str or None: return blob.download_as_string().decode('utf-8') if blob.exists() else None - -def get_azure_file_content(file_path: str) -> str: +def get_azure_file_content(file_path: str) -> str or None: """Returns the contents of a file located on Azure""" - container_url, blob_path = file_path.rsplit('/', 1) - account_url = '/'.join(container_url.split('/')[:3]) - container_name = container_url.split('/')[-1] - blob_service_client = BlobServiceClient(account_url=account_url) - blob_client = blob_service_client.get_blob_client(container=container_name, - blob=blob_path) - return blob_client.download_blob().content_as_text() - - -def get_aws_file_content(file_path: str) -> str: - """Returns the contents of a file located on AWS""" - - bucket_name, blob_path = file_path.split('//')[-1].split('/', 1) - s3 = boto3.resource('s3') - obj = s3.Object(bucket_name, blob_path) + blob_service_client = BlobServiceClient( + account_url=f"https://{get_az_storage_account()}.blob.core.windows.net", + credential=DefaultAzureCredential() + ) + container_name, blob_path = file_path.split('/', 2)[1:] + blob_client = blob_service_client.get_blob_client( + container=container_name, blob=blob_path + ) + blob_client.download_blob() - return obj.get()['Body'].read().decode('utf-8') + try: + if blob_client.exists(): + return blob_client.download_blob().readall().decode('utf-8') + else: + return None + except azure.core.exceptions.HttpResponseError as e: + if "AuthorizationPermissionMismatch" in str(e): + LOGGER.error("Caught an AuthorizationPermissionMismatch error, check that" + "the Azure Storage Container has your account listed to have" + "Storage Blob Data Contributor") + else: + LOGGER.error( + "Caught an error while trying to download the file from Azure: %s", e + ) + + +def get_az_storage_account() -> str: + """Returns the account name of the Azure storage account""" + import cromshell.utilities.cromshellconfig as config + try: + return config.cromshell_config_options["azure_storage_account"] + except KeyError: + LOGGER.error( + "An 'azure_storage_account' is required for this action but" + "was not found in Cromshell configuration file. 
" + ) def is_path_or_url_like(in_string: str) -> bool: @@ -308,27 +325,68 @@ def is_path_or_url_like(in_string: str) -> bool: return False -def download_gcs_files(file_paths, local_dir): +def download_gcs_files(file_paths, local_dir) -> None: """Downloads GCS files to local_dir while preserving directory structure""" storage_client = storage.Client() for file_path in file_paths: # Extract bucket and blob path from file path - print(file_path) + LOGGER.debug("Downloading file %s", file_path) bucket_name, blob_path = file_path.split('//')[-1].split('/', 1) # Create local subdirectory if it doesn't exist + LOGGER.debug("Creating local subdirectory %s", blob_path) local_subdir = Path(local_dir) / Path(blob_path).parent Path.mkdir(local_subdir, exist_ok=True, parents=True) # Download file to local subdirectory + LOGGER.debug("Downloading file %s to %s", file_path, local_subdir) bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_path) if blob.exists(): - local_path = Path(local_subdir).join(Path(blob_path).name) + local_path = Path(local_subdir) / Path(blob_path).name blob.download_to_filename(local_path) - print(f"Downloaded {file_path} to {local_path}") + LOGGER.debug("Downloaded file %s to %s", file_path, local_subdir) + else: + LOGGER.warning("File %s does not exist", file_path) + + +def download_azure_files(file_paths, local_dir) -> None: + """Downloads Azure files to local_dir while preserving directory structure""" + # connection_string = "" + default_credential = DefaultAzureCredential() + account_url = f"https://{get_az_storage_account()}.blob.core.windows.net" + + for file_path in file_paths: + # Extract container and blob path from file path + LOGGER.debug("Downloading file %s", file_path) + blob_service_client = BlobServiceClient( + account_url=account_url, + credential=default_credential + ) + container_name, blob_path = file_path.split('/', 2)[1:] + blob_client = blob_service_client.get_blob_client(container=container_name, + blob=blob_path) + # blob_client.download_blob() + + # Create local subdirectory if it doesn't exist + LOGGER.debug("Creating local subdirectory %s", blob_path) + local_subdir = Path(local_dir) / Path(blob_path).parent + Path.mkdir(local_subdir, exist_ok=True, parents=True) + + # Download file to local subdirectory + LOGGER.debug("Downloading file %s to %s", file_path, local_subdir) + # container_client = blob_service_client.get_container_client(container_name) + # blob_client = container_client.get_blob_client(blob_path) + if blob_client.exists(): + local_path = Path(local_subdir) / Path(blob_path).name + with open(local_path, "wb") as file: + file.write(blob_client.download_blob().readall()) + + LOGGER.debug("Downloaded file %s to %s", file_path, local_subdir) + else: + LOGGER.warning("File %s does not exist", file_path) class TextStatusesColor: From 88a54309c5f04a90170e71bdf3f9a8de6d613038 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 9 May 2023 10:43:12 -0400 Subject: [PATCH 05/25] update log readme options --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f5772c00..e22f5975 100644 --- a/README.md +++ b/README.md @@ -73,8 +73,12 @@ Cromshell is a CLI for submitting workflows to a Cromwell server and monitoring/ #### Logs * `logs [workflow-id] [[workflow-id]...]` - * List the log files produced by a workflow. - * *`-f`* Download the log files produced by a workflow. + * List the log files produced by a workflow, Defaults to print `Failed` status only. 
+ * `-f` Download the log files produced by a workflow. + * `-p` Print the log files produced by a workflow. + * `-des` Don't expand the subworkflows. + * `-j` Print the log files produced by a workflow in JSON format. + * `-s [STATUS]` Only print logs for jobs with the given `[STATUS]`. #### Job Outputs * `list-outputs [workflow-id] [[workflow-id]...]` From b0df586e9f6827c06f163e64b9019d2b428ef978 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 9 May 2023 13:59:42 -0400 Subject: [PATCH 06/25] minor edits, removed function that called logs api --- src/cromshell/logs/command.py | 44 ++++------------------------- src/cromshell/utilities/io_utils.py | 4 +-- 2 files changed, 6 insertions(+), 42 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index e3fb716a..82890fb3 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -4,7 +4,6 @@ import click from termcolor import colored -import requests import cromshell.utilities.http_utils as http_utils import cromshell.utilities.io_utils as io_utils @@ -85,12 +84,6 @@ def main( workflow_id=workflow_id, cromshell_config=config ) - # if not detailed: - # workflow_logs = get_workflow_level_logs(config).get("calls") - # - # if json_summary: - # io_utils.pretty_print_json(format_json=workflow_logs) - # else: task_logs = get_task_level_outputs( config, requested_status=status_param, @@ -110,40 +103,13 @@ def main( return return_code -def get_workflow_level_logs(config) -> dict: - """Get the workflow level logs from the workflow metadata - - Args: - config (dict): The cromshell config object - """ - - requests_out = requests.get( - f"{config.cromwell_api_workflow_id}/logs", - timeout=config.requests_connect_timeout, - verify=config.requests_verify_certs, - headers=http_utils.generate_headers(config), - ) - - if requests_out.ok: - # check_for_empty_logs(requests_out.json().get("outputs"), config.workflow_id) - return requests_out.json() - else: - http_utils.check_http_request_status_code( - short_error_message="Failed to retrieve logs for " - f"workflow: {config.workflow_id}", - response=requests_out, - # Raising exception is set false to allow - # command to retrieve outputs of remaining workflows. 
- raise_exception=False, - ) - - def get_task_level_outputs(config, expand_subworkflows, requested_status) -> dict: """Get the task level outputs from the workflow metadata Args: - config (dict): The cromshell config object - :param expand_subworkflows: Whether to expand subworkflows + config (object): The cromshell config object + requested_status (list): The list of requested status + expand_subworkflows (bool) : Whether to expand subworkflows """ metadata_keys = [ @@ -198,7 +164,8 @@ def filter_task_logs_from_workflow_metadata( for scatter in calls_metadata[call]: all_task_logs[call].append( filter_task_logs_from_workflow_metadata( - scatter["subWorkflowMetadata"], requested_status=requested_status + scatter["subWorkflowMetadata"], + requested_status=requested_status ) ) else: @@ -432,7 +399,6 @@ def download_file_like_value_in_dict(task_log_metadata): print(f"Unsupported backend : {task_log_metadata.get('backend')}") - def download_task_level_logs(all_task_log_metadata): """Download the logs from the workflow metadata task_logs_metadata: {call_name:[index1{task_log_name: taskvalue}, index2{...}, ...], call_name:[], ...} diff --git a/src/cromshell/utilities/io_utils.py b/src/cromshell/utilities/io_utils.py index d9bb6b93..b54e1fa7 100644 --- a/src/cromshell/utilities/io_utils.py +++ b/src/cromshell/utilities/io_utils.py @@ -9,11 +9,9 @@ from zipfile import ZIP_DEFLATED, ZipFile import azure.core.exceptions -from azure.storage.blob import BlobServiceClient from azure.identity import DefaultAzureCredential +from azure.storage.blob import BlobServiceClient from google.cloud import storage -import boto3 - from pygments import formatters, highlight, lexers from termcolor import colored From 88077ed89e78d13335f83f985f429b5063531b60 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 9 May 2023 14:26:57 -0400 Subject: [PATCH 07/25] lint fixes --- src/cromshell/logs/command.py | 62 ++++++++++++++--------------- src/cromshell/utilities/io_utils.py | 58 +++++++++++++++++---------- 2 files changed, 66 insertions(+), 54 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index 82890fb3..bc7647c3 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -28,7 +28,7 @@ "--fetch-logs", is_flag=True, default=False, - help="Download the logs to the current directory if true. " + help="Download the logs to the current directory if true. 
", ) @click.option( "-des", @@ -79,7 +79,6 @@ def main( LOGGER.info("Status keys set to %s", status_param) for workflow_id in workflow_ids: - command_setup_utils.resolve_workflow_id_and_server( workflow_id=workflow_id, cromshell_config=config ) @@ -87,7 +86,7 @@ def main( task_logs = get_task_level_outputs( config, requested_status=status_param, - expand_subworkflows=not dont_expand_subworkflows + expand_subworkflows=not dont_expand_subworkflows, ) if fetch_logs: @@ -147,7 +146,7 @@ def get_task_level_outputs(config, expand_subworkflows, requested_status) -> dic def filter_task_logs_from_workflow_metadata( - workflow_metadata: dict, requested_status: list + workflow_metadata: dict, requested_status: list ) -> dict: """Get the logs from the workflow metadata @@ -165,13 +164,16 @@ def filter_task_logs_from_workflow_metadata( all_task_logs[call].append( filter_task_logs_from_workflow_metadata( scatter["subWorkflowMetadata"], - requested_status=requested_status + requested_status=requested_status, ) ) else: all_task_logs[call] = [] for index in index_list: - if "ALL" in requested_status or index.get("executionStatus") in requested_status: + if ( + "ALL" in requested_status + or index.get("executionStatus") in requested_status + ): all_task_logs[call].append( { "attempt": index.get("attempt"), @@ -180,14 +182,14 @@ def filter_task_logs_from_workflow_metadata( "executionStatus": index.get("executionStatus"), "shardIndex": index.get("shardIndex"), "stderr": index.get("stderr"), - "stdout": index.get("stdout") - }, + "stdout": index.get("stdout"), + }, ) check_for_empty_logs( workflow_logs=all_task_logs, workflow_id=workflow_metadata["id"], - requested_status=requested_status + requested_status=requested_status, ) return all_task_logs @@ -199,10 +201,10 @@ def print_task_level_logs(all_task_log_metadata: dict, cat_logs: bool) -> None: Args: all_task_log_metadata (dict): All task logs metadata from the workflow + cat_logs (bool): Whether to print the logs """ for call, index_list in all_task_log_metadata.items(): - print(f"{call}:") for call_index in index_list: if call_index is not None: @@ -225,13 +227,15 @@ def print_file_like_value_in_dict( i = "\t" * indent task_status_font = ( - io_utils.get_color_for_status_key(task_log_metadata.get("executionStatus")) if task_log_metadata.get('executionStatus') else None + io_utils.get_color_for_status_key(task_log_metadata.get("executionStatus")) + if task_log_metadata.get("executionStatus") + else None ) print( colored( f"{i}status: {task_log_metadata.get('executionStatus')}", - color=task_status_font + color=task_status_font, ) ) @@ -243,7 +247,7 @@ def print_file_like_value_in_dict( indent=indent, txt_color=task_status_font, cat_logs=cat_logs, - backend=task_log_metadata.get('backend') + backend=task_log_metadata.get("backend"), ) elif isinstance(log_value, dict): print_file_like_value_in_dict(log_value, indent=indent, cat_logs=cat_logs) @@ -252,18 +256,18 @@ def print_file_like_value_in_dict( for output_value_item in log_value: print_file_like_value_in_dict( task_log_metadata=output_value_item, - indent=indent+1, + indent=indent + 1, cat_logs=cat_logs, ) def print_output_name_and_file( output_name: str, - output_value: str, - indent: int = 0, - txt_color: str = None, - cat_logs: bool = False, - backend: str = None, + output_value: str, + indent: int = 0, + txt_color: str = None, + cat_logs: bool = False, + backend: str = None, ) -> None: """Print the task name and the file name @@ -303,7 +307,8 @@ def print_log_file_content( backend (str): The backend 
to used to run workflow. Default is None. """ term_size = os.get_terminal_size().columns - print(colored( + print( + colored( f"{'=' * term_size}\n{output_name}: {output_value}\n{'=' * term_size}", color=txt_color, ) @@ -319,7 +324,7 @@ def print_log_file_content( def check_for_empty_logs( - workflow_logs: dict, workflow_id: str, requested_status + workflow_logs: dict, workflow_id: str, requested_status ) -> None: """Check if the workflow logs are empty @@ -380,19 +385,15 @@ def download_file_like_value_in_dict(task_log_metadata): download_file_like_value_in_dict(log_value) elif isinstance(log_value, list): # Lists are subworkflows, an item is a task for output_value_item in log_value: - download_file_like_value_in_dict( - task_log_metadata=output_value_item - ) + download_file_like_value_in_dict(task_log_metadata=output_value_item) path_to_downloaded_files = os.getcwd() if task_log_metadata.get("backend") == "PAPIv2": - io_utils.download_gcs_files( - files_to_download, local_dir=os.getcwd() - ) + io_utils.download_gcs_files(file_paths=files_to_download, local_dir=os.getcwd()) print(f"Downloaded files to: {path_to_downloaded_files}") elif task_log_metadata.get("backend") == "TES": io_utils.download_azure_files( - files_to_download, local_dir=os.getcwd() + file_paths=files_to_download, local_dir=os.getcwd() ) print(f"Downloaded files to: {path_to_downloaded_files}") else: @@ -408,9 +409,6 @@ def download_task_level_logs(all_task_log_metadata): """ for call, index_list in all_task_log_metadata.items(): - for call_index in index_list: if call_index is not None: - download_file_like_value_in_dict( - task_log_metadata=call_index - ) + download_file_like_value_in_dict(task_log_metadata=call_index) diff --git a/src/cromshell/utilities/io_utils.py b/src/cromshell/utilities/io_utils.py index b54e1fa7..5ff2d16a 100644 --- a/src/cromshell/utilities/io_utils.py +++ b/src/cromshell/utilities/io_utils.py @@ -240,7 +240,7 @@ def cat_file(file_path: str or Path, backend: str = None) -> str: # Check if the file path is a local path if backend == "Local": - with open(file_path, 'r') as file: + with open(file_path, "r") as file: file_contents = file.read() # Check if the file path is a GCP path elif backend == "PAPIv2": @@ -249,19 +249,19 @@ def cat_file(file_path: str or Path, backend: str = None) -> str: elif backend == "TES": file_contents = get_azure_file_content(file_path) else: - raise ValueError('Invalid file path') + raise ValueError("Invalid file path") return file_contents def get_gcp_file_content(file_path: str) -> str or None: """Returns the contents of a file located on GCP""" - bucket_name, blob_path = file_path.split('//')[-1].split('/', 1) + bucket_name, blob_path = file_path.split("//")[-1].split("/", 1) storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_path) - return blob.download_as_string().decode('utf-8') if blob.exists() else None + return blob.download_as_string().decode("utf-8") if blob.exists() else None def get_azure_file_content(file_path: str) -> str or None: @@ -269,9 +269,9 @@ def get_azure_file_content(file_path: str) -> str or None: blob_service_client = BlobServiceClient( account_url=f"https://{get_az_storage_account()}.blob.core.windows.net", - credential=DefaultAzureCredential() + credential=DefaultAzureCredential(), ) - container_name, blob_path = file_path.split('/', 2)[1:] + container_name, blob_path = file_path.split("/", 2)[1:] blob_client = blob_service_client.get_blob_client( container=container_name, blob=blob_path 
) @@ -279,14 +279,16 @@ def get_azure_file_content(file_path: str) -> str or None: try: if blob_client.exists(): - return blob_client.download_blob().readall().decode('utf-8') + return blob_client.download_blob().readall().decode("utf-8") else: return None except azure.core.exceptions.HttpResponseError as e: if "AuthorizationPermissionMismatch" in str(e): - LOGGER.error("Caught an AuthorizationPermissionMismatch error, check that" - "the Azure Storage Container has your account listed to have" - "Storage Blob Data Contributor") + LOGGER.error( + "Caught an AuthorizationPermissionMismatch error, check that" + "the Azure Storage Container has your account listed to have" + "Storage Blob Data Contributor" + ) else: LOGGER.error( "Caught an error while trying to download the file from Azure: %s", e @@ -295,7 +297,9 @@ def get_azure_file_content(file_path: str) -> str or None: def get_az_storage_account() -> str: """Returns the account name of the Azure storage account""" + import cromshell.utilities.cromshellconfig as config + try: return config.cromshell_config_options["azure_storage_account"] except KeyError: @@ -324,13 +328,19 @@ def is_path_or_url_like(in_string: str) -> bool: def download_gcs_files(file_paths, local_dir) -> None: - """Downloads GCS files to local_dir while preserving directory structure""" + """ + Downloads GCS files to local_dir while preserving directory structure + + Args: + file_paths: list of GCS file paths to download + local_dir: local directory to download files to + """ storage_client = storage.Client() for file_path in file_paths: # Extract bucket and blob path from file path LOGGER.debug("Downloading file %s", file_path) - bucket_name, blob_path = file_path.split('//')[-1].split('/', 1) + bucket_name, blob_path = file_path.split("//")[-1].split("/", 1) # Create local subdirectory if it doesn't exist LOGGER.debug("Creating local subdirectory %s", blob_path) @@ -351,8 +361,15 @@ def download_gcs_files(file_paths, local_dir) -> None: def download_azure_files(file_paths, local_dir) -> None: - """Downloads Azure files to local_dir while preserving directory structure""" - # connection_string = "" + """ + Downloads Azure files to local_dir while preserving directory structure + + Args: + file_paths (list): List of Azure file paths to download + local_dir (str): Local directory to download files to + + """ + default_credential = DefaultAzureCredential() account_url = f"https://{get_az_storage_account()}.blob.core.windows.net" @@ -360,13 +377,12 @@ def download_azure_files(file_paths, local_dir) -> None: # Extract container and blob path from file path LOGGER.debug("Downloading file %s", file_path) blob_service_client = BlobServiceClient( - account_url=account_url, - credential=default_credential + account_url=account_url, credential=default_credential + ) + container_name, blob_path = file_path.split("/", 2)[1:] + blob_client = blob_service_client.get_blob_client( + container=container_name, blob=blob_path ) - container_name, blob_path = file_path.split('/', 2)[1:] - blob_client = blob_service_client.get_blob_client(container=container_name, - blob=blob_path) - # blob_client.download_blob() # Create local subdirectory if it doesn't exist LOGGER.debug("Creating local subdirectory %s", blob_path) @@ -375,8 +391,6 @@ def download_azure_files(file_paths, local_dir) -> None: # Download file to local subdirectory LOGGER.debug("Downloading file %s to %s", file_path, local_subdir) - # container_client = blob_service_client.get_container_client(container_name) - # blob_client 
= container_client.get_blob_client(blob_path) if blob_client.exists(): local_path = Path(local_subdir) / Path(blob_path).name with open(local_path, "wb") as file: From a7b82fc7cca269d4b59c87d0d4f50c5878a27986 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 9 May 2023 14:34:54 -0400 Subject: [PATCH 08/25] updated requirements.txt to include azure packages --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index 0d9de70a..6eedb85c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +azure-identity>=1.12.0 +azure-storage-blob>=12.16.0 gcsfs>=2022.3.0 google-cloud-bigquery>=3.5.0 termcolor>=1.1.0 From 09a3859d86dd2df94fff3399b6ded25843e76905 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 23 May 2023 10:46:21 -0400 Subject: [PATCH 09/25] Minor updates to command.py, added several unit tests for logs command --- src/cromshell/logs/command.py | 17 +- tests/unit/test_logs.py | 301 +++++++++++++++++++++++++++++----- 2 files changed, 267 insertions(+), 51 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index bc7647c3..048f6819 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -1,6 +1,7 @@ import json import logging import os +import sys import click from termcolor import colored @@ -296,7 +297,7 @@ def print_output_name_and_file( def print_log_file_content( - output_name: str, output_value: str, txt_color: str = "blue", backend: str = None + output_name: str, output_value: str, txt_color: None or str = "blue", backend: str = None ) -> None: """Prints output logs and cat the file if possible. @@ -306,7 +307,7 @@ def print_log_file_content( txt_color (str): The color to use for printing the output. Default is "blue". backend (str): The backend to used to run workflow. Default is None. 
""" - term_size = os.get_terminal_size().columns + term_size = os.get_terminal_size().columns if sys.stdout.isatty() else 80 print( colored( f"{'=' * term_size}\n{output_name}: {output_value}\n{'=' * term_size}", @@ -344,7 +345,7 @@ def check_for_empty_logs( else: print(json.dumps(workflow_logs)) LOGGER.error( - f"No log found for workflow: {workflow_id} with status: " + f"No logs found for workflow: {workflow_id} with status: " f"{requested_status}" ) raise Exception( @@ -372,7 +373,7 @@ def get_backend_logs(task_instance: dict) -> str: return backend_logs.get("log") -def download_file_like_value_in_dict(task_log_metadata): +def download_file_like_value_in_dict(task_log_metadata: dict): """Download the file like values in the output metadata dictionary""" files_to_download = [] @@ -389,18 +390,20 @@ def download_file_like_value_in_dict(task_log_metadata): path_to_downloaded_files = os.getcwd() if task_log_metadata.get("backend") == "PAPIv2": - io_utils.download_gcs_files(file_paths=files_to_download, local_dir=os.getcwd()) + io_utils.download_gcs_files( + file_paths=files_to_download, local_dir=path_to_downloaded_files + ) print(f"Downloaded files to: {path_to_downloaded_files}") elif task_log_metadata.get("backend") == "TES": io_utils.download_azure_files( - file_paths=files_to_download, local_dir=os.getcwd() + file_paths=files_to_download, local_dir=path_to_downloaded_files ) print(f"Downloaded files to: {path_to_downloaded_files}") else: print(f"Unsupported backend : {task_log_metadata.get('backend')}") -def download_task_level_logs(all_task_log_metadata): +def download_task_level_logs(all_task_log_metadata: dict): """Download the logs from the workflow metadata task_logs_metadata: {call_name:[index1{task_log_name: taskvalue}, index2{...}, ...], call_name:[], ...} diff --git a/tests/unit/test_logs.py b/tests/unit/test_logs.py index ea97baee..0ac37018 100644 --- a/tests/unit/test_logs.py +++ b/tests/unit/test_logs.py @@ -12,77 +12,290 @@ class TestLogs: @pytest.mark.parametrize( "test_file, status_keys, expect_logs", [ - ("success.json", ["Failed"], False), - ("success.json", ["Done"], True), - ("will_fail.json", ["Failed"], True), - ("will_fail.json", ["Failed", "Done"], True), - ("will_fail.json", ["RetryableFailure"], False), - ("will_fail.json", ["ALL"], True), + ("success.json", ["Done"], {'HelloWorld.HelloWorldTask': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None}], }), + ("will_fail.json", ["Failed"], {'WillFailTester.FailFastTask': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log', + 'executionStatus': 'Failed', 'shardIndex': -1, 'stderr': None, + 'stdout': None}], 'WillFailTester.PassRunsLong': [], }), + ("will_fail.json", ["Failed", "Done"], {'WillFailTester.FailFastTask': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log', + 'executionStatus': 'Failed', 'shardIndex': -1, 'stderr': None, + 'stdout': None}], 'WillFailTester.PassRunsLong': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 
'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None}], }), + # ("will_fail.json", ["RetryableFailure"], "Exception: No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']"), + ("will_fail.json", ["ALL"], {'WillFailTester.FailFastTask': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log', + 'executionStatus': 'Failed', 'shardIndex': -1, 'stderr': None, + 'stdout': None}], 'WillFailTester.PassRunsLong': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None}], }), ], ) - def test_workflow_that_is_doomed( - self, test_file, status_keys, expect_logs, mock_data_path + def test_filter_task_logs_from_workflow_metadata( + self, test_file, status_keys, expect_logs, mock_data_path ): workflow_metadata_path = os.path.join(mock_data_path, test_file) with open(workflow_metadata_path, "r") as f: workflow_metadata = json.load(f) - logs_output = logs_command.print_workflow_logs( + logs_output = logs_command.filter_task_logs_from_workflow_metadata( workflow_metadata=workflow_metadata, - expand_sub_workflows=True, - indent="", - status_keys=status_keys, - cat_logs=False, + requested_status=status_keys, ) assert logs_output == expect_logs @pytest.mark.parametrize( - "test_file, task, expect_logs", + "test_file, status_keys, expect_logs", [ - ( - "local_helloworld_metadata.json", - "HelloWorld.HelloWorldTask", - "Backend Logs Not Available Due to Local Execution", - ), - ( - "PAPIV2_helloworld_metadata.json", - "HelloWorld.HelloWorldTask", - "gs://broad-dsp-lrma-cromwell-central/HelloWorld/9ee4aa2e-7ac5-4c61-88b2-88a4d10f168b/call-HelloWorldTask/HelloWorldTask.log", - ), + + ("success.json", ["Failed"], + "No logs found for workflow: 261ee81a-b6c4-4547-8373-4c879eb24858 with status: ['Failed']"), + ("will_fail.json", ["RetryableFailure"], + "No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']"), ], ) - def test_get_backend_logs(self, test_file, task, expect_logs, mock_data_path): + def test_filter_task_logs_from_workflow_metadata_failure( + self, test_file, status_keys, expect_logs, mock_data_path + ): workflow_metadata_path = os.path.join(mock_data_path, test_file) - with open(workflow_metadata_path, "r") as f: workflow_metadata = json.load(f) - shard_list = workflow_metadata["calls"][task] + with pytest.raises(Exception) as e: + logs_command.filter_task_logs_from_workflow_metadata( + workflow_metadata=workflow_metadata, + requested_status=status_keys, + ) - assert logs_command.get_backend_logs(task_instance=shard_list[0]) == expect_logs + assert str(e.value) == expect_logs @pytest.mark.parametrize( - "metadata_json", + "all_task_log_metadata, expect_logs", [ - { - "backend": "Local", - "calls": {}, - "failures": [{"message": "Runtime validation failed"}], - }, - { - "backend": "Local", - "calls": {"blah": "blah"}, - "failures": [{"message": "Runtime validation failed"}], - }, + ( + {'HelloWorld.HelloWorldTask': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 
'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None}], + }, + "HelloWorld.HelloWorldTask:\n\tstatus: Done\x1b[0m\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\x1b[0m\n", + ) + ] + ) + def test_print_task_level_logs(self, all_task_log_metadata, expect_logs, capsys): + logs_command.print_task_level_logs( + all_task_log_metadata=all_task_log_metadata, + cat_logs=False, + ) + captured = capsys.readouterr() + assert captured.out == expect_logs + + @pytest.mark.parametrize( + "task_log_metadata, expect_logs", + [ + ( + { + 'attempt': 1, 'backend': None, + 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None, + }, + "\tstatus: Done\x1b[0m\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\x1b[0m\n", + ) + ] + ) + def test_print_file_like_value_in_dict(self, task_log_metadata, expect_logs, + capsys): + logs_command.print_file_like_value_in_dict( + task_log_metadata=task_log_metadata, + indent=1, + cat_logs=False, + ) + captured = capsys.readouterr() + assert captured.out == expect_logs + + @pytest.mark.parametrize( + "output_name, output_value, indent, expect_logs", + [ + ( + "bla", + "/bla/bla.txt", + 0, + "bla: /bla/bla.txt\x1b[0m\n", + ), + ( # Test when output is string but not file like + "bla", + "not a file", + 0, + "", + ), + ( # Test when output is a float + "bla", + 0.0, + 0, + "", + ), ], ) - def test_check_workflow_for_calls(self, metadata_json): - if not metadata_json.get("calls"): - with pytest.raises(KeyError): - logs_command.check_workflow_for_calls(metadata_json) + def test_print_output_name_and_file( + self, + output_name, + output_value, + indent, + expect_logs, + capsys, + ): + logs_command.print_output_name_and_file( + output_name=output_name, + output_value=output_value, + indent=indent, + txt_color=None, + ) + captured = capsys.readouterr() + assert captured.out == expect_logs + + @pytest.mark.parametrize( + "output_name, output_value", + [ + ( + "fileName", + "./mock_data/logs/success.json", + ), + ] + ) + def test_print_log_file_content( + self, output_name, output_value, capsys + ): + with open(output_value, "r") as f: + file_content = f.read() + logs_command.print_log_file_content( + output_name=output_name, + output_value=output_value, + txt_color=None, + backend="Local", + ) + captured = capsys.readouterr() + assert file_content in captured.out + + @pytest.mark.parametrize( + "workflow_logs, workflow_id, requested_status", + [ + ( + {'HelloWorld.HelloWorldTask': [ + {'attempt': 1, 'backend': None, + 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None} + ], + } + , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" + ), + ( + {'HelloWorld.HelloWorldTask': [ + {'attempt': 1, 'backend': 'TES', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None} + ], + } + , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" + ), + + ] 
+ ) + def test_check_for_empty_logs( + self, workflow_logs: dict, workflow_id: str, requested_status + ): + logs_command.check_for_empty_logs( + workflow_logs=workflow_logs, + workflow_id=workflow_id, + requested_status=requested_status, + ) + + @pytest.mark.parametrize( + "workflow_logs, workflow_id, requested_status", + [ + ( + {'HelloWorld.HelloWorldTask': [ + {'attempt': 1, 'backend': None, + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None} + ], + } + , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" + ), + ( + {} + , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" + ), + + ] + ) + def test_check_for_empty_logs_failure( + self, workflow_logs: dict, workflow_id: str, requested_status + ): + if workflow_logs: + expected_error = f"No logs found for workflow: {workflow_id} with status: " \ + f"{requested_status}" + else: + expected_error = f"No calls found for workflow: {workflow_id}" + + with pytest.raises(Exception) as e: + logs_command.check_for_empty_logs( + workflow_logs=workflow_logs, + workflow_id=workflow_id, + requested_status=requested_status, + ) + + assert str(e.value) == expected_error + + @pytest.mark.parametrize( + "task_instance ", + [ + ( + {'attempt': 1, 'backend': 'PAPI_V2', + 'backendLogs': {'log': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log'}, + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None} + ), + ( + {'attempt': 1, 'backend': 'PAPI_V2', + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None} + ), + ( + {'attempt': 1, 'backend': 'Local', + 'backendLogs': { + 'log': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log'}, + 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, + 'stdout': None} + ), + ] + + ) + def test_get_backend_logs(self, task_instance: dict): + + backend_log = logs_command.get_backend_logs(task_instance=task_instance) + if task_instance["backend"] == "Local": + assert backend_log == "Backend Logs Not Available Due to Local Execution" + elif task_instance["backend"] != "Local" and "backendLogs" not in task_instance: + assert backend_log == "Backend Logs Not Found" else: - logs_command.check_workflow_for_calls(metadata_json) + assert backend_log == task_instance["backendLogs"]["log"] + @pytest.fixture def mock_data_path(self): From f7ee8cdb506adee729bd9a34ddb787d636efcd21 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 26 Jun 2023 18:31:24 +0300 Subject: [PATCH 10/25] Added function to replace uuids in integration tests --- tests/integration/utility_test_functions.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/integration/utility_test_functions.py b/tests/integration/utility_test_functions.py index c628f0fe..7bf2d460 100644 --- a/tests/integration/utility_test_functions.py +++ b/tests/integration/utility_test_functions.py @@ -144,3 +144,23 @@ def run_cromshell_submit( f"\n{print_exception(*result.exc_info)}" ) return result + + +import re +def replace_uuids(input_string: str, replacement_uuid: str): + """ + Replace all UUIDs in a string with a given UUID + :param input_string: the string to replace UUIDs in + :param replacement_uuid: the UUID to replace all UUIDs in the string with + :return: + """ + # Define the pattern to match 128-bit UUIDs + uuid_pattern = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" + + # Generate the 
replacement UUID + new_uuid = str(replacement_uuid) + + # Use regular expressions to find and replace UUIDs in the string + output_string = re.sub(uuid_pattern, new_uuid, input_string) + + return output_string From 21bfa56020243219e257cc7a92995584bff842af Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 26 Jun 2023 18:32:52 +0300 Subject: [PATCH 11/25] functional integration tests for logs --- tests/integration/test_logs.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_logs.py b/tests/integration/test_logs.py index 4b3316a0..b0b0c622 100644 --- a/tests/integration/test_logs.py +++ b/tests/integration/test_logs.py @@ -9,17 +9,21 @@ class TestLogs: @pytest.mark.parametrize( - "wdl, json_file, expected_logs", + "wdl, json_file, status, expected_logs, exit_code", [ ( "tests/workflows/helloWorld.wdl", "tests/workflows/helloWorld.json", - "No logs with status ['Failed'] found for workflow, try adding the argument '-s ALL' to list logs with any status\n", + "ALL", + """HelloWorld.HelloWorldTask:\n\tstatus: Done\n\tstderr: /cromwell-executions/HelloWorld/2686fb3f-d2e6-4a4c-aa66-5dede568310f/call-HelloWorldTask/execution/stderr\n\tstdout: /cromwell-executions/HelloWorld/2686fb3f-d2e6-4a4c-aa66-5dede568310f/call-HelloWorldTask/execution/stdout\n""", + 0, ), ( "tests/workflows/helloWorldFail.wdl", "tests/workflows/helloWorld.json", - "HelloWorld.HelloWorldTask:\tFailed\t Backend Logs Not Found\n", + "Done", + "No logs found for workflow: 2686fb3f-d2e6-4a4c-aa66-5dede568310f with status: ['Done']", + 1, ), ], ) @@ -28,7 +32,9 @@ def test_logs( local_cromwell_url: str, wdl: str, json_file: str, + status: str, expected_logs: str, + exit_code: int, ansi_escape, ): test_workflow_id = utility_test_functions.submit_workflow( @@ -44,13 +50,17 @@ def test_logs( # run logs logs_result = utility_test_functions.run_cromshell_command( - command=["logs", test_workflow_id], - exit_code=0, + command=["logs", "-s", status, test_workflow_id], + exit_code=exit_code, ) - print("Print workflow counts results:") + print("Print workflow logs results:") print(logs_result.stdout) + print(logs_result.stderr) + print(logs_result.exception) + + workflow_logs = ansi_escape.sub("", logs_result.stdout) if exit_code == 0 else str(logs_result.exception) - workflow_logs = ansi_escape.sub("", logs_result.stdout) + id_updated_expected_logs = utility_test_functions.replace_uuids(expected_logs, test_workflow_id) - assert workflow_logs == expected_logs + assert workflow_logs == id_updated_expected_logs From 4cb1e6cc00019530b920d46bfc9074505c44fa2a Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 26 Jun 2023 18:41:26 +0300 Subject: [PATCH 12/25] refractoring logs command.py --- src/cromshell/logs/command.py | 73 ++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index 048f6819..c0b5ff26 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -22,14 +22,16 @@ is_flag=True, default=False, help="Print the contents of the logs to stdout if true. 
" - "Note: This assumes GCS bucket logs with default permissions otherwise this may not work", + "Note: This assumes GCS bucket logs with default permissions otherwise this may not work", # todo: add a note about this for azure ) -@click.option( - "-f", +@click.option( # TODO: option to specify the location of downloaded logs + "-d", "--fetch-logs", + "--download-logs", is_flag=True, default=False, help="Download the logs to the current directory if true. ", + ) @click.option( "-des", @@ -52,7 +54,7 @@ help="Return a list with links to the task logs with the indicated status. " "Separate multiple keys by comma or use 'ALL' to print all logs. " "Some standard Cromwell status options are 'ALL', 'Done', 'RetryableFailure', 'Running', and 'Failed'.", -) +) # TODO: Show defaults in help @click.pass_obj def main( config, @@ -63,7 +65,11 @@ def main( print_logs: bool, fetch_logs: bool, ): - """Get a subset of the workflow metadata.""" + """Get the logs for a workflow. + + Note: + By default, only failed tasks are returned unless + otherwise specified using -s/--status.""" LOGGER.info("logs") @@ -90,7 +96,7 @@ def main( expand_subworkflows=not dont_expand_subworkflows, ) - if fetch_logs: + if fetch_logs: # Todo: mutually exclusive options download_task_level_logs(all_task_log_metadata=task_logs) else: if json_summary: @@ -205,12 +211,12 @@ def print_task_level_logs(all_task_log_metadata: dict, cat_logs: bool) -> None: cat_logs (bool): Whether to print the logs """ - for call, index_list in all_task_log_metadata.items(): + for call, list_of_call_instances in all_task_log_metadata.items(): print(f"{call}:") - for call_index in index_list: - if call_index is not None: + for call_instance in list_of_call_instances: + if call_instance is not None: print_file_like_value_in_dict( - task_log_metadata=call_index, indent=1, cat_logs=cat_logs + task_log_metadata=call_instance, indent=1, cat_logs=cat_logs ) @@ -227,18 +233,12 @@ def print_file_like_value_in_dict( i = "\t" * indent + task_status = task_log_metadata.get("executionStatus") task_status_font = ( - io_utils.get_color_for_status_key(task_log_metadata.get("executionStatus")) - if task_log_metadata.get("executionStatus") - else None + io_utils.get_color_for_status_key(task_status) if task_status else None ) - print( - colored( - f"{i}status: {task_log_metadata.get('executionStatus')}", - color=task_status_font, - ) - ) + print(colored(f"{i}status: {task_status}", color=task_status_font)) for log_name, log_value in task_log_metadata.items(): if isinstance(log_value, str): @@ -283,17 +283,16 @@ def print_output_name_and_file( i = "\t" * indent - if isinstance(output_value, str): - if io_utils.is_path_or_url_like(output_value): - if cat_logs: - print_log_file_content( - output_name=output_name, - output_value=output_value, - txt_color=txt_color, - backend=backend, - ) - else: - print(colored(f"{i}{output_name}: {output_value}", color=txt_color)) + if isinstance(output_value, str) and io_utils.is_path_or_url_like(output_value): + if cat_logs: + print_log_file_content( + output_name=output_name, + output_value=output_value, + txt_color=txt_color, + backend=backend, + ) + else: + print(colored(f"{i}{output_name}: {output_value}", color=txt_color)) def print_log_file_content( @@ -321,7 +320,7 @@ def print_log_file_content( else: print(file_contents) - print("\n\n\n") # Add some space between logs + print("\n") # Add some space between logs def check_for_empty_logs( @@ -339,11 +338,11 @@ def check_for_empty_logs( raise Exception(f"No calls found for workflow: 
{workflow_id}") if "log" not in json.dumps(workflow_logs): - if "TES" in json.dumps(workflow_logs): + substrings = ["TES", "Local"] + if any(substring in json.dumps(workflow_logs) for substring in substrings): # Cromwell does not return backendlogs for TES backend at the moment. pass else: - print(json.dumps(workflow_logs)) LOGGER.error( f"No logs found for workflow: {workflow_id} with status: " f"{requested_status}" @@ -373,8 +372,12 @@ def get_backend_logs(task_instance: dict) -> str: return backend_logs.get("log") -def download_file_like_value_in_dict(task_log_metadata: dict): - """Download the file like values in the output metadata dictionary""" +def download_file_like_value_in_dict(task_log_metadata: dict) -> None: + """Download the file like values in the output metadata dictionary + + :param task_log_metadata: The task log metadata + :return: None + """ files_to_download = [] From e1d631677b360092dc6a57e0f8b4e0429316cb17 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 26 Jun 2023 18:47:09 +0300 Subject: [PATCH 13/25] show default status to retrieve in help message --- src/cromshell/logs/command.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index c0b5ff26..193b0376 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -51,10 +51,11 @@ "-s", "--status", default="Failed", + show_default=True, help="Return a list with links to the task logs with the indicated status. " "Separate multiple keys by comma or use 'ALL' to print all logs. " - "Some standard Cromwell status options are 'ALL', 'Done', 'RetryableFailure', 'Running', and 'Failed'.", -) # TODO: Show defaults in help + "Some standard Cromwell status options are 'Done', 'RetryableFailure', 'Running', and 'Failed'.", +) @click.pass_obj def main( config, From f6e4977d743ad03b0beecac6400b2ed9b58d0b1f Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 26 Jun 2023 20:43:11 +0300 Subject: [PATCH 14/25] Allow the user to specify where to download logs if needed --- src/cromshell/logs/command.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index 193b0376..7b813c5a 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -2,6 +2,7 @@ import logging import os import sys +from pathlib import Path import click from termcolor import colored @@ -24,13 +25,16 @@ help="Print the contents of the logs to stdout if true. " "Note: This assumes GCS bucket logs with default permissions otherwise this may not work", # todo: add a note about this for azure ) -@click.option( # TODO: option to specify the location of downloaded logs +@click.option( "-d", "--fetch-logs", "--download-logs", - is_flag=True, - default=False, - help="Download the logs to the current directory if true. ", + is_flag=False, + flag_value=Path.cwd(), + default=None, + show_default=True, + type=click.Path(exists=True), + help="Download the logs to the current directory or provided directory path. ", ) @click.option( @@ -64,7 +68,7 @@ def main( status: list, dont_expand_subworkflows: bool, print_logs: bool, - fetch_logs: bool, + fetch_logs, ): """Get the logs for a workflow. 
@@ -98,7 +102,9 @@ def main( ) if fetch_logs: # Todo: mutually exclusive options - download_task_level_logs(all_task_log_metadata=task_logs) + download_task_level_logs( + all_task_log_metadata=task_logs, path_to_download=fetch_logs + ) else: if json_summary: io_utils.pretty_print_json(format_json=task_logs) @@ -373,7 +379,9 @@ def get_backend_logs(task_instance: dict) -> str: return backend_logs.get("log") -def download_file_like_value_in_dict(task_log_metadata: dict) -> None: +def download_file_like_value_in_dict( + task_log_metadata: dict, path_to_download: Path or str +) -> None: """Download the file like values in the output metadata dictionary :param task_log_metadata: The task log metadata @@ -392,7 +400,7 @@ def download_file_like_value_in_dict(task_log_metadata: dict) -> None: for output_value_item in log_value: download_file_like_value_in_dict(task_log_metadata=output_value_item) - path_to_downloaded_files = os.getcwd() + path_to_downloaded_files = path_to_download if task_log_metadata.get("backend") == "PAPIv2": io_utils.download_gcs_files( file_paths=files_to_download, local_dir=path_to_downloaded_files @@ -407,15 +415,20 @@ def download_file_like_value_in_dict(task_log_metadata: dict) -> None: print(f"Unsupported backend : {task_log_metadata.get('backend')}") -def download_task_level_logs(all_task_log_metadata: dict): +def download_task_level_logs( + all_task_log_metadata: dict, path_to_download: Path or str +): """Download the logs from the workflow metadata task_logs_metadata: {call_name:[index1{task_log_name: taskvalue}, index2{...}, ...], call_name:[], ...} Args: all_task_log_metadata (dict): All task logs metadata from the workflow + path_to_download: Path to download log files """ for call, index_list in all_task_log_metadata.items(): for call_index in index_list: if call_index is not None: - download_file_like_value_in_dict(task_log_metadata=call_index) + download_file_like_value_in_dict( + task_log_metadata=call_index, path_to_download=path_to_download + ) From b826399a7a0d37b8e6a475f001154a656c363fb4 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 26 Jun 2023 22:14:09 +0300 Subject: [PATCH 15/25] made fetch_logs and printing log summary option mutually exclusive options --- src/cromshell/logs/command.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index 7b813c5a..ec67bf9f 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -23,7 +23,7 @@ is_flag=True, default=False, help="Print the contents of the logs to stdout if true. 
" - "Note: This assumes GCS bucket logs with default permissions otherwise this may not work", # todo: add a note about this for azure + "Note: This assumes GCS or Azure stored logs with default permissions otherwise this will not work", ) @click.option( "-d", @@ -101,17 +101,17 @@ def main( expand_subworkflows=not dont_expand_subworkflows, ) - if fetch_logs: # Todo: mutually exclusive options + if fetch_logs: download_task_level_logs( all_task_log_metadata=task_logs, path_to_download=fetch_logs ) + + if json_summary: + io_utils.pretty_print_json(format_json=task_logs) else: - if json_summary: - io_utils.pretty_print_json(format_json=task_logs) - else: - print_task_level_logs( - all_task_log_metadata=task_logs, cat_logs=print_logs - ) + print_task_level_logs( + all_task_log_metadata=task_logs, cat_logs=print_logs + ) return return_code @@ -412,7 +412,9 @@ def download_file_like_value_in_dict( ) print(f"Downloaded files to: {path_to_downloaded_files}") else: - print(f"Unsupported backend : {task_log_metadata.get('backend')}") + print( + f"Downloading items is unsupported for backend : {task_log_metadata.get('backend')}" + ) def download_task_level_logs( From 67b05664cc6840842e466c819c5f3b6869fb2a6c Mon Sep 17 00:00:00 2001 From: bshifaw Date: Mon, 26 Jun 2023 23:14:07 +0300 Subject: [PATCH 16/25] linting changes --- src/cromshell/logs/command.py | 10 +++++----- tests/integration/test_logs.py | 10 ++++++++-- tests/integration/utility_test_functions.py | 2 +- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index ec67bf9f..f8ed70e9 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -35,7 +35,6 @@ show_default=True, type=click.Path(exists=True), help="Download the logs to the current directory or provided directory path. ", - ) @click.option( "-des", @@ -109,9 +108,7 @@ def main( if json_summary: io_utils.pretty_print_json(format_json=task_logs) else: - print_task_level_logs( - all_task_log_metadata=task_logs, cat_logs=print_logs - ) + print_task_level_logs(all_task_log_metadata=task_logs, cat_logs=print_logs) return return_code @@ -303,7 +300,10 @@ def print_output_name_and_file( def print_log_file_content( - output_name: str, output_value: str, txt_color: None or str = "blue", backend: str = None + output_name: str, + output_value: str, + txt_color: None or str = "blue", + backend: str = None, ) -> None: """Prints output logs and cat the file if possible. 
diff --git a/tests/integration/test_logs.py b/tests/integration/test_logs.py index b0b0c622..4b48cb35 100644 --- a/tests/integration/test_logs.py +++ b/tests/integration/test_logs.py @@ -59,8 +59,14 @@ def test_logs( print(logs_result.stderr) print(logs_result.exception) - workflow_logs = ansi_escape.sub("", logs_result.stdout) if exit_code == 0 else str(logs_result.exception) + workflow_logs = ( + ansi_escape.sub("", logs_result.stdout) + if exit_code == 0 + else str(logs_result.exception) + ) - id_updated_expected_logs = utility_test_functions.replace_uuids(expected_logs, test_workflow_id) + id_updated_expected_logs = utility_test_functions.replace_uuids( + expected_logs, test_workflow_id + ) assert workflow_logs == id_updated_expected_logs diff --git a/tests/integration/utility_test_functions.py b/tests/integration/utility_test_functions.py index 7bf2d460..16d5c2e8 100644 --- a/tests/integration/utility_test_functions.py +++ b/tests/integration/utility_test_functions.py @@ -1,4 +1,5 @@ import json +import re from importlib import reload from pathlib import Path from traceback import print_exception @@ -146,7 +147,6 @@ def run_cromshell_submit( return result -import re def replace_uuids(input_string: str, replacement_uuid: str): """ Replace all UUIDs in a string with a given UUID From a79f0e18d271375f798884839bc20842bbc54a76 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 27 Jun 2023 19:02:42 +0300 Subject: [PATCH 17/25] linting changes, fix to path for test function --- tests/unit/test_logs.py | 362 ++++++++++++++++++++++++++-------------- 1 file changed, 239 insertions(+), 123 deletions(-) diff --git a/tests/unit/test_logs.py b/tests/unit/test_logs.py index 0ac37018..9697bb4a 100644 --- a/tests/unit/test_logs.py +++ b/tests/unit/test_logs.py @@ -1,5 +1,6 @@ import json import os +from pathlib import Path import pytest @@ -12,39 +13,102 @@ class TestLogs: @pytest.mark.parametrize( "test_file, status_keys, expect_logs", [ - ("success.json", ["Done"], {'HelloWorld.HelloWorldTask': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None}], }), - ("will_fail.json", ["Failed"], {'WillFailTester.FailFastTask': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log', - 'executionStatus': 'Failed', 'shardIndex': -1, 'stderr': None, - 'stdout': None}], 'WillFailTester.PassRunsLong': [], }), - ("will_fail.json", ["Failed", "Done"], {'WillFailTester.FailFastTask': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log', - 'executionStatus': 'Failed', 'shardIndex': -1, 'stderr': None, - 'stdout': None}], 'WillFailTester.PassRunsLong': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None}], }), + ( + "success.json", + ["Done"], + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": 
"gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + ), + ( + "will_fail.json", + ["Failed"], + { + "WillFailTester.FailFastTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log", + "executionStatus": "Failed", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + "WillFailTester.PassRunsLong": [], + }, + ), + ( + "will_fail.json", + ["Failed", "Done"], + { + "WillFailTester.FailFastTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log", + "executionStatus": "Failed", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + "WillFailTester.PassRunsLong": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + ), # ("will_fail.json", ["RetryableFailure"], "Exception: No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']"), - ("will_fail.json", ["ALL"], {'WillFailTester.FailFastTask': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log', - 'executionStatus': 'Failed', 'shardIndex': -1, 'stderr': None, - 'stdout': None}], 'WillFailTester.PassRunsLong': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None}], }), + ( + "will_fail.json", + ["ALL"], + { + "WillFailTester.FailFastTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-FailFastTask/FailFastTask.log", + "executionStatus": "Failed", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + "WillFailTester.PassRunsLong": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/WillFailTester/019d7962-4c0c-4651-87ac-b90efff26ff6/call-PassRunsLong/PassRunsLong.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + ), ], ) def test_filter_task_logs_from_workflow_metadata( - self, test_file, status_keys, expect_logs, mock_data_path + self, test_file, status_keys, expect_logs, mock_data_path ): workflow_metadata_path = os.path.join(mock_data_path, test_file) with open(workflow_metadata_path, "r") as f: @@ -60,15 +124,20 @@ def test_filter_task_logs_from_workflow_metadata( @pytest.mark.parametrize( "test_file, status_keys, expect_logs", [ - - ("success.json", ["Failed"], - "No logs found for workflow: 261ee81a-b6c4-4547-8373-4c879eb24858 with status: ['Failed']"), - ("will_fail.json", ["RetryableFailure"], - "No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 
with status: ['RetryableFailure']"), + ( + "success.json", + ["Failed"], + "No logs found for workflow: 261ee81a-b6c4-4547-8373-4c879eb24858 with status: ['Failed']", + ), + ( + "will_fail.json", + ["RetryableFailure"], + "No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']", + ), ], ) def test_filter_task_logs_from_workflow_metadata_failure( - self, test_file, status_keys, expect_logs, mock_data_path + self, test_file, status_keys, expect_logs, mock_data_path ): workflow_metadata_path = os.path.join(mock_data_path, test_file) with open(workflow_metadata_path, "r") as f: @@ -86,15 +155,22 @@ def test_filter_task_logs_from_workflow_metadata_failure( "all_task_log_metadata, expect_logs", [ ( - {'HelloWorld.HelloWorldTask': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None}], - }, - "HelloWorld.HelloWorldTask:\n\tstatus: Done\x1b[0m\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\x1b[0m\n", + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } + ], + }, + "HelloWorld.HelloWorldTask:\n\tstatus: Done\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\n", ) - ] + ], ) def test_print_task_level_logs(self, all_task_log_metadata, expect_logs, capsys): logs_command.print_task_level_logs( @@ -108,18 +184,22 @@ def test_print_task_level_logs(self, all_task_log_metadata, expect_logs, capsys) "task_log_metadata, expect_logs", [ ( - { - 'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None, - }, - "\tstatus: Done\x1b[0m\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\x1b[0m\n", + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + }, + "\tstatus: Done\n\tbackendLogs: gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log\n", ) - ] + ], ) - def test_print_file_like_value_in_dict(self, task_log_metadata, expect_logs, - capsys): + def test_print_file_like_value_in_dict( + self, task_log_metadata, expect_logs, capsys + ): logs_command.print_file_like_value_in_dict( task_log_metadata=task_log_metadata, indent=1, @@ -132,32 +212,32 @@ def test_print_file_like_value_in_dict(self, task_log_metadata, expect_logs, "output_name, output_value, indent, expect_logs", [ ( - "bla", - "/bla/bla.txt", - 0, - "bla: /bla/bla.txt\x1b[0m\n", + "bla", + "/bla/bla.txt", + 
0, + "bla: /bla/bla.txt\n", ), ( # Test when output is string but not file like - "bla", - "not a file", - 0, - "", + "bla", + "not a file", + 0, + "", ), ( # Test when output is a float - "bla", - 0.0, - 0, - "", + "bla", + 0.0, + 0, + "", ), ], ) def test_print_output_name_and_file( - self, - output_name, - output_value, - indent, - expect_logs, - capsys, + self, + output_name, + output_value, + indent, + expect_logs, + capsys, ): logs_command.print_output_name_and_file( output_name=output_name, @@ -172,19 +252,22 @@ def test_print_output_name_and_file( "output_name, output_value", [ ( - "fileName", - "./mock_data/logs/success.json", + "fileName", + "success.json", ), - ] + ], ) def test_print_log_file_content( - self, output_name, output_value, capsys + self, output_name, output_value, mock_data_path, capsys ): - with open(output_value, "r") as f: + abs_output_value = Path(mock_data_path).joinpath(output_value) + + with open(abs_output_value, "r") as f: file_content = f.read() + logs_command.print_log_file_content( output_name=output_name, - output_value=output_value, + output_value=str(abs_output_value), txt_color=None, backend="Local", ) @@ -195,26 +278,39 @@ def test_print_log_file_content( "workflow_logs, workflow_id, requested_status", [ ( - {'HelloWorld.HelloWorldTask': [ - {'attempt': 1, 'backend': None, - 'backendLogs': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None} + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "backendLogs": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } ], - } - , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" + }, + "261ee81a-b6c4-4547-8373-4c879eb24858", + "Done", ), ( - {'HelloWorld.HelloWorldTask': [ - {'attempt': 1, 'backend': 'TES', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None} + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": "TES", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } ], - } - , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" + }, + "261ee81a-b6c4-4547-8373-4c879eb24858", + "Done", ), - - ] + ], ) def test_check_for_empty_logs( self, workflow_logs: dict, workflow_id: str, requested_status @@ -229,27 +325,32 @@ def test_check_for_empty_logs( "workflow_logs, workflow_id, requested_status", [ ( - {'HelloWorld.HelloWorldTask': [ - {'attempt': 1, 'backend': None, - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None} + { + "HelloWorld.HelloWorldTask": [ + { + "attempt": 1, + "backend": None, + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } ], - } - , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" - ), - ( - {} - , "261ee81a-b6c4-4547-8373-4c879eb24858", "Done" + }, + "261ee81a-b6c4-4547-8373-4c879eb24858", + "Done", ), - - ] + ({}, "261ee81a-b6c4-4547-8373-4c879eb24858", "Done"), + ], ) def test_check_for_empty_logs_failure( - self, workflow_logs: dict, workflow_id: str, requested_status + self, workflow_logs: dict, workflow_id: str, requested_status ): if workflow_logs: - expected_error = f"No logs found for workflow: {workflow_id} with status: " \ - f"{requested_status}" + expected_error = ( + f"No logs 
found for workflow: {workflow_id} with status: " + f"{requested_status}" + ) else: expected_error = f"No calls found for workflow: {workflow_id}" @@ -266,28 +367,44 @@ def test_check_for_empty_logs_failure( "task_instance ", [ ( - {'attempt': 1, 'backend': 'PAPI_V2', - 'backendLogs': {'log': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log'}, - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None} + { + "attempt": 1, + "backend": "PAPI_V2", + "backendLogs": { + "log": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log" + }, + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } ), ( - {'attempt': 1, 'backend': 'PAPI_V2', - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None} + { + "attempt": 1, + "backend": "PAPI_V2", + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } ), ( - {'attempt': 1, 'backend': 'Local', - 'backendLogs': { - 'log': 'gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log'}, - 'executionStatus': 'Done', 'shardIndex': -1, 'stderr': None, - 'stdout': None} + { + "attempt": 1, + "backend": "Local", + "backendLogs": { + "log": "gs://broad-methods-cromwell-exec-bucket-instance-8/HelloWorld/261ee81a-b6c4-4547-8373-4c879eb24858/call-HelloWorldTask/HelloWorldTask.log" + }, + "executionStatus": "Done", + "shardIndex": -1, + "stderr": None, + "stdout": None, + } ), - ] - + ], ) def test_get_backend_logs(self, task_instance: dict): - backend_log = logs_command.get_backend_logs(task_instance=task_instance) if task_instance["backend"] == "Local": assert backend_log == "Backend Logs Not Available Due to Local Execution" @@ -296,7 +413,6 @@ def test_get_backend_logs(self, task_instance: dict): else: assert backend_log == task_instance["backendLogs"]["log"] - @pytest.fixture def mock_data_path(self): return os.path.join(os.path.dirname(__file__), "mock_data/logs/") From 00fe4ed11bbc0ec3d16a74f71cc2aec967d1a1e3 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Tue, 27 Jun 2023 22:47:57 +0300 Subject: [PATCH 18/25] Added more explicit message about using ALL when default 'Failed' logs are not found --- src/cromshell/logs/command.py | 6 ++++-- tests/unit/test_logs.py | 7 ++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index f8ed70e9..728bc083 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -352,11 +352,13 @@ def check_for_empty_logs( else: LOGGER.error( f"No logs found for workflow: {workflow_id} with status: " - f"{requested_status}" + f"{requested_status}. Try adding the argument '-s ALL' to " + f"list logs with any status." ) raise Exception( f"No logs found for workflow: {workflow_id} with status: " - f"{requested_status}" + f"{requested_status}. Try adding the argument '-s ALL' to " + f"list logs with any status." 
) diff --git a/tests/unit/test_logs.py b/tests/unit/test_logs.py index 9697bb4a..2c31c1cb 100644 --- a/tests/unit/test_logs.py +++ b/tests/unit/test_logs.py @@ -127,12 +127,12 @@ def test_filter_task_logs_from_workflow_metadata( ( "success.json", ["Failed"], - "No logs found for workflow: 261ee81a-b6c4-4547-8373-4c879eb24858 with status: ['Failed']", + "No logs found for workflow: 261ee81a-b6c4-4547-8373-4c879eb24858 with status: ['Failed']. Try adding the argument '-s ALL' to list logs with any status.", ), ( "will_fail.json", ["RetryableFailure"], - "No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']", + "No logs found for workflow: 019d7962-4c0c-4651-87ac-b90efff26ff6 with status: ['RetryableFailure']. Try adding the argument '-s ALL' to list logs with any status.", ), ], ) @@ -349,7 +349,8 @@ def test_check_for_empty_logs_failure( if workflow_logs: expected_error = ( f"No logs found for workflow: {workflow_id} with status: " - f"{requested_status}" + f"{requested_status}. Try adding the argument '-s ALL' to " + f"list logs with any status." ) else: expected_error = f"No calls found for workflow: {workflow_id}" From 1228b5ac00ceee5620160a70a224d89374d32bbb Mon Sep 17 00:00:00 2001 From: bshifaw Date: Wed, 28 Jun 2023 21:59:33 +0300 Subject: [PATCH 19/25] fix integration test --- tests/integration/test_logs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_logs.py b/tests/integration/test_logs.py index 4b48cb35..aa701d22 100644 --- a/tests/integration/test_logs.py +++ b/tests/integration/test_logs.py @@ -22,7 +22,7 @@ class TestLogs: "tests/workflows/helloWorldFail.wdl", "tests/workflows/helloWorld.json", "Done", - "No logs found for workflow: 2686fb3f-d2e6-4a4c-aa66-5dede568310f with status: ['Done']", + "No logs found for workflow: 2686fb3f-d2e6-4a4c-aa66-5dede568310f with status: ['Done']. Try adding the argument '-s ALL' to list logs with any status.", 1, ), ], From 389eae33909fbcee86b0baddf9909b4a264a259b Mon Sep 17 00:00:00 2001 From: bshifaw Date: Thu, 29 Jun 2023 20:33:11 +0300 Subject: [PATCH 20/25] enum for backends --- src/cromshell/logs/command.py | 6 ++-- src/cromshell/utilities/io_utils.py | 56 +++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 14 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index 728bc083..8d2cfdeb 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -345,7 +345,7 @@ def check_for_empty_logs( raise Exception(f"No calls found for workflow: {workflow_id}") if "log" not in json.dumps(workflow_logs): - substrings = ["TES", "Local"] + substrings = io_utils.BackendType.AZURE.value + io_utils.BackendType.LOCAL.value if any(substring in json.dumps(workflow_logs) for substring in substrings): # Cromwell does not return backendlogs for TES backend at the moment. 
pass @@ -403,12 +403,12 @@ def download_file_like_value_in_dict( download_file_like_value_in_dict(task_log_metadata=output_value_item) path_to_downloaded_files = path_to_download - if task_log_metadata.get("backend") == "PAPIv2": + if task_log_metadata.get("backend") in io_utils.BackendType.GCP.value: io_utils.download_gcs_files( file_paths=files_to_download, local_dir=path_to_downloaded_files ) print(f"Downloaded files to: {path_to_downloaded_files}") - elif task_log_metadata.get("backend") == "TES": + elif task_log_metadata.get("backend") in io_utils.BackendType.AZURE.value: io_utils.download_azure_files( file_paths=files_to_download, local_dir=path_to_downloaded_files ) diff --git a/src/cromshell/utilities/io_utils.py b/src/cromshell/utilities/io_utils.py index 5ff2d16a..99308e2a 100644 --- a/src/cromshell/utilities/io_utils.py +++ b/src/cromshell/utilities/io_utils.py @@ -3,6 +3,7 @@ import re import shutil from contextlib import nullcontext +from enum import Enum from io import BytesIO from pathlib import Path from typing import BinaryIO, List, Union @@ -239,14 +240,14 @@ def cat_file(file_path: str or Path, backend: str = None) -> str: local file path, GCP file path, Azure file path, or AWS file path.""" # Check if the file path is a local path - if backend == "Local": + if backend in BackendType.LOCAL.value: with open(file_path, "r") as file: file_contents = file.read() # Check if the file path is a GCP path - elif backend == "PAPIv2": + elif backend in BackendType.GCP.value: file_contents = get_gcp_file_content(file_path) # Check if the file path is an Azure path - elif backend == "TES": + elif backend in BackendType.AZURE.value: file_contents = get_azure_file_content(file_path) else: raise ValueError("Invalid file path") @@ -293,6 +294,7 @@ def get_azure_file_content(file_path: str) -> str or None: LOGGER.error( "Caught an error while trying to download the file from Azure: %s", e ) + raise e def get_az_storage_account() -> str: @@ -307,6 +309,7 @@ def get_az_storage_account() -> str: "An 'azure_storage_account' is required for this action but" "was not found in Cromshell configuration file. " ) + raise KeyError("Missing 'azure_storage_account' in Cromshell configuration") def is_path_or_url_like(in_string: str) -> bool: @@ -327,7 +330,25 @@ def is_path_or_url_like(in_string: str) -> bool: return False -def download_gcs_files(file_paths, local_dir) -> None: +def create_local_subdirectory(local_dir: str or Path, blob_path: str or Path) -> Path: + """ + Creates a local subdirectory for a given blob path. + A blob path is a path to a file in a GCS bucket. 
+ + :param local_dir: Path to local directory + :param blob_path: Path to blob in GCS bucket + :return: + """ + + LOGGER.debug("Creating local subdirectory %s", blob_path) + + local_subdir = Path(local_dir) / Path(blob_path).parent + Path.mkdir(local_subdir, exist_ok=True, parents=True) + + return local_subdir + + +def download_gcs_files(file_paths: list, local_dir: str or Path) -> None: """ Downloads GCS files to local_dir while preserving directory structure @@ -343,9 +364,9 @@ def download_gcs_files(file_paths, local_dir) -> None: bucket_name, blob_path = file_path.split("//")[-1].split("/", 1) # Create local subdirectory if it doesn't exist - LOGGER.debug("Creating local subdirectory %s", blob_path) - local_subdir = Path(local_dir) / Path(blob_path).parent - Path.mkdir(local_subdir, exist_ok=True, parents=True) + local_subdir = create_local_subdirectory( + local_dir=local_dir, blob_path=blob_path + ) # Download file to local subdirectory LOGGER.debug("Downloading file %s to %s", file_path, local_subdir) @@ -360,7 +381,7 @@ def download_gcs_files(file_paths, local_dir) -> None: LOGGER.warning("File %s does not exist", file_path) -def download_azure_files(file_paths, local_dir) -> None: +def download_azure_files(file_paths: list, local_dir: str or Path) -> None: """ Downloads Azure files to local_dir while preserving directory structure @@ -385,9 +406,9 @@ def download_azure_files(file_paths, local_dir) -> None: ) # Create local subdirectory if it doesn't exist - LOGGER.debug("Creating local subdirectory %s", blob_path) - local_subdir = Path(local_dir) / Path(blob_path).parent - Path.mkdir(local_subdir, exist_ok=True, parents=True) + local_subdir = create_local_subdirectory( + local_dir=local_dir, blob_path=blob_path + ) # Download file to local subdirectory LOGGER.debug("Downloading file %s to %s", file_path, local_subdir) @@ -401,6 +422,19 @@ def download_azure_files(file_paths, local_dir) -> None: LOGGER.warning("File %s does not exist", file_path) +class BackendType(Enum): + """Enum to hold supported backend types""" + + # Backends listed here: https://cromwell.readthedocs.io/en/latest/backends/Backends/ + + AWS = ("AWSBatch", "AWSBatchOld", "AWSBatchOld_Single", "AWSBatch_Single") + AZURE = ("TES", "AzureBatch", "AzureBatch_Single") + GA4GH = ("TES",) + GCP = ("PAPIv2", "PAPIv2alpha1", "PAPIv2beta", "PAPIv2alpha") + LOCAL = ("Local",) + HPC = ("SGE", "SLURM", "LSF", "SunGridEngine", "HtCondor") + + class TextStatusesColor: """Holds stdout formatting per workflow status""" From 69ac9071fe82370a2e837a991eb4064c71b7b3e8 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Thu, 29 Jun 2023 21:08:49 +0300 Subject: [PATCH 21/25] Added reason why the logs api isn't being used by the logs command --- src/cromshell/logs/command.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/cromshell/logs/command.py b/src/cromshell/logs/command.py index 8d2cfdeb..972ad1ec 100644 --- a/src/cromshell/logs/command.py +++ b/src/cromshell/logs/command.py @@ -94,7 +94,7 @@ def main( workflow_id=workflow_id, cromshell_config=config ) - task_logs = get_task_level_outputs( + task_logs = get_task_level_logs( config, requested_status=status_param, expand_subworkflows=not dont_expand_subworkflows, @@ -113,8 +113,12 @@ def main( return return_code -def get_task_level_outputs(config, expand_subworkflows, requested_status) -> dict: - """Get the task level outputs from the workflow metadata +def get_task_level_logs(config, expand_subworkflows, requested_status) -> dict: + """Get the task level 
logs from the workflow metadata + + Note: This command isn't using Cromwell's 'log' api to obtain the logs. + Instead, the logs is extracted from the metadata, this allows us to filter + logs by task status, also retrieve subworkflows logs of a workflow. Args: config (object): The cromshell config object From 07fa6a2a58695a024fd7434e98f1fcbc689c32d7 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Fri, 30 Jun 2023 21:17:37 +0300 Subject: [PATCH 22/25] added unit tests for io utils functions, added warning for azure/gcp files that are empty --- src/cromshell/utilities/io_utils.py | 31 ++++++----- tests/unit/test_io_utils.py | 83 +++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 12 deletions(-) diff --git a/src/cromshell/utilities/io_utils.py b/src/cromshell/utilities/io_utils.py index 99308e2a..5ba2d62f 100644 --- a/src/cromshell/utilities/io_utils.py +++ b/src/cromshell/utilities/io_utils.py @@ -262,11 +262,21 @@ def get_gcp_file_content(file_path: str) -> str or None: bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_path) - return blob.download_as_string().decode("utf-8") if blob.exists() else None + if not blob.exists(): + LOGGER.warning( + "Unable to find file '%s' in bucket '%s'", blob_path, bucket_name + ) + return None + else: + return blob.download_as_string().decode("utf-8") def get_azure_file_content(file_path: str) -> str or None: - """Returns the contents of a file located on Azure""" + """Returns the contents of a file located on Azure + + file_path: full blob path to file on Azure example: + "/cromwell-executions/HelloWorld/5dd14f5c-4bf5-413a-9641-b6498a1778c3/call-HelloWorldTask/execution/stdout" + """ blob_service_client = BlobServiceClient( account_url=f"https://{get_az_storage_account()}.blob.core.windows.net", @@ -282,6 +292,9 @@ def get_azure_file_content(file_path: str) -> str or None: if blob_client.exists(): return blob_client.download_blob().readall().decode("utf-8") else: + LOGGER.warning( + "Unable to find file '%s' in container '%s'", blob_path, container_name + ) return None except azure.core.exceptions.HttpResponseError as e: if "AuthorizationPermissionMismatch" in str(e): @@ -317,17 +330,11 @@ def is_path_or_url_like(in_string: str) -> bool: Args: in_string (str): The string to check for path or url like-ness + Returns: + bool: True if the string is a path or URL, False otherwise. 
""" - if ( - in_string.startswith("gs://") - or in_string.startswith("/") - or in_string.startswith("http://") - or in_string.startswith("https://") - or in_string.startswith("s3://") - ): - return True - else: - return False + prefixes = ("gs://", "/", "http://", "https://", "s3://") + return any(in_string.startswith(prefix) for prefix in prefixes) def create_local_subdirectory(local_dir: str or Path, blob_path: str or Path) -> Path: diff --git a/tests/unit/test_io_utils.py b/tests/unit/test_io_utils.py index ce9c86b2..a6b1dbbe 100644 --- a/tests/unit/test_io_utils.py +++ b/tests/unit/test_io_utils.py @@ -328,6 +328,89 @@ def test_update_all_workflow_database_tsv( ): assert row[column_to_update] == update_value + @pytest.mark.parametrize( + "file_path, backend, should_fail", + [ + [ + "", + "Local", + False, + ], + [ + "gs://gcs-public-data--genomics/cannabis/README.txt", + "PAPIv2", + False, + ], + ] + ) + def test_cat_file( + self, + file_path: str, + backend: str, + should_fail: bool, + mock_workflow_database_tsv, + ) -> None: + + if file_path: + io_utils.cat_file( + file_path=file_path, + backend=backend + ) + else: + io_utils.cat_file( + file_path=mock_workflow_database_tsv, + backend=backend + ) + + @pytest.mark.parametrize( + "file_path, should_fail", + [ + [ + "gs://gcs-public-data--genomics/cannabis/README.txt", + False, + ], + [ + "gs://path2fail/cannabis/README.txt", + True, + ], + ] + ) + def test_get_gcp_file_content(self, file_path, should_fail) -> None: + if should_fail: + assert io_utils.get_gcp_file_content(file_path=file_path) is None + else: + assert io_utils.get_gcp_file_content(file_path=file_path) + + @pytest.mark.parametrize( + "file_path, should_fail", + [ + [ + "gs://gcs-public-data--genomics/cannabis/README.txt", + False, + ], + [ + "README.txt", + True, + ], + [ + "http://", + False, + ], + [ + "https://", + False, + ], + [ + "s3://", + False, + ], + ] + ) + def test_is_path_or_url_like(self, file_path: str, should_fail: bool) -> None: + if should_fail: + assert not io_utils.is_path_or_url_like(in_string=file_path) + else: + assert io_utils.is_path_or_url_like(in_string=file_path) @pytest.fixture def mock_data_path(self): return Path(__file__).parent.joinpath("mock_data/") From 5a6f3dc4ae7030685728d4788c67471b3ee3b21b Mon Sep 17 00:00:00 2001 From: bshifaw Date: Thu, 20 Jul 2023 10:41:42 -0400 Subject: [PATCH 23/25] lint fix --- tests/unit/test_io_utils.py | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/tests/unit/test_io_utils.py b/tests/unit/test_io_utils.py index a6b1dbbe..96b1e874 100644 --- a/tests/unit/test_io_utils.py +++ b/tests/unit/test_io_utils.py @@ -341,26 +341,19 @@ def test_update_all_workflow_database_tsv( "PAPIv2", False, ], - ] + ], ) def test_cat_file( - self, - file_path: str, - backend: str, - should_fail: bool, - mock_workflow_database_tsv, + self, + file_path: str, + backend: str, + should_fail: bool, + mock_workflow_database_tsv, ) -> None: - if file_path: - io_utils.cat_file( - file_path=file_path, - backend=backend - ) + io_utils.cat_file(file_path=file_path, backend=backend) else: - io_utils.cat_file( - file_path=mock_workflow_database_tsv, - backend=backend - ) + io_utils.cat_file(file_path=mock_workflow_database_tsv, backend=backend) @pytest.mark.parametrize( "file_path, should_fail", @@ -373,7 +366,7 @@ def test_cat_file( "gs://path2fail/cannabis/README.txt", True, ], - ] + ], ) def test_get_gcp_file_content(self, file_path, should_fail) -> None: if should_fail: @@ -404,13 +397,14 @@ 
def test_get_gcp_file_content(self, file_path, should_fail) -> None: "s3://", False, ], - ] + ], ) def test_is_path_or_url_like(self, file_path: str, should_fail: bool) -> None: if should_fail: assert not io_utils.is_path_or_url_like(in_string=file_path) else: assert io_utils.is_path_or_url_like(in_string=file_path) + @pytest.fixture def mock_data_path(self): return Path(__file__).parent.joinpath("mock_data/") From 7a7459e8929083e6c799ec2df43e49d543a2da76 Mon Sep 17 00:00:00 2001 From: bshifaw Date: Thu, 20 Jul 2023 11:27:27 -0400 Subject: [PATCH 24/25] removed get_gcp_file_content because it requires in git tox test due to lack of default creds --- tests/unit/test_io_utils.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/tests/unit/test_io_utils.py b/tests/unit/test_io_utils.py index 96b1e874..dcac5335 100644 --- a/tests/unit/test_io_utils.py +++ b/tests/unit/test_io_utils.py @@ -355,24 +355,6 @@ def test_cat_file( else: io_utils.cat_file(file_path=mock_workflow_database_tsv, backend=backend) - @pytest.mark.parametrize( - "file_path, should_fail", - [ - [ - "gs://gcs-public-data--genomics/cannabis/README.txt", - False, - ], - [ - "gs://path2fail/cannabis/README.txt", - True, - ], - ], - ) - def test_get_gcp_file_content(self, file_path, should_fail) -> None: - if should_fail: - assert io_utils.get_gcp_file_content(file_path=file_path) is None - else: - assert io_utils.get_gcp_file_content(file_path=file_path) @pytest.mark.parametrize( "file_path, should_fail", From ff3ff456255be9810b4fb5c289468b71d2177e7b Mon Sep 17 00:00:00 2001 From: bshifaw Date: Thu, 20 Jul 2023 11:30:37 -0400 Subject: [PATCH 25/25] lint fix --- tests/unit/test_io_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_io_utils.py b/tests/unit/test_io_utils.py index dcac5335..99208f68 100644 --- a/tests/unit/test_io_utils.py +++ b/tests/unit/test_io_utils.py @@ -355,7 +355,6 @@ def test_cat_file( else: io_utils.cat_file(file_path=mock_workflow_database_tsv, backend=backend) - @pytest.mark.parametrize( "file_path, should_fail", [
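
The patches above converge on one dispatch pattern: backend names reported in task metadata are grouped into an Enum whose values are tuples, and call sites test membership with `backend in BackendType.X.value` before choosing a GCS or Azure download path. The self-contained sketch below illustrates that pattern outside the patch series; the enum members mirror the ones added to io_utils.py in PATCH 20, while `describe_download_route` is a hypothetical stand-in for the real branching done in `download_file_like_value_in_dict` and `cat_file` and is not part of this PR.

# Minimal sketch, assuming only the BackendType groupings shown in PATCH 20.
from enum import Enum


class BackendType(Enum):
    """Groups of Cromwell backend names, keyed by platform (mirrors PATCH 20)."""

    GCP = ("PAPIv2", "PAPIv2alpha1", "PAPIv2beta", "PAPIv2alpha")
    AZURE = ("TES", "AzureBatch", "AzureBatch_Single")
    LOCAL = ("Local",)


def describe_download_route(backend: str) -> str:
    """Hypothetical helper: pick a download route from a backend name."""
    if backend in BackendType.GCP.value:
        return "download via the GCS client"
    if backend in BackendType.AZURE.value:
        return "download via the Azure blob client"
    return f"downloading unsupported for backend: {backend}"


if __name__ == "__main__":
    for name in ("PAPIv2", "TES", "SLURM"):
        print(name, "->", describe_download_route(name))

Keeping the per-platform names in one Enum means a new alias (for example another PAPI version string) only needs to be added to a single tuple, rather than to every string comparison scattered through command.py and io_utils.py.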