In this recipe, we aggregate each customer’s credit card transactions over a 2-hour window and join them with the customer’s average credit card spend. If the total spend within a 2-hour window is more than the customer’s average credit card spend, the account is flagged as a possible case of credit card theft.
- Docker
- If running on Mac/Windows, at least 4GB allocated to Docker. To check:

      docker system info | grep Memory

The reported value should be at least the minimum stated above; if it is not, the Kafka stack will probably not work.
Minimum version is Confluent Platform 5.0
- Clone this repository:

      git clone https://github.com/confluentinc/ksql-recipes-try-it-at-home.git

- Launch:

      cd ksql-recipes-try-it-at-home/detecting-unusual-card-activity
      docker-compose up -d

- Run KSQL CLI:

      docker-compose exec ksql-cli ksql http://ksql-server:8088

- Register the existing transactions topic as a KSQL stream:

      CREATE STREAM TRANSACTIONS_RAW (ACCOUNT_ID VARCHAR, \
                                      TIMESTAMP VARCHAR, \
                                      CARD_TYPE VARCHAR, \
                                      AMOUNT DOUBLE, \
                                      IP_ADDRESS VARCHAR, \
                                      TRANSACTION_ID VARCHAR) \
        WITH (KAFKA_TOPIC='transactions', \
              VALUE_FORMAT='JSON');

- Re-partition the stream on account_id, and use Avro for the target stream (this is optional):

      CREATE STREAM TRANSACTIONS_SOURCE \
        WITH (VALUE_FORMAT='AVRO') AS \
        SELECT * \
        FROM TRANSACTIONS_RAW \
        PARTITION BY ACCOUNT_ID;

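The effect of re-partitioning on a key can be illustrated outside KSQL. The following Python sketch is not part of the recipe: the partition count, the sample records, and the `partition_for` helper are hypothetical, and CRC32 is only a stand-in for the murmur2 hash that Kafka's default partitioner applies to the key. It shows that every record with the same ACCOUNT_ID lands in the same partition.

```python
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count, for illustration only


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a record key to a partition number.

    CRC32 is a stand-in here; Kafka's default partitioner uses murmur2.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical transactions, reduced to (ACCOUNT_ID, AMOUNT).
transactions = [("100019", 40.0), ("100012", 12.5), ("100019", 50.0)]

# Group the records by the partition their key maps to.
partitions = {}
for account_id, amount in transactions:
    partitions.setdefault(partition_for(account_id), []).append((account_id, amount))

# Collect the set of partitions that hold records for account 100019.
homes_of_100019 = {
    p for p, records in partitions.items()
    if any(account_id == "100019" for account_id, _ in records)
}
print(homes_of_100019)  # a single partition number
```

Because all records for one account share a partition, a later join or aggregation keyed on that account can be processed in one place.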
- Register the existing stream of customer data from Oracle in the customers topic as a KSQL stream:

      CREATE STREAM CUST_RAW_STREAM (ID BIGINT, \
                                     FIRST_NAME VARCHAR, \
                                     LAST_NAME VARCHAR, \
                                     EMAIL VARCHAR, \
                                     AVG_CREDIT_SPEND DOUBLE) \
        WITH (KAFKA_TOPIC='customers', \
              VALUE_FORMAT='JSON');

- Re-partition the customer data stream by ID (the customer’s account ID) to prepare for the join, and use Avro for the target stream (this is optional):

      CREATE STREAM CUSTOMER_REKEYED \
        WITH (VALUE_FORMAT='AVRO') AS \
        SELECT * \
        FROM CUST_RAW_STREAM \
        PARTITION BY ID;

- Register the re-partitioned customer data topic as a KSQL table, which is used for the join with the incoming stream of transactions:

      CREATE TABLE customer \
        WITH (KAFKA_TOPIC='CUSTOMER_REKEYED', \
              VALUE_FORMAT='AVRO', \
              KEY='ID');

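The difference between reading the customer topic as a stream and as a table can be sketched in a few lines of Python. This is an illustration only, not how KSQL is implemented: the `materialize` helper and the sample updates are hypothetical. A table keeps the latest row for each key, so a newer record for the same ID replaces the older one.

```python
def materialize(changelog):
    """Fold a keyed sequence of updates into a table of the latest row per key."""
    table = {}
    for key, row in changelog:
        table[key] = row  # a later record for the same key replaces the earlier one
    return table


# Hypothetical customer updates keyed by ID; customer 1 is updated twice.
customer_updates = [
    (1, {"FIRST_NAME": "Ada", "AVG_CREDIT_SPEND": 50.0}),
    (2, {"FIRST_NAME": "Ben", "AVG_CREDIT_SPEND": 70.0}),
    (1, {"FIRST_NAME": "Ada", "AVG_CREDIT_SPEND": 55.0}),
]

customer_table = materialize(customer_updates)
print(customer_table[1]["AVG_CREDIT_SPEND"])  # the most recent value for ID 1
```

This is why the join uses a table for customers: each transaction should be matched against the customer's current average spend, not against every historical update.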
- Join the transactions to customer information:

      CREATE STREAM TRANSACTIONS_ENRICHED AS \
        SELECT T.ACCOUNT_ID, T.CARD_TYPE, T.AMOUNT, \
               C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, \
               C.AVG_CREDIT_SPEND \
        FROM TRANSACTIONS_SOURCE T \
        INNER JOIN CUSTOMER C \
        ON T.ACCOUNT_ID = C.ID;

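The join logic can be sketched in Python as a lookup of each transaction against the customer table. This is a simplified illustration under stated assumptions: the `enrich` helper and the transaction amounts are hypothetical, the customer row reuses a name and average from the sample output, and the sketch compares ACCOUNT_ID (declared VARCHAR) with ID (declared BIGINT) by converting the former to an integer.

```python
def enrich(transactions, customers):
    """Inner-join each transaction to the customer row whose ID matches ACCOUNT_ID."""
    for txn in transactions:
        # Assumption: ACCOUNT_ID holds a numeric string that corresponds to the BIGINT ID.
        customer = customers.get(int(txn["ACCOUNT_ID"]))
        if customer is None:
            continue  # inner join: transactions with no matching customer are dropped
        yield {
            "ACCOUNT_ID": txn["ACCOUNT_ID"],
            "CARD_TYPE": txn["CARD_TYPE"],
            "AMOUNT": txn["AMOUNT"],
            "FULL_NAME": customer["FIRST_NAME"] + " " + customer["LAST_NAME"],
            "AVG_CREDIT_SPEND": customer["AVG_CREDIT_SPEND"],
        }


# Customer table keyed by ID (name and average taken from the sample output).
customers = {
    100019: {"FIRST_NAME": "Horatius", "LAST_NAME": "Keefe", "AVG_CREDIT_SPEND": 60.58},
}

# Hypothetical transactions; the second has no matching customer.
transactions = [
    {"ACCOUNT_ID": "100019", "CARD_TYPE": "jcb", "AMOUNT": 40.0},
    {"ACCOUNT_ID": "999999", "CARD_TYPE": "visa", "AMOUNT": 5.0},
]

enriched = list(enrich(transactions, customers))
print(enriched)
```

Only the first transaction appears in the result, carrying the customer's full name and average spend alongside the transaction fields.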
- Aggregate the stream of transactions for each account ID using a 2-hour tumbling window, and filter for accounts in which the total spend in a 2-hour period is greater than the customer’s average:

      CREATE TABLE POSSIBLE_STOLEN_CARD AS \
        SELECT TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START, \
               T.ACCOUNT_ID, T.CARD_TYPE, SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND, \
               T.FULL_NAME, MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND \
        FROM TRANSACTIONS_ENRICHED T \
        WINDOW TUMBLING (SIZE 2 HOURS) \
        GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME \
        HAVING SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);

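The windowed aggregation and the HAVING filter can be sketched in Python. This is a minimal illustration, not KSQL's implementation: it assumes tumbling windows aligned to the Unix epoch (consistent with the 16:00:00 window start in the sample output), and the `ts_ms` field, the helper functions, and the transaction amounts are hypothetical. Names and averages are reused from the sample output.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_MS = 2 * 60 * 60 * 1000  # SIZE 2 HOURS, in milliseconds


def to_ms(hour, minute):
    """Epoch milliseconds for a time on 2019-01-11 UTC (the date in the sample output)."""
    return int(datetime(2019, 1, 11, hour, minute, tzinfo=timezone.utc).timestamp() * 1000)


def window_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms


def possible_stolen_cards(events):
    """Sum AMOUNT per (window, account, card type, name); keep groups above the average."""
    totals = defaultdict(float)
    averages = {}
    for e in events:
        key = (window_start(e["ts_ms"]), e["ACCOUNT_ID"], e["CARD_TYPE"], e["FULL_NAME"])
        totals[key] += e["AMOUNT"]
        averages[key] = max(averages.get(key, e["AVG_CREDIT_SPEND"]), e["AVG_CREDIT_SPEND"])
    # Equivalent of HAVING SUM(AMOUNT) > MAX(AVG_CREDIT_SPEND)
    return {k: (totals[k], averages[k]) for k in totals if totals[k] > averages[k]}


def event(hour, minute, account_id, card_type, amount, full_name, avg_spend):
    return {"ts_ms": to_ms(hour, minute), "ACCOUNT_ID": account_id, "CARD_TYPE": card_type,
            "AMOUNT": amount, "FULL_NAME": full_name, "AVG_CREDIT_SPEND": avg_spend}


# Hypothetical enriched transactions (amounts and times are made up).
events = [
    event(16, 10, "100019", "jcb", 40.0, "Horatius Keefe", 60.58),
    event(17, 30, "100019", "jcb", 50.0, "Horatius Keefe", 60.58),
    event(18, 5, "100019", "jcb", 10.0, "Horatius Keefe", 60.58),   # falls in the next window
    event(16, 20, "100016", "maestro", 30.0, "Milo Drewes", 68.33),  # below the average
]

flagged = possible_stolen_cards(events)
print(flagged)
```

In this sample only account 100019 is flagged, and only for the 16:00 to 18:00 window, where its two transactions together exceed the average. Because the windows tumble rather than slide, spend that straddles a window boundary is split across two windows and compared separately.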
Examine the output:

      ksql> SELECT WINDOW_START, ACCOUNT_ID, CARD_TYPE, \
              TOTAL_CREDIT_SPEND, FULL_NAME, AVG_CREDIT_SPEND \
            FROM POSSIBLE_STOLEN_CARD;
      2019-01-11 16:00:00 +0000 | 100019 | jcb | 90.69 | Horatius Keefe | 60.58
      2019-01-11 16:00:00 +0000 | 100012 | mastercard | 84.04 | Juditha Shwalbe | 53.94
      2019-01-11 16:00:00 +0000 | 100016 | maestro | 76.01 | Milo Drewes | 68.33
      2019-01-11 16:00:00 +0000 | 100035 | visa-electron | 69.61 | Roxine Furminger | 59.68
      …
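As a quick sanity check, the rows shown above can be parsed and tested against the filter condition. The Python sketch below is only an illustration: the `parse` helper is hypothetical and assumes the pipe-delimited layout printed by the CLI. It confirms that every listed row has a total spend above the customer's average and a window start on an even hour.

```python
from datetime import datetime

# The four rows shown in the sample output.
rows = """\
2019-01-11 16:00:00 +0000 | 100019 | jcb | 90.69 | Horatius Keefe | 60.58
2019-01-11 16:00:00 +0000 | 100012 | mastercard | 84.04 | Juditha Shwalbe | 53.94
2019-01-11 16:00:00 +0000 | 100016 | maestro | 76.01 | Milo Drewes | 68.33
2019-01-11 16:00:00 +0000 | 100035 | visa-electron | 69.61 | Roxine Furminger | 59.68
""".splitlines()


def parse(line):
    """Split one pipe-delimited output row into typed fields."""
    start, account_id, card_type, total, full_name, avg = [f.strip() for f in line.split("|")]
    return {
        "WINDOW_START": datetime.strptime(start, "%Y-%m-%d %H:%M:%S %z"),
        "ACCOUNT_ID": account_id,
        "CARD_TYPE": card_type,
        "TOTAL_CREDIT_SPEND": float(total),
        "FULL_NAME": full_name,
        "AVG_CREDIT_SPEND": float(avg),
    }


parsed = [parse(r) for r in rows]

for r in parsed:
    # Every flagged row should satisfy the HAVING condition.
    print(r["ACCOUNT_ID"], r["TOTAL_CREDIT_SPEND"] > r["AVG_CREDIT_SPEND"])
```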