From c01a057d7a78ec2cb9d6ed68891245bdfd691cd6 Mon Sep 17 00:00:00 2001
From: rene <41963722+renaynay@users.noreply.github.com>
Date: Sat, 26 Mar 2022 12:33:46 +0000
Subject: [PATCH] fix | refactor: ensure checkpoint storing isn't racy, one
 catchUpJob at a time (#24)

---
 das/daser.go      | 55 ++++++++++++++++++++++------------------
 das/daser_test.go | 64 +++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 83 insertions(+), 36 deletions(-)

diff --git a/das/daser.go b/das/daser.go
index 61c1084c3f..7436d2446b 100644
--- a/das/daser.go
+++ b/das/daser.go
@@ -3,7 +3,6 @@ package das
 import (
 	"context"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/ipfs/go-datastore"
@@ -47,7 +46,7 @@ func NewDASer(
 		hsub:   hsub,
 		getter: getter,
 		cstore: wrappedDS,
-		jobsCh: make(chan *catchUpJob),
+		jobsCh: make(chan *catchUpJob, 16),
 	}
 }
 
@@ -73,8 +72,8 @@ func (d *DASer) Start(context.Context) error {
 	d.cancel = cancel
 	d.sampleDn, d.catchUpDn = make(chan struct{}), make(chan struct{})
 
-	// kick off catch-up routine scheduler
-	go d.catchUpScheduler(dasCtx, checkpoint)
+	// kick off catch-up routine manager
+	go d.catchUpManager(dasCtx, checkpoint)
 	// kick off sampling routine for recently received headers
 	go d.sample(dasCtx, sub, checkpoint)
 	return nil
@@ -159,14 +158,10 @@ type catchUpJob struct {
 	from, to int64
 }
 
-// catchUpScheduler spawns and manages catch-up jobs, exiting only once all jobs
-// are complete and the last known DASing checkpoint has been stored to disk.
-func (d *DASer) catchUpScheduler(ctx context.Context, checkpoint int64) {
-	wg := sync.WaitGroup{}
-
+// catchUpManager manages catch-up jobs, performing them one at a time, exiting
+// only once context is canceled and storing latest DASed checkpoint to disk.
+func (d *DASer) catchUpManager(ctx context.Context, checkpoint int64) {
 	defer func() {
-		// wait for all catch-up jobs to finish
-		wg.Wait()
 		// store latest DASed checkpoint to disk here to ensure that if DASer is not yet
 		// fully caught up to network head, it will resume DASing from this checkpoint
 		// up to current network head
@@ -184,23 +179,26 @@ func (d *DASer) catchUpScheduler(ctx context.Context, checkpoint int64) {
 		case <-ctx.Done():
 			return
 		case job := <-d.jobsCh:
-			// spawn catchUp routine
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-
-				d.catchUp(ctx, job)
-				// TODO @renaynay: assumption that each subsequent job is to a higher height than the previous.
-				// I don't see why that is *not* the case, though.
-				checkpoint = job.to
-			}()
+			// perform catchUp routine
+			height, err := d.catchUp(ctx, job)
+			// update checkpoint before checking error as the height
+			// value from catchUp represents the last successfully DASed height,
+			// regardless of the routine's success
+			checkpoint = height
+			// exit routine if a catch-up job was unsuccessful
+			if err != nil {
+				log.Errorw("catch-up routine failed", "from", job.from, "to", job.to,
+					"last successfully sampled height", height)
+				log.Warn("IN ORDER TO CONTINUE SAMPLING OVER PAST HEADERS, RE-START THE NODE")
+				return
+			}
 		}
 	}
 }
 
 // catchUp starts a sampling routine for headers starting at the next header
 // after the `from` height and exits the loop once `to` is reached. (from:to]
-func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {
+func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) (int64, error) {
 	routineStartTime := time.Now()
 	log.Infow("sampling past headers", "from", job.from, "to", job.to)
 
@@ -210,11 +208,14 @@ func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {
 		h, err := d.getter.GetByHeight(ctx, uint64(height))
 		if err != nil {
 			if err == context.Canceled {
-				return
+				// report previous height as the last successfully sampled height and
+				// error as nil since the routine was ordered to stop
+				return height - 1, nil
 			}
 
 			log.Errorw("failed to get next header", "height", height, "err", err)
-			return
+			// report previous height as the last successfully sampled height
+			return height - 1, err
 		}
 
 		startTime := time.Now()
@@ -222,7 +223,9 @@ func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {
 		err = d.da.SharesAvailable(ctx, h.DAH)
 		if err != nil {
 			if err == context.Canceled {
-				return
+				// report previous height as the last successfully sampled height and
+				// error as nil since the routine was ordered to stop
+				return height - 1, nil
 			}
 			log.Errorw("sampling failed", "height", h.Height, "hash", h.Hash(),
 				"square width", len(h.DAH.RowsRoots), "data root", h.DAH.Hash(), "err", err)
@@ -236,4 +239,6 @@ func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {
 
 	log.Infow("successfully caught up", "from", job.from, "to", job.to,
 		"finished (s)", time.Since(routineStartTime))
+	// report successful result
+	return job.to, nil
 }
diff --git a/das/daser_test.go b/das/daser_test.go
index dfd004cb3a..c4ac8d6d46 100644
--- a/das/daser_test.go
+++ b/das/daser_test.go
@@ -17,6 +17,8 @@ import (
 	"github.com/celestiaorg/celestia-node/service/share"
 )
 
+var timeout = time.Second * 15
+
 // TestDASerLifecycle tests to ensure every mock block is DASed and
 // the DASer checkpoint is updated to network head.
 func TestDASerLifecycle(t *testing.T) {
@@ -26,7 +28,7 @@ func TestDASerLifecycle(t *testing.T) {
 	// 15 headers from the past and 15 future headers
 	mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	t.Cleanup(cancel)
 
 	daser := NewDASer(shareServ, sub, mockGet, ds)
@@ -41,6 +43,12 @@ func TestDASerLifecycle(t *testing.T) {
 	case <-mockGet.doneCh:
 	}
 
+	// give catch-up routine a second to finish up sampling last header
+	// TODO @renaynay: this sleep is a known flaky solution to the issue that
+	// we do not have DASState implemented yet. Once DASState is implemented, rely
+	// on that instead of sleeping.
+	time.Sleep(time.Second * 1)
+
 	err = daser.Stop(ctx)
 	require.NoError(t, err)
 
@@ -58,7 +66,7 @@ func TestDASer_Restart(t *testing.T) {
 	// 15 headers from the past and 15 future headers
 	mockGet, shareServ, sub := createDASerSubcomponents(t, dag, 15, 15)
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	t.Cleanup(cancel)
 
 	daser := NewDASer(shareServ, sub, mockGet, ds)
@@ -85,7 +93,7 @@ func TestDASer_Restart(t *testing.T) {
 	mockGet.head = int64(45)
 
 	// restart DASer with new context
-	restartCtx, restartCancel := context.WithTimeout(context.Background(), time.Second*15)
+	restartCtx, restartCancel := context.WithTimeout(context.Background(), timeout)
 	t.Cleanup(restartCancel)
 
 	err = daser.Start(restartCtx)
@@ -98,6 +106,12 @@ func TestDASer_Restart(t *testing.T) {
 	case <-mockGet.doneCh:
 	}
 
+	// give catch-up routine a second to finish up sampling last header
+	// TODO @renaynay: this sleep is a known flaky solution to the issue that
+	// we do not have DASState implemented yet. Once DASState is implemented, rely
+	// on that instead of sleeping.
+	time.Sleep(time.Second * 1)
+
 	err = daser.Stop(restartCtx)
 	require.NoError(t, err)
 
@@ -119,18 +133,32 @@ func TestDASer_catchUp(t *testing.T) {
 
 	daser := NewDASer(shareServ, nil, mockGet, ds)
 
+	type catchUpResult struct {
+		checkpoint int64
+		err        error
+	}
+	resultCh := make(chan *catchUpResult, 1)
+
 	wg := &sync.WaitGroup{}
 	wg.Add(1)
-	go func(wg *sync.WaitGroup) {
+	go func() {
+		defer wg.Done()
 		// catch up from height 2 to head
 		job := &catchUpJob{
 			from: 2,
 			to:   mockGet.head,
 		}
-		daser.catchUp(ctx, job)
-		wg.Done()
-	}(wg)
+		checkpt, err := daser.catchUp(ctx, job)
+		resultCh <- &catchUpResult{
+			checkpoint: checkpt,
+			err:        err,
+		}
+	}()
 	wg.Wait()
+
+	result := <-resultCh
+	assert.Equal(t, mockGet.head, result.checkpoint)
+	require.NoError(t, result.err)
 }
 
 // TestDASer_catchUp_oneHeader tests that catchUp works with a from-to
@@ -152,17 +180,31 @@ func TestDASer_catchUp_oneHeader(t *testing.T) {
 	checkpoint, err := loadCheckpoint(daser.cstore)
 	require.NoError(t, err)
 
+	type catchUpResult struct {
+		checkpoint int64
+		err        error
+	}
+	resultCh := make(chan *catchUpResult, 1)
+
 	wg := &sync.WaitGroup{}
 	wg.Add(1)
-	go func(wg *sync.WaitGroup) {
+	go func() {
+		defer wg.Done()
 		job := &catchUpJob{
 			from: checkpoint,
 			to:   mockGet.head,
 		}
-		daser.catchUp(ctx, job)
-		wg.Done()
-	}(wg)
+		checkpt, err := daser.catchUp(ctx, job)
+		resultCh <- &catchUpResult{
+			checkpoint: checkpt,
+			err:        err,
+		}
+	}()
 	wg.Wait()
+
+	result := <-resultCh
+	assert.Equal(t, mockGet.head, result.checkpoint)
+	require.NoError(t, result.err)
 }
 
 // createDASerSubcomponents takes numGetter (number of headers