Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor APIProvider to NodeProvider #400

Merged
merged 8 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/tendermint/commands/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"strings"
"time"

coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/spf13/cobra"

"github.com/lazyledger/lazyledger-core/crypto/merkle"
Expand Down Expand Up @@ -185,13 +184,14 @@ func runProxy(cmd *cobra.Command, args []string) error {
cfg := ipfs.DefaultConfig()
cfg.RootDir = dir
// TODO(ismail): share badger instance
apiProvider := ipfs.Embedded(true, cfg, logger)
var dag coreiface.APIDagService
dag, ipfsCloser, err = apiProvider()
apiProvider, err := ipfs.Embedded(true, cfg, logger)
if err != nil {
return err
}
if err != nil {
return fmt.Errorf("could not start ipfs API: %w", err)
}
options = append(options, light.DataAvailabilitySampling(numSamples, dag))
options = append(options, light.DataAvailabilitySampling(numSamples, apiProvider.DAG()))
case sequential:
options = append(options, light.SequentialVerification())
default:
Expand Down
7 changes: 6 additions & 1 deletion cmd/tendermint/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,14 @@ func NewRunNodeCmd(nodeProvider nm.Provider) *cobra.Command {
return err
}

apiProvider, err := ipfs.Embedded(initIPFS, config.IPFS, logger)
if err != nil {
return err
}

n, err := nodeProvider(
config,
ipfs.Embedded(initIPFS, config.IPFS, logger),
apiProvider,
logger,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs-api v0.2.0
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-config v0.11.0
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.2
Expand All @@ -30,6 +31,7 @@ require (
github.com/lazyledger/nmt v0.5.0
github.com/lazyledger/rsmt2d v0.2.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p-core v0.7.0
github.com/minio/highwayhash v1.0.1
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multihash v0.0.14
Expand Down
111 changes: 50 additions & 61 deletions ipfs/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"

ipfscfg "github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/core/corehttp"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/repo/fsrepo"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"

Expand All @@ -25,70 +22,62 @@ import (

// Embedded is the provider that embeds IPFS node within the same process.
// It also returns closable for graceful node shutdown.
func Embedded(init bool, cfg *Config, logger log.Logger) APIProvider {
return func() (coreiface.APIDagService, io.Closer, error) {
path := cfg.Path()
defer os.Setenv(ipfscfg.EnvDir, path)
func Embedded(init bool, cfg *Config, logger log.Logger) (APIProvider, error) {
path := cfg.Path()
defer os.Setenv(ipfscfg.EnvDir, path)
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved

// NOTE: no need to validate the path before
if err := plugins(path); err != nil {
return nil, nil, err
}
// Init Repo if requested
if init {
if err := InitRepo(path, logger); err != nil {
return nil, nil, err
}
}
// Open the repo
repo, err := fsrepo.Open(path)
if err != nil {
var nrerr fsrepo.NoRepoError
if errors.As(err, &nrerr) {
return nil, nil, fmt.Errorf("no IPFS repo found in %s.\nplease use flag: --ipfs-init", nrerr.Path)
}
return nil, nil, err
}
// Construct the node
nodeOptions := &core.BuildCfg{
Online: true,
// This option sets the node to be a full DHT node (both fetching and storing DHT Records)
Routing: libp2p.DHTOption,
// This option sets the node to be a client DHT node (only fetching records)
// Routing: libp2p.DHTClientOption,
Repo: repo,
}
// Internally, ipfs decorates the context with a
// context.WithCancel. Which is then used for lifecycle management.
// We do not make use of this context and rely on calling
// Close() on the node instead
ctx := context.Background()
// It is essential that we create a fresh instance of ipfs node on
// each start as internally the node gets only stopped once per instance.
// At least in ipfs 0.7.0; see:
// https://github.com/lazyledger/go-ipfs/blob/dd295e45608560d2ada7d7c8a30f1eef3f4019bb/core/builder.go#L48-L57
node, err := core.NewNode(ctx, nodeOptions)
if err != nil {
_ = repo.Close()
return nil, nil, err
// NOTE: no need to validate the path before
if err := plugins(path); err != nil {
return nil, err
}
// Init Repo if requested
if init {
if err := InitRepo(path, logger); err != nil {
return nil, err
}
// Serve API if requested
if cfg.ServeAPI {
if err := serveAPI(path, repo, node); err != nil {
_ = node.Close()
return nil, nil, err
}
}
// Open the repo
repo, err := fsrepo.Open(path)
if err != nil {
var nrerr fsrepo.NoRepoError
if errors.As(err, &nrerr) {
return nil, fmt.Errorf("no IPFS repo found in %s.\nplease use flag: --ipfs-init", nrerr.Path)
}
// Wrap Node and create CoreAPI
api, err := coreapi.NewCoreAPI(node)
if err != nil {
return nil, err
}
// Construct the node
nodeOptions := &core.BuildCfg{
Online: true,
// This option sets the node to be a full DHT node (both fetching and storing DHT Records)
Routing: libp2p.DHTOption,
// This option sets the node to be a client DHT node (only fetching records)
// Routing: libp2p.DHTClientOption,
Repo: repo,
}
// Internally, ipfs decorates the context with a
// context.WithCancel. Which is then used for lifecycle management.
// We do not make use of this context and rely on calling
// Close() on the node instead
ctx := context.Background()
// It is essential that we create a fresh instance of ipfs node on
// each start as internally the node gets only stopped once per instance.
// At least in ipfs 0.7.0; see:
// https://github.com/lazyledger/go-ipfs/blob/dd295e45608560d2ada7d7c8a30f1eef3f4019bb/core/builder.go#L48-L57
node, err := core.NewNode(ctx, nodeOptions)
if err != nil {
_ = repo.Close()
return nil, err
}
// Serve API if requested
if cfg.ServeAPI {
if err := serveAPI(path, repo, node); err != nil {
_ = node.Close()
return nil, nil, fmt.Errorf("failed to create an instance of the IPFS core API: %w", err)
return nil, err
}

logger.Info("Successfully created embedded IPFS node", "ipfs-repo", path)
return api.Dag(), node, nil
}

logger.Info("Successfully created embedded IPFS node", "ipfs-repo", path)
return FullNodeProvider{node: node}, nil
}

// serveAPI creates and HTTP server for IPFS API.
Expand Down
24 changes: 12 additions & 12 deletions ipfs/mock.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
package ipfs

import (
"io"

blockstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format"
mdutils "github.com/ipfs/go-merkledag/test"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/libp2p/go-libp2p-core/routing"
)

// Mock provides simple mock IPFS API useful for testing
func Mock() APIProvider {
return func() (coreiface.APIDagService, io.Closer, error) {
dom := dagOnlyMock{mdutils.Mock()}

return dom, dom, nil
return mockProvider{
dag: mdutils.Mock(),
}
}

type dagOnlyMock struct {
ipld.DAGService
var _ APIProvider = mockProvider{}

type mockProvider struct {
dag ipld.DAGService
}

func (dom dagOnlyMock) Dag() coreiface.APIDagService { return dom }
func (dagOnlyMock) Close() error { return nil }
func (dom dagOnlyMock) Pinning() ipld.NodeAdder { return dom }
func (m mockProvider) DAG() ipld.DAGService { return m.dag }
func (m mockProvider) Routing() routing.ContentRouting { return nil }
func (m mockProvider) Blockstore() blockstore.Blockstore { return nil }
func (m mockProvider) Close() error { return nil }
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
36 changes: 34 additions & 2 deletions ipfs/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,40 @@ package ipfs
import (
"io"

coreiface "github.com/ipfs/interface-go-ipfs-core"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs/core"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-core/routing"
)

// APIProvider allows customizable IPFS core APIs.
type APIProvider func() (coreiface.APIDagService, io.Closer, error)
type APIProvider interface {
DAG() ipld.DAGService
Blockstore() blockstore.Blockstore
Routing() routing.ContentRouting

io.Closer
}
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved

var _ APIProvider = FullNodeProvider{}

// FullNodeProvider wraps a ipfs core node to fullfill the APIProvider interface
type FullNodeProvider struct {
node *core.IpfsNode
}

func (f FullNodeProvider) DAG() ipld.DAGService {
return f.node.DAG
}

func (f FullNodeProvider) Routing() routing.ContentRouting {
return f.node.Routing
}

func (f FullNodeProvider) Blockstore() blockstore.Blockstore {
return f.node.Blockstore
}

func (f FullNodeProvider) Close() error {
return f.Close()
}
5 changes: 2 additions & 3 deletions light/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/lazyledger/nmt/namespace"

"github.com/lazyledger/lazyledger-core/libs/log"
Expand Down Expand Up @@ -71,7 +70,7 @@ func SkippingVerification(trustLevel tmmath.Fraction) Option {
}
}

func DataAvailabilitySampling(numSamples uint32, ipfsAPI coreiface.APIDagService) Option {
func DataAvailabilitySampling(numSamples uint32, ipfsAPI format.DAGService) Option {
return func(c *Client) {
c.verificationMode = dataAvailabilitySampling
c.numSamples = numSamples
Expand Down Expand Up @@ -157,7 +156,7 @@ type Client struct {

logger log.Logger

dag coreiface.APIDagService
dag format.DAGService
sessionDAG format.NodeGetter
}

Expand Down
14 changes: 4 additions & 10 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

ipld "github.com/ipfs/go-ipld-format"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
Expand Down Expand Up @@ -222,7 +221,7 @@ type Node struct {
func initDBs(
config *cfg.Config,
dbProvider DBProvider,
dag iface.APIDagService,
dag ipld.DAGService,
) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
Expand Down Expand Up @@ -643,12 +642,7 @@ func NewNode(config *cfg.Config,
logger log.Logger,
options ...Option) (*Node, error) {

dag, ipfsclose, err := ipfsProvider()
if err != nil {
return nil, err
}

blockStore, stateDB, err := initDBs(config, dbProvider, dag)
blockStore, stateDB, err := initDBs(config, dbProvider, ipfsProvider.DAG())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -762,7 +756,7 @@ func NewNode(config *cfg.Config,
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || fastSync, eventBus, dag, consensusLogger,
privValidator, csMetrics, stateSync || fastSync, eventBus, ipfsProvider.DAG(), consensusLogger,
)

// Set up state sync reactor, and schedule a sync if requested.
Expand Down Expand Up @@ -863,7 +857,7 @@ func NewNode(config *cfg.Config,
txIndexer: txIndexer,
indexerService: indexerService,
eventBus: eventBus,
ipfsClose: ipfsclose,
ipfsClose: ipfsProvider,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)

Expand Down
8 changes: 7 additions & 1 deletion test/e2e/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,19 @@ func startNode(cfg *Config) error {
if err != nil {
return err
}

ipfsProvider, err := ipfs.Embedded(true, ipfs.DefaultConfig(), nodeLogger)
if err != nil {
return err
}

n, err := node.NewNode(tmcfg,
pval,
*nodeKey,
proxy.NewLocalClientCreator(app),
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
ipfs.Embedded(true, ipfs.DefaultConfig(), nodeLogger),
ipfsProvider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),
nodeLogger,
)
Expand Down