From c5766c1a5a3daa0e8fd12e9dcbbb293690d591df Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 4 Sep 2024 21:50:00 +0530 Subject: [PATCH 01/21] define config class for expectations --- .../data_validators/expectations.py | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 src/zenml/integrations/great_expectations/data_validators/expectations.py diff --git a/src/zenml/integrations/great_expectations/data_validators/expectations.py b/src/zenml/integrations/great_expectations/data_validators/expectations.py new file mode 100644 index 00000000000..10d71a6ce10 --- /dev/null +++ b/src/zenml/integrations/great_expectations/data_validators/expectations.py @@ -0,0 +1,92 @@ +# Copyright (c) ZenML GmbH 2022. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Definition of the Deepchecks validation check types.""" + +from typing import Any, Dict, Type + +from great_expectations.expectations.expectation import Expectation + +from pydantic import BaseModel + +from zenml.logger import get_logger +from zenml.utils import source_utils + +logger = get_logger(__name__) + + +class GreatExpectationExpectationConfig(BaseModel): + """Great Expectation expectation configuration. + + This class defines the configuration parameters that can be used to + customize the behavior of a Great Expectation Expectation. This includes + the expectation name (in the right case), and any input options that the + expectation expects. This could be the column name, the value to + compare against, etc. + + You can also choose to pass in parameters as the value to an input option. + You can then supply the parameters as a dictionary to the expectation_parameters + field. But this is not particularly useful when you are defining the expectations + directly with ZenML. The value of defining parameters in expectations and + passing the values at runtime is more pronounced when you already have a suite + of expectations defined and you use this suite with ZenML, with different + parameters. + + Attributes: + expectation_name: The name of the Great Expectation expectation to apply. + kwargs: Additional keyword arguments to pass to the expectation. + """ + expectation_name: str + kwargs: Dict[str, Any] = {}\ + + @staticmethod + def get_expectation_class(expectation_name: str) -> Type[Expectation]: + """Get the Great Expectation expectation class associated with this config. + + Returns: + The Great Expectation expectation class associated with this config. + + Raises: + TypeError: If the expectation name could not be converted to a valid + Great Expectation expectation class. This can happen for example + if the expectation name does not map to a valid Great Expectation + expectation class. + """ + return source_utils.load_and_validate_class( + f"great_expectations.expectations.{expectation_name}", + expected_class=Expectation, + ) + + def get_expectation(self) -> Expectation: + """Get the Great Expectation expectation object associated with this config. + + Returns: + The Great Expectation expectation object associated with this config. + """ + try: + expectation_class = self.get_expectation_class(self.expectation_name) + expectation = expectation_class(**self.kwargs) + except TypeError: + raise ValueError( + f"Could not map the `{self.expectation_name}` expectation " + f"identifier to a valid Great Expectation expectation class." + ) + except Exception as e: + raise ValueError( + f"An error occurred while trying to instantiate the " + f"`{self.expectation_name}` expectation class " + f"with the following parameters: {self.kwargs}" + f"Exception: {str(e)}" + ) + + return expectation From 830e1c2980b5a90efe6720612be81586c0d745bb Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 4 Sep 2024 21:50:55 +0530 Subject: [PATCH 02/21] update great expectation implementation --- .../data_validators/ge_data_validator.py | 225 +++++------------- .../integrations/great_expectations/utils.py | 43 +--- 2 files changed, 70 insertions(+), 198 deletions(-) diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 0eb8f7a590c..1597cb2a3bb 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -17,7 +17,8 @@ from typing import Any, ClassVar, Dict, List, Optional, Sequence, Type, cast import pandas as pd -from great_expectations.checkpoint.types.checkpoint_result import ( # type: ignore[import-untyped] +import great_expectations as ge +from great_expectations.checkpoint.checkpoint import ( # type: ignore[import-untyped] CheckpointResult, ) from great_expectations.core import ( # type: ignore[import-untyped] @@ -38,13 +39,12 @@ from great_expectations.data_context.types.resource_identifiers import ( ExpectationSuiteIdentifier, ) -from great_expectations.profile.user_configurable_profiler import ( # type: ignore[import-untyped] - UserConfigurableProfiler, -) +from great_expectations.profile.user_configurable_profiler import # type: ignore[import-untyped] from zenml import get_step_context from zenml.client import Client from zenml.data_validators import BaseDataValidator, BaseDataValidatorFlavor +from zenml.integrations.great_expectations.data_validators.expectations import GreatExpectationExpectationConfig from zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor import ( GreatExpectationsDataValidatorConfig, GreatExpectationsDataValidatorFlavor, @@ -52,7 +52,7 @@ from zenml.integrations.great_expectations.ge_store_backend import ( ZenMLArtifactStoreBackend, ) -from zenml.integrations.great_expectations.utils import create_batch_request +from zenml.integrations.great_expectations.utils import create_batch_definition, create_batch_request from zenml.io import fileio from zenml.logger import get_logger from zenml.utils import io_utils @@ -200,8 +200,6 @@ def data_context(self) -> AbstractDataContext: expectations_store_name = "zenml_expectations_store" validations_store_name = "zenml_validations_store" checkpoint_store_name = "zenml_checkpoint_store" - profiler_store_name = "zenml_profiler_store" - evaluation_parameter_store_name = "evaluation_parameter_store" # Define default configuration options that plug the GX stores # in the active ZenML artifact store @@ -216,18 +214,10 @@ def data_context(self) -> AbstractDataContext: checkpoint_store_name: self.get_store_config( "CheckpointStore", "checkpoints" ), - profiler_store_name: self.get_store_config( - "ProfilerStore", "profilers" - ), - evaluation_parameter_store_name: { - "class_name": "EvaluationParameterStore" - }, }, expectations_store_name=expectations_store_name, validations_store_name=validations_store_name, checkpoint_store_name=checkpoint_store_name, - profiler_store_name=profiler_store_name, - evaluation_parameter_store_name=evaluation_parameter_store_name, data_docs_sites={ "zenml_artifact_store": self.get_data_docs_config( "data_docs" @@ -267,16 +257,12 @@ def data_context(self) -> AbstractDataContext: self._context.config.expectations_store_name = ( expectations_store_name ) - self._context.config.validations_store_name = ( + self._context.config.validation_results_store_name = ( validations_store_name ) self._context.config.checkpoint_store_name = ( checkpoint_store_name ) - self._context.config.profiler_store_name = profiler_store_name - self._context.config.evaluation_parameter_store_name = ( - evaluation_parameter_store_name - ) for store_name, store_config in zenml_context_config[ "stores" ].items(): @@ -323,126 +309,16 @@ def root_directory(self) -> str: return path - def data_profiling( - self, - dataset: pd.DataFrame, - comparison_dataset: Optional[Any] = None, - profile_list: Optional[Sequence[str]] = None, - expectation_suite_name: Optional[str] = None, - data_asset_name: Optional[str] = None, - profiler_kwargs: Optional[Dict[str, Any]] = None, - overwrite_existing_suite: bool = True, - **kwargs: Any, - ) -> ExpectationSuite: - """Infer a Great Expectation Expectation Suite from a given dataset. - - This Great Expectations specific data profiling method implementation - builds an Expectation Suite automatically by running a - UserConfigurableProfiler on an input dataset [as covered in the official - GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler). - - Args: - dataset: The dataset from which the expectation suite will be - inferred. - comparison_dataset: Optional dataset used to generate data - comparison (i.e. data drift) profiles. Not supported by the - Great Expectation data validator. - profile_list: Optional list identifying the categories of data - profiles to be generated. Not supported by the Great Expectation - data validator. - expectation_suite_name: The name of the expectation suite to create - or update. If not supplied, a unique name will be generated from - the current pipeline and step name, if running in the context of - a pipeline step. - data_asset_name: The name of the data asset to use to identify the - dataset in the Great Expectations docs. - profiler_kwargs: A dictionary of custom keyword arguments to pass to - the profiler. - overwrite_existing_suite: Whether to overwrite an existing - expectation suite, if one exists with that name. - kwargs: Additional keyword arguments (unused). - - Returns: - The inferred Expectation Suite. - - Raises: - ValueError: if an `expectation_suite_name` value is not supplied and - a name for the expectation suite cannot be generated from the - current step name and pipeline name. - """ - context = self.data_context - - if comparison_dataset is not None: - logger.warning( - "A comparison dataset is not required by Great Expectations " - "to do data profiling. Silently ignoring the supplied dataset " - ) - - if not expectation_suite_name: - try: - step_context = get_step_context() - pipeline_name = step_context.pipeline.name - step_name = step_context.step_run.name - expectation_suite_name = f"{pipeline_name}_{step_name}" - except RuntimeError: - raise ValueError( - "A expectation suite name is required when not running in " - "the context of a pipeline step." - ) - - suite_exists = False - if context.expectations_store.has_key( # noqa - ExpectationSuiteIdentifier(expectation_suite_name) - ): - suite_exists = True - suite = context.get_expectation_suite(expectation_suite_name) - if not overwrite_existing_suite: - logger.info( - f"Expectation Suite `{expectation_suite_name}` " - f"already exists and `overwrite_existing_suite` is not set " - f"in the step configuration. Skipping re-running the " - f"profiler." - ) - return suite - - batch_request = create_batch_request(context, dataset, data_asset_name) - - try: - if suite_exists: - validator = context.get_validator( - batch_request=batch_request, - expectation_suite_name=expectation_suite_name, - ) - else: - validator = context.get_validator( - batch_request=batch_request, - create_expectation_suite_with_name=expectation_suite_name, - ) - - profiler = UserConfigurableProfiler( - profile_dataset=validator, **profiler_kwargs - ) - - suite = profiler.build_suite() - context.save_expectation_suite( - expectation_suite=suite, - expectation_suite_name=expectation_suite_name, - ) - - context.build_data_docs() - finally: - context.delete_datasource(batch_request.datasource_name) - - return suite - def data_validation( self, dataset: pd.DataFrame, comparison_dataset: Optional[Any] = None, check_list: Optional[Sequence[str]] = None, + expectations_list: Optional[Sequence[GreatExpectationExpectationConfig]] = None, + expectation_parameters: Optional[Dict[str, Any]] = None, expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, - action_list: Optional[List[Dict[str, Any]]] = None, + action_list: Optional[List[ge.checkpoint.actions.ValidationAction]] = None, **kwargs: Any, ) -> CheckpointResult: """Great Expectations data validation. @@ -460,6 +336,11 @@ def data_validation( check_list: Optional list identifying the data validation checks to be performed. Not supported by the Great Expectations data validator. + expectations_list: A list of Great Expectations expectations to + use to validate the dataset. A value must be provided. + expectation_parameters: Optional parameters to pass to the + expectations if you have defined any parameters in the + expectations. expectation_suite_name: The name of the expectation suite to use to validate the dataset. A value must be provided. data_asset_name: The name of the data asset to use to identify the @@ -474,15 +355,18 @@ def data_validation( Raises: ValueError: if the `expectation_suite_name` argument is omitted. """ - if not expectation_suite_name: - raise ValueError("Missing expectation_suite_name argument value.") - if comparison_dataset is not None: logger.warning( "A comparison dataset is not required by Great Expectations " "to do data validation. Silently ignoring the supplied dataset " ) + if expectation_suite_name and expectations_list: + raise ValueError("Only one of `expectation_suite_name` and `expectations_list` can be provided.") + + if not expectation_suite_name and not expectations_list: + raise ValueError("Either `expectation_suite_name` or `expectations_list` must be provided.") + try: step_context = get_step_context() run_name = step_context.pipeline_run.name @@ -494,42 +378,49 @@ def data_validation( context = self.data_context - checkpoint_name = f"{run_name}_{step_name}" + # get all expectations from the list + if expectations_list: + # construct an expectation suite name from the pipeline and step names + suite_name = f"{run_name}_{step_name}" + expectation_suite = ExpectationSuite( + name=suite_name, + expectations=[exp.get_expectation() for exp in expectations_list] + ) + context.suites.add(expectation_suite) + + else: # when the expectation_suite_name is provided + expectation_suite = context.suites.get(name=expectation_suite_name) + + # TODO need to create a batch definition + batch_definition, batch_parameters = create_batch_definition(context, dataset, data_asset_name) + + # create a validation definition + validation_defintion = ge.ValidationDefinition( + data=batch_definition, suite=expectation_suite, + name=f"{run_name}_{step_name}" + ) - batch_request = create_batch_request(context, dataset, data_asset_name) + validation_defintion = context.validation_definitions.add(validation_defintion) - action_list = action_list or [ - { - "name": "store_validation_result", - "action": {"class_name": "StoreValidationResultAction"}, - }, - { - "name": "store_evaluation_params", - "action": {"class_name": "StoreEvaluationParametersAction"}, - }, - { - "name": "update_data_docs", - "action": {"class_name": "UpdateDataDocsAction"}, - }, - ] - - checkpoint_config: Dict[str, Any] = { - "name": checkpoint_name, - "run_name_template": run_name, - "config_version": 1, - "class_name": "Checkpoint", - "expectation_suite_name": expectation_suite_name, - "action_list": action_list, - } - context.add_checkpoint(**checkpoint_config) # type: ignore[has-type] + # create a checkpoint + checkpoint_name = f"{run_name}_{step_name}" + checkpoint = ge.Checkpoint( + name=checkpoint_name, + validation_definitions=[validation_defintion], + actions=action_list, + # get it from the kwargs + result_format={"result_format": kwargs.get("result_format", "SUMMARY")}, + ) + checkpoint = context.checkpoints.add(checkpoint) + + # run a checkpoint try: - results = context.run_checkpoint( - checkpoint_name=checkpoint_name, - validations=[{"batch_request": batch_request}], + results = checkpoint.run( + batch_parameters=batch_parameters, + expectation_parameters=expectation_parameters ) finally: - context.delete_datasource(batch_request.datasource_name) - context.delete_checkpoint(checkpoint_name) + context.delete_datasource(batch_definition.data_asset.datasource.name) return results diff --git a/src/zenml/integrations/great_expectations/utils.py b/src/zenml/integrations/great_expectations/utils.py index aeeeec94ff8..3a98c69c1b4 100644 --- a/src/zenml/integrations/great_expectations/utils.py +++ b/src/zenml/integrations/great_expectations/utils.py @@ -13,12 +13,10 @@ # permissions and limitations under the License. """Great Expectations data profiling standard step.""" -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Tuple import pandas as pd -from great_expectations.core.batch import ( # type: ignore[import-untyped] - RuntimeBatchRequest, -) +from great_expectations.core.batch_definition import BatchDefinition from great_expectations.data_context.data_context.abstract_data_context import ( AbstractDataContext, ) @@ -30,11 +28,11 @@ logger = get_logger(__name__) -def create_batch_request( +def create_batch_definition( context: AbstractDataContext, dataset: pd.DataFrame, data_asset_name: Optional[str], -) -> RuntimeBatchRequest: +) -> Tuple[BatchDefinition, Dict[str, Any]]: """Create a temporary runtime GE batch request from a dataset step artifact. Args: @@ -58,33 +56,16 @@ def create_batch_request( step_name = f"step_{random_str(5)}" datasource_name = f"{run_name}_{step_name}" - data_connector_name = datasource_name data_asset_name = data_asset_name or f"{pipeline_name}_{step_name}" - batch_identifier = "default" + batch_definition_name = "default" - datasource_config: Dict[str, Any] = { - "name": datasource_name, - "class_name": "Datasource", - "module_name": "great_expectations.datasource", - "execution_engine": { - "module_name": "great_expectations.execution_engine", - "class_name": "PandasExecutionEngine", - }, - "data_connectors": { - data_connector_name: { - "class_name": "RuntimeDataConnector", - "batch_identifiers": [batch_identifier], - }, - }, - } + data_source = context.data_sources.add_pandas(name=datasource_name) + data_asset = data_source.add_dataframe_asset(name=data_asset_name) - context.add_datasource(**datasource_config) - batch_request = RuntimeBatchRequest( - datasource_name=datasource_name, - data_connector_name=data_connector_name, - data_asset_name=data_asset_name, - runtime_parameters={"batch_data": dataset}, - batch_identifiers={batch_identifier: batch_identifier}, + batch_definition = data_asset.add_batch_definition_whole_dataframe( + batch_definition_name ) - return batch_request + batch_parameters = {"dataframe": dataset} + + return batch_definition, batch_parameters From e4f2d9578f87792feca843fce872e2ec08420a99 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 18 Sep 2024 16:14:47 +0530 Subject: [PATCH 03/21] fix materializer --- .../materializers/ge_materializer.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index 88fd66364f4..58efae93e67 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -16,18 +16,14 @@ import os from typing import TYPE_CHECKING, Any, ClassVar, Dict, Tuple, Type, Union, cast -from great_expectations.checkpoint.types.checkpoint_result import ( # type: ignore[import-untyped] - CheckpointResult, -) +from great_expectations.checkpoint.checkpoint import Checkpoint, CheckpointResult from great_expectations.core import ( # type: ignore[import-untyped] ExpectationSuite, ) from great_expectations.core.expectation_validation_result import ( # type: ignore[import-untyped] ExpectationSuiteValidationResult, ) -from great_expectations.data_context.types.base import ( - CheckpointConfig, -) + from great_expectations.data_context.types.resource_identifiers import ( ExpectationSuiteIdentifier, ValidationResultIdentifier, @@ -80,7 +76,7 @@ def preprocess_run_result(key: str, value: Any) -> Any: return ExpectationSuiteValidationResult(**value) return value - artifact_dict["checkpoint_config"] = CheckpointConfig( + artifact_dict["checkpoint_config"] = Checkpoint( **artifact_dict["checkpoint_config"] ) validation_dict = {} From 38fbabe35f98ecb03660ab7f8b4a60d4fa1ed626 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 18 Sep 2024 16:15:23 +0530 Subject: [PATCH 04/21] return datasource name --- src/zenml/integrations/great_expectations/utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/zenml/integrations/great_expectations/utils.py b/src/zenml/integrations/great_expectations/utils.py index 3a98c69c1b4..30091e4e885 100644 --- a/src/zenml/integrations/great_expectations/utils.py +++ b/src/zenml/integrations/great_expectations/utils.py @@ -32,7 +32,7 @@ def create_batch_definition( context: AbstractDataContext, dataset: pd.DataFrame, data_asset_name: Optional[str], -) -> Tuple[BatchDefinition, Dict[str, Any]]: +) -> Tuple[BatchDefinition, Dict[str, Any], str]: """Create a temporary runtime GE batch request from a dataset step artifact. Args: @@ -41,7 +41,9 @@ def create_batch_definition( data_asset_name: Optional custom name for the data asset. Returns: - A Great Expectations runtime batch request. + batch_definition: A Great Expectations runtime batch request. + batch_parameters: A dictionary of batch parameters. + datasource_name: The name of the datasource. """ try: # get pipeline name, step name and run id @@ -68,4 +70,4 @@ def create_batch_definition( batch_parameters = {"dataframe": dataset} - return batch_definition, batch_parameters + return batch_definition, batch_parameters, datasource_name From 2fbdd83f9f08c50ceefcb8c833c90df165f444af Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 18 Sep 2024 16:15:38 +0530 Subject: [PATCH 05/21] fix ds deltion --- .../great_expectations/data_validators/ge_data_validator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 1597cb2a3bb..55dad2c49ab 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -392,7 +392,7 @@ def data_validation( expectation_suite = context.suites.get(name=expectation_suite_name) # TODO need to create a batch definition - batch_definition, batch_parameters = create_batch_definition(context, dataset, data_asset_name) + batch_definition, batch_parameters, datasource_name = create_batch_definition(context, dataset, data_asset_name) # create a validation definition validation_defintion = ge.ValidationDefinition( @@ -421,6 +421,6 @@ def data_validation( expectation_parameters=expectation_parameters ) finally: - context.delete_datasource(batch_definition.data_asset.datasource.name) + context.delete_datasource(datasource_name) return results From f4ef8a3b46ed114a410c00becd565d6ad4ecbdfe Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 18 Sep 2024 18:51:35 +0530 Subject: [PATCH 06/21] all the fixes except for the materializer --- .../data_validators/expectations.py | 23 +++++--- .../data_validators/ge_data_validator.py | 31 +++++----- .../great_expectations/steps/__init__.py | 3 - .../great_expectations/steps/ge_profiler.py | 59 ------------------- .../great_expectations/steps/ge_validator.py | 21 +++++-- 5 files changed, 47 insertions(+), 90 deletions(-) delete mode 100644 src/zenml/integrations/great_expectations/steps/ge_profiler.py diff --git a/src/zenml/integrations/great_expectations/data_validators/expectations.py b/src/zenml/integrations/great_expectations/data_validators/expectations.py index 10d71a6ce10..ba82a5a0a89 100644 --- a/src/zenml/integrations/great_expectations/data_validators/expectations.py +++ b/src/zenml/integrations/great_expectations/data_validators/expectations.py @@ -19,8 +19,10 @@ from pydantic import BaseModel +from zenml.config.source import Source, SourceType from zenml.logger import get_logger from zenml.utils import source_utils +from importlib import import_module logger = get_logger(__name__) @@ -30,7 +32,7 @@ class GreatExpectationExpectationConfig(BaseModel): This class defines the configuration parameters that can be used to customize the behavior of a Great Expectation Expectation. This includes - the expectation name (in the right case), and any input options that the + the expectation name (in snake case), and any input options that the expectation expects. This could be the column name, the value to compare against, etc. @@ -43,11 +45,13 @@ class GreatExpectationExpectationConfig(BaseModel): parameters. Attributes: - expectation_name: The name of the Great Expectation expectation to apply. - kwargs: Additional keyword arguments to pass to the expectation. + expectation_name: The name of the Great Expectation expectation to apply in snake case. + expectation_args: Input arguments to pass to the expectation. These can be + used to pass the column name, the value to compare against, etc. + """ expectation_name: str - kwargs: Dict[str, Any] = {}\ + expectation_args: Dict[str, Any] = {} @staticmethod def get_expectation_class(expectation_name: str) -> Type[Expectation]: @@ -62,11 +66,14 @@ def get_expectation_class(expectation_name: str) -> Type[Expectation]: if the expectation name does not map to a valid Great Expectation expectation class. """ + module_name = f"great_expectations.expectations.core.{expectation_name}" + class_name = "".join(word.capitalize() for word in expectation_name.split("_")) + return source_utils.load_and_validate_class( - f"great_expectations.expectations.{expectation_name}", + source=Source(module=module_name, attribute=class_name, type=SourceType.USER), expected_class=Expectation, ) - + def get_expectation(self) -> Expectation: """Get the Great Expectation expectation object associated with this config. @@ -75,7 +82,7 @@ def get_expectation(self) -> Expectation: """ try: expectation_class = self.get_expectation_class(self.expectation_name) - expectation = expectation_class(**self.kwargs) + expectation = expectation_class(**self.expectation_args) except TypeError: raise ValueError( f"Could not map the `{self.expectation_name}` expectation " @@ -85,7 +92,7 @@ def get_expectation(self) -> Expectation: raise ValueError( f"An error occurred while trying to instantiate the " f"`{self.expectation_name}` expectation class " - f"with the following parameters: {self.kwargs}" + f"with the following parameters: {self.expectation_args}" f"Exception: {str(e)}" ) diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 55dad2c49ab..6ef7d2ef1fc 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -39,7 +39,6 @@ from great_expectations.data_context.types.resource_identifiers import ( ExpectationSuiteIdentifier, ) -from great_expectations.profile.user_configurable_profiler import # type: ignore[import-untyped] from zenml import get_step_context from zenml.client import Client @@ -52,7 +51,7 @@ from zenml.integrations.great_expectations.ge_store_backend import ( ZenMLArtifactStoreBackend, ) -from zenml.integrations.great_expectations.utils import create_batch_definition, create_batch_request +from zenml.integrations.great_expectations.utils import create_batch_definition from zenml.io import fileio from zenml.logger import get_logger from zenml.utils import io_utils @@ -198,7 +197,7 @@ def data_context(self) -> AbstractDataContext: """ if not self._context: expectations_store_name = "zenml_expectations_store" - validations_store_name = "zenml_validations_store" + validation_results_store_name = "zenml_validation_results_store" checkpoint_store_name = "zenml_checkpoint_store" # Define default configuration options that plug the GX stores @@ -208,15 +207,15 @@ def data_context(self) -> AbstractDataContext: expectations_store_name: self.get_store_config( "ExpectationsStore", "expectations" ), - validations_store_name: self.get_store_config( - "ValidationsStore", "validations" + validation_results_store_name: self.get_store_config( + "ValidationResultsStore", "validation_results" ), checkpoint_store_name: self.get_store_config( "CheckpointStore", "checkpoints" ), }, expectations_store_name=expectations_store_name, - validations_store_name=validations_store_name, + validation_results_store_name=validation_results_store_name, checkpoint_store_name=checkpoint_store_name, data_docs_sites={ "zenml_artifact_store": self.get_data_docs_config( @@ -235,7 +234,7 @@ def data_context(self) -> AbstractDataContext: else: # create an ephemeral in-memory data context that is not - # backed by a local YAML file (see https://docs.greatexpectations.io/docs/oss/guides/setup/configuring_data_contexts/instantiating_data_contexts/instantiate_data_context/). + # backed by a local YAML file (see https://docs.greatexpectations.io/docs/core/set_up_a_gx_environment/create_a_data_context?context_type=ephemeral). if self.context_config: # Use the data context configuration provided in the stack # component configuration @@ -249,16 +248,14 @@ def data_context(self) -> AbstractDataContext: # already baked in the initial configuration configure_zenml_stores = False - self._context = EphemeralDataContext( - project_config=context_config - ) + self._context = get_context(mode="ephemeral", project_config=context_config) if configure_zenml_stores: self._context.config.expectations_store_name = ( expectations_store_name ) self._context.config.validation_results_store_name = ( - validations_store_name + validation_results_store_name ) self._context.config.checkpoint_store_name = ( checkpoint_store_name @@ -319,6 +316,7 @@ def data_validation( expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, action_list: Optional[List[ge.checkpoint.actions.ValidationAction]] = None, + result_format: str = "SUMMARY", **kwargs: Any, ) -> CheckpointResult: """Great Expectations data validation. @@ -347,13 +345,15 @@ def data_validation( dataset in the Great Expectations docs. action_list: A list of additional Great Expectations actions to run after the validation check. - kwargs: Additional keyword arguments (unused). + result_format: The format of the validation results. + kwargs: Additional keyword arguments. Returns: The Great Expectations validation (checkpoint) result. Raises: - ValueError: if the `expectation_suite_name` argument is omitted. + ValueError: if the expectation suite name and expectations list are both provided + or if neither are provided """ if comparison_dataset is not None: logger.warning( @@ -399,7 +399,6 @@ def data_validation( data=batch_definition, suite=expectation_suite, name=f"{run_name}_{step_name}" ) - validation_defintion = context.validation_definitions.add(validation_defintion) # create a checkpoint @@ -407,9 +406,9 @@ def data_validation( checkpoint = ge.Checkpoint( name=checkpoint_name, validation_definitions=[validation_defintion], - actions=action_list, + actions=action_list or [], # get it from the kwargs - result_format={"result_format": kwargs.get("result_format", "SUMMARY")}, + result_format={"result_format": result_format}, ) checkpoint = context.checkpoints.add(checkpoint) diff --git a/src/zenml/integrations/great_expectations/steps/__init__.py b/src/zenml/integrations/great_expectations/steps/__init__.py index eca1af5290c..b8229e45462 100644 --- a/src/zenml/integrations/great_expectations/steps/__init__.py +++ b/src/zenml/integrations/great_expectations/steps/__init__.py @@ -14,9 +14,6 @@ """Great Expectations data profiling and validation standard steps.""" -from zenml.integrations.great_expectations.steps.ge_profiler import ( - great_expectations_profiler_step, -) from zenml.integrations.great_expectations.steps.ge_validator import ( great_expectations_validator_step, ) diff --git a/src/zenml/integrations/great_expectations/steps/ge_profiler.py b/src/zenml/integrations/great_expectations/steps/ge_profiler.py deleted file mode 100644 index cc1edd2d187..00000000000 --- a/src/zenml/integrations/great_expectations/steps/ge_profiler.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Great Expectations data profiling standard step.""" - -from typing import Any, Dict, Optional - -import pandas as pd -from great_expectations.core import ( # type: ignore[import-untyped] - ExpectationSuite, -) - -from zenml import step -from zenml.integrations.great_expectations.data_validators.ge_data_validator import ( - GreatExpectationsDataValidator, -) - - -@step -def great_expectations_profiler_step( - dataset: pd.DataFrame, - expectation_suite_name: str, - data_asset_name: Optional[str] = None, - profiler_kwargs: Optional[Dict[str, Any]] = None, - overwrite_existing_suite: bool = True, -) -> ExpectationSuite: - """Infer data validation rules from a pandas dataset. - - Args: - dataset: The dataset from which the expectation suite will be inferred. - expectation_suite_name: The name of the expectation suite to infer. - data_asset_name: The name of the data asset to profile. - profiler_kwargs: A dictionary of keyword arguments to pass to the - profiler. - overwrite_existing_suite: Whether to overwrite an existing expectation - suite. - - Returns: - The generated Great Expectations suite. - """ - data_validator = GreatExpectationsDataValidator.get_active_data_validator() - - return data_validator.data_profiling( - dataset, - expectation_suite_name=expectation_suite_name, - data_asset_name=data_asset_name, - profiler_kwargs=profiler_kwargs or {}, - overwrite_existing_suite=overwrite_existing_suite, - ) diff --git a/src/zenml/integrations/great_expectations/steps/ge_validator.py b/src/zenml/integrations/great_expectations/steps/ge_validator.py index e23c9cd0e0a..c95ba04a9b5 100644 --- a/src/zenml/integrations/great_expectations/steps/ge_validator.py +++ b/src/zenml/integrations/great_expectations/steps/ge_validator.py @@ -16,22 +16,26 @@ from typing import Any, Dict, List, Optional import pandas as pd -from great_expectations.checkpoint.types.checkpoint_result import ( # type: ignore[import-untyped] +from great_expectations.checkpoint.checkpoint import ( # type: ignore[import-untyped] CheckpointResult, ) from zenml import step +from zenml.integrations.great_expectations.data_validators.expectations import GreatExpectationExpectationConfig from zenml.integrations.great_expectations.data_validators.ge_data_validator import ( GreatExpectationsDataValidator, ) - +import great_expectations as ge @step def great_expectations_validator_step( dataset: pd.DataFrame, - expectation_suite_name: str, + expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, - action_list: Optional[List[Dict[str, Any]]] = None, + action_list: Optional[List[ge.checkpoint.actions.ValidationAction]] = None, + expectation_parameters: Optional[Dict[str, Any]] = None, + expectations_list: Optional[List[GreatExpectationExpectationConfig]] = None, + result_format: str = "SUMMARY", exit_on_error: bool = False, ) -> CheckpointResult: """Shortcut function to create a new instance of the GreatExpectationsValidatorStep step. @@ -51,6 +55,12 @@ def great_expectations_validator_step( after the validation check. exit_on_error: Set this flag to raise an error and exit the pipeline early if the validation fails. + expectation_parameters: Additional parameters to pass to the + expectation suite. + expectations_list: A list of expectations to run. + result_format: The format of the validation results. + **kwargs: Additional arguments to pass to the Great Expectations + validator. Returns: The Great Expectations validation (checkpoint) result. @@ -66,6 +76,9 @@ def great_expectations_validator_step( expectation_suite_name=expectation_suite_name, data_asset_name=data_asset_name, action_list=action_list, + expectation_parameters=expectation_parameters, + expectations_list=expectations_list, + result_format=result_format, ) if exit_on_error and not results.success: From 596fdc3e49589637bd6f576aef014d5f677ef9cf Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 18 Sep 2024 20:16:23 +0530 Subject: [PATCH 07/21] fix materializer --- .../materializers/ge_materializer.py | 84 ++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index 58efae93e67..9a552a5ec40 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -15,6 +15,7 @@ import os from typing import TYPE_CHECKING, Any, ClassVar, Dict, Tuple, Type, Union, cast +import json from great_expectations.checkpoint.checkpoint import Checkpoint, CheckpointResult from great_expectations.core import ( # type: ignore[import-untyped] @@ -38,6 +39,8 @@ ) from zenml.materializers.base_materializer import BaseMaterializer from zenml.utils import source_utils, yaml_utils +from datetime import datetime +import uuid if TYPE_CHECKING: from zenml.metadata.metadata_types import MetadataType @@ -105,6 +108,8 @@ def load(self, data_type: Type[Any]) -> SerializableDictDot: filepath = os.path.join(self.uri, ARTIFACT_FILENAME) artifact_dict = yaml_utils.read_json(filepath) data_type = source_utils.load(artifact_dict.pop("data_type")) + # load active data context + context = GreatExpectationsDataValidator.get_data_context() if data_type is CheckpointResult: self.preprocess_checkpoint_result_dict(artifact_dict) @@ -118,13 +123,90 @@ def save(self, obj: SerializableDictDot) -> None: obj: A Great Expectations object. """ filepath = os.path.join(self.uri, ARTIFACT_FILENAME) - artifact_dict = obj.to_json_dict() + artifact_dict = self.serialize_ge_object(obj) artifact_type = type(obj) artifact_dict["data_type"] = ( f"{artifact_type.__module__}.{artifact_type.__name__}" ) yaml_utils.write_json(filepath, artifact_dict) + def serialize_ge_object(self, obj: Any) -> Any: + """Serialize a Great Expectations object to a JSON-serializable structure. + + Args: + obj: A Great Expectations object. + + Returns: + A JSON-serializable representation of the object. + """ + if isinstance(obj, dict): + return {self.serialize_key(k): self.serialize_ge_object(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self.serialize_ge_object(v) for v in obj] + elif isinstance(obj, (ExpectationSuiteIdentifier, ValidationResultIdentifier)): + return self.serialize_identifier(obj) + elif isinstance(obj, Checkpoint): + return self.serialize_checkpoint(obj) + elif isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, uuid.UUID): + return str(obj) + elif hasattr(obj, "to_json_dict"): + return self.serialize_ge_object(obj.to_json_dict()) + elif hasattr(obj, "__dict__"): + return self.serialize_ge_object(obj.__dict__) + else: + return obj + + def serialize_key(self, key: Any) -> str: + """Serialize a dictionary key to a string. + + Args: + key: The key to serialize. + + Returns: + A string representation of the key. + """ + if isinstance(key, (str, int, float, bool)) or key is None: + return str(key) + elif isinstance(key, (ExpectationSuiteIdentifier, ValidationResultIdentifier)): + return self.serialize_identifier(key) + else: + return str(key) + + def serialize_identifier(self, identifier: Union[ExpectationSuiteIdentifier, ValidationResultIdentifier]) -> str: + """Serialize ExpectationSuiteIdentifier or ValidationResultIdentifier to a string. + + Args: + identifier: The identifier to serialize. + + Returns: + A string representation of the identifier. + """ + if isinstance(identifier, ExpectationSuiteIdentifier): + return f"ExpectationSuiteIdentifier:{identifier.expectation_suite_name}" + elif isinstance(identifier, ValidationResultIdentifier): + return f"ValidationResultIdentifier:{identifier.expectation_suite_identifier}:{identifier.run_id}:{identifier.batch_identifier}" + else: + raise ValueError(f"Unsupported identifier type: {type(identifier)}") + + def serialize_checkpoint(self, checkpoint: Checkpoint) -> Dict[str, Any]: + """Serialize a Checkpoint object. + + Args: + checkpoint: The Checkpoint object to serialize. + + Returns: + A dictionary representation of the Checkpoint. + """ + return { + "name": checkpoint.name, + "validation_definitions": [self.serialize_ge_object(vd) for vd in checkpoint.validation_definitions], + "actions": checkpoint.actions, + "result_format": checkpoint.result_format, + "id": str(checkpoint.id) if checkpoint.id else None, + } + def save_visualizations( self, data: Union[ExpectationSuite, CheckpointResult] ) -> Dict[str, VisualizationType]: From cf0514c2d4695bd708ea5469120fc4b4a2f9e9a9 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Wed, 18 Sep 2024 20:36:38 +0530 Subject: [PATCH 08/21] update version --- src/zenml/integrations/great_expectations/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zenml/integrations/great_expectations/__init__.py b/src/zenml/integrations/great_expectations/__init__.py index fb1fe8e4a76..06a6398e36a 100644 --- a/src/zenml/integrations/great_expectations/__init__.py +++ b/src/zenml/integrations/great_expectations/__init__.py @@ -30,7 +30,7 @@ class GreatExpectationsIntegration(Integration): """Definition of Great Expectations integration for ZenML.""" NAME = GREAT_EXPECTATIONS - REQUIREMENTS = ["great-expectations>=0.17.15,<1.0"] + REQUIREMENTS = ["great-expectations~=1.0.0"] REQUIREMENTS_IGNORED_ON_UNINSTALL = ["pandas"] From c6efb566b767b43185d641f5582ffec4857ec294 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Fri, 20 Sep 2024 16:19:00 +0530 Subject: [PATCH 09/21] update docs --- .../data-validators/great-expectations.md | 120 +++++------------- 1 file changed, 29 insertions(+), 91 deletions(-) diff --git a/docs/book/component-guide/data-validators/great-expectations.md b/docs/book/component-guide/data-validators/great-expectations.md index 9759e1f47bb..b1a137d1db6 100644 --- a/docs/book/component-guide/data-validators/great-expectations.md +++ b/docs/book/component-guide/data-validators/great-expectations.md @@ -6,17 +6,16 @@ description: >- # Great Expectations -The Great Expectations [Data Validator](./data-validators.md) flavor provided with the ZenML integration uses [Great Expectations](https://greatexpectations.io/) to run data profiling and data quality tests on the data circulated through your pipelines. The test results can be used to implement automated corrective actions in your pipelines. They are also automatically rendered into documentation for further visual interpretation and evaluation. +The Great Expectations [Data Validator](./data-validators.md) flavor provided with the ZenML integration uses [Great Expectations](https://greatexpectations.io/) to run data validation tests on the data circulated through your pipelines. The test results can be used to implement automated corrective actions in your pipelines. They are also automatically rendered into documentation for further visual interpretation and evaluation. ### When would you want to use it? -[Great Expectations](https://greatexpectations.io/) is an open-source library that helps keep the quality of your data in check through data testing, documentation, and profiling, and to improve communication and observability. Great Expectations works with tabular data in a variety of formats and data sources, of which ZenML currently supports only `pandas.DataFrame` as part of its pipelines. +[Great Expectations](https://greatexpectations.io/) is an open-source library that helps keep the quality of your data in check through data testing, and documentation and to improve communication and observability. Great Expectations works with tabular data in a variety of formats and data sources, of which ZenML currently supports only `pandas.DataFrame` as part of its pipelines. You should use the Great Expectations Data Validator when you need the following data validation features that are possible with Great Expectations: -* [Data Profiling](https://docs.greatexpectations.io/docs/oss/guides/expectations/creating_custom_expectations/how_to_add_support_for_the_auto_initializing_framework_to_a_custom_expectation/#build-a-custom-profiler-for-your-expectation): generates a set of validation rules (Expectations) automatically by inferring them from the properties of an input dataset. -* [Data Quality](https://docs.greatexpectations.io/docs/oss/guides/validation/checkpoints/how_to_pass_an_in_memory_dataframe_to_a_checkpoint/): runs a set of predefined or inferred validation rules (Expectations) against an in-memory dataset. -* [Data Docs](https://docs.greatexpectations.io/docs/reference/learn/terms/data_docs_store/): generate and maintain human-readable documentation of all your data validation rules, data quality checks and their results. +* [Data Validation](https://docs.greatexpectations.io/docs/core/trigger_actions_based_on_results/run_a_checkpoint): runs a set of predefined or inferred validation rules (Expectations) against an in-memory dataset. +* [Data Docs](https://docs.greatexpectations.io/docs/core/configure_project_settings/configure_data_docs/): generate and maintain human-readable documentation of all your data validation rules, data quality checks and their results. You should consider one of the other [Data Validator flavors](./data-validators.md#data-validator-flavors) if you need a different set of data validation features. @@ -105,80 +104,16 @@ For more, up-to-date information on the Great Expectations Data Validator config The core Great Expectations concepts that you should be aware of when using it within ZenML pipelines are Expectations / Expectation Suites, Validations and Data Docs. -ZenML wraps the Great Expectations' functionality in the form of two standard steps: - -* a Great Expectations data profiler that can be used to automatically generate Expectation Suites from an input `pandas.DataFrame` dataset -* a Great Expectations data validator that uses an existing Expectation Suite to validate an input `pandas.DataFrame` dataset +ZenML wraps the Great Expectations' functionality in the form of a standard data validator step that uses an existing Expectation Suite or a list of Expectations to validate an input `pandas.DataFrame` dataset You can visualize Great Expectations Suites and Results in Jupyter notebooks or view them directly in the ZenML dashboard. -#### The Great Expectation's data profiler step - -The standard Great Expectation's data profiler step builds an Expectation Suite automatically by running a [`UserConfigurableProfiler`](https://docs.greatexpectations.io/docs/guides/expectations/how\_to\_create\_and\_edit\_expectations\_with\_a\_profiler) on an input `pandas.DataFrame` dataset. The generated Expectation Suite is saved in the Great Expectations Expectation Store, but also returned as an `ExpectationSuite` artifact that is versioned and saved in the ZenML Artifact Store. The step automatically rebuilds the Data Docs. - -At a minimum, the step configuration expects a name to be used for the Expectation Suite: - -```python -from zenml.integrations.great_expectations.steps import ( - great_expectations_profiler_step, -) - -ge_profiler_step = great_expectations_profiler_step.with_options( - parameters={ - "expectation_suite_name": "steel_plates_suite", - "data_asset_name": "steel_plates_train_df", - } -) -``` - -The step can then be inserted into your pipeline where it can take in a pandas dataframe, e.g.: - -```python -from zenml import pipeline - -docker_settings = DockerSettings(required_integrations=[SKLEARN, GREAT_EXPECTATIONS]) - -@pipeline(settings={"docker": docker_settings}) -def profiling_pipeline(): - """Data profiling pipeline for Great Expectations. - - The pipeline imports a reference dataset from a source then uses the builtin - Great Expectations profiler step to generate an expectation suite (i.e. - validation rules) inferred from the schema and statistical properties of the - reference dataset. - - Args: - importer: reference data importer step - profiler: data profiler step - """ - dataset, _ = importer() - ge_profiler_step(dataset) - - -profiling_pipeline() -``` - -As can be seen from the [step definition](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_profiler.great\_expectations\_profiler\_step) , the step takes in a `pandas.DataFrame` dataset, and it returns a Great Expectations `ExpectationSuite` object: - -```python -@step -def great_expectations_profiler_step( - dataset: pd.DataFrame, - expectation_suite_name: str, - data_asset_name: Optional[str] = None, - profiler_kwargs: Optional[Dict[str, Any]] = None, - overwrite_existing_suite: bool = True, -) -> ExpectationSuite: - ... -``` - -You can view [the complete list of configuration parameters](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_profiler.great\_expectations\_profiler\_step) in the SDK docs. #### The Great Expectations data validator step -The standard Great Expectations data validator step validates an input `pandas.DataFrame` dataset by running an existing Expectation Suite on it. The validation results are saved in the Great Expectations Validation Store, but also returned as an `CheckpointResult` artifact that is versioned and saved in the ZenML Artifact Store. The step automatically rebuilds the Data Docs. +The standard Great Expectations data validator step validates an input `pandas.DataFrame` dataset by running an existing Expectation Suite or a list of Expectations on it. The validation results are saved in the Great Expectations Validation Store, but also returned as an `CheckpointResult` artifact that is versioned and saved in the ZenML Artifact Store. The step automatically rebuilds the Data Docs. -At a minimum, the step configuration expects the name of the Expectation Suite to be used for the validation: +At a minimum, the step configuration expects the name of the Expectation Suite or a list of Expectations to be used for the validation. In the example below we use a list of Expectations, with each expectation defined as a `GreatExpectationExpectationConfig` object, with the name of the expectation in snake case and the arguments of the expectation as a dictionary: ```python from zenml.integrations.great_expectations.steps import ( @@ -187,13 +122,18 @@ from zenml.integrations.great_expectations.steps import ( ge_validator_step = great_expectations_validator_step.with_options( parameters={ - "expectation_suite_name": "steel_plates_suite", - "data_asset_name": "steel_plates_train_df", - } + "expectations_list": [ + GreatExpectationExpectationConfig( + expectation_name="expect_column_values_to_not_be_null", + expectation_args={"column": "X_Minimum"}, + ) + ], + "data_asset_name": "my_data_asset", + }, ) ``` -The step can then be inserted into your pipeline where it can take in a pandas dataframe and a bool flag used solely for order reinforcement purposes, e.g.: +The step can then be inserted into your pipeline where it can take in a pandas dataframe e.g.: ```python docker_settings = DockerSettings(required_integrations=[SKLEARN, GREAT_EXPECTATIONS]) @@ -211,23 +151,26 @@ def validation_pipeline(): validator: dataset validation step checker: checks the validation results """ - dataset, condition = importer() - results = ge_validator_step(dataset, condition) + dataset = importer() + results = ge_validator_step(dataset) message = checker(results) validation_pipeline() ``` -As can be seen from the [step definition](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_validator.great\_expectations\_validator\_step) , the step takes in a `pandas.DataFrame` dataset and a boolean `condition` and it returns a Great Expectations `CheckpointResult` object. The boolean `condition` is only used as a means of ordering steps in a pipeline (e.g. if you must force it to run only after the data profiling step generates an Expectation Suite): +As can be seen from the [step definition](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_validator.great\_expectations\_validator\_step) , the step takes in a `pandas.DataFrame` dataset and it returns a Great Expectations `CheckpointResult` object: ```python @step def great_expectations_validator_step( dataset: pd.DataFrame, - expectation_suite_name: str, + expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, - action_list: Optional[List[Dict[str, Any]]] = None, + action_list: Optional[List[ge.checkpoint.actions.ValidationAction]] = None, + expectation_parameters: Optional[Dict[str, Any]] = None, + expectations_list: Optional[List[GreatExpectationExpectationConfig]] = None, + result_format: str = "SUMMARY", exit_on_error: bool = False, ) -> CheckpointResult: ``` @@ -262,18 +205,13 @@ def create_custom_expectation_suite( # context = ge.get_context() expectation_suite_name = "custom_suite" - suite = context.create_expectation_suite( - expectation_suite_name=expectation_suite_name - ) - expectation_configuration = ExpectationConfiguration(...) - suite.add_expectation(expectation_configuration=expectation_configuration) - ... - context.save_expectation_suite( - expectation_suite=suite, - expectation_suite_name=expectation_suite_name, + expectation_suite = ExpectationSuite( + name=expectation_suite_name, + expectations=[], ) + context.suites.add(expectation_suite) context.build_data_docs() - return suite + return expectation_suite ``` The same approach must be used if you are using a Great Expectations configuration managed by ZenML and are using the Jupyter notebooks generated by the Great Expectations CLI. From 9029f1478865cf2de066bffb64ee03fe85a6031f Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 26 Sep 2024 09:37:34 +0530 Subject: [PATCH 10/21] Update docs/book/component-guide/data-validators/great-expectations.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Barış Can Durak <36421093+bcdurak@users.noreply.github.com> --- docs/book/component-guide/data-validators/great-expectations.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/book/component-guide/data-validators/great-expectations.md b/docs/book/component-guide/data-validators/great-expectations.md index b1a137d1db6..b5e1a6202f9 100644 --- a/docs/book/component-guide/data-validators/great-expectations.md +++ b/docs/book/component-guide/data-validators/great-expectations.md @@ -113,7 +113,7 @@ You can visualize Great Expectations Suites and Results in Jupyter notebooks or The standard Great Expectations data validator step validates an input `pandas.DataFrame` dataset by running an existing Expectation Suite or a list of Expectations on it. The validation results are saved in the Great Expectations Validation Store, but also returned as an `CheckpointResult` artifact that is versioned and saved in the ZenML Artifact Store. The step automatically rebuilds the Data Docs. -At a minimum, the step configuration expects the name of the Expectation Suite or a list of Expectations to be used for the validation. In the example below we use a list of Expectations, with each expectation defined as a `GreatExpectationExpectationConfig` object, with the name of the expectation in snake case and the arguments of the expectation as a dictionary: +At a minimum, the step configuration expects the name of the Expectation Suite or a list of Expectations to be used for the validation. In the example below, we use a list of Expectations. Each expectation is defined as a `GreatExpectationExpectationConfig` object with the name of the expectation written in snake case and the arguments of the expectation defined as a dictionary: ```python from zenml.integrations.great_expectations.steps import ( From 83dc04064ab9c7d5ec7886f6edab0bfe67d567bb Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 26 Sep 2024 09:37:54 +0530 Subject: [PATCH 11/21] Update src/zenml/integrations/great_expectations/steps/ge_validator.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Barış Can Durak <36421093+bcdurak@users.noreply.github.com> --- src/zenml/integrations/great_expectations/steps/ge_validator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/zenml/integrations/great_expectations/steps/ge_validator.py b/src/zenml/integrations/great_expectations/steps/ge_validator.py index c95ba04a9b5..c50a5e76901 100644 --- a/src/zenml/integrations/great_expectations/steps/ge_validator.py +++ b/src/zenml/integrations/great_expectations/steps/ge_validator.py @@ -59,7 +59,6 @@ def great_expectations_validator_step( expectation suite. expectations_list: A list of expectations to run. result_format: The format of the validation results. - **kwargs: Additional arguments to pass to the Great Expectations validator. Returns: From c73226783dd35b609d0e4a3651c5e3c6795828ab Mon Sep 17 00:00:00 2001 From: GitHub Actions Date: Thu, 26 Sep 2024 04:10:47 +0000 Subject: [PATCH 12/21] Auto-update of Starter template --- examples/mlops_starter/.copier-answers.yml | 2 +- examples/mlops_starter/README.md | 8 ++++---- examples/mlops_starter/configs/feature_engineering.yaml | 1 + examples/mlops_starter/configs/inference.yaml | 1 + examples/mlops_starter/configs/training_rf.yaml | 1 + examples/mlops_starter/configs/training_sgd.yaml | 1 + examples/mlops_starter/quickstart.ipynb | 2 +- examples/mlops_starter/requirements.txt | 1 + 8 files changed, 11 insertions(+), 6 deletions(-) diff --git a/examples/mlops_starter/.copier-answers.yml b/examples/mlops_starter/.copier-answers.yml index 48aecdfa31b..49b01a7c71e 100644 --- a/examples/mlops_starter/.copier-answers.yml +++ b/examples/mlops_starter/.copier-answers.yml @@ -1,5 +1,5 @@ # Changes here will be overwritten by Copier -_commit: 2024.08.28 +_commit: 2024.09.23 _src_path: gh:zenml-io/template-starter email: info@zenml.io full_name: ZenML GmbH diff --git a/examples/mlops_starter/README.md b/examples/mlops_starter/README.md index 10dc01e622d..b25258f78bd 100644 --- a/examples/mlops_starter/README.md +++ b/examples/mlops_starter/README.md @@ -24,7 +24,7 @@ Along the way we will also show you how to: You can use Google Colab to see ZenML in action, no signup / installation required! -Open In Colab +Open In Colab ## :computer: Run Locally @@ -36,7 +36,7 @@ pip install "zenml[server]" # clone the ZenML repository git clone https://github.com/zenml-io/zenml.git -cd zenml/examples/quickstart +cd zenml/examples/mlops_starter ``` Now we're ready to start. You have two options for running the quickstart locally: @@ -45,13 +45,13 @@ Now we're ready to start. You have two options for running the quickstart locall ```bash pip install notebook jupyter notebook -# open notebooks/quickstart.ipynb +# open quickstart.ipynb ``` #### Option 2 - Execute the whole ML pipeline from a Python script: ```bash # Install required zenml integrations -zenml integration install sklearn -y +zenml integration install sklearn pandas -y # Initialize ZenML zenml init diff --git a/examples/mlops_starter/configs/feature_engineering.yaml b/examples/mlops_starter/configs/feature_engineering.yaml index d5ab212951e..055bb6facac 100644 --- a/examples/mlops_starter/configs/feature_engineering.yaml +++ b/examples/mlops_starter/configs/feature_engineering.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/configs/inference.yaml b/examples/mlops_starter/configs/inference.yaml index 1dcefe44cf6..8f73d76256a 100644 --- a/examples/mlops_starter/configs/inference.yaml +++ b/examples/mlops_starter/configs/inference.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/configs/training_rf.yaml b/examples/mlops_starter/configs/training_rf.yaml index 8d75610985f..70fa6413646 100644 --- a/examples/mlops_starter/configs/training_rf.yaml +++ b/examples/mlops_starter/configs/training_rf.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/configs/training_sgd.yaml b/examples/mlops_starter/configs/training_sgd.yaml index 857cdf7f82a..386b53b8c30 100644 --- a/examples/mlops_starter/configs/training_sgd.yaml +++ b/examples/mlops_starter/configs/training_sgd.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/quickstart.ipynb b/examples/mlops_starter/quickstart.ipynb index 3604e7dd6a7..df8c010b5ea 100644 --- a/examples/mlops_starter/quickstart.ipynb +++ b/examples/mlops_starter/quickstart.ipynb @@ -31,7 +31,7 @@ "required!\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](\n", - "https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/quickstart/quickstart.ipynb)" + "https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/mlops_starter/quickstart.ipynb)" ] }, { diff --git a/examples/mlops_starter/requirements.txt b/examples/mlops_starter/requirements.txt index d38ead5e262..1e0a8ac5dac 100644 --- a/examples/mlops_starter/requirements.txt +++ b/examples/mlops_starter/requirements.txt @@ -2,3 +2,4 @@ zenml[server]>=0.50.0 notebook scikit-learn pyarrow +pandas From e944fdfda87b131beaa61627660bc132e7180ec1 Mon Sep 17 00:00:00 2001 From: GitHub Actions Date: Thu, 26 Sep 2024 04:20:04 +0000 Subject: [PATCH 13/21] Auto-update of NLP template --- examples/e2e_nlp/.copier-answers.yml | 2 +- examples/e2e_nlp/config.yaml | 1 - examples/e2e_nlp/requirements.txt | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/e2e_nlp/.copier-answers.yml b/examples/e2e_nlp/.copier-answers.yml index ca8152a6309..3ca2ba198fe 100644 --- a/examples/e2e_nlp/.copier-answers.yml +++ b/examples/e2e_nlp/.copier-answers.yml @@ -1,5 +1,5 @@ # Changes here will be overwritten by Copier -_commit: 2024.08.29 +_commit: 2024.09.23 _src_path: gh:zenml-io/template-nlp accelerator: cpu cloud_of_choice: aws diff --git a/examples/e2e_nlp/config.yaml b/examples/e2e_nlp/config.yaml index 764ff93c8fc..e5c0b0cdfe6 100644 --- a/examples/e2e_nlp/config.yaml +++ b/examples/e2e_nlp/config.yaml @@ -28,7 +28,6 @@ settings: - mlflow - discord requirements: - - accelerate - zenml[server] extra: diff --git a/examples/e2e_nlp/requirements.txt b/examples/e2e_nlp/requirements.txt index f7f98175b16..e79245c3df9 100644 --- a/examples/e2e_nlp/requirements.txt +++ b/examples/e2e_nlp/requirements.txt @@ -1,4 +1,4 @@ torchvision -accelerate gradio zenml[server]>=0.56.3 +datasets>=2.12.0,<3.0.0 From 38f2ca7dd62c8a5c20423a3ac14db436694c8b2c Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 26 Sep 2024 10:04:10 +0530 Subject: [PATCH 14/21] apply review changes --- .../data_validators/expectations.py | 7 ++++-- .../data_validators/ge_data_validator.py | 22 ++++++++++--------- .../materializers/ge_materializer.py | 3 +++ 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/zenml/integrations/great_expectations/data_validators/expectations.py b/src/zenml/integrations/great_expectations/data_validators/expectations.py index ba82a5a0a89..1c082a1bc2f 100644 --- a/src/zenml/integrations/great_expectations/data_validators/expectations.py +++ b/src/zenml/integrations/great_expectations/data_validators/expectations.py @@ -1,4 +1,4 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. -"""Definition of the Deepchecks validation check types.""" +"""Definition of the Great Expectations expectation configuration.""" from typing import Any, Dict, Type @@ -57,6 +57,9 @@ class GreatExpectationExpectationConfig(BaseModel): def get_expectation_class(expectation_name: str) -> Type[Expectation]: """Get the Great Expectation expectation class associated with this config. + Args: + expectation_name: The name of the Great Expectation expectation in snake case. + Returns: The Great Expectation expectation class associated with this config. diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 6ef7d2ef1fc..9db80997209 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -335,25 +335,29 @@ def data_validation( be performed. Not supported by the Great Expectations data validator. expectations_list: A list of Great Expectations expectations to - use to validate the dataset. A value must be provided. + use to validate the dataset. Either this or expectation_suite_name + must be provided, but not both. expectation_parameters: Optional parameters to pass to the expectations if you have defined any parameters in the expectations. expectation_suite_name: The name of the expectation suite to use to - validate the dataset. A value must be provided. + validate the dataset. Either this or expectations_list must be + provided, but not both. data_asset_name: The name of the data asset to use to identify the dataset in the Great Expectations docs. action_list: A list of additional Great Expectations actions to run after the validation check. - result_format: The format of the validation results. + result_format: The format in which to return the results of the validation definitions. Default is "SUMMARY". + Other options are: "BOOLEAN_ONLY", "BASIC", "COMPLETE". Details in the docs: + https://docs.greatexpectations.io/docs/core/trigger_actions_based_on_results/choose_a_result_format/#define-a-result-format-configuration kwargs: Additional keyword arguments. Returns: The Great Expectations validation (checkpoint) result. Raises: - ValueError: if the expectation suite name and expectations list are both provided - or if neither are provided + ValueError: If both expectation_suite_name and expectations_list are provided, + or if neither are provided. """ if comparison_dataset is not None: logger.warning( @@ -391,23 +395,21 @@ def data_validation( else: # when the expectation_suite_name is provided expectation_suite = context.suites.get(name=expectation_suite_name) - # TODO need to create a batch definition batch_definition, batch_parameters, datasource_name = create_batch_definition(context, dataset, data_asset_name) # create a validation definition - validation_defintion = ge.ValidationDefinition( + validation_definition = ge.ValidationDefinition( data=batch_definition, suite=expectation_suite, name=f"{run_name}_{step_name}" ) - validation_defintion = context.validation_definitions.add(validation_defintion) + validation_definition = context.validation_definitions.add(validation_definition) # create a checkpoint checkpoint_name = f"{run_name}_{step_name}" checkpoint = ge.Checkpoint( name=checkpoint_name, - validation_definitions=[validation_defintion], + validation_definitions=[validation_definition], actions=action_list or [], - # get it from the kwargs result_format={"result_format": result_format}, ) diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index 9a552a5ec40..04282077b18 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -182,6 +182,9 @@ def serialize_identifier(self, identifier: Union[ExpectationSuiteIdentifier, Val Returns: A string representation of the identifier. + + Raises: + ValueError: If the identifier type is not supported. """ if isinstance(identifier, ExpectationSuiteIdentifier): return f"ExpectationSuiteIdentifier:{identifier.expectation_suite_name}" From 19ad36ba0d4c1a082951d4617962dcec01096eeb Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 26 Sep 2024 12:24:17 +0530 Subject: [PATCH 15/21] add libs to mocked libs --- docs/mocked_libs.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/mocked_libs.json b/docs/mocked_libs.json index a7746f14bed..d9cb9e0bc6d 100644 --- a/docs/mocked_libs.json +++ b/docs/mocked_libs.json @@ -101,6 +101,10 @@ "google.oauth2", "great_expectations", "great_expectations.checkpoint", + "great_expectations.checkpoint.checkpoint", + "great_expectations.core.batch_definition", + "great_expectations.expectations", + "great_expectations.expectations.expectation", "great_expectations.checkpoint.types", "great_expectations.checkpoint.types.checkpoint_result", "great_expectations.core", From 14a17117dd4dc40098a52aa834d8f336a8b30bb8 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Thu, 3 Oct 2024 11:52:42 +0530 Subject: [PATCH 16/21] simplify materializer logic --- .../data_validators/ge_data_validator.py | 43 ++++++++-- ...reat_expectations_data_validator_flavor.py | 2 + .../materializers/ge_materializer.py | 82 +------------------ .../integrations/great_expectations/utils.py | 5 +- 4 files changed, 42 insertions(+), 90 deletions(-) diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 9db80997209..c3b0a11d054 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -395,21 +395,50 @@ def data_validation( else: # when the expectation_suite_name is provided expectation_suite = context.suites.get(name=expectation_suite_name) - batch_definition, batch_parameters, datasource_name = create_batch_definition(context, dataset, data_asset_name) - - # create a validation definition - validation_definition = ge.ValidationDefinition( + batch_definition, batch_parameters, data_source = create_batch_definition(context, dataset, data_asset_name) + + validation_definition = { + "name": f"{run_name}_{step_name}", + "data": { + "datasource": { + "name": data_source.name, + "id": data_source.id + }, + "asset": { + "name": data_source.assets[0].name, + "id": data_source.assets[0].id + }, + "batch_definition": { + "name": batch_definition.name, + "id": batch_definition.id + } + }, + "suite": { + "name": expectation_suite.name, + "id": expectation_suite.id + }, + } + + validation_definition_obj = ge.ValidationDefinition( data=batch_definition, suite=expectation_suite, name=f"{run_name}_{step_name}" ) - validation_definition = context.validation_definitions.add(validation_definition) + # create a validation definition + _ = context.validation_definitions.add(validation_definition_obj) + + # add an action to update all data docs sites + # not specifying site_names, so this will update all data docs sites + action_update_data_docs = { + "name": "update_data_docs", + "type": "update_data_docs" + } # create a checkpoint checkpoint_name = f"{run_name}_{step_name}" checkpoint = ge.Checkpoint( name=checkpoint_name, validation_definitions=[validation_definition], - actions=action_list or [], + actions=action_list or [action_update_data_docs], result_format={"result_format": result_format}, ) @@ -422,6 +451,6 @@ def data_validation( expectation_parameters=expectation_parameters ) finally: - context.delete_datasource(datasource_name) + context.delete_datasource(data_source.name) return results diff --git a/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py b/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py index fdbda159fbe..18d98170361 100644 --- a/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py +++ b/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py @@ -40,6 +40,7 @@ class GreatExpectationsDataValidatorConfig(BaseDataValidatorConfig): """Config for the Great Expectations data validator. Attributes: + project_root_dir: location of the root directory of the Great Expectations project. context_root_dir: location of an already initialized Great Expectations data context. If configured, the data validator will only be usable with local orchestrators. @@ -54,6 +55,7 @@ class GreatExpectationsDataValidatorConfig(BaseDataValidatorConfig): Expectations docs are generated and can be visualized locally. """ + project_root_dir: Optional[str] = None context_root_dir: Optional[str] = None context_config: Optional[Dict[str, Any]] = None configure_zenml_stores: bool = False diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index 04282077b18..242f00ed158 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -123,93 +123,13 @@ def save(self, obj: SerializableDictDot) -> None: obj: A Great Expectations object. """ filepath = os.path.join(self.uri, ARTIFACT_FILENAME) - artifact_dict = self.serialize_ge_object(obj) + artifact_dict = json.loads(obj.json()) artifact_type = type(obj) artifact_dict["data_type"] = ( f"{artifact_type.__module__}.{artifact_type.__name__}" ) yaml_utils.write_json(filepath, artifact_dict) - def serialize_ge_object(self, obj: Any) -> Any: - """Serialize a Great Expectations object to a JSON-serializable structure. - - Args: - obj: A Great Expectations object. - - Returns: - A JSON-serializable representation of the object. - """ - if isinstance(obj, dict): - return {self.serialize_key(k): self.serialize_ge_object(v) for k, v in obj.items()} - elif isinstance(obj, list): - return [self.serialize_ge_object(v) for v in obj] - elif isinstance(obj, (ExpectationSuiteIdentifier, ValidationResultIdentifier)): - return self.serialize_identifier(obj) - elif isinstance(obj, Checkpoint): - return self.serialize_checkpoint(obj) - elif isinstance(obj, datetime): - return obj.isoformat() - elif isinstance(obj, uuid.UUID): - return str(obj) - elif hasattr(obj, "to_json_dict"): - return self.serialize_ge_object(obj.to_json_dict()) - elif hasattr(obj, "__dict__"): - return self.serialize_ge_object(obj.__dict__) - else: - return obj - - def serialize_key(self, key: Any) -> str: - """Serialize a dictionary key to a string. - - Args: - key: The key to serialize. - - Returns: - A string representation of the key. - """ - if isinstance(key, (str, int, float, bool)) or key is None: - return str(key) - elif isinstance(key, (ExpectationSuiteIdentifier, ValidationResultIdentifier)): - return self.serialize_identifier(key) - else: - return str(key) - - def serialize_identifier(self, identifier: Union[ExpectationSuiteIdentifier, ValidationResultIdentifier]) -> str: - """Serialize ExpectationSuiteIdentifier or ValidationResultIdentifier to a string. - - Args: - identifier: The identifier to serialize. - - Returns: - A string representation of the identifier. - - Raises: - ValueError: If the identifier type is not supported. - """ - if isinstance(identifier, ExpectationSuiteIdentifier): - return f"ExpectationSuiteIdentifier:{identifier.expectation_suite_name}" - elif isinstance(identifier, ValidationResultIdentifier): - return f"ValidationResultIdentifier:{identifier.expectation_suite_identifier}:{identifier.run_id}:{identifier.batch_identifier}" - else: - raise ValueError(f"Unsupported identifier type: {type(identifier)}") - - def serialize_checkpoint(self, checkpoint: Checkpoint) -> Dict[str, Any]: - """Serialize a Checkpoint object. - - Args: - checkpoint: The Checkpoint object to serialize. - - Returns: - A dictionary representation of the Checkpoint. - """ - return { - "name": checkpoint.name, - "validation_definitions": [self.serialize_ge_object(vd) for vd in checkpoint.validation_definitions], - "actions": checkpoint.actions, - "result_format": checkpoint.result_format, - "id": str(checkpoint.id) if checkpoint.id else None, - } - def save_visualizations( self, data: Union[ExpectationSuite, CheckpointResult] ) -> Dict[str, VisualizationType]: diff --git a/src/zenml/integrations/great_expectations/utils.py b/src/zenml/integrations/great_expectations/utils.py index 30091e4e885..b90b42a94dd 100644 --- a/src/zenml/integrations/great_expectations/utils.py +++ b/src/zenml/integrations/great_expectations/utils.py @@ -20,6 +20,7 @@ from great_expectations.data_context.data_context.abstract_data_context import ( AbstractDataContext, ) +from great_expectations.datasource.fluent.interfaces import Datasource from zenml import get_step_context from zenml.logger import get_logger @@ -32,7 +33,7 @@ def create_batch_definition( context: AbstractDataContext, dataset: pd.DataFrame, data_asset_name: Optional[str], -) -> Tuple[BatchDefinition, Dict[str, Any], str]: +) -> Tuple[BatchDefinition, Dict[str, Any], Datasource]: """Create a temporary runtime GE batch request from a dataset step artifact. Args: @@ -70,4 +71,4 @@ def create_batch_definition( batch_parameters = {"dataframe": dataset} - return batch_definition, batch_parameters, datasource_name + return batch_definition, batch_parameters, data_source From 4bc0632346294b40c49e5d73e30e7b33daa19542 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Fri, 4 Oct 2024 12:05:21 +0530 Subject: [PATCH 17/21] dont delete datasource and fix materializer --- .../data_validators/ge_data_validator.py | 4 +- .../materializers/ge_materializer.py | 69 ++++++++++++++++++- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index c3b0a11d054..3cf3d1e3523 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -450,7 +450,7 @@ def data_validation( batch_parameters=batch_parameters, expectation_parameters=expectation_parameters ) - finally: - context.delete_datasource(data_source.name) + except Exception as e: + raise e return results diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index 242f00ed158..119317a97cb 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -123,13 +123,80 @@ def save(self, obj: SerializableDictDot) -> None: obj: A Great Expectations object. """ filepath = os.path.join(self.uri, ARTIFACT_FILENAME) - artifact_dict = json.loads(obj.json()) + artifact_dict = self.serialize_ge_object(obj) artifact_type = type(obj) artifact_dict["data_type"] = ( f"{artifact_type.__module__}.{artifact_type.__name__}" ) yaml_utils.write_json(filepath, artifact_dict) + def serialize_ge_object(self, obj: Any) -> Any: + """Serialize a Great Expectations object to a JSON-serializable structure. + + Args: + obj: A Great Expectations object. + + Returns: + A JSON-serializable representation of the object. + """ + if isinstance(obj, CheckpointResult): + return { + "name": obj.name, + "run_id": self.serialize_ge_object(obj.run_id), + "run_results": self.serialize_ge_object(obj.run_results), + "checkpoint_config": self.serialize_ge_object(obj.checkpoint_config), + "success": obj.success, + } + elif isinstance(obj, dict): + return {self.serialize_key(k): self.serialize_ge_object(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self.serialize_ge_object(v) for v in obj] + elif hasattr(obj, 'to_json_dict'): + return obj.to_json_dict() + elif hasattr(obj, 'to_tuple'): + return obj.to_tuple() + elif hasattr(obj, 'json'): + return obj.json() + elif isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, uuid.UUID): + return str(obj) + else: + return obj + + def serialize_key(self, key: Any) -> str: + """Serialize a dictionary key to a string. + + Args: + key: The key to serialize. + + Returns: + A string representation of the key. + """ + if isinstance(key, (str, int, float, bool)) or key is None: + return str(key) + elif isinstance(key, (ExpectationSuiteIdentifier, ValidationResultIdentifier)): + return self.serialize_identifier(key) + else: + return str(key) + + def serialize_identifier(self, identifier: Union[ExpectationSuiteIdentifier, ValidationResultIdentifier]) -> str: + """Serialize ExpectationSuiteIdentifier or ValidationResultIdentifier to a string. + + Args: + identifier: The identifier to serialize. + + Returns: + A string representation of the identifier. + + Raises: + ValueError: If the identifier type is not supported. + """ + if isinstance(identifier, ExpectationSuiteIdentifier): + return f"ExpectationSuiteIdentifier:{identifier.expectation_suite_name}" + elif isinstance(identifier, ValidationResultIdentifier): + return f"ValidationResultIdentifier:{identifier.expectation_suite_identifier}:{identifier.run_id}:{identifier.batch_identifier}" + def save_visualizations( self, data: Union[ExpectationSuite, CheckpointResult] ) -> Dict[str, VisualizationType]: From 8ee1646437977ccc35525606a46d0c8e7c7973cf Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Fri, 4 Oct 2024 12:25:25 +0530 Subject: [PATCH 18/21] fix loading logic --- .../great_expectations/materializers/ge_materializer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index 119317a97cb..f359f51c3b3 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -80,7 +80,7 @@ def preprocess_run_result(key: str, value: Any) -> Any: return value artifact_dict["checkpoint_config"] = Checkpoint( - **artifact_dict["checkpoint_config"] + **json.loads(artifact_dict["checkpoint_config"]) ) validation_dict = {} for result_ident, results in artifact_dict["run_results"].items(): From 237fa41a7074441b3dd7947d6fe9233d85a5f79e Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Sun, 6 Oct 2024 12:33:07 +0530 Subject: [PATCH 19/21] add validation def store --- .../great_expectations/data_validators/ge_data_validator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 3cf3d1e3523..5ad138189ee 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -199,6 +199,7 @@ def data_context(self) -> AbstractDataContext: expectations_store_name = "zenml_expectations_store" validation_results_store_name = "zenml_validation_results_store" checkpoint_store_name = "zenml_checkpoint_store" + validation_definition_store_name = "validation_definition_store" # Define default configuration options that plug the GX stores # in the active ZenML artifact store @@ -213,6 +214,9 @@ def data_context(self) -> AbstractDataContext: checkpoint_store_name: self.get_store_config( "CheckpointStore", "checkpoints" ), + validation_definition_store_name: self.get_store_config( + "ValidationDefinitionStore", "validation_definitions" + ) }, expectations_store_name=expectations_store_name, validation_results_store_name=validation_results_store_name, From fdaa83af748b8215f4d375793cf903a40ea445e4 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Sun, 6 Oct 2024 12:33:27 +0530 Subject: [PATCH 20/21] set project on context call --- .../great_expectations/data_validators/ge_data_validator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 5ad138189ee..c1f713e8eac 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -24,6 +24,9 @@ from great_expectations.core import ( # type: ignore[import-untyped] ExpectationSuite, ) +from great_expectations.data_context.data_context import ( + project_manager +) from great_expectations.data_context.data_context.abstract_data_context import ( AbstractDataContext, ) @@ -290,6 +293,7 @@ def data_context(self) -> AbstractDataContext: self.get_data_docs_config("data_docs", local=True) ) + project_manager.set_project(self._context) return self._context @property From 1bdafc91886a43448a84f4a99eb5e5edb206b1f6 Mon Sep 17 00:00:00 2001 From: Jayesh Sharma Date: Sun, 6 Oct 2024 12:34:11 +0530 Subject: [PATCH 21/21] fix validation ident tuple --- .../materializers/ge_materializer.py | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index f359f51c3b3..702f7b1b811 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -14,6 +14,7 @@ """Implementation of the Great Expectations materializers.""" import os +import re from typing import TYPE_CHECKING, Any, ClassVar, Dict, Tuple, Type, Union, cast import json @@ -79,14 +80,33 @@ def preprocess_run_result(key: str, value: Any) -> Any: return ExpectationSuiteValidationResult(**value) return value - artifact_dict["checkpoint_config"] = Checkpoint( - **json.loads(artifact_dict["checkpoint_config"]) - ) + # artifact_dict["checkpoint_config"] = Checkpoint( + # **json.loads(artifact_dict["checkpoint_config"]) + # ) + artifact_dict["checkpoint_config"] = Checkpoint(name="test", validation_definitions=[]) validation_dict = {} for result_ident, results in artifact_dict["run_results"].items(): + after_double_colon = result_ident.split('::', 1)[1] + + # Use a regular expression to extract the JSON part + json_match = re.search(r'\{.*?\}', after_double_colon, re.DOTALL) + if json_match: + run_id_json_str = json_match.group(0) + # Parse the JSON string to extract run_name and run_time + run_id_data = json.loads(run_id_json_str) + + # Extract run_name and run_time + run_name = run_id_data.get("run_name") + run_time = run_id_data.get("run_time") + + # Now split the remaining part by colons to get the components + components = after_double_colon.split(':') + expectation_suite_identifier = components[0] + batch_identifier = components[-1] + validation_ident_tuple = (expectation_suite_identifier, run_name, run_time, batch_identifier) validation_ident = ( ValidationResultIdentifier.from_fixed_length_tuple( # type: ignore[no-untyped-call] - result_ident.split("::")[1].split("/") + validation_ident_tuple ) ) validation_results = { @@ -109,7 +129,7 @@ def load(self, data_type: Type[Any]) -> SerializableDictDot: artifact_dict = yaml_utils.read_json(filepath) data_type = source_utils.load(artifact_dict.pop("data_type")) # load active data context - context = GreatExpectationsDataValidator.get_data_context() + _ = GreatExpectationsDataValidator.get_data_context() if data_type is CheckpointResult: self.preprocess_checkpoint_result_dict(artifact_dict)