fix | refactor: ensure checkpoint storing isn't racey, one catchUpJob at a time (#24)
renaynay committed Mar 26, 2022
1 parent 378ff97 commit c01a057
Showing 2 changed files with 83 additions and 36 deletions.
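Before the file diffs, a minimal, self-contained sketch of the pattern this commit moves to: a single manager goroutine drains a buffered job channel, runs one catch-up job at a time, and is the only writer of the checkpoint, which it persists once on shutdown. All names below (manager, job, process) are illustrative stand-ins, not the project's actual API; sampling and on-disk storage are stubbed out.

// Sketch only: single-consumer catch-up manager with a buffered job queue.
package main

import (
	"context"
	"fmt"
	"time"
)

type job struct{ from, to int64 }

type manager struct {
	jobsCh     chan *job
	checkpoint int64
}

// run drains jobsCh sequentially; since it is the only goroutine that reads
// or writes checkpoint, no mutex or WaitGroup is needed.
func (m *manager) run(ctx context.Context) {
	defer func() {
		// persist the last successfully processed height exactly once, on exit
		fmt.Println("storing checkpoint:", m.checkpoint)
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case j := <-m.jobsCh:
			last, err := m.process(ctx, j) // blocks until this job finishes
			m.checkpoint = last
			if err != nil {
				return // stop; checkpoint still reflects the last good height
			}
		}
	}
}

// process walks the (from:to] range, stopping early if the context is canceled.
func (m *manager) process(ctx context.Context, j *job) (int64, error) {
	for h := j.from + 1; h <= j.to; h++ {
		select {
		case <-ctx.Done():
			return h - 1, nil // ordered to stop: previous height, no error
		default:
		}
		// sampling of height h would happen here
	}
	return j.to, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	m := &manager{jobsCh: make(chan *job, 16)} // buffered so producers rarely block
	go m.run(ctx)

	m.jobsCh <- &job{from: 0, to: 10}
	m.jobsCh <- &job{from: 10, to: 20}

	time.Sleep(100 * time.Millisecond) // let the manager drain both jobs
	cancel()
	time.Sleep(100 * time.Millisecond) // let the deferred checkpoint store run
}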
55 changes: 30 additions & 25 deletions das/daser.go
@@ -3,7 +3,6 @@ package das
import (
"context"
"fmt"
"sync"
"time"

"github.com/ipfs/go-datastore"
@@ -47,7 +46,7 @@ func NewDASer(
hsub: hsub,
getter: getter,
cstore: wrappedDS,
jobsCh: make(chan *catchUpJob),
jobsCh: make(chan *catchUpJob, 16),
}
}

@@ -73,8 +72,8 @@ func (d *DASer) Start(context.Context) error {
d.cancel = cancel
d.sampleDn, d.catchUpDn = make(chan struct{}), make(chan struct{})

// kick off catch-up routine scheduler
go d.catchUpScheduler(dasCtx, checkpoint)
// kick off catch-up routine manager
go d.catchUpManager(dasCtx, checkpoint)
// kick off sampling routine for recently received headers
go d.sample(dasCtx, sub, checkpoint)
return nil
@@ -159,14 +158,10 @@ type catchUpJob struct {
from, to int64
}

// catchUpScheduler spawns and manages catch-up jobs, exiting only once all jobs
// are complete and the last known DASing checkpoint has been stored to disk.
func (d *DASer) catchUpScheduler(ctx context.Context, checkpoint int64) {
wg := sync.WaitGroup{}

// catchUpManager manages catch-up jobs, performing them one at a time, exiting
// only once context is canceled and storing latest DASed checkpoint to disk.
func (d *DASer) catchUpManager(ctx context.Context, checkpoint int64) {
defer func() {
// wait for all catch-up jobs to finish
wg.Wait()
// store latest DASed checkpoint to disk here to ensure that if DASer is not yet
// fully caught up to network head, it will resume DASing from this checkpoint
// up to current network head
@@ -184,23 +179,26 @@ func (d *DASer) catchUpScheduler(ctx context.Context, checkpoint int64) {
case <-ctx.Done():
return
case job := <-d.jobsCh:
// spawn catchUp routine
wg.Add(1)
go func() {
defer wg.Done()

d.catchUp(ctx, job)
// TODO @renaynay: assumption that each subsequent job is to a higher height than the previous.
// I don't see why that is *not* the case, though.
checkpoint = job.to
}()
// perform catchUp routine
height, err := d.catchUp(ctx, job)
// update checkpoint before checking error as the height
// value from catchUp represents the last successfully DASed height,
// regardless of the routine's success
checkpoint = height
// exit routine if a catch-up job was unsuccessful
if err != nil {
log.Errorw("catch-up routine failed", "attempted range (from, to)", job.from,
job.to, "last successfully sampled height", height)
log.Warn("IN ORDER TO CONTINUE SAMPLING OVER PAST HEADERS, RE-START THE NODE")
return
}
}
}
}

// catchUp starts a sampling routine for headers starting at the next header
// after the `from` height and exits the loop once `to` is reached. (from:to]
func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {
func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) (int64, error) {
routineStartTime := time.Now()
log.Infow("sampling past headers", "from", job.from, "to", job.to)

@@ -210,19 +208,24 @@ func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {
h, err := d.getter.GetByHeight(ctx, uint64(height))
if err != nil {
if err == context.Canceled {
return
// report previous height as the last successfully sampled height and
// error as nil since the routine was ordered to stop
return height - 1, nil
}

log.Errorw("failed to get next header", "height", height, "err", err)
return
// report previous height as the last successfully sampled height
return height - 1, err
}

startTime := time.Now()

err = d.da.SharesAvailable(ctx, h.DAH)
if err != nil {
if err == context.Canceled {
return
// report previous height as the last successfully sampled height and
// error as nil since the routine was ordered to stop
return height - 1, nil
}
log.Errorw("sampling failed", "height", h.Height, "hash", h.Hash(),
"square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "err", err)
@@ -236,4 +239,6 @@ func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {

log.Infow("successfully caught up", "from", job.from,
"to", job.to, "finished (s)", time.Since(routineStartTime))
// report successful result
return job.to, nil
}
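The producer side of jobsCh is not part of this diff. A hedged sketch of how a caller such as the sampling routine might enqueue a catch-up job when it finds a gap between the stored checkpoint and the network head; enqueueCatchUp is a hypothetical helper, and only jobsCh and catchUpJob come from the hunks above.

// Hypothetical enqueue site, consistent with the jobsCh and catchUpJob
// definitions in this file; the actual sample routine may differ.
func (d *DASer) enqueueCatchUp(ctx context.Context, checkpoint, head int64) {
	if head <= checkpoint {
		return // already caught up, nothing to schedule
	}
	job := &catchUpJob{from: checkpoint, to: head}
	select {
	case d.jobsCh <- job: // channel is buffered (size 16), so this rarely blocks
	case <-ctx.Done():
	}
}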
64 changes: 53 additions & 11 deletions das/daser_test.go
@@ -17,6 +17,8 @@ import (
"github.com/celestiaorg/celestia-node/service/share"
)

var timeout = time.Second * 15

// TestDASerLifecycle tests to ensure every mock block is DASed and
// the DASer checkpoint is updated to network head.
func TestDASerLifecycle(t *testing.T) {
@@ -26,7 +28,7 @@ func TestDASerLifecycle(t *testing.T) {
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)

daser := NewDASer(shareServ, sub, mockGet, ds)
@@ -41,6 +43,12 @@ func TestDASerLifecycle(t *testing.T) {
case <-mockGet.doneCh:
}

// give catch-up routine a second to finish up sampling last header
// TODO @renaynay: this sleep is a known flakey solution to the issue that
// we do not have DASState implemented yet. Once DASState is implemented, rely
// on that instead of sleeping.
time.Sleep(time.Second * 1)

err = daser.Stop(ctx)
require.NoError(t, err)

@@ -58,7 +66,7 @@ func TestDASer_Restart(t *testing.T) {
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)

daser := NewDASer(shareServ, sub, mockGet, ds)
@@ -85,7 +93,7 @@ func TestDASer_Restart(t *testing.T) {
mockGet.head = int64(45)

// restart DASer with new context
restartCtx, restartCancel := context.WithTimeout(context.Background(), time.Second*15)
restartCtx, restartCancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(restartCancel)

err = daser.Start(restartCtx)
@@ -98,6 +106,12 @@ func TestDASer_Restart(t *testing.T) {
case <-mockGet.doneCh:
}

// give catch-up routine a second to finish up sampling last header
// TODO @renaynay: this sleep is a known flakey solution to the issue that
// we do not have DASState implemented yet. Once DASState is implemented, rely
// on that instead of sleeping.
time.Sleep(time.Second * 1)

err = daser.Stop(restartCtx)
require.NoError(t, err)

@@ -119,18 +133,32 @@ func TestDASer_catchUp(t *testing.T) {

daser := NewDASer(shareServ, nil, mockGet, ds)

type catchUpResult struct {
checkpoint int64
err error
}
resultCh := make(chan *catchUpResult, 1)

wg := &sync.WaitGroup{}
wg.Add(1)
go func(wg *sync.WaitGroup) {
go func() {
defer wg.Done()
// catch up from height 2 to head
job := &catchUpJob{
from: 2,
to: mockGet.head,
}
daser.catchUp(ctx, job)
wg.Done()
}(wg)
checkpt, err := daser.catchUp(ctx, job)
resultCh <- &catchUpResult{
checkpoint: checkpt,
err: err,
}
}()
wg.Wait()

result := <-resultCh
assert.Equal(t, mockGet.head, result.checkpoint)
require.NoError(t, result.err)
}

// TestDASer_catchUp_oneHeader tests that catchUp works with a from-to
@@ -152,17 +180,31 @@ func TestDASer_catchUp_oneHeader(t *testing.T) {
checkpoint, err := loadCheckpoint(daser.cstore)
require.NoError(t, err)

type catchUpResult struct {
checkpoint int64
err error
}
resultCh := make(chan *catchUpResult, 1)

wg := &sync.WaitGroup{}
wg.Add(1)
go func(wg *sync.WaitGroup) {
go func() {
defer wg.Done()
job := &catchUpJob{
from: checkpoint,
to: mockGet.head,
}
daser.catchUp(ctx, job)
wg.Done()
}(wg)
checkpt, err := daser.catchUp(ctx, job)
resultCh <- &catchUpResult{
checkpoint: checkpt,
err: err,
}
}()
wg.Wait()

result := <-resultCh
assert.Equal(t, mockGet.head, result.checkpoint)
require.NoError(t, result.err)
}

// createDASerSubcomponents takes numGetter (number of headers

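Because catchUp is now a plain blocking call that returns the last sampled height and an error, the goroutine, WaitGroup, and result channel used in the tests above are one possible style; a more direct equivalent of the same assertions (assuming the same mockGet fixture and test imports) would be:

// Direct form of the catch-up assertion; valid because catchUp blocks
// until the job completes or the context is canceled.
checkpt, err := daser.catchUp(ctx, &catchUpJob{from: 2, to: mockGet.head})
require.NoError(t, err)
assert.Equal(t, mockGet.head, checkpt)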