Skip to content
This repository has been archived by the owner on Aug 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #68 from JackalLabs/logging-imp
Browse files Browse the repository at this point in the history
Fix: Stray Memory Usage & Logging
  • Loading branch information
BiPhan4 authored May 12, 2023
2 parents d7e5a71 + e9f9f99 commit c5eece1
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 73 deletions.
4 changes: 4 additions & 0 deletions jprov/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ func BuildApi(cmd *cobra.Command, q *queue.UploadQueue, router *httprouter.Route
router.GET("/api/network/status", func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
network.GetStatus(cmd, w, r, ps)
})

router.GET("/checkme", func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
network.GetProxy(cmd, w, r, ps)
})
}
6 changes: 3 additions & 3 deletions jprov/api/client/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
)

func ListQueue(cmd *cobra.Command, w http.ResponseWriter, r *http.Request, ps httprouter.Params, q *queue.UploadQueue) {
messages := make([]sdk.Msg, 0)
messages := make([]sdk.Msg, len(q.Queue))

for _, v := range q.Queue {
messages = append(messages, v.Message)
for k, v := range q.Queue {
messages[k] = v.Message
}

v := types.QueueResponse{
Expand Down
50 changes: 50 additions & 0 deletions jprov/api/network/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package network

import (
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/JackalLabs/jackal-provider/jprov/types"
"github.com/julienschmidt/httprouter"
"github.com/spf13/cobra"
)

func GetProxy(cmd *cobra.Command, w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
ok := true
var versionRes types.VersionResponse
var versionUrl *url.URL
var res *http.Response

queries := r.URL.Query()
uri := queries.Get("route")

u, err := url.Parse(uri)
if err != nil {
ok = false
goto skip
}

versionUrl = u.JoinPath("version")
res, err = http.Get(versionUrl.String())
if err != nil {
ok = false
goto skip
}
err = json.NewDecoder(res.Body).Decode(&versionRes)
if err != nil {
ok = false
goto skip
}

skip:

okRes := types.ProxyResponse{
Ok: ok,
}
err = json.NewEncoder(w).Encode(okRes)
if err != nil {
fmt.Println(err)
}
}
2 changes: 1 addition & 1 deletion jprov/jprovd/provider_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func StartServerCommand() *cobra.Command {
cmd.Flags().String(types.VersionFlag, "", "The value exposed by the version api to allow for custom deployments.")
cmd.Flags().Bool(types.HaltStraysFlag, false, "Debug flag to stop picking up strays.")
cmd.Flags().Uint16(types.FlagInterval, 32, "The interval in seconds for which to check proofs. Must be >=1800 if you need a custom interval")
cmd.Flags().Uint(types.FlagThreads, 10, "The amount of stray threads.")
cmd.Flags().Uint(types.FlagThreads, 3, "The amount of stray threads.")
cmd.Flags().Int(types.FlagMaxMisses, 16, "The amount of intervals a provider can miss their proofs before removing a file.")
cmd.Flags().Int64(types.FlagChunkSize, 10240, "The size of a single file chunk.")
cmd.Flags().Int64(types.FlagStrayInterval, 20, "The interval in seconds to check for new strays.")
Expand Down
43 changes: 32 additions & 11 deletions jprov/server/file_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"mime/multipart"
"net/http"
"os"
"strings"
"sync"

"github.com/JackalLabs/jackal-provider/jprov/crypto"
Expand Down Expand Up @@ -49,30 +50,35 @@ func saveFile(file multipart.File, handler *multipart.FileHeader, sender string,
return err
}

cidhash := sha256.New()
cidHash := sha256.New()

_, err = io.WriteString(cidhash, fmt.Sprintf("%s%s%s", sender, address, fid))
var str strings.Builder // building the FID
str.WriteString(sender)
str.WriteString(address)
str.WriteString(fid)

_, err = io.WriteString(cidHash, str.String())
if err != nil {
return err
}
cid := cidhash.Sum(nil)
strcid, err := utils.MakeCid(cid)
cid := cidHash.Sum(nil)
strCid, err := utils.MakeCid(cid)
if err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(1)

msg, ctrerr := MakeContract(cmd, fid, sender, &wg, q, merkle, fmt.Sprintf("%d", size))
if ctrerr != nil {
ctx.Logger.Error("CONTRACT ERROR: %v", ctrerr)
return ctrerr
msg, ctrErr := MakeContract(cmd, fid, sender, &wg, q, merkle, fmt.Sprintf("%d", size))
if ctrErr != nil {
ctx.Logger.Error("CONTRACT ERROR: %v", ctrErr)
return ctrErr
}
wg.Wait()

v := types.UploadResponse{
CID: strcid,
CID: strCid,
FID: fid,
}

Expand All @@ -92,7 +98,7 @@ func saveFile(file multipart.File, handler *multipart.FileHeader, sender string,
return err
}

err = utils.SaveToDatabase(fid, strcid, db, ctx.Logger)
err = utils.SaveToDatabase(fid, strCid, db, ctx.Logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -159,12 +165,18 @@ func StartFileServer(cmd *cobra.Command) {
Address: address,
}

_, err = queryClient.Providers(context.Background(), params)
me, err := queryClient.Providers(context.Background(), params)
if err != nil {
fmt.Println("Provider not initialized on the blockchain, or connection to the RPC node has been lost. Please make sure your RPC node is available then run `jprovd init` to fix this.")
return
}

providers, err := queryClient.ProvidersAll(context.Background(), &storageTypes.QueryAllProvidersRequest{})
if err != nil {
fmt.Println("Cannot connect to jackal blockchain.")
return
}

path := utils.GetDataPath(clientCtx)

db, dberr := leveldb.OpenFile(path, nil)
Expand Down Expand Up @@ -201,6 +213,15 @@ func StartFileServer(cmd *cobra.Command) {
providerName = "A Storage Provider"
}

//fmt.Println("Testing connection...")
//connected := testConnection(providers.Providers, me.Providers.Ip)
//if !connected {
// fmt.Println("Domain not configured correctly, make sure your domain points to your provider.")
// return
//}
_ = providers
_ = me

manager := strays.NewStrayManager(cmd) // creating and starting the stray management system
if !strs {
manager.Init(cmd, threads, db)
Expand Down
35 changes: 21 additions & 14 deletions jprov/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,30 +76,37 @@ func checkVersion(cmd *cobra.Command, w http.ResponseWriter, ctx *utils.Context)

func downfil(cmd *cobra.Command, w http.ResponseWriter, ps httprouter.Params, ctx *utils.Context) {
clientCtx := client.GetClientContextFromCmd(cmd)

files, err := os.ReadDir(utils.GetStoragePath(clientCtx, ps.ByName("file")))
chunkSize, err := cmd.Flags().GetInt64(types.FlagChunkSize)
if err != nil {
ctx.Logger.Error(err.Error())
fmt.Println(err)
return
}

var data []byte
var fileList []*[]byte

var dataLength int

for i := 0; i < len(files); i += 1 {
f, err := os.ReadFile(filepath.Join(utils.GetStoragePath(clientCtx, ps.ByName("file")), fmt.Sprintf("%d.jkl", i)))
var i int
for { // loop through every file in the directory and fail once it hits a file that it can't find
path := filepath.Join(utils.GetStoragePath(clientCtx, ps.ByName("file")), fmt.Sprintf("%d.jkl", i))
f, err := os.ReadFile(path)
if err != nil {
ctx.Logger.Info("Error can't open file!")
_, err = w.Write([]byte("cannot find file"))
if err != nil {
ctx.Logger.Error(err.Error())
}
return
break
}
fileList = append(fileList, &f)
dataLength += len(f)
i++
}

data := make([]byte, dataLength)

data = append(data, f...)
for i, file := range fileList {
for k, b := range *file {
data[i*int(chunkSize)+k] = b
}
}

w.Header().Set("Content-Length", fmt.Sprintf("%d", len(data)))
w.Header().Set("Content-Length", fmt.Sprintf("%d", dataLength))

_, err = w.Write(data)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion jprov/server/proofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func postProofs(cmd *cobra.Command, db *leveldb.DB, q *queue.UploadQueue, ctx *u
if interval < 1800 { // If the provider picked an interval that's less than 30 minutes, we generate a random interval for them anyways

r := rand.New(rand.NewSource(time.Now().UnixNano()))
interval = uint16(r.Intn(901) + 900) // Generate interval between 15-30 minutes
interval = uint16(r.Intn(1801) + 60) // Generate interval between 1-30 minutes

}
ctx.Logger.Debug(fmt.Sprintf("The interval between proofs is now %d", interval))
Expand Down
64 changes: 64 additions & 0 deletions jprov/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,79 @@ package server

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"

"github.com/JackalLabs/jackal-provider/jprov/types"
"github.com/cosmos/cosmos-sdk/client"
storageTypes "github.com/jackalLabs/canine-chain/x/storage/types"
)

const ErrNotYours = "not your deal"

//nolint:all
func testConnection(providers []storageTypes.Providers, ip string) bool {
onlineProviders := 0
respondingProvider := 0
outdatedProvider := 0
checked := 0

for _, provider := range providers {
if onlineProviders > 20 {
continue
}
checked++
fmt.Printf("Checked with %d other providers...\n", checked)
u, err := url.Parse(provider.Ip)
if err != nil {
continue
}
versionUrl := u.JoinPath("version")
r, err := http.Get(versionUrl.String())
if err != nil {
continue
}
var versionRes types.VersionResponse
err = json.NewDecoder(r.Body).Decode(&versionRes)
if err != nil {
continue
}
onlineProviders++

proxyUrl := u.JoinPath("checkme")
vals := proxyUrl.Query()
vals.Add("route", ip)
proxyUrl.RawQuery = vals.Encode()
r, err = http.Get(proxyUrl.String())
if err != nil {
outdatedProvider++
continue
}
var proxyRes types.ProxyResponse
err = json.NewDecoder(r.Body).Decode(&proxyRes)
if err != nil {
outdatedProvider++
continue
}

respondingProvider++
}
fmt.Printf("Total: %d | Online: %d | Outdated: %d| Responsive: %d\n", checked, onlineProviders, outdatedProvider, respondingProvider)

if respondingProvider < 2 && (onlineProviders-outdatedProvider) < 3 {
return true
}

if respondingProvider < 2 {
return false
}

return true
}

func queryBlock(clientCtx *client.Context, cid string) (string, error) {
queryClient := storageTypes.NewQueryClient(clientCtx)

Expand Down
33 changes: 1 addition & 32 deletions jprov/strays/hand_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
"os"
"time"

"github.com/JackalLabs/jackal-provider/jprov/crypto"
"github.com/JackalLabs/jackal-provider/jprov/utils"
Expand All @@ -17,7 +15,6 @@ import (
"github.com/cosmos/cosmos-sdk/types/tx/signing"
authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing"
storageTypes "github.com/jackalLabs/canine-chain/x/storage/types"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

Expand Down Expand Up @@ -64,7 +61,7 @@ func (h *LittleHand) Process(ctx *utils.Context, m *StrayManager) { // process t
the cached file from our filesystem which keeps the file alive)
*/
if _, err := os.Stat(utils.GetStoragePath(h.ClientContext, h.Stray.Fid)); os.IsNotExist(err) {
ctx.Logger.Info("Nobody, not even I have the file.")
ctx.Logger.Info(fmt.Sprintf("Nobody, not even I have %s.", h.Stray.Fid))
return // If we don't have it and nobody else does, there is nothing we can do.
}
} else { // If there are providers with this file, we will download it from them instead to keep things consistent
Expand Down Expand Up @@ -309,31 +306,3 @@ func (h *LittleHand) Sign(txf txns.Factory, clientCtx client.Context, index byte
prevSignatures = append(prevSignatures, sig)
return txBuilder.SetSignatures(prevSignatures...)
}

func (m *StrayManager) CollectStrays(cmd *cobra.Command) {
m.Context.Logger.Info("Collecting strays from chain...")
qClient := storageTypes.NewQueryClient(m.ClientContext)

res, err := qClient.StraysAll(cmd.Context(), &storageTypes.QueryAllStraysRequest{})
if err != nil {
m.Context.Logger.Error(err.Error())
return
}

s := res.Strays

if len(s) == 0 { // If there are no strays, the network has claimed them all. We will try again later.
m.Context.Logger.Debug("No strays found.")
return
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] })

for _, newStray := range s { // Only add new strays to the queue

k := newStray
m.Strays = append(m.Strays, &k)

}
}
Loading

0 comments on commit c5eece1

Please sign in to comment.