Skip to content

Commit

Permalink
Fix pre-exec/pre-bootstrap hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
moskyb committed Mar 15, 2023
1 parent 79ec359 commit 44ae959
Showing 1 changed file with 114 additions and 100 deletions.
214 changes: 114 additions & 100 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,21 +335,24 @@ type hookExit struct {

func (r *JobRunner) preExecHook(ctx context.Context, hookName string) hookExit {
exit := hookExit{Ok: true}
if hookPath, _ := hook.Find(r.conf.AgentConfiguration.HooksPath, hookName); hookPath != "" {
// Once we have a hook any failure to run it MUST be fatal to the job to guarantee a true
// positive result from the hook
okay, err := r.executePreExecHook(ctx, hookName, hookPath)
if !okay {
exit.Ok = false

// Ensure the Job UI knows why this job resulted in failure
r.logStreamer.Process([]byte(fmt.Sprintf("%s hook rejected this job, see the buildkite-agent logs for more details", hookName)))
// But disclose more information in the agent logs
r.logger.Error("%s hook rejected this job: %s", hookName, err)

exit.ExitStatus = -1
exit.SignalReason = "agent_refused"
}
hookPath, _ := hook.Find(r.conf.AgentConfiguration.HooksPath, hookName)
if hookPath == "" {
return exit
}

// Once we have a hook any failure to run it MUST be fatal to the job to guarantee a true
// positive result from the hook
okay, err := r.executePreExecHook(ctx, hookName, hookPath)
if !okay {
exit.Ok = false

// Ensure the Job UI knows why this job resulted in failure
r.logStreamer.Process([]byte(fmt.Sprintf("%s hook rejected this job, see the buildkite-agent logs for more details", hookName)))
// But disclose more information in the agent logs
r.logger.Error("%s hook rejected this job: %s", hookName, err)

exit.ExitStatus = -1
exit.SignalReason = "agent_refused"
}

return exit
Expand Down Expand Up @@ -390,108 +393,76 @@ func (r *JobRunner) Run(ctx context.Context) error {
return err
}

hookExit := r.preExecHooks(ctx)
// Before executing the job process with the received Job env,
// execute the pre-bootstrap/pre-exec hook (if present) for it to tell us
// whether it is happy to proceed.
hookExit := r.preExecHook(ctx, "pre-bootstrap")
hookExit = r.preExecHook(ctx, "pre-exec")

// Default exit status is no exit status
signal := ""
exitStatus := strconv.Itoa(hookExit.ExitStatus)
signalReason := hookExit.SignalReason

var wg sync.WaitGroup
// Set up a child context for helper goroutines related to running the job.
cctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
// Finish the build in the Buildkite Agent API
//
// Once we tell the API we're finished it might assign us new work, so make
// sure everything else is done first.
r.finishJob(ctx, startedAt, cancel, &wg, exitStatus, signal, signalReason)

var wg sync.WaitGroup
if hookExit.Ok {
// Kick off log streaming and job status checking when the process starts.
wg.Add(2)
go r.jobLogStreamer(cctx, &wg)
go r.jobCancellationChecker(cctx, &wg)

// Run the process. This will block until it finishes.
if err := r.process.Run(cctx); err != nil {
// Send the error as output
r.logStreamer.Process([]byte(err.Error()))

// The process did not run at all, so make sure it fails
exitStatus = "-1"
signalReason = "process_run_error"
} else {
// Add the final output to the streamer
r.logStreamer.Process(r.output.ReadAndTruncate())
r.logger.Info("Finished job %s", r.job.ID)
}()

// Collect the finished process' exit status
exitStatus = fmt.Sprintf("%d", r.process.WaitStatus().ExitStatus())
if ws := r.process.WaitStatus(); ws.Signaled() {
signal = process.SignalString(ws.Signal())
}
if r.stopped {
// The agent is being gracefully stopped, and we signaled the job to end. Often due
// to pending host shutdown or EC2 spot instance termination
signalReason = "agent_stop"
} else if r.cancelled {
// The job was signaled because it was cancelled via the buildkite web UI
signalReason = "cancel"
}
}
if !hookExit.Ok {
return nil
}

// Store the finished at time
finishedAt := time.Now()
// Kick off log streaming and job status checking when the process starts.
wg.Add(2)
go r.jobLogStreamer(cctx, &wg)
go r.jobCancellationChecker(cctx, &wg)

// Stop the header time streamer. This will block until all the chunks
// have been uploaded
r.headerTimesStreamer.Stop()
// Run the process. This will block until it finishes.
// Run the process. This will block until it finishes.
if err := r.process.Run(cctx); err != nil {
// Send the error as output
r.logStreamer.Process([]byte(err.Error()))

// Stop the log streamer. This will block until all the chunks have
// been uploaded
r.logStreamer.Stop()

// Warn about failed chunks
if count := r.logStreamer.FailedChunks(); count > 0 {
r.logger.Warn("%d chunks failed to upload for this job", count)
}

// Ensure the additional goroutines are stopped.
cancel()

// Wait for the routines that we spun up to finish
r.logger.Debug("[JobRunner] Waiting for all other routines to finish")
wg.Wait()
// The process did not run at all, so make sure it fails
exitStatus = "-1"
signalReason = "process_run_error"
} else {
// Add the final output to the streamer
r.logStreamer.Process(r.output.ReadAndTruncate())

// Remove the env file, if any
if r.envFile != nil {
if err := os.Remove(r.envFile.Name()); err != nil {
r.logger.Warn("[JobRunner] Error cleaning up env file: %s", err)
// Collect the finished process' exit status
exitStatus = fmt.Sprintf("%d", r.process.WaitStatus().ExitStatus())
if ws := r.process.WaitStatus(); ws.Signaled() {
signal = process.SignalString(ws.Signal())
}
if r.stopped {
// The agent is being gracefully stopped, and we signaled the job to end. Often due
// to pending host shutdown or EC2 spot instance termination
signalReason = "agent_stop"
} else if r.cancelled {
// The job was signaled because it was cancelled via the buildkite web UI
signalReason = "cancel"
}
r.logger.Debug("[JobRunner] Deleted env file: %s", r.envFile.Name())
}

// Write some metrics about the job run
jobMetrics := r.metrics.With(metrics.Tags{
"exit_code": exitStatus,
})
if exitStatus == "0" {
jobMetrics.Timing("jobs.duration.success", finishedAt.Sub(startedAt))
jobMetrics.Count("jobs.success", 1)
} else {
jobMetrics.Timing("jobs.duration.error", finishedAt.Sub(startedAt))
jobMetrics.Count("jobs.failed", 1)
}
return nil
}

// Finish the build in the Buildkite Agent API
//
// Once we tell the API we're finished it might assign us new work, so make
// sure everything else is done first.
r.finishJob(ctx, finishedAt, exitStatus, signal, signalReason, r.logStreamer.FailedChunks())
func (r *JobRunner) preExecHooks(ctx context.Context) hookExit {
hookExit := r.preExecHook(ctx, "pre-bootstrap")
if !hookExit.Ok {
return hookExit
}

r.logger.Info("Finished job %s", r.job.ID)
return r.preExecHook(ctx, "pre-exec")

return nil
}

func (r *JobRunner) CancelAndStop() error {
Expand Down Expand Up @@ -723,11 +694,11 @@ func (r *JobRunner) executePreExecHook(ctx context.Context, hookName, hookPath s
}

if err := sh.RunWithoutPrompt(ctx, hookPath); err != nil {
r.logger.Error("Finished %s hook %q: hookName, job rejected", hookPath)
r.logger.Error("Finished %s hook %q: job rejected", hookName, hookPath)
return false, err
}

r.logger.Info("Finished %s hook %q: hookName, job accepted", hookPath)
r.logger.Info("Finished %s hook %q: job accepted", hookName, hookPath)
return true, nil
}

Expand Down Expand Up @@ -761,18 +732,61 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error {

// finishJob finishes the job in the Buildkite Agent API. If the FinishJob call
// cannot return successfully, this will retry for a long time.
func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exitStatus, signal, signalReason string, failedChunkCount int) error {
func (r *JobRunner) finishJob(ctx context.Context, startedAt time.Time, cancel func(), wg *sync.WaitGroup, exitStatus, signal, signalReason string) error {
finishedAt := time.Now()

// Stop the header time streamer. This will block until all the chunks
// have been uploaded
r.headerTimesStreamer.Stop()

// Stop the log streamer. This will block until all the chunks have
// been uploaded
r.logStreamer.Stop()

// Warn about failed chunks
if count := r.logStreamer.FailedChunks(); count > 0 {
r.logger.Warn("%d chunks failed to upload for this job", count)
}

// Ensure the additional goroutines are stopped.
cancel()

// Wait for the routines that we spun up to finish
r.logger.Debug("[JobRunner] Waiting for all other routines to finish")
wg.Wait()

// Remove the env file, if any
if r.envFile != nil {
if err := os.Remove(r.envFile.Name()); err != nil {
r.logger.Warn("[JobRunner] Error cleaning up env file: %s", err)
}
r.logger.Debug("[JobRunner] Deleted env file: %s", r.envFile.Name())
}

// Write some metrics about the job run
jobMetrics := r.metrics.With(metrics.Tags{
"exit_code": exitStatus,
})

if exitStatus == "0" {
jobMetrics.Timing("jobs.duration.success", finishedAt.Sub(startedAt))
jobMetrics.Count("jobs.success", 1)
} else {
jobMetrics.Timing("jobs.duration.error", finishedAt.Sub(startedAt))
jobMetrics.Count("jobs.failed", 1)
}

r.job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
r.job.ExitStatus = exitStatus
r.job.Signal = signal
r.job.SignalReason = signalReason
r.job.ChunksFailedCount = failedChunkCount
r.job.ChunksFailedCount = r.logStreamer.FailedChunks()

r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s",
r.job.ExitStatus, r.job.Signal, r.job.SignalReason)

ctx, cancel := context.WithTimeout(ctx, 48*time.Hour)
defer cancel()
ctx, ccancel := context.WithTimeout(ctx, 48*time.Hour)
defer ccancel()

return roko.NewRetrier(
roko.TryForever(),
Expand Down

0 comments on commit 44ae959

Please sign in to comment.