Skip to content

Commit

Permalink
Merge pull request #62 from JackalLabs/marston/test-recycle
Browse files Browse the repository at this point in the history
Recycle Old Files
  • Loading branch information
TheMarstonConnell authored Aug 10, 2024
2 parents d686c72 + 5074ad9 commit 2b5d067
Show file tree
Hide file tree
Showing 30 changed files with 914 additions and 113 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ data
/.idea/vcs.xml
/.idea/modules.xml
/coverage.txt

*.log
.idea
2 changes: 1 addition & 1 deletion api/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/JackalLabs/sequoia/api/types"
"github.com/JackalLabs/sequoia/file_system"
"github.com/gorilla/mux"
storageTypes "github.com/jackalLabs/canine-chain/v3/x/storage/types"
storageTypes "github.com/jackalLabs/canine-chain/v4/x/storage/types"
"github.com/rs/zerolog/log"
)

Expand Down
24 changes: 24 additions & 0 deletions api/index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"net/http"

"github.com/JackalLabs/sequoia/config"
Expand Down Expand Up @@ -46,3 +47,26 @@ func VersionHandler(wallet *wallet.Wallet) func(http.ResponseWriter, *http.Reque
}
}
}

func NetworkHandler(wallet *wallet.Wallet) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
status, err := wallet.Client.RPCClient.Status(context.Background())
if err != nil {
w.WriteHeader(500)
return
}

grpcStatus := wallet.Client.GRPCConn.GetState()

v := types.NetworkResponse{
GRPCStatus: grpcStatus.String(),
RPCStatus: status,
}

err = json.NewEncoder(w).Encode(v)
if err != nil {
log.Error().Err(err)
return
}
}
}
4 changes: 4 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func NewAPI(port int64) *API {
}

func (a *API) Close() error {
if a.srv == nil {
return fmt.Errorf("no server available")
}
return a.srv.Close()
}

