Skip to content

Commit

Permalink
test: add lifecycle test for daser
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay committed Feb 23, 2022
1 parent 314bae0 commit a034dbe
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 16 deletions.
4 changes: 0 additions & 4 deletions das/checkpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package das

import (
"encoding/binary"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
)
Expand Down Expand Up @@ -39,13 +38,10 @@ func loadCheckpoint(ds datastore.Datastore) (int64, error) {
return int64(binary.BigEndian.Uint64(checkpoint)), err
}

// TODO @renaynay: document

// storeCheckpoint stores the given DAS checkpoint to disk.
func storeCheckpoint(ds datastore.Datastore, checkpoint int64) error {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(checkpoint))

return ds.Put(checkpointKey, buf)
}

10 changes: 8 additions & 2 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func (d *DASer) Start(ctx context.Context) error {
}
log.Infow("loaded latest DASed checkpoint", "height", checkpoint)

// load current network head // TODO @renaynay: do we have to sample over netHead as well or will that be handled by sampleLatest via hsub?
// load current network head
// TODO @renaynay: do we have to sample over netHead as well or will that be handled by sampleLatest via hsub?
netHead, err := d.getter.Head(ctx)
if err != nil {
return err
Expand All @@ -93,7 +94,8 @@ func (d *DASer) Stop(ctx context.Context) error {
// wait for both sampling routines to exit
for i := 0; i < 2; i++ {
select {
case <-d.sampleCheckpointDn: // TODO @renaynay: check to ensure if sampleCheckpoint already done when stop called, then still works
// TODO @renaynay: check to ensure if sampleCheckpoint already done when stop called, then still works
case <-d.sampleCheckpointDn:
continue
case <-d.sampleLatestDn:
continue
Expand Down Expand Up @@ -159,6 +161,10 @@ func (d *DASer) sampleLatest(ctx context.Context, sub header.Subscription, check

defer func() {
// store latest DASed checkpoint to disk
// TODO @renaynay: what sampleLatest DASes [100:150] and
// stores latest checkpoint to disk as network head (150)
// but sampleFromCheckpoint routine has only sampled from [1:40] so there is a gap
// missing from (40: 100)?
if err := storeCheckpoint(d.ds, height); err != nil {
log.Errorw("storing latest DASed checkpoint to disk", "height", height, "err", err)
}
Expand Down
71 changes: 62 additions & 9 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,52 @@ package das

import (
"context"
"sync"
"testing"
"time"

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
mdutils "github.com/ipfs/go-merkledag/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sync"
"testing"

"github.com/celestiaorg/celestia-node/service/header"
"github.com/celestiaorg/celestia-node/service/share"
)

// TestDASerLifecycle tests to ensure every mock block is DASed and
// the DASer checkpoint is updated to network head.
func TestDASerLifecycle(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
cstore := NewCheckpointStore(ds) // we aren't storing the checkpoint so that DASer starts DASing from height 1.

mockGet, shareServ, sub := createDASerSubcomponents(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
t.Cleanup(cancel)

daser := NewDASer(shareServ, sub, mockGet, cstore)

err := daser.Start(ctx)
require.NoError(t, err)

select {
// wait for sampleLatest routine to finish so that it stores the
// latest DASed checkpoint so that it stores the latest DASed checkpoint
case <-daser.sampleLatestDn:
case <-ctx.Done():
}

// load checkpoint and ensure it's at network head
checkpoint, err := loadCheckpoint(cstore)
require.NoError(t, err)
assert.Equal(t, int64(30), checkpoint)

err = daser.Stop(ctx)
require.NoError(t, err)
}

func TestDASer_sampleLatest(t *testing.T) {
shareServ, dah := share.RandLightServiceWithSquare(t, 16)

Expand Down Expand Up @@ -43,7 +78,7 @@ func TestDASer_sampleCheckpoint(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
cstore := NewCheckpointStore(ds)

mockGet, shareServ := fillMockGetterAndShareServ(t)
mockGet, shareServ, _ := createDASerSubcomponents(t)

// store checkpoint
err := storeCheckpoint(cstore, 2) // pick random header as last checkpoint
Expand All @@ -65,28 +100,46 @@ func TestDASer_sampleCheckpoint(t *testing.T) {
wg.Wait()
}

func fillMockGetterAndShareServ(t *testing.T) (*mockGetter, share.Service) {
func createDASerSubcomponents(t *testing.T) (*mockGetter, share.Service, header.Subscriber) {
dag := mdutils.Mock()
shareServ := share.NewService(dag, share.NewLightAvailability(dag))

mockGet := &mockGetter{
headers: make(map[int64]*header.ExtendedHeader),
}

// generate 15 headers
for i := 0; i < 14; i++ {
// generate 15 headers from the past for HeaderGetter
for i := 0; i < 15; i++ {
dah := share.RandFillDAG(t, 16, dag)

randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
randHeader.DAH = dah
randHeader.Height = int64(i + 1)

mockGet.headers[int64(i+1)] = randHeader
}
mockGet.head = 15 // network head

sub := &header.DummySubscriber{
Headers: make([]*header.ExtendedHeader, 15),
}

// generate 15 headers from the future for p2pSub to pipe through to DASer
index := 0
for i := 15; i < 30; i++ {
dah := share.RandFillDAG(t, 16, dag)

randHeader := header.RandExtendedHeader(t)
randHeader.DataHash = dah.Hash()
randHeader.DAH = dah
randHeader.Height = int64(i + 1)

mockGet.headers[int64(i + 1)] = randHeader
sub.Headers[index] = randHeader
index++
}
mockGet.head = 15

return mockGet, shareServ
return mockGet, shareServ, sub
}

type mockGetter struct {
Expand Down
1 change: 1 addition & 0 deletions das/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package das

import (
"context"

"github.com/celestiaorg/celestia-node/service/header"
)

Expand Down
8 changes: 7 additions & 1 deletion service/header/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ func (mhs *DummySubscriber) Subscribe() (Subscription, error) {

func (mhs *DummySubscriber) NextHeader(ctx context.Context) (*ExtendedHeader, error) {
defer func() {
mhs.Headers = make([]*ExtendedHeader, 0)
if len(mhs.Headers) > 1 {
// pop the already-returned header
cp := mhs.Headers
mhs.Headers = cp[1:]
} else {
mhs.Headers = make([]*ExtendedHeader, 0)
}
}()
if len(mhs.Headers) == 0 {
return nil, context.Canceled
Expand Down

0 comments on commit a034dbe

Please sign in to comment.