Merge branch 'main' into support-python-submissions
Avinash-1394 committed Jun 14, 2023
2 parents 209e02c + d9ac51f commit d044c35
Showing 15 changed files with 812 additions and 22 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -5,6 +5,7 @@ repos:
      # Identify invalid files
      - id: check-ast
      - id: check-yaml
        args: ['--unsafe']
      - id: check-json
      - id: check-toml

18 changes: 11 additions & 7 deletions README.md
@@ -140,6 +140,10 @@ _Additional information_
* `field_delimiter` (`default=none`)
* Custom field delimiter, for when format is set to `TEXTFILE`
* `table_properties`: table properties to add to the table, valid for Iceberg only
+ `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be made to manage data in S3; data in S3 will only be cleaned up for Iceberg tables ([see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html)). Note that Iceberg DROP TABLE operations may time out if they take longer than 60 seconds.
+ `seed_by_insert` (`default=false`)
  + The default behaviour uploads the seed data to S3. This flag instead creates the seed using an SQL insert statement.
  + Large seed files cannot use `seed_by_insert`, as the SQL insert statement would exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html).
* `lf_tags_config` (`default=none`)
* [AWS lakeformation](#aws-lakeformation-integration) tags to associate with the table and columns
* format for model config:
@@ -283,20 +287,20 @@ Iceberg supports bucketing as hidden partitions, therefore use the `partitioned_
Iceberg supports several table formats for data: `PARQUET`, `AVRO`, and `ORC`.

It is possible to use Iceberg in an incremental fashion; specifically, two strategies are supported:
- * `append`: new records are appended to the table, this can lead to duplicates
- * `merge`: must be used in combination with `unique_key` and it's only available with Engine version 3.
-   It performs an upsert, new record are added, and record already existing are updated. If
-   `delete_condition` is provided in the model config, it can also delete records based on the
-   provided condition (SQL condition). You can use any column of the incremental table (`src`) or
-   the final table (`target`). You must prefix the column by the name of the table to prevent
-   `Column is ambiguous` error.
+ * `append`: New records are appended to the table; this can lead to duplicates.
+ * `merge`: Performs an upsert (and an optional delete), where new records are added and existing records are updated. Only available with Athena engine version 3.
+   - `unique_key` **(required)**: columns that define a unique record in the source and target tables.
+   - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can be useful for improving performance via predicate pushdown on the target table.
+   - `delete_condition` (optional): SQL condition used to identify records that should be deleted.
+   - `delete_condition` and `incremental_predicates` can include any column of the incremental table (`src`) or the final table (`target`). Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error.

```sql
{{ config(
materialized='incremental',
table_type='iceberg',
incremental_strategy='merge',
unique_key='user_id',
incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"],
delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year",
format='parquet'
) }}
```
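For intuition, a `merge` model configured as above compiles to a single Athena `MERGE` statement along the following lines. This is a simplified, hypothetical sketch (the table name `analytics.user_stats` and its `__dbt_tmp` staging table are invented); the exact SQL is assembled by the `iceberg_merge` macro changed later in this commit.

```sql
-- Hypothetical rendering of the merge strategy above; names are illustrative.
merge into analytics.user_stats as target
using analytics.user_stats__dbt_tmp as src
on (target.user_id = src.user_id)
    -- incremental_predicates are appended to the join condition
    and (src.quantity > 1 and target.my_date >= now() - interval '4' year)
-- delete_condition becomes a dedicated "when matched ... then delete" branch
when matched and (src.status != 'active' and target.my_date < now() - interval '2' year)
    then delete
when matched
    then update set status = src.status, quantity = src.quantity, my_date = src.my_date
when not matched
    then insert (user_id, status, quantity, my_date)
    values (src.user_id, src.status, src.quantity, src.my_date)
```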
6 changes: 1 addition & 5 deletions dbt/adapters/athena/connections.py
@@ -183,11 +183,7 @@ def open(cls, connection: Connection) -> Connection:
            session=get_boto3_session(connection),
            retry_config=RetryConfig(
                attempt=creds.num_retries,
-                exceptions=(
-                    "ThrottlingException",
-                    "TooManyRequestsException",
-                    "InternalServerException",
-                ),
+                exceptions=("ThrottlingException", "TooManyRequestsException", "InternalServerException"),
            ),
            config=get_boto3_config(),
        )
45 changes: 41 additions & 4 deletions dbt/adapters/athena/impl.py
@@ -12,7 +12,12 @@
import agate
from botocore.exceptions import ClientError
from mypy_boto3_athena.type_defs import DataCatalogTypeDef
-from mypy_boto3_glue.type_defs import ColumnTypeDef, TableTypeDef, TableVersionTypeDef
+from mypy_boto3_glue.type_defs import (
+    ColumnTypeDef,
+    GetTableResponseTypeDef,
+    TableTypeDef,
+    TableVersionTypeDef,
+)

from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.column import AthenaColumn
@@ -33,6 +38,7 @@
    RELATION_TYPE_MAP,
    AthenaRelation,
    AthenaSchemaSearchMap,
    TableType,
    get_table_type,
)
from dbt.adapters.athena.s3 import S3DataNaming
@@ -191,10 +197,9 @@ def generate_s3_location(
        return mapping[self._s3_data_naming(s3_data_naming)]

    @available
-    def get_glue_table_location(self, relation: AthenaRelation) -> Optional[str]:
+    def get_glue_table(self, relation: AthenaRelation) -> Optional[GetTableResponseTypeDef]:
        """
-        Helper function to get location of a relation in S3.
-        Will return None if the table does not exist or does not have a location (views)
+        Helper function to get a relation via Glue
        """
        conn = self.connections.get_thread_connection()
        client = conn.handle
@@ -208,6 +213,30 @@ def get_glue_table_location(self, relation: AthenaRelation) -> Optional[str]:
                LOGGER.debug(f"Table {relation.render()} does not exist - Ignoring")
                return None
            raise e
        return table

    @available
    def get_glue_table_type(self, relation: AthenaRelation) -> Optional[TableType]:
        """
        Get the table type of the relation from Glue
        """
        table = self.get_glue_table(relation)
        if not table:
            LOGGER.debug(f"Table {relation.render()} does not exist - Ignoring")
            return None

        return get_table_type(table["Table"])

    @available
    def get_glue_table_location(self, relation: AthenaRelation) -> Optional[str]:
        """
        Helper function to get location of a relation in S3.
        Will return None if the table does not exist or does not have a location (views)
        """
        table = self.get_glue_table(relation)
        if not table:
            LOGGER.debug(f"Table {relation.render()} does not exist - Ignoring")
            return None

        table_type = get_table_type(table["Table"])
        table_location = table["Table"].get("StorageDescriptor", {}).get("Location")
@@ -811,3 +840,11 @@ def _generate_snapshot_migration_sql(self, relation: AthenaRelation, table_colum
                drop_staging_sql.strip(),
            ]
        )

    @available
    def is_list(self, value: Any) -> bool:
        """
        This function is intended to test whether a Jinja object is
        a list, since this is complicated to do with pure Jinja syntax.
        """
        return isinstance(value, list)
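Since these adapter methods are exposed with `@available`, they can be called directly from Jinja. A minimal, hypothetical macro sketch (the macro name `inspect_relation` is invented; `get_glue_table_type` returns a `TableType` enum, so `.value` is read the same way `athena__drop_relation` does below):

```sql
{# Hypothetical helper macro; not part of this commit. #}
{% macro inspect_relation(relation) %}
    {%- set table_type = adapter.get_glue_table_type(relation) -%}
    {%- set location = adapter.get_glue_table_location(relation) -%}
    {%- if table_type is not none -%}
        {% do log('type=' ~ table_type.value ~ ' location=' ~ location, info=True) %}
    {%- else -%}
        {% do log(relation.render() ~ ' not found in Glue', info=True) %}
    {%- endif -%}
{% endmacro %}
```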
26 changes: 26 additions & 0 deletions dbt/include/athena/macros/adapters/relation.sql
@@ -1,8 +1,34 @@
{% macro athena__drop_relation(relation) -%}
{%- set native_drop = config.get('native_drop', default=false) -%}
{%- set rel_type_object = adapter.get_glue_table_type(relation) -%}
{%- set rel_type = none if rel_type_object == none else rel_type_object.value -%}
{%- set natively_droppable = rel_type == 'iceberg_table' or relation.type == 'view' -%}

{%- if native_drop and natively_droppable -%}
{%- do drop_relation_sql(relation) -%}
{%- else -%}
{%- do drop_relation_glue(relation) -%}
{%- endif -%}
{% endmacro %}

{% macro drop_relation_glue(relation) -%}
{%- do log('Dropping relation via Glue and S3 APIs') -%}
{%- do adapter.clean_up_table(relation) -%}
{%- do adapter.delete_from_glue_catalog(relation) -%}
{% endmacro %}

{% macro drop_relation_sql(relation) -%}

{%- do log('Dropping relation via SQL only') -%}
{% call statement('drop_relation', auto_begin=False) -%}
{%- if relation.type == 'view' -%}
drop {{ relation.type }} if exists {{ relation.render() }}
{%- else -%}
drop {{ relation.type }} if exists {{ relation.render_hive() }}
{% endif %}
{%- endcall %}
{% endmacro %}
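A model opting into the SQL-only drop path would set `native_drop` in its config, for example (a hypothetical model; per the macro above, the SQL path is only taken for Iceberg tables and views):

```sql
{{ config(
    materialized='table',
    table_type='iceberg',
    native_drop=true
) }}

select 1 as id
```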

{% macro set_table_classification(relation) -%}
{%- set format = config.get('format', default='parquet') -%}
{% call statement('set_table_classification', auto_begin=False) -%}
@@ -60,14 +60,22 @@
{% do to_drop.append(tmp_relation) %}
{% elif strategy == 'merge' and table_type == 'iceberg' %}
{% set unique_key = config.get('unique_key') %}
{% set incremental_predicates = config.get('incremental_predicates') %}
{% set delete_condition = config.get('delete_condition') %}
{% set empty_unique_key -%}
Merge strategy must implement unique_key as a single column or a list of columns.
{%- endset %}
{% if unique_key is none %}
{% do exceptions.raise_compiler_error(empty_unique_key) %}
{% endif %}

{% if incremental_predicates is not none %}
{% set inc_predicates_not_list -%}
Merge strategy must implement incremental_predicates as a list of predicates.
{%- endset %}
{% if not adapter.is_list(incremental_predicates) %}
{% do exceptions.raise_compiler_error(inc_predicates_not_list) %}
{% endif %}
{% endif %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% if tmp_relation is not none %}
{% do drop_relation(tmp_relation) %}
@@ -76,10 +84,10 @@
{% do run_query(create_table_as(True, tmp_relation, compiled_code, model_language)) %}
{% else %}
{% call statement('py_save_table', language=model_language) -%}
- {{ create_table_as(False, target_relation, compiled_code, model_language) }}
+ {{ create_table_as(True, target_relation, compiled_code, model_language) }}
{%- endcall %}
{% endif %}
- {% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, existing_relation, delete_condition) %}
+ {% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, existing_relation, delete_condition) %}
{% do to_drop.append(tmp_relation) %}
{% endif %}
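As a consequence of the `adapter.is_list` guard above, passing `incremental_predicates` as a bare string fails compilation with the `inc_predicates_not_list` message. A hypothetical misconfiguration:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    table_type='iceberg',
    unique_key='user_id',
    incremental_predicates="src.quantity > 1"  -- must be a list, e.g. ["src.quantity > 1"]
) }}
```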

@@ -47,7 +47,7 @@
{{ "," if not is_last }}
{%- endmacro -%}

- {% macro iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, existing_relation, delete_condition, statement_name="main") %}
+ {% macro iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, existing_relation, delete_condition, statement_name="main") %}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set merge_update_columns_default_rule = config.get('merge_update_columns_default_rule', 'replace') -%}
@@ -79,6 +79,13 @@
target.{{ key }} = src.{{ key }} {{ "and " if not loop.last }}
{%- endfor %}
)
{% if incremental_predicates is not none -%}
and (
{%- for inc_predicate in incremental_predicates %}
{{ inc_predicate }} {{ "and " if not loop.last }}
{%- endfor %}
)
{%- endif %}
{% if delete_condition is not none -%}
when matched and ({{ delete_condition }})
then delete
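The matched-update branch of this macro is driven by the `merge_update_columns` and `merge_exclude_columns` settings read just above; per standard dbt semantics, a model can restrict which columns an update rewrites. A hypothetical config:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    table_type='iceberg',
    unique_key='user_id',
    merge_update_columns=['status', 'quantity']  -- only these columns are rewritten on match
) }}
```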
@@ -20,6 +20,7 @@
s3_data_naming,
external_location,
temporary) -%}
{%- set native_drop = config.get('native_drop', default=false) -%}

{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
@@ -47,7 +48,11 @@
{%- endif -%}
{%- endif %}

- {% do adapter.delete_from_s3(location) %}
+ {%- if native_drop and table_type == 'iceberg' -%}
+ {% do log('Config native_drop enabled, skipping direct S3 delete') %}
+ {%- else -%}
+ {% do adapter.delete_from_s3(location) %}
+ {%- endif -%}
{%- if language == 'sql' -%}
create table {{ relation }}
with (
87 changes: 86 additions & 1 deletion dbt/include/athena/macros/materializations/seeds/helpers.sql
@@ -31,7 +31,61 @@
) as {{ col }}
{% endmacro %}

- {% macro athena__create_csv_table(model, agate_table) %}
+ {% macro create_csv_table_insert(model, agate_table) %}
{%- set identifier = model['alias'] -%}

{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_grants = config.get('lf_grants') -%}
{%- set column_override = config.get('column_types', {}) -%}
{%- set quote_seed_column = config.get('quote_columns') -%}
{%- set s3_data_dir = config.get('s3_data_dir', target.s3_data_dir) -%}
{%- set s3_data_naming = config.get('s3_data_naming', target.s3_data_naming) -%}
{%- set external_location = config.get('external_location') -%}

{%- set relation = api.Relation.create(
identifier=identifier,
schema=model.schema,
database=model.database,
type='table'
) -%}

{%- set location = adapter.generate_s3_location(relation,
s3_data_dir,
s3_data_naming,
external_location,
temporary) -%}

{% set sql_table %}
create external table {{ relation.render_hive() }} (
{%- for col_name in agate_table.column_names -%}
{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
{%- set type = column_override.get(col_name, inferred_type) -%}
{%- set type = type if type != "string" else "varchar" -%}
{%- set column_name = (col_name | string) -%}
{{ adapter.quote_seed_column(column_name, quote_seed_column) }} {{ ddl_data_type(type) }} {%- if not loop.last -%}, {% endif -%}
{%- endfor -%}
)
location '{{ location }}'

{% endset %}

{% call statement('_') -%}
{{ sql_table }}
{%- endcall %}

{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(relation, lf_tags_config) }}
{% endif %}

{% if lf_grants is not none %}
{{ adapter.apply_lf_grants(relation, lf_grants) }}
{% endif %}

{{ return(sql_table) }}
{% endmacro %}


{% macro create_csv_table_upload(model, agate_table) %}
{%- set identifier = model['alias'] -%}

{%- set lf_tags_config = config.get('lf_tags_config') -%}
@@ -131,7 +185,38 @@
{{ return(sql_table) }}
{% endmacro %}

{% macro athena__create_csv_table(model, agate_table) %}

{%- set seed_by_insert = config.get('seed_by_insert', False) | as_bool -%}

{%- if seed_by_insert -%}
{% do log('seed by insert...') %}
{%- set sql_table = create_csv_table_insert(model, agate_table) -%}
{%- else -%}
{% do log('seed by upload...') %}
{%- set sql_table = create_csv_table_upload(model, agate_table) -%}
{%- endif -%}

{%- set lf_tags_config = config.get('lf_tags_config') -%}
{%- set lf_grants = config.get('lf_grants') -%}

{% if lf_tags_config is not none %}
{{ adapter.add_lf_tags(relation, lf_tags_config) }}
{% endif %}

{% if lf_grants is not none %}
{{ adapter.apply_lf_grants(relation, lf_grants) }}
{% endif %}

{{ return(sql_table) }}
{% endmacro %}

{# Overwrite to satisfy dbt-core logic #}
{% macro athena__load_csv_rows(model, agate_table) %}
{%- set seed_by_insert = config.get('seed_by_insert', False) | as_bool -%}
{%- if seed_by_insert %}
{{ default__load_csv_rows(model, agate_table) }}
{%- else -%}
select 1
{% endif %}
{% endmacro %}
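Taken together, a seed with `seed_by_insert` enabled is built in two statements: `create_csv_table_insert` creates an empty external table, then `default__load_csv_rows` inserts the CSV rows. A rough, hypothetical rendering (schema, column, and bucket names are invented):

```sql
-- 1) from create_csv_table_insert: empty external table at the seed's S3 location
create external table my_schema.country_codes (
    code varchar,
    name varchar
)
location 's3://my-bucket/dbt/seeds/country_codes/';

-- 2) from default__load_csv_rows: the rendered insert must stay under
--    Athena's 262144-byte query size limit
insert into my_schema.country_codes (code, name) values
    ('DE', 'Germany'),
    ('FR', 'France');
```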
