diff --git a/blockservice.go b/blockservice.go index 007d113..8594e25 100644 --- a/blockservice.go +++ b/blockservice.go @@ -12,13 +12,14 @@ import ( "go.opentelemetry.io/otel/trace" blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-blockservice/internal" cid "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" exchange "github.com/ipfs/go-ipfs-exchange-interface" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-verifcid" + + "github.com/ipfs/go-blockservice/internal" ) var logger = logging.Logger("blockservice") @@ -84,7 +85,7 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService { } } -// NewWriteThrough ceates a BlockService that guarantees writes will go +// NewWriteThrough creates a BlockService that guarantees writes will go // through to the blockstore and are not skipped by cache checks. func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService { if rem == nil { @@ -117,21 +118,22 @@ func NewSession(ctx context.Context, bs BlockService) *Session { exch := bs.Exchange() if sessEx, ok := exch.(exchange.SessionExchange); ok { return &Session{ - sessCtx: ctx, - ses: nil, - sessEx: sessEx, - bs: bs.Blockstore(), + sessCtx: ctx, + ses: nil, + sessEx: sessEx, + bs: bs.Blockstore(), + notifier: exch, } } return &Session{ - ses: exch, - sessCtx: ctx, - bs: bs.Blockstore(), + ses: exch, + sessCtx: ctx, + bs: bs.Blockstore(), + notifier: exch, } } // AddBlock adds a particular block to the service, Putting it into the datastore. -// TODO pass a context into this if the remote.HasBlock is going to remain here. func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { ctx, span := internal.StartSpan(ctx, "blockService.AddBlock") defer span.End() @@ -155,8 +157,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { logger.Debugf("BlockService.BlockAdded %s", c) if s.exchange != nil { - if err := s.exchange.HasBlock(ctx, o); err != nil { - logger.Errorf("HasBlock: %s", err.Error()) + if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil { + logger.Errorf("NotifyNewBlocks: %s", err.Error()) } } @@ -200,11 +202,9 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { } if s.exchange != nil { - for _, o := range toput { - logger.Debugf("BlockService.BlockAdded %s", o.Cid()) - if err := s.exchange.HasBlock(ctx, o); err != nil { - logger.Errorf("HasBlock: %s", err.Error()) - } + logger.Debugf("BlockService.BlockAdded %d blocks", len(toput)) + if err := s.exchange.NotifyNewBlocks(ctx, toput...); err != nil { + logger.Errorf("NotifyNewBlocks: %s", err.Error()) } } return nil @@ -216,7 +216,7 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - var f func() exchange.Fetcher + var f func() notifiableFetcher if s.exchange != nil { f = s.getExchange } @@ -224,11 +224,11 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e return getBlock(ctx, c, s.blockstore, f) // hash security } -func (s *blockService) getExchange() exchange.Fetcher { +func (s *blockService) getExchange() notifiableFetcher { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) (blocks.Block, error) { +func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) (blocks.Block, error) { err := verifcid.ValidateCid(c) // hash security if err != nil { return nil, err @@ -249,6 +249,15 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun if err != nil { return nil, err } + // also write in the blockstore for caching, inform the exchange that the block is available + err = bs.Put(ctx, blk) + if err != nil { + return nil, err + } + err = f.NotifyNewBlocks(ctx, blk) + if err != nil { + return nil, err + } logger.Debugf("BlockService.BlockFetched %s", c) return blk, nil } @@ -264,7 +273,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() - var f func() exchange.Fetcher + var f func() notifiableFetcher if s.exchange != nil { f = s.getExchange } @@ -272,7 +281,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block return getBlocks(ctx, ks, s.blockstore, f) // hash security } -func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) <-chan blocks.Block { +func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { @@ -324,13 +333,53 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget return } - for b := range rblocks { - logger.Debugf("BlockService.BlockFetched %s", b.Cid()) - select { - case out <- b: - case <-ctx.Done(): + // batch available blocks together + const batchSize = 32 + batch := make([]blocks.Block, 0, batchSize) + for { + var noMoreBlocks bool + batchLoop: + for len(batch) < batchSize { + select { + case b, ok := <-rblocks: + if !ok { + noMoreBlocks = true + break batchLoop + } + + logger.Debugf("BlockService.BlockFetched %s", b.Cid()) + batch = append(batch, b) + case <-ctx.Done(): + return + default: + break batchLoop + } + } + + // also write in the blockstore for caching, inform the exchange that the blocks are available + err = bs.PutMany(ctx, batch) + if err != nil { + logger.Errorf("could not write blocks from the network to the blockstore: %s", err) return } + + err = f.NotifyNewBlocks(ctx, batch...) + if err != nil { + logger.Errorf("could not tell the exchange about new blocks: %s", err) + return + } + + for _, b := range batch { + select { + case out <- b: + case <-ctx.Done(): + return + } + } + batch = batch[:0] + if noMoreBlocks { + break + } } }() return out @@ -353,23 +402,53 @@ func (s *blockService) Close() error { return s.exchange.Close() } +type notifier interface { + NotifyNewBlocks(context.Context, ...blocks.Block) error +} + // Session is a helper type to provide higher level access to bitswap sessions type Session struct { - bs blockstore.Blockstore - ses exchange.Fetcher - sessEx exchange.SessionExchange - sessCtx context.Context - lk sync.Mutex + bs blockstore.Blockstore + ses exchange.Fetcher + sessEx exchange.SessionExchange + sessCtx context.Context + notifier notifier + lk sync.Mutex } -func (s *Session) getSession() exchange.Fetcher { +type notifiableFetcher interface { + exchange.Fetcher + notifier +} + +type notifiableFetcherWrapper struct { + exchange.Fetcher + notifier +} + +func (s *Session) getSession() notifiableFetcher { s.lk.Lock() defer s.lk.Unlock() if s.ses == nil { s.ses = s.sessEx.NewSession(s.sessCtx) } - return s.ses + return notifiableFetcherWrapper{s.ses, s.notifier} +} + +func (s *Session) getExchange() notifiableFetcher { + return notifiableFetcherWrapper{s.ses, s.notifier} +} + +func (s *Session) getFetcherFactory() func() notifiableFetcher { + if s.sessEx != nil { + return s.getSession + } + if s.ses != nil { + // Our exchange isn't session compatible, let's fallback to non sessions fetches + return s.getExchange + } + return nil } // GetBlock gets a block in the context of a request session @@ -377,11 +456,7 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - var f func() exchange.Fetcher - if s.sessEx != nil { - f = s.getSession - } - return getBlock(ctx, c, s.bs, f) // hash security + return getBlock(ctx, c, s.bs, s.getFetcherFactory()) // hash security } // GetBlocks gets blocks in the context of a request session @@ -389,11 +464,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") defer span.End() - var f func() exchange.Fetcher - if s.sessEx != nil { - f = s.getSession - } - return getBlocks(ctx, ks, s.bs, f) // hash security + return getBlocks(ctx, ks, s.bs, s.getFetcherFactory()) // hash security } var _ BlockGetter = (*Session)(nil) diff --git a/blockservice_test.go b/blockservice_test.go index c29f0cf..846ae71 100644 --- a/blockservice_test.go +++ b/blockservice_test.go @@ -5,6 +5,7 @@ import ( "testing" blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -19,8 +20,8 @@ func TestWriteThroughWorks(t *testing.T) { blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 0, } - bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - exch := offline.Exchange(bstore2) + exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + exch := offline.Exchange(exchbstore) bserv := NewWriteThrough(bstore, exch) bgen := butil.NewBlockGenerator() @@ -44,6 +45,77 @@ func TestWriteThroughWorks(t *testing.T) { } } +func TestExchangeWrite(t *testing.T) { + bstore := &PutCountingBlockstore{ + blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), + 0, + } + exchbstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + exch := ¬ifyCountingExchange{ + offline.Exchange(exchbstore), + 0, + } + bserv := NewWriteThrough(bstore, exch) + bgen := butil.NewBlockGenerator() + + for name, fetcher := range map[string]BlockGetter{ + "blockservice": bserv, + "session": NewSession(context.Background(), bserv), + } { + t.Run(name, func(t *testing.T) { + // GetBlock + block := bgen.Next() + err := exchbstore.Put(context.Background(), block) + if err != nil { + t.Fatal(err) + } + got, err := fetcher.GetBlock(context.Background(), block.Cid()) + if err != nil { + t.Fatal(err) + } + if got.Cid() != block.Cid() { + t.Fatalf("GetBlock returned unexpected block") + } + if bstore.PutCounter != 1 { + t.Fatalf("expected one Put call, have: %d", bstore.PutCounter) + } + if exch.notifyCount != 1 { + t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount) + } + + // GetBlocks + b1 := bgen.Next() + err = exchbstore.Put(context.Background(), b1) + if err != nil { + t.Fatal(err) + } + b2 := bgen.Next() + err = exchbstore.Put(context.Background(), b2) + if err != nil { + t.Fatal(err) + } + bchan := fetcher.GetBlocks(context.Background(), []cid.Cid{b1.Cid(), b2.Cid()}) + var gotBlocks []blocks.Block + for b := range bchan { + gotBlocks = append(gotBlocks, b) + } + if len(gotBlocks) != 2 { + t.Fatalf("expected to retrieve 2 blocks, got %d", len(gotBlocks)) + } + if bstore.PutCounter != 3 { + t.Fatalf("expected 3 Put call, have: %d", bstore.PutCounter) + } + if exch.notifyCount != 3 { + t.Fatalf("expected one NotifyNewBlocks call, have: %d", exch.notifyCount) + } + + // reset counts + bstore.PutCounter = 0 + exch.notifyCount = 0 + }) + } +} + func TestLazySessionInitialization(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) @@ -53,8 +125,8 @@ func TestLazySessionInitialization(t *testing.T) { bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) bstore3 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) session := offline.Exchange(bstore2) - exchange := offline.Exchange(bstore3) - sessionExch := &fakeSessionExchange{Interface: exchange, session: session} + exch := offline.Exchange(bstore3) + sessionExch := &fakeSessionExchange{Interface: exch, session: session} bservSessEx := NewWriteThrough(bstore, sessionExch) bgen := butil.NewBlockGenerator() @@ -64,7 +136,11 @@ func TestLazySessionInitialization(t *testing.T) { t.Fatal(err) } block2 := bgen.Next() - err = session.HasBlock(ctx, block2) + err = bstore2.Put(ctx, block2) + if err != nil { + t.Fatal(err) + } + err = session.NotifyNewBlocks(ctx, block2) if err != nil { t.Fatal(err) } @@ -107,6 +183,23 @@ func (bs *PutCountingBlockstore) Put(ctx context.Context, block blocks.Block) er return bs.Blockstore.Put(ctx, block) } +func (bs *PutCountingBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { + bs.PutCounter += len(blocks) + return bs.Blockstore.PutMany(ctx, blocks) +} + +var _ exchange.Interface = (*notifyCountingExchange)(nil) + +type notifyCountingExchange struct { + exchange.Interface + notifyCount int +} + +func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + n.notifyCount += len(blocks) + return n.Interface.NotifyNewBlocks(ctx, blocks...) +} + var _ exchange.SessionExchange = (*fakeSessionExchange)(nil) type fakeSessionExchange struct { diff --git a/go.mod b/go.mod index 62ccaf3..a11a450 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,15 @@ module github.com/ipfs/go-blockservice go 1.17 require ( - github.com/ipfs/go-bitswap v0.6.0 + github.com/ipfs/go-bitswap v0.8.0 github.com/ipfs/go-block-format v0.0.3 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.5.0 github.com/ipfs/go-ipfs-blockstore v1.2.0 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-delay v0.0.1 - github.com/ipfs/go-ipfs-exchange-interface v0.1.0 - github.com/ipfs/go-ipfs-exchange-offline v0.2.0 + github.com/ipfs/go-ipfs-exchange-interface v0.2.0 + github.com/ipfs/go-ipfs-exchange-offline v0.3.0 github.com/ipfs/go-ipfs-routing v0.2.1 github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-ipld-format v0.3.0 diff --git a/go.sum b/go.sum index b800a5e..44d57ba 100644 --- a/go.sum +++ b/go.sum @@ -249,8 +249,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/go-bitswap v0.6.0 h1:f2rc6GZtoSFhEIzQmddgGiel9xntj02Dg0ZNf2hSC+w= -github.com/ipfs/go-bitswap v0.6.0/go.mod h1:Hj3ZXdOC5wBJvENtdqsixmzzRukqd8EHLxZLZc3mzRA= +github.com/ipfs/go-bitswap v0.8.0 h1:UEV7kogQu2iGggkE9GhLykDrRCUpsNnpu2NODww/srw= +github.com/ipfs/go-bitswap v0.8.0/go.mod h1:/h8sBij8UVEaNWl8ABzpLRA5Y1cttdNUnpeGo2AA/LQ= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= github.com/ipfs/go-block-format v0.0.3 h1:r8t66QstRp/pd/or4dpnbVfXT5Gt7lOqRvC+/dDTpMc= github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= @@ -289,10 +289,10 @@ github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs= github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q= github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU= -github.com/ipfs/go-ipfs-exchange-interface v0.1.0 h1:TiMekCrOGQuWYtZO3mf4YJXDIdNgnKWZ9IE3fGlnWfo= -github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI= -github.com/ipfs/go-ipfs-exchange-offline v0.2.0 h1:2PF4o4A7W656rC0RxuhUace997FTcDTcIQ6NoEtyjAI= -github.com/ipfs/go-ipfs-exchange-offline v0.2.0/go.mod h1:HjwBeW0dvZvfOMwDP0TSKXIHf2s+ksdP4E3MLDRtLKY= +github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y= +github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y= +github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA= +github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-routing v0.2.1 h1:E+whHWhJkdN9YeoHZNj5itzc+OR292AJ2uE9FFiW0BY= diff --git a/version.json b/version.json index a654d65..372b6ea 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.3.0" + "version": "v0.4.0" }