Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial authorisations modeling #2954

Merged
merged 10 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions warehouse/macros/littlepay_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
SELECT
*,
-- have to parse the filename since there are no other timestamps seemingly
TIMESTAMP(PARSE_DATETIME(
'%Y%m%d%H%M',
REGEXP_EXTRACT(extract_filename, '([0-9]{12})_.*')
), 'UTC') AS littlepay_export_ts
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts
FROM {{ source(src, tbl) }}
)
{% endmacro %}
52 changes: 52 additions & 0 deletions warehouse/macros/littlepay_staging_transforms.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- regex to identify timestamp within a Littlepay filename
-- Filenames generally have the structure {timestamp}_{data_type}.{file extension};
-- capture group 1 is the leading 12-digit (YYYYMMDDHHMM) timestamp prefix.
-- The macro emits a quoted SQL string literal (quotes included) so it can be
-- spliced directly into REGEXP_EXTRACT calls.
{% macro lp_filename_timestamp_regex() %}
'^([0-9]{12})_.*'
{% endmacro %}

-- regex to identify date within a Littlepay filename
-- Same filename shape as lp_filename_timestamp_regex, but capture group 1 is
-- only the first 8 digits (YYYYMMDD) of the 12-digit timestamp prefix; the
-- remaining 4 digits (HHMM) are matched but not captured.
-- Emitted as a quoted SQL string literal for direct use in REGEXP_EXTRACT.
{% macro lp_filename_date_regex() %}
'^([0-9]{8})[0-9]{4}_.*'
{% endmacro %}

-- use regex to actually extract timestamp from filename
-- Returns a UTC TIMESTAMP parsed from the filename's leading 12-digit
-- (YYYYMMDDHHMM) prefix. Yields NULL instead of erroring when:
--   * the filename does not match the pattern (REGEXP_EXTRACT returns NULL;
--     there are some invalid filenames in the source data), or
--   * the matched 12 digits are not a real datetime (e.g. month 13) --
--     SAFE.PARSE_DATETIME returns NULL where plain PARSE_DATETIME would raise
--     and fail the whole query.
-- NULL-propagation through SAFE.PARSE_DATETIME/TIMESTAMP also removes the need
-- for the previous CASE guard, so REGEXP_EXTRACT is evaluated only once.
{% macro extract_littlepay_filename_ts(column='extract_filename') %}
TIMESTAMP(
    SAFE.PARSE_DATETIME(
        '%Y%m%d%H%M',
        REGEXP_EXTRACT({{ column }}, {{ lp_filename_timestamp_regex() }})
    ),
    'UTC'
)
{% endmacro %}

-- use regex to actually extract date from filename
-- Returns a DATE parsed from the filename's leading 8 digits (YYYYMMDD).
-- Yields NULL instead of erroring when the filename does not match the pattern
-- (there are some invalid filenames) or when the matched digits are not a
-- valid calendar date -- SAFE.PARSE_DATE returns NULL where plain PARSE_DATE
-- would raise. NULL-propagation makes the previous CASE guard unnecessary,
-- so REGEXP_EXTRACT is evaluated only once.
{% macro extract_littlepay_filename_date(column='extract_filename') %}
SAFE.PARSE_DATE(
    '%Y%m%d',
    REGEXP_EXTRACT({{ column }}, {{ lp_filename_date_regex() }})
)
{% endmacro %}

{% macro qualify_dedupe_lp_files(instance_col = 'instance', file_dt_col = 'littlepay_export_date', file_ts_col = 'littlepay_export_ts', ts_col = 'ts') %}

-- Deduplicate repeated deliveries of the same file (a "file" being one
-- date-level export from Littlepay): within each (instance, file date)
-- partition, keep the newest Littlepay-stamped file, breaking ties on our
-- own extract timestamp (both most-recent-first).
-- DENSE_RANK rather than ROW_NUMBER so every row belonging to the winning
-- file survives -- ties within the file must all be kept.
QUALIFY DENSE_RANK() OVER (
    PARTITION BY {{ instance_col }}, {{ file_dt_col }}
    ORDER BY {{ file_ts_col }} DESC, {{ ts_col }} DESC
) = 1

{% endmacro %}

{% macro qualify_dedupe_full_duplicate_lp_rows(content_hash_col = 'content_hash', file_ts_col = 'littlepay_export_ts', line_number_col = '_line_number') %}

-- Drop rows whose *entire* content (as captured by the content hash) is
-- identical to another row's. ROW_NUMBER keeps exactly one copy per hash:
-- the copy from the most recent file and, within that file, the copy with
-- the highest line number.
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY {{ content_hash_col }}
    ORDER BY {{ file_ts_col }} DESC, {{ line_number_col }} DESC
) = 1

{% endmacro %}
45 changes: 36 additions & 9 deletions warehouse/models/staging/payments/littlepay/_littlepay.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,6 @@ models:
- name: pending

