diff --git a/export_main.py b/export_main.py
index 37614729f..678988a14 100644
--- a/export_main.py
+++ b/export_main.py
@@ -2,6 +2,13 @@
 import os
 from importlib import reload
 
+# Change to the project repository location
+my_wd = os.getcwd()
+my_repo = "research-and-development"
+if not my_wd.endswith(my_repo):
+    os.chdir(my_repo)
+
 from src.outputs import export_files
 
 reload(export_files)
diff --git a/export_mods_main.py b/export_mods_main.py
new file mode 100644
index 000000000..f788f6713
--- /dev/null
+++ b/export_mods_main.py
@@ -0,0 +1,103 @@
+"""Script for manually testing the s3_mods functions on the s3 bucket."""
+import os
+
+# Change to the project repository location
+my_wd = os.getcwd()
+my_repo = "research-and-development"
+if not my_wd.endswith(my_repo):
+    os.chdir(my_repo)
+
+from src.utils.singleton_boto import SingletonBoto
+
+config = {
+    "s3": {
+        "ssl_file": "/etc/pki/tls/certs/ca-bundle.crt",
+        "s3_bucket": "onscdp-dev-data01-5320d6ca"
+    }
+}
+
+boto3_client = SingletonBoto.get_client(config)
+import src.utils.s3_mods as mods
+
+
+if __name__ == "__main__":
+
+    my_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_24-10-02_v20.csv"
+#    to_delete_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_test_to_delete.csv"
+    my_dir = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/"
+
+    # Checking that the file exists
+    my_size = mods.rd_file_size(my_path)
+    print(f"File size is {my_size}")
+
+    # Calculating md5sum
+    my_sum = mods.rd_md5sum(my_path)
+    expected_output = "ea94424aceecf11c8a70d289e51c34ea"
+    print(type(my_sum))
+    if expected_output == my_sum:
+        print("Same md5sum")
+
+    # Checking rd_isdir
+    mydir = "/bat"
+    response = mods.rd_isdir(mydir)
+    print("Got response")
+    print(response)
+
+    # Checking rd_isfile
+    response = mods.rd_isfile(my_path)
+    print(response)
+
+    # Checking that rd_stat_size works for files and directories
+    file_size = mods.rd_stat_size(my_path)
+    print(f"File {my_path} size is {file_size} bytes.")
+
+    dir_size = mods.rd_stat_size(my_dir)
+    print(f"Directory {my_dir} size is {dir_size} bytes.")
+
+    # Testing rd_read_header
+    response = mods.rd_read_header(my_path)
+    print(response)
+
+    # Testing rd_write_string_to_file
+    out_path = "/bat/res_dev/project_data/new_write_string_test_2.txt"
+    content = "New content"
+    mods.rd_write_string_to_file(content.encode(encoding="utf-8"), out_path)
+    print("all done")
+
+    # Testing rd_copy_file
+    src_path = "/bat/res_dev/project_data/new_write_string_test_2.txt"
+    dst_path = "/bat/res_dev/"
+    success = mods.rd_copy_file(src_path, dst_path)
+    if success:
+        print("File copied successfully")
+    else:
+        print("File not copied successfully")
+
+    # Testing rd_move_file
+    src_path = "/bat/res_dev/new_write_string_test_2.txt"
+    dst_path = "/bat/res_dev/project_data/"
+    success = mods.rd_move_file(src_path, dst_path)
+    if success:
+        print("File moved successfully")
+    else:
+        print("File not moved successfully")
+
+    # Testing rd_search_file
+    dir_path = "bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/"
+    ending = "24-10-02_v20.csv"
+
+    found_file = mods.rd_search_file(dir_path, ending)
+    print(f"Found file: {found_file}")
+
+    # Deleting a file
+#    status = mods.rd_delete_file(my_path)
+#    if status:
+#        print(f"File {my_path} successfully deleted")
+
+    # Testing read_excel
"bat/res_dev/project_data/test_excel_gz.xlsx" +# df = mods.read_excel(my_path) +# print(df.head()) + \ No newline at end of file diff --git a/src/_version.py b/src/_version.py index 4eabd0b3f..8a124bf64 100644 --- a/src/_version.py +++ b/src/_version.py @@ -1 +1 @@ -__version__ = "2.1.2" +__version__ = "2.2.0" diff --git a/src/dev_config.yaml b/src/dev_config.yaml index 08e7f8afd..a65f8ff2e 100644 --- a/src/dev_config.yaml +++ b/src/dev_config.yaml @@ -8,7 +8,7 @@ global: table_config: "SingleLine" # Environment settings dev_test : False - platform: network #whether to load from hdfs, network (Windows) or s3 (CDP) + platform: network # network #whether to load from hdfs, network (Windows) or s3 (CDP) load_from_feather: False runlog_writer: write_csv: True # Write the runlog to a CSV file diff --git a/src/outputs/export_files.py b/src/outputs/export_files.py index 304b4740c..1b19b3e16 100644 --- a/src/outputs/export_files.py +++ b/src/outputs/export_files.py @@ -131,7 +131,7 @@ def check_files_exist(file_list: List, config: dict, isfile: callable): for file in file_list: file_path = Path(file) # Changes to path if str OutgoingLogger.debug(f"Using {platform} isfile function") - if not isfile(file_path): + if not isfile(str(file_path)): OutgoingLogger.error( f"File {file} does not exist. Check existence and spelling" ) @@ -152,7 +152,7 @@ def transfer_files(source, destination, method, logger, copy_files, move_files): """ transfer_func = {"copy": copy_files, "move": move_files}[method] past_tense = {"copy": "copied", "move": "moved"}[method] - transfer_func(source, destination) + transfer_func(str(source), destination) logger.info(f"Files {source} successfully {past_tense} to {destination}.") @@ -227,10 +227,12 @@ def run_export(user_config_path: str, dev_config_path: str): platform = config["global"]["platform"] if platform == "s3": + # create singletion boto3 client object & pass in bucket string + from src.utils.singleton_boto import SingletonBoto + + boto3_client = SingletonBoto.get_client(config) # noqa from src.utils import s3_mods as mods - # Creating boto3 client and adding it to the config dict - config["client"] = mods.create_client(config) elif platform == "network": # If the platform is "network" or "hdfs", there is no need for a client. # Adding a client = None for consistency. diff --git a/src/pipeline.py b/src/pipeline.py index f275892d7..5ea26e2be 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -53,8 +53,6 @@ def run_pipeline(user_config_path, dev_config_path): boto3_client = SingletonBoto.get_client(config) # noqa from src.utils import s3_mods as mods - # Creating boto3 client and adding it to the config dict - # config["client"] = boto3_client elif platform == "network": # If the platform is "network" or "hdfs", there is no need for a client. # Adding a client = None for consistency. 
diff --git a/src/user_config.yaml b/src/user_config.yaml
index 66381dfb3..2a03fcde8 100644
--- a/src/user_config.yaml
+++ b/src/user_config.yaml
@@ -151,8 +151,8 @@ export_choices:
   export_fte_total_qa: None
   export_status_filtered: None
   export_frozen_group: None
-  export_staged_BERD_full_responses: None
+  export_staged_BERD_full_responses: "2023_staged_BERD_full_responses_24-10-14_v33.csv"
   export_staged_NI_full_responses: None
   export_full_responses_imputed: None
   export_full_estimation_qa: None # "2022_full_estimation_qa_24-07-15_v555.csv"
-  export_invalid_unrecognised_postcodes: "2022_invalid_unrecognised_postcodes_24-07-04_v503.csv"
+  export_invalid_unrecognised_postcodes: None # "2022_invalid_unrecognised_postcodes_24-07-04_v503.csv"
diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py
index 39a32c558..c36df5e4c 100644
--- a/src/utils/s3_mods.py
+++ b/src/utils/s3_mods.py
@@ -15,10 +15,6 @@
 To do:
     Read feather - possibly, not needed
     Write to feather - possibly, not needed
-    Copy file
-    Move file
-    Compute md5 sum
-    TBC
 """
 
 # Standard libraries
@@ -28,10 +24,20 @@
 # Third party libraries
 import pandas as pd
-from io import StringIO
+from io import StringIO, TextIOWrapper, BytesIO
+
 # Local libraries
-from rdsa_utils.cdp.helpers.s3_utils import file_exists, create_folder_on_s3
+from rdsa_utils.cdp.helpers.s3_utils import (
+    file_exists,
+    create_folder_on_s3,
+    delete_file,
+    is_s3_directory,
+    copy_file,
+    move_file,
+    validate_bucket_name,
+    validate_s3_file_path,
+)
 from src.utils.singleton_boto import SingletonBoto
 # from src.utils.singleton_config import SingletonConfig
@@ -40,6 +46,7 @@
 s3_client = SingletonBoto.get_client()
 s3_bucket = SingletonBoto.get_bucket()
 
+
 # Read a CSV file into a Pandas dataframe
 def rd_read_csv(filepath: str, **kwargs) -> pd.DataFrame:
     """Reads a csv from s3 bucket into a Pandas Dataframe using boto3.
@@ -167,3 +174,312 @@ def rd_write_feather(filepath, df):
 def rd_read_feather(filepath):
     """Placeholder Function to read feather file from HDFS"""
     return None
+
+
+def rd_file_size(filepath: str) -> int:
+    """Check the size of a file in the s3 bucket.
+
+    Args:
+        filepath (string): The filepath in the s3 bucket.
+
+    Returns:
+        int: The size of the file in bytes.
+    """
+    _response = s3_client.head_object(Bucket=s3_bucket, Key=filepath)
+    file_size = _response["ContentLength"]
+
+    return file_size
+
+
+def rd_delete_file(filepath: str) -> bool:
+    """Delete a file from the s3 bucket.
+
+    Args:
+        filepath (string): The filepath in the s3 bucket to be deleted.
+
+    Returns:
+        status (bool): True if the deletion completed successfully, else False.
+    """
+    status = delete_file(s3_client, s3_bucket, filepath)
+    return status
+
+
+def rd_md5sum(filepath: str) -> str:
+    """Get the md5sum of a specific file on s3.
+
+    Note that the ETag returned by head_object equals the md5 checksum only
+    for objects uploaded in a single part.
+
+    Args:
+        filepath (string): The filepath in the s3 bucket.
+
+    Returns:
+        md5result (str): The md5 checksum, or None if the request fails.
+    """
+    try:
+        md5result = s3_client.head_object(
+            Bucket=s3_bucket,
+            Key=filepath
+        )["ETag"][1:-1]
+    except s3_client.exceptions.ClientError as e:
+        s3_logger.error(f"Failed to compute the md5 checksum: {str(e)}")
+        md5result = None
+    return md5result
+
+
+def rd_isdir(dirpath: str) -> bool:
+    """Test if a directory exists in the s3 bucket.
+
+    Args:
+        dirpath (string): The "directory" path in the s3 bucket.
+
+    Returns:
+        status (bool): True if dirpath is a directory, False otherwise.
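+
+    Example (illustrative; assumes this prefix exists in the bucket):
+        >>> rd_isdir("/bat/res_dev/project_data")
+        True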
+ + """ + # The directory name must end with forward slash + if not dirpath.endswith('/'): + dirpath = dirpath + '/' + + # Any slashes at the beginning should be removed + while dirpath.startswith('/'): + dirpath = dirpath[1:] + + # Use the function from rdsa_utils + response = is_s3_directory( + client=s3_client, + bucket_name=s3_bucket, + object_name=dirpath + ) + return response + + +def rd_isfile(filepath: str) -> bool: + """ + Test if given path is a file in s3 bucket. Check that it exists, not a + directory and the size is greater than 0. + + Args: + filepath (string): The "directory" path in s3 bucket. + Returns: + status (bool): True if the dirpath is a directory, false otherwise. + + """ + if filepath is None: + response = False + + if rd_file_exists(filepath): + isdir = rd_isdir(filepath) + size = rd_file_size(filepath) + response = (not isdir) and (size > 0) + else: + response = False + return response + + +def rd_stat_size(path: str) -> int: + """ + Gets the file size of a file or directory in bytes. + Alias of as rd_file_size. + Works for directories, but returns 0 bytes, which is typical for s3. + """ + return rd_file_size(path) + + +def rd_read_header(path: str) -> str: + """ + Reads the first line of a file on s3. Gets the entire file using boto3 get_objects, + converts its body into an input stream, reads the first line and remove the carriage + return character (backslash-n) from the end. + + Args: + filepath (string): The "directory" path in s3 bucket. + + Returns: + status (bool): True if the dirpath is a directory, false otherwise. + """ + # Create an input/output stream pointer, same as open + stream = TextIOWrapper(s3_client.get_object(Bucket=s3_bucket, Key=path)['Body']) + + # Read the first line from the stream + response = stream.readline() + + # Remove the last character (carriage return, or new line) + response = response[:-1] + + return response + + +def rd_write_string_to_file(content: bytes, filepath: str): + """ + Writes a string into the specified file path + """ + + # Put context to a new Input-Output buffer + str_buffer = StringIO(content.decode("utf-8")) + + # "Rewind" the stream to the start of the buffer + str_buffer.seek(0) + + # Write the buffer into the s3 bucket + _ = s3_client.put_object( + Bucket=s3_bucket, Body=str_buffer.getvalue(), Key=filepath + ) + return None + + +def _path_long2short(path: "str") -> str: + """ + Extracts a short file name from the full path. + If there is at least one forward slash, finds the lates slash to the right + and rerurns all characrers after it. + + If there are no slashes, returns the path as is. + """ + if "/" in path: + last_slash = path.rfind("/") + return path[last_slash + 1:] + else: + return path + + +def _remove_end_slashes(path: "str") -> str: + """ + Removes any amount of consequitive forward slashes from a path. + """ + while path.endswith("/"): + path = path[:-1] + + return path + + +def rd_copy_file(src_path: str, dst_path: str) -> bool: + """ + Copy a file from one location to another. Uses rdsa_utils. + If destination path ends with any number of forward slashes, they are + removed. This is needed for the library method copy_file to work correctly. + + Library method copy_file requires that the paths are file paths: + old_dir/old.file and new_dir/new.file. The rd_copy_file takes full file name + with the full file path as a source, and just a directory path as a + destination, like this: old_dir/old.file and new_dir/ or new_dir without the + slash at the end. 
+
+    The supplementary function _path_long2short splits old.file off the
+    full source path and appends it to the end of the destination path.
+
+    Args:
+        src_path (string): Full path of the source file, not including the
+            bucket name, but including the quasi-directories and slashes
+            preceding the file name.
+        dst_path (string): Full path of the destination directory, not
+            including the bucket name, but including the quasi-directories
+            and slashes. It must be a directory, not a file.
+
+    Returns:
+        status (bool): True if the copying was successful, False otherwise.
+    """
+    # If the destination ends with any number of slashes, they are removed
+    dst_path = _remove_end_slashes(dst_path)
+
+    # Split the source file name off the full source path and append it to
+    # the destination directory, separated by one forward slash
+    dst_path += "/" + _path_long2short(src_path)
+
+    success = copy_file(
+        client=s3_client,
+        source_bucket_name=s3_bucket,
+        source_object_name=src_path,
+        destination_bucket_name=s3_bucket,
+        destination_object_name=dst_path,
+    )
+    return success
+
+
+def rd_move_file(src_path: str, dst_path: str) -> bool:
+    """Move a file from one location to another. Uses rdsa_utils.
+
+    Handles source and destination paths in the same way as rd_copy_file.
+    """
+    dst_path = _remove_end_slashes(dst_path)
+    dst_path += "/" + _path_long2short(src_path)
+    success = move_file(
+        client=s3_client,
+        source_bucket_name=s3_bucket,
+        source_object_name=src_path,
+        destination_bucket_name=s3_bucket,
+        destination_object_name=dst_path
+    )
+    return success
+
+
+def s3walk(locations: list, prefix: str) -> tuple:
+    """Mimic the functionality of os.walk in the s3 bucket.
+
+    Recursively goes through the long file names (keys) with slashes and
+    splits them into "locations" (subdirectories) and "files" (short file
+    names).
+
+    Args:
+        locations (list): A list of s3 locations that can be "directories".
+        prefix (str): The name of the "subdirectory" of root where further
+            locations will be found.
+
+    Returns:
+        The dict items view of {prefix: (subdirectories, files)} pairs.
+    """
+    # Recursively add location to roots starting from prefix
+    def processLocation(root, prefixLocal, location):
+        # Add a new root location if not available
+        if prefixLocal not in root:
+            root[prefixLocal] = (set(), set())
+        # Check how many folders are available after the prefix
+        remainder = location[len(prefixLocal):]
+        structure = remainder.split('/')
+
+        # If we are not yet in the folder of the file, we need to continue
+        # with a larger prefix
+        if len(structure) > 1:
+            # Add the folder dir
+            root[prefixLocal][0].add(structure[0])
+            # Make sure the file is added along the way
+            processLocation(root, prefixLocal + '/' + structure[0], location)
+        else:
+            # Add to files
+            root[prefixLocal][1].add(structure[0])
+
+    root = {}
+    for location in locations:
+        processLocation(root, prefix, location)
+
+    return root.items()
+
+
+def rd_search_file(dir_path: str, ending: str) -> str:
+    """Find a file in a directory with a specific ending.
+
+    Args:
+        dir_path (str): The s3 "directory" in which to search for files.
+        ending (str): The file name ending to search for.
+
+    Returns:
+        target_file (str): The short name of the last file found that ends
+        with the given string, or None if there is no match.
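+
+    Example (illustrative; the file name is taken from the test script
+    export_mods_main.py and assumed to exist in the bucket):
+        >>> rd_search_file("bat/res_dev/project_data/2023_surveys/BERD/"
+        ...                "01_staging/staging_qa/full_responses_qa/",
+        ...                "24-10-02_v20.csv")
+        '2023_staged_BERD_full_responses_24-10-02_v20.csv'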
+ + """ + target_file = None + + # Remove preceding forward slashes if needed + while dir_path.startswith("/"): + dir_path = dir_path[1:] + + # get list of objects with prefix + response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=dir_path) + + # retrieve key values + locations = [object['Key'] for object in response['Contents']] + + for _, (__, files) in s3walk(locations, dir_path): + for file in files: + + # Check for ending + if file.endswith(ending): + target_file = str(file) + return target_file