-
Notifications
You must be signed in to change notification settings - Fork 0
/
callback.go
124 lines (99 loc) · 3 KB
/
callback.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main
import (
"encoding/base64"
"fmt"
"net/http"
"net/url"
"time"
retryablehttp "github.com/hashicorp/go-retryablehttp"
"github.com/ninlil/envsubst"
"github.com/rs/zerolog/log"
)
// netClient is the package-wide retrying HTTP client used to deliver callbacks.
var netClient = retryablehttp.NewClient()

// init configures the retry policy of netClient: at most 5 retries, with the
// wait between attempts bounded to the range 500ms .. 3s.
func init() {
	netClient.RetryWaitMin = 500 * time.Millisecond
	netClient.RetryWaitMax = 3 * time.Second
	netClient.RetryMax = 5
}
// httpLogger routes log output of the retrying HTTP client to the
// application logger. It is installed as the client's Logger in Send.
type httpLogger struct{}

// Format builds the log line for a client message, prefixed with
// "http-client: ".
//
// Two calling conventions are supported:
//   - printf style: when format contains a '%' verb (or there are no extra
//     arguments) the result is fmt.Sprintf("http-client: "+format, v...).
//   - leveled-logger style: go-retryablehttp's LeveledLogger passes a plain
//     message followed by alternating key/value pairs. Those are appended as
//     " key=value" instead of being pushed through Sprintf, which would
//     otherwise produce "%!(EXTRA ...)" noise. A trailing key without a value
//     is appended on its own.
func (hlog *httpLogger) Format(format string, v ...interface{}) string {
	const prefix = "http-client: "

	hasVerb := false
	for i := 0; i < len(format); i++ {
		if format[i] == '%' {
			hasVerb = true
			break
		}
	}
	if hasVerb || len(v) == 0 {
		return fmt.Sprintf(prefix+format, v...)
	}

	msg := prefix + format
	for i := 0; i < len(v); i += 2 {
		if i+1 < len(v) {
			msg += fmt.Sprintf(" %v=%v", v[i], v[i+1])
		} else {
			msg += fmt.Sprintf(" %v", v[i])
		}
	}
	return msg
}
// Error writes a client message to the application log at error level.
// The text is produced by Format and emitted verbatim.
func (hlog *httpLogger) Error(format string, v ...interface{}) {
	line := hlog.Format(format, v...)
	log.Error().Msg(line)
}
// Info writes a client message to the application log at info level.
//
// The text returned by Format is already fully formatted, so it is emitted
// with Msg. Passing it to Msgf would re-interpret any '%' in the output
// (for example a URL containing "%20") as a format verb and corrupt the line.
func (hlog *httpLogger) Info(format string, v ...interface{}) {
	log.Info().Msg(hlog.Format(format, v...))
}
// Debug writes a client message to the application log at debug level.
//
// The text returned by Format is already fully formatted, so it is emitted
// with Msg. Passing it to Msgf would re-interpret any '%' in the output
// (for example a URL containing "%20") as a format verb and corrupt the line.
func (hlog *httpLogger) Debug(format string, v ...interface{}) {
	log.Debug().Msg(hlog.Format(format, v...))
}
// Warn writes a client message to the application log at warn level.
//
// The text returned by Format is already fully formatted, so it is emitted
// with Msg. Passing it to Msgf would re-interpret any '%' in the output
// (for example a URL containing "%20") as a format verb and corrupt the line.
func (hlog *httpLogger) Warn(format string, v ...interface{}) {
	log.Warn().Msg(hlog.Format(format, v...))
}
// expandMap substitutes variable references in txt with values looked up in
// vars and returns the resulting string.
//
// The envsubst prefix character is set to '%' before converting (presumably
// so that references are introduced by '%' — confirm against the envsubst
// documentation). A conversion error is logged, not returned; whatever string
// envsubst produced is returned in either case.
func expandMap(txt string, vars map[string]string) string {
	lookup := func(name string) (string, bool) {
		value, found := vars[name]
		return value, found
	}

	envsubst.SetPrefix('%')
	expanded, convErr := envsubst.ConvertString(txt, lookup)
	if convErr != nil {
		log.Error().Msgf("expandMap-error: %v", convErr)
	}
	return expanded
}
// melpUserAgent is the value sent in the User-Agent header of every callback
// request: "melp-" followed by the result of versionFunc().
var melpUserAgent = fmt.Sprintf("melp-%s", versionFunc())
// Send delivers message to the callback endpoint with an HTTP POST through
// the shared retrying client.
//
// The target URL is callback.URL with variable references expanded from
// message.Metadata (see expandMap). The request carries, in this order:
// optional Authorization headers from callback.Auth, the melp User-Agent,
// callback.Headers, one "melp-<key>" header per metadata entry, and
// message.Headers.
//
// It returns an error when the expanded URL does not parse, the request
// cannot be created, the client fails to send, or the response status is
// outside 200..299 (in which case the error text is the response status).
func (callback *melpCallback) Send(message *Message) error {
	// NOTE(review): this reassigns the shared client's logger on every call;
	// if Send can run concurrently this is an unsynchronized write — confirm.
	netClient.Logger = new(httpLogger)

	log.Trace().Msgf("Send-> preparing to send message to '%s'...", callback.URL)

	target := expandMap(callback.URL, message.Metadata)
	if _, err := url.Parse(target); err != nil {
		// Keep the underlying parse error so the caller can see why it failed.
		return fmt.Errorf("invalid url: %s: %w", target, err)
	}
	log.Trace().Msgf("Send-> url = '%s'", target)

	req, err := retryablehttp.NewRequest("POST", target, message.Body)
	if err != nil {
		log.Error().Msgf("create request failed: %v", err)
		return err
	}

	if callback.Auth != nil {
		if callback.Auth.Bearer != "" {
			req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", callback.Auth.Bearer))
		}
		if len(callback.Auth.Basic) > 0 {
			// Presumably a single user -> password entry. With several
			// entries the pair used is whichever the range yields last.
			var text string
			for k, v := range callback.Auth.Basic {
				text = fmt.Sprintf("%s:%s", k, v)
			}
			basic := base64.StdEncoding.EncodeToString([]byte(text))
			req.Header.Add("Authorization", fmt.Sprintf("Basic %s", basic))
		}
	}

	req.Header.Add("User-Agent", melpUserAgent)
	for k, v := range callback.Headers {
		req.Header.Add(k, v)
	}
	for k, v := range message.Metadata {
		req.Header.Add(fmt.Sprintf("melp-%s", k), v)
	}
	for k, v := range message.Headers {
		req.Header.Add(k, v)
	}

	resp, err := netClient.Do(req)
	if err != nil {
		log.Error().Msgf("send failed: %v", err)
		return err
	}
	// The body is not consumed here, but it must still be closed so the
	// underlying connection is released.
	defer resp.Body.Close()

	log.Debug().Msgf("Send-> send-response = %d", resp.StatusCode)

	if resp.StatusCode < http.StatusOK || resp.StatusCode > 299 {
		// Pass the status as an argument, never as the format string.
		return fmt.Errorf("%s", resp.Status)
	}
	return nil
}