From 8c399542f971177b6797d9043a733555437aa436 Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 17:01:04 -0500 Subject: [PATCH 1/9] tweaking some timeouts --- api/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/server.go b/api/server.go index aaf4c53..0406592 100644 --- a/api/server.go +++ b/api/server.go @@ -84,8 +84,8 @@ func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proo Handler: handler, Addr: fmt.Sprintf("0.0.0.0:%d", a.port), // Good practice: enforce timeouts for servers you create! - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, + WriteTimeout: 0, + ReadTimeout: 30 * time.Second, } log.Logger.Info().Msg(fmt.Sprintf("Sequoia API now listening on %s", a.srv.Addr)) From c352418d446147ae94819563c3a7b18472cbcb5d Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 21:32:30 -0500 Subject: [PATCH 2/9] peering on start --- core/app.go | 63 ++++++++++++++++++++++++++++++++++++++++++++ file_system/types.go | 12 +++++++-- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/core/app.go b/core/app.go index 4ad728d..25adf75 100644 --- a/core/app.go +++ b/core/app.go @@ -2,13 +2,21 @@ package core import ( "context" + "encoding/json" "fmt" + "net/http" "os" "os/signal" + "path" "strconv" + "strings" "syscall" "time" + apiTypes "github.com/JackalLabs/sequoia/api/types" + "github.com/desmos-labs/cosmos-go-wallet/client" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/JackalLabs/sequoia/file_system" "github.com/JackalLabs/sequoia/ipfs" "github.com/ipfs/boxo/blockstore" @@ -273,6 +281,7 @@ func (a *App) Start() error { go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize) go a.monitor.Start() go recycleDepot.Start(cfg.StrayManagerCfg.CheckInterval) + go a.ConnectPeers(w.Client) done := make(chan os.Signal, 1) defer signal.Stop(done) // undo signal.Notify effect @@ -295,6 +304,60 @@ func (a *App) Start() error { return nil } +func (a *App) ConnectPeers(cl *client.Client) { + ctx := context.Background() + queryClient := storageTypes.NewQueryClient(cl.GRPCConn) + + activeProviders, err := queryClient.ActiveProviders(ctx, &storageTypes.QueryActiveProviders{}) + if err != nil { + log.Warn().Msg("Cannot get active provider list. Won't try IPFS Peers.") + return + } + + for _, provider := range activeProviders.Providers { + providerDetails, err := queryClient.Provider(ctx, &storageTypes.QueryProvider{ + Address: provider.Address, + }) + if err != nil { + log.Warn().Msgf("Couldn't get provider details from %s, something is really wrong with the network!", provider) + continue + } + + ip := providerDetails.Provider.Ip + + ipfsHostAddress := path.Join(ip, "/ipfs/hosts") + + res, err := http.Get(ipfsHostAddress) + if err != nil { + log.Warn().Msgf("Could not get hosts from %s", ip) + continue + } + defer res.Body.Close() + + var hosts apiTypes.HostResponse + + err = json.NewDecoder(res.Body).Decode(&hosts) + if err != nil { + log.Warn().Msgf("Could not parse hosts %s", ip) + continue + } + + for _, host := range hosts.Hosts { + if strings.Contains(host, "127.0.0.1") { + continue + } + adr, err := peer.AddrInfoFromString(host) + if err != nil { + log.Warn().Msgf("Could not parse host %s from %s", adr, ip) + continue + } + + a.fileSystem.Connect(adr) + } + + } +} + func (a *App) Salvage(jprovdHome string) error { cfg, err := config.Init(a.home) if err != nil { diff --git a/file_system/types.go b/file_system/types.go index f4ee2fd..00e3462 100644 --- a/file_system/types.go +++ b/file_system/types.go @@ -6,6 +6,7 @@ import ( "github.com/ipfs/boxo/blockstore" "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" ipfs2 "github.com/JackalLabs/sequoia/ipfs" "github.com/dgraph-io/badger/v4" @@ -20,11 +21,11 @@ type FileSystem struct { } func NewFileSystem(ctx context.Context, db *badger.DB, ds datastore.Batching, bs blockstore.Blockstore, ipfsPort int, ipfsDomain string) (*FileSystem, error) { - ipfs, host, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain) + ipfs, hh, err := ipfs2.MakeIPFS(ctx, ds, bs, ipfsPort, ipfsDomain) if err != nil { return nil, err } - return &FileSystem{db: db, ipfs: ipfs, ipfsHost: host}, nil + return &FileSystem{db: db, ipfs: ipfs, ipfsHost: hh}, nil } func (f *FileSystem) Close() { @@ -37,3 +38,10 @@ func (f *FileSystem) Close() { log.Error().Err(err).Msg("error occurred while stopping ipfs host") } } + +func (f *FileSystem) Connect(info *peer.AddrInfo) { + err := f.ipfsHost.Connect(context.Background(), *info) + if err != nil { + log.Warn().Msgf("Could not connect to %s | %v", info.String(), err) + } +} From c2057df7e39d122c5d7624a0be53c35302ee6c2f Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 21:41:59 -0500 Subject: [PATCH 3/9] more logging --- api/server.go | 7 +++---- core/app.go | 6 ++++-- file_system/types.go | 1 + 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/api/server.go b/api/server.go index 0406592..d8fc44d 100644 --- a/api/server.go +++ b/api/server.go @@ -44,7 +44,7 @@ func (a *API) Close() error { return a.srv.Close() } -func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) error { +func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) { defer log.Info().Msg("API module stopped") r := mux.NewRouter() @@ -92,9 +92,8 @@ func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proo err := a.srv.ListenAndServe() if err != nil { if !errors.Is(err, http.ErrServerClosed) { - return err + log.Warn().Err(err) + return } } - - return nil } diff --git a/core/app.go b/core/app.go index 25adf75..f57214c 100644 --- a/core/app.go +++ b/core/app.go @@ -276,12 +276,12 @@ func (a *App) Start() error { // Starting the 4 concurrent services // nolint:all + go a.ConnectPeers(w.Client) go a.api.Serve(recycleDepot, a.fileSystem, a.prover, w, params.ChunkSize) go a.prover.Start() go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize) go a.monitor.Start() go recycleDepot.Start(cfg.StrayManagerCfg.CheckInterval) - go a.ConnectPeers(w.Client) done := make(chan os.Signal, 1) defer signal.Stop(done) // undo signal.Notify effect @@ -305,6 +305,7 @@ func (a *App) Start() error { } func (a *App) ConnectPeers(cl *client.Client) { + log.Info().Msg("Starting IPFS Peering cycle...") ctx := context.Background() queryClient := storageTypes.NewQueryClient(cl.GRPCConn) @@ -322,9 +323,10 @@ func (a *App) ConnectPeers(cl *client.Client) { log.Warn().Msgf("Couldn't get provider details from %s, something is really wrong with the network!", provider) continue } - ip := providerDetails.Provider.Ip + log.Info().Msgf("Attempting to peer with %s", ip) + ipfsHostAddress := path.Join(ip, "/ipfs/hosts") res, err := http.Get(ipfsHostAddress) diff --git a/file_system/types.go b/file_system/types.go index 00e3462..e4fcf29 100644 --- a/file_system/types.go +++ b/file_system/types.go @@ -40,6 +40,7 @@ func (f *FileSystem) Close() { } func (f *FileSystem) Connect(info *peer.AddrInfo) { + log.Info().Msgf("Attempting connection to %s", info.String()) err := f.ipfsHost.Connect(context.Background(), *info) if err != nil { log.Warn().Msgf("Could not connect to %s | %v", info.String(), err) From 03ca7bdb3bd1676dbe759cc1b7178da7fbd80ca0 Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 21:54:38 -0500 Subject: [PATCH 4/9] better url parsing --- core/app.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/app.go b/core/app.go index f57214c..47f3be5 100644 --- a/core/app.go +++ b/core/app.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "os" "os/signal" "path" @@ -327,11 +328,18 @@ func (a *App) ConnectPeers(cl *client.Client) { log.Info().Msgf("Attempting to peer with %s", ip) - ipfsHostAddress := path.Join(ip, "/ipfs/hosts") + uip, err := url.Parse(ip) + if err != nil { + log.Warn().Msgf("Could not get parse %s", ip) + continue + } + uip.Path = path.Join(uip.Path, "ipfs", "hosts") + + ipfsHostAddress := uip.String() res, err := http.Get(ipfsHostAddress) if err != nil { - log.Warn().Msgf("Could not get hosts from %s", ip) + log.Warn().Msgf("Could not get hosts from %s", ipfsHostAddress) continue } defer res.Body.Close() From 2278f530e5f381184e9bc01b2c086c06349ffde1 Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 22:02:46 -0500 Subject: [PATCH 5/9] we don't need this message clogging things up --- proofs/proofs.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/proofs/proofs.go b/proofs/proofs.go index 79fff34..ad2454b 100644 --- a/proofs/proofs.go +++ b/proofs/proofs.go @@ -133,8 +133,7 @@ func (p *Prover) GenerateProof(merkle []byte, owner string, start int64, blockHe proven := file.ProvenThisBlock(blockHeight+int64(t.Seconds()/5.0), newProof.LastProven) if proven { - log.Info().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) - log.Debug().Msg("File was already proven") + log.Debug().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) return nil, nil, 0, nil } log.Info().Msg(fmt.Sprintf("%x was not yet proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) From 1a0c9fe255fbd557042f9b226229718829d97754 Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 22:04:57 -0500 Subject: [PATCH 6/9] async peers --- core/app.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/core/app.go b/core/app.go index 47f3be5..2d3d18d 100644 --- a/core/app.go +++ b/core/app.go @@ -11,6 +11,7 @@ import ( "path" "strconv" "strings" + "sync" "syscall" "time" @@ -352,18 +353,26 @@ func (a *App) ConnectPeers(cl *client.Client) { continue } + var wg sync.WaitGroup for _, host := range hosts.Hosts { - if strings.Contains(host, "127.0.0.1") { - continue - } - adr, err := peer.AddrInfoFromString(host) - if err != nil { - log.Warn().Msgf("Could not parse host %s from %s", adr, ip) - continue - } - - a.fileSystem.Connect(adr) + wg.Add(1) + go func() { + if strings.Contains(host, "127.0.0.1") { + wg.Done() + return + } + adr, err := peer.AddrInfoFromString(host) + if err != nil { + wg.Done() + log.Warn().Msgf("Could not parse host %s from %s", adr, ip) + return + } + + a.fileSystem.Connect(adr) + wg.Done() + }() } + wg.Wait() } } From 42332b30ac2c0637aaa4e054045955f8e0d00a19 Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 22:05:27 -0500 Subject: [PATCH 7/9] dont need that as a log --- proofs/proofs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proofs/proofs.go b/proofs/proofs.go index ad2454b..b8fc0a7 100644 --- a/proofs/proofs.go +++ b/proofs/proofs.go @@ -199,7 +199,7 @@ func (p *Prover) PostProof(merkle []byte, owner string, start int64, blockHeight log.Error().Msg(postRes.ErrorMessage) } - log.Info().Msg(fmt.Sprintf("%x was successfully proven", merkle)) + log.Debug().Msg(fmt.Sprintf("%x was successfully proven", merkle)) log.Debug().Msg(fmt.Sprintf("TX Hash: %s", m.Hash())) From 84abf8e45df723815eb9d2eeba3bfdb81856171d Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 22:23:30 -0500 Subject: [PATCH 8/9] fuck it we try em all --- core/app.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/core/app.go b/core/app.go index 2d3d18d..466af8f 100644 --- a/core/app.go +++ b/core/app.go @@ -11,7 +11,6 @@ import ( "path" "strconv" "strings" - "sync" "syscall" "time" @@ -353,27 +352,20 @@ func (a *App) ConnectPeers(cl *client.Client) { continue } - var wg sync.WaitGroup - for _, host := range hosts.Hosts { - wg.Add(1) + for _, h := range hosts.Hosts { + host := h go func() { - if strings.Contains(host, "127.0.0.1") { - wg.Done() + if strings.Contains(host, "127.0.0.1") || strings.Contains(host, "ip6/") { return } adr, err := peer.AddrInfoFromString(host) if err != nil { - wg.Done() log.Warn().Msgf("Could not parse host %s from %s", adr, ip) return } - a.fileSystem.Connect(adr) - wg.Done() }() } - wg.Wait() - } } From e5d3578b71e9434044efcea49492dd6b0acaf54e Mon Sep 17 00:00:00 2001 From: marston Date: Thu, 19 Dec 2024 22:26:22 -0500 Subject: [PATCH 9/9] switching delete to debug --- proofs/proofs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proofs/proofs.go b/proofs/proofs.go index b8fc0a7..c68603b 100644 --- a/proofs/proofs.go +++ b/proofs/proofs.go @@ -261,7 +261,7 @@ func (p *Prover) wrapPostProof(merkle []byte, owner string, start int64, height Msg("proof error") if err.Error() == "rpc error: code = NotFound desc = not found" { // if the file is not found on the network, delete it - log.Info(). + log.Debug(). Hex("merkle", merkle). Str("owner", owner). Int64("start", start).