In this example, we implement a noop transform that takes no action on the input data and returns it unmodified: a no operation (noop). Running it effectively copies a directory tree of files to an output directory. This is not very powerful on its own, but it lets us focus on the minimum requirements for a transform.
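The net effect on the data can be pictured with the following stdlib-only sketch, which copies every `.parquet` file from an input tree to an output tree while preserving relative paths. It is an illustration, not how the framework works: the real transform reads each file into a pyarrow Table and writes the returned table back out, and the run also produces job metadata. The helper `copy_tree_like_noop` and the `.parquet` suffix are assumptions made for this sketch.

```python
import shutil
import tempfile
from pathlib import Path


def copy_tree_like_noop(input_folder: str, output_folder: str, suffix: str = ".parquet") -> list[str]:
    """Hypothetical helper: copy files ending in `suffix` from input_folder to
    output_folder, keeping their relative paths. Returns the copied paths."""
    src_root = Path(input_folder)
    dst_root = Path(output_folder)
    copied = []
    for src in sorted(src_root.rglob(f"*{suffix}")):
        rel = src.relative_to(src_root)
        dst = dst_root / rel
        # Recreate the sub-directory structure under the output folder.
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(src, dst)
        copied.append(rel.as_posix())
    return copied
```

Files with other suffixes are skipped, which loosely mirrors a run that only picks up the table files in the input folder.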
NOTE: What follows is a discussion of a pyarrow Table transform that will run in either the Ray or Python runtimes. Mapping the tutorial to byte arrays would use `AbstractBinaryTransform` instead of `AbstractTableTransform` (a sub-class of the former). Mapping the tutorial to a Spark runtime would use `AbstractSparkTransform` instead of `AbstractTableTransform` and use `DataFrame` instead of pyarrow `Table` as the `DATA` type. In addition, the `SparkTransformLauncher` would be used in place of the `RayTransformLauncher` and `PythonTransformLauncher` shown below.
In this tutorial, we will show the following:
- How to write the noop transform to generate the output table.
- How to define transform-specific metadata that can be associated with each table transformation and aggregated across all transformations in a single run of the transform.
- How to define command line arguments that can be used to configure the operation of our noop transform.
We will not show the following:
- The creation of a custom `TransformRuntime` that would enable more global state and/or coordination among the transforms running in other Ray actors. This will be covered in an advanced tutorial.
The complete task involves the following:
- `noop_main.py` - an empty file in which to write the code described below
- `NOOPTransform` - class that implements the specific transformation
- `NOOPTransformConfiguration` - class that provides configuration for the `NOOPTransform`, specifically the command line arguments used to configure it
- `main` - simple creation and use of the `TransformLauncher`.
Currently, the complete code for the noop transform used in this tutorial can be found in the noop transform directory.
Finally, we show how to use the command line to run the transform in a local Ray cluster.
Note: You will need to run the setup commands in the README before running the following examples.
First, let's define the transform class. To do this, we extend the base abstract/interface class `AbstractTableTransform`, which requires definition of the following:
- an initializer (i.e. `__init__()`) that accepts a dictionary of configuration data. For this example, the configuration data will only be defined by command line arguments (defined below).
- the `transform()` method itself, which takes an input table and produces output tables with any associated metadata for that table transformation.
Other methods, such as `flush()`, need not be overridden or redefined for this simple example.
We start with the simple definition of the class, its initializer and the imports required by subsequent code:
```python
import time
from argparse import ArgumentParser, Namespace
from typing import Any

import pyarrow as pa
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing_ray.runtime.ray.runtime_configuration import (
    RayTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import CLIArgumentProvider, get_logger


class NOOPTransform(AbstractTableTransform):
    def __init__(self, config: dict[str, Any]):
        super().__init__(config)
        # The key must match the name captured in apply_input_params():
        # --noop_sleep_sec is stored as "sleep_sec" once the "noop_" prefix is stripped.
        self.sleep = config.get("sleep_sec", 1)
```
The `NOOPTransform` class extends `AbstractTableTransform`, which defines the required methods.
For the purposes of this tutorial, and to simulate a more complex processing job, our initializer makes the transform configurable with a number of seconds to sleep/delay during each call to `transform()`.
Configuration is provided by the framework in a dictionary passed to the initializer. Below we will cover how this sleep argument is made available to the initializer.
In more complex transforms, expensive one-time setup, such as loading a Hugging Face or other model, can also be done in the initializer.
Next we define the `transform()` method itself, which includes the addition of some almost trivial metadata.
```python
    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        if self.sleep is not None:
            time.sleep(self.sleep)
        # Add some sample metadata.
        metadata = {"nfiles": 1, "nrows": len(table)}
        return [table], metadata
```
The main input to this method is the in-memory pyarrow table to be transformed; an optional `file_name` naming the source file is also accepted. The return value is a tuple of a list of tables and a metadata dictionary. In this case we are doing a simple 1:1 table conversion, so the list contains a single table, the input table. The metadata is a free-form dictionary with numeric values, which the framework aggregates across all calls and reports as job statistics metadata. If there is no metadata, simply return an empty dictionary.
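The aggregation itself is done by the framework, not by the transform. The sketch below assumes the simplest reading of "aggregated", per-key summation across all `transform()` calls; `aggregate_metadata` is a hypothetical helper, not a framework function.

```python
from collections import Counter
from typing import Any


def aggregate_metadata(per_call: list[dict[str, Any]]) -> dict[str, float]:
    """Sum numeric metadata values key by key (assumed aggregation rule)."""
    totals: Counter = Counter()
    for metadata in per_call:
        for key, value in metadata.items():
            # Non-numeric values are ignored in this sketch.
            if isinstance(value, (int, float)):
                totals[key] += value
    return dict(totals)
```

Under this assumption, two noop calls on tables of 10 and 5 rows would be reported as `{"nfiles": 2, "nrows": 15}`, and a call that returns an empty dictionary contributes nothing.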
Next we define the `NOOPTransformConfiguration` class and its initializer, which defines the following:
- The short name for the transform
- The class implementing the transform, in our case `NOOPTransform`
- Command line argument support

We also define `NOOPRayTransformConfiguration` so that we can run the transform in the Ray runtime as well. This provides the option to add a transform-specific Ray runtime class, allowing more complex distributed memory and data sharing operations. The NOOP transform does not make use of this, so its Ray configuration is a simple extension.
First, we define the pure Python transform configuration class and its initializer:
```python
short_name = "noop"
cli_prefix = f"{short_name}_"
sleep_key = "sleep_sec"
pwd_key = "pwd"
sleep_cli_param = f"{cli_prefix}{sleep_key}"
pwd_cli_param = f"{cli_prefix}{pwd_key}"


class NOOPTransformConfiguration(TransformConfiguration):
    def __init__(self):
        super().__init__(
            name=short_name,
            transform_class=NOOPTransform,
            # Configuration keys that must not appear in the job metadata.
            remove_from_metadata=[pwd_key],
        )
```
The initializer extends `TransformConfiguration`, which provides simple capture of our configuration data and allows it to be pickled and sent across the network. It also adds a `params` field that will be used below to hold the transform's configuration data (the dictionary passed to `NOOPTransform.__init__()` above).
Next, we provide two methods that define and capture the command line configuration specific to the `NOOPTransform`. In this case the parameters are the number of seconds to sleep during transformation and an example `pwd` ("password") option holding sensitive data that we don't want reported in the job metadata produced by the Ray orchestrator.
The first method establishes the command line arguments. It is given a global argument parser to which the `NOOPTransform` arguments are added. It is good practice to give all transform-specific options a common prefix (e.g. `pii_`, `lang_`). In our case we will use `noop_`.
```python
    def add_input_params(self, parser: ArgumentParser) -> None:
        parser.add_argument(
            f"--{sleep_cli_param}",
            type=int,
            default=1,
            help="Sleep actor for a number of seconds while processing the data frame, before writing the file to COS",
        )
        parser.add_argument(
            f"--{pwd_cli_param}",
            type=str,
            default="nothing",
            help="A dummy password which should be filtered out of the metadata",
        )
```
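Because `add_input_params()` only calls `parser.add_argument()`, its effect can be checked with a plain `ArgumentParser`. This standalone sketch repeats the two definitions (with shortened help strings) and parses a sample command line; argparse turns `--noop_sleep_sec` into the attribute `noop_sleep_sec`, which is what the next method receives.

```python
from argparse import ArgumentParser

cli_prefix = "noop_"
sleep_cli_param = f"{cli_prefix}sleep_sec"
pwd_cli_param = f"{cli_prefix}pwd"

parser = ArgumentParser()
parser.add_argument(f"--{sleep_cli_param}", type=int, default=1, help="Seconds to sleep per table")
parser.add_argument(f"--{pwd_cli_param}", type=str, default="nothing", help="Dummy password")

# "--noop_sleep_sec 2" is converted to the int 2; --noop_pwd keeps its default.
args = parser.parse_args(["--noop_sleep_sec", "2"])
```

The `type=int` conversion means the transform receives a number rather than the string `"2"`.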
Next we implement a method that is called after the CLI args are parsed (usually by one of the runtimes) and which allows us to capture the `NOOPTransform`-specific arguments.
```python
    def apply_input_params(self, args: Namespace) -> bool:
        # Collect the "noop_"-prefixed arguments; False drops the prefix in the keys.
        captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
        if captured.get(sleep_key) < 0:
            print(f"Parameter noop_sleep_sec should be non-negative. You specified {args.noop_sleep_sec}")
            return False
        self.params = captured
        return True
```
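`CLIArgumentProvider.capture_parameters()` belongs to the framework. Judging from how it is used here, and from the transform reading a `sleep_sec` key, it appears to keep only the arguments that start with the given prefix and, when the third argument is `False`, strip that prefix from the keys. The sketch below reimplements that assumed behavior as a hypothetical `capture_prefixed()` function, so the mapping from `--noop_sleep_sec` to the `sleep_sec` configuration key is visible.

```python
from argparse import Namespace
from typing import Any


def capture_prefixed(args: Namespace, prefix: str, keep_prefix: bool) -> dict[str, Any]:
    """Hypothetical re-implementation of the assumed capture_parameters behavior."""
    captured = {}
    for key, value in vars(args).items():
        # Arguments belonging to other components (no prefix) are skipped.
        if key.startswith(prefix):
            captured[key if keep_prefix else key[len(prefix):]] = value
    return captured
```

This also explains why `remove_from_metadata=[pwd_key]` uses the unprefixed name `pwd`: after capture, the keys in `params` no longer carry the `noop_` prefix.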
To run the transform on a set of input data, we use one of the runtimes, each described below. To run in the Python runtime, we create an instance of `PythonTransformLauncher` using the `NOOPTransformConfiguration` and launch it as follows:
```python
from data_processing.runtime.pure_python import PythonTransformLauncher

if __name__ == "__main__":
    launcher = PythonTransformLauncher(runtime_config=NOOPTransformConfiguration())
    launcher.launch()
```
Assuming the above `main` code is placed in `noop_main.py`, we can run the transform on some test data. We'll use the data in the repo for the noop transform and write the output to a temporary directory (`/tmp/noop-output`):
```shell
export DPK_REPOROOT=...
export NOOP_INPUT=$DPK_REPOROOT/transforms/universal/noop/python/test-data/input
```
To run:
```shell
python noop_main.py --noop_sleep_sec 2 \
  --data_local_config "{'input_folder': '"$NOOP_INPUT"', 'output_folder': '/tmp/noop-output'}"
```
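After the shell splices `$NOOP_INPUT` into the quoted string, the option value is a Python-style dictionary literal with single-quoted strings. The sketch below assumes the launcher parses it with `ast.literal_eval`, which accepts that syntax where `json.loads` would not; the input path is a made-up example standing in for the expanded variable.

```python
import ast
import json

# What the shell passes after expanding $NOOP_INPUT (example path only).
value = "{'input_folder': '/repo/transforms/universal/noop/python/test-data/input', 'output_folder': '/tmp/noop-output'}"

config = ast.literal_eval(value)

try:
    json.loads(value)
    json_ok = True
except json.JSONDecodeError:
    # JSON requires double-quoted strings, so the single-quoted form is rejected.
    json_ok = False
```

Because `$NOOP_INPUT` sits outside the double quotes in the command, the path must not contain spaces, or the shell would split the argument.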
See the python launcher options for a complete list of transform-independent command line options.
To run in the Ray runtime, we wrap the configuration in `NOOPRayTransformConfiguration` and use the `RayTransformLauncher` instead of the `PythonTransformLauncher`, as follows:
```python
class NOOPRayTransformConfiguration(RayTransformRuntimeConfiguration):
    def __init__(self):
        super().__init__(transform_config=NOOPTransformConfiguration())


from data_processing_ray.runtime.ray import RayTransformLauncher

if __name__ == "__main__":
    launcher = RayTransformLauncher(runtime_config=NOOPRayTransformConfiguration())
    launcher.launch()
```
We can run this with the same command as for the Python runtime; to run in a local Ray cluster, add the `--run_locally True` option.
```shell
python noop_main.py --noop_sleep_sec 2 \
  --data_local_config "{'input_folder': '"$NOOP_INPUT"', 'output_folder': '/tmp/noop-output'}" --run_locally True
```
This starts a local Ray instance (Ray must already be installed). See the ray launcher options for a complete list of transform-independent command line options.