Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better failure reporting from queue #118

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 22 additions & 59 deletions examples/cmd/markdownRenderer/store/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package store
import (
"database/sql"
"encoding/json"
"errors"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -182,12 +181,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
c.Assert(err, check.DeepEquals, queue.ErrDuplicateAddressedPush)

// Addresses should not be completed
done, err := s.store.IsQueueAddressComplete("abc")
queued, err := s.store.IsQueueAddressInProgress("abc")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, false)
done, err = s.store.IsQueueAddressComplete("def")
c.Check(queued, check.Equals, true)
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, false)
c.Check(queued, check.Equals, true)

// Attempt to pop a type that doesn't exist in the queue
queueWork, err := s.store.QueuePop("test", 0, []uint64{888})
Expand All @@ -206,12 +205,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
c.Assert(err, check.IsNil)

// First address should be completed
done, err = s.store.IsQueueAddressComplete("abc")
queued, err = s.store.IsQueueAddressInProgress("abc")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, true)
done, err = s.store.IsQueueAddressComplete("def")
c.Check(queued, check.Equals, false)
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, false)
c.Check(queued, check.Equals, true)

// Pop second item
queueWork, err = s.store.QueuePop("test", 0, []uint64{TypeTest, TypeTest2})
Expand All @@ -225,22 +224,18 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
c.Assert(err, check.IsNil)

// Second address should be completed
done, err = s.store.IsQueueAddressComplete("def")
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)
c.Check(done, check.Equals, true)
c.Check(queued, check.Equals, false)

// Record error for second address
err = s.store.QueueAddressedComplete("def", errors.New("some error"))
// Second address should be completed
queued, err = s.store.IsQueueAddressInProgress("def")
c.Assert(err, check.IsNil)

// Second address should be completed, but with error
done, err = s.store.IsQueueAddressComplete("def")
c.Assert(err, check.ErrorMatches, "some error")
c.Check(done, check.Equals, true)
c.Check(queued, check.Equals, false)

// Cannot check for empty address
_, err = s.store.IsQueueAddressComplete(" ")
c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressComplete")
_, err = s.store.IsQueueAddressInProgress(" ")
c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressInProgress")
}

func (s *QueueSqliteSuite) TestQueueGroups(c *check.C) {
Expand Down Expand Up @@ -593,43 +588,6 @@ func (s *QueueSqliteSuite) TestQueueGroupCancel(c *check.C) {
c.Check(cancelled, check.Equals, true)
}

// TestAddressFailure walks one address through three completions in a row:
// a plain error, a typed *queue.QueueError, and finally nil. After each step
// it verifies what QueueAddressedCheck reports for that address.
func (s *QueueSqliteSuite) TestAddressFailure(c *check.C) {
	const addr = "abcdefg"
	var (
		outcome error
		typed   *queue.QueueError
		isTyped bool
	)

	// Step 1: complete the address with an untyped error.
	outcome = s.store.QueueAddressedComplete(addr, errors.New("first error"))
	c.Assert(outcome, check.IsNil)
	outcome = s.store.(*store).QueueAddressedCheck(addr)
	c.Check(outcome, check.ErrorMatches, "first error")

	// The reported error must be a *queue.QueueError carrying the message.
	typed, isTyped = outcome.(*queue.QueueError)
	c.Assert(isTyped, check.Equals, true)
	c.Check(typed, check.DeepEquals, &queue.QueueError{
		Code:    0, // Not set on generic error
		Message: "first error",
	})

	// Step 2: complete the same address again, this time with a typed error.
	outcome = s.store.QueueAddressedComplete(addr, &queue.QueueError{Code: 404, Message: "second error"})
	c.Assert(outcome, check.IsNil)
	outcome = s.store.(*store).QueueAddressedCheck(addr)
	c.Check(outcome, check.ErrorMatches, "second error")

	// Code and message of the typed error must both be reported back.
	typed, isTyped = outcome.(*queue.QueueError)
	c.Assert(isTyped, check.Equals, true)
	c.Check(typed, check.DeepEquals, &queue.QueueError{
		Code:    404,
		Message: "second error",
	})

	// Step 3: complete without an error; the check must then report nil.
	outcome = s.store.QueueAddressedComplete(addr, nil)
	c.Assert(outcome, check.IsNil)
	c.Check(s.store.(*store).QueueAddressedCheck(addr), check.IsNil)
}

func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) {
// Is a non-existing address in the queue?
found, err := s.store.IsQueueAddressInProgress("def")
Expand All @@ -645,12 +603,17 @@ func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(found, check.Equals, true)

// Assign a permit
w, err := s.store.QueuePop("test", 0, []uint64{TypeTest})
c.Assert(err, check.IsNil)
c.Assert(w.Permit, check.Not(check.Equals), permit.Permit(0))

// Complete the work
err = s.store.QueueAddressedComplete("def", nil)
err = s.store.QueueDelete(w.Permit)
c.Assert(err, check.IsNil)
found, err = s.store.IsQueueAddressInProgress("def")

// Now it should not be found
c.Assert(err, check.IsNil)
c.Assert(found, check.Equals, true)
c.Assert(found, check.Equals, false)
}
24 changes: 8 additions & 16 deletions pkg/rsqueue/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
}

