Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPFS peering on boot #86

Merged
merged 9 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (a *API) Close() error {
return a.srv.Close()
}

func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) error {
func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) {
defer log.Info().Msg("API module stopped")
r := mux.NewRouter()

Expand Down Expand Up @@ -84,17 +84,16 @@ func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proo
Handler: handler,
Addr: fmt.Sprintf("0.0.0.0:%d", a.port),
// Good practice: enforce timeouts for servers you create!
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
WriteTimeout: 0,
ReadTimeout: 30 * time.Second,
}

log.Logger.Info().Msg(fmt.Sprintf("Sequoia API now listening on %s", a.srv.Addr))
err := a.srv.ListenAndServe()
if err != nil {
if !errors.Is(err, http.ErrServerClosed) {
return err
log.Warn().Err(err)
return
}
}

return nil
}
74 changes: 74 additions & 0 deletions core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@ package core

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"strconv"
"strings"
"syscall"
"time"

apiTypes "github.com/JackalLabs/sequoia/api/types"
"github.com/desmos-labs/cosmos-go-wallet/client"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/JackalLabs/sequoia/file_system"
"github.com/JackalLabs/sequoia/ipfs"
"github.com/ipfs/boxo/blockstore"
Expand Down Expand Up @@ -268,6 +277,7 @@ func (a *App) Start() error {

// Starting the 4 concurrent services
// nolint:all
go a.ConnectPeers(w.Client)
go a.api.Serve(recycleDepot, a.fileSystem, a.prover, w, params.ChunkSize)
go a.prover.Start()
go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize)
Expand Down Expand Up @@ -295,6 +305,70 @@ func (a *App) Start() error {
return nil
}

func (a *App) ConnectPeers(cl *client.Client) {
log.Info().Msg("Starting IPFS Peering cycle...")
ctx := context.Background()
queryClient := storageTypes.NewQueryClient(cl.GRPCConn)

activeProviders, err := queryClient.ActiveProviders(ctx, &storageTypes.QueryActiveProviders{})
if err != nil {
log.Warn().Msg("Cannot get active provider list. Won't try IPFS Peers.")
return
}

for _, provider := range activeProviders.Providers {
providerDetails, err := queryClient.Provider(ctx, &storageTypes.QueryProvider{
Address: provider.Address,
})
if err != nil {
log.Warn().Msgf("Couldn't get provider details from %s, something is really wrong with the network!", provider)
continue
}
ip := providerDetails.Provider.Ip

log.Info().Msgf("Attempting to peer with %s", ip)

uip, err := url.Parse(ip)
if err != nil {
log.Warn().Msgf("Could not get parse %s", ip)
continue
}
uip.Path = path.Join(uip.Path, "ipfs", "hosts")

ipfsHostAddress := uip.String()

res, err := http.Get(ipfsHostAddress)
if err != nil {
log.Warn().Msgf("Could not get hosts from %s", ipfsHostAddress)
continue
}
defer res.Body.Close()

var hosts apiTypes.HostResponse

err = json.NewDecoder(res.Body).Decode(&hosts)
if err != nil {
log.Warn().Msgf("Could not parse hosts %s", ip)
continue
}

for _, h := range hosts.Hosts {
host := h
go func() {
if strings.Contains(host, "127.0.0.1") || strings.Contains(host, "ip6/") {
return
}
adr, err := peer.AddrInfoFromString(host)
if err != nil {
log.Warn().Msgf("Could not parse host %s from %s", adr, ip)
return
}
a.fileSystem.Connect(adr)
}()
}
}
}

func (a *App) Salvage(jprovdHome string) error {
cfg, err := config.Init(a.home)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions file_system/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

ipfs2 "github.com/JackalLabs/sequoia/ipfs"
"github.com/dgraph-io/badger/v4"
Expand All @@ -20,11 +21,11 @@ type FileSystem struct {
}

func NewFileSystem(ctx context.Context, db *badger.DB, ds datastore.Batching, bs blockstore.Blockstore, ipfsPort int, ipfsDomain string) (*FileSystem, error) {
ipfs, host, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain)
ipfs, hh, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain)
if err != nil {
return nil, err
}
return &FileSystem{db: db, ipfs: ipfs, ipfsHost: host}, nil
return &FileSystem{db: db, ipfs: ipfs, ipfsHost: hh}, nil
}

func (f *FileSystem) Close() {
Expand All @@ -37,3 +38,11 @@ func (f *FileSystem) Close() {
log.Error().Err(err).Msg("error occurred while stopping ipfs host")
}
}

func (f *FileSystem) Connect(info *peer.AddrInfo) {
log.Info().Msgf("Attempting connection to %s", info.String())
err := f.ipfsHost.Connect(context.Background(), *info)
if err != nil {
log.Warn().Msgf("Could not connect to %s | %v", info.String(), err)
}
}
7 changes: 3 additions & 4 deletions proofs/proofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ func (p *Prover) GenerateProof(merkle []byte, owner string, start int64, blockHe

proven := file.ProvenThisBlock(blockHeight+int64(t.Seconds()/5.0), newProof.LastProven)
if proven {
log.Info().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight))
log.Debug().Msg("File was already proven")
log.Debug().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight))
return nil, nil, 0, nil
}
log.Info().Msg(fmt.Sprintf("%x was not yet proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight))
Expand Down Expand Up @@ -200,7 +199,7 @@ func (p *Prover) PostProof(merkle []byte, owner string, start int64, blockHeight
log.Error().Msg(postRes.ErrorMessage)
}

log.Info().Msg(fmt.Sprintf("%x was successfully proven", merkle))
log.Debug().Msg(fmt.Sprintf("%x was successfully proven", merkle))

log.Debug().Msg(fmt.Sprintf("TX Hash: %s", m.Hash()))

Expand Down Expand Up @@ -262,7 +261,7 @@ func (p *Prover) wrapPostProof(merkle []byte, owner string, start int64, height
Msg("proof error")

if err.Error() == "rpc error: code = NotFound desc = not found" { // if the file is not found on the network, delete it
log.Info().
log.Debug().
Hex("merkle", merkle).
Str("owner", owner).
Int64("start", start).
Expand Down
Loading