Skip to content

Commit

Permalink
feat: more mqtt logging
Browse files Browse the repository at this point in the history
  • Loading branch information
fritterhoff committed Nov 21, 2023
1 parent 080fb05 commit 7ceba45
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
2 changes: 2 additions & 0 deletions acme/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ func http01Validate(ctx context.Context, ch *Challenge, db DB, jwk *jose.JSONWeb
}
data, err := json.Marshal(req)
if err != nil {
logrus.Warn(err)
return
}
if token := mqtt.GetClient().Publish(fmt.Sprintf("%s/jobs", mqtt.GetOrganization()), 1, false, data); token.Wait() && token.Error() != nil {
logrus.Warn(token.Error())
}
logrus.Info("published validation request")
}()
vc := MustClientFromContext(ctx)
resp, errHttp := vc.Get(u.String())
Expand Down
16 changes: 11 additions & 5 deletions cmd/step-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,14 @@ var agent = cli.Command{
options.OnReconnecting = func(mqtt.Client, *mqtt.ClientOptions) {
logrus.Println("mqtt reconnecting")
}

client := mqtt.NewClient(options)
if token := client.Connect(); token.WaitTimeout(30*time.Second) && token.Error() != nil {
logrus.Warn(token.Error())
}

// Subscribe to topic
client.Subscribe(fmt.Sprintf("%s/jobs", c.String("organization")), 0, func(client mqtt.Client, msg mqtt.Message) {
token := client.Subscribe(fmt.Sprintf("%s/jobs", c.String("organization")), 0, func(client mqtt.Client, msg mqtt.Message) {
logrus.Infof("received message on topic %s", msg.Topic())
logrus.Infof("message: %s", msg.Payload())

Expand All @@ -81,9 +82,6 @@ var agent = cli.Command{
req := msg.Payload()
json.Unmarshal(req, &data)

logrus.Infof("authz: %s", data.Authz)
logrus.Infof("target: %s", data.Target)
logrus.Infof("account: %s", data.Challenge)
logger := logrus.WithField("authz", data.Authz).WithField("target", data.Target).WithField("account", data.Challenge)

http := acme.NewClient()
Expand Down Expand Up @@ -120,11 +118,19 @@ var agent = cli.Command{
// Publish to topic
token := client.Publish(fmt.Sprintf("%s/data", c.String("organization")), 0, false, json)
if token.WaitTimeout(30*time.Second) && token.Error() != nil {
logrus.WithError(token.Error()).Warn("publishing failed")
logger.WithError(token.Error()).Warn("publishing failed")
} else {
logger.Infof("published to topic %s", fmt.Sprintf("%s/data", c.String("organization")))
}

})

if token.WaitTimeout(30*time.Second) && token.Error() != nil {
logrus.WithError(token.Error()).Warn("subscribing failed")
} else {
logrus.Infof("subscribed to topic %s", fmt.Sprintf("%s/jobs", c.String("organization")))
}

return nil
},
}
Expand Down

0 comments on commit 7ceba45

Please sign in to comment.