// Run the job (blocks)
func() {

jobError := func() (result error) {
// Extend the job's heartbeat in the queue periodically until the job completes.
done := make(chan struct{})
defer close(done)
Expand All @@ -362,7 +361,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
case <-ticker.C:
// Extend the job visibility
a.traceLogger.Debugf("Job of type %d visibility timeout needs to be extended: %d\n", queueWork.WorkType, queueWork.Permit)
err := a.queue.Extend(queueWork.Permit)
err = a.queue.Extend(queueWork.Permit)
if err != nil {
a.logger.Debugf("Error extending job for work type %d: %s", queueWork.WorkType, err)
}
Expand All @@ -374,24 +373,17 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j

// Actually run the job
a.traceLogger.Debugf("Running job with type %d, address '%s', and permit '%d'", queueWork.WorkType, queueWork.Address, queueWork.Permit)
err := a.runner.Run(queue.RecursableWork{
err = a.runner.Run(queue.RecursableWork{
Work: queueWork.Work,
WorkType: queueWork.WorkType,
Context: ctx,
})
if err != nil {
result = err
a.logger.Debugf("Job type %d: address: %s work: %#v returned error: %s\n", queueWork.WorkType, queueWork.Address, string(queueWork.Work), err)
}

// If the work was addressed, record the result
if queueWork.Address != "" {
// Note, `RecordFailure` will record the error if err != nil. If
// err == nil, then it clears any recorded error for the address.
err = a.queue.RecordFailure(queueWork.Address, err)
if err != nil {
a.logger.Debugf("Failed while recording addressed work success/failure: %s\n", err)
}
}
return
}()

// Decrement the job count
Expand All @@ -416,16 +408,16 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
// Delete the job from the queue
a.traceLogger.Debugf("Deleting job from queue: %d\n", queueWork.Permit)

if err := a.queue.Delete(queueWork.Permit); err != nil {
a.logger.Debugf("queue Delete() returned error: %s", err)
if deleteErr := a.queue.Delete(queueWork.Permit); deleteErr != nil {
a.logger.Debugf("queue Delete() returned error: %s", deleteErr)
}

// Notify that work is complete if work is addressed. This must happen after the work has been deleted from the
// queue.
if queueWork.Address != "" {
n := time.Now()
a.traceLogger.Debugf("Ready to notify of address %s", queueWork.Address)
notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete))
notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete, jobError))
a.traceLogger.Debugf("Notified of address %s in %d ms", queueWork.Address, time.Now().Sub(n).Nanoseconds()/1000000)
}