Expand All @@ -56,6 +59,7 @@ func (a *API) Serve(f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.
r.HandleFunc("/dump", DumpDBHandler(f))

r.HandleFunc("/version", VersionHandler(wallet))
r.HandleFunc("/network", NetworkHandler(wallet))

r.Handle("/metrics", promhttp.Handler())
r.Use(loggingMiddleware)
Expand Down
10 changes: 9 additions & 1 deletion api/types/responses.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package types

import "github.com/libp2p/go-libp2p/core/peer"
import (
"github.com/libp2p/go-libp2p/core/peer"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
)

type UploadResponse struct {
Merkle []byte `json:"merkle"`
Expand All @@ -19,6 +22,11 @@ type VersionResponse struct {
ChainID string `json:"chain-id"`
}

type NetworkResponse struct {
GRPCStatus string `json:"grpc-status"`
RPCStatus *coretypes.ResultStatus `json:"rpc-status"`
}

type IndexResponse struct {
Status string `json:"status"`
Address string `json:"address"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func RootCmd() *cobra.Command {
r.PersistentFlags().String(types.FlagHome, types.DefaultHome, "sets the home directory for sequoia")
r.PersistentFlags().String(types.FlagLogLevel, types.DefaultLogLevel, "log level. info|error|debug")

r.AddCommand(StartCmd(), wallet.WalletCmd(), InitCmd(), VersionCmd(), IPFSCmd())
r.AddCommand(StartCmd(), wallet.WalletCmd(), InitCmd(), VersionCmd(), IPFSCmd(), SalvageCmd())

return r
}
Expand Down
45 changes: 45 additions & 0 deletions cmd/salvage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cmd

import (
"github.com/JackalLabs/sequoia/cmd/types"
"github.com/JackalLabs/sequoia/core"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)

func SalvageCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "jprovd-salvage",
Short: "salvage unmigrated jprovd files to sequoia. jprovd directory required as arg",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
home, err := cmd.Flags().GetString(types.FlagHome)
if err != nil {
return err
}

logLevel, err := cmd.Flags().GetString(types.FlagLogLevel)
if err != nil {
return err
}

if logLevel == "info" {
log.Logger = log.Level(zerolog.InfoLevel)
} else if logLevel == "debug" {
log.Logger = log.Level(zerolog.DebugLevel)
} else if logLevel == "error" {
log.Logger = log.Level(zerolog.ErrorLevel)
}

app, err := core.NewApp(home)
if err != nil {
return err
}

return app.Salvage(args[0])
},
}

return cmd
}
2 changes: 1 addition & 1 deletion config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func DefaultConfig() *Config {
},
ChainCfg: types.ChainConfig{
RPCAddr: "http://localhost:26657",
GRPCAddr: "127.0.0.1:9090",
GRPCAddr: "localhost:9090",
GasPrice: "0.02ujkl",
GasAdjustment: 1.5,
Bech32Prefix: "jkl",
Expand Down
141 changes: 136 additions & 5 deletions core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (
walletTypes "github.com/desmos-labs/cosmos-go-wallet/types"
"github.com/desmos-labs/cosmos-go-wallet/wallet"
"github.com/dgraph-io/badger/v4"
storageTypes "github.com/jackalLabs/canine-chain/v3/x/storage/types"
"github.com/jackalLabs/canine-chain/v4/x/storage/types"
storageTypes "github.com/jackalLabs/canine-chain/v4/x/storage/types"
"github.com/rs/zerolog/log"

"github.com/JackalLabs/sequoia/recycle"
)

type App struct {
Expand Down Expand Up @@ -218,14 +221,28 @@ func (a *App) Start() error {
return err
}

a.q = queue.NewQueue(w, cfg.QueueInterval)
go a.q.Listen()

prover := proofs.NewProver(w, a.q, a.fileSystem, cfg.ProofInterval, cfg.ProofThreads, int(params.ChunkSize))

recycleDepot, err := recycle.NewRecycleDepot(
a.home,
myAddress,
params.ChunkSize,
a.fileSystem,
prover,
types.NewQueryClient(w.Client.GRPCConn),
)
if err != nil {
return err
}

myUrl := cfg.Ip

log.Info().Msg(fmt.Sprintf("Provider started as: %s", myAddress))

a.q = queue.NewQueue(w, cfg.QueueInterval)
go a.q.Listen()

a.prover = proofs.NewProver(w, a.q, a.fileSystem, cfg.ProofInterval, cfg.ProofThreads, int(params.ChunkSize))
a.prover = prover
a.strayManager = strays.NewStrayManager(w, a.q, cfg.StrayManagerCfg.CheckInterval, cfg.StrayManagerCfg.RefreshInterval, cfg.StrayManagerCfg.HandCount, claimers)
a.monitor = monitoring.NewMonitor(w)

Expand All @@ -235,6 +252,7 @@ func (a *App) Start() error {
go a.prover.Start()
go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize)
go a.monitor.Start()
go recycleDepot.Start(cfg.StrayManagerCfg.CheckInterval)

done := make(chan os.Signal, 1)
defer signal.Stop(done) // undo signal.Notify effect
Expand All @@ -244,6 +262,7 @@ func (a *App) Start() error {

fmt.Println("Shutting Sequoia down safely...")

recycleDepot.Stop()
_ = a.api.Close()
a.q.Stop()
a.prover.Stop()
Expand All @@ -255,3 +274,115 @@ func (a *App) Start() error {

return nil
}

func (a *App) Salvage(jprovdHome string) error {
cfg, err := config.Init(a.home)
if err != nil {
return err
}

w, err := config.InitWallet(a.home)
if err != nil {
return err
}

myAddress := w.AccAddress()

queryParams := &storageTypes.QueryProvider{
Address: myAddress,
}

cl := storageTypes.NewQueryClient(w.Client.GRPCConn)

claimers := make([]string, 0)

res, err := cl.Provider(context.Background(), queryParams)
if err != nil {
log.Info().Msg("Provider does not exist on network or is not connected...")
err := initProviderOnChain(w, cfg.Ip, cfg.TotalSpace)
if err != nil {
return err
}
} else {
claimers = res.Provider.AuthClaimers

totalSpace, err := strconv.ParseInt(res.Provider.Totalspace, 10, 64)
if err != nil {
return err
}
if totalSpace != cfg.TotalSpace {
err := updateSpace(w, cfg.TotalSpace)
if err != nil {
return err
}
}
if res.Provider.Ip != cfg.Ip {
err := updateIp(w, cfg.Ip)
if err != nil {
return err
}
}
}

params, err := a.GetStorageParams(w.Client.GRPCConn)
if err != nil {
return err
}

a.q = queue.NewQueue(w, cfg.QueueInterval)
go a.q.Listen()

prover := proofs.NewProver(w, a.q, a.fileSystem, cfg.ProofInterval, cfg.ProofThreads, int(params.ChunkSize))

recycleDepot, err := recycle.NewRecycleDepot(
a.home,
myAddress,
params.ChunkSize,
a.fileSystem,
prover,
types.NewQueryClient(w.Client.GRPCConn),
)
if err != nil {
return err
}

myUrl := cfg.Ip

log.Info().Msg(fmt.Sprintf("Provider started as: %s", myAddress))

a.prover = prover
a.strayManager = strays.NewStrayManager(w, a.q, cfg.StrayManagerCfg.CheckInterval, cfg.StrayManagerCfg.RefreshInterval, cfg.StrayManagerCfg.HandCount, claimers)
a.monitor = monitoring.NewMonitor(w)

done := make(chan os.Signal, 1)
defer signal.Stop(done) // undo signal.Notify effect

// Starting the 4 concurrent services
// nolint:all
go a.api.Serve(a.fileSystem, a.prover, w, params.ChunkSize)
go a.prover.Start()
go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize)
go a.monitor.Start()
go recycleDepot.Start(cfg.StrayManagerCfg.CheckInterval)

err = recycleDepot.SalvageFiles(jprovdHome)
if err != nil {
return err
}

signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done // Will block here until user hits ctrl+c

fmt.Println("Shutting Sequoia down safely...")

_ = a.api.Close()
a.q.Stop()
a.prover.Stop()
a.strayManager.Stop()
a.monitor.Stop()
recycleDepot.Stop()
time.Sleep(time.Second * 30) // give the program some time to shut down
a.fileSystem.Close()

return nil
}
42 changes: 42 additions & 0 deletions file_system/recycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package file_system

import (
"bytes"
"context"
"fmt"
"io"

badger "github.com/dgraph-io/badger/v4"
)

func (f *FileSystem) salvageFile(r io.Reader, chunkSize int64) ([]byte, int, error) {
root, _, chunks, size, err := BuildTree(r, chunkSize)
if err != nil {
return nil, 0, err
}

// TODO: there must be a better way to do this
data := make([]byte, 0)
for _, chunk := range chunks {
data = append(data, chunk...)
}

buf := bytes.NewBuffer(data)
node, err := f.ipfs.AddFile(context.Background(), buf, nil)
if err != nil {
return nil, 0, err
}

err = f.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(fmt.Sprintf("cid/%x", root)), []byte(node.Cid().String()))
})
if err != nil {
return nil, 0, err
}

return root, size, nil
}

func (f *FileSystem) SalvageFile(r io.Reader, chunkSize int64) (merkle []byte, size int, err error) {
return f.salvageFile(r, chunkSize)
}
Loading

0 comments on commit 2b5d067

Please sign in to comment.