Skip to content

Commit

Permalink
Merge branch 'main' into as-replace-table-as-select
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored Feb 29, 2024
2 parents 53f2d2c + a1c6a5a commit db62f56
Show file tree
Hide file tree
Showing 22 changed files with 1,271 additions and 110 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/check-md-link.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: Check Markdown links

on:
push:
paths:
- mkdocs/**

jobs:
markdown-link-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: gaurav-nelson/github-action-markdown-link-check@v1
57 changes: 57 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,63 @@ with table.update_schema(allow_incompatible_changes=True) as update:
update.delete_column("some_field")
```

## Partition evolution

PyIceberg supports partition evolution. See the [partition evolution](https://iceberg.apache.org/spec/#partition-evolution)
for more details.

The API to use when evolving partitions is the `update_spec` API on the table.

```python
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
```

Updating the partition spec can also be done as part of a transaction with other operations.

```python
with table.transaction() as transaction:
with transaction.update_spec() as update_spec:
update_spec.add_field("id", BucketTransform(16), "bucketed_id")
update_spec.add_field("event_ts", DayTransform(), "day_ts")
# ... Update properties etc
```

### Add fields

New partition fields can be added via the `add_field` API which takes in the field name to partition on,
the partition transform, and an optional partition name. If the partition name is not specified,
one will be created.

```python
with table.update_spec() as update:
update.add_field("id", BucketTransform(16), "bucketed_id")
update.add_field("event_ts", DayTransform(), "day_ts")
# identity is a shortcut API for adding an IdentityTransform
update.identity("some_field")
```

### Remove fields

Partition fields can also be removed via the `remove_field` API if it no longer makes sense to partition on those fields.

```python
with table.update_spec() as update:some_partition_name
# Remove the partition field with the name
update.remove_field("some_partition_name")
```

### Rename fields

Partition fields can also be renamed via the `rename_field` API.

```python
with table.update_spec() as update:
# Rename the partition field with the name bucketed_id to sharded_id
update.rename_field("bucketed_id", "sharded_id")
```

## Table properties

Set and remove properties through the `Transaction` API:
Expand Down
1 change: 1 addition & 0 deletions mkdocs/docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Options:
--catalog TEXT
--verbose BOOLEAN
--output [text|json]
--ugi TEXT
--uri TEXT
--credential TEXT
--help Show this message and exit.
Expand Down
14 changes: 14 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,27 @@ catalog:
| Key | Example | Description |
| ---------------------- | ----------------------- | -------------------------------------------------------------------------------------------------- |
| uri | https://rest-catalog/ws | URI identifying the REST Server |
| ugi | t-1234:secret | Hadoop UGI for Hive client. |
| credential | t-1234:secret | Credential to use for OAuth2 credential flow when initializing the catalog |
| token | FEW23.DFSDF.FSDF | Bearer token value to use for `Authorization` header |
| rest.sigv4-enabled | true | Sign requests to the REST Server using AWS SigV4 protocol |
| rest.signing-region | us-east-1 | The region to use when SigV4 signing a request |
| rest.signing-name | execute-api | The service signing name to use when SigV4 signing a request |
| rest.authorization-url | https://auth-service/cc | Authentication URL to use for client credentials authentication (default: uri + 'v1/oauth/tokens') |

### Headers in RESTCatalog

To configure custom headers in RESTCatalog, include them in the catalog properties with the prefix `header.`. This
ensures that all HTTP requests to the REST service include the specified headers.

```yaml
catalog:
default:
uri: http://rest-catalog/ws/
credential: t-1234:secret
header.content-type: application/vnd.api+json
```

## SQL Catalog

The SQL catalog requires a database for its backend. PyIceberg supports PostgreSQL and SQLite through psycopg2. The database connection has to be configured using the `uri` property. See SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls):
Expand Down
8 changes: 6 additions & 2 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,21 @@ class _HiveClient:

_transport: TTransport
_client: Client
_ugi: Optional[List[str]]

def __init__(self, uri: str):
def __init__(self, uri: str, ugi: Optional[str] = None):
url_parts = urlparse(uri)
transport = TSocket.TSocket(url_parts.hostname, url_parts.port)
self._transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)

self._client = Client(protocol)
self._ugi = ugi.split(':') if ugi else None

def __enter__(self) -> Client:
self._transport.open()
if self._ugi:
self._client.set_ugi(*self._ugi)
return self._client

def __exit__(
Expand Down Expand Up @@ -233,7 +237,7 @@ class HiveCatalog(Catalog):

def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = _HiveClient(properties["uri"])
self._client = _HiveClient(properties["uri"], properties.get("ugi"))

def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
properties: Dict[str, str] = table.parameters
Expand Down
30 changes: 20 additions & 10 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class Endpoints:
SIGV4_REGION = "rest.signing-region"
SIGV4_SERVICE = "rest.signing-name"
AUTH_URL = "rest.authorization-url"
HEADER_PREFIX = "header."

NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)

Expand All @@ -127,7 +128,7 @@ def _retry_hook(retry_state: RetryCallState) -> None:
_RETRY_ARGS = {
"retry": retry_if_exception_type(AuthorizationExpiredError),
"stop": stop_after_attempt(2),
"before": _retry_hook,
"before_sleep": _retry_hook,
"reraise": True,
}

Expand Down Expand Up @@ -242,10 +243,7 @@ def _create_session(self) -> Session:
self._refresh_token(session, self.properties.get(TOKEN))

# Set HTTP headers
session.headers["Content-type"] = "application/json"
session.headers["X-Client-Version"] = ICEBERG_REST_SPEC_VERSION
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
session.headers["X-Iceberg-Access-Delegation"] = "vended-credentials"
self._config_headers(session)

# Configure SigV4 Request Signing
if str(self.properties.get(SIGV4, False)).lower() == "true":
Expand Down Expand Up @@ -292,8 +290,9 @@ def _fetch_access_token(self, session: Session, credential: str) -> str:
else:
client_id, client_secret = None, credential
data = {GRANT_TYPE: CLIENT_CREDENTIALS, CLIENT_ID: client_id, CLIENT_SECRET: client_secret, SCOPE: CATALOG_SCOPE}
# Uses application/x-www-form-urlencoded by default
response = session.post(url=self.auth_url, data=data)
response = session.post(
url=self.auth_url, data=data, headers={**session.headers, "Content-type": "application/x-www-form-urlencoded"}
)
try:
response.raise_for_status()
except HTTPError as exc:
Expand Down Expand Up @@ -447,17 +446,28 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:
catalog=self,
)

def _refresh_token(self, session: Optional[Session] = None, new_token: Optional[str] = None) -> None:
def _refresh_token(self, session: Optional[Session] = None, initial_token: Optional[str] = None) -> None:
session = session or self._session
if new_token is not None:
self.properties[TOKEN] = new_token
if initial_token is not None:
self.properties[TOKEN] = initial_token
elif CREDENTIAL in self.properties:
self.properties[TOKEN] = self._fetch_access_token(session, self.properties[CREDENTIAL])

# Set Auth token for subsequent calls in the session
if token := self.properties.get(TOKEN):
session.headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {token}"

def _config_headers(self, session: Session) -> None:
session.headers["Content-type"] = "application/json"
session.headers["X-Client-Version"] = ICEBERG_REST_SPEC_VERSION
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
session.headers["X-Iceberg-Access-Delegation"] = "vended-credentials"
header_properties = self._extract_headers_from_properties()
session.headers.update(header_properties)

def _extract_headers_from_properties(self) -> Dict[str, str]:
return {key[len(HEADER_PREFIX) :]: value for key, value in self.properties.items() if key.startswith(HEADER_PREFIX)}

@retry(**_RETRY_ARGS)
def create_table(
self,
Expand Down
4 changes: 3 additions & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,9 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper
Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
database_name = self.identifier_to_database(namespace)
if not self._namespace_exists(database_name):
raise NoSuchNamespaceError(f"Database {database_name} does not exists")

stmt = select(IcebergNamespaceProperties).where(
IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == database_name
Expand Down
13 changes: 12 additions & 1 deletion pyiceberg/cli/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,22 @@ def wrapper(*args: Any, **kwargs: Any): # type: ignore
@click.option("--catalog")
@click.option("--verbose", type=click.BOOL)
@click.option("--output", type=click.Choice(["text", "json"]), default="text")
@click.option("--ugi")
@click.option("--uri")
@click.option("--credential")
@click.pass_context
def run(ctx: Context, catalog: Optional[str], verbose: bool, output: str, uri: Optional[str], credential: Optional[str]) -> None:
def run(
ctx: Context,
catalog: Optional[str],
verbose: bool,
output: str,
ugi: Optional[str],
uri: Optional[str],
credential: Optional[str],
) -> None:
properties = {}
if ugi:
properties["ugi"] = ugi
if uri:
properties["uri"] = uri
if credential:
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,10 @@ def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = N
return visit_pyarrow(schema, visitor)


def _pyarrow_to_schema_without_ids(schema: pa.Schema) -> Schema:
return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs())


@singledispatch
def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
"""Apply a pyarrow schema visitor to any point within a schema.
Expand Down
Loading

0 comments on commit db62f56

Please sign in to comment.