This repository has been archived by the owner on Aug 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi_helper.go
225 lines (207 loc) · 6.37 KB
/
api_helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidAnalytics
import (
"compress/gzip"
"encoding/json"
"io"
"net/http"
"strings"
"time"
)
/*
Implements all the helper methods needed to process the POST /analytics payload
and send it to the internal buffer channel
*/
/*
developerInfo groups the developer-related attributes of an API call:
the API product, the developer app, the developer email and the developer.
NOTE(review): nothing in this part of the file populates or reads it;
confirm its producer/consumer elsewhere in the package.
*/
type developerInfo struct {
	ApiProduct, DeveloperApp, DeveloperEmail, Developer string
}
/*
axRecords is one batch of analytics records together with the tenant they
belong to. validateEnrichPublish builds a value of this type and sends it
on the internalBuffer channel.
*/
type axRecords struct {
// Tenant is the org/env pair taken from the request payload
Tenant tenant
// Records is an array of multiple analytics records
Records []interface{}
}
/*
tenant identifies the owner of a batch of analytics records by its
organization (Org) and environment (Env).
*/
type tenant struct {
	Org, Env string
}
/*
getJsonBody decodes the request body into a generic JSON object.

A non-empty Content-Encoding header must be "gzip" (case-insensitive), in
which case the body is read through a gzip reader; any other value is
rejected with UNSUPPORTED_CONTENT_ENCODING. Numbers are decoded as
json.Number rather than float64.

On success the decoded map and a zero errResponse are returned; on failure
the map is nil and the errResponse carries the error code and reason.
*/
func getJsonBody(r *http.Request) (map[string]interface{}, errResponse) {
	var body io.Reader = r.Body

	if encoding := r.Header.Get("Content-Encoding"); encoding != "" {
		if !strings.EqualFold(encoding, "gzip") {
			return nil, errResponse{
				ErrorCode: "UNSUPPORTED_CONTENT_ENCODING",
				Reason:    "Only supported content encoding is gzip"}
		}
		// Body is gzip encoded: wrap it so the decoder sees plain JSON
		gz, err := gzip.NewReader(r.Body)
		if err != nil {
			return nil, errResponse{
				ErrorCode: "BAD_DATA",
				Reason:    "Gzip Encoded data cannot be read"}
		}
		body = gz
	}

	var payload map[string]interface{}
	dec := json.NewDecoder(body)
	dec.UseNumber() // keep numeric values as json.Number
	if dec.Decode(&payload) != nil {
		return nil, errResponse{
			ErrorCode: "BAD_DATA",
			Reason:    "Not a valid JSON payload"}
	}
	return payload, errResponse{}
}
/*
getTenantFromPayload extracts the tenant from the decoded payload using the
2 required fields - organization and environment.

Each field must be present, be a JSON string and be non-empty:
  - absent, null or empty string -> MISSING_FIELD
  - present but not a string     -> BAD_DATA (previously this case panicked
    on an unchecked type assertion)

On success the populated tenant and a zero errResponse are returned; on
failure the tenant is the zero value.
*/
func getTenantFromPayload(raw map[string]interface{}) (tenant, errResponse) {
	fields := [2]string{"organization", "environment"}
	var values [2]string

	for i, elem := range fields {
		switch v := raw[elem].(type) {
		case nil:
			return tenant{}, errResponse{
				ErrorCode: "MISSING_FIELD",
				Reason:    "Missing Required field: " + elem}
		case string:
			if v == "" {
				return tenant{}, errResponse{
					ErrorCode: "MISSING_FIELD",
					Reason:    "Missing Required field: " + elem}
			}
			values[i] = v
		default:
			// e.g. a number or object: reject instead of panicking
			return tenant{}, errResponse{
				ErrorCode: "BAD_DATA",
				Reason:    elem + " should be a string"}
		}
	}
	return tenant{Org: values[0], Env: values[1]}, errResponse{}
}
/*
validateEnrichPublish validates every record under the "records" key of the
payload, enriches each valid record with the tenant information and then
publishes the whole batch to the internalBuffer channel.

The batch is rejected as a whole (nothing is published) when:
  - "records" is absent/null or an empty list (NO_RECORDS)
  - "records" is not a list, or an entry is not a JSON object (BAD_DATA)
  - any single record fails validate (that record's error is returned)

A zero errResponse is returned once the batch has been handed to the channel.
*/
func validateEnrichPublish(tenant tenant, raw map[string]interface{}) errResponse {
	payload := raw["records"]
	if payload == nil {
		return errResponse{
			ErrorCode: "NO_RECORDS",
			Reason:    "No analytics records in the payload"}
	}

	batch, isList := payload.([]interface{})
	if !isList {
		return errResponse{
			ErrorCode: "BAD_DATA",
			Reason:    "records should be a list of analytics records"}
	}
	if len(batch) == 0 {
		return errResponse{
			ErrorCode: "NO_RECORDS",
			Reason:    "No analytics records in the payload"}
	}

	// Validate and enrich record by record; one bad record rejects the batch
	for _, item := range batch {
		record, isObject := item.(map[string]interface{})
		if !isObject {
			return errResponse{
				ErrorCode: "BAD_DATA",
				Reason:    "Each Analytics record in records should be a json object"}
		}
		if ok, failure := validate(record); !ok {
			return failure
		}
		enrich(record, tenant)
	}

	// publish batch of records to channel (blocking call)
	internalBuffer <- axRecords{Tenant: tenant, Records: batch}
	return errResponse{}
}
/*
validate does basic validation on a single analytics record.

 1. client_received_start_timestamp and client_received_end_timestamp must
    both exist (MISSING_FIELD otherwise)
 2. both must be JSON numbers (json.Number, as produced by a decoder with
    UseNumber enabled)
 3. neither may be numerically 0
 4. client_received_start_timestamp must not be greater than
    client_received_end_timestamp (equal values are accepted)
 5. client_received_start_timestamp is treated as milliseconds since the
    Unix epoch; it must be an integer, must not be after the current time
    and must not be older than 90 days

Returns (true, zero errResponse) for a valid record, otherwise false and an
errResponse describing the first failed check (rules 2-5 use BAD_DATA).
*/
func validate(recordMap map[string]interface{}) (bool, errResponse) {
	elems := []string{"client_received_start_timestamp", "client_received_end_timestamp"}
	for _, elem := range elems {
		if recordMap[elem] == nil {
			return false, errResponse{
				ErrorCode: "MISSING_FIELD",
				Reason:    "Missing Required field: " + elem}
		}
	}

	notNumber := errResponse{
		ErrorCode: "BAD_DATA",
		Reason: "client_received_start_timestamp and " +
			"client_received_end_timestamp has to be number"}

	crst, isNumber1 := recordMap["client_received_start_timestamp"].(json.Number)
	cret, isNumber2 := recordMap["client_received_end_timestamp"].(json.Number)
	if !isNumber1 || !isNumber2 {
		return false, notNumber
	}

	// json.Number is a string type, so == and > on it compare text, not
	// magnitude ("9" > "10"). Convert before comparing. float64 is exact for
	// integers up to 2^53, far beyond any millisecond timestamp that can pass
	// the time-window checks below.
	start, errStart := crst.Float64()
	end, errEnd := cret.Float64()
	if errStart != nil || errEnd != nil {
		return false, notNumber
	}

	switch {
	case start == 0 || end == 0:
		return false, errResponse{
			ErrorCode: "BAD_DATA",
			Reason: "client_received_start_timestamp or " +
				"client_received_end_timestamp cannot be 0"}
	case start > end:
		return false, errResponse{
			ErrorCode: "BAD_DATA",
			Reason: "client_received_start_timestamp " +
				"> client_received_end_timestamp"}
	}

	// The start timestamp is converted to a time below, so it has to be an
	// integer that fits in int64; report that instead of silently using 0.
	ts, err := crst.Int64()
	if err != nil {
		return false, errResponse{
			ErrorCode: "BAD_DATA",
			Reason: "client_received_start_timestamp " +
				"has to be an integer timestamp in milliseconds"}
	}

	crstTime := time.Unix(ts/1000, 0) // Convert crst(ms) to seconds
	diff := time.Now().UTC().Sub(crstTime)
	switch {
	case diff <= 0:
		return false, errResponse{
			ErrorCode: "BAD_DATA",
			Reason: "client_received_start_timestamp " +
				"cannot be after current time"}
	case diff.Hours() > 90*24: // 90 Days
		return false, errResponse{
			ErrorCode: "BAD_DATA",
			Reason: "client_received_start_timestamp " +
				"cannot be older than 90 days"}
	}
	return true, errResponse{}
}
/*
enrich stamps a record with the tenant's org and env under the
"organization" and "environment" keys. Values already stored under those
keys are always overwritten with the tenant information from the payload.
*/
func enrich(recordMap map[string]interface{}, tenant tenant) {
	org, env := tenant.Org, tenant.Env
	recordMap["organization"], recordMap["environment"] = org, env
}
/*
writeError sends an error response: it writes the given HTTP status and then
a JSON-encoded errResponse built from code and reason as the body.

The status header is written before the body is marshalled, so if marshalling
fails the failure is only logged and the response carries the status with no
body.
*/
func writeError(w http.ResponseWriter, status int, code string, reason string) {
	w.WriteHeader(status)

	payload, err := json.Marshal(errResponse{ErrorCode: code, Reason: reason})
	if err != nil {
		log.Errorf("unable to marshal errorResponse: %v", err)
		return
	}
	w.Write(payload)
}