feat: Implement support bucket function for more than 100 partitions #549

@sanromeo sanromeo commented Dec 28, 2023

Description

Resolves: #529

  • Fix the error FUNCTION_NOT_FOUND: line 2:21: Function 'bucket' not registered by adding a regex that extracts the bucket column from the partitioned_by config
  • Implement a murmur3_hash method that calculates the bucket number of a bucket column value the same way Athena does for Iceberg tables
  • Support more than 100 partitions when the bucket function is used in the partitioned_by config for Iceberg tables
  • Improve the get_partition_batches macro: assign bucket_column values to their bucket numbers, then add those values to the final WHERE condition for each batch
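
To illustrate the first point, here is a minimal sketch of pulling the column name and bucket count out of a partitioned_by entry such as 'bucket(random_str, 5)'. The pattern and the parse_bucket helper are hypothetical names chosen for this example, not the regex the adapter actually uses, and quoted column names are not handled.

```python
import re
from typing import Optional, Tuple

# Hypothetical pattern: matches "bucket(<column>, <count>)" case-insensitively.
BUCKET_PATTERN = re.compile(
    r"^\s*bucket\(\s*([^,\s]+)\s*,\s*(\d+)\s*\)\s*$", re.IGNORECASE
)


def parse_bucket(expr: str) -> Optional[Tuple[str, int]]:
    """Return (column, bucket_count) for a bucket() entry, else None."""
    match = BUCKET_PATTERN.match(expr)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


# Entries taken from the test model's partitioned_by config.
for entry in ["DAY(date_column)", "doy", "bucket(random_str, 5)"]:
    print(entry, "->", parse_bucket(entry))
```

Entries that are not bucket transforms return None, so they can keep going through the existing partition handling.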

Tested with the following column types for the bucket function:

  • Int
  • Long (Bigint in Athena)
  • String
  • Date
  • Timestamp

Not yet supported for the following types (additional implementation is needed in the dbt-athena-community adapter):

  • Bytes
  • UUID
  • Decimal
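
For context, the Iceberg spec defines the bucket transform as a 32-bit Murmur3 hash (seed 0) of the value's binary form, with the sign bit cleared, modulo the bucket count. The sketch below is a pure-Python illustration of that definition for the tested types, not the adapter's murmur3_hash method; the names murmur3_32, iceberg_hash, and bucket_number are hypothetical, and naive datetimes are assumed to be UTC.

```python
import struct
from datetime import date, datetime, timedelta

MASK32 = 0xFFFFFFFF


def _rotl32(x: int, r: int) -> int:
    return ((x << r) | (x >> (32 - r))) & MASK32


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """32-bit Murmur3 (x86 variant), returned as a signed 32-bit integer."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & MASK32
    full = len(data) & ~3
    for i in range(0, full, 4):
        k = struct.unpack_from("<I", data, i)[0]
        k = (_rotl32((k * c1) & MASK32, 15) * c2) & MASK32
        h = (_rotl32(h ^ k, 13) * 5 + 0xE6546B64) & MASK32
    tail = data[full:]
    if tail:
        k = int.from_bytes(tail, "little")
        h ^= (_rotl32((k * c1) & MASK32, 15) * c2) & MASK32
    # Finalization mix
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h - (1 << 32) if h & 0x80000000 else h


def iceberg_hash(value) -> int:
    """Hash int/long, string, date, and timestamp values per the Iceberg spec."""
    if isinstance(value, bool):
        raise TypeError("boolean values are not covered by this sketch")
    if isinstance(value, datetime):  # checked before date: datetime subclasses date
        value = (value - datetime(1970, 1, 1)) // timedelta(microseconds=1)
    elif isinstance(value, date):
        value = (value - date(1970, 1, 1)).days
    if isinstance(value, int):
        return murmur3_32(struct.pack("<q", value))  # 8-byte little-endian long
    if isinstance(value, str):
        return murmur3_32(value.encode("utf-8"))
    raise TypeError(f"unsupported type: {type(value).__name__}")


def bucket_number(value, num_buckets: int) -> int:
    return (iceberg_hash(value) & 0x7FFFFFFF) % num_buckets


print(bucket_number("JIC", 5))
```

Bytes, UUID, and Decimal each need their own binary encoding before hashing, which is why they require extra work beyond this sketch.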

Model used to test

{{
  config(
    schema='sandbox',
    materialized='table',
    partitioned_by=['DAY(date_column)', 'doy', 'bucket(random_str, 5)'],
    table_type='iceberg'
  )
}}

WITH random_strings AS (
    SELECT
        CHR(CAST(65 + FLOOR(RANDOM() * 26) AS BIGINT)) ||
        CHR(CAST(65 + FLOOR(RANDOM() * 26) AS BIGINT)) ||
        CHR(CAST(65 + FLOOR(RANDOM() * 26) AS BIGINT)) AS random_str
    FROM
        (SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3) AS temp_table
)
SELECT
    CAST(date_column AS DATE) as date_column,
    doy(date_column) as doy,
    rnd.random_str
FROM (
    VALUES (
        SEQUENCE(FROM_ISO8601_DATE('2023-01-01'), FROM_ISO8601_DATE('2023-07-24'), INTERVAL '1' DAY)
    )
) AS t1(date_array)
CROSS JOIN UNNEST(date_array) AS t2(date_column)
JOIN random_strings rnd ON true

Part of the log output:

10:19:45  BATCH PROCESSING: 5 OF 5
10:19:45  Using athena connection "model.test"
10:19:45  On model.test: 
    insert into "awsdatacatalog"."sandbox"."test__ha" ("date_column", "doy", "random_str")
                select "date_column", "doy", "random_str"
                from "awsdatacatalog"."sandbox"."test__ha__tmp_not_partitioned"
                where (date_trunc('day', date_column)=DATE'2023-07-20' and doy=201 and random_str IN ('ASK', 'WLI')) or (date_trunc('day', date_column)=DATE'2023-07-20' and doy=201 and random_str IN ('JIC')) or (date_trunc('day', date_column)=DATE'2023-07-21' and doy=202 and random_str IN ('ASK', 'WLI')) or (date_trunc('day', date_column)=DATE'2023-07-21' and doy=202 and random_str IN ('JIC')) or (date_trunc('day', date_column)=DATE'2023-07-22' and doy=203 and random_str IN ('ASK', 'WLI')) or (date_trunc('day', date_column)=DATE'2023-07-22' and doy=203 and random_str IN ('JIC')) or (date_trunc('day', date_column)=DATE'2023-07-23' and doy=204 and random_str IN ('ASK', 'WLI')) or (date_trunc('day', date_column)=DATE'2023-07-23' and doy=204 and random_str IN ('JIC')) or (date_trunc('day', date_column)=DATE'2023-07-24' and doy=205 and random_str IN ('ASK', 'WLI')) or (date_trunc('day', date_column)=DATE'2023-07-24' and doy=205 and random_str IN ('JIC'))
10:19:45  Athena adapter: Athena query ID 75e66acd-b28d-44c1-aa69-95164b903b9d
10:19:52  SQL status: OK 15 in 7.0 seconds

For reference, these are the bucket numbers behind the WHERE clause above:

`JIC` has bucket number 3
`ASK` and `WLI` have bucket number 4
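
The grouping visible in the log can be sketched as follows: values that share a bucket number go into one IN list, and each bucket yields its own condition. This is an illustration under assumptions, not the get_partition_batches macro itself; group_by_bucket and where_conditions are hypothetical helpers, and the bucket numbers are the ones reported above rather than computed here.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List


def group_by_bucket(
    values: Iterable[str], bucket_of: Callable[[str], int]
) -> Dict[int, List[str]]:
    """Group distinct values by the bucket number that bucket_of assigns them."""
    groups: Dict[int, List[str]] = defaultdict(list)
    for value in sorted(set(values)):
        groups[bucket_of(value)].append(value)
    return dict(groups)


def where_conditions(
    prefix: str, column: str, groups: Dict[int, List[str]]
) -> List[str]:
    """Build one condition per bucket: (<prefix> and <column> IN (...))."""
    conditions = []
    for bucket in sorted(groups):
        literals = ", ".join(
            "'" + v.replace("'", "''") + "'" for v in groups[bucket]
        )
        conditions.append(f"({prefix} and {column} IN ({literals}))")
    return conditions


# Bucket numbers as reported in this PR for bucket(random_str, 5).
reported_buckets = {"JIC": 3, "ASK": 4, "WLI": 4}
groups = group_by_bucket(["ASK", "JIC", "WLI"], reported_buckets.__getitem__)
prefix = "date_trunc('day', date_column)=DATE'2023-07-20' and doy=201"
conditions = where_conditions(prefix, "random_str", groups)
print(" or ".join(conditions))
```

The order of the conditions here (ascending bucket number) is a choice made for the sketch; the log shows the adapter emitting them in a different order, which does not affect the result of the OR.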

Checklist

  • You followed contributing section
  • You kept your Pull Request small and focused on a single feature or bug fix.
  • You added unit testing when necessary
  • You added functional testing when necessary

@sanromeo sanromeo changed the title Implement support bucket function for more than 100 partitions feat: Implement support bucket function for more than 100 partitions Dec 28, 2023
@sanromeo
Contributor Author

I will also add unit and functional tests 👌

@svdimchenko
Contributor

@sanromeo awesome job done 💪
@mrshu could you please test if this PR resolves your issue with bucketing ?

@svdimchenko
Contributor

@nicor88 @Jrmyy do you want to test it as well ?

@nicor88 nicor88 left a comment

@svdimchenko looks good, and the functional tests are enough for me.

Thanks for the implementation @sanromeo 💯

@svdimchenko svdimchenko merged commit 7e53aca into dbt-labs:main Jan 4, 2024
10 checks passed
@Gatsby-Lee
Contributor

I encountered this issue.
Thank you for the fix @sanromeo

Successfully merging this pull request may close these issues.

Athena partitions limit fix (#360) fails with partitions defined as non-Athena functions