Skip to content

Commit

Permalink
Pinned videos until the limit was reached and unpinned videos if the …
Browse files Browse the repository at this point in the history
…total pinned storage exceeded the limit.
  • Loading branch information
nathansenn committed Aug 18, 2023
1 parent 17f424e commit ebe6fea
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 91 deletions.
140 changes: 66 additions & 74 deletions Rewards/rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/url"
"proofofaccess/ipfs"
"proofofaccess/localdata"
"strconv"
"strings"
"time"
)
Expand All @@ -21,7 +20,6 @@ type APIResponse struct {
VideoV2 string `json:"video_v2"`
} `json:"recommended"`
}

type ProofMessage struct {
Status string `json:"Status"`
Message string `json:"Message"`
Expand Down Expand Up @@ -216,93 +214,87 @@ type CIDSize struct {
}

func PinVideos(gb int, ctx context.Context) error {
// Connect to the local IPFS node
fmt.Println("Pinning videos")
sh := ipfs.Shell

// Calculate total pinned storage
const GB = 1024 * 1024 * 1024
limit := int64(gb * GB)
totalPinned := int64(0)
fmt.Println("Getting pins")
pins, err := sh.Pins()
if err != nil {
fmt.Println("Error getting pins")
fmt.Println(err)

// Convert ThreeSpeakVideos to a map for quicker lookups
videoMap := make(map[string]bool)
for _, cid := range localdata.ThreeSpeakVideos {
videoMap[cid] = true
}
fmt.Println("Got pins")

// Define the limit for the pinned storage
const GB = 1024 * 1024 * 1024
limit := int64(gb * GB)
fmt.Println("Limit: ", strconv.FormatInt(limit, 10))
// Generate list of CIDs with size
cidList := make([]CIDSize, len(localdata.ThreeSpeakVideos))
fmt.Println("Making CID list")
fmt.Println("Length of localdata.ThreeSpeakVideos: ")
fmt.Println(len(localdata.ThreeSpeakVideos))

for i, cid := range localdata.ThreeSpeakVideos {
select {
case <-ctx.Done():
return ctx.Err()
default:
if cid == "" {
fmt.Printf("Empty CID at index %d, skipping\n", i)
continue
}
fmt.Println("CID: " + cid)
stat, err := sh.ObjectStat(cid)
fmt.Println("Got object stats ", stat)
if err != nil {
fmt.Printf("Failed to get object stats for CID %s: %v, skipping\n", cid, err)
continue
}
// Check all the currently pinned CIDs
fmt.Println("Checking currently pinned CIDs")
allPinsData, _ := sh.Pins()

cidList[i] = CIDSize{
CID: cid,
Size: int64(stat.CumulativeSize),
}
totalPinned += int64(stat.CumulativeSize)
fmt.Println("Total pinned: " + strconv.FormatInt(totalPinned, 10))
}
// Map the allPins to only CIDs
allPins := make(map[string]bool)
for cid := range allPinsData {
allPins[cid] = true
}

fmt.Println("Got CID list")

// Pin new videos until limit is reached
for _, video := range cidList {
select {
case <-ctx.Done():
return ctx.Err()
default:
if totalPinned+video.Size > limit {
fmt.Println("Total pinned storage exceeds limit")
break
}
fmt.Println("Pinning CID: " + video.CID)
if err := sh.Pin(video.CID); err != nil {
fmt.Println("failed to pin CID %s: %w", video.CID, err)
for cid, pinInfo := range allPinsData {
// Filter only the direct pins
if pinInfo.Type == "recursive" {
if videoMap[cid] {
fmt.Println("CID is in the video list", cid)
// If the CID is in the video list, get its size and add to totalPinned
stat, err := sh.ObjectStat(cid)
if err != nil {
fmt.Printf("Error getting stats for CID %s: %s\n", cid, err)
continue
}
size := int64(stat.CumulativeSize)
totalPinned += size
fmt.Println("Total pinned: ", totalPinned)
} else {
// If the CID is not in the video list, unpin it
fmt.Println("Unpinning CID: ", cid)
if err := sh.Unpin(cid); err != nil {
fmt.Printf("Error unpinning CID %s: %s\n", cid, err)
continue
}
fmt.Println("Unpinned CID: ", cid)
}
fmt.Println("Pinned CID: " + video.CID)
totalPinned += video.Size
}
}
fmt.Println("Total pinned: ", totalPinned)

// Remove older videos if total pinned storage exceeds limit
for cid := range pins {
select {
case <-ctx.Done():
return ctx.Err()
default:
// Pin videos from ThreeSpeak until the limit is reached
for _, cid := range localdata.ThreeSpeakVideos {
if totalPinned >= limit {
fmt.Println("Total pinned is greater than limit")
break
}
fmt.Println("Getting stats for CID: ", cid)
// If CID isn't already in allPins, then it's not pinned
if _, pinned := allPins[cid]; !pinned {
stat, err := sh.ObjectStat(cid)
if err != nil {
fmt.Println("failed to get object stats for CID %s: %w", cid, err)
}
if totalPinned <= limit {
break
fmt.Printf("Error getting stats for CID %s: %s\n", cid, err)
continue
}
if err := sh.Unpin(cid); err != nil {
fmt.Println("failed to unpin CID %s: %w", cid, err)

size := int64(stat.CumulativeSize)
fmt.Println("Size: ", size)
fmt.Println("Total pinned: ", totalPinned)
fmt.Println("Limit: ", limit)
if totalPinned+size-1000000 <= limit {
fmt.Println("Pinning CID: ", cid)
if err := sh.Pin(cid); err != nil {
fmt.Printf("Failed to pin CID %s: %s\n", cid, err)
continue
}
totalPinned += size
// Once pinned, add it to the allPins
allPins[cid] = true
fmt.Println("Pinned CID: ", cid)
}
totalPinned -= int64(stat.CumulativeSize) // Use actual size of the unpinned video
} else {
fmt.Println("CID is already pinned")
}
}

Expand Down
2 changes: 1 addition & 1 deletion api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func handleStats(c *gin.Context) {
return
}
defer closeWebSocket(conn)
log.Info("Entering handleStats")
//log.Info("Entering handleStats")
stats(conn)
return
}
Expand Down
7 changes: 4 additions & 3 deletions api/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func stats(c *websocket.Conn) {
peerSizes[peerName] = fmt.Sprintf("%d", localdata.PeerSize[peerName]/1024/1024/1024)
peerSynced[peerName] = fmt.Sprintf("%v", localdata.NodesStatus[peerName])
}
fmt.Println("Network Storage: ", NetworkStorage)
// fmt.Println("Network Storage: ", NetworkStorage)
// Print the Network Storage in GB
NetworkStorage = NetworkStorage / 1024 / 1024 / 1024
fmt.Println("Size: ", NetworkStorage, "GB")
fmt.Println("NodeType: ", localdata.NodeType)
// fmt.Println("Size: ", NetworkStorage, "GB")
// fmt.Println("NodeType: ", localdata.NodeType)
NodeType := ""
if localdata.NodeType == 1 {
NodeType = "Validator"
Expand All @@ -55,6 +55,7 @@ func stats(c *websocket.Conn) {
"PeerProofs": localdata.PeerProofs,
"PeerSynced": peerSynced,
"PeerHiveRewards": localdata.HiveRewarded,
"PeerCids": len(localdata.PeerCids),
}
localdata.Lock.Unlock()
jsonData, err := json.Marshal(data)
Expand Down
7 changes: 4 additions & 3 deletions api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ var wsMutex = &sync.Mutex{}

func upgradeToWebSocket(c *gin.Context) *websocket.Conn {
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
fmt.Println("upgradeToWebSocket")
//fmt.Println("upgradeToWebSocket")
if err != nil {
log.Error(err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to upgrade connection"})
return nil
}
fmt.Println("upgradeToWebSocket2")
//fmt.Println("upgradeToWebSocket2")

return conn
}

func closeWebSocket(conn *websocket.Conn) {
log.Info("Closing WebSocket connection")
// log.Info("Closing WebSocket connection")
err := conn.Close()
if err != nil {
return
Expand All @@ -67,6 +67,7 @@ func readWebSocketMessage(conn *websocket.Conn) (*message, error) {

func sendWsResponse(status string, message string, elapsed string, conn *websocket.Conn) {
localdata.Lock.Lock()
fmt.Println("sendWsResponse", status, message, elapsed)
err := conn.WriteJSON(ExampleResponse{
Status: status,
Message: message,
Expand Down
2 changes: 0 additions & 2 deletions ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ func SyncNode(NewPins map[string]interface{}, name string) {
if localdata.NodesStatus[name] != "Synced" {
localdata.Lock.Lock()
localdata.PeerSize[name] = peersize
fmt.Println("Peer size: ", peersize)
localdata.Lock.Unlock()
}
fmt.Println("Key not found: ", key)
Expand Down Expand Up @@ -339,7 +338,6 @@ func SyncNode(NewPins map[string]interface{}, name string) {
if localdata.NodesStatus[name] != "Synced" {
localdata.Lock.Lock()
localdata.PeerSize[name] = peersize
fmt.Println("Peer size: ", peersize)
localdata.Lock.Unlock()
}
fmt.Println("Key found: ", key)
Expand Down
1 change: 1 addition & 0 deletions localdata/localdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var Lock sync.Mutex
var PeerProofs = map[string]int{}
var PeerLastActive = map[string]time.Time{}
var HiveRewarded = map[string]float64{}
var PiningVideos = false

type NetworkRecord struct {
Peers int `json:"Peers"`
Expand Down
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func initialize(ctx context.Context) {
fmt.Println("Pinning and unpinning videos")
go Rewards.PinVideos(*storageLimit, ctx)
}
fmt.Println("Done pinning and unpinning videos")
}

database.Init()
Expand Down Expand Up @@ -218,7 +217,7 @@ func fetchPins(ctx context.Context) {
size, _ := ipfs.FileSize(key)
localdata.Lock.Lock()
PeerSize += size
fmt.Println("Peer size: ", localdata.PeerSize[localdata.NodeName])
//fmt.Println("Peer size: ", localdata.PeerSize[localdata.NodeName])
localdata.Lock.Unlock()
if !ipfs.IsPinnedInDB(key) {
localdata.Lock.Lock()
Expand Down
8 changes: 5 additions & 3 deletions messaging/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ func SendProof(req Request, hash string, seed string, user string) {
nodeType := localdata.NodeType
localdata.Lock.Unlock()
if wsPeers == req.User && nodeType == 1 {
wsMutex.Lock()
localdata.Lock.Lock()
ws := localdata.WsClients[req.User]
localdata.Lock.Unlock()
ws.WriteMessage(websocket.TextMessage, jsonData)
wsMutex.Unlock()
} else if localdata.UseWS == true && localdata.NodeType == 2 {
wsMutex.Lock()
localdata.Lock.Lock()
localdata.WsValidators["Validator1"].WriteMessage(websocket.TextMessage, jsonData)
wsMutex.Unlock()
localdata.Lock.Unlock()
} else {
pubsub.Publish(string(jsonData), user)
}
Expand Down Expand Up @@ -163,6 +164,7 @@ func HandleRequestProof(req Request) {
validationHash := validation.CreatProofHash(hash, CID)
SendProof(req, validationHash, hash, localdata.NodeName)
} else {
fmt.Println("Sending proof of access to validation node")
SendProof(req, hash, req.Seed, localdata.NodeName)
}

Expand Down
2 changes: 1 addition & 1 deletion messaging/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func StartWsClient() {
continue
}
go HandleMessage(string(message))
fmt.Println("Client recv: ", string(message))
//fmt.Println("Client recv: ", string(message))
} else {
log.Println("Connection is not established.")
time.Sleep(1 * time.Second) // Sleep for a second before next reconnection attempt
Expand Down
4 changes: 3 additions & 1 deletion public/node-stats.html
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,20 @@
var peerProofs = stats.PeerProofs; // get PeerProofs
var peerStatus = stats.PeerSynced; // get PeerStatus
var PeerHiveRewards = stats.PeerHiveRewards; // get PeerHiveRewards
var PeerCids = stats.PeerCids; // get PeerCids
var peerList = "";
for (var peer in peerSizes) {
var size = peerSizes[peer] || "N/A";
var lastActive = peerLastActive[peer] || "N/A";
var peerProof = peerProofs[peer] || "N/A";
var rawPeerHiveRewards = PeerHiveRewards[peer];
var rawPeerCids = PeerCids[peer];

var peerHiveRewards = "N/A";
if (rawPeerHiveRewards !== undefined && !isNaN(rawPeerHiveRewards)) {
peerHiveRewards = parseFloat(rawPeerHiveRewards).toFixed(3);
}
peerList += `${peer}: ${size} GB (Last Active: ${lastActive}) (Proof: ${peerProof}/10) (Status: ${peerStatus[peer]})(Hive Rewards: ${peerHiveRewards})<br />`;
peerList += `${peer}: ${size} GB (Last Active: ${lastActive}) (Proof: ${peerProof}/10) (Status: ${peerStatus[peer]})(Hive Rewards: ${peerHiveRewards}) (CidsPinned: ${rawPeerCids})<br>`;
}
document.getElementById("sync-status").innerHTML = syncStatus === "true" ? 'Synced' : 'Not Synced';
document.getElementById("sync-status").style.color = syncStatus === "true" ? 'green' : 'red';
Expand Down
2 changes: 1 addition & 1 deletion validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func CreatProofHash(hash string, CID string) string {
}
// Create the proof hash
proofHash = proofcrypto.HashFile(proofHash)
//fmt.Println("Proof Hash: ", proofHash)
fmt.Println("Proof Hash: ", proofHash)
return proofHash
}

Expand Down

0 comments on commit ebe6fea

Please sign in to comment.