Skip to content

Commit

Permalink
Better failure reporting from database queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jonyoder committed Aug 1, 2023
1 parent e97ce48 commit e9d7991
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 188 deletions.
90 changes: 0 additions & 90 deletions examples/cmd/markdownRenderer/store/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,35 +224,6 @@ func (conn *store) QueueGroupComplete(id int64) (done bool, cancelled bool, err
return count == 0, cancelled, err
}

func (conn *store) IsQueueAddressComplete(address string) (done bool, err error) {
address = strings.TrimSpace(address)
if address == "" {
return false, errors.New("no address provided for IsQueueAddressComplete")
}

// Include the address in the transaction description to avoid races when
// testing with a special locking connection.
var count int64
var workErr error
conn.db.Transaction(func(tx *gorm.DB) error {
err = tx.Model(&Queue{}).Where("address = ?", address).Count(&count).Error
if err != nil {
return err
}

c2 := &store{
db: tx,
}
workErr = c2.QueueAddressedCheck(address)
return nil
})

if workErr != nil {
err = workErr
}
return count == 0, err
}

func (conn *store) IsQueueAddressInProgress(address string) (bool, error) {
address = strings.TrimSpace(address)
if address == "" {
Expand Down Expand Up @@ -568,67 +539,6 @@ func (conn *store) QueueDelete(permitId permit.Permit) (err error) {
})
}

func (conn *store) QueueAddressedComplete(address string, failure error) (err error) {

var bytes []byte
if failure != nil {
queueError, isQueueError := failure.(*queue.QueueError)
if isQueueError {
// Record type of queue.QueueError
bytes, err = json.Marshal(queueError)
} else {
// Record generic error
bytes, err = json.Marshal(&queue.QueueError{
Message: failure.Error(),
})
}
}
// Return if there were any marshaling errors
if err != nil {
return err
}

var postgres bool

err = conn.db.Transaction(func(tx *gorm.DB) error {
if postgres {
// Lock to prevent race. Don't allow reading until this operation is complete.
err = tx.Exec("LOCK queue_failure IN ACCESS EXCLUSIVE MODE").Error
if err != nil {
return err
}
}

// First, delete any references to this address
err = tx.Delete(&QueueFailure{}, "address = ?", address).Error
if err != nil {
return err
}

// Next, if there is an error to report, insert it
if failure != nil {
failure := QueueFailure{
Address: address,
Error: string(bytes),
}
err = tx.Create(&failure).Error
if err != nil {
return err
}
}
return nil
})
return
}

type QueueAddressFailure struct {
Message string `json:"error"`
}

func (err *QueueAddressFailure) Error() string {
return err.Message
}

