-
Notifications
You must be signed in to change notification settings - Fork 0
/
ddl.sql
41 lines (31 loc) · 4.22 KB
/
ddl.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
-- Source stream of customer records read from the 'fc_customers' topic.
-- No key column is declared here; ID is read as an ordinary value field.
CREATE OR REPLACE STREAM "FC_CUSTOMER_STREAM" (
    EMAIL STRING,
    FIRST_NAME STRING,
    ID BIGINT,
    LAST_NAME STRING
)
WITH (
    kafka_topic='fc_customers',
    partitions=6,
    key_format='KAFKA',
    value_format='AVRO'
);
-- Re-keys the customer stream by ID and writes it to 'fc_customers_rekeyed',
-- the topic that "FC_CUSTOMERS_TABLE" is declared over.
-- The columns are listed explicitly (same set and order as the source stream)
-- so that a change to "FC_CUSTOMER_STREAM" cannot silently alter the schema of
-- the rekeyed topic.
CREATE OR REPLACE STREAM "FC_CUSTOMER_STREAM_REKEYED"
WITH (
    kafka_topic='fc_customers_rekeyed',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
)
AS SELECT
    EMAIL,
    FIRST_NAME,
    ID,
    LAST_NAME
FROM "FC_CUSTOMER_STREAM"
PARTITION BY ID;
-- Table view over the 'fc_trans_trap_calc' topic, keyed by ACC_CUST_TYPE.
-- This is the same topic that the "FC_TRANS_TRAP_CALC" stream in this file
-- writes to.
CREATE OR REPLACE TABLE "FC_CUST_TRANS_TRAP" (
    ACCOUNT_ID BIGINT,
    ACC_CUST_TYPE STRING PRIMARY KEY,
    FULL_NAME STRING,
    TRANSACTION_ID BIGINT,
    TRANSACTION_TIME STRING,
    TRANSACTION_TYPE STRING,
    TRAP_AMT_MULTI DOUBLE,
    TRAP_DIVIDE_AMT DOUBLE
)
WITH (
    kafka_topic='fc_trans_trap_calc',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
);
-- Derives a score row per ACC_CUST_TYPE from "FC_CUST_TRANS_TRAP".
--   TRAP_ESTIMATE = (TRAP_AMT_MULTI / TRAP_DIVIDE_AMT) / 100
--   TRAP_NAME     = 'TRANSFER_COMP' for 'transfer', 'PAYMENT_COMP' for
--                   'payment', NULL for any other transaction type.
-- The division is guarded: when TRAP_DIVIDE_AMT is zero or NULL the CASE has
-- no matching branch and TRAP_ESTIMATE is NULL, instead of the result of a
-- division by zero.
CREATE OR REPLACE TABLE "FC_CUST_TRANS_TRAP_SCORES_TABLE"
WITH (
    kafka_topic='fc_cust_trans_trap_scores_table',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
)
AS SELECT
    (CASE
        WHEN (TRAP_DIVIDE_AMT <> 0) THEN ((TRAP_AMT_MULTI / TRAP_DIVIDE_AMT) / 100)
    END) AS TRAP_ESTIMATE,
    (CASE
        WHEN (TRANSACTION_TYPE = 'transfer') THEN 'TRANSFER_COMP'
        WHEN (TRANSACTION_TYPE = 'payment') THEN 'PAYMENT_COMP'
    END) AS TRAP_NAME,
    ACCOUNT_ID,
    ACC_CUST_TYPE,
    FULL_NAME,
    TRANSACTION_ID,
    TRANSACTION_TIME
FROM "FC_CUST_TRANS_TRAP";
-- Table view over 'fc_avg_cust_payment_transactions_5_days', keyed by
-- ACC_CUST_TYPE. The same topic is the sink of
-- "FC_CUST_TYPE_AVG_TUMBLING_5_DAYS" in this file.
-- NOTE(review): that CTAS is windowed while this table is declared without a
-- window; confirm the keys are read back as intended.
CREATE OR REPLACE TABLE "FC_PAYMENTS_CHECK" (
    ACC_CUST_TYPE STRING PRIMARY KEY,
    CUST_TYPE_AVG_AMT DOUBLE,
    WINSTART STRING
)
WITH (
    kafka_topic='fc_avg_cust_payment_transactions_5_days',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
);
-- Customer lookup table keyed by ID, declared over 'fc_customers_rekeyed'
-- (the sink topic of "FC_CUSTOMER_STREAM_REKEYED").
CREATE OR REPLACE TABLE "FC_CUSTOMERS_TABLE" (
    EMAIL STRING,
    FIRST_NAME STRING,
    ID BIGINT PRIMARY KEY,
    LAST_NAME STRING
)
WITH (
    kafka_topic='fc_customers_rekeyed',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
);
-- Raw bank-app events from 'bankapp_data_raw'. Each record exposes a single
-- string field, FORMDATA, which "BANKAPP_LIVE" parses as JSON.
CREATE OR REPLACE STREAM "BANKAPP_DATA" (
    FORMDATA STRING
)
WITH (
    kafka_topic='bankapp_data_raw',
    partitions=6,
    key_format='KAFKA',
    value_format='JSON'
);
-- Flattens the JSON held in BANKAPP_DATA.FORMDATA into typed columns.
-- ACCOUNT_ID and TRANSACTION_ID are cast to BIGINT and AMOUNT to DOUBLE; the
-- remaining fields are kept as the strings EXTRACTJSONFIELD returns.
-- NOTE(review): unlike the other statements in this file, no key_format is
-- set here; presumably the server default is used - confirm.
CREATE OR REPLACE STREAM "BANKAPP_LIVE"
WITH (
    kafka_topic='BANKAPP_LIVE',
    partitions=6,
    value_format='AVRO'
)
AS SELECT
    CAST(EXTRACTJSONFIELD(FORMDATA, '$.ACCOUNT_ID') AS BIGINT) AS ACCOUNT_ID,
    CAST(EXTRACTJSONFIELD(FORMDATA, '$.AMOUNT') AS DOUBLE) AS AMOUNT,
    CAST(EXTRACTJSONFIELD(FORMDATA, '$.TRANSACTION_ID') AS BIGINT) AS TRANSACTION_ID,
    EXTRACTJSONFIELD(FORMDATA, '$.BENEF_COUNTRY_NAME') AS BENEF_COUNTRY_NAME,
    EXTRACTJSONFIELD(FORMDATA, '$.BENEF_NAME') AS BENEF_NAME,
    EXTRACTJSONFIELD(FORMDATA, '$.IP_ADDRESS') AS IP_ADDRESS,
    EXTRACTJSONFIELD(FORMDATA, '$.MERCH_COUNTRY_NAME') AS MERCH_COUNTRY_NAME,
    EXTRACTJSONFIELD(FORMDATA, '$.MERCH_NAME') AS MERCH_NAME,
    EXTRACTJSONFIELD(FORMDATA, '$.TRANSACTION_TIME') AS TRANSACTION_TIME,
    EXTRACTJSONFIELD(FORMDATA, '$.TRANSACTION_TYPE') AS TRANSACTION_TYPE
