Commit

address Jacky's comments
benc-db committed Oct 22, 2024
1 parent a4d8800 commit 8c2fff8
Showing 5 changed files with 146 additions and 222 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/python_models/python_config.py
@@ -32,7 +32,7 @@ class PythonModelConfig(BaseModel):
packages: List[str] = Field(default_factory=list)
index_url: Optional[str] = None
additional_libs: List[Dict[str, Any]] = Field(default_factory=list)
python_job_config: Optional[PythonJobConfig] = None
python_job_config: PythonJobConfig = Field(default_factory=lambda: PythonJobConfig(**{}))
cluster_id: Optional[str] = None
http_path: Optional[str] = None
create_notebook: bool = False
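Note on the python_config.py change: swapping Optional[PythonJobConfig] for a default_factory means an unconfigured model now carries an empty PythonJobConfig instead of None, which removes the None-checks in python_submissions.py below and matches the updated test at the bottom of this diff. A minimal sketch of the pydantic pattern, using stand-in classes rather than the real PythonJobConfig/PythonModelConfig:

# Minimal sketch of the default_factory pattern; JobConfig/ModelConfig are stand-ins,
# not the real classes from this repo.
from typing import Dict, List, Optional
from pydantic import BaseModel, Field

class JobConfig(BaseModel):
    name: Optional[str] = None
    grants: Dict[str, List[Dict[str, str]]] = Field(default_factory=dict)

class ModelConfig(BaseModel):
    # Before: job_config: Optional[JobConfig] = None  -> every caller had to None-check.
    # After: always materialize an empty config when the user supplies none.
    job_config: JobConfig = Field(default_factory=lambda: JobConfig(**{}))

config = ModelConfig()
assert config.job_config is not None
assert config.job_config.grants == {}  # empty, never None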
215 changes: 93 additions & 122 deletions dbt/adapters/databricks/python_models/python_submissions.py
@@ -95,24 +95,11 @@ def submit(self, compiled_code: str) -> None:
class PythonNotebookUploader:
"""Uploads a compiled Python model as a notebook to the Databricks workspace."""

def __init__(
self, api_client: DatabricksApiClient, catalog: str, schema: str, identifier: str
) -> None:
def __init__(self, api_client: DatabricksApiClient, parsed_model: ParsedPythonModel) -> None:
self.api_client = api_client
self.catalog = catalog
self.schema = schema
self.identifier = identifier

@staticmethod
def create(
api_client: DatabricksApiClient, parsed_model: ParsedPythonModel
) -> "PythonNotebookUploader":
return PythonNotebookUploader(
api_client,
parsed_model.catalog,
parsed_model.schema_,
parsed_model.identifier,
)
self.catalog = parsed_model.catalog
self.schema = parsed_model.schema_
self.identifier = parsed_model.identifier

def upload(self, compiled_code: str) -> str:
"""Upload the compiled code to the Databricks workspace."""
@@ -137,25 +124,8 @@ class PythonPermissionBuilder:
def __init__(
self,
api_client: DatabricksApiClient,
job_grants: Dict[str, List[Dict[str, Any]]],
acls: List[Dict[str, str]],
) -> None:
self.api_client = api_client
self.job_grants = job_grants
self.acls = acls

@staticmethod
def create(
api_client: DatabricksApiClient, parsed_model: ParsedPythonModel
) -> "PythonPermissionBuilder":
if parsed_model.config.python_job_config:
job_grants = parsed_model.config.python_job_config.grants
else:
job_grants = {}

return PythonPermissionBuilder(
api_client, job_grants, parsed_model.config.access_control_list or []
)

def _get_job_owner_for_config(self) -> Tuple[str, str]:
"""Get the owner of the job (and type) for the access control list."""
Expand All @@ -165,7 +135,17 @@ def _get_job_owner_for_config(self) -> Tuple[str, str]:
source = "service_principal_name" if is_service_principal else "user_name"
return curr_user, source

def build_job_permissions(self) -> List[Dict[str, Any]]:
@staticmethod
def _build_job_permission(
job_grants: List[Dict[str, Any]], permission: str
) -> List[Dict[str, Any]]:
return [{**grant, **{"permission_level": permission}} for grant in job_grants]

def build_job_permissions(
self,
job_grants: Dict[str, List[Dict[str, Any]]],
acls: List[Dict[str, str]],
) -> List[Dict[str, Any]]:
"""Build the access control list for the job."""

access_control_list = []
@@ -177,57 +157,38 @@ def build_job_permissions(self) -> List[Dict[str, Any]]:
}
)

for grant in self.job_grants.get("view", []):
acl_grant = grant.copy()
acl_grant.update(
{
"permission_level": "CAN_VIEW",
}
)
access_control_list.append(acl_grant)
for grant in self.job_grants.get("run", []):
acl_grant = grant.copy()
acl_grant.update(
{
"permission_level": "CAN_MANAGE_RUN",
}
)
access_control_list.append(acl_grant)
for grant in self.job_grants.get("manage", []):
acl_grant = grant.copy()
acl_grant.update(
{
"permission_level": "CAN_MANAGE",
}
)
access_control_list.append(acl_grant)
access_control_list.extend(
self._build_job_permission(job_grants.get("view", []), "CAN_VIEW")
)
access_control_list.extend(
self._build_job_permission(job_grants.get("run", []), "CAN_MANAGE_RUN")
)
access_control_list.extend(
self._build_job_permission(job_grants.get("manage", []), "CAN_MANAGE")
)

return access_control_list + self.acls
return access_control_list + acls


class PythonLibraryConfigurer:
"""Configures the libraries component for a Python job."""
def get_library_config(
packages: List[str],
index_url: Optional[str],
additional_libraries: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Update the job configuration with the required libraries."""

@staticmethod
def get_library_config(
packages: List[str],
index_url: Optional[str],
additional_libraries: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Update the job configuration with the required libraries."""

libraries = []

for package in packages:
if index_url:
libraries.append({"pypi": {"package": package, "repo": index_url}})
else:
libraries.append({"pypi": {"package": package}})
libraries = []

for package in packages:
if index_url:
libraries.append({"pypi": {"package": package, "repo": index_url}})
else:
libraries.append({"pypi": {"package": package}})

for library in additional_libraries:
libraries.append(library)
for library in additional_libraries:
libraries.append(library)

return {"libraries": libraries}
return {"libraries": libraries}


class PythonJobConfigCompiler:
@@ -237,42 +198,20 @@ def __init__(
self,
api_client: DatabricksApiClient,
permission_builder: PythonPermissionBuilder,
run_name: str,
parsed_model: ParsedPythonModel,
cluster_spec: Dict[str, Any],
additional_job_settings: Dict[str, Any],
) -> None:
self.api_client = api_client
self.permission_builder = permission_builder
self.run_name = run_name
self.cluster_spec = cluster_spec
self.additional_job_settings = additional_job_settings

@staticmethod
def create(
api_client: DatabricksApiClient,
parsed_model: ParsedPythonModel,
cluster_spec: Dict[str, Any],
) -> "PythonJobConfigCompiler":
permission_builder = PythonPermissionBuilder.create(api_client, parsed_model)
self.run_name = parsed_model.run_name
packages = parsed_model.config.packages
index_url = parsed_model.config.index_url
additional_libraries = parsed_model.config.additional_libs
library_config = PythonLibraryConfigurer.get_library_config(
packages, index_url, additional_libraries
)
cluster_spec.update(library_config)
if parsed_model.config.python_job_config:
additional_job_settings = parsed_model.config.python_job_config.dict()
else:
additional_job_settings = {}

return PythonJobConfigCompiler(
api_client,
permission_builder,
parsed_model.run_name,
cluster_spec,
additional_job_settings,
)
library_config = get_library_config(packages, index_url, additional_libraries)
self.cluster_spec = {**cluster_spec, **library_config}
self.job_grants = parsed_model.config.python_job_config.grants
self.acls = parsed_model.config.access_control_list
self.additional_job_settings = parsed_model.config.python_job_config.dict()

def compile(self, path: str) -> PythonJobDetails:

@@ -285,7 +224,9 @@ def compile(self, path: str) -> PythonJobDetails:
job_spec.update(self.cluster_spec) # updates 'new_cluster' config

additional_job_config = self.additional_job_settings
access_control_list = self.permission_builder.build_job_permissions()
access_control_list = self.permission_builder.build_job_permissions(
self.job_grants, self.acls
)
if access_control_list:
job_spec["access_control_list"] = access_control_list

@@ -314,9 +255,11 @@ def create(
parsed_model: ParsedPythonModel,
cluster_spec: Dict[str, Any],
) -> "PythonNotebookSubmitter":
notebook_uploader = PythonNotebookUploader.create(api_client, parsed_model)
config_compiler = PythonJobConfigCompiler.create(
notebook_uploader = PythonNotebookUploader(api_client, parsed_model)
permission_builder = PythonPermissionBuilder(api_client)
config_compiler = PythonJobConfigCompiler(
api_client,
permission_builder,
parsed_model,
cluster_spec,
)
@@ -363,10 +306,29 @@ class AllPurposeClusterPythonJobHelper(BaseDatabricksHelper):
Top level helper for Python models using job runs or Command API on an all-purpose cluster.
"""

def __init__(self, parsed_model: Dict, credentials: DatabricksCredentials) -> None:
self.credentials = credentials
self.credentials.validate_creds()
self.parsed_model = ParsedPythonModel(**parsed_model)

self.api_client = DatabricksApiClient.create(
credentials,
self.parsed_model.config.timeout,
self.parsed_model.config.user_folder_for_python,
)

config = self.parsed_model.config
self.create_notebook = config.create_notebook
self.cluster_id = config.cluster_id or self.credentials.extract_cluster_id(
config.http_path or self.credentials.http_path or ""
)
self.validate_config()

self.command_submitter = self.build_submitter()

@override
def build_submitter(self) -> PythonSubmitter:
config = self.parsed_model.config
if config.create_notebook:
if self.create_notebook:
return PythonNotebookSubmitter.create(
self.api_client,
self.tracker,
@@ -378,10 +340,6 @@ def build_submitter(self) -> PythonSubmitter:

@override
def validate_config(self) -> None:
config = self.parsed_model.config
self.cluster_id = config.cluster_id or self.credentials.extract_cluster_id(
config.http_path or self.credentials.http_path or ""
)
if not self.cluster_id:
raise ValueError(
"Databricks `http_path` or `cluster_id` of an all-purpose cluster is required "
@@ -508,24 +466,35 @@ def __init__(
config_compiler: PythonWorkflowConfigCompiler,
permission_builder: PythonPermissionBuilder,
workflow_creater: PythonWorkflowCreater,
job_grants: Dict[str, List[Dict[str, str]]],
acls: List[Dict[str, str]],
) -> None:
self.api_client = api_client
self.tracker = tracker
self.uploader = uploader
self.config_compiler = config_compiler
self.permission_builder = permission_builder
self.workflow_creater = workflow_creater
self.job_grants = job_grants
self.acls = acls

@staticmethod
def create(
api_client: DatabricksApiClient, tracker: PythonRunTracker, parsed_model: ParsedPythonModel
) -> "PythonNotebookWorkflowSubmitter":
uploader = PythonNotebookUploader.create(api_client, parsed_model)
uploader = PythonNotebookUploader(api_client, parsed_model)
config_compiler = PythonWorkflowConfigCompiler.create(parsed_model)
permission_builder = PythonPermissionBuilder.create(api_client, parsed_model)
permission_builder = PythonPermissionBuilder(api_client)
workflow_creater = PythonWorkflowCreater(api_client.workflows)
return PythonNotebookWorkflowSubmitter(
api_client, tracker, uploader, config_compiler, permission_builder, workflow_creater
api_client,
tracker,
uploader,
config_compiler,
permission_builder,
workflow_creater,
parsed_model.config.python_job_config.grants,
parsed_model.config.access_control_list,
)

@override
@@ -535,7 +504,9 @@ def submit(self, compiled_code: str) -> None:
workflow_config, existing_job_id = self.config_compiler.compile(file_path)
job_id = self.workflow_creater.create_or_update(workflow_config, existing_job_id)

access_control_list = self.permission_builder.build_job_permissions()
access_control_list = self.permission_builder.build_job_permissions(
self.job_grants, self.acls
)
self.api_client.workflow_permissions.put(job_id, access_control_list)

run_id = self.api_client.workflows.run(job_id, enable_queueing=True)
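The recurring change in python_submissions.py is that PythonPermissionBuilder no longer stores grants and ACLs; callers (PythonJobConfigCompiler and PythonNotebookWorkflowSubmitter) now pass them into build_job_permissions, and the per-level expansion is folded into the _build_job_permission helper. A self-contained sketch of that expansion, with made-up grants:

# Mirrors _build_job_permission: copy each grant and stamp on the permission level.
from typing import Any, Dict, List

def expand_grants(job_grants: List[Dict[str, Any]], permission: str) -> List[Dict[str, Any]]:
    return [{**grant, "permission_level": permission} for grant in job_grants]

# Sample grants (invented for illustration).
grants = {"view": [{"group_name": "analysts"}], "manage": [{"user_name": "admin@example.com"}]}
acl = (
    expand_grants(grants.get("view", []), "CAN_VIEW")
    + expand_grants(grants.get("run", []), "CAN_MANAGE_RUN")
    + expand_grants(grants.get("manage", []), "CAN_MANAGE")
)
# [{'group_name': 'analysts', 'permission_level': 'CAN_VIEW'},
#  {'user_name': 'admin@example.com', 'permission_level': 'CAN_MANAGE'}]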
6 changes: 5 additions & 1 deletion tests/unit/python/test_python_config.py
@@ -39,7 +39,11 @@ def test_parsed_model__empty_model_config(self):
assert config.packages == []
assert config.index_url is None
assert config.additional_libs == []
assert config.python_job_config is None
assert config.python_job_config.name is None
assert config.python_job_config.grants == {}
assert config.python_job_config.existing_job_id == ""
assert config.python_job_config.post_hook_tasks == []
assert config.python_job_config.additional_task_settings == {}
assert config.cluster_id is None
assert config.http_path is None
assert config.create_notebook is False