forked from mailgun/mailgun-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
418 lines (379 loc) · 12.1 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
package mailgun
import (
"fmt"
"time"
"github.com/pkg/errors"
)
// Events are open-ended, loosely-defined JSON documents.
// They will always have an event and a timestamp field, however.
type Event map[string]interface{}
// Parse the timestamp field for this event into a Time object
func (e Event) ParseTimeStamp() (time.Time, error) {
obj, ok := e["timestamp"]
if !ok {
return time.Time{}, errors.New("'timestamp' field not found in event")
}
timestamp, ok := obj.(float64)
if !ok {
return time.Time{}, errors.New("'timestamp' field not a float64")
}
microseconds := int64(timestamp * 1000000)
return time.Unix(0, microseconds*int64(time.Microsecond/time.Nanosecond)).UTC(), nil
}
func (e Event) ParseMessageId() (string, error) {
message, err := toMapInterface("message", e)
if err != nil {
return "", err
}
headers, err := toMapInterface("headers", message)
if err != nil {
return "", err
}
return headers["message-id"].(string), nil
}
// noTime always equals an uninitialized Time structure.
// It's used to detect when a time parameter is provided.
var noTime time.Time
// GetEventsOptions lets the caller of GetEvents() specify how the results are to be returned.
// Begin and End time-box the results returned.
// ForceAscending and ForceDescending are used to force Mailgun to use a given traversal order of the events.
// If both ForceAscending and ForceDescending are true, an error will result.
// If none, the default will be inferred from the Begin and End parameters.
// Limit caps the number of results returned. If left unspecified, Mailgun assumes 100.
// Compact, if true, compacts the returned JSON to minimize transmission bandwidth.
// Otherwise, the JSON is spaced appropriately for human consumption.
// Filter allows the caller to provide more specialized filters on the query.
// Consult the Mailgun documentation for more details.
type EventsOptions struct {
Begin, End time.Time
ForceAscending, ForceDescending, Compact bool
Limit int
Filter map[string]string
ThresholdAge time.Duration
PollInterval time.Duration
}
// Depreciated See `ListEvents()`
type GetEventsOptions struct {
Begin, End time.Time
ForceAscending, ForceDescending, Compact bool
Limit int
Filter map[string]string
}
// EventIterator maintains the state necessary for paging though small parcels of a larger set of events.
type EventIterator struct {
events []Event
NextURL, PrevURL, FirstURL, LastURL string
mg Mailgun
err error
}
// NewEventIterator creates a new iterator for events.
// Use GetFirstPage to retrieve the first batch of events.
// Use GetNext and GetPrevious thereafter as appropriate to iterate through sets of data.
//
// *This call is Deprecated, use ListEvents() instead*
func (mg *MailgunImpl) NewEventIterator() *EventIterator {
return &EventIterator{mg: mg}
}
// Create an new iterator to fetch a page of events from the events api
// it := mg.ListEvents(EventsOptions{})
// var events []Event
// for it.Next(&events) {
// for _, event := range events {
// // Do things with events
// }
// }
// if it.Err() != nil {
// log.Fatal(it.Err())
// }
func (mg *MailgunImpl) ListEvents(opts *EventsOptions) *EventIterator {
req := newHTTPRequest(generateApiUrl(mg, eventsEndpoint))
if opts != nil {
if opts.Limit != 0 {
req.addParameter("limit", fmt.Sprintf("%d", opts.Limit))
}
if opts.Compact {
req.addParameter("pretty", "no")
}
if opts.ForceAscending {
req.addParameter("ascending", "yes")
}
if opts.ForceDescending {
req.addParameter("ascending", "no")
}
if opts.Begin != noTime {
req.addParameter("begin", formatMailgunTime(&opts.Begin))
}
if opts.End != noTime {
req.addParameter("end", formatMailgunTime(&opts.End))
}
if opts.Filter != nil {
for k, v := range opts.Filter {
req.addParameter(k, v)
}
}
}
url, err := req.generateUrlWithParameters()
return &EventIterator{
mg: mg,
NextURL: url,
FirstURL: url,
PrevURL: "",
err: err,
}
}
// If an error occurred during iteration `Err()` will return non nil
func (ei *EventIterator) Err() error {
return ei.err
}
// Events returns the most recently retrieved batch of events.
// The length is guaranteed to fall between 0 and the limit set in the GetEventsOptions structure passed to GetFirstPage.
func (ei *EventIterator) Events() []Event {
return ei.events
}
// GetFirstPage retrieves the first batch of events, according to your criteria.
// See the GetEventsOptions structure for more details on how the fields affect the data returned.
func (ei *EventIterator) GetFirstPage(opts GetEventsOptions) error {
if opts.ForceAscending && opts.ForceDescending {
return fmt.Errorf("collation cannot at once be both ascending and descending")
}
payload := newUrlEncodedPayload()
if opts.Limit != 0 {
payload.addValue("limit", fmt.Sprintf("%d", opts.Limit))
}
if opts.Compact {
payload.addValue("pretty", "no")
}
if opts.ForceAscending {
payload.addValue("ascending", "yes")
}
if opts.ForceDescending {
payload.addValue("ascending", "no")
}
if opts.Begin != noTime {
payload.addValue("begin", formatMailgunTime(&opts.Begin))
}
if opts.End != noTime {
payload.addValue("end", formatMailgunTime(&opts.End))
}
if opts.Filter != nil {
for k, v := range opts.Filter {
payload.addValue(k, v)
}
}
url, err := generateParameterizedUrl(ei.mg, eventsEndpoint, payload)
if err != nil {
return err
}
return ei.fetch(url)
}
// Retrieves the chronologically previous batch of events, if any exist.
// You know you're at the end of the list when len(Events())==0.
func (ei *EventIterator) GetPrevious() error {
return ei.fetch(ei.PrevURL)
}
// Retrieves the chronologically next batch of events, if any exist.
// You know you're at the end of the list when len(Events())==0.
func (ei *EventIterator) GetNext() error {
return ei.fetch(ei.NextURL)
}
// Retrieves the next page of events from the api. Returns false when there
// no more pages to retrieve or if there was an error. Use `.Err()` to retrieve
// the error
func (ei *EventIterator) Next(events *[]Event) bool {
if ei.err != nil {
return false
}
ei.err = ei.fetch(ei.NextURL)
if ei.err != nil {
return false
}
*events = ei.events
if len(ei.events) == 0 {
return false
}
return true
}
// Retrieves the first page of events from the api. Returns false if there
// was an error. It also sets the iterator object to the first page.
// Use `.Err()` to retrieve the error.
func (ei *EventIterator) First(events *[]Event) bool {
if ei.err != nil {
return false
}
ei.err = ei.fetch(ei.FirstURL)
if ei.err != nil {
return false
}
*events = ei.events
return true
}
// Retrieves the last page of events from the api.
// Calling Last() is invalid unless you first call First() or Next()
// Returns false if there was an error. It also sets the iterator object
// to the last page. Use `.Err()` to retrieve the error.
func (ei *EventIterator) Last(events *[]Event) bool {
if ei.err != nil {
return false
}
ei.err = ei.fetch(ei.LastURL)
if ei.err != nil {
return false
}
*events = ei.events
return true
}
// Retrieves the previous page of events from the api. Returns false when there
// no more pages to retrieve or if there was an error. Use `.Err()` to retrieve
// the error if any
func (ei *EventIterator) Previous(events *[]Event) bool {
if ei.err != nil {
return false
}
if ei.PrevURL == "" {
return false
}
ei.err = ei.fetch(ei.PrevURL)
if ei.err != nil {
return false
}
*events = ei.events
if len(ei.events) == 0 {
return false
}
return true
}
// EventPoller maintains the state necessary for polling events
type EventPoller struct {
it *EventIterator
opts EventsOptions
thresholdTime time.Time
sleepUntil time.Time
mg Mailgun
err error
}
// Poll the events api and return new events as they occur
// it = mg.PollEvents(&EventsOptions{
// // Poll() returns after this threshold is met, or events older than this threshold appear
// ThresholdAge: time.Second * 10,
// // Only events with a timestamp after this date/time will be returned
// Begin: time.Now().Add(time.Second * -3),
// // How often we poll the api for new events
// PollInterval: time.Second * 4})
// var events []Event
// // Blocks until new events appear
// for it.Poll(&events) {
// for _, event := range(events) {
// fmt.Printf("Event %+v\n", event)
// }
// }
// if it.Err() != nil {
// log.Fatal(it.Err())
// }
func (mg *MailgunImpl) PollEvents(opts *EventsOptions) *EventPoller {
now := time.Now()
// ForceAscending must be set
opts.ForceAscending = true
// Default begin time is 30 minutes ago
if opts.Begin == noTime {
opts.Begin = now.Add(time.Minute * -30)
}
// Default threshold age is 30 minutes
if opts.ThresholdAge.Nanoseconds() == 0 {
opts.ThresholdAge = time.Duration(time.Minute * 30)
}
// Set a 15 second poll interval if none set
if opts.PollInterval.Nanoseconds() == 0 {
opts.PollInterval = time.Duration(time.Second * 15)
}
return &EventPoller{
it: mg.ListEvents(opts),
opts: *opts,
mg: mg,
}
}
// If an error occurred during polling `Err()` will return non nil
func (ep *EventPoller) Err() error {
return ep.err
}
func (ep *EventPoller) Poll(events *[]Event) bool {
var currentPage string
ep.thresholdTime = time.Now().UTC().Add(ep.opts.ThresholdAge)
for {
if ep.sleepUntil != noTime {
// Sleep the rest of our duration
time.Sleep(ep.sleepUntil.Sub(time.Now()))
}
// Remember our current page url
currentPage = ep.it.NextURL
// Attempt to get a page of events
var page []Event
if ep.it.Next(&page) == false {
if ep.it.Err() == nil && len(page) == 0 {
// No events, sleep for our poll interval
ep.sleepUntil = time.Now().Add(ep.opts.PollInterval)
continue
}
ep.err = ep.it.Err()
return false
}
// Last event on the page
lastEvent := page[len(page)-1]
timeStamp, err := lastEvent.ParseTimeStamp()
if err != nil {
ep.err = errors.Wrap(err, "event timestamp error")
return false
}
// Record the next time we should query for new events
ep.sleepUntil = time.Now().Add(ep.opts.PollInterval)
// If the last event on the page is older than our threshold time
// or we have been polling for longer than our threshold time
if timeStamp.After(ep.thresholdTime) || time.Now().UTC().After(ep.thresholdTime) {
ep.thresholdTime = time.Now().UTC().Add(ep.opts.ThresholdAge)
// Return the page of events to the user
*events = page
return true
}
// Since we didn't find an event older than our
// threshold, fetch this same page again
ep.it.NextURL = currentPage
}
}
// GetFirstPage, GetPrevious, and GetNext all have a common body of code.
// fetch completes the API fetch common to all three of these functions.
func (ei *EventIterator) fetch(url string) error {
r := newHTTPRequest(url)
r.setClient(ei.mg.Client())
r.setBasicAuth(basicAuthUser, ei.mg.ApiKey())
var response map[string]interface{}
err := getResponseFromJSON(r, &response)
if err != nil {
return err
}
items := response["items"].([]interface{})
ei.events = make([]Event, len(items))
for i, item := range items {
ei.events[i] = item.(map[string]interface{})
}
pagings := response["paging"].(map[string]interface{})
links := make(map[string]string, len(pagings))
for key, page := range pagings {
links[key] = page.(string)
}
ei.NextURL = links["next"]
ei.PrevURL = links["previous"]
ei.FirstURL = links["first"]
ei.LastURL = links["last"]
return err
}
func toMapInterface(field string, thingy map[string]interface{}) (map[string]interface{}, error) {
var empty map[string]interface{}
obj, ok := thingy[field]
if !ok {
return empty, errors.Errorf("'%s' field not found in event", field)
}
result, ok := obj.(map[string]interface{})
if !ok {
return empty, errors.Errorf("'%s' field not a map[string]interface{}", field)
}
return result, nil
}