FROM "BANKAPP_DATA";
-- Enriches each "BANKAPP_LIVE" transaction with the matching customer from
-- "FC_CUSTOMERS_TABLE" (inner join on ACCOUNT_ID = ID).
--   ACC_CUST_TYPE = <account id>_<first name>_<last name>_<transaction type>
--   FULL_NAME     = <first name> <last name>
CREATE OR REPLACE STREAM "FC_TRANSACTION_STREAM_ENRICHED"
WITH (
    kafka_topic='fc_transactions_enriched',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
)
AS SELECT
    CAST(T.ACCOUNT_ID AS STRING) + '_' + C.FIRST_NAME + '_' + C.LAST_NAME + '_' + T.TRANSACTION_TYPE AS ACC_CUST_TYPE,
    C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
    T.ACCOUNT_ID AS ACCOUNT_ID,
    T.AMOUNT AS AMOUNT,
    T.IP_ADDRESS AS IP_ADDRESS,
    T.TRANSACTION_ID AS TRANSACTION_ID,
    T.TRANSACTION_TIME AS TRANSACTION_TIME,
    T.TRANSACTION_TYPE AS TRANSACTION_TYPE
FROM "BANKAPP_LIVE" T
INNER JOIN "FC_CUSTOMERS_TABLE" C
    ON T.ACCOUNT_ID = C.ID;
-- Joins each enriched transaction with the average amount held in
-- "FC_PAYMENTS_CHECK" for the same ACC_CUST_TYPE and emits:
--   TRAP_AMT_MULTI  = 4 * AMOUNT
--   TRAP_DIVIDE_AMT = (4 * AMOUNT) / CUST_TYPE_AVG_AMT
-- The division is guarded: when CUST_TYPE_AVG_AMT is zero or NULL the CASE
-- has no matching branch and TRAP_DIVIDE_AMT is NULL, instead of the result
-- of a division by zero.
-- Output is partitioned by ACC_CUST_TYPE and written to 'fc_trans_trap_calc',
-- the topic "FC_CUST_TRANS_TRAP" is declared over.
CREATE OR REPLACE STREAM "FC_TRANS_TRAP_CALC"
WITH (
    kafka_topic='fc_trans_trap_calc',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
)
AS SELECT
    (CASE
        WHEN (FPC.CUST_TYPE_AVG_AMT <> 0) THEN ((4 * FTSE.AMOUNT) / FPC.CUST_TYPE_AVG_AMT)
    END) AS TRAP_DIVIDE_AMT,
    (4 * FTSE.AMOUNT) AS TRAP_AMT_MULTI,
    FTSE.ACCOUNT_ID AS ACCOUNT_ID,
    FTSE.ACC_CUST_TYPE AS ACC_CUST_TYPE,
    FTSE.FULL_NAME AS FULL_NAME,
    FTSE.TRANSACTION_ID AS TRANSACTION_ID,
    FTSE.TRANSACTION_TIME AS TRANSACTION_TIME,
    FTSE.TRANSACTION_TYPE AS TRANSACTION_TYPE
FROM "FC_TRANSACTION_STREAM_ENRICHED" FTSE
INNER JOIN "FC_PAYMENTS_CHECK" FPC
    ON FTSE.ACC_CUST_TYPE = FPC.ACC_CUST_TYPE
PARTITION BY FTSE.ACC_CUST_TYPE;
-- Average transaction AMOUNT per ACC_CUST_TYPE over 5-day tumbling windows
-- of "FC_TRANSACTION_STREAM_ENRICHED". WINSTART is the window start rendered
-- as a 'yyyy-MM-dd HH:mm:ss Z' string. Results are written to
-- 'fc_avg_cust_payment_transactions_5_days', the topic "FC_PAYMENTS_CHECK"
-- is declared over.
CREATE OR REPLACE TABLE "FC_CUST_TYPE_AVG_TUMBLING_5_DAYS"
WITH (
    kafka_topic='fc_avg_cust_payment_transactions_5_days',
    partitions=6,
    key_format='AVRO',
    value_format='AVRO'
)
AS SELECT
    ACC_CUST_TYPE,
    AVG(AMOUNT) AS CUST_TYPE_AVG_AMT,
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINSTART
FROM "FC_TRANSACTION_STREAM_ENRICHED"
WINDOW TUMBLING (SIZE 5 DAYS)
GROUP BY ACC_CUST_TYPE;