Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline the Pipeline #128

Merged
merged 53 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
bead065
benchmark test
Jul 24, 2023
cf130e2
basic logic in place
Jul 26, 2023
66f1288
basic logic in place
Jul 26, 2023
42fa4fd
common.go unit test
Jul 26, 2023
b8a5ee1
commentariat
Jul 26, 2023
5504a23
better function comment
Jul 26, 2023
62da700
lint
Jul 26, 2023
0aeb483
wip
Jul 26, 2023
3ce28b8
fix mocking tests
Jul 27, 2023
06586e8
WhyStopped
Jul 27, 2023
6173e38
cancelWithProblem
Jul 27, 2023
77e2675
should still work even when there's no processors
Jul 27, 2023
5940c9f
addMetrics
Jul 27, 2023
35e635c
more originator details in algod_importer errors
Jul 27, 2023
15a62df
pass unit test
Jul 27, 2023
4956c9f
NewSyncError
Jul 27, 2023
579f2e4
lint
Jul 27, 2023
94b7bd8
bring back E2E finish signaller
Jul 28, 2023
cd7e53c
does e2e pass in CI?
Jul 29, 2023
4c1756f
finer logging granularity
Jul 31, 2023
c332cfc
comments cleanup
Jul 31, 2023
2aae293
commentary
Jul 31, 2023
586f116
Merge remote-tracking branch 'algorand/master' into pipelining
Aug 1, 2023
4f55763
conform logs with internal-tools logstats.go
Aug 2, 2023
2c2e387
changes after CR discussion
Aug 2, 2023
d2e0d3d
per CR
Aug 2, 2023
b2e67de
explanatory comment per CR discussion
Aug 2, 2023
deec57e
rename the sentinel cause and ignore lint warnings
Aug 2, 2023
4103de1
adding PipelineData to data.BlockData
Aug 2, 2023
a0edb98
lint
Aug 2, 2023
fc2fd41
lint
Aug 2, 2023
5ae8a21
don't pollute data.BlockData with pipeline statistics
Aug 3, 2023
3d06838
Update conduit/data/block_export_data.go
tzaffi Aug 3, 2023
30c6a4d
revert
Aug 3, 2023
a8337b3
time out after 15 secs instead of 5
Aug 9, 2023
05580c7
wip
Aug 9, 2023
cdc85e0
fix test
Aug 9, 2023
689d7bd
fix test... really!
Aug 9, 2023
674c949
remove OStart()
Aug 10, 2023
7e88db3
don't shodow the pipeline package
Aug 10, 2023
a160e28
enable `if !errors.Is(pl.WhyStopped(), pipeline.BecauseStopMethod)`
Aug 10, 2023
7b7cf2f
Update conduit/pipeline/pipeline.go
tzaffi Aug 10, 2023
cb86036
fail fast if can't save metadata and carry on without retrying the ca…
Aug 10, 2023
8c1e6fa
Merge branch 'master' into pipelining
Aug 11, 2023
1158f75
simplify cancellation cause and punt to issue Graceful Pipeline Exit …
Aug 11, 2023
8b88301
logstatsE2Elog
Aug 11, 2023
867973f
don't send metrics through the plugin channels
Aug 15, 2023
b36eb15
Apply suggestions from code review
tzaffi Aug 18, 2023
07e2e03
docs: Remove quotes in file_write examples. (#137)
winder Aug 15, 2023
136996b
build: add version to release filename. (#138)
winder Aug 16, 2023
300677f
temp commit
Aug 21, 2023
ded0284
revert
Aug 21, 2023
9918073
Merge branch 'master' into pipelining
Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion conduit/pipeline/common.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,83 @@
package pipeline

import log "github.com/sirupsen/logrus"
import (
"context"
"fmt"
"time"

log "github.com/sirupsen/logrus"

"github.com/algorand/conduit/conduit/data"
)

// HandlePanic function to log panics in a common way
func HandlePanic(logger *log.Logger) {
if r := recover(); r != nil {
logger.Panicf("conduit pipeline experienced a panic: %v", r)
}
}

type empty struct{}

type pluginInput interface {
uint64 | data.BlockData | string
}

type pluginOutput interface {
pluginInput | empty
}

// Retries is a wrapper for retrying a function call f() with a cancellation context,
// a delay and a max retry count. It attempts to call the wrapped function at least once
// and only after the first attempt will pay attention to a context cancellation.
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish
// the round with at least one attempt for each pipeline component.
// - Retry behavior is configured by p.cfg.retryCount.
// - when 0, the function will retry forever or until the context is canceled
// - when > 0, the function will retry p.cfg.retryCount times before giving up
//
// - Upon success:
// - a nil error is returned even if there were intermediate failures
// - the returned duration dur measures the time spent in the call to f() that succeeded
//
// - Upon failure:
// - the return value y is the zero value of type Y and a non-nil error is returned
// - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries
// - when p.cfg.retryCount == 0, the error will be the last error encountered
// - the returned duration dur is the total time spent in the function, including retries
func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
start := time.Now()

for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
// the first time through, we don't sleep or mind ctx's done signal
if i > 0 {
select {
case <-p.ctx.Done():
dur = time.Since(start)
err = fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, context.Cause(p.ctx))
return
default:
time.Sleep(p.cfg.RetryDelay)
}
}
opStart := time.Now()
y, err = f(x)
if err == nil {
dur = time.Since(opStart)
return
}
p.logger.Infof("%s: retry number %d/%d with err: %v", msg, i, p.cfg.RetryCount, err)
}

dur = time.Since(start)
err = fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err)
return
}

// RetriesNoOutput applies the same logic as Retries, but for functions that return no output.
func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := Retries(func(x X) (empty, error) {
return empty{}, f(x)
}, a, p, msg)
return d, err
}
213 changes: 213 additions & 0 deletions conduit/pipeline/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package pipeline

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/algorand/conduit/conduit/data"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

// TestRetries tests the retry logic
func TestRetries(t *testing.T) {
errSentinelCause := errors.New("succeed after has failed")

succeedAfterFactory := func(succeedAfter uint64, never bool) func(uint64) (uint64, error) {
tries := uint64(0)

return func(x uint64) (uint64, error) {
if tries >= succeedAfter && !never {
// ensure not to return the zero value on success
return tries + 1, nil
}
tries++
return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1)
}
}

succeedAfterFactoryNoOutput := func(succeedAfter uint64, never bool) func(uint64) error {
tries := uint64(0)

return func(x uint64) error {
if tries >= succeedAfter && !never {
return nil
}
tries++
return fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1)
}
}

cases := []struct {
Eric-Warehime marked this conversation as resolved.
Show resolved Hide resolved
name string
retryCount uint64
succeedAfter uint64
neverSucceed bool // neverSucceed trumps succeedAfter
}{
{
name: "retry forever succeeds after 0",
retryCount: 0,
succeedAfter: 0,
neverSucceed: false,
},
{
name: "retry forever succeeds after 1",
retryCount: 0,
succeedAfter: 1,
neverSucceed: false,
},
{
name: "retry forever succeeds after 7",
retryCount: 0,
succeedAfter: 7,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 0",
retryCount: 5,
succeedAfter: 0,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 1",
retryCount: 5,
succeedAfter: 1,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 5",
retryCount: 5,
succeedAfter: 5,
neverSucceed: false,
},
{
name: "retry 5 succeeds after 7",
retryCount: 5,
succeedAfter: 7,
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
neverSucceed: false,
},
{
name: "retry 5 never succeeds",
retryCount: 5,
succeedAfter: 0,
neverSucceed: true,
},
{
name: "retry foerever never succeeds",
retryCount: 0,
succeedAfter: 0,
neverSucceed: true,
},
}

for _, tc := range cases {
tc := tc

// run cases for Retries()
t.Run("Retries() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfter := succeedAfterFactory(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

yChan := make(chan uint64)
errChan := make(chan error)
go func() {
y, _, err := Retries(succeedAfter, 0, p, "test")
yChan <- y
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
y := <-yChan
err := <-errChan
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
return
}

y, _, err := Retries(succeedAfter, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

// note we subtract 1 from y below because succeedAfter has added 1 to its output
// to disambiguate with the zero value which occurs on failure
require.Equal(t, tc.succeedAfter, y-1, tc.name)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
require.Equal(t, tc.succeedAfter, y-1, tc.name)
}
}
})

// run cases for RetriesNoOutput()
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfterNoOutput := succeedAfterFactoryNoOutput(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

errChan := make(chan error)
go func() {
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
err := <-errChan
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
return
}

_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
}
}
})
}
}
Loading
Loading