diff --git a/das/checkpoint_store.go b/das/checkpoint_store.go
index ced4e818de..6e182ce678 100644
--- a/das/checkpoint_store.go
+++ b/das/checkpoint_store.go
@@ -2,7 +2,6 @@ package das
 
 import (
 	"encoding/binary"
-
 	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/namespace"
 )
@@ -39,8 +38,6 @@ func loadCheckpoint(ds datastore.Datastore) (int64, error) {
 	return int64(binary.BigEndian.Uint64(checkpoint)), err
 }
 
-// TODO @renaynay: document
-
 // storeCheckpoint stores the given DAS checkpoint to disk.
 func storeCheckpoint(ds datastore.Datastore, checkpoint int64) error {
 	buf := make([]byte, 8)
@@ -48,4 +45,3 @@ func storeCheckpoint(ds datastore.Datastore, checkpoint int64) error {
 
 	return ds.Put(checkpointKey, buf)
 }
-
diff --git a/das/daser.go b/das/daser.go
index 1a66f14776..6f49f3a4bd 100644
--- a/das/daser.go
+++ b/das/daser.go
@@ -66,7 +66,8 @@ func (d *DASer) Start(ctx context.Context) error {
 	}
 	log.Infow("loaded latest DASed checkpoint", "height", checkpoint)
 
-	// load current network head // TODO @renaynay: do we have to sample over netHead as well or will that be handled by sampleLatest via hsub?
+	// load current network head
+	// TODO @renaynay: do we have to sample over netHead as well or will that be handled by sampleLatest via hsub?
 	netHead, err := d.getter.Head(ctx)
 	if err != nil {
 		return err
@@ -93,7 +94,8 @@ func (d *DASer) Stop(ctx context.Context) error {
 	// wait for both sampling routines to exit
 	for i := 0; i < 2; i++ {
 		select {
-		case <-d.sampleCheckpointDn: // TODO @renaynay: check to ensure if sampleCheckpoint already done when stop called, then still works
+		// TODO @renaynay: check to ensure if sampleCheckpoint already done when stop called, then still works
+		case <-d.sampleCheckpointDn:
 			continue
 		case <-d.sampleLatestDn:
 			continue
@@ -159,6 +161,10 @@ func (d *DASer) sampleLatest(ctx context.Context, sub header.Subscription, check
 
 	defer func() {
 		// store latest DASed checkpoint to disk
+		// TODO @renaynay: what if sampleLatest DASes [100:150] and
+		// stores latest checkpoint to disk as network head (150)
+		// but sampleFromCheckpoint routine has only sampled from [1:40] so there is a gap
+		// missing from (40:100)?
 		if err := storeCheckpoint(d.ds, height); err != nil {
 			log.Errorw("storing latest DASed checkpoint to disk", "height", height, "err", err)
 		}
diff --git a/das/daser_test.go b/das/daser_test.go
index 219b06f948..fca29b875e 100644
--- a/das/daser_test.go
+++ b/das/daser_test.go
@@ -2,17 +2,52 @@ package das
 
 import (
 	"context"
+	"sync"
+	"testing"
+	"time"
+
 	"github.com/ipfs/go-datastore"
 	ds_sync "github.com/ipfs/go-datastore/sync"
 	mdutils "github.com/ipfs/go-merkledag/test"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"sync"
-	"testing"
 
 	"github.com/celestiaorg/celestia-node/service/header"
 	"github.com/celestiaorg/celestia-node/service/share"
 )
 
+// TestDASerLifecycle tests to ensure every mock block is DASed and
+// the DASer checkpoint is updated to network head.
+func TestDASerLifecycle(t *testing.T) {
+	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+	cstore := NewCheckpointStore(ds) // we aren't storing the checkpoint so that DASer starts DASing from height 1.
+ + mockGet, shareServ, sub := createDASerSubcomponents(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + t.Cleanup(cancel) + + daser := NewDASer(shareServ, sub, mockGet, cstore) + + err := daser.Start(ctx) + require.NoError(t, err) + + select { + // wait for sampleLatest routine to finish so that it stores the + // latest DASed checkpoint so that it stores the latest DASed checkpoint + case <-daser.sampleLatestDn: + case <-ctx.Done(): + } + + // load checkpoint and ensure it's at network head + checkpoint, err := loadCheckpoint(cstore) + require.NoError(t, err) + assert.Equal(t, int64(30), checkpoint) + + err = daser.Stop(ctx) + require.NoError(t, err) +} + func TestDASer_sampleLatest(t *testing.T) { shareServ, dah := share.RandLightServiceWithSquare(t, 16) @@ -43,7 +78,7 @@ func TestDASer_sampleCheckpoint(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) cstore := NewCheckpointStore(ds) - mockGet, shareServ := fillMockGetterAndShareServ(t) + mockGet, shareServ, _ := createDASerSubcomponents(t) // store checkpoint err := storeCheckpoint(cstore, 2) // pick random header as last checkpoint @@ -65,7 +100,7 @@ func TestDASer_sampleCheckpoint(t *testing.T) { wg.Wait() } -func fillMockGetterAndShareServ(t *testing.T) (*mockGetter, share.Service) { +func createDASerSubcomponents(t *testing.T) (*mockGetter, share.Service, header.Subscriber) { dag := mdutils.Mock() shareServ := share.NewService(dag, share.NewLightAvailability(dag)) @@ -73,8 +108,26 @@ func fillMockGetterAndShareServ(t *testing.T) (*mockGetter, share.Service) { headers: make(map[int64]*header.ExtendedHeader), } - // generate 15 headers - for i := 0; i < 14; i++ { + // generate 15 headers from the past for HeaderGetter + for i := 0; i < 15; i++ { + dah := share.RandFillDAG(t, 16, dag) + + randHeader := header.RandExtendedHeader(t) + randHeader.DataHash = dah.Hash() + randHeader.DAH = dah + randHeader.Height = int64(i + 1) + + mockGet.headers[int64(i+1)] = randHeader + } + mockGet.head = 15 // network head + + sub := &header.DummySubscriber{ + Headers: make([]*header.ExtendedHeader, 15), + } + + // generate 15 headers from the future for p2pSub to pipe through to DASer + index := 0 + for i := 15; i < 30; i++ { dah := share.RandFillDAG(t, 16, dag) randHeader := header.RandExtendedHeader(t) @@ -82,11 +135,11 @@ func fillMockGetterAndShareServ(t *testing.T) (*mockGetter, share.Service) { randHeader.DAH = dah randHeader.Height = int64(i + 1) - mockGet.headers[int64(i + 1)] = randHeader + sub.Headers[index] = randHeader + index++ } - mockGet.head = 15 - return mockGet, shareServ + return mockGet, shareServ, sub } type mockGetter struct { diff --git a/das/interface.go b/das/interface.go index c182cabc0b..6bf8b07d89 100644 --- a/das/interface.go +++ b/das/interface.go @@ -2,6 +2,7 @@ package das import ( "context" + "github.com/celestiaorg/celestia-node/service/header" ) diff --git a/service/header/testing.go b/service/header/testing.go index a68664826d..70157d60d1 100644 --- a/service/header/testing.go +++ b/service/header/testing.go @@ -228,7 +228,13 @@ func (mhs *DummySubscriber) Subscribe() (Subscription, error) { func (mhs *DummySubscriber) NextHeader(ctx context.Context) (*ExtendedHeader, error) { defer func() { - mhs.Headers = make([]*ExtendedHeader, 0) + if len(mhs.Headers) > 1 { + // pop the already-returned header + cp := mhs.Headers + mhs.Headers = cp[1:] + } else { + mhs.Headers = make([]*ExtendedHeader, 0) + } }() if len(mhs.Headers) == 0 { return nil, context.Canceled