func (conn *store) QueueAddressedCheck(address string) error {
var failure QueueFailure
err := conn.db.First(&failure, "address = ?", address).Error
Expand Down
14 changes: 7 additions & 7 deletions graph.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 0 additions & 11 deletions pkg/rsqueue/impls/database/dbqueuetypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,13 @@ type QueueStore interface {
// * error - errors
QueuePeek(types ...uint64) (results []queue.QueueWork, err error)

// IsQueueAddressComplete checks to see if an address is done/gone
// Expects:
// * id - the queue item address
// Returns:
// * bool - is the item done/gone?
// * error - errors
IsQueueAddressComplete(address string) (bool, error)

// IsQueueAddressInProgress checks to see if an address is still in progress
// Expects:
// * id - the queue item address
// Returns:
// * bool - is the item still in progress?
// * error - errors
IsQueueAddressInProgress(address string) (bool, error)

// QueueAddressedComplete saves or clears failure information for an addressed item
QueueAddressedComplete(address string, failure error) error
}

type QueueGroupStore interface {
Expand Down
4 changes: 2 additions & 2 deletions pkg/rsqueue/impls/database/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.20
require (
github.com/fortytw2/leaktest v1.3.0
github.com/google/uuid v1.1.2
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1
github.com/rstudio/platform-lib/pkg/rsqueue v0.3.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
)

Expand Down
10 changes: 4 additions & 6 deletions pkg/rsqueue/impls/database/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0 h1:tMeO7PckDZzDROBmu5nZJ1pORN3dNDWMWv57X0xpMf4=
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM=
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1 h1:hkLLKye0iBqW5r5ae6qnX5viU9qj0Eiwh+vUpkuy2zI=
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM=
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1 h1:65BsZFFHKW9Bm0MBi3Jw7a72C2XqRnmA+519Cm+6zFk=
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1/go.mod h1:3fj43uPHSrY30ZJmAgbSmmy9d7heQVnL/MXVwOz+5Pc=
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1 h1:y6hxq3z1F98KFEnfSSYDm5BP4C9MlSxwnNPKHGluV7w=
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1/go.mod h1:v47Q+KI/vBl6NLgrMLry6x3J651uhppVvCCGffq/bDI=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
github.com/rstudio/platform-lib/pkg/rsqueue v0.3.0 h1:2/risfO4VatW7FcmWeup0Mbx1QlPLYYUZrN8sb8Pt+g=
github.com/rstudio/platform-lib/pkg/rsqueue v0.3.0/go.mod h1:ac7W+fPG4pcb8yYp/npchGewPJezAu6UjlN0DjPEuH8=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
3 changes: 0 additions & 3 deletions pkg/rsqueue/impls/database/groupqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ func (f *fakeQueue) IsAddressInQueue(address string) (bool, error) {
func (f *fakeQueue) PollAddress(address string) (errs <-chan error) {
return f.pollErrs
}
func (f *fakeQueue) RecordFailure(address string, failure error) error {
return f.record
}
func (f *fakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) {
return nil, errors.New("n/i")
}
Expand Down
29 changes: 15 additions & 14 deletions pkg/rsqueue/impls/database/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@ func (q *DatabaseQueue) AddressedPush(priority uint64, groupId int64, address st
return err
}

func (q *DatabaseQueue) RecordFailure(address string, failure error) error {
return q.store.QueueAddressedComplete(address, failure)
}

func (q *DatabaseQueue) IsAddressInQueue(address string) (bool, error) {
return q.store.IsQueueAddressInProgress(address)
}
Expand All @@ -247,22 +243,22 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error {

go func() {
for {
isDone, err := q.store.IsQueueAddressComplete(address)
queued, err := q.IsAddressInQueue(address)
if err != nil {
// Ignore lock errors
if !utils.IsSqliteLockError(err) {
errCh <- err
close(errCh)
return
}
} else if isDone {
} else if !queued {
q.debugLogger.Debugf("Queue work with address %s completed", address)
close(errCh)
return
}

// Wait for a notification or an interval, then poll again
done := func() bool {
done, err := func() (bool, error) {
completedMsgs := q.SubscribeOne(q.notifyTypeWorkComplete, func(n listener.Notification) bool {
if wn, ok := n.(*agenttypes.WorkCompleteNotification); ok {
return wn.Address == address
Expand All @@ -279,20 +275,25 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error {
defer tick.Stop()
for {
select {
case <-completedMsgs:
case n := <-completedMsgs:
q.debugLogger.Debugf("Queue was notified that work with address %s completed", address)
return false
if wn, ok := n.(*agenttypes.WorkCompleteNotification); ok {
return true, wn.Error
}
case <-chunkMsgs:
q.debugLogger.Debugf("Queue was notified that chunk with address %s is ready", address)
return true
return true, nil
case <-tick.C:
return false
return false, nil
}
}
}()
// If we received a chunk notification, then we return immediately so the client can
// begin downloading chunks
// If we received a chunk or work complete notification, then we return immediately so the client can
// continue.
if done {
if err != nil {
errCh <- err
}
close(errCh)
return
}
Expand Down Expand Up @@ -372,7 +373,7 @@ func (q *DatabaseQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, typ
if err != nil {
return queueWork, err
}
queueWork, err := q.store.QueuePop(q.name, maxPriority, types.Enabled())
queueWork, err = q.store.QueuePop(q.name, maxPriority, types.Enabled())
if err != sql.ErrNoRows {
q.measureDequeue(queueWork, err)
return queueWork, err
Expand Down
Loading

0 comments on commit e9d7991

Please sign in to comment.