Skip to content

Commit

Permalink
add support for glue.id
Browse files Browse the repository at this point in the history
  • Loading branch information
jrouly committed Mar 1, 2024
1 parent 36b56eb commit b2aff44
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
27 changes: 27 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@
if TYPE_CHECKING:
import pyarrow as pa


# There is a unique Glue metastore in each AWS account and each AWS region. By default, GlueCatalog chooses the Glue
# metastore to use based on the user's default AWS client credential and region setup. You can specify the Glue catalog
# ID through glue.id catalog property to point to a Glue catalog in a different AWS account. The Glue catalog ID is your
# numeric AWS account ID.
GLUE_ID = "glue.id"

# If Glue should skip archiving an old table version when creating a new version in a commit. By
# default, Glue archives all old table versions after an UpdateTable call, but Glue has a default
# max number of archived table versions (can be increased). So for streaming use case with lots
Expand Down Expand Up @@ -252,6 +259,22 @@ def _construct_database_input(database_name: str, properties: Properties) -> Dat
return database_input


def _register_glue_catalog_id_with_glue_client(glue: GlueClient, glue_catalog_id: str) -> None:
"""
Register the Glue Catalog ID (AWS Account ID) as a parameter on all Glue client methods.
It's more ergonomic to do this than to pass the CatalogId as a parameter to every client call since it's an optional
parameter and boto3 does not support 'None' values for missing parameters.
"""
event_system = glue.meta.events

def add_glue_catalog_id(params, **kwargs) -> None: # type: ignore[no-untyped-def]
if "CatalogId" not in params:
params["CatalogId"] = glue_catalog_id

event_system.register("provide-client-params.glue", add_glue_catalog_id)


class GlueCatalog(Catalog):
def __init__(self, name: str, **properties: Any):
super().__init__(name, **properties)
Expand All @@ -266,6 +289,10 @@ def __init__(self, name: str, **properties: Any):
)
self.glue: GlueClient = session.client("glue")

self.glue_catalog_id: Optional[str] = properties.get("glue.id")
if self.glue_catalog_id is not None:
_register_glue_catalog_id_with_glue_client(self.glue, self.glue_catalog_id)

def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
properties: Properties = glue_table["Parameters"]

Expand Down
21 changes: 21 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,27 @@ def test_create_table_with_no_database(
test_catalog.create_table(identifier=identifier, schema=table_schema_nested)


@mock_aws
def test_create_table_with_glue_catalog_id(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
catalog_id = "444444444444"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(
catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}", "glue.id": catalog_id}
)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
assert test_catalog._parse_metadata_version(table.metadata_location) == 0

glue = boto3.client("glue")
databases = glue.get_databases()
assert databases["DatabaseList"][0]["CatalogId"] == catalog_id


@mock_aws
def test_create_duplicated_table(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
Expand Down

0 comments on commit b2aff44

Please sign in to comment.