- name: stg_littlepay__authorisations
tests:
- &littlepay_uniqueness
dbt_utils.unique_combination_of_columns:
combination_of_columns:
- instance
- extract_filename
- ts
- _line_number
columns:
- name: participant_id
description: Identifies the participant that the authorisation belongs to.
Expand All @@ -167,10 +159,45 @@ models:
- name: status
description: Status of authorisation. One of (`Authorised`, `Declined`, `Failed`, `Invalid`, `Lost`, `Stolen`, `Unavailable`, `Unknown`, `Verified`)
- name: authorisation_date_time_utc
- &lp_export_date
name: littlepay_export_date
description: |
Date of the source file from Littlepay. Date is extracted from filenames, which generally have the
structure {timestamp}_{data_type}.{file extension}.
- &lp_export_ts
name: littlepay_export_ts
description: |
Timestamp of the source file from Littlepay. Timestamp is extracted from filenames, which generally have the
structure {timestamp}_{data_type}.{file extension}.
- &lp_line_number
name: _line_number
description: |
Line number of this row in the source file.
Some line numbers may be missing because we drop extra copies of rows that are full duplicates of another row.
- &payments_input_row_key
name: _key
description: |
Synthetic key composed of Littlepay file date and line number to uniquely identify a row within source data.
tests:
- not_null
- unique
- name: _payments_key
description: |
Synthetic key composed of the elements that define a natural key within the source data (primary key according to the Littlepay schema).
tests:
- not_null
- unique_proportion:
at_least: 0.999

- name: stg_littlepay__customer_funding_source
tests:
- *littlepay_uniqueness
- &littlepay_uniqueness
dbt_utils.unique_combination_of_columns:
combination_of_columns:
- instance
- extract_filename
- ts
- _line_number
columns:
- name: littlepay_export_ts
description: Export timestamp parsed from filename.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ WITH source AS (
SELECT * FROM {{ source('external_littlepay', 'authorisations') }}
),

stg_littlepay__authorisations AS (
clean_columns_and_dedupe_files AS (
SELECT
{{ trim_make_empty_string_null('participant_id') }} AS participant_id,
{{ trim_make_empty_string_null('aggregation_id') }} AS aggregation_id,
Expand All @@ -16,11 +16,84 @@ stg_littlepay__authorisations AS (
{{ trim_make_empty_string_null('response_code') }} AS response_code,
{{ trim_make_empty_string_null('status') }} AS status,
{{ safe_cast('authorisation_date_time_utc', type_timestamp()) }} AS authorisation_date_time_utc,
_line_number,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
-- we have two files with invalid names that cause attributes derived from filename to be missing
CASE
WHEN extract_filename = "24jan_datafeed.psv" THEN TIMESTAMP(DATE '2023-01-24')
WHEN extract_filename = "25jan_datafeed.psv" THEN TIMESTAMP(DATE '2023-01-25')
ELSE {{ extract_littlepay_filename_ts() }}
END AS littlepay_export_ts,

CASE
WHEN extract_filename = "24jan_datafeed.psv" THEN DATE '2023-01-24'
WHEN extract_filename = "25jan_datafeed.psv" THEN DATE '2023-01-25'
ELSE {{ extract_littlepay_filename_date() }}
END AS littlepay_export_date,
ts,
-- hash all content not generated by us to enable deduping full dup rows
-- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream
{{ dbt_utils.generate_surrogate_key(['participant_id',
'aggregation_id', 'acquirer_id', 'request_type', 'transaction_amount', 'currency_code',
'retrieval_reference_number', 'littlepay_reference_number', 'external_reference_number',
'response_code', 'status', 'authorisation_date_time_utc']) }} AS content_hash,
FROM source
-- drop extra header rows
WHERE aggregation_id != "aggregation_id"
{{ qualify_dedupe_lp_files() }}
),

-- attach synthetic keys, then drop rows whose entire content duplicates another row
add_keys_drop_full_dupes AS (
SELECT
*,
-- generate keys now that input columns have been trimmed & cast and files deduped
-- _key: identifies a physical source row (file date + line number + instance)
{{ dbt_utils.generate_surrogate_key(['littlepay_export_date', '_line_number', 'instance']) }} AS _key,
-- _payments_key: natural key per the Littlepay schema (aggregation id + authorisation timestamp)
{{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key,
FROM clean_columns_and_dedupe_files
-- keep one row per content_hash: most recent file wins, then highest line number (see macro)
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

-- we have some authorisations where the same aggregation has multiple rows with the same timestamp
-- these seem like clear duplicates; for some of them, one of the two copies is missing status and RRN, so those can be dropped
-- the rest need to be handled downstream by checking against settlements data
-- this CTE flags _payments_keys whose duplicate rows all share a single retrieval reference number
same_timestamp_simple_dupes AS (
SELECT
_payments_key,
TRUE AS to_drop,
-- COUNT(DISTINCT ...) ignores NULLs, so a pair where one copy has a NULL RRN still yields ct_rrn = 1
COUNT(DISTINCT retrieval_reference_number) AS ct_rrn,
COUNT(*) AS ct
FROM add_keys_drop_full_dupes
-- group by name, not ordinal position
GROUP BY _payments_key
HAVING ct > 1 AND ct_rrn = 1
),

stg_littlepay__authorisations AS (
    -- Final projection: list columns explicitly so helper columns
    -- (content_hash, to_drop, ct_rrn, ct) are excluded from the model output.
    SELECT
        participant_id,
        aggregation_id,
        acquirer_id,
        request_type,
        transaction_amount,
        currency_code,
        retrieval_reference_number,
        littlepay_reference_number,
        external_reference_number,
        response_code,
        status,
        authorisation_date_time_utc,
        _line_number,
        `instance`,
        extract_filename,
        littlepay_export_ts,
        littlepay_export_date,
        ts,
        _key,
        _payments_key
    FROM add_keys_drop_full_dupes
    LEFT JOIN same_timestamp_simple_dupes
        USING (_payments_key)
    -- to_drop is TRUE for flagged duplicate keys and NULL (no match) otherwise;
    -- IS NOT TRUE keeps every unflagged row, including the NULL case
    WHERE to_drop IS NOT TRUE
)

SELECT * FROM stg_littlepay__authorisations
Loading