Skip to content

Commit

Permalink
Payments: Summarized authorisations model (#3001)
Browse files Browse the repository at this point in the history
* create authorisation deduping model and add upstream test

* pivoted authorisations working wip

* refactor payments row access policy macro for reusability

* wip: summarize authorisations -- get latest rather than pivot

* fix deduplication logic

* completed summarized authorisations

* remove unused yaml anchor
  • Loading branch information
lauriemerrell authored Oct 16, 2023
1 parent 8005934 commit 11ec0ee
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 67 deletions.
76 changes: 76 additions & 0 deletions warehouse/macros/create_row_access_policy.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,79 @@ filter using (
{% endif %}
)
{% endmacro %}

{% macro payments_row_access_policy() %}
{#-
    Emits the full set of row access policy statements for a payments table,
    each terminated with a semicolon so that the whole macro can be used as a
    single post-hook.

    One policy is emitted per (participant_id value, principal) pair listed
    below, restricting that principal to rows whose `participant_id` equals
    the given value. A final policy is emitted with only a principals list
    (no filter column / filter value are passed).
-#}
{% set participant_principals = [
    ('mst', 'serviceAccount:[email protected]'),
    ('sacrt', 'serviceAccount:[email protected]'),
    ('sbmtd', 'serviceAccount:[email protected]'),
    ('clean-air-express', 'serviceAccount:clean-air-payments-user@cal-itp-data-infra.iam.gserviceaccount.com'),
    ('ccjpa', 'serviceAccount:[email protected]'),
    ('humboldt-transit-authority', 'serviceAccount:humboldt-transit-authority@cal-itp-data-infra.iam.gserviceaccount.com'),
    ('lake-transit-authority', 'serviceAccount:lake-transit-authority@cal-itp-data-infra.iam.gserviceaccount.com'),
    ('mendocino-transit-authority', 'serviceAccount:mendocino-transit-authority@cal-itp-data-infra.iam.gserviceaccount.com'),
    ('redwood-coast-transit', 'serviceAccount:[email protected]'),
    ('atn', 'serviceAccount:[email protected]'),
] %}

{#- Per-participant policies, emitted in the order listed above. -#}
{% for participant, principal in participant_principals %}
{{ create_row_access_policy(
    filter_column = 'participant_id',
    filter_value = participant,
    principals = [principal]
) }};
{% endfor %}

{#- Final policy: principals only, relying on the defaults of create_row_access_policy. -#}
{{ create_row_access_policy(
    principals = [
        'serviceAccount:[email protected]',
        'serviceAccount:[email protected]',
        'serviceAccount:github-actions-services-accoun@cal-itp-data-infra.iam.gserviceaccount.com',
        'group:[email protected]',
        'domain:calitp.org',
        'user:[email protected]',
        'user:[email protected]',
        'user:[email protected]',
    ]
) }};
-- TODO: In the last policy of the macro call above, see if we can get the prod warehouse service account out of context
{% endmacro %}
32 changes: 32 additions & 0 deletions warehouse/models/intermediate/payments/_int_payments.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
version: 2

models:
- name: int_payments__authorisations_deduped
description: |
This model deduplicates authorisations rows with duplicate `_payments_key` values that require additional handling
beyond what is covered in the staging model, essentially rows that require a join for deduplication.
Columns have the same meanings as in the upstream staging model.
The most important test for this model is on the upstream staging model: to ensure that no
`_payments_key` values are fully dropped between that model and this one (i.e., to ensure
that all rows dropped do in fact have a duplicate and no authorisations are lost.)
columns:
- name: request_type
tests:
- accepted_values:
values: ['AUTHORISATION', 'DEBT_RECOVERY_AUTHCHECK', 'DEBT_RECOVERY_REVERSAL', 'CARD_CHECK']
- name: aggregation_id
tests:
- relationships:
to: ref('int_payments__authorisations_summarized')
field: aggregation_id
- name: int_payments__authorisations_summarized
description: |
This model contains only the most recent authorisations data per `aggregation_id`.
Many aggregations pass through a few different authorisations (for example, perhaps a card check
before getting authorised, or multiple debt recovery attempts.) This model keeps only the most
recent row according to `authorisation_date_time_utc`.
columns:
- name: aggregation_id
tests:
- not_null
- unique
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{{ config(materialized = "table") }}

-- Deduplicates authorisations on `_payments_key`, keeping exactly one row per key.
-- Output columns are unchanged from the upstream staging model.

WITH auth AS (
    SELECT *
    FROM {{ ref('stg_littlepay__authorisations') }}
),

-- distinct RRNs that appear in settlements; DISTINCT ensures the join below cannot multiply auth rows
settlement_rrns AS (
    SELECT DISTINCT retrieval_reference_number
    FROM {{ ref('stg_littlepay__settlements') }}
),

-- as of 10/10/23, we have two aggregation_id/authorisation_date_time_utc pairs that are duplicates
-- for one of them, we want to keep the one that has an RRN that appears in settlements
-- for the other, neither RRN appears in settlements, so we can just keep the later line number
--
-- Both rules are expressed as a single preference ranking within each `_payments_key`:
--   1. rows whose RRN appears in settlements come first
--   2. then the most recent export
--   3. then the highest line number
-- Keeping only the top-ranked row also covers the case where several duplicates all
-- map to a settlement, which previously left every such row in the output.
dedupe_criteria AS (
    SELECT
        auth.*,
        ROW_NUMBER() OVER (
            PARTITION BY auth._payments_key
            ORDER BY
                (settlement_rrns.retrieval_reference_number IS NOT NULL) DESC,
                auth.littlepay_export_ts DESC,
                auth._line_number DESC
        ) AS payments_key_preference_num
    FROM auth
    LEFT JOIN settlement_rrns USING (retrieval_reference_number)
),

int_payments__authorisations_deduped AS (
    SELECT
        participant_id,
        aggregation_id,
        acquirer_id,
        request_type,
        transaction_amount,
        currency_code,
        retrieval_reference_number,
        littlepay_reference_number,
        external_reference_number,
        response_code,
        status,
        authorisation_date_time_utc,
        _line_number,
        `instance`,
        extract_filename,
        littlepay_export_ts,
        littlepay_export_date,
        ts,
        _key,
        _payments_key,
        _content_hash
    FROM dedupe_criteria
    -- non-duplicated keys have a single row, which is always ranked first;
    -- duplicated keys keep only their most-preferred row (see ranking above)
    WHERE payments_key_preference_num = 1
)

SELECT * FROM int_payments__authorisations_deduped
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{{ config(materialized = 'table') }}

-- Keeps only the most recent authorisation row per `aggregation_id`.

WITH auth AS (
    SELECT *
    FROM {{ ref('int_payments__authorisations_deduped') }}
),

-- TODO: do we want to add any additional summary columns here?
-- for example: number of attempted authorisations over all?

-- keep the row that is the final update for each aggregation ID
-- The row is selected directly rather than by collecting `_payments_key` values and
-- joining back, so the output has one row per aggregation_id even if `_payments_key`
-- is not unique upstream.
-- Export timestamp and line number act as tie-breakers so that the chosen row is
-- deterministic when two rows share the same `authorisation_date_time_utc`.
int_payments__authorisations_summarized AS (
    SELECT
        participant_id,
        aggregation_id,
        acquirer_id,
        request_type,
        transaction_amount,
        currency_code,
        retrieval_reference_number,
        littlepay_reference_number,
        external_reference_number,
        response_code,
        status,
        authorisation_date_time_utc,
        _line_number,
        `instance`,
        extract_filename,
        littlepay_export_ts,
        littlepay_export_date,
        ts,
        _key,
        _payments_key,
        _content_hash
    FROM auth
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY aggregation_id
        ORDER BY
            authorisation_date_time_utc DESC,
            littlepay_export_ts DESC,
            _line_number DESC
    ) = 1
)

SELECT * FROM int_payments__authorisations_summarized
66 changes: 1 addition & 65 deletions warehouse/models/mart/payments/fct_payments_rides_v2.sql
Original file line number Diff line number Diff line change
@@ -1,70 +1,6 @@
{{ config(
post_hook=[
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'mst',
principals = ['serviceAccount:[email protected]']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'sacrt',
principals = ['serviceAccount:[email protected]']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'sbmtd',
principals = ['serviceAccount:[email protected]']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'clean-air-express',
principals = ['serviceAccount:clean-air-payments-user@cal-itp-data-infra.iam.gserviceaccount.com']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'ccjpa',
principals = ['serviceAccount:[email protected]']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'humboldt-transit-authority',
principals = ['serviceAccount:humboldt-transit-authority@cal-itp-data-infra.iam.gserviceaccount.com']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'lake-transit-authority',
principals = ['serviceAccount:lake-transit-authority@cal-itp-data-infra.iam.gserviceaccount.com']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'mendocino-transit-authority',
principals = ['serviceAccount:mendocino-transit-authority@cal-itp-data-infra.iam.gserviceaccount.com']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'redwood-coast-transit',
principals = ['serviceAccount:[email protected]']
) }}",
" {{ create_row_access_policy(
filter_column = 'participant_id',
filter_value = 'atn',
principals = ['serviceAccount:[email protected]']
) }}",
" {{ create_row_access_policy(
principals = ['serviceAccount:[email protected]',
'serviceAccount:[email protected]',
'serviceAccount:github-actions-services-accoun@cal-itp-data-infra.iam.gserviceaccount.com',
'group:[email protected]',
'domain:calitp.org',
'user:[email protected]',
'user:[email protected]',
'user:[email protected]',
]
) }}",
]

post_hook="{{ payments_row_access_policy() }}"
) }}
-- TODO: In the last policy of the macro call above, see if we can get the prod warehouse service account out of context

WITH

Expand Down
6 changes: 4 additions & 2 deletions warehouse/models/staging/payments/littlepay/_littlepay.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,16 @@ models:
tests:
- not_null
- unique
- &payments_key_fuzzy_uniqueness
name: _payments_key
- name: _payments_key
description: |
Synthetic key composed of the elements that define a natural key within the source data (the primary key according to the Littlepay schema).
tests:
- not_null
- unique_proportion:
at_least: 0.999
- relationships:
to: ref('int_payments__authorisations_deduped')
field: _payments_key
- &_content_hash
name: _content_hash
description: |
Expand Down

0 comments on commit 11ec0ee

Please sign in to comment.