Add cli command to create directfs to table mapping #3427

Draft · wants to merge 9 commits into main
src/databricks/labs/ucx/cli.py: 18 additions & 0 deletions
@@ -198,6 +198,24 @@ def create_table_mapping(
    webbrowser.open(f"{w.config.host}/#workspace{path}")


@ucx.command
def create_directfs_mapping(
    w: WorkspaceClient,
    ctx: WorkspaceContext | None = None,
    run_as_collection: bool = False,
    a: AccountClient | None = None,
):
    """Create a DirectFS mapping for all DirectFS references in the workspace."""
    if ctx:
        workspace_contexts = [ctx]
    else:
        workspace_contexts = _get_workspace_contexts(w, a, run_as_collection)
    for workspace_ctx in workspace_contexts:
        workspace_ctx.directfs_mapping.save(
            workspace_ctx.directfs_crawler, workspace_ctx.tables_crawler, workspace_ctx.workspace_info
        )
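
Once the command is registered in labs.yml, invoking it would look something like this (a hypothetical invocation; the kebab-case name and the --run-as-collection flag follow the pattern of the existing ucx commands):

databricks labs ucx create-directfs-mapping
databricks labs ucx create-directfs-mapping --run-as-collection=true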


@ucx.command
def validate_external_locations(
    w: WorkspaceClient,
src/databricks/labs/ucx/contexts/application.py: 5 additions & 0 deletions
@@ -14,6 +14,7 @@

from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
from databricks.labs.ucx.hive_metastore.directfs_mapping import DirectFsMapping
from databricks.labs.ucx.hive_metastore.pipelines_migrate import PipelinesMigrator
from databricks.labs.ucx.recon.data_comparator import StandardDataComparator
from databricks.labs.ucx.recon.data_profiler import StandardDataProfiler
@@ -456,6 +457,10 @@ def iam_credential_manager(self) -> CredentialManager:
    def table_mapping(self) -> TableMapping:
        return TableMapping(self.installation, self.workspace_client, self.sql_backend)

    @cached_property
    def directfs_mapping(self) -> DirectFsMapping:
        return DirectFsMapping(self.installation, self.workspace_client, self.sql_backend)

    @cached_property
    def catalog_schema(self) -> CatalogSchema:
        return CatalogSchema(self.workspace_client, self.table_mapping, self.migrate_grants, self.config.ucx_catalog)
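
Registering the mapping as a cached_property means each WorkspaceContext builds at most one DirectFsMapping and reuses it. A hypothetical access pattern:

ctx = WorkspaceContext(ws)               # hypothetical construction
mapping = ctx.directfs_mapping           # constructed on first access
assert mapping is ctx.directfs_mapping   # later accesses reuse the cached instance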
src/databricks/labs/ucx/hive_metastore/directfs_mapping.py: 114 additions & 0 deletions (new file)
@@ -0,0 +1,114 @@
import logging
import re
from collections.abc import Iterable
from dataclasses import dataclass

from databricks.labs.blueprint.installation import Installation
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.account.workspaces import WorkspaceInfo
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler

logger = logging.getLogger(__name__)


@dataclass
class DirectFsRule:
    """
    A rule for direct filesystem access to UC table mapping.
    """

    workspace_name: str
    path: str
    is_read: bool
    is_write: bool
    catalog_name: str
    dst_schema: str
    dst_table: str

    @classmethod
    def initial(
        cls,
        workspace_name: str,
        path: str,
        is_read: bool,
        is_write: bool,
        catalog_name: str,
        dst_schema: str,
        dst_table: str,
    ) -> "DirectFsRule":
        return cls(
            workspace_name=workspace_name,
            path=path,
            is_read=is_read,
            is_write=is_write,
            catalog_name=catalog_name,
            dst_schema=dst_schema,
            dst_table=dst_table,
        )
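
Since FILENAME below ends in .csv, Installation.save should serialize each DirectFsRule as one CSV row with the field names as headers (assuming blueprint's extension-based format detection). A hypothetical file with made-up values:

workspace_name,path,is_read,is_write,catalog_name,dst_schema,dst_table
my_workspace,s3://bucket/raw/events,true,false,my_workspace,raw,events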


class DirectFsMapping:
    FILENAME = "directfs_mapping.csv"
    UCX_SKIP_PROPERTY = "databricks.labs.ucx.skip"

    def __init__(
        self,
        installation: Installation,
        ws: WorkspaceClient,
        sql_backend: SqlBackend,
    ) -> None:
        self._installation = installation
        self._ws = ws
        self._sql_backend = sql_backend

    def directfs_list(
        self,
        directfs_crawler: DirectFsAccessCrawler,
        tables_crawler: TablesCrawler,
        workspace_name: str,
        catalog_name: str,
    ) -> Iterable["DirectFsRule"]:
        """
        List all direct filesystem access records.
        """
        directfs_snapshot = []
        directfs_crawler.dump_all(directfs_snapshot)
        tables_snapshot = list(tables_crawler.snapshot())
        if not tables_snapshot:
            msg = "No tables found. Please run: databricks labs ucx ensure-assessment-run"
            raise ValueError(msg)
        if not directfs_snapshot:
            msg = "No directfs references found in code"
            raise ValueError(msg)
Member: This is just a return.
Contributor Author: If we do not find any directfs references, we just raise the error and return.

        # TODO: very inefficient search, just for initial testing
        for table in tables_snapshot:
            for directfs_record in directfs_snapshot:
                if table.location and directfs_record.path in table.location:
                    yield DirectFsRule.initial(
                        workspace_name=workspace_name,
                        path=directfs_record.path,
                        is_read=directfs_record.is_read,
                        is_write=directfs_record.is_write,
                        catalog_name=catalog_name,
                        dst_schema=table.database,
                        dst_table=table.name,
                    )

Member: What about dfsa's that do not have a match with tables? We want to include those too.
Contributor Author: We won't have a table mapping to replace them, so we ignore them, right?
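
As the TODO concedes, the nested loops scan every (table, record) pair. One possible speed-up (an illustrative sketch, not part of this PR; helper names are hypothetical) replaces the inner scan with set lookups, assuming the containment check is in practice a prefix match between a DirectFS path and a table location:

def build_path_index(dfsa_paths: list[str]) -> set[str]:
    # One-time index over all crawled DirectFS paths (hypothetical helper).
    return set(dfsa_paths)


def longest_matching_path(paths: set[str], location: str) -> str | None:
    # Probe progressively shorter prefixes of the table location against the
    # index; each probe is an O(1) set lookup instead of a full scan.
    for end in range(len(location), 0, -1):
        if location[:end] in paths:
            return location[:end]
    return None

This turns the O(tables × records) scan into O(tables × location length), at the cost of assuming prefix rather than arbitrary substring matches.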

    def save(
        self, directfs_crawler: DirectFsAccessCrawler, tables_crawler: TablesCrawler, workspace_info: WorkspaceInfo
    ) -> str:
        """
        Save direct filesystem access records to a CSV file.
        """
        workspace_name = workspace_info.current()
        default_catalog_name = re.sub(r"\W+", "_", workspace_name)
        directfs_records = self.directfs_list(directfs_crawler, tables_crawler, workspace_name, default_catalog_name)
        return self._installation.save(list(directfs_records), filename=self.FILENAME)
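
A minimal usage sketch (hypothetical wiring; real callers obtain these collaborators from a WorkspaceContext, as create_directfs_mapping in cli.py does above):

mapping = DirectFsMapping(installation, ws, sql_backend)
csv_path = mapping.save(directfs_crawler, tables_crawler, workspace_info)
logger.info(f"DirectFS mapping written to {csv_path}")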