Skip to content

Commit

Permalink
IPFS peering on boot (#86)
Browse files Browse the repository at this point in the history
* tweaking some timeouts

* peering on start

* more logging

* better url parsing

* we don't need this message clogging things up

* async peers

* dont need that as a log

* fuck it we try em all

* switching delete to debug
  • Loading branch information
TheMarstonConnell authored Dec 20, 2024
1 parent e202c2f commit de07ad2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 12 deletions.
11 changes: 5 additions & 6 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (a *API) Close() error {
return a.srv.Close()
}

func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) error {
func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) {
defer log.Info().Msg("API module stopped")
r := mux.NewRouter()

Expand Down Expand Up @@ -84,17 +84,16 @@ func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proo
Handler: handler,
Addr: fmt.Sprintf("0.0.0.0:%d", a.port),
// Good practice: enforce timeouts for servers you create!
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
WriteTimeout: 0,
ReadTimeout: 30 * time.Second,
}

log.Logger.Info().Msg(fmt.Sprintf("Sequoia API now listening on %s", a.srv.Addr))
err := a.srv.ListenAndServe()
if err != nil {
if !errors.Is(err, http.ErrServerClosed) {
return err
log.Warn().Err(err)
return
}
}

return nil
}
74 changes: 74 additions & 0 deletions core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@ package core

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"strconv"
"strings"
"syscall"
"time"

apiTypes "github.com/JackalLabs/sequoia/api/types"
"github.com/desmos-labs/cosmos-go-wallet/client"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/JackalLabs/sequoia/file_system"
"github.com/JackalLabs/sequoia/ipfs"
"github.com/ipfs/boxo/blockstore"
Expand Down Expand Up @@ -268,6 +277,7 @@ func (a *App) Start() error {

// Starting the 4 concurrent services
// nolint:all
go a.ConnectPeers(w.Client)
go a.api.Serve(recycleDepot, a.fileSystem, a.prover, w, params.ChunkSize)
go a.prover.Start()
go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize)
Expand Down Expand Up @@ -295,6 +305,70 @@ func (a *App) Start() error {
return nil
}

func (a *App) ConnectPeers(cl *client.Client) {
log.Info().Msg("Starting IPFS Peering cycle...")
ctx := context.Background()
queryClient := storageTypes.NewQueryClient(cl.GRPCConn)

activeProviders, err := queryClient.ActiveProviders(ctx, &storageTypes.QueryActiveProviders{})
if err != nil {
log.Warn().Msg("Cannot get active provider list. Won't try IPFS Peers.")
return
}

for _, provider := range activeProviders.Providers {
providerDetails, err := queryClient.Provider(ctx, &storageTypes.QueryProvider{
Address: provider.Address,
})
if err != nil {
log.Warn().Msgf("Couldn't get provider details from %s, something is really wrong with the network!", provider)
continue
}
ip := providerDetails.Provider.Ip

log.Info().Msgf("Attempting to peer with %s", ip)

uip, err := url.Parse(ip)
if err != nil {
log.Warn().Msgf("Could not get parse %s", ip)
continue
}
uip.Path = path.Join(uip.Path, "ipfs", "hosts")

ipfsHostAddress := uip.String()

res, err := http.Get(ipfsHostAddress)
if err != nil {
log.Warn().Msgf("Could not get hosts from %s", ipfsHostAddress)
continue
}
defer res.Body.Close()

var hosts apiTypes.HostResponse

err = json.NewDecoder(res.Body).Decode(&hosts)
if err != nil {
log.Warn().Msgf("Could not parse hosts %s", ip)
continue
}

for _, h := range hosts.Hosts {
host := h
go func() {
if strings.Contains(host, "127.0.0.1") || strings.Contains(host, "ip6/") {
return
}
adr, err := peer.AddrInfoFromString(host)
if err != nil {
log.Warn().Msgf("Could not parse host %s from %s", adr, ip)
return
}
a.fileSystem.Connect(adr)
}()
}
}
}

func (a *App) Salvage(jprovdHome string) error {
cfg, err := config.Init(a.home)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions file_system/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

ipfs2 "github.com/JackalLabs/sequoia/ipfs"
"github.com/dgraph-io/badger/v4"
Expand All @@ -20,11 +21,11 @@ type FileSystem struct {
}

func NewFileSystem(ctx context.Context, db *badger.DB, ds datastore.Batching, bs blockstore.Blockstore, ipfsPort int, ipfsDomain string) (*FileSystem, error) {
ipfs, host, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain)
ipfs, hh, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain)
if err != nil {
return nil, err
}
return &FileSystem{db: db, ipfs: ipfs, ipfsHost: host}, nil
return &FileSystem{db: db, ipfs: ipfs, ipfsHost: hh}, nil
}

func (f *FileSystem) Close() {
Expand All @@ -37,3 +38,11 @@ func (f *FileSystem) Close() {
log.Error().Err(err).Msg("error occurred while stopping ipfs host")
}
}

func (f *FileSystem) Connect(info *peer.AddrInfo) {
log.Info().Msgf("Attempting connection to %s", info.String())
err := f.ipfsHost.Connect(context.Background(), *info)
if err != nil {
log.Warn().Msgf("Could not connect to %s | %v", info.String(), err)
}
}
7 changes: 3 additions & 4 deletions proofs/proofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ func (p *Prover) GenerateProof(merkle []byte, owner string, start int64, blockHe

proven := file.ProvenThisBlock(blockHeight+int64(t.Seconds()/5.0), newProof.LastProven)
if proven {
log.Info().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight))
log.Debug().Msg("File was already proven")
log.Debug().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight))
return nil, nil, 0, nil
}
log.Info().Msg(fmt.Sprintf("%x was not yet proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight))
Expand Down Expand Up @@ -200,7 +199,7 @@ func (p *Prover) PostProof(merkle []byte, owner string, start int64, blockHeight
log.Error().Msg(postRes.ErrorMessage)
}

log.Info().Msg(fmt.Sprintf("%x was successfully proven", merkle))
log.Debug().Msg(fmt.Sprintf("%x was successfully proven", merkle))

log.Debug().Msg(fmt.Sprintf("TX Hash: %s", m.Hash()))

Expand Down Expand Up @@ -262,7 +261,7 @@ func (p *Prover) wrapPostProof(merkle []byte, owner string, start int64, height
Msg("proof error")

if err.Error() == "rpc error: code = NotFound desc = not found" { // if the file is not found on the network, delete it
log.Info().
log.Debug().
Hex("merkle", merkle).
Str("owner", owner).
Int64("start", start).
Expand Down

0 comments on commit de07ad2

Please sign in to comment.