Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(doris): add catalog support for Apache Doris #31580

Merged
merged 8 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions superset/db_engine_specs/doris.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

from flask_babel import gettext as __
from sqlalchemy import Float, Integer, Numeric, String, TEXT, types
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import URL
from sqlalchemy.sql.type_api import TypeEngine

from superset.db_engine_specs.mysql import MySQLEngineSpec
from superset.errors import SupersetErrorType
from superset.models.core import Database
from superset.utils.core import GenericDataType

# Regular expressions to catch custom errors
Expand Down Expand Up @@ -111,6 +113,7 @@ class DorisEngineSpec(MySQLEngineSpec):
)
encryption_parameters = {"ssl": "0"}
supports_dynamic_schema = True
supports_catalog = supports_dynamic_catalog = True

column_type_mappings = ( # type: ignore
(
Expand Down Expand Up @@ -245,17 +248,45 @@ def adjust_engine_params(
catalog: Optional[str] = None,
schema: Optional[str] = None,
) -> tuple[URL, dict[str, Any]]:
database = uri.database
if schema and database:
schema = parse.quote(schema, safe="")
if "." in database:
database = database.split(".")[0] + "." + schema
else:
database = "internal." + schema
uri = uri.set(database=database)

if uri.database and "." in uri.database:
current_catalog, _ = uri.database.split(".", 1)
else:
current_catalog = "internal"

# In Apache Doris, each catalog has an information_schema for BI tool
# compatibility. See: https://github.com/apache/doris/pull/28919
adjusted_database = ".".join(
[catalog or current_catalog or "", "information_schema"]
).rstrip(".")
Copy link
Member

@villebro villebro Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may sound like a total nit, but I actually had some issues following what's going on here, especially the catalog or current_catalog or "" logic. As current_catalog is unnecessary if catalog is defined, I would have maybe just reused the latter variable for all these uses. Something like:

        if catalog:
            pass
        elif uri.database and "." in uri.database:
            catalog, _ = uri.database.split(".", 1) or ""  # notice how I also moved the `or ""` part here
        else:
            catalog = "internal"

Then later just

        adjusted_database = ".".join([catalog, "information_schema"])

Also, why is .rstrip(".") needed? I don't see how we can ever hit that, as adjusted_database will always end with .information_schema, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@villebro Thanks for your advice. After in-depth testing with Doris, we found that there is still a problem. The previous test only tested the case of linking data sources. When operating on SQL Lab, it will also go to this function and cannot use the information_schema library fixedly. When there is a schema value, the user-provided schema should be used for querying. This implementation is the correct behavior at present.

  1. When linking data sources, the schema is empty and the information_schema library is used uniformly
  2. When the schema has a value, the schema value provided by the user is used


uri = uri.set(database=adjusted_database)
return uri, connect_args

@classmethod
def get_default_catalog(cls, database: Database) -> Optional[str]:
"""
Return the default catalog.
"""
if database.url_object.database is None:
return None

return database.url_object.database.split(".")[0]

@classmethod
def get_catalog_names(
cls,
database: Database,
inspector: Inspector,
) -> set[str]:
"""
Get all catalogs.
For Doris, the SHOW CATALOGS command returns multiple columns:
CatalogId, CatalogName, Type, IsCurrent, CreateTime, LastUpdateTime, Comment
We need to extract just the CatalogName column.
"""
result = inspector.bind.execute("SHOW CATALOGS")
return {row.CatalogName for row in result}

@classmethod
def get_schema_from_engine_params(
cls,
Expand Down
97 changes: 83 additions & 14 deletions tests/unit_tests/db_engine_specs/test_doris.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

from typing import Any, Optional
from unittest.mock import Mock

import pytest
from sqlalchemy import JSON, types
Expand Down Expand Up @@ -85,25 +86,25 @@ def test_get_column_spec(
(
"doris://user:password@host/db1",
{"param1": "some_value"},
"db1",
"internal.information_schema",
villebro marked this conversation as resolved.
Show resolved Hide resolved
{"param1": "some_value"},
),
(
"pydoris://user:password@host/db1",
{"param1": "some_value"},
"db1",
"internal.information_schema",
{"param1": "some_value"},
),
(
"doris://user:password@host/catalog1.db1",
{"param1": "some_value"},
"catalog1.db1",
"catalog1.information_schema",
{"param1": "some_value"},
),
(
"pydoris://user:password@host/catalog1.db1",
{"param1": "some_value"},
"catalog1.db1",
"catalog1.information_schema",
{"param1": "some_value"},
),
],
Expand All @@ -120,28 +121,96 @@ def test_adjust_engine_params(
returned_url, returned_connect_args = DorisEngineSpec.adjust_engine_params(
url, connect_args
)

assert returned_url.database == return_schema
assert returned_connect_args == return_connect_args


def test_get_schema_from_engine_params() -> None:
@pytest.mark.parametrize(
"url,expected_schema",
[
("doris://localhost:9030/hive.test", "test"),
("doris://localhost:9030/hive", None),
],
)
def test_get_schema_from_engine_params(
url: str, expected_schema: Optional[str]
) -> None:
"""
Test the ``get_schema_from_engine_params`` method.
"""
from superset.db_engine_specs.doris import DorisEngineSpec

assert (
DorisEngineSpec.get_schema_from_engine_params(
make_url("doris://localhost:9030/hive.test"),
make_url(url),
{},
)
== "test"
== expected_schema
)

assert (
DorisEngineSpec.get_schema_from_engine_params(
make_url("doris://localhost:9030/hive"),
{},
)
is None
)

@pytest.mark.parametrize(
"database_value,expected_catalog",
[
("catalog1.schema1", "catalog1"),
("catalog1", "catalog1"),
(None, None),
],
)
def test_get_default_catalog(
database_value: Optional[str], expected_catalog: Optional[str]
) -> None:
"""
Test the ``get_default_catalog`` method.
"""
from superset.db_engine_specs.doris import DorisEngineSpec
from superset.models.core import Database

database = Mock(spec=Database)
database.url_object.database = database_value

assert DorisEngineSpec.get_default_catalog(database) == expected_catalog


@pytest.mark.parametrize(
"mock_catalogs,expected_result",
[
(
[
Mock(CatalogName="catalog1"),
Mock(CatalogName="catalog2"),
Mock(CatalogName="catalog3"),
],
{"catalog1", "catalog2", "catalog3"},
),
(
[Mock(CatalogName="single_catalog")],
{"single_catalog"},
),
(
[],
set(),
),
],
)
def test_get_catalog_names(
mock_catalogs: list[Mock], expected_result: set[str]
) -> None:
"""
Test the ``get_catalog_names`` method.
"""
from superset.db_engine_specs.doris import DorisEngineSpec
from superset.models.core import Database

database = Mock(spec=Database)
inspector = Mock()
inspector.bind.execute.return_value = mock_catalogs

catalogs = DorisEngineSpec.get_catalog_names(database, inspector)

# Verify the SQL query
inspector.bind.execute.assert_called_once_with("SHOW CATALOGS")

# Verify the returned catalog names
assert catalogs == expected_result
Loading