Skip to content

Commit

Permalink
Initial authorisations modeling (#2954)
Browse files Browse the repository at this point in the history
* clean strings and use lp source macro

* use column macros to extract lp filename attributes

* update macro file name and start duplicate handling

* fix date macro

* add tests and key construction

* dedupe full dup rows and only do date imputation once

* refactor macros for qualify dedupe statements

* add descriptions in yaml

* fix duplicate yaml anchor

* relax uniqueness and drop simple dup rows
  • Loading branch information
lauriemerrell authored Oct 3, 2023
1 parent bfbc6a8 commit 32c1f76
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 15 deletions.
5 changes: 1 addition & 4 deletions warehouse/macros/littlepay_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
SELECT
*,
-- have to parse the filename since there are no other timestamps seemingly
TIMESTAMP(PARSE_DATETIME(
'%Y%m%d%H%M',
REGEXP_EXTRACT(extract_filename, '([0-9]{12})_.*')
), 'UTC') AS littlepay_export_ts
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts
FROM {{ source(src, tbl) }}
)
{% endmacro %}
52 changes: 52 additions & 0 deletions warehouse/macros/littlepay_staging_transforms.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- regex to identify the timestamp within a Littlepay filename:
-- captures the leading 12-digit YYYYMMDDHHMM prefix (e.g. "202301241530_authorisations.psv")
{% macro lp_filename_timestamp_regex() %}
'^([0-9]{12})_.*'
{% endmacro %}

-- regex to identify the date within a Littlepay filename:
-- captures only the leading 8-digit YYYYMMDD portion; the following 4 digits (HHMM) are matched but not captured
{% macro lp_filename_date_regex() %}
'^([0-9]{8})[0-9]{4}_.*'
{% endmacro %}

-- use regex to actually extract timestamp from filename
-- emits a SQL expression that parses the YYYYMMDDHHMM filename prefix as a UTC TIMESTAMP;
-- the CASE falls through to NULL when the filename does not match the expected pattern
{% macro extract_littlepay_filename_ts(column='extract_filename') %}
CASE
-- check that the column actually contains this pattern; there are some invalid filenames
-- (REGEXP_EXTRACT returns NULL on no match, so this guard keeps PARSE_DATETIME from ever
-- seeing a malformed string)
WHEN REGEXP_EXTRACT({{ column }}, {{ lp_filename_timestamp_regex() }}) IS NOT NULL
THEN TIMESTAMP(PARSE_DATETIME(
'%Y%m%d%H%M',
REGEXP_EXTRACT({{ column }}, {{ lp_filename_timestamp_regex() }})
), 'UTC')
END
{% endmacro %}

-- use regex to actually extract date from filename
-- emits a SQL expression that parses the YYYYMMDD filename prefix as a DATE;
-- the CASE falls through to NULL when the filename does not match the expected pattern
{% macro extract_littlepay_filename_date(column='extract_filename') %}
CASE
-- check that the column actually contains this pattern; there are some invalid filenames
-- (REGEXP_EXTRACT returns NULL on no match, keeping PARSE_DATE from erroring)
WHEN REGEXP_EXTRACT({{ column }}, {{ lp_filename_date_regex() }}) IS NOT NULL
THEN PARSE_DATE(
'%Y%m%d',
REGEXP_EXTRACT({{ column }}, {{ lp_filename_date_regex() }})
)
END
{% endmacro %}

-- emits a QUALIFY clause that keeps only rows belonging to the most recent
-- version of each date-level Littlepay file, per instance
{% macro qualify_dedupe_lp_files(instance_col = 'instance', file_dt_col = 'littlepay_export_date', file_ts_col = 'littlepay_export_ts', ts_col = 'ts') %}

-- remove duplicate instances of the same file (file defined as date-level update from LP)
-- partition by file date, order by LP-defined timestamp (most recent first), and then order by our extract timestamp (most recent first)
-- use dense rank instead of row number because we need to allow all rows from a given file to be included (allow ties)
QUALIFY DENSE_RANK()
OVER (PARTITION BY {{ instance_col }}, {{ file_dt_col }} ORDER BY {{ file_ts_col }} DESC, {{ ts_col }} DESC) = 1

{% endmacro %}

-- emits a QUALIFY clause that collapses rows whose entire content (per the
-- precomputed content hash) is identical, keeping exactly one copy
{% macro qualify_dedupe_full_duplicate_lp_rows(content_hash_col = 'content_hash', file_ts_col = 'littlepay_export_ts', line_number_col = '_line_number') %}

-- remove full duplicate rows where *all* content is the same
-- get most recent instance across files and then highest-line-number instance within most recent file
-- (ROW_NUMBER, unlike DENSE_RANK above, guarantees a single survivor even on ties)
QUALIFY ROW_NUMBER()
OVER (PARTITION BY {{ content_hash_col }} ORDER BY {{ file_ts_col }} DESC, {{ line_number_col }} DESC) = 1

{% endmacro %}
45 changes: 36 additions & 9 deletions warehouse/models/staging/payments/littlepay/_littlepay.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,6 @@ models:
- name: pending