Expand Down
31 changes: 11 additions & 20 deletions pkg/rsqueue/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type FakeQueue struct {
extend error
mutex sync.Mutex
record error
errs []error
}

func (*FakeQueue) Push(priority uint64, groupId int64, work queue.Work) error {
Expand All @@ -138,12 +137,6 @@ func (*FakeQueue) IsAddressInQueue(address string) (bool, error) {
// PollAddress returns the fake's preconfigured pollErrs channel. The address
// argument is ignored.
func (f *FakeQueue) PollAddress(address string) (errs <-chan error) {
return f.pollErrs
}
// RecordFailure returns the error configured in f.record. When no error is
// configured, the supplied failure (which may be nil) is appended to f.errs
// so that tests can inspect what was recorded. The address is ignored.
func (f *FakeQueue) RecordFailure(address string, failure error) error {
	if f.record != nil {
		// A configured error simulates a failed recording; nothing is stored.
		return f.record
	}
	f.errs = append(f.errs, failure)
	return nil
}
// Get is a stub: it ignores all arguments and always returns a nil work item
// together with a nil error.
func (*FakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) {
return nil, nil
}
Expand Down Expand Up @@ -214,9 +207,9 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) {
}()

// Send some messages across the queue while blocked and make sure they're discarded
msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10, nil)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10, nil)
msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10, nil)

