-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathttd_uid2.dig
156 lines (135 loc) · 4.45 KB
/
ttd_uid2.dig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
_export:
# pytd SDK Config
pytd:
apiserver: https://api.treasuredata.com/
# TTD API Config
ttd:
parallel_max: 10 # Max parallel allowed by TTD is 10 (ten), probably will never change
environment: operator-integ.uidapi.com
url_map: /v2/identity/map
url_buckets: /v2/identity/buckets
# TTD/TD Integration Database
td_uid2_env:
db: cc_ttd_uid2_001
# TD Source Tables for DII/UID2 Mapping
!include : config/td_uid2_src_lst.yaml
#
# Installation Phase
#
+ttd_wf_install:
+create_database:
td_ddl>:
create_databases: ["${td_uid2_env.db}"]
database: ${td_uid2_env.db}
+create_tables:
td_ddl>:
create_tables: [ttd_uid2_rqst, ttd_uid2_resp]
database: ${td_uid2_env.db}
+create_schema:
td>: sql/install_schema.sql
database: ${td_uid2_env.db}
#
# Collection Phase
#
+ttd_wf_dii_collect:
for_each>:
td_uid2_src: ${td_uid2_src_lst}
_parallel:
limit: 8
_do:
+echo_dii_src:
echo>: Collect all DII from ${td_uid2_src.src_db} - ${td_uid2_src.src_tbl} - ${td_uid2_src.src_id_col} - ${td_uid2_src.src_dii_col} - ${td_uid2_src.src_dii_typ}
+dii_collect_src:
td>: sql/uid2_collect_new.sql
database: ${td_uid2_env.db}
insert_into: ttd_uid2_ids
#
# Bucket Rotation Phase
#
+ttd_wf_bucket_rotation:
+ttd_bucket_since_timestamp:
td>: sql/uid2_bucket_since_timestamp.sql
database: ${td_uid2_env.db}
store_last_results: true
+ttd_bucket_since_timestamp_echo:
echo>: TTD Bucket Since Timestamp [${td.last_results.since_timestamp}]
+ttd_stale_bucket_fetch:
py>: py_scripts.uid2_request.post_bucket_requests
since_ts: ${td.last_results.since_timestamp}
db: ${td_uid2_env.db}
url: https://${ttd.environment}${ttd.url_buckets}
docker:
image: "digdag/digdag-python:3.9"
_env:
TD_API_KEY: ${secret:pytd.apikey}
TD_API_SERVER: ${pytd.apiserver}
TTD_API_KEY: ${secret:ttd.apikey}
TTD_CLIENT_SECRET: ${secret:ttd.clientsecret}
# Refresh not used yet
# TDD_REFRESH_TOKEN: ${secret:ttd.refreshtoken}
# TDD_REFRESH_RESPONSE_KEY: ${secret:ttd.refreshresponsekey}
+bucket_rot_insert:
td>: sql/uid2_stale_bucket_insert.sql
database: ${td_uid2_env.db}
insert_into: ttd_uid2_ids
+bucket_rot_delete:
td>: sql/uid2_stale_bucket_delete.sql
database: ${td_uid2_env.db}
#
# UID2 Mapping Phase
#
+ttd_wf_uid2_map:
+uid2_requests_generate:
_parallel: true
+uid2_requests_generate_email:
td>: sql/uid2_requests_email_generate.sql
database: ${td_uid2_env.db}
insert_into: ttd_uid2_rqst
+uid2_requests_generate_phone:
td>: sql/uid2_requests_phone_generate.sql
database: ${td_uid2_env.db}
insert_into: ttd_uid2_rqst
+ttd_uid2_fetch_parallel:
for_range>:
step: 1
from: 0
to: ${ttd.parallel_max}
_parallel: true
_do:
+echo_parallel:
echo>: UID2 Parallel [${td_uid2_env.db}], [https://${ttd.environment}${ttd.url_map}], [${ttd.parallel_max}], [${range.index}]
+ttd_uid2_fetch:
py>: py_scripts.uid2_request.post_uid2_requests
db: ${td_uid2_env.db}
url: https://${ttd.environment}${ttd.url_map}
mod_div: ${ttd.parallel_max}
mod_idx: ${range.index}
docker:
image: "digdag/digdag-python:3.9"
_env:
TD_API_KEY: ${secret:pytd.apikey}
TD_API_SERVER: ${pytd.apiserver}
TTD_API_KEY: ${secret:ttd.apikey}
TTD_CLIENT_SECRET: ${secret:ttd.clientsecret}
# Refresh not used yet
# TDD_REFRESH_TOKEN: ${secret:ttd.refreshtoken}
# TDD_REFRESH_RESPONSE_KEY: ${secret:ttd.refreshresponsekey}
+uid2_response_insert:
_export:
# 1 Indicates current as of last WF run, will potentially get updated per salt-bucket rotations in future WF runs
is_current: 1
td>: sql/uid2_response_insert.sql
database: ${td_uid2_env.db}
insert_into: ttd_uid2_ids
+uid2_response_archive:
_export:
# -1 Indicates archived status, archive records are intended to be immutable
# Future UID2 for given DII record in [ttd_uid2_ids] table may change per salt bucket rotation
# But the archive records in [ttd_uid2_ids_archive] table will remain unchanged
is_current: -1
td>: sql/uid2_response_insert.sql
database: ${td_uid2_env.db}
insert_into: ttd_uid2_ids_archive
+uid2_response_delete_stale:
td>: sql/uid2_response_delete_stale.sql
database: ${td_uid2_env.db}