diff --git a/CHANGELOG.md b/CHANGELOG.md index f7ee40186..31f62f8f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Under the hood - Explicitly close cursors ([#163](https://github.com/databricks/dbt-databricks/pull/163)) - Upgrade databricks-sql-connector to 2.0.5 ([#166](https://github.com/databricks/dbt-databricks/pull/166)) +- Embed dbt-databricks and databricks-sql-connector versions to SQL comments ([#167](https://github.com/databricks/dbt-databricks/pull/167)) ## dbt-databricks 1.2.1 (August 24, 2022) diff --git a/dbt/adapters/databricks/connections.py b/dbt/adapters/databricks/connections.py index fea88ca89..2869d2625 100644 --- a/dbt/adapters/databricks/connections.py +++ b/dbt/adapters/databricks/connections.py @@ -22,9 +22,16 @@ import dbt.exceptions from dbt.adapters.base import Credentials -from dbt.adapters.databricks import __version__ +from dbt.adapters.base.query_headers import MacroQueryStringSetter +from dbt.adapters.databricks.__version__ import version as __version__ from dbt.clients import agate_helper -from dbt.contracts.connection import AdapterResponse, Connection, ConnectionState +from dbt.contracts.connection import ( + AdapterResponse, + Connection, + ConnectionState, + DEFAULT_QUERY_COMMENT, +) +from dbt.contracts.graph.manifest import Manifest from dbt.events import AdapterLogger from dbt.events.functions import fire_event from dbt.events.types import ConnectionUsed, SQLQuery, SQLQueryStatus @@ -233,9 +240,42 @@ def __del__(self) -> None: warnings.warn("The cursor was closed by destructor.") +DATABRICKS_QUERY_COMMENT = f""" +{{%- set comment_dict = {{}} -%}} +{{%- do comment_dict.update( + app='dbt', + dbt_version=dbt_version, + dbt_databricks_version='{__version__}', + databricks_sql_connector_version='{dbsql.__version__}', + profile_name=target.get('profile_name'), + target_name=target.get('target_name'), +) -%}} +{{%- if node is not none -%}} + {{%- do comment_dict.update( + node_id=node.unique_id, + ) -%}} +{{% else %}} + {{# in the node context, the connection name is the node_id #}} + {{%- do comment_dict.update(connection_name=connection_name) -%}} +{{%- endif -%}} +{{{{ return(tojson(comment_dict)) }}}} +""" + + +class DatabricksMacroQueryStringSetter(MacroQueryStringSetter): + def _get_comment_macro(self) -> Optional[str]: + if self.config.query_comment.comment == DEFAULT_QUERY_COMMENT: + return DATABRICKS_QUERY_COMMENT + else: + return self.config.query_comment.comment + + class DatabricksConnectionManager(SparkConnectionManager): TYPE: ClassVar[str] = "databricks" + def set_query_header(self, manifest: Manifest) -> None: + self.query_header = DatabricksMacroQueryStringSetter(self.profile, manifest) + @contextmanager def exception_handler(self, sql: str) -> Iterator[None]: try: @@ -347,8 +387,7 @@ def open(cls, connection: Connection) -> Connection: creds: DatabricksCredentials = connection.credentials exc: Optional[Exception] = None - dbt_databricks_version = __version__.version - user_agent_entry = f"dbt-databricks/{dbt_databricks_version}" + user_agent_entry = f"dbt-databricks/{__version__}" invocation_env = os.environ.get(DBT_DATABRICKS_INVOCATION_ENV) if invocation_env is not None and len(invocation_env) > 0: