diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py index 1d268875f..4075f40be 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_file_processor.py @@ -83,6 +83,7 @@ def process_file(self, f_name: str) -> None: self.last_extension = name_extension[1] else: out_files, stats = self.transform.transform(folder_name=f_name) + self.last_file_name = f_name self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files") # save results self._submit_file(t_start=t_start, out_files=out_files, stats=stats) @@ -148,15 +149,21 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats ) case 1: # we have exactly 1 output file - file_ext = out_files[0] - lfn = self.last_file_name - if self.last_file_name_next_index is not None: - lfn = f"{lfn}_{self.last_file_name_next_index}" - output_name = self.data_access.get_output_location(path=f"{lfn}{file_ext[1]}") + if self.is_folder: + # its folder + output_name = out_files[0][1] + dt = out_files[0][0] + else: + file_ext = out_files[0] + lfn = self.last_file_name + if self.last_file_name_next_index is not None: + lfn = f"{lfn}_{self.last_file_name_next_index}" + output_name = self.data_access.get_output_location(path=f"{lfn}{file_ext[1]}") + dt = file_ext[0] self.logger.debug( f"Writing transformed file {self.last_file_name}{self.last_extension} to {output_name}" ) - save_res, retries = self.data_access.save_file(path=output_name, data=file_ext[0]) + save_res, retries = self.data_access.save_file(path=output_name, data=dt) if retries > 0: self._publish_stats({"data access retries": retries}) if save_res is None: @@ -166,7 +173,7 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats self._publish_stats( { "result_files": 1, - 
"result_size": len(file_ext[0]), + "result_size": len(dt), "processing_time": time.time() - t_start, } ) @@ -183,14 +190,21 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats start_index = 0 count = len(out_files) for index in range(count): - file_ext = out_files[index] - output_name_indexed = f"{output_file_name}_{start_index + index}{file_ext[1]}" - file_sizes += len(file_ext[0]) - self.logger.debug( - f"Writing transformed file {self.last_file_name}{self.last_extension}, {index + 1} " - f"of {count} to {output_name_indexed}" - ) - save_res, retries = self.data_access.save_file(path=output_name_indexed, data=file_ext[0]) + if self.is_folder: + # its a folder + output_name_indexed = out_files[index][1] + dt = out_files[index][0] + else: + # files + file_ext = out_files[index] + output_name_indexed = f"{output_file_name}_{start_index + index}{file_ext[1]}" + self.logger.debug( + f"Writing transformed file {self.last_file_name}{self.last_extension}, {index + 1} " + f"of {count} to {output_name_indexed}" + ) + dt = file_ext[0] + file_sizes += len(dt) + save_res, retries = self.data_access.save_file(path=output_name_indexed, data=dt) if retries > 0: self._publish_stats({"data access retries": retries}) if save_res is None: diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py index 0e90f7ffd..04d6f3b0f 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py @@ -1,6 +1,11 @@ -from .table_transform_test import AbstractTableTransformTest -from .binary_transform_test import AbstractBinaryTransformTest -from .noop_transform import ( +from data_processing.test_support.transform.table_transform_test import AbstractTableTransformTest +from 
data_processing.test_support.transform.binary_transform_test import AbstractBinaryTransformTest +from data_processing.test_support.transform.noop_transform import ( NOOPTransform, - NOOPPythonTransformConfiguration, + NOOPTransformConfiguration, + NOOPPythonTransformConfiguration ) +from data_processing.test_support.transform.noop_folder_transform import ( + NOOPFolderTransform, + NOOPFolderPythonTransformConfiguration +) \ No newline at end of file diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/noop_folder_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/noop_folder_transform.py new file mode 100644 index 000000000..5baab7858 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/test_support/transform/noop_folder_transform.py @@ -0,0 +1,105 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import time +from typing import Any + +from data_processing.data_access import DataAccess +from data_processing.runtime.pure_python import ( + PythonTransformLauncher, + PythonTransformRuntimeConfiguration, + DefaultPythonTransformRuntime) +from data_processing.transform import AbstractFolderTransform +from data_processing.utils import get_logger +from data_processing.test_support.transform import NOOPTransformConfiguration + + +logger = get_logger(__name__) + + +class NOOPFolderTransform(AbstractFolderTransform): + """ + Implements a simple pass-through of the files found in a folder. + """ + + def __init__(self, config: dict[str, Any]): + """ + Initialize based on the dictionary of configuration information. + This is generally called with configuration parsed from the CLI arguments defined + by the companion runtime, NOOPTransformRuntime. If running inside the RayMutatingDriver, + these will be provided by that class with help from the RayMutatingDriver. + """ + # Make sure that the param name corresponds to the name used in apply_input_params method + # of NOOPTransformConfiguration class + super().__init__(config) + self.sleep = config.get("sleep_sec", 1) + self.data_access = config.get("data_access") + + def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: + """ + Converts input folder into 0 or more output files. + If there is an error, an exception must be raised - exit()ing is not generally allowed. + :param folder_name: the name of the folder containing arbitrary amount of files. + :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated + to metadata. Each element of the return list, is a tuple of the transformed bytes and a string + holding the file name to use. 
+ """ + logger.debug(f"Transforming one folder {folder_name}") + metadata = {} + # get folder files + files, retries = self.data_access.get_folder_files(path=folder_name) + if retries > 0: + metadata |= {"data access retries": retries} + result = [()] * len(files) + index = 0 + for name, file in files.items(): + result[index] = (file, self.data_access.get_output_location(name)) + if self.sleep is not None: + logger.info(f"Sleep for {self.sleep} seconds") + time.sleep(self.sleep) + logger.info("Sleep completed - continue") + index += 1 + # Add some sample metadata. + metadata |= {"nfiles": len(files)} + return result, metadata + + +class NOOPFolderPythonRuntime(DefaultPythonTransformRuntime): + def get_folders(self, data_access: DataAccess) -> list[str]: + """ + Get folders to process + :param data_access: data access + :return: list of folders to process + """ + return [data_access.get_input_folder()] + + +class NOOPFolderPythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. 
+ """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config=NOOPTransformConfiguration(clazz=NOOPFolderTransform), + runtime_class=NOOPFolderPythonRuntime) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = PythonTransformLauncher(NOOPFolderPythonTransformConfiguration()) + logger.info("Launching noop transform") + launcher.launch() diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py index 0dee013a4..2fea35506 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/noop_transform.py @@ -19,7 +19,7 @@ from data_processing.runtime.pure_python.runtime_configuration import ( PythonTransformRuntimeConfiguration, ) -from data_processing.transform import AbstractTableTransform, TransformConfiguration +from data_processing.transform import AbstractTableTransform, TransformConfiguration, AbstractTransform from data_processing.utils import CLIArgumentProvider, get_logger @@ -75,10 +75,10 @@ class NOOPTransformConfiguration(TransformConfiguration): configuration with CLI args. 
""" - def __init__(self): + def __init__(self, clazz: type[AbstractTransform] = NOOPTransform): super().__init__( name=short_name, - transform_class=NOOPTransform, + transform_class=clazz, remove_from_metadata=[pwd_key], ) diff --git a/data-processing-lib/python/src/data_processing/transform/folder_transform.py b/data-processing-lib/python/src/data_processing/transform/folder_transform.py index 9a2fb3713..caa3bfa52 100644 --- a/data-processing-lib/python/src/data_processing/transform/folder_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/folder_transform.py @@ -35,6 +35,6 @@ def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str :param folder_name: the name of the folder containing arbitrary amount of files. :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated to metadata. Each element of the return list, is a tuple of the transformed bytes and a string - holding the extension to be used when writing out the new bytes. + holding the file name to use. 
""" raise NotImplemented() \ No newline at end of file diff --git a/data-processing-lib/python/src/data_processing/transform/transform_configuration.py b/data-processing-lib/python/src/data_processing/transform/transform_configuration.py index 033e92f2a..a5c9ec9ad 100644 --- a/data-processing-lib/python/src/data_processing/transform/transform_configuration.py +++ b/data-processing-lib/python/src/data_processing/transform/transform_configuration.py @@ -13,7 +13,7 @@ from argparse import ArgumentParser from typing import Any -from data_processing.transform import AbstractBinaryTransform +from data_processing.transform import AbstractTransform from data_processing.utils import CLIArgumentProvider @@ -23,7 +23,7 @@ class TransformConfiguration(CLIArgumentProvider): """ def __init__( - self, name: str, transform_class: type[AbstractBinaryTransform], remove_from_metadata: list[str] = [] + self, name: str, transform_class: type[AbstractTransform], remove_from_metadata: list[str] = [] ): """ Initialization @@ -36,7 +36,7 @@ def __init__( self.remove_from_metadata = remove_from_metadata self.params = {} - def get_transform_class(self) -> type[AbstractBinaryTransform]: + def get_transform_class(self) -> type[AbstractTransform]: """ Get the class extending AbstractBinaryTransform which implements a specific transformation. The class will generally be instantiated with a dictionary of configuration produced by diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_folders_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_folders_noop.py new file mode 100644 index 000000000..e0fdd86c8 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_folders_noop.py @@ -0,0 +1,33 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.transform import NOOPFolderPythonTransformConfiguration + + +class TestRayNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. 
+ """ + + def get_test_transform_fixtures(self) -> list[tuple]: + basedir = "../../../test-data/data_processing/python/noop/" + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir)) + launcher = PythonTransformLauncher(NOOPFolderPythonTransformConfiguration()) + fixtures = [(launcher, {"noop_sleep_sec": 0}, basedir + "/input", basedir + "/expected")] + return fixtures diff --git a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py index d4cc874f0..e706a4dfa 100644 --- a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py +++ b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py @@ -12,7 +12,6 @@ import os -import pyarrow as pa from data_processing.test_support.launch.transform_test import ( AbstractTransformLauncherTest, ) @@ -20,11 +19,6 @@ from data_processing_ray.test_support.transform import NOOPRayTransformConfiguration -table = pa.Table.from_pydict({"name": pa.array(["Tom"]), "age": pa.array([23])}) -expected_table = table # We're a noop after all. -expected_metadata_list = [{"nfiles": 1, "nrows": 1}, {}] # transform() result # flush() result - - class TestRayNOOPTransform(AbstractTransformLauncherTest): """ Extends the super-class to define the test data for the tests defined there. 
diff --git a/transforms/universal/ededup/ray/src/ededup_transform_ray.py b/transforms/universal/ededup/ray/src/ededup_transform_ray.py index c0823a22e..d90dfa780 100644 --- a/transforms/universal/ededup/ray/src/ededup_transform_ray.py +++ b/transforms/universal/ededup/ray/src/ededup_transform_ray.py @@ -149,13 +149,12 @@ def _load_snapshots(self, data_access_factory: DataAccessFactoryBase, statistics statistics.add_stats.remote({"data access retries": retries}) self.logger.info(f"Found the following snapshot files {files.keys()}") # process snapshot files - for file in files.keys(): - # load the file + for name, file in files.items(): + # convert the file try: - b_hashes, _ = data_access.get_file(file) - snaps = pickle.loads(b_hashes) + snaps = pickle.loads(file) except Exception as e: - self.logger.warning(f"Failed to load hashes from file {file} with exception {e}") + self.logger.warning(f"Failed to load hashes from file {name} with exception {e}") raise UnrecoverableException("failed to load hashes") request = [[] for _ in range(len(self.filters))] for h in snaps: