From de07ad2dd2b551a9c774e00bf9914a2d1591efe8 Mon Sep 17 00:00:00 2001 From: Marston Connell <34043723+TheMarstonConnell@users.noreply.github.com> Date: Thu, 19 Dec 2024 23:00:38 -0500 Subject: [PATCH] IPFS peering on boot (#86) * tweaking some timeouts * peering on start * more logging * better url parsing * we don't need this message clogging things up * async peers * dont need that as a log * fuck it we try em all * switching delete to debug --- api/server.go | 11 +++---- core/app.go | 74 ++++++++++++++++++++++++++++++++++++++++++++ file_system/types.go | 13 ++++++-- proofs/proofs.go | 7 ++--- 4 files changed, 93 insertions(+), 12 deletions(-) diff --git a/api/server.go b/api/server.go index aaf4c53..d8fc44d 100644 --- a/api/server.go +++ b/api/server.go @@ -44,7 +44,7 @@ func (a *API) Close() error { return a.srv.Close() } -func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) error { +func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) { defer log.Info().Msg("API module stopped") r := mux.NewRouter() @@ -84,17 +84,16 @@ func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proo Handler: handler, Addr: fmt.Sprintf("0.0.0.0:%d", a.port), // Good practice: enforce timeouts for servers you create! - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, + WriteTimeout: 0, + ReadTimeout: 30 * time.Second, } log.Logger.Info().Msg(fmt.Sprintf("Sequoia API now listening on %s", a.srv.Addr)) err := a.srv.ListenAndServe() if err != nil { if !errors.Is(err, http.ErrServerClosed) { - return err + log.Warn().Err(err) + return } } - - return nil } diff --git a/core/app.go b/core/app.go index 4ad728d..466af8f 100644 --- a/core/app.go +++ b/core/app.go @@ -2,13 +2,22 @@ package core import ( "context" + "encoding/json" "fmt" + "net/http" + "net/url" "os" "os/signal" + "path" "strconv" + "strings" "syscall" "time" + apiTypes "github.com/JackalLabs/sequoia/api/types" + "github.com/desmos-labs/cosmos-go-wallet/client" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/JackalLabs/sequoia/file_system" "github.com/JackalLabs/sequoia/ipfs" "github.com/ipfs/boxo/blockstore" @@ -268,6 +277,7 @@ func (a *App) Start() error { // Starting the 4 concurrent services // nolint:all + go a.ConnectPeers(w.Client) go a.api.Serve(recycleDepot, a.fileSystem, a.prover, w, params.ChunkSize) go a.prover.Start() go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize) @@ -295,6 +305,70 @@ func (a *App) Start() error { return nil } +func (a *App) ConnectPeers(cl *client.Client) { + log.Info().Msg("Starting IPFS Peering cycle...") + ctx := context.Background() + queryClient := storageTypes.NewQueryClient(cl.GRPCConn) + + activeProviders, err := queryClient.ActiveProviders(ctx, &storageTypes.QueryActiveProviders{}) + if err != nil { + log.Warn().Msg("Cannot get active provider list. Won't try IPFS Peers.") + return + } + + for _, provider := range activeProviders.Providers { + providerDetails, err := queryClient.Provider(ctx, &storageTypes.QueryProvider{ + Address: provider.Address, + }) + if err != nil { + log.Warn().Msgf("Couldn't get provider details from %s, something is really wrong with the network!", provider) + continue + } + ip := providerDetails.Provider.Ip + + log.Info().Msgf("Attempting to peer with %s", ip) + + uip, err := url.Parse(ip) + if err != nil { + log.Warn().Msgf("Could not get parse %s", ip) + continue + } + uip.Path = path.Join(uip.Path, "ipfs", "hosts") + + ipfsHostAddress := uip.String() + + res, err := http.Get(ipfsHostAddress) + if err != nil { + log.Warn().Msgf("Could not get hosts from %s", ipfsHostAddress) + continue + } + defer res.Body.Close() + + var hosts apiTypes.HostResponse + + err = json.NewDecoder(res.Body).Decode(&hosts) + if err != nil { + log.Warn().Msgf("Could not parse hosts %s", ip) + continue + } + + for _, h := range hosts.Hosts { + host := h + go func() { + if strings.Contains(host, "127.0.0.1") || strings.Contains(host, "ip6/") { + return + } + adr, err := peer.AddrInfoFromString(host) + if err != nil { + log.Warn().Msgf("Could not parse host %s from %s", adr, ip) + return + } + a.fileSystem.Connect(adr) + }() + } + } +} + func (a *App) Salvage(jprovdHome string) error { cfg, err := config.Init(a.home) if err != nil { diff --git a/file_system/types.go b/file_system/types.go index f4ee2fd..e4fcf29 100644 --- a/file_system/types.go +++ b/file_system/types.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/boxo/blockstore" "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" ipfs2 "github.com/JackalLabs/sequoia/ipfs" "github.com/dgraph-io/badger/v4" @@ -20,11 +21,11 @@ type FileSystem struct { } func NewFileSystem(ctx context.Context, db *badger.DB, ds datastore.Batching, bs blockstore.Blockstore, ipfsPort int, ipfsDomain string) (*FileSystem, error) { - ipfs, host, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain) + ipfs, hh, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain) if err != nil { return nil, err } - return &FileSystem{db: db, ipfs: ipfs, ipfsHost: host}, nil + return &FileSystem{db: db, ipfs: ipfs, ipfsHost: hh}, nil } func (f *FileSystem) Close() { @@ -37,3 +38,11 @@ func (f *FileSystem) Close() { log.Error().Err(err).Msg("error occurred while stopping ipfs host") } } + +func (f *FileSystem) Connect(info *peer.AddrInfo) { + log.Info().Msgf("Attempting connection to %s", info.String()) + err := f.ipfsHost.Connect(context.Background(), *info) + if err != nil { + log.Warn().Msgf("Could not connect to %s | %v", info.String(), err) + } +} diff --git a/proofs/proofs.go b/proofs/proofs.go index 79fff34..c68603b 100644 --- a/proofs/proofs.go +++ b/proofs/proofs.go @@ -133,8 +133,7 @@ func (p *Prover) GenerateProof(merkle []byte, owner string, start int64, blockHe proven := file.ProvenThisBlock(blockHeight+int64(t.Seconds()/5.0), newProof.LastProven) if proven { - log.Info().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) - log.Debug().Msg("File was already proven") + log.Debug().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) return nil, nil, 0, nil } log.Info().Msg(fmt.Sprintf("%x was not yet proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) @@ -200,7 +199,7 @@ func (p *Prover) PostProof(merkle []byte, owner string, start int64, blockHeight log.Error().Msg(postRes.ErrorMessage) } - log.Info().Msg(fmt.Sprintf("%x was successfully proven", merkle)) + log.Debug().Msg(fmt.Sprintf("%x was successfully proven", merkle)) log.Debug().Msg(fmt.Sprintf("TX Hash: %s", m.Hash())) @@ -262,7 +261,7 @@ func (p *Prover) wrapPostProof(merkle []byte, owner string, start int64, height Msg("proof error") if err.Error() == "rpc error: code = NotFound desc = not found" { // if the file is not found on the network, delete it - log.Info(). + log.Debug(). Hex("merkle", merkle). Str("owner", owner). Int64("start", start).