Warp test #126

Draft · wants to merge 13 commits into base: staging
17 changes: 8 additions & 9 deletions cmd/gateway/zcn/dStorage.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
@@ -147,7 +146,7 @@ func getSingleRegularRef(alloc *sdk.Allocation, remotePath string) (*sdk.ORef, e
oREsult, err := alloc.GetRefs(remotePath, "", "", "", "", "regular", level, 1)
if err != nil {
logger.Error("error with GetRefs", err.Error(), " this is the error")
- fmt.Println("error with GetRefs", err)
+ // fmt.Println("error with GetRefs", err)
if isConsensusFailedError(err) {
time.Sleep(retryWaitTime)
oREsult, err = alloc.GetRefs(remotePath, "", "", "", "", "regular", level, 1)
@@ -175,7 +174,7 @@ var (
)

func getObjectRef(alloc *sdk.Allocation, bucket, object, remotePath string) (*minio.ObjectInfo, bool, error) {
- log.Printf("~~~~~~~~~~~~~~~~~~~~~~~~ get object info remotePath: %v\n", remotePath)
+ // log.Printf("~~~~~~~~~~~~~~~~~~~~~~~~ get object info remotePath: %v\n", remotePath)
var isEncrypted bool
ref, err := getSingleRegularRef(alloc, remotePath)
if err != nil {
@@ -188,7 +187,7 @@ func getObjectRef(alloc *sdk.Allocation, bucket, object, remotePath string) (*mi
isEncrypted = true
}

- log.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~get object info, ref: ", ref)
+ // log.Println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~get object info, ref: ", ref)

return &minio.ObjectInfo{
Bucket: bucket,
@@ -255,10 +254,10 @@ func getFileReader(ctx context.Context,
}
fileRangeSize = objectInfo.Size - rangeStart
}
- log.Println("^^^^^^^^getFileReader: starting download: ", startBlock, endBlock, rangeStart, rangeEnd, fileRangeSize)
+ // log.Println("^^^^^^^^getFileReader: starting download: ", startBlock, endBlock, rangeStart, rangeEnd, fileRangeSize)
var r sys.File
if startBlock == 1 && endBlock == 0 {
- log.Println("^^^^^^^^getFileReader: stream download ")
+ // log.Println("^^^^^^^^getFileReader: stream download ")
pr, pw := io.Pipe()
r = &pipeFile{w: pw}
go func() {
@@ -316,7 +315,7 @@

// create a new limited reader
f := io.LimitReader(r, fileRangeSize)
- log.Println("^^^^^^^^getFileReader: finish download")
+ // log.Println("^^^^^^^^getFileReader: finish download")
fCloser := func() {
r.Close() //nolint:errcheck
if localFilePath != "" {
@@ -328,7 +327,7 @@
}

func putFile(ctx context.Context, alloc *sdk.Allocation, remotePath, contentType string, r io.Reader, size int64, isUpdate, shouldEncrypt bool) (err error) {
- logger.Info("started PutFile")
+ // logger.Info("started PutFile")
fileName := filepath.Base(remotePath)
fileMeta := sdk.FileMeta{
Path: "",
@@ -338,7 +337,7 @@
RemoteName: fileName,
}

- logger.Info("starting chunked upload")
+ // logger.Info("starting chunked upload")
opRequest := sdk.OperationRequest{
OperationType: constants.FileOperationInsert,
FileReader: newMinioReader(r),
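Review note: this file (like the rest of the PR) silences hot-path logging by commenting the calls out rather than deleting them. If the traces are worth keeping, an env-gated helper avoids the comment churn; a minimal sketch (the `debugLog` helper and `MINIO_ZCN_DEBUG` variable are hypothetical, not part of this PR):

```go
package zcn

import (
	"log"
	"os"
)

// zcnDebug is read once at startup; set MINIO_ZCN_DEBUG=1 to re-enable traces.
var zcnDebug = os.Getenv("MINIO_ZCN_DEBUG") != ""

// debugLog forwards to the standard logger only when debugging is enabled,
// so hot paths pay a single boolean check instead of formatting costs.
func debugLog(v ...interface{}) {
	if zcnDebug {
		log.Println(v...)
	}
}
```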
6 changes: 4 additions & 2 deletions cmd/gateway/zcn/gateway-zcn.go
@@ -9,6 +9,7 @@ import (
"os"
"path"
"path/filepath"
"runtime/debug"
"strings"
"sync"
"time"
@@ -140,6 +141,7 @@ func (z *ZCN) NewGatewayLayer(creds madmin.Credentials) (minio.ObjectLayer, erro
alloc: allocation,
metrics: minio.NewMetrics(),
}
+ debug.SetGCPercent(50)
workDir, err = homedir.Dir()
if err != nil {
return nil, err
@@ -422,7 +424,7 @@ func (zob *zcnObjects) ListObjects(ctx context.Context, bucket, prefix, marker,
} else {
remotePath = filepath.Join(rootPath, bucket, prefix)
}
- log.Println("ListObjects remotePath: ", remotePath, " objFileType: ", objFileType)
+ // log.Println("ListObjects remotePath: ", remotePath, " objFileType: ", objFileType)
var ref *sdk.ORef
ref, err = getSingleRegularRef(zob.alloc, remotePath)
if err != nil {
@@ -466,7 +468,7 @@
if delimiter != "" {
isDelimited = true
}
- log.Println("ListObjects listRegularRefs: ", remotePath, " objFileType: ", objFileType)
+ // log.Println("ListObjects listRegularRefs: ", remotePath, " objFileType: ", objFileType)
refs, isTruncated, nextMarker, prefixes, err := listRegularRefs(zob.alloc, remotePath, marker, objFileType, maxKeys, isDelimited)
if err != nil {
if remotePath == rootPath && isPathNoExistError(err) {
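The one functional change here is `debug.SetGCPercent(50)` in `NewGatewayLayer`: the runtime starts a collection once the live heap grows 50% beyond what survived the previous cycle (the default is 100), trading extra GC CPU for a lower peak RSS while buffering large multipart uploads. A standalone sketch of the same knob:

```go
package main

import (
	"fmt"
	"runtime/debug"
)

func main() {
	// SetGCPercent returns the previous setting, so the change can be
	// logged or restored later; 50 makes the collector roughly twice as
	// eager as the default GOGC=100.
	prev := debug.SetGCPercent(50)
	fmt.Println("GC target changed from", prev, "to 50")
}
```

The same effect is available without a code change via the `GOGC=50` environment variable.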
6 changes: 5 additions & 1 deletion cmd/gateway/zcn/initSDK.go
@@ -4,13 +4,15 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"

"github.com/0chain/gosdk/core/conf"
"github.com/0chain/gosdk/core/logger"
"github.com/0chain/gosdk/zboxcore/blockchain"
"github.com/0chain/gosdk/zboxcore/client"
"github.com/0chain/gosdk/zboxcore/sdk"
"github.com/0chain/gosdk/zcncore"
"github.com/mitchellh/go-homedir"
@@ -85,6 +87,8 @@ func initializeSDK(configDir, allocid string, nonce int64) error {
logger.SyncLoggers([]*logger.Logger{zcncore.GetLogger(), sdk.GetLogger()})
zcncore.SetLogFile("cmdlog.log", true)
sdk.SetLogFile("cmd.log", true)
+ sdk.SetLogLevel(2)
+ zcncore.SetLogLevel(2)

err = zcncore.InitZCNSDK(cfg.BlockWorker, cfg.SignatureScheme,
zcncore.WithChainID(cfg.ChainID),
@@ -99,7 +103,7 @@
if err != nil {
return err
}

+ log.Println("SDK initialized: ", client.GetClientID(), " ", client.GetClientPublicKey())
blockchain.SetMaxTxnQuery(cfg.MaxTxnQuery)
blockchain.SetQuerySleepTime(cfg.QuerySleepTime)
conf.InitClientConfig(&cfg)
57 changes: 29 additions & 28 deletions cmd/gateway/zcn/multipart.go
@@ -44,7 +44,6 @@ type MultiPartFile struct {
lastPartUpdated bool
errorC chan error
seqPQ *seqpriorityqueue.SeqPriorityQueue
- doneC chan struct{} // indicates that the uploading is done
cancelC chan struct{} // indicate the cancel of the uploading
dataC chan []byte // data to be uploaded
}
@@ -81,11 +80,11 @@ func (mpf *MultiPartFile) UpdateFileSize(partID int, size int64) {
// this is last part
mpf.fileSize = int64(partID-1)*mpf.lastPartSize + size
mpf.lastPartUpdated = true
- log.Println("see last part, partID:", partID, "file size:", mpf.fileSize)
+ // log.Println("see last part, partID:", partID, "file size:", mpf.fileSize)
}

func (zob *zcnObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (uploadID string, err error) {
- log.Println("initial multipart upload, partNumber:", opts.PartNumber)
+ // log.Println("initial multipart upload, partNumber:", opts.PartNumber)
contentType := opts.UserDefined["content-type"]
if contentType == "" {
contentType = mimedb.TypeByExtension(path.Ext(object))
@@ -131,15 +130,14 @@ func (zob *zcnObjects) newMultiPartUpload(localStorageDir, bucket, object, conte
memFile: memFile,
seqPQ: seqpriorityqueue.NewSeqPriorityQueue(),
errorC: make(chan error, 1),
- doneC: make(chan struct{}),
dataC: make(chan []byte, 20),
cancelC: make(chan struct{}, 1),
}
FileMap[uploadID] = multiPartFile
mapLock.Unlock()
// Create the bucket directory if it doesn't exist
bucketPath := filepath.Join(localStorageDir, bucket, uploadID, object)
- log.Println("bucketPath:", bucketPath)
+ // log.Println("bucketPath:", bucketPath)
if err := os.MkdirAll(bucketPath, os.ModePerm); err != nil {
log.Println(err)
return "", fmt.Errorf("error creating bucket: %v", err)
@@ -156,21 +154,21 @@
zw = lz4.NewWriter(buf)
zw.Apply(lz4.CompressionLevelOption(lz4.Level1)) //nolint:errcheck
}
- st := time.Now()
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+ // st := time.Now()
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
for {
select {
case <-multiPartFile.cancelC:
log.Println("uploading is canceled, clean up temp dirs")
memFile.errChan <- fmt.Errorf("uploading is canceled")
- // TODO: clean up temp dirs
_ = os.Remove(bucketPath)
+ cleanupPartFilesAndDirs(bucket, uploadID, localStorageDir)
return
case <-ctx.Done():
log.Println("uploading is timeout, clean up temp dirs")
memFile.errChan <- fmt.Errorf("uploading is timeout")
_ = os.Remove(bucketPath)
+ cleanupPartFilesAndDirs(bucket, uploadID, localStorageDir)
return
case data, ok := <-multiPartFile.dataC:
if ok {
@@ -211,7 +209,7 @@ func (zob *zcnObjects) newMultiPartUpload(localStorageDir, bucket, object, conte
cn := len(bbuf)

total += int64(cn)
- log.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ uploaded:", total, " new:", cn)
+ // log.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ uploaded:", total, " new:", cn)
} else {
if toCompress {
err = zw.Close()
@@ -233,15 +231,15 @@
if end >= len(bbuf) {
end = len(bbuf)
memFileData.err = io.EOF
- log.Println("uploading last part", current, end, end-current+1)
+ // log.Println("uploading last part", current, end, end-current+1)
}
memFileData.buf = bbuf[current:end]
multiPartFile.memFile.memFileDataChan <- memFileData
}
cn := len(bbuf)
close(multiPartFile.memFile.memFileDataChan)
total += int64(cn)
- log.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ uploaded:", total, " new:", cn, " duration:", time.Since(st))
+ // log.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ uploaded:", total, " new:", cn, " duration:", time.Since(st))
if toCompress {
multiPartFile.fileSize = total
}
@@ -279,27 +277,26 @@ func (zob *zcnObjects) newMultiPartUpload(localStorageDir, bucket, object, conte
go func() {
// run this in background, will block until the data is written to memFile
// We should add ctx here to cancel the operation
- multiPartFile.errorC <- zob.alloc.DoMultiOperation([]sdk.OperationRequest{operationRequest})
+ uploadErr := zob.alloc.DoMultiOperation([]sdk.OperationRequest{operationRequest})
+ if uploadErr != nil {
+ cleanupPartFilesAndDirs(bucket, uploadID, localStorageDir)
+ }
+ multiPartFile.errorC <- uploadErr
}()

for {
select {
case <-multiPartFile.cancelC:
log.Println("uploading is canceled, clean up temp dirs")
multiPartFile.memFile.errChan <- fmt.Errorf("uploading is canceled")
- // TODO: clean up temp dirs
_ = os.Remove(bucketPath)
+ cleanupPartFilesAndDirs(bucket, uploadID, localStorageDir)
return
default:
partNumber := multiPartFile.seqPQ.Popup()
- log.Println("==================================== popup part:", partNumber)
+ // log.Println("==================================== popup part:", partNumber)

if partNumber == -1 {
close(multiPartFile.dataC)
- close(multiPartFile.doneC)
_ = os.Remove(bucketPath)
log.Println("==================================== popup done")
return
}

@@ -311,7 +308,7 @@
if err != nil {
log.Panicf("could not open part file: %v, err: %v", partFilename, err)
}
- defer partFile.Close()
+ defer func() {
+ partFile.Close()
+ _ = os.Remove(partFilename)
+ }()
stat, err := partFile.Stat()
if err != nil {
log.Panicf("could not stat part file: %v, err: %v", partFilename, err)
@@ -323,7 +323,7 @@ func (zob *zcnObjects) newMultiPartUpload(localStorageDir, bucket, object, conte
}

multiPartFile.dataC <- data
- log.Println("^^^^^^^^^ uploading part:", partNumber, "size:", len(data))
+ // log.Println("^^^^^^^^^ uploading part:", partNumber, "size:", len(data))
}()
}
}
@@ -362,7 +362,7 @@ func (zob *zcnObjects) PutObjectPart(ctx context.Context, bucket, object, upload
}

seqPQ.Push(partID)
- log.Println("VVVVVVVVVVVVVV pushed part:", partID)
+ // log.Println("VVVVVVVVVVVVVV pushed part:", partID)

// Calculate ETag for the part
// eTag := hex.EncodeToString(hash.Sum(nil))
@@ -404,7 +404,7 @@ func (zob *zcnObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
log.Println("Error uploading to Zus storage:", err)
return minio.ObjectInfo{}, fmt.Errorf("error uploading to Zus storage: %v", err)
}
- log.Println("finish uploading!!")
+ // log.Println("finish uploading!!")

eTag, err := zob.constructCompleteObject(bucket, uploadID, object, localStorageDir)
if err != nil {
@@ -417,7 +417,7 @@
// http.Error(w, "Error cleaning up part files and directories", http.StatusInternalServerError)
return minio.ObjectInfo{}, fmt.Errorf("error cleaning up part files and directories: %v", err)
}
- log.Println("finish uploading: ", multiPartFile.fileSize, " name: ", object)
+ // log.Println("finish uploading: ", multiPartFile.fileSize, " name: ", object)
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
@@ -436,7 +436,7 @@ func (zob *zcnObjects) constructCompleteObject(bucket, uploadID, object, localSt
partETagFilename := partFilename + ".etag"

// Break the loop when there are no more parts
- if _, err := os.Stat(partFilename); os.IsNotExist(err) {
+ if _, err := os.Stat(partETagFilename); os.IsNotExist(err) {
break
}

@@ -523,15 +523,16 @@ func (zob *zcnObjects) ListObjectParts(ctx context.Context, bucket string, objec

for i := partNumberMarker; i <= maxParts; i++ {
partFilename := filepath.Join(localStorageDir, bucket, uploadID, object, fmt.Sprintf("part%d", i))
+ partETagFilename := partFilename + ".etag"
// Check if the part file exists
- fs, err := os.Stat(partFilename)
+ fs, err := os.Stat(partETagFilename)
if err != nil {
// If the part file does not exist, we have reached the end of the parts list
break
}

// Read the ETag of the part
- partETagFilename := partFilename + ".etag"

partETagBytes, err := os.ReadFile(partETagFilename)
if err != nil {
return minio.ListPartsInfo{}, fmt.Errorf("Unable to read part ETag: %w", err)
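Two patterns recur in this file: part iteration now stats the `.etag` file instead of the raw part file (the part data is deleted by the new `defer` as soon as it has been queued for upload, while `part%d.etag` survives until the final cleanup), and every cancel/timeout/error path funnels through `cleanupPartFilesAndDirs`. That helper's body is not shown in this diff; a plausible shape, assuming parts are staged under `<localStorageDir>/<bucket>/<uploadID>/` (hypothetical sketch, not the PR's implementation):

```go
package zcn

import (
	"os"
	"path/filepath"
)

// Hypothetical sketch of the helper referenced throughout multipart.go; the
// real implementation is not part of this diff. Removing the per-upload
// staging directory drops the part files, their .etag markers, and the
// object subdirectory in one call.
func cleanupPartFilesAndDirs(bucket, uploadID, localStorageDir string) error {
	uploadDir := filepath.Join(localStorageDir, bucket, uploadID)
	return os.RemoveAll(uploadDir)
}
```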
3 changes: 1 addition & 2 deletions cmd/notification.go
@@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"sort"
@@ -1415,7 +1414,7 @@ func sendEvent(args eventArgs) {
if globalNotificationSys == nil {
return
}
- log.Println("sendEvent:", args.EventName, args.Object.Name)
+ // log.Println("sendEvent:", args.EventName, args.Object.Name)
if globalHTTPListen.NumSubscribers() > 0 {
globalHTTPListen.Publish(args.ToEvent(false))
}
2 changes: 0 additions & 2 deletions go.sum
@@ -63,8 +63,6 @@ github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565 h1:z+DtCR8mBsjPnEs
github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565/go.mod h1:UyDC8Qyl5z9lGkCnf9RHJPMektnFX8XtCJZHXCCVj8E=
github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM=
github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc=
- github.com/0chain/gosdk v1.14.6 h1:wJcwlKZv9iUF3OrsuuY0nsQfL8oXlaIbh4uI9P0r+Ro=
- github.com/0chain/gosdk v1.14.6/go.mod h1:tgAiVAuIy+Vs1tGfKCPEuuWWARwNQBEw32y950LrqrU=
github.com/0chain/gosdk v1.14.8-0.20240521143909-1b28bab767f3 h1:IsEjnuQ1/eINnOQUYJ8H0eZ9GOSSEftYeF2/yAuT4vo=
github.com/0chain/gosdk v1.14.8-0.20240521143909-1b28bab767f3/go.mod h1:tgAiVAuIy+Vs1tGfKCPEuuWWARwNQBEw32y950LrqrU=
github.com/Azure/azure-amqp-common-go/v2 v2.1.0/go.mod h1:R8rea+gJRuJR6QxTir/XuEd+YuKoUiazDC/N96FiDEU=
Expand Down