
Fuzzy dedup #699

Open · wants to merge 86 commits into dev
Conversation

Kibnelson (Collaborator)

Why are these changes needed?

Provides a fuzzy dedup implementation for Python, Spark, and Ray.

Related issue number (if any).

#152
#79

blublinsky and others added 30 commits October 10, 2024 19:05
Signed-off-by: Constantin M Adam <[email protected]>
@cmadam requested a review from daw3rd on November 13, 2024 01:29

# Copy and install data processing libraries
# These are expected to be placed in the docker context before this is run (see the make image).
COPY --chown=dpk:root data-processing-lib-python/ data-processing-lib-python/
Collaborator:
The new makefile will copy the whl file (not the source) to the context folder

Collaborator:
Fixed in commit 6cc18cd

COPY src/ src/

# copy source data
COPY ./src/signature_calc_transform_python.py fdedup_transform_python.py
Collaborator:
Questionable practice!!! Can we find an alternative?

Member:
on renaming, not the move

Member:
And why is signature_calc_transform the main entry point? Shouldn't it be fuzzy_dedup_python.py?

Collaborator:
Fixed in commit fa5959b

{ name = "Nelson Bore", email = "[email protected]" },
{ name = "Constantin Adam", email = "[email protected]" },
]
dependencies = [
Collaborator:
Please move to requirements.txt and use dynamic dependencies in the pyproject.toml

Collaborator:
Fixed in commit 528457c
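
For context, a minimal sketch of the dynamic-dependency setup being requested here, assuming setuptools as the build backend (names are illustrative):

[project]
name = "dpk_fdedup_transform_python"
dynamic = ["dependencies"]

[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }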

@@ -13,24 +12,30 @@ COPY --chown=ray:users data-processing-lib-python/ data-processing-lib-python/
RUN cd data-processing-lib-python && pip install --no-cache-dir -e .
Collaborator:
should be getting the wheel instead of the source.

Collaborator:
Fixed in commit 6cc18cd

]
dependencies = [
"dpk_fdedup_transform_python==0.2.2.dev1",
Collaborator:
move to requirements.txt and also use dynamic requirements

Collaborator:
Fixed in commit 528457c

]
dependencies = [
"dpk_fdedup_transform_python==0.2.2.dev1",
"data-prep-toolkit-ray==0.2.2.dev1",
Collaborator:
Should be using data-prep-toolkit[ray] and also once the branch is updated with dev, this will change to 0.2.2.dev2

Collaborator:
Fixed in commit 2f61be7
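
For reference, the requirements line described above might look like this (the exact version tracks the dev branch; 0.2.2.dev2 is taken from the comment, not verified):

data-prep-toolkit[ray]==0.2.2.dev2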


# Copy in the data processing framework source/project and install it
# This is expected to be placed in the docker context before this is run (see the make image).
COPY --chown=spark:root data-processing-lib-python/ data-processing-lib-python/
Collaborator:
Should be getting the wheel and installing the extras for ray and spark.

Collaborator:
Fixed in commit 6cc18cd

]
dependencies = [
"dpk_fdedup_transform_python==0.2.2.dev1",
"data-prep-toolkit-spark==0.2.2.dev1",
Collaborator:
use data-prep-toolkit[spark]

Collaborator:
Fixed in commit 6cc18cd

@@ -0,0 +1,10 @@
pyarrow==16.1.0
Collaborator:
Do you need it here? This is already installed by the runtime, and it would help to have a single place for this dependency.

Collaborator:
Fixed, I removed pyarrow from all requirements.txt files.

@touma-I (Collaborator) left a comment:
You need to update the branch from dev and resolve any conflicts first before tackling some of these points.

@@ -0,0 +1,8 @@
pyspark
Collaborator:
Please use >= or <= for the dependencies. Getting the latest is not a good idea.

Collaborator:
Fixed in commit 528457c
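
A hedged example of bounded dependencies in requirements.txt (the version numbers below are placeholders, not the project's tested pins):

pyspark>=3.5.0,<3.6.0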

Collaborator:
Can this be part of the primary Makefile?

Member:
Some more words here to provide a gentle introduction would be nice. In addition, you need to describe all of the configuration keys. See doc_chunk for a template.

@cmadam (Collaborator), Nov 14, 2024:
I am still working on the documentation.

jaccard_similarity_threshold_key, jaccard_similarity_threshold_default
)
self.sort_output = config.get(sort_output_key, sort_output_default)
self.data_access = config.get("data_access")
Member:
If data_access is not provided, what happens? Either throw an exception, or use DataAccessLocal() as the default.
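
A minimal sketch of the suggested fallback, assuming DataAccessLocal is importable from the data_processing library (import path not verified against this PR):

from data_processing.data_access import DataAccessLocal

# fall back to local data access when the orchestrator does not inject one
self.data_access = config.get("data_access")
if self.data_access is None:
    self.data_access = DataAccessLocal()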

folder_name = f"{folder_name}/"
return folder_name

def consolidate_band_segment_files(self, files: dict[str, bytes]) -> tuple[pl.DataFrame, dict[str, Any]]:
Member:
I think you should hide all these internal methods by prefixing them with _ or __.
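
A small illustration of the suggested renaming (same signature as the method quoted above; only the leading underscore, which marks the method as internal by convention, changes):

def _consolidate_band_segment_files(self, files: dict[str, bytes]) -> tuple[pl.DataFrame, dict[str, Any]]:
    # implementation unchanged; the underscore signals that callers outside
    # this class should not depend on the method
    ...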

f"--{sort_output_cli_param}",
type=bool,
default=sort_output_default,
help="Sort",
Member:
sort all output or within a file and sort by what?

@daw3rd (Member), Nov 13, 2024:
I still think you should rename this file to fdedup_transform_python.py. By convention, this indicates that this file provides the main() entry point to the transform execution.

Collaborator:
Fixed in commit fa5959b

parser.add_argument(
"--document_id_column", type=str, required=False, help="name of the column that stores document text"
)
parser.add_argument("--seed", type=int, required=False, help="name of the column that stores document text")
@daw3rd (Member), Nov 13, 2024:
document_id and seed seem to have the wrong help text.
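
A sketch of what the corrected help strings might look like (the exact wording, in particular what the seed controls, is an assumption):

parser.add_argument(
    "--document_id_column", type=str, required=False,
    help="name of the column that stores the document id",
)
parser.add_argument(
    "--seed", type=int, required=False,
    help="seed for the random number generator used in signature calculation",
)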

super().__init__(params=params)
self.logger = get_logger(__name__)

def get_folders(self, data_access: DataAccess) -> list[str]:
Member:
This get_folders method should be defined as an @abstractmethod in a new super-class for this framework. I wouldn't suggest this for one class, but you seem to be using the pattern in your other transforms.
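
A minimal sketch of such a super-class (class name and module are hypothetical, not part of the PR):

from abc import ABC, abstractmethod

from data_processing.data_access import DataAccess


class FolderBasedRuntime(ABC):
    """Hypothetical shared base for runtimes that enumerate input folders."""

    @abstractmethod
    def get_folders(self, data_access: DataAccess) -> list[str]:
        """Return the list of folders this runtime should process."""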

super().__init__(
name=short_name,
transform_class=SignatureCalculationTransform,
remove_from_metadata=[sigcalc_data_factory_key],
Member:
I don't believe this is the right key. It needs to be the key corresponding to the command line parameter for the s3 credentials. I'm not sure, but I believe in this case it is scdata_data_s3_cred. See https://github.com/IBM/data-prep-kit/blob/dev/data-processing-lib/doc/ray-launcher-options.md

Member:
You can check in the metadata.json when using S3 to see what shows up there.


if __name__ == "__main__":
launcher = PythonTransformLauncher(SignatureCalculationTransformConfiguration())
logger.info("Launching noop transform")
Member:
noop. Maybe just remove this line.


# create parameters
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "expected/cluster_analysis"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "expected"))
Member:
you should never write directly to test-data. Write somewhere else (output is the convention) then manually copy if needed.

Collaborator:
Fixed in commit fa5959b
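
A sketch of the change being requested (the scratch directory name follows the "output" convention mentioned above):

input_folder = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "test-data", "expected", "cluster_analysis")
)
# write results to a scratch folder, never directly into test-data
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output"))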

self.docs_to_remove_df = self.docs_to_remove_df.rename({"docs_to_remove": self.document_id_column})

def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
self.logger.info(f"Transforming table with {table.num_rows} rows from file {file_name}")
Member:
maybe make this debug so the kfp log doesn't get overwhelmed.

Collaborator:
This is the only log message I output in this transform, and it is invoked once per file.

super().__init__(
name=short_name,
transform_class=transform_class,
remove_from_metadata=[dataclean_data_factory_key],
Member:
same comment as elsewhere about the correct key to pass here


self.logger = get_logger(__name__)

def get_transform_config(
Member:
This function looks like a duplicate of that in DataCleaningPythonRuntime. Can you make a shared method somehow, either as a global or in a shared superclass?

super().__init__(params=params)
self.logger = get_logger(__name__)

def get_transform_config(
Member:
Again, this function looks like a duplicate of that in DataCleaningPythonRuntime. Can you make a shared method somehow, either as a global or in a shared superclass?
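
One possible shape for the suggested refactoring, sketched with placeholder names (this is not the PR's code; the real runtimes would also inherit from the framework's runtime base class, and the method signature may differ):

class SharedRuntimeConfigMixin:
    """Hypothetical mixin holding the get_transform_config logic that is currently
    duplicated between the data cleaning and fuzzy dedup python runtimes."""

    def get_transform_config(self, data_access_factory, statistics, files):
        # the common body currently repeated in both runtimes would live here
        return {
            "data_access_factory": data_access_factory,
            "statistics": statistics,
            "files": files,
        }


class DataCleaningPythonRuntimeSketch(SharedRuntimeConfigMixin):
    pass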

Signed-off-by: Constantin M Adam <[email protected]>
ComponentUtils.add_settings_to_component(execute_data_cleaning_job, ONE_WEEK_SEC)
# FIXME: see https://github.com/kubeflow/pipelines/issues/10914
if os.getenv("KFPv2", "0") != "1":
ComponentUtils.set_s3_env_vars_to_component(execute_data_cleaning_job, data_s3_access_secret)
@revit13 (Collaborator), Nov 14, 2024:
As discussed, in KFP v2 the secret name is hard coded (in kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py) as a workaround for kubeflow/pipelines#10914. Thus, this pipeline is not expected to run on kfp V2. Should it be added to the blacklist, given that we currently can't restrict it to run only for v1 in the CI/CD tests? @roytman what do you think? Thanks


## Summary

The basic implementation of the fuzzy dedup is based on [MinHash](https://en.wikipedia.org/wiki/MinHash). Also see
Member:
Forgive me if this is a duplicate comment as I thought I had submitted once already, but...

  1. A more gentle introduction to what the transform does instead of only providing the links.
  2. The set of configuration keys should be documented. See doc_chunk for a nice example.
  3. This file needs to be linked from a ../README.md, which now only points to ray and python.
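
Not part of the PR, but a toy illustration of the MinHash idea the Summary refers to, the kind of gentle introduction being asked for (the real transform shingles documents and groups signatures into bands, which this example omits):

import hashlib


def minhash_signature(tokens: set[str], num_hashes: int = 8) -> list[int]:
    # for each seeded hash function, keep the minimum hash over the document's tokens;
    # documents with many shared tokens tend to share minimum values
    signature = []
    for seed in range(num_hashes):
        signature.append(
            min(int(hashlib.sha256(f"{seed}:{tok}".encode()).hexdigest(), 16) for tok in tokens)
        )
    return signature


a = minhash_signature({"the", "quick", "brown", "fox"})
b = minhash_signature({"the", "quick", "brown", "dog"})
# the fraction of matching positions estimates the Jaccard similarity of the token sets
print(sum(x == y for x, y in zip(a, b)) / len(a))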

)


if __name__ == "__main__":
Member:
Do you really need these sub-transform main()s? They are not exposed in the Dockerfile (in the home dir) so are not even directly callable (in the standard way). Do we ever need to run this sub-transform manually outside of fdedup orchestrator/transform? If so, then it should be promoted to the home directory in the Dockerfile, otherwise maybe delete main()?

Collaborator:
The main() in all these transforms is needed by the kfp pipeline here:
https://github.com/IBM/data-prep-kit/blob/4941d5bab37a0bdc1e5873ce8e7288483703751f/transforms/universal/fdedup/kfp_ray/fdedup_wf.py#L30C1-L35C1
If I remove these, the pipeline will stop working

COPY src/ src/

# copy source data
COPY ./src/signature_calc_transform_python.py fdedup_transform_python.py
Member:
And why is signature_calc_transform the main entry point? Shouldn't it be fuzzy_dedup_python.py?



if __name__ == "__main__":
# launcher = NOOPRayLauncher()
Member:
NOOP, although per an earlier comment, do we need the main() for the sub-transforms orchestrated by fdedup?

Member:
By convention, this file should be named cluster_analysis_s3_spark.py

Member:
By convention, this should be named data_cleaning_s3_spark.py

Member:
By convention, this should be named fuzzy_dedup_s3_spark.py

Member:
By convention...signature_calc_s3_spark.py

$(PIP) install --upgrade pip; \
$(PIP) install -r requirements.txt; \
fi
set-versions:
Member:
I guess you don't need this if you've renamed the file. If you go back to Makefile, you will also need to add

clean test build publish
