Skip to content

Commit

Permalink
Merge pull request #47 from buildkite/use-context-to-signal-subproces…
Browse files Browse the repository at this point in the history
…sses

Use context for interupting handler script
  • Loading branch information
lox authored Sep 7, 2018
2 parents fadbf35 + 64e8557 commit b1f84bf
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 26 deletions.
41 changes: 16 additions & 25 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type Daemon struct {
Queue *Queue
AutoScaling *autoscaling.AutoScaling
Handler *os.File
Signals chan os.Signal
}

// Start the daemon.
Expand Down Expand Up @@ -93,7 +92,7 @@ func (d *Daemon) Start(ctx context.Context) error {
continue
}

d.handleMessage(msg)
d.handleMessage(ctx, msg)
}
}()

Expand All @@ -104,7 +103,7 @@ func (d *Daemon) Start(ctx context.Context) error {

log.Info("Executing handler")
timer := time.Now()
err := executeHandler(d.Handler, []string{terminationTransition, d.InstanceID}, d.Signals)
err := executeHandler(ctx, d.Handler, []string{terminationTransition, d.InstanceID})
executeCtx := log.WithFields(log.Fields{
"duration": time.Now().Sub(timer),
})
Expand All @@ -123,65 +122,57 @@ func (d *Daemon) Start(ctx context.Context) error {
return d.Queue.Receive(ctx, ch)
}

func (d *Daemon) handleMessage(m AutoscalingMessage) {
ctx := log.WithFields(log.Fields{
func (d *Daemon) handleMessage(ctx context.Context, m AutoscalingMessage) {
logCtx := log.WithFields(log.Fields{
"transition": m.Transition,
"instanceid": m.InstanceID,
})

hbt := time.NewTicker(heartbeatFrequency)
go func() {
for range hbt.C {
ctx.Debug("Sending heartbeat")
logCtx.Debug("Sending heartbeat")
if err := sendHeartbeat(d.AutoScaling, m); err != nil {
ctx.WithError(err).Error("Heartbeat failed")
logCtx.WithError(err).Error("Heartbeat failed")
}
}
}()

handlerCtx := log.WithFields(log.Fields{
handlerLogCtx := log.WithFields(log.Fields{
"transition": m.Transition,
"instanceid": m.InstanceID,
"handler": d.Handler.Name(),
})

handlerCtx.Info("Executing handler")
handlerLogCtx.Info("Executing handler")
timer := time.Now()

err := executeHandler(d.Handler, []string{m.Transition, m.InstanceID}, d.Signals)
executeCtx := handlerCtx.WithFields(log.Fields{
err := executeHandler(ctx, d.Handler, []string{m.Transition, m.InstanceID})
executeLogCtx := handlerLogCtx.WithFields(log.Fields{
"duration": time.Now().Sub(timer),
})
hbt.Stop()

if err != nil {
executeCtx.WithError(err).Error("Handler script failed")
executeLogCtx.WithError(err).Error("Handler script failed")
return
}

executeCtx.Info("Handler finished successfully")
executeLogCtx.Info("Handler finished successfully")

if err = completeLifecycle(d.AutoScaling, m); err != nil {
ctx.WithError(err).Error("Failed to complete lifecycle action")
logCtx.WithError(err).Error("Failed to complete lifecycle action")
return
}

ctx.Info("Lifecycle action completed successfully")
logCtx.Info("Lifecycle action completed successfully")
}

func executeHandler(command *os.File, args []string, sigs chan os.Signal) error {
cmd := exec.Command(command.Name(), args...)
func executeHandler(ctx context.Context, command *os.File, args []string) error {
cmd := exec.CommandContext(ctx, command.Name(), args...)
cmd.Env = os.Environ()
cmd.Stdout = os.Stderr
cmd.Stderr = os.Stderr

go func() {
sig := <-sigs
if cmd.Process != nil {
cmd.Process.Signal(sig)
}
}()

return cmd.Run()
}

Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func main() {
InstanceID: instanceID,
AutoScaling: autoscaling.New(sess),
Handler: handler,
Signals: sigs,
Queue: NewQueue(sess, generateQueueName(instanceID), snsTopic),
}

Expand Down

0 comments on commit b1f84bf

Please sign in to comment.