c.Check(completed, check.Equals, false)
jobDone <- 98
Expand All @@ -229,7 +222,6 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) {
func (s *AgentSuite) TestRunJobOk(c *check.C) {
q := &FakeQueue{
extended: make(map[permit.Permit]int),
errs: make([]error, 0),
}
finish := map[string]chan bool{
"one": make(chan bool),
Expand Down Expand Up @@ -316,11 +308,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) {
c.Check(receivedPriority[1], check.Equals, MAX_CONCURRENCY)
c.Check(receivedPriority[2], check.Equals, MAX_CONCURRENCY)

// Jobs b2 and b3 should have been marked as successful since they were addressed
c.Check(q.errs, check.HasLen, 2)
c.Check(q.errs[0], check.IsNil)
c.Check(q.errs[1], check.IsNil)

// Jobs should have been deleted
c.Check(q.deleted, check.Equals, 3)

Expand All @@ -337,7 +324,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) {
func (s *AgentSuite) TestRunJobFails(c *check.C) {
q := &FakeQueue{
extended: make(map[permit.Permit]int),
errs: make([]error, 0),
}
finish := map[string]chan bool{
"one": make(chan bool),
Expand All @@ -363,7 +349,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) {
b1, err := json.Marshal(&w1)
c.Assert(err, check.IsNil)

n := func(n listener.Notification) {}
var notified listener.Notification
n := func(n listener.Notification) {
notified = n
}

maxPriorityDone := make(chan struct{})
go func() {
Expand All @@ -387,8 +376,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) {
<-maxPriorityDone

// The job should have been reported as failed since it was addressed
c.Check(q.errs, check.HasLen, 1)
c.Check(q.errs[0], check.ErrorMatches, "work failed")
c.Assert(notified, check.FitsTypeOf, &agenttypes.WorkCompleteNotification{})
workNotified, ok := notified.(*agenttypes.WorkCompleteNotification)
c.Assert(ok, check.Equals, true)
c.Assert(workNotified.Error, check.ErrorMatches, "work failed")

// Jobs should have been deleted
c.Check(q.deleted, check.Equals, 1)
Expand Down
4 changes: 3 additions & 1 deletion pkg/rsqueue/agent/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ type Notify func(n listener.Notification)
// WorkCompleteNotification is the notification sent when addressed work has
// finished. It embeds listener.GenericNotification for the GUID and type.
type WorkCompleteNotification struct {
listener.GenericNotification
// Address is the address of the work that completed.
Address string
// Error is the error supplied when the notification was created; it is
// nil when no error was supplied.
Error error
}

func NewWorkCompleteNotification(address string, workType uint8) *WorkCompleteNotification {
func NewWorkCompleteNotification(address string, workType uint8, err error) *WorkCompleteNotification {
return &WorkCompleteNotification{
GenericNotification: listener.GenericNotification{
NotifyGuid: uuid.New().String(),
NotifyType: workType,
},
Address: address,
Error: err,
}
}
17 changes: 16 additions & 1 deletion pkg/rsqueue/agent/types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agenttypes
// Copyright (C) 2022 by RStudio, PBC

import (
"errors"
"testing"

"github.com/rstudio/platform-lib/pkg/rsnotify/listener"
Expand All @@ -16,7 +17,7 @@ func TestPackage(t *testing.T) { check.TestingT(t) }
var _ = check.Suite(&AgentTypesSuite{})

func (s *AgentTypesSuite) TestWorkCompleteNotification(c *check.C) {
cn := NewWorkCompleteNotification("some_address", 10)
cn := NewWorkCompleteNotification("some_address", 10, nil)
// Guid should be set
c.Assert(cn.Guid(), check.Not(check.Equals), "")
cn.NotifyGuid = ""
Expand All @@ -27,4 +28,18 @@ func (s *AgentTypesSuite) TestWorkCompleteNotification(c *check.C) {

Address: "some_address",
})

err := errors.New("some error")
cn = NewWorkCompleteNotification("some_address", 10, err)
// Guid should be set
c.Assert(cn.Guid(), check.Not(check.Equals), "")
cn.NotifyGuid = ""
c.Assert(cn, check.DeepEquals, &WorkCompleteNotification{
GenericNotification: listener.GenericNotification{
NotifyType: uint8(10),
},

Address: "some_address",
Error: err,
})
}
3 changes: 0 additions & 3 deletions pkg/rsqueue/groups/grouprunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ func (f *fakeQueue) IsAddressInQueue(address string) (bool, error) {
// PollAddress returns the fake's preconfigured pollErrs channel. The address
// argument is ignored.
func (f *fakeQueue) PollAddress(address string) (errs <-chan error) {
return f.pollErrs
}
// RecordFailure ignores both arguments and returns the error configured in
// f.record (nil when none is configured).
func (f *fakeQueue) RecordFailure(address string, failure error) error {
return f.record
}
// Get is a stub that always returns a nil work item and an "n/i" error
// (presumably "not implemented" — confirm).
func (f *fakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) {
return nil, errors.New("n/i")
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/rsqueue/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ type Queue interface {
// * work the work to push into the queue. Must be JSON-serializable
AddressedPush(priority uint64, groupId int64, address string, work Work) error

// RecordFailure records a failure of addressed work in the queue
// * address the address of the work that failed
// * failure the error that occurred. Overwrites any previous error information
// from earlier runs of the same work. If `failure==nil`, the error is cleared.
RecordFailure(address string, failure error) error

// PollAddress polls addressed work in the queue, and returns an `errs` channel
// to report when the work is done and/or an error has occurred. We pass
// `nil` over the errs channel when the poll has completed without errors
Expand Down
4 changes: 0 additions & 4 deletions pkg/rsqueue/queue/recording_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ func (q *RecordingProducer) PollAddress(address string) (errs <-chan error) {
return q.PollErrs
}

// RecordFailure ignores both arguments and returns the producer's RecordErr
// field, letting a test choose the result of recording a failure.
func (q *RecordingProducer) RecordFailure(address string, failure error) error {
return q.RecordErr
}

// Get ignores all arguments and always returns a nil work item together with
// the kaboom error value, which is declared outside this view.
func (q *RecordingProducer) Get(maxPriority uint64, maxPriorityChan chan uint64, types QueueSupportedTypes, stop chan bool) (*QueueWork, error) {
return nil, kaboom
}
Expand Down