Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better failure reporting from database queue #119

Open
wants to merge 1 commit into
base: jon-queue-failure
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 0 additions & 110 deletions examples/cmd/markdownRenderer/store/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,35 +224,6 @@ func (conn *store) QueueGroupComplete(id int64) (done bool, cancelled bool, err
return count == 0, cancelled, err
}

func (conn *store) IsQueueAddressComplete(address string) (done bool, err error) {
address = strings.TrimSpace(address)
if address == "" {
return false, errors.New("no address provided for IsQueueAddressComplete")
}

// Include the address in the transaction description to avoid races when
// testing with a special locking connection.
var count int64
var workErr error
conn.db.Transaction(func(tx *gorm.DB) error {
err = tx.Model(&Queue{}).Where("address = ?", address).Count(&count).Error
if err != nil {
return err
}

c2 := &store{
db: tx,
}
workErr = c2.QueueAddressedCheck(address)
return nil
})

if workErr != nil {
err = workErr
}
return count == 0, err
}

func (conn *store) IsQueueAddressInProgress(address string) (bool, error) {
address = strings.TrimSpace(address)
if address == "" {
Expand Down Expand Up @@ -568,87 +539,6 @@ func (conn *store) QueueDelete(permitId permit.Permit) (err error) {
})
}

func (conn *store) QueueAddressedComplete(address string, failure error) (err error) {

var bytes []byte
if failure != nil {
queueError, isQueueError := failure.(*queue.QueueError)
if isQueueError {
// Record type of queue.QueueError
bytes, err = json.Marshal(queueError)
} else {
// Record generic error
bytes, err = json.Marshal(&queue.QueueError{
Message: failure.Error(),
})
}
}
// Return if there were any marshaling errors
if err != nil {
return err
}

var postgres bool

err = conn.db.Transaction(func(tx *gorm.DB) error {
if postgres {
// Lock to prevent race. Don't allow reading until this operation is complete.
err = tx.Exec("LOCK queue_failure IN ACCESS EXCLUSIVE MODE").Error
if err != nil {
return err
}
}

// First, delete any references to this address
err = tx.Delete(&QueueFailure{}, "address = ?", address).Error
if err != nil {
return err
}

// Next, if there is an error to report, insert it
if failure != nil {
failure := QueueFailure{
Address: address,
Error: string(bytes),
}
err = tx.Create(&failure).Error
if err != nil {
return err
}
}
return nil
})
return
}

type QueueAddressFailure struct {
Message string `json:"error"`
}

func (err *QueueAddressFailure) Error() string {
return err.Message
}

func (conn *store) QueueAddressedCheck(address string) error {
var failure QueueFailure
err := conn.db.First(&failure, "address = ?", address).Error
if err == gorm.ErrRecordNotFound {
return nil
} else if err != nil {
return err
} else {
if failure.Error != "" {
var queueError queue.QueueError
if err := json.Unmarshal([]byte(failure.Error), &queueError); err != nil {
return fmt.Errorf("error unmarshalling queue.QueueError: %s", err)
} else {
return &queueError
}
}
}
return nil
}

