Commit
feature: Improve management of catch-up threads and prevent gaps in DAS history (#20)
renaynay authored Mar 15, 2022
1 parent 6b9723e commit 55d198f
Showing 2 changed files with 98 additions and 38 deletions.
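At a glance, the change replaces the ad-hoc go d.catchUp(...) goroutine with a catchUpScheduler that receives catch-up jobs over a channel, tracks them with a sync.WaitGroup, and persists the checkpoint only after every job has completed, which closes the gap described in the removed TODO below. What follows is a minimal, self-contained sketch of that pattern, not the repository's code: the job, runJob, and persist names are illustrative, and the mutex is a defensive addition (the real scheduler assumes jobs arrive in increasing height order).

package main

import (
	"context"
	"fmt"
	"sync"
)

type job struct{ from, to int64 }

// scheduler drains jobsCh until ctx is cancelled, then waits for every
// spawned worker before persisting the final checkpoint, so a partially
// sampled range can never be recorded as complete.
func scheduler(ctx context.Context, jobsCh <-chan job, checkpoint int64, persist func(int64)) {
	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards checkpoint across concurrent jobs
	)
	defer func() {
		wg.Wait()
		persist(checkpoint)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case j := <-jobsCh:
			wg.Add(1)
			go func(j job) {
				defer wg.Done()
				runJob(ctx, j)
				mu.Lock()
				if j.to > checkpoint {
					checkpoint = j.to
				}
				mu.Unlock()
			}(j)
		}
	}
}

// runJob stands in for sampling each header in (from:to].
func runJob(_ context.Context, j job) {
	for h := j.from + 1; h <= j.to; h++ {
		_ = h // sample header at height h
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	jobsCh := make(chan job)
	done := make(chan struct{})

	go func() {
		scheduler(ctx, jobsCh, 40, func(cp int64) { fmt.Println("stored checkpoint:", cp) })
		close(done)
	}()

	jobsCh <- job{from: 40, to: 99} // fill the (40:100) gap before persisting
	cancel()
	<-done // prints "stored checkpoint: 99"
}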
96 changes: 69 additions & 27 deletions das/daser.go
@@ -3,6 +3,7 @@ package das
import (
"context"
"fmt"
"sync"
"time"

"github.com/ipfs/go-datastore"
@@ -39,12 +40,10 @@ func NewDASer(
) *DASer {
wrappedDS := wrapCheckpointStore(cstore)
return &DASer{
da: da,
hsub: hsub,
getter: getter,
cstore: wrappedDS,
sampleDn: make(chan struct{}),
catchUpDn: make(chan struct{}),
da: da,
hsub: hsub,
getter: getter,
cstore: wrappedDS,
}
}

@@ -67,10 +66,10 @@ func (d *DASer) Start(context.Context) error {
log.Infow("loaded latest DASed checkpoint", "height", checkpoint)

dasCtx, cancel := context.WithCancel(context.Background())
d.cancel = cancel
d.sampleDn, d.catchUpDn = make(chan struct{}), make(chan struct{})

go d.sample(dasCtx, sub, checkpoint)

d.cancel = cancel
return nil
}

@@ -81,7 +80,9 @@ func (d *DASer) Stop(ctx context.Context) error {
for i := 0; i < 2; i++ {
select {
case <-d.catchUpDn:
d.catchUpDn = nil
case <-d.sampleDn:
d.sampleDn = nil
case <-ctx.Done():
return ctx.Err()
}
@@ -93,18 +94,14 @@ func (d *DASer) Stop(ctx context.Context) error {
// sample validates availability for each Header received from header subscription.
func (d *DASer) sample(ctx context.Context, sub header.Subscription, checkpoint int64) {
defer func() {
// store latest DASed checkpoint to disk
// TODO @renaynay: what if sample DASes [100:150] and
// stores latest checkpoint to disk as network head (150)
// but catchUp routine has only sampled from [1:40] so there is a gap
// missing from (40: 100)?
if err := storeCheckpoint(d.cstore, checkpoint); err != nil {
log.Errorw("storing latest DASed checkpoint to disk", "height", checkpoint, "err", err)
}
sub.Cancel()
close(d.sampleDn)
}()

// kick off catchUpScheduler
jobsCh := make(chan *catchUpJob)
go d.catchUpScheduler(ctx, jobsCh, checkpoint)

for {
h, err := sub.NextHeader(ctx)
if err != nil {
@@ -120,10 +117,13 @@ func (d *DASer) sample(ctx context.Context, sub header.Subscription, checkpoint
// to our last DASed header, kick off routine to DAS all headers
// between last DASed header and h. This situation could occur
// either on start or due to network latency/disconnection.
if h.Height > (checkpoint + 1) {
if h.Height > checkpoint+1 {
// DAS headers between last DASed height up to the current
// header
go d.catchUp(ctx, checkpoint, h.Height-1)
jobsCh <- &catchUpJob{
from: checkpoint,
to: h.Height - 1,
}
}

startTime := time.Now()
@@ -146,17 +146,59 @@
}
}

// catchUp starts a sampling routine for headers starting at the `from`
// height and exits the loop once `to` is reached. (from:to]
func (d *DASer) catchUp(ctx context.Context, from, to int64) {
defer close(d.catchUpDn)
// catchUpJob represents a catch-up job. (from:to]
type catchUpJob struct {
from, to int64
}

// catchUpScheduler spawns and manages catch-up jobs, exiting only once all jobs
// are complete and the last known DASing checkpoint has been stored to disk.
func (d *DASer) catchUpScheduler(ctx context.Context, jobsCh chan *catchUpJob, checkpoint int64) {
wg := sync.WaitGroup{}

defer func() {
// wait for all catch-up jobs to finish
wg.Wait()
// store latest DASed checkpoint to disk here to ensure that if DASer is not yet
// fully caught up to network head, it will resume DASing from this checkpoint
// up to current network head
// TODO @renaynay: Implement Share Cache #180 to ensure no duplicate DASing over same
// header
if err := storeCheckpoint(d.cstore, checkpoint); err != nil {
log.Errorw("da: storing latest DASed checkpoint to disk", "height", checkpoint, "err", err)
}
// signal that all catchUp routines have finished
close(d.catchUpDn)
}()

for {
select {
case <-ctx.Done():
return
case job := <-jobsCh:
// spawn catchUp routine
wg.Add(1)
go func() {
defer wg.Done()

d.catchUp(ctx, job)
// TODO @renaynay: assumption that each subsequent job is to a higher height than the previous.
// I don't see why that is *not* the case, though.
checkpoint = job.to
}()
}
}
}

// catchUp starts a sampling routine for headers starting at the next header
// after the `from` height and exits the loop once `to` is reached. (from:to]
func (d *DASer) catchUp(ctx context.Context, job *catchUpJob) {
routineStartTime := time.Now()
log.Infow("starting sample routine", "from", from, "to", to)
log.Infow("starting sample routine", "from", job.from, "to", job.to)

// start sampling from height at checkpoint+1 since the
// checkpoint height has already been successfully DASed
for height := from + 1; height <= to; height++ {
// checkpoint height is DASed by broader sample routine
for height := job.from + 1; height <= job.to; height++ {
h, err := d.getter.GetByHeight(ctx, uint64(height))
if err != nil {
if err == context.Canceled {
@@ -185,6 +227,6 @@ func (d *DASer) catchUp(ctx context.Context, from, to int64) {
}

totalElapsedTime := time.Since(routineStartTime)
log.Infow("successfully sampled all headers up to network head", "from", from,
"to", to, "finished (s)", totalElapsedTime.Seconds())
log.Infow("successfully sampled all headers up to network head", "from", job.from,
"to", job.to, "finished (s)", totalElapsedTime.Seconds())
}
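The other subtle piece of this file is the updated Stop: it now sets each done channel to nil after receiving from it, because a receive on a nil channel blocks forever, so a single closed channel cannot satisfy both iterations of the loop. Below is a small runnable sketch of that drain pattern under a caller-supplied deadline; the stop function and channel names are illustrative, not the DASer API.

package main

import (
	"context"
	"fmt"
	"time"
)

// stop waits for both background routines to signal completion, or gives
// up when the caller's context expires. Nil-ing a received channel
// prevents it from being selected a second time.
func stop(ctx context.Context, sampleDn, catchUpDn <-chan struct{}) error {
	for i := 0; i < 2; i++ {
		select {
		case <-sampleDn:
			sampleDn = nil
		case <-catchUpDn:
			catchUpDn = nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	sampleDn := make(chan struct{})
	catchUpDn := make(chan struct{})

	// simulate the sampling loop and catch-up scheduler finishing
	go func() { close(sampleDn) }()
	go func() { close(catchUpDn) }()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	fmt.Println("stop returned:", stop(ctx, sampleDn, catchUpDn))
}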
40 changes: 29 additions & 11 deletions das/daser_test.go
@@ -21,6 +21,7 @@ import (
func TestDASerLifecycle(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

// 15 headers from the past and 15 future headers
mockGet, shareServ, sub := createDASerSubcomponents(t, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
@@ -31,20 +32,21 @@ func TestDASerLifecycle(t *testing.T) {
err := daser.Start(ctx)
require.NoError(t, err)

// wait for dasing catch-up routine to finish
select {
// wait for sample routine to finish so that it stores the
// latest DASed checkpoint
case <-daser.sampleDn:
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-mockGet.doneCh:
}

// load checkpoint and ensure it's at network head
checkpoint, err := loadCheckpoint(daser.cstore)
err = daser.Stop(ctx)
require.NoError(t, err)
assert.Equal(t, int64(30), checkpoint)

err = daser.Stop(ctx)
// load checkpoint and ensure it's at network head
checkpoint, err := loadCheckpoint(daser.cstore)
require.NoError(t, err)
// ensure checkpoint is stored at 15
assert.Equal(t, int64(15), checkpoint)
}

func TestDASer_catchUp(t *testing.T) {
@@ -62,7 +64,11 @@ func TestDASer_catchUp(t *testing.T) {
wg.Add(1)
go func(wg *sync.WaitGroup) {
// catch up from height 2 to head
daser.catchUp(ctx, 2, mockGet.head)
job := &catchUpJob{
from: 2,
to: mockGet.head,
}
daser.catchUp(ctx, job)
wg.Done()
}(wg)
wg.Wait()
@@ -90,7 +96,11 @@ func TestDASer_catchUp_oneHeader(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func(wg *sync.WaitGroup) {
daser.catchUp(ctx, checkpoint, mockGet.head)
job := &catchUpJob{
from: checkpoint,
to: mockGet.head,
}
daser.catchUp(ctx, job)
wg.Done()
}(wg)
wg.Wait()
@@ -106,6 +116,7 @@ func createDASerSubcomponents(t *testing.T, numGetter, numSub int) (*mockGetter,

mockGet := &mockGetter{
headers: make(map[int64]*header.ExtendedHeader),
doneCh: make(chan struct{}),
}

// generate 15 headers from the past for HeaderGetter
@@ -143,14 +154,21 @@
}

type mockGetter struct {
doneCh chan struct{} // signals all stored headers have been retrieved

head int64
headers map[int64]*header.ExtendedHeader
}

func (m mockGetter) Head(context.Context) (*header.ExtendedHeader, error) {
func (m *mockGetter) Head(context.Context) (*header.ExtendedHeader, error) {
return m.headers[m.head], nil
}

func (m mockGetter) GetByHeight(_ context.Context, height uint64) (*header.ExtendedHeader, error) {
func (m *mockGetter) GetByHeight(_ context.Context, height uint64) (*header.ExtendedHeader, error) {
defer func() {
if int64(height) == m.head {
close(m.doneCh)
}
}()
return m.headers[int64(height)], nil
}
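The mock above closes doneCh whenever the head height is fetched, which is safe here because the catch-up loop requests each height exactly once. As a hedged variant of the same completion-signal pattern, the sketch below adds sync.Once so a repeated fetch of the head cannot double-close the channel; the mockStore type and all names are hypothetical, not part of the test code.

package main

import (
	"fmt"
	"sync"
)

type mockStore struct {
	head     int64
	headers  map[int64]string
	done     chan struct{}
	doneOnce sync.Once
}

// GetByHeight signals completion the first time the head height is requested.
func (m *mockStore) GetByHeight(height int64) string {
	if height == m.head {
		m.doneOnce.Do(func() { close(m.done) })
	}
	return m.headers[height]
}

func main() {
	m := &mockStore{
		head:    3,
		headers: map[int64]string{1: "h1", 2: "h2", 3: "h3"},
		done:    make(chan struct{}),
	}

	go func() {
		for h := int64(1); h <= m.head; h++ {
			_ = m.GetByHeight(h)
		}
	}()

	<-m.done // blocks until the highest stored header was requested
	fmt.Println("all mock headers retrieved")
}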
