From 8c2fff8b9ddc767cf05fbb3518211278fdada298 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Tue, 22 Oct 2024 15:14:00 -0700 Subject: [PATCH] address Jacky's comments --- .../databricks/python_models/python_config.py | 2 +- .../python_models/python_submissions.py | 215 ++++++++---------- tests/unit/python/test_python_config.py | 6 +- tests/unit/python/test_python_job_support.py | 138 ++++------- tests/unit/python/test_python_submitters.py | 7 +- 5 files changed, 146 insertions(+), 222 deletions(-) diff --git a/dbt/adapters/databricks/python_models/python_config.py b/dbt/adapters/databricks/python_models/python_config.py index 9b74223e..6398397d 100644 --- a/dbt/adapters/databricks/python_models/python_config.py +++ b/dbt/adapters/databricks/python_models/python_config.py @@ -32,7 +32,7 @@ class PythonModelConfig(BaseModel): packages: List[str] = Field(default_factory=list) index_url: Optional[str] = None additional_libs: List[Dict[str, Any]] = Field(default_factory=list) - python_job_config: Optional[PythonJobConfig] = None + python_job_config: PythonJobConfig = Field(default_factory=lambda: PythonJobConfig(**{})) cluster_id: Optional[str] = None http_path: Optional[str] = None create_notebook: bool = False diff --git a/dbt/adapters/databricks/python_models/python_submissions.py b/dbt/adapters/databricks/python_models/python_submissions.py index c9e349bf..de139f3a 100644 --- a/dbt/adapters/databricks/python_models/python_submissions.py +++ b/dbt/adapters/databricks/python_models/python_submissions.py @@ -95,24 +95,11 @@ def submit(self, compiled_code: str) -> None: class PythonNotebookUploader: """Uploads a compiled Python model as a notebook to the Databricks workspace.""" - def __init__( - self, api_client: DatabricksApiClient, catalog: str, schema: str, identifier: str - ) -> None: + def __init__(self, api_client: DatabricksApiClient, parsed_model: ParsedPythonModel) -> None: self.api_client = api_client - self.catalog = catalog - self.schema = schema - self.identifier = identifier - - @staticmethod - def create( - api_client: DatabricksApiClient, parsed_model: ParsedPythonModel - ) -> "PythonNotebookUploader": - return PythonNotebookUploader( - api_client, - parsed_model.catalog, - parsed_model.schema_, - parsed_model.identifier, - ) + self.catalog = parsed_model.catalog + self.schema = parsed_model.schema_ + self.identifier = parsed_model.identifier def upload(self, compiled_code: str) -> str: """Upload the compiled code to the Databricks workspace.""" @@ -137,25 +124,8 @@ class PythonPermissionBuilder: def __init__( self, api_client: DatabricksApiClient, - job_grants: Dict[str, List[Dict[str, Any]]], - acls: List[Dict[str, str]], ) -> None: self.api_client = api_client - self.job_grants = job_grants - self.acls = acls - - @staticmethod - def create( - api_client: DatabricksApiClient, parsed_model: ParsedPythonModel - ) -> "PythonPermissionBuilder": - if parsed_model.config.python_job_config: - job_grants = parsed_model.config.python_job_config.grants - else: - job_grants = {} - - return PythonPermissionBuilder( - api_client, job_grants, parsed_model.config.access_control_list or [] - ) def _get_job_owner_for_config(self) -> Tuple[str, str]: """Get the owner of the job (and type) for the access control list.""" @@ -165,7 +135,17 @@ def _get_job_owner_for_config(self) -> Tuple[str, str]: source = "service_principal_name" if is_service_principal else "user_name" return curr_user, source - def build_job_permissions(self) -> List[Dict[str, Any]]: + @staticmethod + def _build_job_permission( + job_grants: List[Dict[str, Any]], permission: str + ) -> List[Dict[str, Any]]: + return [{**grant, **{"permission_level": permission}} for grant in job_grants] + + def build_job_permissions( + self, + job_grants: Dict[str, List[Dict[str, Any]]], + acls: List[Dict[str, str]], + ) -> List[Dict[str, Any]]: """Build the access control list for the job.""" access_control_list = [] @@ -177,57 +157,38 @@ def build_job_permissions(self) -> List[Dict[str, Any]]: } ) - for grant in self.job_grants.get("view", []): - acl_grant = grant.copy() - acl_grant.update( - { - "permission_level": "CAN_VIEW", - } - ) - access_control_list.append(acl_grant) - for grant in self.job_grants.get("run", []): - acl_grant = grant.copy() - acl_grant.update( - { - "permission_level": "CAN_MANAGE_RUN", - } - ) - access_control_list.append(acl_grant) - for grant in self.job_grants.get("manage", []): - acl_grant = grant.copy() - acl_grant.update( - { - "permission_level": "CAN_MANAGE", - } - ) - access_control_list.append(acl_grant) + access_control_list.extend( + self._build_job_permission(job_grants.get("view", []), "CAN_VIEW") + ) + access_control_list.extend( + self._build_job_permission(job_grants.get("run", []), "CAN_MANAGE_RUN") + ) + access_control_list.extend( + self._build_job_permission(job_grants.get("manage", []), "CAN_MANAGE") + ) - return access_control_list + self.acls + return access_control_list + acls -class PythonLibraryConfigurer: - """Configures the libraries component for a Python job.""" +def get_library_config( + packages: List[str], + index_url: Optional[str], + additional_libraries: List[Dict[str, Any]], +) -> Dict[str, Any]: + """Update the job configuration with the required libraries.""" - @staticmethod - def get_library_config( - packages: List[str], - index_url: Optional[str], - additional_libraries: List[Dict[str, Any]], - ) -> Dict[str, Any]: - """Update the job configuration with the required libraries.""" - - libraries = [] - - for package in packages: - if index_url: - libraries.append({"pypi": {"package": package, "repo": index_url}}) - else: - libraries.append({"pypi": {"package": package}}) + libraries = [] + + for package in packages: + if index_url: + libraries.append({"pypi": {"package": package, "repo": index_url}}) + else: + libraries.append({"pypi": {"package": package}}) - for library in additional_libraries: - libraries.append(library) + for library in additional_libraries: + libraries.append(library) - return {"libraries": libraries} + return {"libraries": libraries} class PythonJobConfigCompiler: @@ -237,42 +198,20 @@ def __init__( self, api_client: DatabricksApiClient, permission_builder: PythonPermissionBuilder, - run_name: str, + parsed_model: ParsedPythonModel, cluster_spec: Dict[str, Any], - additional_job_settings: Dict[str, Any], ) -> None: self.api_client = api_client self.permission_builder = permission_builder - self.run_name = run_name - self.cluster_spec = cluster_spec - self.additional_job_settings = additional_job_settings - - @staticmethod - def create( - api_client: DatabricksApiClient, - parsed_model: ParsedPythonModel, - cluster_spec: Dict[str, Any], - ) -> "PythonJobConfigCompiler": - permission_builder = PythonPermissionBuilder.create(api_client, parsed_model) + self.run_name = parsed_model.run_name packages = parsed_model.config.packages index_url = parsed_model.config.index_url additional_libraries = parsed_model.config.additional_libs - library_config = PythonLibraryConfigurer.get_library_config( - packages, index_url, additional_libraries - ) - cluster_spec.update(library_config) - if parsed_model.config.python_job_config: - additional_job_settings = parsed_model.config.python_job_config.dict() - else: - additional_job_settings = {} - - return PythonJobConfigCompiler( - api_client, - permission_builder, - parsed_model.run_name, - cluster_spec, - additional_job_settings, - ) + library_config = get_library_config(packages, index_url, additional_libraries) + self.cluster_spec = {**cluster_spec, **library_config} + self.job_grants = parsed_model.config.python_job_config.grants + self.acls = parsed_model.config.access_control_list + self.additional_job_settings = parsed_model.config.python_job_config.dict() def compile(self, path: str) -> PythonJobDetails: @@ -285,7 +224,9 @@ def compile(self, path: str) -> PythonJobDetails: job_spec.update(self.cluster_spec) # updates 'new_cluster' config additional_job_config = self.additional_job_settings - access_control_list = self.permission_builder.build_job_permissions() + access_control_list = self.permission_builder.build_job_permissions( + self.job_grants, self.acls + ) if access_control_list: job_spec["access_control_list"] = access_control_list @@ -314,9 +255,11 @@ def create( parsed_model: ParsedPythonModel, cluster_spec: Dict[str, Any], ) -> "PythonNotebookSubmitter": - notebook_uploader = PythonNotebookUploader.create(api_client, parsed_model) - config_compiler = PythonJobConfigCompiler.create( + notebook_uploader = PythonNotebookUploader(api_client, parsed_model) + permission_builder = PythonPermissionBuilder(api_client) + config_compiler = PythonJobConfigCompiler( api_client, + permission_builder, parsed_model, cluster_spec, ) @@ -363,10 +306,29 @@ class AllPurposeClusterPythonJobHelper(BaseDatabricksHelper): Top level helper for Python models using job runs or Command API on an all-purpose cluster. """ + def __init__(self, parsed_model: Dict, credentials: DatabricksCredentials) -> None: + self.credentials = credentials + self.credentials.validate_creds() + self.parsed_model = ParsedPythonModel(**parsed_model) + + self.api_client = DatabricksApiClient.create( + credentials, + self.parsed_model.config.timeout, + self.parsed_model.config.user_folder_for_python, + ) + + config = self.parsed_model.config + self.create_notebook = config.create_notebook + self.cluster_id = config.cluster_id or self.credentials.extract_cluster_id( + config.http_path or self.credentials.http_path or "" + ) + self.validate_config() + + self.command_submitter = self.build_submitter() + @override def build_submitter(self) -> PythonSubmitter: - config = self.parsed_model.config - if config.create_notebook: + if self.create_notebook: return PythonNotebookSubmitter.create( self.api_client, self.tracker, @@ -378,10 +340,6 @@ def build_submitter(self) -> PythonSubmitter: @override def validate_config(self) -> None: - config = self.parsed_model.config - self.cluster_id = config.cluster_id or self.credentials.extract_cluster_id( - config.http_path or self.credentials.http_path or "" - ) if not self.cluster_id: raise ValueError( "Databricks `http_path` or `cluster_id` of an all-purpose cluster is required " @@ -508,6 +466,8 @@ def __init__( config_compiler: PythonWorkflowConfigCompiler, permission_builder: PythonPermissionBuilder, workflow_creater: PythonWorkflowCreater, + job_grants: Dict[str, List[Dict[str, str]]], + acls: List[Dict[str, str]], ) -> None: self.api_client = api_client self.tracker = tracker @@ -515,17 +475,26 @@ def __init__( self.config_compiler = config_compiler self.permission_builder = permission_builder self.workflow_creater = workflow_creater + self.job_grants = job_grants + self.acls = acls @staticmethod def create( api_client: DatabricksApiClient, tracker: PythonRunTracker, parsed_model: ParsedPythonModel ) -> "PythonNotebookWorkflowSubmitter": - uploader = PythonNotebookUploader.create(api_client, parsed_model) + uploader = PythonNotebookUploader(api_client, parsed_model) config_compiler = PythonWorkflowConfigCompiler.create(parsed_model) - permission_builder = PythonPermissionBuilder.create(api_client, parsed_model) + permission_builder = PythonPermissionBuilder(api_client) workflow_creater = PythonWorkflowCreater(api_client.workflows) return PythonNotebookWorkflowSubmitter( - api_client, tracker, uploader, config_compiler, permission_builder, workflow_creater + api_client, + tracker, + uploader, + config_compiler, + permission_builder, + workflow_creater, + parsed_model.config.python_job_config.grants, + parsed_model.config.access_control_list, ) @override @@ -535,7 +504,9 @@ def submit(self, compiled_code: str) -> None: workflow_config, existing_job_id = self.config_compiler.compile(file_path) job_id = self.workflow_creater.create_or_update(workflow_config, existing_job_id) - access_control_list = self.permission_builder.build_job_permissions() + access_control_list = self.permission_builder.build_job_permissions( + self.job_grants, self.acls + ) self.api_client.workflow_permissions.put(job_id, access_control_list) run_id = self.api_client.workflows.run(job_id, enable_queueing=True) diff --git a/tests/unit/python/test_python_config.py b/tests/unit/python/test_python_config.py index 032283c8..ef450afc 100644 --- a/tests/unit/python/test_python_config.py +++ b/tests/unit/python/test_python_config.py @@ -39,7 +39,11 @@ def test_parsed_model__empty_model_config(self): assert config.packages == [] assert config.index_url is None assert config.additional_libs == [] - assert config.python_job_config is None + assert config.python_job_config.name is None + assert config.python_job_config.grants == {} + assert config.python_job_config.existing_job_id == "" + assert config.python_job_config.post_hook_tasks == [] + assert config.python_job_config.additional_task_settings == {} assert config.cluster_id is None assert config.http_path is None assert config.create_notebook is False diff --git a/tests/unit/python/test_python_job_support.py b/tests/unit/python/test_python_job_support.py index 6853483f..0d7acb53 100644 --- a/tests/unit/python/test_python_job_support.py +++ b/tests/unit/python/test_python_job_support.py @@ -1,9 +1,9 @@ from unittest.mock import Mock import pytest +from dbt.adapters.databricks.python_models import python_submissions from dbt.adapters.databricks.python_models.python_submissions import ( PythonJobConfigCompiler, - PythonLibraryConfigurer, PythonNotebookUploader, PythonPermissionBuilder, ) @@ -30,12 +30,13 @@ def workdir(self): return "workdir" @pytest.fixture - def identifier(self): + def identifier(self, parsed_model): return "identifier" @pytest.fixture - def uploader(self, client, identifier): - return PythonNotebookUploader(client, "", "", identifier) + def uploader(self, client, parsed_model, identifier): + parsed_model.identifier = identifier + return PythonNotebookUploader(client, parsed_model) def test_upload__golden_path(self, uploader, client, compiled_code, workdir, identifier): client.workspace.create_python_model_dir.return_value = workdir @@ -44,40 +45,30 @@ def test_upload__golden_path(self, uploader, client, compiled_code, workdir, ide assert file_path == f"{workdir}{identifier}" client.workspace.upload_notebook.assert_called_once_with(file_path, compiled_code) - def test_create__golden_path(self, client, parsed_model): - parsed_model.catalog = "catalog" - parsed_model.schema_ = "schema" - parsed_model.identifier = "identifier" - - uploader = PythonNotebookUploader.create(client, parsed_model) - assert uploader.catalog == "catalog" - assert uploader.schema == "schema" - assert uploader.identifier == "identifier" - assert uploader.api_client == client - class TestPythonPermissionBuilder: - def test_build_permission__no_grants_no_acls_user_owner(self, client): - builder = PythonPermissionBuilder(client, {}, []) + @pytest.fixture + def builder(self, client): + return PythonPermissionBuilder(client) + + def test_build_permission__no_grants_no_acls_user_owner(self, builder, client): client.curr_user.get_username.return_value = "user" client.curr_user.is_service_principal.return_value = False - acls = builder.build_job_permissions() + acls = builder.build_job_permissions({}, []) assert acls == [{"user_name": "user", "permission_level": "IS_OWNER"}] - def test_build_permission__no_grants_no_acls_sp_owner(self, client): - builder = PythonPermissionBuilder(client, {}, []) + def test_build_permission__no_grants_no_acls_sp_owner(self, builder, client): client.curr_user.get_username.return_value = "user" client.curr_user.is_service_principal.return_value = True - acls = builder.build_job_permissions() + acls = builder.build_job_permissions({}, []) assert acls == [{"service_principal_name": "user", "permission_level": "IS_OWNER"}] - def test_build_permission__grants_no_acls(self, client): + def test_build_permission__grants_no_acls(self, builder, client): grants = { "view": [{"user_name": "user1"}], "run": [{"user_name": "user2"}], "manage": [{"user_name": "user3"}], } - builder = PythonPermissionBuilder(client, grants, []) client.curr_user.get_username.return_value = "user" client.curr_user.is_service_principal.return_value = False @@ -88,14 +79,13 @@ def test_build_permission__grants_no_acls(self, client): {"user_name": "user3", "permission_level": "CAN_MANAGE"}, ] - assert builder.build_job_permissions() == expected + assert builder.build_job_permissions(grants, []) == expected - def test_build_permission__grants_and_acls(self, client): + def test_build_permission__grants_and_acls(self, builder, client): grants = { "view": [{"user_name": "user1"}], } acls = [{"user_name": "user2", "permission_level": "CAN_MANAGE_RUN"}] - builder = PythonPermissionBuilder(client, grants, acls) client.curr_user.get_username.return_value = "user" client.curr_user.is_service_principal.return_value = False @@ -105,37 +95,23 @@ def test_build_permission__grants_and_acls(self, client): {"user_name": "user2", "permission_level": "CAN_MANAGE_RUN"}, ] - assert builder.build_job_permissions() == expected - - def test_create__with_python_job_config(self, client, parsed_model): - parsed_model.config.python_job_config.grants = {"view": [{"user_name": "user"}]} - builder = PythonPermissionBuilder.create(client, parsed_model) - - assert builder.job_grants == {"view": [{"user_name": "user"}]} - assert builder.acls == parsed_model.config.access_control_list - - def test_create__without_python_job_config(self, client, parsed_model): - parsed_model.config.python_job_config = None - builder = PythonPermissionBuilder.create(client, parsed_model) + assert builder.build_job_permissions(grants, acls) == expected - assert builder.job_grants == {} - assert builder.acls == parsed_model.config.access_control_list - -class TestPythonLibraryConfigurer: +class TestGetLibraryConfig: def test_get_library_config__no_packages_no_libraries(self): - config = PythonLibraryConfigurer.get_library_config([], None, []) + config = python_submissions.get_library_config([], None, []) assert config == {"libraries": []} def test_get_library_config__packages_no_index_no_libraries(self): - config = PythonLibraryConfigurer.get_library_config(["package1", "package2"], None, []) + config = python_submissions.get_library_config(["package1", "package2"], None, []) assert config == { "libraries": [{"pypi": {"package": "package1"}}, {"pypi": {"package": "package2"}}] } def test_get_library_config__packages_index_url_no_libraries(self): index_url = "http://example.com" - config = PythonLibraryConfigurer.get_library_config(["package1", "package2"], index_url, []) + config = python_submissions.get_library_config(["package1", "package2"], index_url, []) assert config == { "libraries": [ {"pypi": {"package": "package1", "repo": index_url}}, @@ -144,7 +120,7 @@ def test_get_library_config__packages_index_url_no_libraries(self): } def test_get_library_config__packages_libraries(self): - config = PythonLibraryConfigurer.get_library_config( + config = python_submissions.get_library_config( ["package1", "package2"], None, [{"pypi": {"package": "package3"}}] ) assert config == { @@ -162,24 +138,16 @@ def permission_builder(self): return Mock() @pytest.fixture - def cluster_spec(self): - return {} - - @pytest.fixture - def run_name(self): - return "run_name" - - @pytest.fixture - def additional_job_settings(self): - return {} - - @pytest.fixture - def compiler(self, client, permission_builder, run_name, cluster_spec, additional_job_settings): - return PythonJobConfigCompiler( - client, permission_builder, run_name, cluster_spec, additional_job_settings - ) + def run_name(self, parsed_model): + run_name = "run_name" + parsed_model.run_name = run_name + parsed_model.config.packages = [] + parsed_model.config.additional_libs = [] + return run_name - def test_compile__empty_configs(self, compiler, run_name, permission_builder): + def test_compile__empty_configs(self, client, permission_builder, parsed_model, run_name): + parsed_model.config.python_job_config.dict.return_value = {} + compiler = PythonJobConfigCompiler(client, permission_builder, parsed_model, {}) permission_builder.build_job_permissions.return_value = [] details = compiler.compile("path") assert details.run_name == run_name @@ -188,17 +156,21 @@ def test_compile__empty_configs(self, compiler, run_name, permission_builder): "notebook_task": { "notebook_path": "path", }, + "libraries": [], } assert details.additional_job_config == {} - def test_compile__nonempty_configs( - self, compiler, run_name, permission_builder, cluster_spec, additional_job_settings - ): + def test_compile__nonempty_configs(self, client, permission_builder, parsed_model, run_name): + parsed_model.config.packages = ["foo"] + parsed_model.config.index_url = None + parsed_model.config.python_job_config.dict.return_value = {"foo": "bar"} + permission_builder.build_job_permissions.return_value = [ {"user_name": "user", "permission_level": "IS_OWNER"} ] - cluster_spec["libraries"] = [{"pypi": {"package": "package"}}] - additional_job_settings["foo"] = "bar" + compiler = PythonJobConfigCompiler( + client, permission_builder, parsed_model, {"cluster_id": "id"} + ) details = compiler.compile("path") assert details.run_name == run_name assert details.job_spec == { @@ -206,34 +178,8 @@ def test_compile__nonempty_configs( "notebook_task": { "notebook_path": "path", }, - "libraries": [{"pypi": {"package": "package"}}], + "cluster_id": "id", + "libraries": [{"pypi": {"package": "foo"}}], "access_control_list": [{"user_name": "user", "permission_level": "IS_OWNER"}], } assert details.additional_job_config == {"foo": "bar"} - - def test_create__empty_configs(self, client, parsed_model, cluster_spec): - parsed_model.config.packages = [] - parsed_model.config.additional_libs = [] - parsed_model.config.python_job_config = None - compiler = PythonJobConfigCompiler.create(client, parsed_model, cluster_spec) - assert compiler.api_client == client - assert isinstance(compiler.permission_builder, PythonPermissionBuilder) - assert compiler.run_name == parsed_model.run_name - assert compiler.cluster_spec == cluster_spec - assert compiler.additional_job_settings == {} - - def test_create__full_configs(self, client, parsed_model): - cluster_spec = {"existing_cluster_id": "cluster_id"} - parsed_model.config.packages = ["foo"] - parsed_model.config.index_url = None - parsed_model.config.additional_libs = [{"pypi": {"package": "bar"}}] - parsed_model.config.python_job_config.dict.return_value = {"baz": "qux"} - compiler = PythonJobConfigCompiler.create(client, parsed_model, cluster_spec) - assert compiler.api_client == client - assert isinstance(compiler.permission_builder, PythonPermissionBuilder) - assert compiler.run_name == parsed_model.run_name - assert compiler.cluster_spec == { - "existing_cluster_id": "cluster_id", - "libraries": [{"pypi": {"package": "foo"}}, {"pypi": {"package": "bar"}}], - } - assert compiler.additional_job_settings == {"baz": "qux"} diff --git a/tests/unit/python/test_python_submitters.py b/tests/unit/python/test_python_submitters.py index 4c53d231..8de8bc65 100644 --- a/tests/unit/python/test_python_submitters.py +++ b/tests/unit/python/test_python_submitters.py @@ -132,7 +132,7 @@ def submitter( self, client, tracker, uploader, config_compiler, permission_builder, workflow_creater ): return PythonNotebookWorkflowSubmitter( - client, tracker, uploader, config_compiler, permission_builder, workflow_creater + client, tracker, uploader, config_compiler, permission_builder, workflow_creater, {}, [] ) def test_submit__golden_path(self, submitter): @@ -159,7 +159,10 @@ def test_submit__poll_fails__cleans_up(self, submitter): def test_create__golden_path(self, client, tracker): parsed_model = Mock() - parsed_model.config.python_job_config = None + parsed_model.config.python_job_config.grants = {} + parsed_model.config.python_job_config.additional_task_settings = {} + parsed_model.config.python_job_config.dict.return_value = {} + parsed_model.config.access_control_list = [] submitter = PythonNotebookWorkflowSubmitter.create(client, tracker, parsed_model) assert submitter.api_client == client assert submitter.tracker == tracker