From 55d198fe32faf83acab201a5325ddb15a93ba018 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 15 Mar 2022 16:44:45 +0000 Subject: [PATCH] feature: Improve management of catch-up threads and prevent gaps in DAS history (#20) --- das/daser.go | 96 ++++++++++++++++++++++++++++++++++------------- das/daser_test.go | 40 ++++++++++++++------ 2 files changed, 98 insertions(+), 38 deletions(-) diff --git a/das/daser.go b/das/daser.go index 41e2175840..02f48ec3fc 100644 --- a/das/daser.go +++ b/das/daser.go @@ -3,6 +3,7 @@ package das import ( "context" "fmt" + "sync" "time" "github.com/ipfs/go-datastore" @@ -39,12 +40,10 @@ func NewDASer( ) *DASer { wrappedDS := wrapCheckpointStore(cstore) return &DASer{ - da: da, - hsub: hsub, - getter: getter, - cstore: wrappedDS, - sampleDn: make(chan struct{}), - catchUpDn: make(chan struct{}), + da: da, + hsub: hsub, + getter: getter, + cstore: wrappedDS, } } @@ -67,10 +66,10 @@ func (d *DASer) Start(context.Context) error { log.Infow("loaded latest DASed checkpoint", "height", checkpoint) dasCtx, cancel := context.WithCancel(context.Background()) + d.cancel = cancel + d.sampleDn, d.catchUpDn = make(chan struct{}), make(chan struct{}) go d.sample(dasCtx, sub, checkpoint) - - d.cancel = cancel return nil } @@ -81,7 +80,9 @@ func (d *DASer) Stop(ctx context.Context) error { for i := 0; i < 2; i++ { select { case <-d.catchUpDn: + d.catchUpDn = nil case <-d.sampleDn: + d.sampleDn = nil case <-ctx.Done(): return ctx.Err() } @@ -93,18 +94,14 @@ func (d *DASer) Stop(ctx context.Context) error { // sample validates availability for each Header received from header subscription. func (d *DASer) sample(ctx context.Context, sub header.Subscription, checkpoint int64) { defer func() { - // store latest DASed checkpoint to disk - // TODO @renaynay: what sample DASes [100:150] and - // stores latest checkpoint to disk as network head (150) - // but catchUp routine has only sampled from [1:40] so there is a gap - // missing from (40: 100)? - if err := storeCheckpoint(d.cstore, checkpoint); err != nil { - log.Errorw("storing latest DASed checkpoint to disk", "height", checkpoint, "err", err) - } sub.Cancel() close(d.sampleDn) }() + // kick off catchUpScheduler + jobsCh := make(chan *catchUpJob) + go d.catchUpScheduler(ctx, jobsCh, checkpoint) + for { h, err := sub.NextHeader(ctx) if err != nil { @@ -120,10 +117,13 @@ func (d *DASer) sample(ctx context.Context, sub header.Subscription, checkpoint // to our last DASed header, kick off routine to DAS all headers // between last DASed header and h. This situation could occur // either on start or due to network latency/disconnection. - if h.Height > (checkpoint + 1) { + if h.Height > checkpoint+1 { // DAS headers between last DASed height up to the current // header - go d.catchUp(ctx, checkpoint, h.Height-1) + jobsCh <- &catchUpJob{ + from: checkpoint, + to: h.Height - 1, + } } startTime := time.Now() @@ -146,17 +146,59 @@ func (d *DASer) sample(ctx context.Context, sub header.Subscription, checkpoint } } -// catchUp starts a sampling routine for headers starting at the `from` -// height and exits the loop once `to` is reached. (from:to] -func (d *DASer) catchUp(ctx context.Context, from, to int64) { - defer close(d.catchUpDn) +// catchUpJob represents a catch-up job. (from:to] +type catchUpJob struct { + from, to int64 +} + +// catchUpScheduler spawns and manages catch-up jobs, exiting only once all jobs +// are complete and the last known DASing checkpoint has been stored to disk. +func (d *DASer) catchUpScheduler(ctx context.Context, jobsCh chan *catchUpJob, checkpoint int64) { + wg := sync.WaitGroup{} + + defer func() { + // wait for all catch-up jobs to finish + wg.Wait() + // store latest DASed checkpoint to disk here to ensure that if DASer is not yet + // fully caught up to network head, it will resume DASing from this checkpoint + // up to current network head + // TODO @renaynay: Implement Share Cache #180 to ensure no duplicate DASing over same + // header + if err := storeCheckpoint(d.cstore, checkpoint); err != nil { + log.Errorw("da: storing latest DASed checkpoint to disk", "height", checkpoint, "err", err) + } + // signal that all catchUp routines have finished + close(d.catchUpDn) + }() + + for { + select { + case <-ctx.Done(): + return + case job := <-jobsCh: + // spawn catchUp routine + wg.Add(1) + go func() { + defer wg.Done() + + d.catchUp(ctx, job) + // TODO @renaynay: assumption that each subsequent job is to a higher height than the previous. + // I don't see why that is *not* the case, though. + checkpoint = job.to + }() + } + } +} +// catchUp starts a sampling routine for headers starting at the next header +// after the `from` height and exits the loop once `to` is reached. (from:to] +func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) { routineStartTime := time.Now() - log.Infow("starting sample routine", "from", from, "to", to) + log.Infow("starting sample routine", "from", job.from, "to", job.to) // start sampling from height at checkpoint+1 since the - // checkpoint height has already been successfully DASed - for height := from + 1; height <= to; height++ { + // checkpoint height is DASed by broader sample routine + for height := job.from + 1; height <= job.to; height++ { h, err := d.getter.GetByHeight(ctx, uint64(height)) if err != nil { if err == context.Canceled { @@ -185,6 +227,6 @@ func (d *DASer) catchUp(ctx context.Context, from, to int64) { } totalElapsedTime := time.Since(routineStartTime) - log.Infow("successfully sampled all headers up to network head", "from", from, - "to", to, "finished (s)", totalElapsedTime.Seconds()) + log.Infow("successfully sampled all headers up to network head", "from", job.from, + "to", job.to, "finished (s)", totalElapsedTime.Seconds()) } diff --git a/das/daser_test.go b/das/daser_test.go index 7ef4ba77c3..686633868d 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -21,6 +21,7 @@ import ( func TestDASerLifecycle(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) + // 15 headers from the past and 15 future headers mockGet, shareServ, sub := createDASerSubcomponents(t, 15, 15) ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) @@ -31,20 +32,21 @@ func TestDASerLifecycle(t *testing.T) { err := daser.Start(ctx) require.NoError(t, err) + // wait for dasing catch-up routine to finish select { - // wait for sample routine to finish so that it stores the - // latest DASed checkpoint so that it stores the latest DASed checkpoint - case <-daser.sampleDn: case <-ctx.Done(): + t.Fatal(ctx.Err()) + case <-mockGet.doneCh: } - // load checkpoint and ensure it's at network head - checkpoint, err := loadCheckpoint(daser.cstore) + err = daser.Stop(ctx) require.NoError(t, err) - assert.Equal(t, int64(30), checkpoint) - err = daser.Stop(ctx) + // load checkpoint and ensure it's at network head + checkpoint, err := loadCheckpoint(daser.cstore) require.NoError(t, err) + // ensure checkpoint is stored at 15 + assert.Equal(t, int64(15), checkpoint) } func TestDASer_catchUp(t *testing.T) { @@ -62,7 +64,11 @@ func TestDASer_catchUp(t *testing.T) { wg.Add(1) go func(wg *sync.WaitGroup) { // catch up from height 2 to head - daser.catchUp(ctx, 2, mockGet.head) + job := &catchUpJob{ + from: 2, + to: mockGet.head, + } + daser.catchUp(ctx, job) wg.Done() }(wg) wg.Wait() @@ -90,7 +96,11 @@ func TestDASer_catchUp_oneHeader(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func(wg *sync.WaitGroup) { - daser.catchUp(ctx, checkpoint, mockGet.head) + job := &catchUpJob{ + from: checkpoint, + to: mockGet.head, + } + daser.catchUp(ctx, job) wg.Done() }(wg) wg.Wait() @@ -106,6 +116,7 @@ func createDASerSubcomponents(t *testing.T, numGetter, numSub int) (*mockGetter, mockGet := &mockGetter{ headers: make(map[int64]*header.ExtendedHeader), + doneCh: make(chan struct{}), } // generate 15 headers from the past for HeaderGetter @@ -143,14 +154,21 @@ func createDASerSubcomponents(t *testing.T, numGetter, numSub int) (*mockGetter, } type mockGetter struct { + doneCh chan struct{} // signals all stored headers have been retrieved + head int64 headers map[int64]*header.ExtendedHeader } -func (m mockGetter) Head(context.Context) (*header.ExtendedHeader, error) { +func (m *mockGetter) Head(context.Context) (*header.ExtendedHeader, error) { return m.headers[m.head], nil } -func (m mockGetter) GetByHeight(_ context.Context, height uint64) (*header.ExtendedHeader, error) { +func (m *mockGetter) GetByHeight(_ context.Context, height uint64) (*header.ExtendedHeader, error) { + defer func() { + if int64(height) == m.head { + close(m.doneCh) + } + }() return m.headers[int64(height)], nil }