diff --git a/docs/book/component-guide/data-validators/great-expectations.md b/docs/book/component-guide/data-validators/great-expectations.md index 9759e1f47bb..b5e1a6202f9 100644 --- a/docs/book/component-guide/data-validators/great-expectations.md +++ b/docs/book/component-guide/data-validators/great-expectations.md @@ -6,17 +6,16 @@ description: >- # Great Expectations -The Great Expectations [Data Validator](./data-validators.md) flavor provided with the ZenML integration uses [Great Expectations](https://greatexpectations.io/) to run data profiling and data quality tests on the data circulated through your pipelines. The test results can be used to implement automated corrective actions in your pipelines. They are also automatically rendered into documentation for further visual interpretation and evaluation. +The Great Expectations [Data Validator](./data-validators.md) flavor provided with the ZenML integration uses [Great Expectations](https://greatexpectations.io/) to run data validation tests on the data circulated through your pipelines. The test results can be used to implement automated corrective actions in your pipelines. They are also automatically rendered into documentation for further visual interpretation and evaluation. ### When would you want to use it? -[Great Expectations](https://greatexpectations.io/) is an open-source library that helps keep the quality of your data in check through data testing, documentation, and profiling, and to improve communication and observability. Great Expectations works with tabular data in a variety of formats and data sources, of which ZenML currently supports only `pandas.DataFrame` as part of its pipelines. +[Great Expectations](https://greatexpectations.io/) is an open-source library that helps keep the quality of your data in check through data testing and documentation, and to improve communication and observability. Great Expectations works with tabular data in a variety of formats and data sources, of which ZenML currently supports only `pandas.DataFrame` as part of its pipelines. You should use the Great Expectations Data Validator when you need the following data validation features that are possible with Great Expectations: -* [Data Profiling](https://docs.greatexpectations.io/docs/oss/guides/expectations/creating_custom_expectations/how_to_add_support_for_the_auto_initializing_framework_to_a_custom_expectation/#build-a-custom-profiler-for-your-expectation): generates a set of validation rules (Expectations) automatically by inferring them from the properties of an input dataset. -* [Data Quality](https://docs.greatexpectations.io/docs/oss/guides/validation/checkpoints/how_to_pass_an_in_memory_dataframe_to_a_checkpoint/): runs a set of predefined or inferred validation rules (Expectations) against an in-memory dataset. -* [Data Docs](https://docs.greatexpectations.io/docs/reference/learn/terms/data_docs_store/): generate and maintain human-readable documentation of all your data validation rules, data quality checks and their results. +* [Data Validation](https://docs.greatexpectations.io/docs/core/trigger_actions_based_on_results/run_a_checkpoint): runs a set of predefined or inferred validation rules (Expectations) against an in-memory dataset. +* [Data Docs](https://docs.greatexpectations.io/docs/core/configure_project_settings/configure_data_docs/): generates and maintains human-readable documentation of all your data validation rules, data quality checks and their results.
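For illustration, this is roughly what the validation feature above looks like in plain Great Expectations (outside of ZenML), sketched against the GX 1.x fluent API; the dataframe, data source and asset names below are made up for the example:

```python
import great_expectations as gx
import pandas as pd

# A toy dataframe standing in for the data circulated through your pipeline.
df = pd.DataFrame({"X_Minimum": [42.0, 17.5, None]})

# Ephemeral, in-memory project context (no local YAML configuration required).
context = gx.get_context(mode="ephemeral")

# Register the dataframe as a pandas asset and grab a whole-dataframe batch.
data_source = context.data_sources.add_pandas(name="example_datasource")
data_asset = data_source.add_dataframe_asset(name="example_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe("default")
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

# Run a single validation rule (Expectation) against the in-memory dataset.
result = batch.validate(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="X_Minimum")
)
print(result.success)
```

The ZenML integration described on the rest of this page wraps this flow in a standard pipeline step and manages the data context, stores and Data Docs for you.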
You should consider one of the other [Data Validator flavors](./data-validators.md#data-validator-flavors) if you need a different set of data validation features. @@ -105,80 +104,16 @@ For more, up-to-date information on the Great Expectations Data Validator config The core Great Expectations concepts that you should be aware of when using it within ZenML pipelines are Expectations / Expectation Suites, Validations and Data Docs. -ZenML wraps the Great Expectations' functionality in the form of two standard steps: - -* a Great Expectations data profiler that can be used to automatically generate Expectation Suites from an input `pandas.DataFrame` dataset -* a Great Expectations data validator that uses an existing Expectation Suite to validate an input `pandas.DataFrame` dataset +ZenML wraps the Great Expectations' functionality in the form of a standard data validator step that uses an existing Expectation Suite or a list of Expectations to validate an input `pandas.DataFrame` dataset You can visualize Great Expectations Suites and Results in Jupyter notebooks or view them directly in the ZenML dashboard. -#### The Great Expectation's data profiler step - -The standard Great Expectation's data profiler step builds an Expectation Suite automatically by running a [`UserConfigurableProfiler`](https://docs.greatexpectations.io/docs/guides/expectations/how\_to\_create\_and\_edit\_expectations\_with\_a\_profiler) on an input `pandas.DataFrame` dataset. The generated Expectation Suite is saved in the Great Expectations Expectation Store, but also returned as an `ExpectationSuite` artifact that is versioned and saved in the ZenML Artifact Store. The step automatically rebuilds the Data Docs. - -At a minimum, the step configuration expects a name to be used for the Expectation Suite: - -```python -from zenml.integrations.great_expectations.steps import ( - great_expectations_profiler_step, -) - -ge_profiler_step = great_expectations_profiler_step.with_options( - parameters={ - "expectation_suite_name": "steel_plates_suite", - "data_asset_name": "steel_plates_train_df", - } -) -``` - -The step can then be inserted into your pipeline where it can take in a pandas dataframe, e.g.: - -```python -from zenml import pipeline - -docker_settings = DockerSettings(required_integrations=[SKLEARN, GREAT_EXPECTATIONS]) - -@pipeline(settings={"docker": docker_settings}) -def profiling_pipeline(): - """Data profiling pipeline for Great Expectations. - - The pipeline imports a reference dataset from a source then uses the builtin - Great Expectations profiler step to generate an expectation suite (i.e. - validation rules) inferred from the schema and statistical properties of the - reference dataset. - - Args: - importer: reference data importer step - profiler: data profiler step - """ - dataset, _ = importer() - ge_profiler_step(dataset) - - -profiling_pipeline() -``` - -As can be seen from the [step definition](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_profiler.great\_expectations\_profiler\_step) , the step takes in a `pandas.DataFrame` dataset, and it returns a Great Expectations `ExpectationSuite` object: - -```python -@step -def great_expectations_profiler_step( - dataset: pd.DataFrame, - expectation_suite_name: str, - data_asset_name: Optional[str] = None, - profiler_kwargs: Optional[Dict[str, Any]] = None, - overwrite_existing_suite: bool = True, -) -> ExpectationSuite: - ... 
-``` - -You can view [the complete list of configuration parameters](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_profiler.great\_expectations\_profiler\_step) in the SDK docs. #### The Great Expectations data validator step -The standard Great Expectations data validator step validates an input `pandas.DataFrame` dataset by running an existing Expectation Suite on it. The validation results are saved in the Great Expectations Validation Store, but also returned as an `CheckpointResult` artifact that is versioned and saved in the ZenML Artifact Store. The step automatically rebuilds the Data Docs. +The standard Great Expectations data validator step validates an input `pandas.DataFrame` dataset by running an existing Expectation Suite or a list of Expectations on it. The validation results are saved in the Great Expectations Validation Store, but also returned as a `CheckpointResult` artifact that is versioned and saved in the ZenML Artifact Store. The step automatically rebuilds the Data Docs. -At a minimum, the step configuration expects the name of the Expectation Suite to be used for the validation: +At a minimum, the step configuration expects either the name of an Expectation Suite or a list of Expectations to be used for the validation. In the example below, we use a list of Expectations. Each expectation is defined as a `GreatExpectationExpectationConfig` object with the name of the expectation written in snake case and the arguments of the expectation defined as a dictionary: ```python from zenml.integrations.great_expectations.steps import ( @@ -187,13 +122,18 @@ from zenml.integrations.great_expectations.steps import ( ge_validator_step = great_expectations_validator_step.with_options( parameters={ - "expectation_suite_name": "steel_plates_suite", - "data_asset_name": "steel_plates_train_df", - } + "expectations_list": [ + GreatExpectationExpectationConfig( + expectation_name="expect_column_values_to_not_be_null", + expectation_args={"column": "X_Minimum"}, + ) + ], + "data_asset_name": "my_data_asset", + }, ) ``` -The step can then be inserted into your pipeline where it can take in a pandas dataframe and a bool flag used solely for order reinforcement purposes, e.g.: +The step can then be inserted into your pipeline where it can take in a pandas dataframe, e.g.: ```python docker_settings = DockerSettings(required_integrations=[SKLEARN, GREAT_EXPECTATIONS]) @@ -211,23 +151,26 @@ def validation_pipeline(): validator: dataset validation step checker: checks the validation results """ - dataset, condition = importer() - results = ge_validator_step(dataset, condition) + dataset = importer() + results = ge_validator_step(dataset) message = checker(results) validation_pipeline() ``` -As can be seen from the [step definition](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_validator.great\_expectations\_validator\_step) , the step takes in a `pandas.DataFrame` dataset and a boolean `condition` and it returns a Great Expectations `CheckpointResult` object. The boolean `condition` is only used as a means of ordering steps in a pipeline (e.g.
if you must force it to run only after the data profiling step generates an Expectation Suite): +As can be seen from the [step definition](https://apidocs.zenml.io/latest/integration\_code\_docs/integrations-great\_expectations/#zenml.integrations.great\_expectations.steps.ge\_validator.great\_expectations\_validator\_step), the step takes in a `pandas.DataFrame` dataset, and it returns a Great Expectations `CheckpointResult` object: ```python @step def great_expectations_validator_step( dataset: pd.DataFrame, - expectation_suite_name: str, + expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, - action_list: Optional[List[Dict[str, Any]]] = None, + action_list: Optional[List[ge.checkpoint.actions.ValidationAction]] = None, + expectation_parameters: Optional[Dict[str, Any]] = None, + expectations_list: Optional[List[GreatExpectationExpectationConfig]] = None, + result_format: str = "SUMMARY", exit_on_error: bool = False, ) -> CheckpointResult: ``` @@ -262,18 +205,13 @@ def create_custom_expectation_suite( # context = ge.get_context() expectation_suite_name = "custom_suite" - suite = context.create_expectation_suite( - expectation_suite_name=expectation_suite_name - ) - expectation_configuration = ExpectationConfiguration(...) - suite.add_expectation(expectation_configuration=expectation_configuration) - ... - context.save_expectation_suite( - expectation_suite=suite, - expectation_suite_name=expectation_suite_name, + expectation_suite = ExpectationSuite( + name=expectation_suite_name, + expectations=[], ) + context.suites.add(expectation_suite) context.build_data_docs() - return suite + return expectation_suite ``` The same approach must be used if you are using a Great Expectations configuration managed by ZenML and are using the Jupyter notebooks generated by the Great Expectations CLI.
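To make the snippet above more concrete, the following is a minimal sketch of a custom step that registers a suite with actual Expectations, assuming the GX 1.x `gx.expectations` classes and purely illustrative column names:

```python
import great_expectations as gx
from great_expectations.core import ExpectationSuite

from zenml import step
from zenml.integrations.great_expectations.data_validators.ge_data_validator import (
    GreatExpectationsDataValidator,
)


@step
def create_custom_expectation_suite() -> ExpectationSuite:
    """Create and register a custom expectation suite with concrete rules."""
    # Use the data context managed by the active Great Expectations
    # Data Validator so the suite lands in the ZenML-backed stores.
    context = GreatExpectationsDataValidator.get_data_context()

    expectation_suite = ExpectationSuite(
        name="custom_suite",
        expectations=[
            # Illustrative expectations on a hypothetical column.
            gx.expectations.ExpectColumnValuesToNotBeNull(column="X_Minimum"),
            gx.expectations.ExpectColumnValuesToBeBetween(
                column="X_Minimum", min_value=0, max_value=10_000
            ),
        ],
    )
    context.suites.add(expectation_suite)
    context.build_data_docs()
    return expectation_suite
```

A suite registered this way can then be consumed by the standard validator step by passing its name via the `expectation_suite_name` parameter instead of an `expectations_list`.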
diff --git a/docs/mocked_libs.json b/docs/mocked_libs.json index a7746f14bed..d9cb9e0bc6d 100644 --- a/docs/mocked_libs.json +++ b/docs/mocked_libs.json @@ -101,6 +101,10 @@ "google.oauth2", "great_expectations", "great_expectations.checkpoint", + "great_expectations.checkpoint.checkpoint", + "great_expectations.core.batch_definition", + "great_expectations.expectations", + "great_expectations.expectations.expectation", "great_expectations.checkpoint.types", "great_expectations.checkpoint.types.checkpoint_result", "great_expectations.core", diff --git a/examples/e2e_nlp/.copier-answers.yml b/examples/e2e_nlp/.copier-answers.yml index ca8152a6309..3ca2ba198fe 100644 --- a/examples/e2e_nlp/.copier-answers.yml +++ b/examples/e2e_nlp/.copier-answers.yml @@ -1,5 +1,5 @@ # Changes here will be overwritten by Copier -_commit: 2024.08.29 +_commit: 2024.09.23 _src_path: gh:zenml-io/template-nlp accelerator: cpu cloud_of_choice: aws diff --git a/examples/e2e_nlp/config.yaml b/examples/e2e_nlp/config.yaml index 764ff93c8fc..e5c0b0cdfe6 100644 --- a/examples/e2e_nlp/config.yaml +++ b/examples/e2e_nlp/config.yaml @@ -28,7 +28,6 @@ settings: - mlflow - discord requirements: - - accelerate - zenml[server] extra: diff --git a/examples/e2e_nlp/requirements.txt b/examples/e2e_nlp/requirements.txt index f7f98175b16..e79245c3df9 100644 --- a/examples/e2e_nlp/requirements.txt +++ b/examples/e2e_nlp/requirements.txt @@ -1,4 +1,4 @@ torchvision -accelerate gradio zenml[server]>=0.56.3 +datasets>=2.12.0,<3.0.0 diff --git a/examples/mlops_starter/.copier-answers.yml b/examples/mlops_starter/.copier-answers.yml index 48aecdfa31b..49b01a7c71e 100644 --- a/examples/mlops_starter/.copier-answers.yml +++ b/examples/mlops_starter/.copier-answers.yml @@ -1,5 +1,5 @@ # Changes here will be overwritten by Copier -_commit: 2024.08.28 +_commit: 2024.09.23 _src_path: gh:zenml-io/template-starter email: info@zenml.io full_name: ZenML GmbH diff --git a/examples/mlops_starter/README.md b/examples/mlops_starter/README.md index 10dc01e622d..b25258f78bd 100644 --- a/examples/mlops_starter/README.md +++ b/examples/mlops_starter/README.md @@ -24,7 +24,7 @@ Along the way we will also show you how to: You can use Google Colab to see ZenML in action, no signup / installation required! -Open In Colab +Open In Colab ## :computer: Run Locally @@ -36,7 +36,7 @@ pip install "zenml[server]" # clone the ZenML repository git clone https://github.com/zenml-io/zenml.git -cd zenml/examples/quickstart +cd zenml/examples/mlops_starter ``` Now we're ready to start. You have two options for running the quickstart locally: @@ -45,13 +45,13 @@ Now we're ready to start. 
You have two options for running the quickstart locall ```bash pip install notebook jupyter notebook -# open notebooks/quickstart.ipynb +# open quickstart.ipynb ``` #### Option 2 - Execute the whole ML pipeline from a Python script: ```bash # Install required zenml integrations -zenml integration install sklearn -y +zenml integration install sklearn pandas -y # Initialize ZenML zenml init diff --git a/examples/mlops_starter/configs/feature_engineering.yaml b/examples/mlops_starter/configs/feature_engineering.yaml index d5ab212951e..055bb6facac 100644 --- a/examples/mlops_starter/configs/feature_engineering.yaml +++ b/examples/mlops_starter/configs/feature_engineering.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/configs/inference.yaml b/examples/mlops_starter/configs/inference.yaml index 1dcefe44cf6..8f73d76256a 100644 --- a/examples/mlops_starter/configs/inference.yaml +++ b/examples/mlops_starter/configs/inference.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/configs/training_rf.yaml b/examples/mlops_starter/configs/training_rf.yaml index 8d75610985f..70fa6413646 100644 --- a/examples/mlops_starter/configs/training_rf.yaml +++ b/examples/mlops_starter/configs/training_rf.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/configs/training_sgd.yaml b/examples/mlops_starter/configs/training_sgd.yaml index 857cdf7f82a..386b53b8c30 100644 --- a/examples/mlops_starter/configs/training_sgd.yaml +++ b/examples/mlops_starter/configs/training_sgd.yaml @@ -3,6 +3,7 @@ settings: docker: required_integrations: - sklearn + - pandas requirements: - pyarrow diff --git a/examples/mlops_starter/quickstart.ipynb b/examples/mlops_starter/quickstart.ipynb index 3604e7dd6a7..df8c010b5ea 100644 --- a/examples/mlops_starter/quickstart.ipynb +++ b/examples/mlops_starter/quickstart.ipynb @@ -31,7 +31,7 @@ "required!\n", "\n", "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](\n", - "https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/quickstart/quickstart.ipynb)" + "https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/mlops_starter/quickstart.ipynb)" ] }, { diff --git a/examples/mlops_starter/requirements.txt b/examples/mlops_starter/requirements.txt index d38ead5e262..1e0a8ac5dac 100644 --- a/examples/mlops_starter/requirements.txt +++ b/examples/mlops_starter/requirements.txt @@ -2,3 +2,4 @@ zenml[server]>=0.50.0 notebook scikit-learn pyarrow +pandas diff --git a/src/zenml/integrations/great_expectations/__init__.py b/src/zenml/integrations/great_expectations/__init__.py index fb1fe8e4a76..06a6398e36a 100644 --- a/src/zenml/integrations/great_expectations/__init__.py +++ b/src/zenml/integrations/great_expectations/__init__.py @@ -30,7 +30,7 @@ class GreatExpectationsIntegration(Integration): """Definition of Great Expectations integration for ZenML.""" NAME = GREAT_EXPECTATIONS - REQUIREMENTS = ["great-expectations>=0.17.15,<1.0"] + REQUIREMENTS = ["great-expectations~=1.0.0"] REQUIREMENTS_IGNORED_ON_UNINSTALL = ["pandas"] diff --git a/src/zenml/integrations/great_expectations/data_validators/expectations.py b/src/zenml/integrations/great_expectations/data_validators/expectations.py new file mode 100644 index 00000000000..1c082a1bc2f --- /dev/null +++ 
b/src/zenml/integrations/great_expectations/data_validators/expectations.py @@ -0,0 +1,102 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Definition of the Great Expectations expectation configuration.""" + +from typing import Any, Dict, Type + +from great_expectations.expectations.expectation import Expectation + +from pydantic import BaseModel + +from zenml.config.source import Source, SourceType +from zenml.logger import get_logger +from zenml.utils import source_utils +from importlib import import_module + +logger = get_logger(__name__) + + +class GreatExpectationExpectationConfig(BaseModel): + """Great Expectation expectation configuration. + + This class defines the configuration parameters that can be used to + customize the behavior of a Great Expectation Expectation. This includes + the expectation name (in snake case), and any input options that the + expectation expects. This could be the column name, the value to + compare against, etc. + + You can also choose to pass in parameters as the value to an input option. + You can then supply the parameters as a dictionary to the expectation_parameters + field. But this is not particularly useful when you are defining the expectations + directly with ZenML. The value of defining parameters in expectations and + passing the values at runtime is more pronounced when you already have a suite + of expectations defined and you use this suite with ZenML, with different + parameters. + + Attributes: + expectation_name: The name of the Great Expectation expectation to apply in snake case. + expectation_args: Input arguments to pass to the expectation. These can be + used to pass the column name, the value to compare against, etc. + + """ + expectation_name: str + expectation_args: Dict[str, Any] = {} + + @staticmethod + def get_expectation_class(expectation_name: str) -> Type[Expectation]: + """Get the Great Expectation expectation class associated with this config. + + Args: + expectation_name: The name of the Great Expectation expectation in snake case. + + Returns: + The Great Expectation expectation class associated with this config. + + Raises: + TypeError: If the expectation name could not be converted to a valid + Great Expectation expectation class. This can happen for example + if the expectation name does not map to a valid Great Expectation + expectation class. + """ + module_name = f"great_expectations.expectations.core.{expectation_name}" + class_name = "".join(word.capitalize() for word in expectation_name.split("_")) + + return source_utils.load_and_validate_class( + source=Source(module=module_name, attribute=class_name, type=SourceType.USER), + expected_class=Expectation, + ) + + def get_expectation(self) -> Expectation: + """Get the Great Expectation expectation object associated with this config. + + Returns: + The Great Expectation expectation object associated with this config. 
+ """ + try: + expectation_class = self.get_expectation_class(self.expectation_name) + expectation = expectation_class(**self.expectation_args) + except TypeError: + raise ValueError( + f"Could not map the `{self.expectation_name}` expectation " + f"identifier to a valid Great Expectation expectation class." + ) + except Exception as e: + raise ValueError( + f"An error occurred while trying to instantiate the " + f"`{self.expectation_name}` expectation class " + f"with the following parameters: {self.expectation_args}" + f"Exception: {str(e)}" + ) + + return expectation diff --git a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py index 0eb8f7a590c..c1f713e8eac 100644 --- a/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py +++ b/src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py @@ -17,12 +17,16 @@ from typing import Any, ClassVar, Dict, List, Optional, Sequence, Type, cast import pandas as pd -from great_expectations.checkpoint.types.checkpoint_result import ( # type: ignore[import-untyped] +import great_expectations as ge +from great_expectations.checkpoint.checkpoint import ( # type: ignore[import-untyped] CheckpointResult, ) from great_expectations.core import ( # type: ignore[import-untyped] ExpectationSuite, ) +from great_expectations.data_context.data_context import ( + project_manager +) from great_expectations.data_context.data_context.abstract_data_context import ( AbstractDataContext, ) @@ -38,13 +42,11 @@ from great_expectations.data_context.types.resource_identifiers import ( ExpectationSuiteIdentifier, ) -from great_expectations.profile.user_configurable_profiler import ( # type: ignore[import-untyped] - UserConfigurableProfiler, -) from zenml import get_step_context from zenml.client import Client from zenml.data_validators import BaseDataValidator, BaseDataValidatorFlavor +from zenml.integrations.great_expectations.data_validators.expectations import GreatExpectationExpectationConfig from zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor import ( GreatExpectationsDataValidatorConfig, GreatExpectationsDataValidatorFlavor, @@ -52,7 +54,7 @@ from zenml.integrations.great_expectations.ge_store_backend import ( ZenMLArtifactStoreBackend, ) -from zenml.integrations.great_expectations.utils import create_batch_request +from zenml.integrations.great_expectations.utils import create_batch_definition from zenml.io import fileio from zenml.logger import get_logger from zenml.utils import io_utils @@ -198,10 +200,9 @@ def data_context(self) -> AbstractDataContext: """ if not self._context: expectations_store_name = "zenml_expectations_store" - validations_store_name = "zenml_validations_store" + validation_results_store_name = "zenml_validation_results_store" checkpoint_store_name = "zenml_checkpoint_store" - profiler_store_name = "zenml_profiler_store" - evaluation_parameter_store_name = "evaluation_parameter_store" + validation_definition_store_name = "validation_definition_store" # Define default configuration options that plug the GX stores # in the active ZenML artifact store @@ -210,24 +211,19 @@ def data_context(self) -> AbstractDataContext: expectations_store_name: self.get_store_config( "ExpectationsStore", "expectations" ), - validations_store_name: self.get_store_config( - "ValidationsStore", "validations" + validation_results_store_name: self.get_store_config( + 
"ValidationResultsStore", "validation_results" ), checkpoint_store_name: self.get_store_config( "CheckpointStore", "checkpoints" ), - profiler_store_name: self.get_store_config( - "ProfilerStore", "profilers" - ), - evaluation_parameter_store_name: { - "class_name": "EvaluationParameterStore" - }, + validation_definition_store_name: self.get_store_config( + "ValidationDefinitionStore", "validation_definitions" + ) }, expectations_store_name=expectations_store_name, - validations_store_name=validations_store_name, + validation_results_store_name=validation_results_store_name, checkpoint_store_name=checkpoint_store_name, - profiler_store_name=profiler_store_name, - evaluation_parameter_store_name=evaluation_parameter_store_name, data_docs_sites={ "zenml_artifact_store": self.get_data_docs_config( "data_docs" @@ -245,7 +241,7 @@ def data_context(self) -> AbstractDataContext: else: # create an ephemeral in-memory data context that is not - # backed by a local YAML file (see https://docs.greatexpectations.io/docs/oss/guides/setup/configuring_data_contexts/instantiating_data_contexts/instantiate_data_context/). + # backed by a local YAML file (see https://docs.greatexpectations.io/docs/core/set_up_a_gx_environment/create_a_data_context?context_type=ephemeral). if self.context_config: # Use the data context configuration provided in the stack # component configuration @@ -259,24 +255,18 @@ def data_context(self) -> AbstractDataContext: # already baked in the initial configuration configure_zenml_stores = False - self._context = EphemeralDataContext( - project_config=context_config - ) + self._context = get_context(mode="ephemeral", project_config=context_config) if configure_zenml_stores: self._context.config.expectations_store_name = ( expectations_store_name ) - self._context.config.validations_store_name = ( - validations_store_name + self._context.config.validation_results_store_name = ( + validation_results_store_name ) self._context.config.checkpoint_store_name = ( checkpoint_store_name ) - self._context.config.profiler_store_name = profiler_store_name - self._context.config.evaluation_parameter_store_name = ( - evaluation_parameter_store_name - ) for store_name, store_config in zenml_context_config[ "stores" ].items(): @@ -303,6 +293,7 @@ def data_context(self) -> AbstractDataContext: self.get_data_docs_config("data_docs", local=True) ) + project_manager.set_project(self._context) return self._context @property @@ -323,126 +314,17 @@ def root_directory(self) -> str: return path - def data_profiling( - self, - dataset: pd.DataFrame, - comparison_dataset: Optional[Any] = None, - profile_list: Optional[Sequence[str]] = None, - expectation_suite_name: Optional[str] = None, - data_asset_name: Optional[str] = None, - profiler_kwargs: Optional[Dict[str, Any]] = None, - overwrite_existing_suite: bool = True, - **kwargs: Any, - ) -> ExpectationSuite: - """Infer a Great Expectation Expectation Suite from a given dataset. - - This Great Expectations specific data profiling method implementation - builds an Expectation Suite automatically by running a - UserConfigurableProfiler on an input dataset [as covered in the official - GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler). - - Args: - dataset: The dataset from which the expectation suite will be - inferred. - comparison_dataset: Optional dataset used to generate data - comparison (i.e. data drift) profiles. Not supported by the - Great Expectation data validator. 
- profile_list: Optional list identifying the categories of data - profiles to be generated. Not supported by the Great Expectation - data validator. - expectation_suite_name: The name of the expectation suite to create - or update. If not supplied, a unique name will be generated from - the current pipeline and step name, if running in the context of - a pipeline step. - data_asset_name: The name of the data asset to use to identify the - dataset in the Great Expectations docs. - profiler_kwargs: A dictionary of custom keyword arguments to pass to - the profiler. - overwrite_existing_suite: Whether to overwrite an existing - expectation suite, if one exists with that name. - kwargs: Additional keyword arguments (unused). - - Returns: - The inferred Expectation Suite. - - Raises: - ValueError: if an `expectation_suite_name` value is not supplied and - a name for the expectation suite cannot be generated from the - current step name and pipeline name. - """ - context = self.data_context - - if comparison_dataset is not None: - logger.warning( - "A comparison dataset is not required by Great Expectations " - "to do data profiling. Silently ignoring the supplied dataset " - ) - - if not expectation_suite_name: - try: - step_context = get_step_context() - pipeline_name = step_context.pipeline.name - step_name = step_context.step_run.name - expectation_suite_name = f"{pipeline_name}_{step_name}" - except RuntimeError: - raise ValueError( - "A expectation suite name is required when not running in " - "the context of a pipeline step." - ) - - suite_exists = False - if context.expectations_store.has_key( # noqa - ExpectationSuiteIdentifier(expectation_suite_name) - ): - suite_exists = True - suite = context.get_expectation_suite(expectation_suite_name) - if not overwrite_existing_suite: - logger.info( - f"Expectation Suite `{expectation_suite_name}` " - f"already exists and `overwrite_existing_suite` is not set " - f"in the step configuration. Skipping re-running the " - f"profiler." - ) - return suite - - batch_request = create_batch_request(context, dataset, data_asset_name) - - try: - if suite_exists: - validator = context.get_validator( - batch_request=batch_request, - expectation_suite_name=expectation_suite_name, - ) - else: - validator = context.get_validator( - batch_request=batch_request, - create_expectation_suite_with_name=expectation_suite_name, - ) - - profiler = UserConfigurableProfiler( - profile_dataset=validator, **profiler_kwargs - ) - - suite = profiler.build_suite() - context.save_expectation_suite( - expectation_suite=suite, - expectation_suite_name=expectation_suite_name, - ) - - context.build_data_docs() - finally: - context.delete_datasource(batch_request.datasource_name) - - return suite - def data_validation( self, dataset: pd.DataFrame, comparison_dataset: Optional[Any] = None, check_list: Optional[Sequence[str]] = None, + expectations_list: Optional[Sequence[GreatExpectationExpectationConfig]] = None, + expectation_parameters: Optional[Dict[str, Any]] = None, expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, - action_list: Optional[List[Dict[str, Any]]] = None, + action_list: Optional[List[ge.checkpoint.actions.ValidationAction]] = None, + result_format: str = "SUMMARY", **kwargs: Any, ) -> CheckpointResult: """Great Expectations data validation. @@ -460,29 +342,43 @@ def data_validation( check_list: Optional list identifying the data validation checks to be performed. Not supported by the Great Expectations data validator. 
+ expectations_list: A list of Great Expectations expectations to + use to validate the dataset. Either this or expectation_suite_name + must be provided, but not both. + expectation_parameters: Optional parameters to pass to the + expectations if you have defined any parameters in the + expectations. expectation_suite_name: The name of the expectation suite to use to - validate the dataset. A value must be provided. + validate the dataset. Either this or expectations_list must be + provided, but not both. data_asset_name: The name of the data asset to use to identify the dataset in the Great Expectations docs. action_list: A list of additional Great Expectations actions to run after the validation check. - kwargs: Additional keyword arguments (unused). + result_format: The format in which to return the results of the validation definitions. Default is "SUMMARY". + Other options are: "BOOLEAN_ONLY", "BASIC", "COMPLETE". Details in the docs: + https://docs.greatexpectations.io/docs/core/trigger_actions_based_on_results/choose_a_result_format/#define-a-result-format-configuration + kwargs: Additional keyword arguments. Returns: The Great Expectations validation (checkpoint) result. Raises: - ValueError: if the `expectation_suite_name` argument is omitted. + ValueError: If both expectation_suite_name and expectations_list are provided, + or if neither are provided. """ - if not expectation_suite_name: - raise ValueError("Missing expectation_suite_name argument value.") - if comparison_dataset is not None: logger.warning( "A comparison dataset is not required by Great Expectations " "to do data validation. Silently ignoring the supplied dataset " ) + if expectation_suite_name and expectations_list: + raise ValueError("Only one of `expectation_suite_name` and `expectations_list` can be provided.") + + if not expectation_suite_name and not expectations_list: + raise ValueError("Either `expectation_suite_name` or `expectations_list` must be provided.") + try: step_context = get_step_context() run_name = step_context.pipeline_run.name @@ -494,42 +390,75 @@ def data_validation( context = self.data_context - checkpoint_name = f"{run_name}_{step_name}" - - batch_request = create_batch_request(context, dataset, data_asset_name) - - action_list = action_list or [ - { - "name": "store_validation_result", - "action": {"class_name": "StoreValidationResultAction"}, - }, - { - "name": "store_evaluation_params", - "action": {"class_name": "StoreEvaluationParametersAction"}, + # get all expectations from the list + if expectations_list: + # construct an expectation suite name from the pipeline and step names + suite_name = f"{run_name}_{step_name}" + expectation_suite = ExpectationSuite( + name=suite_name, + expectations=[exp.get_expectation() for exp in expectations_list] + ) + context.suites.add(expectation_suite) + + else: # when the expectation_suite_name is provided + expectation_suite = context.suites.get(name=expectation_suite_name) + + batch_definition, batch_parameters, data_source = create_batch_definition(context, dataset, data_asset_name) + + validation_definition = { + "name": f"{run_name}_{step_name}", + "data": { + "datasource": { + "name": data_source.name, + "id": data_source.id + }, + "asset": { + "name": data_source.assets[0].name, + "id": data_source.assets[0].id + }, + "batch_definition": { + "name": batch_definition.name, + "id": batch_definition.id + } }, - { - "name": "update_data_docs", - "action": {"class_name": "UpdateDataDocsAction"}, + "suite": { + "name": expectation_suite.name, 
+ "id": expectation_suite.id }, - ] - - checkpoint_config: Dict[str, Any] = { - "name": checkpoint_name, - "run_name_template": run_name, - "config_version": 1, - "class_name": "Checkpoint", - "expectation_suite_name": expectation_suite_name, - "action_list": action_list, } - context.add_checkpoint(**checkpoint_config) # type: ignore[has-type] + validation_definition_obj = ge.ValidationDefinition( + data=batch_definition, suite=expectation_suite, + name=f"{run_name}_{step_name}" + ) + # create a validation definition + _ = context.validation_definitions.add(validation_definition_obj) + + # add an action to update all data docs sites + # not specifying site_names, so this will update all data docs sites + action_update_data_docs = { + "name": "update_data_docs", + "type": "update_data_docs" + } + + # create a checkpoint + checkpoint_name = f"{run_name}_{step_name}" + checkpoint = ge.Checkpoint( + name=checkpoint_name, + validation_definitions=[validation_definition], + actions=action_list or [action_update_data_docs], + result_format={"result_format": result_format}, + ) + + checkpoint = context.checkpoints.add(checkpoint) + + # run a checkpoint try: - results = context.run_checkpoint( - checkpoint_name=checkpoint_name, - validations=[{"batch_request": batch_request}], + results = checkpoint.run( + batch_parameters=batch_parameters, + expectation_parameters=expectation_parameters ) - finally: - context.delete_datasource(batch_request.datasource_name) - context.delete_checkpoint(checkpoint_name) + except Exception as e: + raise e return results diff --git a/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py b/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py index fdbda159fbe..18d98170361 100644 --- a/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py +++ b/src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py @@ -40,6 +40,7 @@ class GreatExpectationsDataValidatorConfig(BaseDataValidatorConfig): """Config for the Great Expectations data validator. Attributes: + project_root_dir: location of the root directory of the Great Expectations project. context_root_dir: location of an already initialized Great Expectations data context. If configured, the data validator will only be usable with local orchestrators. @@ -54,6 +55,7 @@ class GreatExpectationsDataValidatorConfig(BaseDataValidatorConfig): Expectations docs are generated and can be visualized locally. 
""" + project_root_dir: Optional[str] = None context_root_dir: Optional[str] = None context_config: Optional[Dict[str, Any]] = None configure_zenml_stores: bool = False diff --git a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py index 88fd66364f4..702f7b1b811 100644 --- a/src/zenml/integrations/great_expectations/materializers/ge_materializer.py +++ b/src/zenml/integrations/great_expectations/materializers/ge_materializer.py @@ -14,20 +14,18 @@ """Implementation of the Great Expectations materializers.""" import os +import re from typing import TYPE_CHECKING, Any, ClassVar, Dict, Tuple, Type, Union, cast +import json -from great_expectations.checkpoint.types.checkpoint_result import ( # type: ignore[import-untyped] - CheckpointResult, -) +from great_expectations.checkpoint.checkpoint import Checkpoint, CheckpointResult from great_expectations.core import ( # type: ignore[import-untyped] ExpectationSuite, ) from great_expectations.core.expectation_validation_result import ( # type: ignore[import-untyped] ExpectationSuiteValidationResult, ) -from great_expectations.data_context.types.base import ( - CheckpointConfig, -) + from great_expectations.data_context.types.resource_identifiers import ( ExpectationSuiteIdentifier, ValidationResultIdentifier, @@ -42,6 +40,8 @@ ) from zenml.materializers.base_materializer import BaseMaterializer from zenml.utils import source_utils, yaml_utils +from datetime import datetime +import uuid if TYPE_CHECKING: from zenml.metadata.metadata_types import MetadataType @@ -80,14 +80,33 @@ def preprocess_run_result(key: str, value: Any) -> Any: return ExpectationSuiteValidationResult(**value) return value - artifact_dict["checkpoint_config"] = CheckpointConfig( - **artifact_dict["checkpoint_config"] - ) + # artifact_dict["checkpoint_config"] = Checkpoint( + # **json.loads(artifact_dict["checkpoint_config"]) + # ) + artifact_dict["checkpoint_config"] = Checkpoint(name="test", validation_definitions=[]) validation_dict = {} for result_ident, results in artifact_dict["run_results"].items(): + after_double_colon = result_ident.split('::', 1)[1] + + # Use a regular expression to extract the JSON part + json_match = re.search(r'\{.*?\}', after_double_colon, re.DOTALL) + if json_match: + run_id_json_str = json_match.group(0) + # Parse the JSON string to extract run_name and run_time + run_id_data = json.loads(run_id_json_str) + + # Extract run_name and run_time + run_name = run_id_data.get("run_name") + run_time = run_id_data.get("run_time") + + # Now split the remaining part by colons to get the components + components = after_double_colon.split(':') + expectation_suite_identifier = components[0] + batch_identifier = components[-1] + validation_ident_tuple = (expectation_suite_identifier, run_name, run_time, batch_identifier) validation_ident = ( ValidationResultIdentifier.from_fixed_length_tuple( # type: ignore[no-untyped-call] - result_ident.split("::")[1].split("/") + validation_ident_tuple ) ) validation_results = { @@ -109,6 +128,8 @@ def load(self, data_type: Type[Any]) -> SerializableDictDot: filepath = os.path.join(self.uri, ARTIFACT_FILENAME) artifact_dict = yaml_utils.read_json(filepath) data_type = source_utils.load(artifact_dict.pop("data_type")) + # load active data context + _ = GreatExpectationsDataValidator.get_data_context() if data_type is CheckpointResult: self.preprocess_checkpoint_result_dict(artifact_dict) @@ -122,13 +143,80 @@ def save(self, 
obj: SerializableDictDot) -> None: obj: A Great Expectations object. """ filepath = os.path.join(self.uri, ARTIFACT_FILENAME) - artifact_dict = obj.to_json_dict() + artifact_dict = self.serialize_ge_object(obj) artifact_type = type(obj) artifact_dict["data_type"] = ( f"{artifact_type.__module__}.{artifact_type.__name__}" ) yaml_utils.write_json(filepath, artifact_dict) + def serialize_ge_object(self, obj: Any) -> Any: + """Serialize a Great Expectations object to a JSON-serializable structure. + + Args: + obj: A Great Expectations object. + + Returns: + A JSON-serializable representation of the object. + """ + if isinstance(obj, CheckpointResult): + return { + "name": obj.name, + "run_id": self.serialize_ge_object(obj.run_id), + "run_results": self.serialize_ge_object(obj.run_results), + "checkpoint_config": self.serialize_ge_object(obj.checkpoint_config), + "success": obj.success, + } + elif isinstance(obj, dict): + return {self.serialize_key(k): self.serialize_ge_object(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self.serialize_ge_object(v) for v in obj] + elif hasattr(obj, 'to_json_dict'): + return obj.to_json_dict() + elif hasattr(obj, 'to_tuple'): + return obj.to_tuple() + elif hasattr(obj, 'json'): + return obj.json() + elif isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, uuid.UUID): + return str(obj) + else: + return obj + + def serialize_key(self, key: Any) -> str: + """Serialize a dictionary key to a string. + + Args: + key: The key to serialize. + + Returns: + A string representation of the key. + """ + if isinstance(key, (str, int, float, bool)) or key is None: + return str(key) + elif isinstance(key, (ExpectationSuiteIdentifier, ValidationResultIdentifier)): + return self.serialize_identifier(key) + else: + return str(key) + + def serialize_identifier(self, identifier: Union[ExpectationSuiteIdentifier, ValidationResultIdentifier]) -> str: + """Serialize ExpectationSuiteIdentifier or ValidationResultIdentifier to a string. + + Args: + identifier: The identifier to serialize. + + Returns: + A string representation of the identifier. + + Raises: + ValueError: If the identifier type is not supported. 
+ """ + if isinstance(identifier, ExpectationSuiteIdentifier): + return f"ExpectationSuiteIdentifier:{identifier.expectation_suite_name}" + elif isinstance(identifier, ValidationResultIdentifier): + return f"ValidationResultIdentifier:{identifier.expectation_suite_identifier}:{identifier.run_id}:{identifier.batch_identifier}" + def save_visualizations( self, data: Union[ExpectationSuite, CheckpointResult] ) -> Dict[str, VisualizationType]: diff --git a/src/zenml/integrations/great_expectations/steps/__init__.py b/src/zenml/integrations/great_expectations/steps/__init__.py index eca1af5290c..b8229e45462 100644 --- a/src/zenml/integrations/great_expectations/steps/__init__.py +++ b/src/zenml/integrations/great_expectations/steps/__init__.py @@ -14,9 +14,6 @@ """Great Expectations data profiling and validation standard steps.""" -from zenml.integrations.great_expectations.steps.ge_profiler import ( - great_expectations_profiler_step, -) from zenml.integrations.great_expectations.steps.ge_validator import ( great_expectations_validator_step, ) diff --git a/src/zenml/integrations/great_expectations/steps/ge_profiler.py b/src/zenml/integrations/great_expectations/steps/ge_profiler.py deleted file mode 100644 index cc1edd2d187..00000000000 --- a/src/zenml/integrations/great_expectations/steps/ge_profiler.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Great Expectations data profiling standard step.""" - -from typing import Any, Dict, Optional - -import pandas as pd -from great_expectations.core import ( # type: ignore[import-untyped] - ExpectationSuite, -) - -from zenml import step -from zenml.integrations.great_expectations.data_validators.ge_data_validator import ( - GreatExpectationsDataValidator, -) - - -@step -def great_expectations_profiler_step( - dataset: pd.DataFrame, - expectation_suite_name: str, - data_asset_name: Optional[str] = None, - profiler_kwargs: Optional[Dict[str, Any]] = None, - overwrite_existing_suite: bool = True, -) -> ExpectationSuite: - """Infer data validation rules from a pandas dataset. - - Args: - dataset: The dataset from which the expectation suite will be inferred. - expectation_suite_name: The name of the expectation suite to infer. - data_asset_name: The name of the data asset to profile. - profiler_kwargs: A dictionary of keyword arguments to pass to the - profiler. - overwrite_existing_suite: Whether to overwrite an existing expectation - suite. - - Returns: - The generated Great Expectations suite. 
- """ - data_validator = GreatExpectationsDataValidator.get_active_data_validator() - - return data_validator.data_profiling( - dataset, - expectation_suite_name=expectation_suite_name, - data_asset_name=data_asset_name, - profiler_kwargs=profiler_kwargs or {}, - overwrite_existing_suite=overwrite_existing_suite, - ) diff --git a/src/zenml/integrations/great_expectations/steps/ge_validator.py b/src/zenml/integrations/great_expectations/steps/ge_validator.py index e23c9cd0e0a..c50a5e76901 100644 --- a/src/zenml/integrations/great_expectations/steps/ge_validator.py +++ b/src/zenml/integrations/great_expectations/steps/ge_validator.py @@ -16,22 +16,26 @@ from typing import Any, Dict, List, Optional import pandas as pd -from great_expectations.checkpoint.types.checkpoint_result import ( # type: ignore[import-untyped] +from great_expectations.checkpoint.checkpoint import ( # type: ignore[import-untyped] CheckpointResult, ) from zenml import step +from zenml.integrations.great_expectations.data_validators.expectations import GreatExpectationExpectationConfig from zenml.integrations.great_expectations.data_validators.ge_data_validator import ( GreatExpectationsDataValidator, ) - +import great_expectations as ge @step def great_expectations_validator_step( dataset: pd.DataFrame, - expectation_suite_name: str, + expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, - action_list: Optional[List[Dict[str, Any]]] = None, + action_list: Optional[List[ge.checkpoint.actions.ValidationAction]] = None, + expectation_parameters: Optional[Dict[str, Any]] = None, + expectations_list: Optional[List[GreatExpectationExpectationConfig]] = None, + result_format: str = "SUMMARY", exit_on_error: bool = False, ) -> CheckpointResult: """Shortcut function to create a new instance of the GreatExpectationsValidatorStep step. @@ -51,6 +55,11 @@ def great_expectations_validator_step( after the validation check. exit_on_error: Set this flag to raise an error and exit the pipeline early if the validation fails. + expectation_parameters: Additional parameters to pass to the + expectation suite. + expectations_list: A list of expectations to run. + result_format: The format of the validation results. + validator. Returns: The Great Expectations validation (checkpoint) result. @@ -66,6 +75,9 @@ def great_expectations_validator_step( expectation_suite_name=expectation_suite_name, data_asset_name=data_asset_name, action_list=action_list, + expectation_parameters=expectation_parameters, + expectations_list=expectations_list, + result_format=result_format, ) if exit_on_error and not results.success: diff --git a/src/zenml/integrations/great_expectations/utils.py b/src/zenml/integrations/great_expectations/utils.py index aeeeec94ff8..b90b42a94dd 100644 --- a/src/zenml/integrations/great_expectations/utils.py +++ b/src/zenml/integrations/great_expectations/utils.py @@ -13,15 +13,14 @@ # permissions and limitations under the License. 
"""Great Expectations data profiling standard step.""" -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Tuple import pandas as pd -from great_expectations.core.batch import ( # type: ignore[import-untyped] - RuntimeBatchRequest, -) +from great_expectations.core.batch_definition import BatchDefinition from great_expectations.data_context.data_context.abstract_data_context import ( AbstractDataContext, ) +from great_expectations.datasource.fluent.interfaces import Datasource from zenml import get_step_context from zenml.logger import get_logger @@ -30,11 +29,11 @@ logger = get_logger(__name__) -def create_batch_request( +def create_batch_definition( context: AbstractDataContext, dataset: pd.DataFrame, data_asset_name: Optional[str], -) -> RuntimeBatchRequest: +) -> Tuple[BatchDefinition, Dict[str, Any], Datasource]: """Create a temporary runtime GE batch request from a dataset step artifact. Args: @@ -43,7 +42,9 @@ def create_batch_request( data_asset_name: Optional custom name for the data asset. Returns: - A Great Expectations runtime batch request. + batch_definition: A Great Expectations runtime batch request. + batch_parameters: A dictionary of batch parameters. + datasource_name: The name of the datasource. """ try: # get pipeline name, step name and run id @@ -58,33 +59,16 @@ def create_batch_request( step_name = f"step_{random_str(5)}" datasource_name = f"{run_name}_{step_name}" - data_connector_name = datasource_name data_asset_name = data_asset_name or f"{pipeline_name}_{step_name}" - batch_identifier = "default" + batch_definition_name = "default" - datasource_config: Dict[str, Any] = { - "name": datasource_name, - "class_name": "Datasource", - "module_name": "great_expectations.datasource", - "execution_engine": { - "module_name": "great_expectations.execution_engine", - "class_name": "PandasExecutionEngine", - }, - "data_connectors": { - data_connector_name: { - "class_name": "RuntimeDataConnector", - "batch_identifiers": [batch_identifier], - }, - }, - } + data_source = context.data_sources.add_pandas(name=datasource_name) + data_asset = data_source.add_dataframe_asset(name=data_asset_name) - context.add_datasource(**datasource_config) - batch_request = RuntimeBatchRequest( - datasource_name=datasource_name, - data_connector_name=data_connector_name, - data_asset_name=data_asset_name, - runtime_parameters={"batch_data": dataset}, - batch_identifiers={batch_identifier: batch_identifier}, + batch_definition = data_asset.add_batch_definition_whole_dataframe( + batch_definition_name ) - return batch_request + batch_parameters = {"dataframe": dataset} + + return batch_definition, batch_parameters, data_source