- name: stg_littlepay__authorisations
tests:
- &littlepay_uniqueness
dbt_utils.unique_combination_of_columns:
combination_of_columns:
- instance
- extract_filename
- ts
- _line_number
columns:
- name: participant_id
description: Identifies the participant that the authorisation belongs to.
Expand All @@ -167,10 +159,45 @@ models:
- name: status
description: Status of authorisation. One of (`Authorised`, `Declined`, `Failed`, `Invalid`, `Lost`, `Stolen`, `Unavailable`, `Unknown`, `Verified`)
- name: authorisation_date_time_utc
- &lp_export_date
name: littlepay_export_date
description: |
Date of the source file from Littlepay. Date is extracted from filenames, which generally have the
structure {timestamp}_{data_type}.{file extension}.
- &lp_export_ts
name: littlepay_export_ts
description: |
Timestamp of the source file from Littlepay. Timestamp is extracted from filenames, which generally have the
structure {timestamp}_{data_type}.{file extension}.
- &lp_line_number
name: _line_number
description: |
Line number of this row in the source file.
Some line numbers may be missing because we drop extra copies of rows that are full duplicates of another row.
- &payments_input_row_key
name: _key
description: |
Synthetic key composed of Littlepay file date, line number, and instance to uniquely identify a row within source data.
tests:
- not_null
- unique
- name: _payments_key
description: |
Synthetic key composed of the elements that define a natural key within the source data (primary key according to the Littlepay schema).
tests:
- not_null
- unique_proportion:
at_least: 0.999

- name: stg_littlepay__customer_funding_source
tests:
- *littlepay_uniqueness
- &littlepay_uniqueness
dbt_utils.unique_combination_of_columns:
combination_of_columns:
- instance
- extract_filename
- ts
- _line_number
columns:
- name: littlepay_export_ts
description: Export timestamp parsed from filename.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ WITH source AS (
SELECT * FROM {{ source('external_littlepay', 'authorisations') }}
),

stg_littlepay__authorisations AS (
clean_columns_and_dedupe_files AS (
SELECT
{{ trim_make_empty_string_null('participant_id') }} AS participant_id,
{{ trim_make_empty_string_null('aggregation_id') }} AS aggregation_id,
Expand All @@ -16,11 +16,84 @@ stg_littlepay__authorisations AS (
{{ trim_make_empty_string_null('response_code') }} AS response_code,
{{ trim_make_empty_string_null('status') }} AS status,
{{ safe_cast('authorisation_date_time_utc', type_timestamp()) }} AS authorisation_date_time_utc,
_line_number,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
-- we have two files with invalid names that cause attributes derived from filename to be missing
CASE
WHEN extract_filename = "24jan_datafeed.psv" THEN TIMESTAMP(DATE '2023-01-24')
WHEN extract_filename = "25jan_datafeed.psv" THEN TIMESTAMP(DATE '2023-01-25')
ELSE {{ extract_littlepay_filename_ts() }}
END AS littlepay_export_ts,

CASE
WHEN extract_filename = "24jan_datafeed.psv" THEN DATE '2023-01-24'
WHEN extract_filename = "25jan_datafeed.psv" THEN DATE '2023-01-25'
ELSE {{ extract_littlepay_filename_date() }}
END AS littlepay_export_date,
ts,
-- hash all content not generated by us to enable deduping full dup rows
-- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream
{{ dbt_utils.generate_surrogate_key(['participant_id',
'aggregation_id', 'acquirer_id', 'request_type', 'transaction_amount', 'currency_code',
'retrieval_reference_number', 'littlepay_reference_number', 'external_reference_number',
'response_code', 'status', 'authorisation_date_time_utc']) }} AS content_hash,
FROM source
-- drop extra header rows
WHERE aggregation_id != "aggregation_id"
{{ qualify_dedupe_lp_files() }}
),

add_keys_drop_full_dupes AS (
SELECT
*,
-- generate keys now that input columns have been trimmed & cast and files deduped
{{ dbt_utils.generate_surrogate_key(['littlepay_export_date', '_line_number', 'instance']) }} AS _key,
{{ dbt_utils.generate_surrogate_key(['aggregation_id', 'authorisation_date_time_utc']) }} AS _payments_key,
FROM clean_columns_and_dedupe_files
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

-- we have some authorisations where the same aggregation has multiple rows with the same timestamp
-- these seem like clear duplicates, and for some of them one of the two copies is missing status and RRN; those can be dropped
-- the rest need to be handled downstream by checking against settlements data
same_timestamp_simple_dupes AS (
SELECT
_payments_key,
TRUE AS to_drop,
COUNT(DISTINCT retrieval_reference_number) AS ct_rrn,
COUNT(*) AS ct
FROM add_keys_drop_full_dupes
GROUP BY 1
HAVING ct > 1 AND ct_rrn = 1
),

stg_littlepay__authorisations AS (
SELECT
participant_id,
aggregation_id,
acquirer_id,
request_type,
transaction_amount,
currency_code,
retrieval_reference_number,
littlepay_reference_number,
external_reference_number,
response_code,
status,
authorisation_date_time_utc,
_line_number,
`instance`,
extract_filename,
littlepay_export_ts,
littlepay_export_date,
ts,
_key,
_payments_key,
FROM add_keys_drop_full_dupes
LEFT JOIN same_timestamp_simple_dupes
USING(_payments_key)
WHERE NOT COALESCE(to_drop, FALSE)
)

SELECT * FROM stg_littlepay__authorisations

0 comments on commit 32c1f76

Please sign in to comment.