Skip to content

Commit

Permalink
Merge pull request #65 from JackalLabs/resume-salvage
Browse files Browse the repository at this point in the history
Resume salvage
  • Loading branch information
TheMarstonConnell authored Aug 22, 2024
2 parents 953bc9d + 2b006fc commit 756e990
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 14 deletions.
25 changes: 25 additions & 0 deletions api/recycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package api

import (
"net/http"

"github.com/JackalLabs/sequoia/api/types"
"github.com/JackalLabs/sequoia/recycle"
"github.com/rs/zerolog/log"
)

func RecycleSalvageHandler(rd *recycle.RecycleDepot) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
v := types.RecycleSalvageResponse{
TotalJackalProviderFiles: rd.TotalJprovFiles,
SalvagedFilesCount: rd.SalvagedFilesCount,
IsSalvageFinished: rd.SalvagedFilesCount == rd.TotalJprovFiles,
}

err := json.NewEncoder(w).Encode(v)
if err != nil {
log.Error().Err(err)
return
}
}
}
8 changes: 6 additions & 2 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/rs/cors"

"github.com/JackalLabs/sequoia/file_system"
"github.com/JackalLabs/sequoia/recycle"

"github.com/prometheus/client_golang/prometheus/promhttp"

Expand All @@ -17,8 +18,9 @@ import (

"github.com/desmos-labs/cosmos-go-wallet/wallet"
"github.com/gorilla/mux"

jsoniter "github.com/json-iterator/go"
)
import jsoniter "github.com/json-iterator/go"

var json = jsoniter.ConfigCompatibleWithStandardLibrary

Expand All @@ -40,7 +42,7 @@ func (a *API) Close() error {
return a.srv.Close()
}

func (a *API) Serve(f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) error {
func (a *API) Serve(rd *recycle.RecycleDepot, f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) error {
defer log.Info().Msg("API module stopped")
r := mux.NewRouter()
r.HandleFunc("/", IndexHandler(wallet.AccAddress()))
Expand All @@ -63,6 +65,8 @@ func (a *API) Serve(f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.
r.HandleFunc("/version", VersionHandler(wallet))
r.HandleFunc("/network", NetworkHandler(wallet))

r.HandleFunc("/recycle/salvage", RecycleSalvageHandler(rd))

r.Handle("/metrics", promhttp.Handler())
r.Use(loggingMiddleware)

Expand Down
6 changes: 6 additions & 0 deletions api/types/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,9 @@ type CidFolderResponse struct {
type CidMapResponse struct {
CidMap map[string]string `json:"cid_map"`
}

type RecycleSalvageResponse struct {
TotalJackalProviderFiles int64 `json:"total_jackal_provider_files"`
SalvagedFilesCount int64 `json:"salvaged_files_count"`
IsSalvageFinished bool `json:"is_salvage_finished"`
}
4 changes: 2 additions & 2 deletions core/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (a *App) Start() error {

// Starting the 4 concurrent services
// nolint:all
go a.api.Serve(a.fileSystem, a.prover, w, params.ChunkSize)
go a.api.Serve(recycleDepot, a.fileSystem, a.prover, w, params.ChunkSize)
go a.prover.Start()
go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize)
go a.monitor.Start()
Expand Down Expand Up @@ -360,7 +360,7 @@ func (a *App) Salvage(jprovdHome string) error {

// Starting the 4 concurrent services
// nolint:all
go a.api.Serve(a.fileSystem, a.prover, w, params.ChunkSize)
go a.api.Serve(recycleDepot, a.fileSystem, a.prover, w, params.ChunkSize)
go a.prover.Start()
go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize)
go a.monitor.Start()
Expand Down
90 changes: 87 additions & 3 deletions recycle/recycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"time"

"github.com/JackalLabs/jackal-provider/jprov/archive"
Expand All @@ -24,6 +26,73 @@ func (r *RecycleDepot) salvageFile(jprovArchive archive.Archive, fid string) ([]
return r.fs.SalvageFile(file, r.chunkSize)
}

func (r *RecycleDepot) lastSalvagedFile(record *os.File) (string, error) {
// read backwards from the end of the file to find the last salvaged file
line := ""
var cursor int64 = 0
stat, _ := record.Stat()
filesize := stat.Size()

if filesize == 0 {
return "", nil
}

for {
cursor -= 1
_, err := record.Seek(cursor, io.SeekEnd)
if err != nil {
return "", err
}

char := make([]byte, 1)
_, err = record.Read(char)
if err != nil {
return "", err
}

if cursor != -1 && (char[0] == 10) {
break
}

line = fmt.Sprintf("%s%s", string(char), line)

if cursor == -filesize {
break
}
}

if len(line) == 0 {
return "", nil
}

substrs := strings.Split(line, ",")

_, err := record.Seek(0, io.SeekEnd)
if err != nil {
return "", err
}

fid := strings.TrimSuffix(substrs[2], "\n")

return fid, nil
}

func countJklFiles(dirList []fs.DirEntry) int64 {
var c int64 = 0

for _, d := range dirList {
if !d.IsDir() {
continue
}

if strings.HasPrefix(d.Name(), "jklf") {
c++
}
}

return c
}

func (r *RecycleDepot) SalvageFiles(jprovdHome string) error {
log.Info().Msg("salvaging jprovd files...")
recordFile, err := os.OpenFile(
Expand All @@ -46,13 +115,28 @@ func (r *RecycleDepot) SalvageFiles(jprovdHome string) error {
log.Error().Err(err).Msg("failed to read jprovd storage directory")
return err
}
r.TotalJprovFiles = countJklFiles(dirList)

lastSalvaged, err := r.lastSalvagedFile(recordFile)
if err != nil {
return err
}
lastSalvagedFound := false

salvaged := 0
r.SalvagedFilesCount = 0
for _, d := range dirList {
if !d.IsDir() {
continue
}

if lastSalvaged != "" && !lastSalvagedFound {
if d.Name() == lastSalvaged {
lastSalvagedFound = true
}
r.SalvagedFilesCount++
continue // skip the last salvage record to avoid duplicate
}

merkle, size, err := r.salvageFile(jprovArchive, d.Name())
if err != nil {
log.Error().Err(err).Str("fid", d.Name()).Msg("failed to salvage file")
Expand All @@ -75,10 +159,10 @@ func (r *RecycleDepot) SalvageFiles(jprovdHome string) error {
Msg("failed to record salvage info")
}

salvaged++
r.SalvagedFilesCount++
}

log.Info().Int("count", salvaged).Msg("salvaging finished...")
log.Info().Int("count", int(r.SalvagedFilesCount)).Msg("salvaging finished...")
return nil
}

Expand Down
16 changes: 9 additions & 7 deletions recycle/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
const salvageRecordFileName = "salvage_record"

type RecycleDepot struct {
fs *fs.FileSystem
stop bool
chunkSize int64
homeDir string
queryClient types.QueryClient
address string
prover *proofs.Prover
fs *fs.FileSystem
stop bool
chunkSize int64
homeDir string
queryClient types.QueryClient
address string
prover *proofs.Prover
TotalJprovFiles int64
SalvagedFilesCount int64
}

func NewRecycleDepot(home string, address string, chunkSize int64, fs *fs.FileSystem, prover *proofs.Prover, queryClient types.QueryClient) (*RecycleDepot, error) {
Expand Down

0 comments on commit 756e990

Please sign in to comment.