Skip to content

Commit

Permalink
improving callback-url-error checking and log
Browse files Browse the repository at this point in the history
  • Loading branch information
ninlil committed Apr 6, 2023
1 parent 6b96e83 commit 0612663
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 508 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ bin/
Makefile.inc
*.secret
*.cfg
version.txt
version.txt
tests/
23 changes: 17 additions & 6 deletions callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"fmt"
"net/http"
"net/url"
"os"
"time"

Expand Down Expand Up @@ -37,23 +38,33 @@ func (hlog *httpLogger) Warn(format string, v ...interface{}) {
log.Warn().Msgf(hlog.Format(format, v...))
}

func expandMap(txt string, vars map[string]string) string {
return os.Expand(txt, func(key string) string {
return vars[key]
})
}

func (callback *melpCallback) Send(message *Message) error {

netClient.Logger = new(httpLogger)

log.Trace().Msgf("Send-> preparing to send message...")
log.Trace().Msgf("Send-> preparing to send message to '%s'...", callback.URL)

url := os.Expand(callback.URL, func(key string) string {
return message.Metadata[key]
})
// target := os.Expand(callback.URL, func(key string) string {
// return message.Metadata[key]
// })
target := expandMap(callback.URL, message.Metadata)
if _, err := url.Parse(target); err != nil {
return fmt.Errorf("invalid url: %s", target)
}

log.Trace().Msgf("Send-> url = '%s'", url)
log.Trace().Msgf("Send-> url = '%s'", target)

//buffer := bytes.NewBuffer(message.Body)

//resp, err := netClient.Post(url, message.ContentType(), buffer)

req, err := retryablehttp.NewRequest("POST", url, message.Body)
req, err := retryablehttp.NewRequest("POST", target, message.Body)
if err != nil {
log.Error().Msgf("create request failed: %v", err)
return err
Expand Down
40 changes: 40 additions & 0 deletions config/melp-disabled.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
apiVersion: melp/v1alpha1

# REST-endpoint -> Send message
output:
kafka:
- endpoint: ${ENDPOINT}
key: ${OUTPUT_KEY}
secret: ${OUTPUT_SECRET}
topic: ${TOPIC}
id: xyzzy
#disabled: true
auth:
anon: true
# bearer: ${OUTPUT_BEARER}
# basic:
# kalle: ada
# olle: gun

# Incoming message -> REST-endpoint
input:
kafka:
- endpoint: ${ENDPOINT}
key: ${INPUT_KEY}
secret: ${INPUT_SECRET}
group: ${CONSUMERGROUP}
topics:
- ${TOPIC}
id: read-hb
disabled: true
callback:
# url: http://localhost:9090/dump?topic=${topic}
# url: http://localhost:9090/fail?topic=${topic}&status=200&accept=44
url: http://localhost:9090/retry/xyzzy
# auth:
# bearer: 1234abcd
# basic:
# kalle: ada


# fail: true
1 change: 1 addition & 0 deletions config/melp-empty.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
apiVersion: melp/v1alpha1
3 changes: 0 additions & 3 deletions examples/DOCKER.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,3 @@ docker run --rm -d --restart unless-stopped `
```sh
docker run --rm lindex/melp melp -h
```

## Powershell considerations
You need to change the command to a one-liner, or replace ending the '\' to an \` character
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/Shopify/sarama v1.38.1
github.com/alexflint/go-arg v1.4.3
github.com/hashicorp/go-retryablehttp v0.7.2
github.com/ninlil/butler v0.4.2
github.com/ninlil/butler v0.5.0
github.com/rs/zerolog v1.29.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -19,8 +19,8 @@ require (
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-chi/chi v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
Expand All @@ -30,13 +30,15 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/justinas/alice v1.2.0 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/compress v1.16.4 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/rs/xid v1.4.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/sys v0.7.0 // indirect
)
506 changes: 13 additions & 493 deletions go.sum

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions kafka-input.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -77,6 +78,14 @@ func (r *kafkaReceiver) Validate() ([]error, bool) {
errs = append(errs, requiredError(URL))
}

vars := map[string]string{
"topic": "topic",
}
target := expandMap(r.Callback.URL, vars)
if _, err := url.Parse(target); err != nil {
errs = append(errs, fmt.Errorf("invalid URL: %v", err))
}

return errs, true
}

Expand Down

0 comments on commit 0612663

Please sign in to comment.