Skip to content

Commit

Permalink
fix: panicked job calls hook job done with typed error
Browse files Browse the repository at this point in the history
  • Loading branch information
vgarvardt committed Dec 28, 2023
1 parent caf8aaa commit c723446
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
6 changes: 6 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package gue

import (
"errors"
"fmt"
"time"
)

// ErrJobPanicked is returned when the job failed ot be handled because it is panicked.
// Error is normally returned wrapped, so use `errors.Is(err, gue.ErrJobPanicked)` to ensure this is the error you're
// looking for.
var ErrJobPanicked = errors.New("job panicked")

// ErrJobReschedule interface implementation allows errors to reschedule jobs in the individual basis.
type ErrJobReschedule interface {
rescheduleJobAt() time.Time
Expand Down
50 changes: 29 additions & 21 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,32 +334,40 @@ func (w *Worker) markJobDone(ctx context.Context, j *Job, processingStartedAt ti
// recoverPanic tries to handle panics in job execution.
// A stacktrace is stored into Job last_error.
func (w *Worker) recoverPanic(ctx context.Context, j *Job, logger adapter.Logger) {
if r := recover(); r != nil {
ctx, span := w.tracer.Start(ctx, "Worker.recoverPanic")
defer span.End()
r := recover()
if r == nil {
return
}

// record an error on the job with panic message and stacktrace
stackBuf := make([]byte, w.panicStackBufSize)
n := runtime.Stack(stackBuf, false)
ctx, span := w.tracer.Start(ctx, "Worker.recoverPanic")
defer span.End()

buf := new(bytes.Buffer)
_, printRErr := fmt.Fprintf(buf, "%v\n", r)
_, printStackErr := fmt.Fprintln(buf, string(stackBuf[:n]))
_, printEllipsisErr := fmt.Fprintln(buf, "[...]")
stacktrace := buf.String()
// record an error on the job with panic message and stacktrace
stackBuf := make([]byte, w.panicStackBufSize)
n := runtime.Stack(stackBuf, false)

if err := errors.Join(printRErr, printStackErr, printEllipsisErr); err != nil {
logger.Error("Could not build panicked job stacktrace", adapter.Err(err), adapter.F("runtime-stack", string(stackBuf[:n])))
}
buf := new(bytes.Buffer)
_, printRErr := fmt.Fprintf(buf, "%v\n", r)
_, printStackErr := fmt.Fprintln(buf, string(stackBuf[:n]))
_, printEllipsisErr := fmt.Fprintln(buf, "[...]")
stacktrace := buf.String()

w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(false)))
span.RecordError(errors.New("job panicked"), trace.WithAttributes(attribute.String("stacktrace", stacktrace)))
logger.Error("Job panicked", adapter.F("stacktrace", stacktrace))
if err := errors.Join(printRErr, printStackErr, printEllipsisErr); err != nil {
logger.Error("Could not build panicked job stacktrace", adapter.Err(err), adapter.F("runtime-stack", string(stackBuf[:n])))
}

if err := j.Error(ctx, errors.New(stacktrace)); err != nil {
span.RecordError(fmt.Errorf("failed to mark panicked job as error: %w", err))
logger.Error("Got an error on setting an error to a panicked job", adapter.Err(err))
}
w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(false)))
span.RecordError(ErrJobPanicked, trace.WithAttributes(attribute.String("stacktrace", stacktrace)))
logger.Error("Job panicked", adapter.F("stacktrace", stacktrace))

errPanic := fmt.Errorf("%w:\n%s", ErrJobPanicked, stacktrace)
for _, hook := range w.hooksJobDone {
hook(ctx, j, errPanic)
}

if err := j.Error(ctx, errPanic); err != nil {
span.RecordError(fmt.Errorf("failed to mark panicked job as error: %w", err))
logger.Error("Got an error on setting an error to a panicked job", adapter.Err(err))
}
}

Expand Down
14 changes: 10 additions & 4 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,22 +333,28 @@ func testWorkerWorkRescuesPanic(t *testing.T, connPool adapter.ConnPool) {
c, err := NewClient(connPool)
require.NoError(t, err)

called := 0
var handlerCalled int
wm := WorkMap{
"MyJob": func(ctx context.Context, j *Job) error {
called++
handlerCalled++
panic("the panic msg")
},
}
w, err := NewWorker(c, wm, WithWorkerLogger(adapterZap.New(logger)))

var hookDoneCalled int
w, err := NewWorker(c, wm, WithWorkerLogger(adapterZap.New(logger)), WithWorkerHooksJobDone(func(ctx context.Context, j *Job, err error) {
hookDoneCalled++
assert.ErrorIs(t, err, ErrJobPanicked)
}))
require.NoError(t, err)

job := Job{Type: "MyJob"}
err = c.Enqueue(ctx, &job)
require.NoError(t, err)

w.WorkOne(ctx)
assert.Equal(t, 1, called)
require.Equal(t, 1, handlerCalled)
require.Equal(t, 1, hookDoneCalled)

j, err := c.LockJobByID(ctx, job.ID)
require.NoError(t, err)
Expand Down

0 comments on commit c723446

Please sign in to comment.