func (conn *store) QueuePermits(name string) ([]dbqueuetypes.QueuePermit, error) {
permits := make([]QueuePermit, 0)

Expand Down
14 changes: 7 additions & 7 deletions graph.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 0 additions & 11 deletions pkg/rsqueue/impls/database/dbqueuetypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,13 @@ type QueueStore interface {
// * error - errors
QueuePeek(types ...uint64) (results []queue.QueueWork, err error)

// IsQueueAddressComplete checks to see if an address is done/gone
// Expects:
// * id - the queue item address
// Returns:
// * bool - is the item done/gone?
// * error - errors
IsQueueAddressComplete(address string) (bool, error)

// IsQueueAddressInProgress checks to see if an address is still in progress
// Expects:
// * id - the queue item address
// Returns:
// * bool - is the item still in progress?
// * error - errors
IsQueueAddressInProgress(address string) (bool, error)

// QueueAddressedComplete saves or clears failure information for an addressed item
QueueAddressedComplete(address string, failure error) error
}

type QueueGroupStore interface {
Expand Down
4 changes: 2 additions & 2 deletions pkg/rsqueue/impls/database/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.20
require (
github.com/fortytw2/leaktest v1.3.0
github.com/google/uuid v1.1.2
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1
github.com/rstudio/platform-lib/pkg/rsqueue v0.3.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
)

Expand Down
10 changes: 4 additions & 6 deletions pkg/rsqueue/impls/database/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0 h1:tMeO7PckDZzDROBmu5nZJ1pORN3dNDWMWv57X0xpMf4=
github.com/rstudio/platform-lib/pkg/rsnotify v1.4.0/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM=
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1 h1:hkLLKye0iBqW5r5ae6qnX5viU9qj0Eiwh+vUpkuy2zI=
github.com/rstudio/platform-lib/pkg/rsnotify v1.5.1/go.mod h1:OTMZNgESF0Y2THisqq2gd4P/yF9YLCUW1GNXy5ainYM=
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1 h1:65BsZFFHKW9Bm0MBi3Jw7a72C2XqRnmA+519Cm+6zFk=
github.com/rstudio/platform-lib/pkg/rsnotify/listeners/local v1.4.1/go.mod h1:3fj43uPHSrY30ZJmAgbSmmy9d7heQVnL/MXVwOz+5Pc=
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1 h1:y6hxq3z1F98KFEnfSSYDm5BP4C9MlSxwnNPKHGluV7w=
github.com/rstudio/platform-lib/pkg/rsqueue v0.1.1/go.mod h1:v47Q+KI/vBl6NLgrMLry6x3J651uhppVvCCGffq/bDI=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
github.com/rstudio/platform-lib/pkg/rsqueue v0.3.0 h1:2/risfO4VatW7FcmWeup0Mbx1QlPLYYUZrN8sb8Pt+g=
github.com/rstudio/platform-lib/pkg/rsqueue v0.3.0/go.mod h1:ac7W+fPG4pcb8yYp/npchGewPJezAu6UjlN0DjPEuH8=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
3 changes: 0 additions & 3 deletions pkg/rsqueue/impls/database/groupqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ func (f *fakeQueue) IsAddressInQueue(address string) (bool, error) {
func (f *fakeQueue) PollAddress(address string) (errs <-chan error) {
return f.pollErrs
}
func (f *fakeQueue) RecordFailure(address string, failure error) error {
return f.record
}
func (f *fakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) {
return nil, errors.New("n/i")
}
Expand Down
29 changes: 15 additions & 14 deletions pkg/rsqueue/impls/database/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@ func (q *DatabaseQueue) AddressedPush(priority uint64, groupId int64, address st
return err
}

func (q *DatabaseQueue) RecordFailure(address string, failure error) error {
return q.store.QueueAddressedComplete(address, failure)
}

func (q *DatabaseQueue) IsAddressInQueue(address string) (bool, error) {
return q.store.IsQueueAddressInProgress(address)
}
Expand All @@ -247,22 +243,22 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error {

go func() {
for {
isDone, err := q.store.IsQueueAddressComplete(address)
queued, err := q.IsAddressInQueue(address)
if err != nil {
// Ignore lock errors
if !utils.IsSqliteLockError(err) {
errCh <- err
close(errCh)
return
}
} else if isDone {
} else if !queued {
q.debugLogger.Debugf("Queue work with address %s completed", address)
close(errCh)
return
}

// Wait for a notification or an interval, then poll again
done := func() bool {
done, err := func() (bool, error) {
completedMsgs := q.SubscribeOne(q.notifyTypeWorkComplete, func(n listener.Notification) bool {
if wn, ok := n.(*agenttypes.WorkCompleteNotification); ok {
return wn.Address == address
Expand All @@ -279,20 +275,25 @@ func (q *DatabaseQueue) PollAddress(address string) <-chan error {
defer tick.Stop()
for {
select {
case <-completedMsgs:
case n := <-completedMsgs:
q.debugLogger.Debugf("Queue was notified that work with address %s completed", address)
return false
if wn, ok := n.(*agenttypes.WorkCompleteNotification); ok {
return true, wn.Error
}
case <-chunkMsgs:
q.debugLogger.Debugf("Queue was notified that chunk with address %s is ready", address)
return true
return true, nil
case <-tick.C:
return false
return false, nil
}
}
}()
// If we received a chunk notification, then we return immediately so the client can
// begin downloading chunks
// If we received a chunk or work complete notification, then we return immediately so the client can
// continue.
if done {
if err != nil {
errCh <- err
}
close(errCh)
return
}
Expand Down Expand Up @@ -372,7 +373,7 @@ func (q *DatabaseQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, typ
if err != nil {
return queueWork, err
}
queueWork, err := q.store.QueuePop(q.name, maxPriority, types.Enabled())
queueWork, err = q.store.QueuePop(q.name, maxPriority, types.Enabled())
if err != sql.ErrNoRows {
q.measureDequeue(queueWork, err)
return queueWork, err
Expand Down
Loading