Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(doris): add catalog support for Apache Doris #31580

Merged
merged 8 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions superset/db_engine_specs/doris.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

from flask_babel import gettext as __
from sqlalchemy import Float, Integer, Numeric, String, TEXT, types
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import URL
from sqlalchemy.sql.type_api import TypeEngine

from superset.db_engine_specs.mysql import MySQLEngineSpec
from superset.errors import SupersetErrorType
from superset.models.core import Database
from superset.utils.core import GenericDataType

# Regular expressions to catch custom errors
Expand Down Expand Up @@ -111,6 +113,7 @@
)
encryption_parameters = {"ssl": "0"}
supports_dynamic_schema = True
supports_catalog = supports_dynamic_catalog = True

column_type_mappings = ( # type: ignore
(
Expand Down Expand Up @@ -245,17 +248,45 @@
catalog: Optional[str] = None,
schema: Optional[str] = None,
) -> tuple[URL, dict[str, Any]]:
database = uri.database
if schema and database:
schema = parse.quote(schema, safe="")
if "." in database:
database = database.split(".")[0] + "." + schema
else:
database = "internal." + schema
uri = uri.set(database=database)

if uri.database and "." in uri.database:
current_catalog, _ = uri.database.split(".", 1)

Check warning on line 252 in superset/db_engine_specs/doris.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/doris.py#L251-L252

Added lines #L251 - L252 were not covered by tests
else:
current_catalog = uri.database

Check warning on line 254 in superset/db_engine_specs/doris.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/doris.py#L254

Added line #L254 was not covered by tests

# In Apache Doris, each catalog has an information_schema for BI tool
# compatibility. See: https://github.com/apache/doris/pull/28919
adjusted_database = ".".join(

Check warning on line 258 in superset/db_engine_specs/doris.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/doris.py#L258

Added line #L258 was not covered by tests
[catalog or current_catalog or "", "information_schema"]
).rstrip(".")

uri = uri.set(database=adjusted_database)

Check warning on line 262 in superset/db_engine_specs/doris.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/doris.py#L262

Added line #L262 was not covered by tests
return uri, connect_args

@classmethod
def get_default_catalog(cls, database: Database) -> Optional[str]:
    """
    Return the default catalog.

    The catalog is read from the database component of the connection URL,
    taking everything before the first ``.`` (``catalog.schema`` yields
    ``catalog``; a bare ``catalog`` is returned unchanged).

    :param database: the database whose ``url_object.database`` is inspected
    :return: the catalog name, or ``None`` when the URL carries no database
        component or the catalog part is empty
    """
    uri_database = database.url_object.database
    # Treat an empty string like a missing database so callers never receive
    # "" as a catalog name.
    if not uri_database:
        return None

    catalog, _, _ = uri_database.partition(".")
    # A value such as ".schema" has an empty catalog part; report it as absent.
    return catalog or None

Check warning on line 273 in superset/db_engine_specs/doris.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/doris.py#L273

Added line #L273 was not covered by tests

@classmethod
def get_catalog_names(
    cls,
    database: Database,
    inspector: Inspector,
) -> set[str]:
    """
    Return the names of all catalogs.

    Doris answers ``SHOW CATALOGS`` with several columns per row
    (CatalogId, CatalogName, Type, IsCurrent, CreateTime, LastUpdateTime,
    Comment); only the ``CatalogName`` value of each row is kept.

    :param database: unused here; part of the engine spec interface
    :param inspector: inspector whose bound connectable runs the statement
    :return: the set of catalog names
    """
    return {
        catalog_row.CatalogName
        for catalog_row in inspector.bind.execute("SHOW CATALOGS")
    }

Check warning on line 288 in superset/db_engine_specs/doris.py

View check run for this annotation

Codecov / codecov/patch

superset/db_engine_specs/doris.py#L287-L288

Added lines #L287 - L288 were not covered by tests

@classmethod
def get_schema_from_engine_params(
cls,
Expand Down
50 changes: 50 additions & 0 deletions tests/unit_tests/db_engine_specs/test_doris.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

from typing import Any, Optional
from unittest.mock import Mock

import pytest
from sqlalchemy import JSON, types
Expand Down Expand Up @@ -145,3 +146,52 @@ def test_get_schema_from_engine_params() -> None:
)
is None
)


def test_get_default_catalog() -> None:
    """
    Check that ``get_default_catalog`` derives the catalog from the URL database.
    """
    from superset.db_engine_specs.doris import DorisEngineSpec
    from superset.models.core import Database

    database = Mock(spec=Database)

    # Both "catalog.schema" and a bare "catalog" resolve to the catalog part.
    for uri_database in ("catalog1.schema1", "catalog1"):
        database.url_object.database = uri_database
        assert DorisEngineSpec.get_default_catalog(database) == "catalog1"

    # No database component in the URL means there is no default catalog.
    database.url_object.database = None
    assert DorisEngineSpec.get_default_catalog(database) is None
villebro marked this conversation as resolved.
Show resolved Hide resolved


def test_get_catalog_names() -> None:
    """
    Check that ``get_catalog_names`` runs ``SHOW CATALOGS`` and collects the names.
    """
    from superset.db_engine_specs.doris import DorisEngineSpec
    from superset.models.core import Database

    expected_names = ("catalog1", "catalog2", "catalog3")

    # Each fake row only needs to expose the CatalogName attribute.
    inspector = Mock()
    inspector.bind.execute.return_value = [
        Mock(CatalogName=name) for name in expected_names
    ]

    catalogs = DorisEngineSpec.get_catalog_names(Mock(spec=Database), inspector)

    # The statement must be issued exactly once, verbatim.
    inspector.bind.execute.assert_called_once_with("SHOW CATALOGS")

    # Every row's name is returned, as a set.
    assert catalogs == {"catalog1", "catalog2", "catalog3"}
Loading