From c29255da5a4093a839d5d1ca92feb724d0b1827c Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Fri, 7 Sep 2018 15:48:06 +1000 Subject: [PATCH 1/2] Delete queue before completing lifecycle --- daemon.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/daemon.go b/daemon.go index e51a4ce..5d93aa8 100644 --- a/daemon.go +++ b/daemon.go @@ -5,6 +5,7 @@ import ( "encoding/json" "os" "os/exec" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -45,19 +46,15 @@ func (d *Daemon) Start(ctx context.Context) error { if err := d.Queue.Create(); err != nil { return err } - defer func() { - if err := d.Queue.Delete(); err != nil { - log.WithError(err).Error("Failed to delete queue") - } - }() if err := d.Queue.Subscribe(); err != nil { return err } + + // ensure the queue deletion happens only once + var deleteOnce sync.Once defer func() { - if err := d.Queue.Unsubscribe(); err != nil { - log.WithError(err).Error("Failed to unsubscribe from sns topic") - } + deleteOnce.Do(d.deleteQueue) }() ch := make(chan *sqs.Message) @@ -92,7 +89,10 @@ func (d *Daemon) Start(ctx context.Context) error { continue } - d.handleMessage(ctx, msg) + d.handleMessage(ctx, msg, func() { + // delete the queue before we complete + deleteOnce.Do(d.deleteQueue) + }) } }() @@ -114,7 +114,6 @@ func (d *Daemon) Start(ctx context.Context) error { } executeCtx.Info("Handler finished successfully") - } }() @@ -122,7 +121,7 @@ func (d *Daemon) Start(ctx context.Context) error { return d.Queue.Receive(ctx, ch) } -func (d *Daemon) handleMessage(ctx context.Context, m AutoscalingMessage) { +func (d *Daemon) handleMessage(ctx context.Context, m AutoscalingMessage, complete func()) { logCtx := log.WithFields(log.Fields{ "transition": m.Transition, "instanceid": m.InstanceID, @@ -160,6 +159,8 @@ func (d *Daemon) handleMessage(ctx context.Context, m AutoscalingMessage) { executeLogCtx.Info("Handler finished successfully") + complete() + if err = completeLifecycle(d.AutoScaling, m); err != nil { logCtx.WithError(err).Error("Failed to complete lifecycle action") return @@ -168,6 +169,17 @@ func (d *Daemon) handleMessage(ctx context.Context, m AutoscalingMessage) { logCtx.Info("Lifecycle action completed successfully") } +func (d *Daemon) deleteQueue() { + if err := d.Queue.Delete(); err != nil { + log.WithError(err).Error("Failed to delete queue") + } + + if err := d.Queue.Unsubscribe(); err != nil { + log.WithError(err).Error("Failed to unsubscribe from sns topic") + } + +} + func executeHandler(ctx context.Context, command *os.File, args []string) error { cmd := exec.CommandContext(ctx, command.Name(), args...) cmd.Env = os.Environ() From f04767ac1cf463270b6e17e9533372559e3580a0 Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Tue, 11 Sep 2018 13:24:15 +1000 Subject: [PATCH 2/2] Ensure that queue is deleted even if subscribe fails --- daemon.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/daemon.go b/daemon.go index 5d93aa8..ba4dda3 100644 --- a/daemon.go +++ b/daemon.go @@ -47,16 +47,21 @@ func (d *Daemon) Start(ctx context.Context) error { return err } - if err := d.Queue.Subscribe(); err != nil { - return err - } - // ensure the queue deletion happens only once var deleteOnce sync.Once defer func() { deleteOnce.Do(d.deleteQueue) }() + if err := d.Queue.Subscribe(); err != nil { + return err + } + defer func() { + if err := d.Queue.Unsubscribe(); err != nil { + log.WithError(err).Error("Failed to unsubscribe from sns topic") + } + }() + ch := make(chan *sqs.Message) go func() { @@ -173,11 +178,6 @@ func (d *Daemon) deleteQueue() { if err := d.Queue.Delete(); err != nil { log.WithError(err).Error("Failed to delete queue") } - - if err := d.Queue.Unsubscribe(); err != nil { - log.WithError(err).Error("Failed to unsubscribe from sns topic") - } - } func executeHandler(ctx context.Context, command *os.File, args []string) error {