diff --git a/core/sys/fs_mem.go b/core/sys/fs_mem.go
index 7c338c920..cf009c47c 100644
--- a/core/sys/fs_mem.go
+++ b/core/sys/fs_mem.go
@@ -271,6 +271,7 @@ type MemChanFile struct {
 	ChunkWriteSize int         // 0 value means no limit
 	Sys            interface{} // FileInfo.Sys
 	reader         io.Reader
+	data           []byte
 }
 
 func (f *MemChanFile) Stat() (fs.FileInfo, error) {
@@ -287,24 +288,29 @@ func (f *MemChanFile) Read(p []byte) (int, error) {
 	n := copy(p, recieveData)
 	return n, nil
 }
+
 func (f *MemChanFile) Write(p []byte) (n int, err error) {
 	if f.ChunkWriteSize == 0 {
 		f.Buffer <- p
 	} else {
-		current := 0
-		for ; current < len(p); current += f.ChunkWriteSize {
-			end := current + f.ChunkWriteSize
-			if end > len(p) {
-				end = len(p)
-			}
-			f.Buffer <- p[current:end]
+		if cap(f.data) == 0 {
+			f.data = make([]byte, 0, f.ChunkWriteSize)
 		}
+		f.data = append(f.data, p...)
 	}
 	return len(p), nil
-
 }
 
 func (f *MemChanFile) Sync() error {
+	current := 0
+	for ; current < len(f.data); current += f.ChunkWriteSize {
+		end := current + f.ChunkWriteSize
+		if end > len(f.data) {
+			end = len(f.data)
+		}
+		f.Buffer <- f.data[current:end]
+	}
+	f.data = make([]byte, 0, f.ChunkWriteSize)
 	return nil
 }
 func (f *MemChanFile) Seek(offset int64, whence int) (ret int64, err error) {
diff --git a/wasmsdk/jsbridge/file_writer.go b/wasmsdk/jsbridge/file_writer.go
index 9a2ef955c..25c7d9a28 100644
--- a/wasmsdk/jsbridge/file_writer.go
+++ b/wasmsdk/jsbridge/file_writer.go
@@ -14,6 +14,9 @@ var jsFileWriterMutex sync.Mutex
 
 type FileWriter struct {
 	writableStream js.Value
+	uint8Array     js.Value
+	fileHandle     js.Value
+	bufLen         int
 }
 
 func (w *FileWriter) Write(p []byte) (int, error) {
@@ -21,9 +24,12 @@ func (w *FileWriter) Write(p []byte) (int, error) {
 	jsFileWriterMutex.Lock()
 	defer jsFileWriterMutex.Unlock()
 
-	uint8Array := js.Global().Get("Uint8Array").New(len(p))
-	js.CopyBytesToJS(uint8Array, p)
-	_, err := Await(w.writableStream.Call("write", uint8Array))
+	if w.bufLen != len(p) {
+		w.bufLen = len(p)
+		w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
+	}
+	js.CopyBytesToJS(w.uint8Array, p)
+	_, err := Await(w.writableStream.Call("write", w.uint8Array))
 	if len(err) > 0 && !err[0].IsNull() {
 		return 0, errors.New("file_writer: " + err[0].String())
 	}
@@ -55,14 +61,7 @@ func (w *FileWriter) Stat() (fs.FileInfo, error) {
 }
 
 func NewFileWriter(filename string) (*FileWriter, error) {
-	// writableStream := js.Global().Get("writableStream")
-	// stream, err := Await(writableStream.Call("create", filename))
-	// if len(err) > 0 && !err[0].IsNull() {
-	// 	return nil, errors.New("file_writer: " + err[0].String())
-	// }
-	// return &FileWriter{
-	// 	writableStream: stream[0],
-	// }, nil
+
 	if !js.Global().Get("window").Get("showSaveFilePicker").Truthy() || !js.Global().Get("window").Get("WritableStream").Truthy() {
 		return nil, errors.New("file_writer: not supported")
 	}
@@ -84,5 +83,6 @@ func NewFileWriter(filename string) (*FileWriter, error) {
 	}
 	return &FileWriter{
 		writableStream: writableStream[0],
+		fileHandle:     fileHandle[0],
 	}, nil
 }
diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go
index 35ef7377c..9a859d380 100644
--- a/zboxcore/sdk/allocation.go
+++ b/zboxcore/sdk/allocation.go
@@ -299,7 +299,7 @@ func (a *Allocation) GetBlobberStats() map[string]*BlobberAllocationStats {
 	return result
 }
 
-const downloadWorkerCount = 10
+const downloadWorkerCount = 6
 
 func (a *Allocation) InitAllocation() {
 	a.downloadChan = make(chan *DownloadRequest, 100)
diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go
index d79715cd7..9420375fd 100644
--- a/zboxcore/sdk/blockdownloadworker.go
+++ b/zboxcore/sdk/blockdownloadworker.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"io"
 	"net/http"
-	"strconv"
 	"sync"
 	"time"
 
@@ -19,6 +18,7 @@ import (
 	zlogger "github.com/0chain/gosdk/zboxcore/logger"
 	"github.com/0chain/gosdk/zboxcore/marker"
 	"github.com/0chain/gosdk/zboxcore/zboxutil"
+	"golang.org/x/sync/semaphore"
 )
 
 const (
@@ -45,6 +45,7 @@ type BlockDownloadRequest struct {
 	result       chan *downloadBlock
 	shouldVerify bool
 	connectionID string
+	respBuf      []byte
 }
 
 type downloadResponse struct {
@@ -74,21 +75,26 @@ func InitBlockDownloader(blobbers []*blockchain.StorageNode, workerCount int) {
 	for _, blobber := range blobbers {
 		if _, ok := downloadBlockChan[blobber.ID]; !ok {
 			downloadBlockChan[blobber.ID] = make(chan *BlockDownloadRequest, workerCount)
-			for i := 0; i < workerCount; i++ {
-				blobberChan := downloadBlockChan[blobber.ID]
-				go startBlockDownloadWorker(blobberChan)
-			}
+			go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount)
 		}
 	}
 }
 
-func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest) {
+func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int) {
+	sem := semaphore.NewWeighted(int64(workers))
 	for {
 		blockDownloadReq, open := <-blobberChan
 		if !open {
 			break
 		}
-		blockDownloadReq.downloadBlobberBlock()
+		if err := sem.Acquire(blockDownloadReq.ctx, 1); err != nil {
+			blockDownloadReq.result <- &downloadBlock{Success: false, idx: blockDownloadReq.blobberIdx, err: err}
+			continue
+		}
+		go func() {
+			blockDownloadReq.downloadBlobberBlock()
+			sem.Release(1)
+		}()
 	}
 }
@@ -155,45 +161,49 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {
 			if req.chunkSize == 0 {
 				req.chunkSize = CHUNK_SIZE
 			}
+
+			if resp.StatusCode == http.StatusTooManyRequests {
+				shouldRetry = true
+				time.Sleep(time.Second * 2)
+				return errors.New(RateLimitError, "Rate limit error")
+			}
+
+			if resp.StatusCode == http.StatusInternalServerError {
+				shouldRetry = true
+				return errors.New("internal_server_error", "Internal server error")
+			}
+
 			var rspData downloadBlock
-			respLen := resp.Header.Get("Content-Length")
-			var respBody []byte
-			if respLen != "" {
-				len, err := strconv.Atoi(respLen)
-				zlogger.Logger.Info("respLen", len)
-				if err != nil {
-					zlogger.Logger.Error("respLen convert error: ", err)
-					return err
-				}
-				respBody, err = readBody(resp.Body, len)
+			if req.shouldVerify {
+				req.respBuf, err = io.ReadAll(resp.Body)
 				if err != nil {
 					zlogger.Logger.Error("respBody read error: ", err)
 					return err
 				}
 			} else {
-				respBody, err = readBody(resp.Body, int(req.numBlocks)*req.chunkSize)
+				req.respBuf, err = readBody(resp.Body, req.respBuf)
 				if err != nil {
 					zlogger.Logger.Error("respBody read error: ", err)
 					return err
 				}
 			}
 
 			if resp.StatusCode != http.StatusOK {
-				zlogger.Logger.Debug(fmt.Sprintf("downloadBlobberBlock FAIL - blobberID: %v, clientID: %v, blockNum: %d, retry: %d, response: %v", req.blobber.ID, client.GetClientID(), header.BlockNum, retry, string(respBody)))
-				if err = json.Unmarshal(respBody, &rspData); err == nil {
+				zlogger.Logger.Debug(fmt.Sprintf("downloadBlobberBlock FAIL - blobberID: %v, clientID: %v, blockNum: %d, retry: %d, response: %v", req.blobber.ID, client.GetClientID(), header.BlockNum, retry, string(req.respBuf)))
+				if err = json.Unmarshal(req.respBuf, &rspData); err == nil {
 					return errors.New("download_error", fmt.Sprintf("Response status: %d, Error: %v,", resp.StatusCode, rspData.err))
 				}
-				return errors.New("response_error", string(respBody))
+				return errors.New("response_error", string(req.respBuf))
 			}
 
 			dR := downloadResponse{}
 			contentType := resp.Header.Get("Content-Type")
 			if contentType == "application/json" {
-				err = json.Unmarshal(respBody, &dR)
+				err = json.Unmarshal(req.respBuf, &dR)
 				if err != nil {
 					return err
 				}
 			} else {
-				dR.Data = respBody
+				dR.Data = req.respBuf
 			}
 
 			if req.contentMode == DOWNLOAD_CONTENT_FULL && req.shouldVerify {
@@ -235,17 +245,17 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {
 
 		if err != nil {
 			if shouldRetry {
-				retry = 0
+				if retry >= 3 {
+					req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
+					return
+				}
 				shouldRetry = false
 				zlogger.Logger.Debug("Retrying for Error occurred: ", err)
+				retry++
 				continue
-			}
-			if retry >= 3 {
+			} else {
 				req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
-				return
 			}
-			retry++
-			continue
 		}
 		return
 	}
@@ -254,22 +264,30 @@
 }
 
-func AddBlockDownloadReq(req *BlockDownloadRequest) {
+func AddBlockDownloadReq(ctx context.Context, req *BlockDownloadRequest, rb *zboxutil.DownloadBuffer, effectiveBlockSize int) {
+	if rb != nil {
+		reqCtx, cncl := context.WithTimeout(ctx, (time.Second * 10))
+		defer cncl()
+		req.respBuf = rb.RequestChunk(reqCtx, int(req.blockNum/req.numBlocks))
+		if len(req.respBuf) == 0 {
+			req.respBuf = make([]byte, int(req.numBlocks)*effectiveBlockSize)
+		}
+	}
 	downloadBlockChan[req.blobber.ID] <- req
 }
 
-func readBody(r io.Reader, size int) ([]byte, error) {
-	b := make([]byte, 0, size)
+func readBody(r io.Reader, b []byte) ([]byte, error) {
+	start := 0
+	if len(b) == 0 {
+		return nil, fmt.Errorf("readBody: empty buffer")
+	}
 	for {
-		if len(b) == cap(b) {
-			// Add more capacity (let append pick how much).
-			b = append(b, 0)[:len(b)]
-		}
-		n, err := r.Read(b[len(b):cap(b)])
-		b = b[:len(b)+n]
+		n, err := r.Read(b[start:])
+		start += n
 		if err != nil {
 			if err == io.EOF {
 				err = nil
+				b = b[:start]
 			}
 			return b, err
 		}
 	}
diff --git a/zboxcore/sdk/downloader_option.go b/zboxcore/sdk/downloader_option.go
index d6d9e6839..78c9f335a 100644
--- a/zboxcore/sdk/downloader_option.go
+++ b/zboxcore/sdk/downloader_option.go
@@ -2,7 +2,7 @@ package sdk
 
 import "github.com/0chain/gosdk/core/sys"
 
-const DefaultBlocksPerMarker int = 10
+const DefaultBlocksPerMarker int = 100
 
 // DownloadOption set download option
 type DownloadOption func(do *DownloadOptions)
diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go
index 06ac4acb4..4199428db 100644
--- a/zboxcore/sdk/downloadworker.go
+++ b/zboxcore/sdk/downloadworker.go
@@ -10,7 +10,6 @@ import (
 	"hash"
 	"io"
 	"io/ioutil"
-	"math"
 	"net/http"
 	"os"
 	"strings"
@@ -37,6 +36,7 @@ import (
 const (
 	DOWNLOAD_CONTENT_FULL  = "full"
 	DOWNLOAD_CONTENT_THUMB = "thumbnail"
+	EXTRA_COUNT            = 2
 )
 
 type DownloadRequest struct {
@@ -79,11 +79,12 @@ type DownloadRequest struct {
 	chunksPerShard int64
 	size           int64
 	offset         int64
+	bufferMap      map[int]*zboxutil.DownloadBuffer
 }
 
 type blockData struct {
 	blockNum int
-	data     []byte
+	data     [][][]byte
 }
 
 func (req *DownloadRequest) removeFromMask(pos uint64) {
@@ -133,7 +134,7 @@ func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int
 
 // getBlocksData will get data blocks for some interval from minimal blobers and aggregate them and
 // return to the caller
-func (req *DownloadRequest) getBlocksData(startBlock, totalBlock int64) ([]byte, error) {
+func (req *DownloadRequest) getBlocksData(startBlock, totalBlock int64) ([][][]byte, error) {
 
 	shards, err := req.getBlocksDataFromBlobbers(startBlock, totalBlock)
 	if err != nil {
@@ -142,20 +143,16 @@ func (req *DownloadRequest) getBlocksData(startBlock, totalBlock int64) ([]byte,
 	}
 
 	// erasure decoding
 	// Can we benefit from goroutine for erasure decoding??
-	c := req.datashards * req.effectiveBlockSize
-	data := make([]byte, req.datashards*req.effectiveBlockSize*int(totalBlock))
+	// c := req.datashards * req.effectiveBlockSize
+	// data := make([]byte, req.datashards*req.effectiveBlockSize*int(totalBlock))
 	for i := range shards {
-		var d []byte
-		var err error
-		d, err = req.decodeEC(shards[i])
+		err = req.decodeEC(shards[i])
 		if err != nil {
 			return nil, err
 		}
-		index := i * c
-		copy(data[index:index+c], d)
 	}
-	return data, nil
+	return shards, nil
 }
 
 // downloadBlock This function will add download requests to the download channel which picks up
@@ -215,7 +212,11 @@ func (req *DownloadRequest) downloadBlock(
 		if !skipDownload {
 			bf := req.validationRootMap[blockDownloadReq.blobber.ID]
 			blockDownloadReq.blobberFile = bf
-			go AddBlockDownloadReq(blockDownloadReq)
+			if req.shouldVerify {
+				go AddBlockDownloadReq(req.ctx, blockDownloadReq, nil, req.effectiveBlockSize)
+			} else {
+				go AddBlockDownloadReq(req.ctx, blockDownloadReq, req.bufferMap[int(pos)], req.effectiveBlockSize)
+			}
 		}
 
 		c++
@@ -223,7 +224,6 @@ func (req *DownloadRequest) downloadBlock(
 			remainingMask = i
 			break
 		}
-
 	}
 
 	var failed int32
@@ -241,6 +241,7 @@ func (req *DownloadRequest) downloadBlock(
 				downloadErrors[i] = fmt.Sprintf("Error %s from %s",
 					err.Error(), req.blobbers[result.idx].Baseurl)
 				logger.Logger.Error(err)
+				req.bufferMap[result.idx].ReleaseChunk(int(req.startBlock / req.numBlocks))
 			}
 			wg.Done()
 		}()
@@ -257,23 +258,26 @@ func (req *DownloadRequest) downloadBlock(
 }
 
 // decodeEC will reconstruct shards and verify it
-func (req *DownloadRequest) decodeEC(shards [][]byte) (data []byte, err error) {
+func (req *DownloadRequest) decodeEC(shards [][]byte) (err error) {
 	err = req.ecEncoder.ReconstructData(shards)
 	if err != nil {
 		return
 	}
-	c := len(shards[0])
-	data = make([]byte, req.datashards*c)
-	for i := 0; i < req.datashards; i++ {
-		index := i * c
-		copy(data[index:index+c], shards[i])
-	}
-	return data, nil
+	// c := len(shards[0])
+	// data = make([]byte, req.datashards*c)
+	// for i := 0; i < req.datashards; i++ {
+	// 	index := i * c
+	// 	copy(data[index:index+c], shards[i])
+	// }
+	return nil
 }
 
+//shards -> shards[i][data]
+
 // fillShards will fill `shards` with data from blobbers that belongs to specific
 // blockNumber and blobber's position index in an allocation
 func (req *DownloadRequest) fillShards(shards [][][]byte, result *downloadBlock) (err error) {
+
 	for i := 0; i < len(result.BlockChunks); i++ {
 		var data []byte
 		if req.encryptedKey != "" {
@@ -381,11 +385,6 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
 	}
 
 	size, chunksPerShard, blocksPerShard := req.size, req.chunksPerShard, req.blocksPerShard
-	logger.Logger.Info(
-		fmt.Sprintf("Downloading file with size: %d from start block: %d and end block: %d. "+
-			"Blocks per blobber: %d", size, req.startBlock, req.endBlock, blocksPerShard),
-	)
-
 	now := time.Now()
 
 	err := req.initEC()
 	if err != nil {
@@ -411,12 +410,12 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
 	startBlock, endBlock, numBlocks := req.startBlock, req.endBlock, req.numBlocks
 	// remainingSize should be calculated based on startBlock number
 	// otherwise end data will have null bytes.
-	remainingSize := size - startBlock*int64(req.effectiveBlockSize)
+	remainingSize := size - startBlock*int64(req.effectiveBlockSize)*int64(req.datashards)
 
 	if req.statusCallback != nil {
 		// Started will also initialize progress bar. So without calling this function
 		// other callback's call will panic
-		req.statusCallback.Started(req.allocationID, remotePathCB, op, int(size))
+		req.statusCallback.Started(req.allocationID, remotePathCB, op, int(remainingSize))
 	}
 
 	if req.shouldVerify {
@@ -432,21 +431,36 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
 	var (
 		actualFileHasher  hash.Hash
 		isPREAndWholeFile bool
-		closeOnce         *sync.Once
-		hashDataChan      chan []byte
-		hashWg            *sync.WaitGroup
 	)
 
 	if !req.shouldVerify && (startBlock == 0 && endBlock == chunksPerShard) {
 		actualFileHasher = md5.New()
-		hashDataChan = make(chan []byte, n)
-		hashWg = &sync.WaitGroup{}
-		hashWg.Add(1)
-		closeOnce = &sync.Once{}
-		go processHashData(hashDataChan, hashWg, actualFileHasher)
 		isPREAndWholeFile = true
 	}
 
+	if !req.shouldVerify {
+		var pos uint64
+		req.bufferMap = make(map[int]*zboxutil.DownloadBuffer)
+		sz := downloadWorkerCount + EXTRA_COUNT
+		if sz > n {
+			sz = n
+		}
+		for i := req.downloadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) {
+			pos = uint64(i.TrailingZeros())
+			req.bufferMap[int(pos)] = zboxutil.NewDownloadBuffer(sz, int(numBlocks), req.effectiveBlockSize)
+		}
+	}
+
+	toSync := false
+	if _, ok := req.fileHandler.(*sys.MemChanFile); ok {
+		toSync = true
+	}
+
+	logger.Logger.Info(
+		fmt.Sprintf("Downloading file with size: %d from start block: %d and end block: %d. "+
+			"Blocks per blobber: %d remainingSize: %d and total requests: %d", size, req.startBlock, req.endBlock, blocksPerShard, remainingSize, n),
+	)
+
 	writeCtx, writeCancel := context.WithCancel(ctx)
 	defer writeCancel()
 	var wg sync.WaitGroup
@@ -454,47 +468,54 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
 
 	// Handle writing the blocks in order as soon as they are downloaded
 	go func() {
-		buffer := make(map[int][]byte)
+		defer wg.Done()
+		buffer := make(map[int][][][]byte)
 		for i := 0; i < n; i++ {
 			select {
 			case <-writeCtx.Done():
-				if isPREAndWholeFile {
-					closeOnce.Do(func() {
-						close(hashDataChan)
-					})
-				}
 				goto breakLoop
 			default:
 			}
 			if data, ok := buffer[i]; ok {
 				// If the block we need to write next is already in the buffer, write it
-				numBytes := int64(math.Min(float64(remainingSize), float64(len(data))))
+				hashWg := &sync.WaitGroup{}
 				if isPREAndWholeFile {
-					hashDataChan <- data[:numBytes]
 					if i == n-1 {
-						closeOnce.Do(func() {
-							close(hashDataChan)
-						})
-						hashWg.Wait()
+						writeData(actualFileHasher, data, req.datashards, int(remainingSize)) //nolint
 						if calculatedFileHash, ok := checkHash(actualFileHasher, fRef, req.contentMode); !ok {
 							req.errorCB(fmt.Errorf("Expected actual file hash %s, calculated file hash %s", fRef.ActualFileHash, calculatedFileHash), remotePathCB)
 							return
 						}
+					} else {
+						hashWg.Add(1)
+						go func() {
+							writeData(actualFileHasher, data, req.datashards, int(remainingSize)) //nolint
+							hashWg.Done()
+						}()
 					}
 				}
-				_, err = req.fileHandler.Write(data[:numBytes])
+				totalWritten, err := writeData(req.fileHandler, data, req.datashards, int(remainingSize))
 				if err != nil {
 					req.errorCB(errors.Wrap(err, "Write file failed"), remotePathCB)
 					return
 				}
+				if toSync {
+					req.fileHandler.Sync() //nolint
+				}
 
-				downloaded = downloaded + int(numBytes)
-				remainingSize -= numBytes
+				if isPREAndWholeFile {
+					hashWg.Wait()
+				}
+				for _, rb := range req.bufferMap {
+					rb.ReleaseChunk(i)
+				}
+
+				downloaded = downloaded + totalWritten
+				remainingSize -= int64(totalWritten)
 
 				if req.statusCallback != nil {
-					req.statusCallback.InProgress(req.allocationID, remotePathCB, op, downloaded, data)
+					req.statusCallback.InProgress(req.allocationID, remotePathCB, op, downloaded, nil)
 				}
 
 				// Remove the block from the buffer
@@ -504,32 +525,46 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
 				for block := range blocks {
 					if block.blockNum == i {
 						// Write the data
-						numBytes := int64(math.Min(float64(remainingSize), float64(len(block.data))))
+						hashWg := &sync.WaitGroup{}
 						if isPREAndWholeFile {
-							hashDataChan <- block.data[:numBytes]
 							if i == n-1 {
-								closeOnce.Do(func() {
-									close(hashDataChan)
-								})
-								hashWg.Wait()
+								writeData(actualFileHasher, block.data, req.datashards, int(remainingSize)) //nolint
 								if calculatedFileHash, ok := checkHash(actualFileHasher, fRef, req.contentMode); !ok {
 									req.errorCB(fmt.Errorf("Expected actual file hash %s, calculated file hash %s", fRef.ActualFileHash, calculatedFileHash), remotePathCB)
 									return
 								}
+							} else {
+								hashWg.Add(1)
+								go func() {
+									writeData(actualFileHasher, block.data, req.datashards, int(remainingSize)) //nolint
+									hashWg.Done()
+								}()
 							}
 						}
-						_, err = req.fileHandler.Write(block.data[:numBytes])
+
+						totalWritten, err := writeData(req.fileHandler, block.data, req.datashards, int(remainingSize))
 						if err != nil {
 							req.errorCB(errors.Wrap(err, "Write file failed"), remotePathCB)
 							return
 						}
-						downloaded = downloaded + int(numBytes)
-						remainingSize -= numBytes
+						if toSync {
+							req.fileHandler.Sync() //nolint
+						}
+
+						if isPREAndWholeFile {
+							hashWg.Wait()
+						}
+						for _, rb := range req.bufferMap {
+							rb.ReleaseChunk(i)
+						}
+
+						downloaded = downloaded + totalWritten
+						remainingSize -= int64(totalWritten)
 
 						if req.statusCallback != nil {
-							req.statusCallback.InProgress(req.allocationID, remotePathCB, op, downloaded, block.data)
+							req.statusCallback.InProgress(req.allocationID, remotePathCB, op, downloaded, nil)
 						}
 
 						break
@@ -542,10 +577,10 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
 		}
 	breakLoop:
 		req.fileHandler.Sync() //nolint
-		wg.Done()
 	}()
 
 	eg, _ := errgroup.WithContext(ctx)
+	eg.SetLimit(downloadWorkerCount + EXTRA_COUNT)
 	for i := 0; i < n; i++ {
 		j := i
 		eg.Go(func() error {
@@ -597,13 +632,6 @@ func checkHash(actualFileHasher hash.Hash, fref *fileref.FileRef, contentMode st
 	}
 }
 
-func processHashData(hashDataChan chan []byte, hashWg *sync.WaitGroup, actualFileHasher hash.Hash) {
-	defer hashWg.Done()
-	for data := range hashDataChan {
-		actualFileHasher.Write(data)
-	}
-}
-
 func (req *DownloadRequest) submitReadMarker(blobber *blockchain.StorageNode, readCount int64) (err error) {
 	var retryCount = 3
 	for retryCount > 0 {
@@ -1115,3 +1143,29 @@ func (req *DownloadRequest) Seek(offset int64, whence int) (int64, error) {
 	}
 	return req.offset, nil
 }
+
+func writeData(dest io.Writer, data [][][]byte, dataShards, remaining int) (int, error) {
+	total := 0
+	for i := 0; i < len(data); i++ {
+		for j := 0; j < dataShards; j++ {
+			if len(data[i][j]) <= remaining {
+				n, err := dest.Write(data[i][j])
+				total += n
+				if err != nil {
+					return total, err
+				}
+			} else {
+				n, err := dest.Write(data[i][j][:remaining])
+				total += n
+				if err != nil {
+					return total, err
+				}
+			}
+			remaining -= len(data[i][j])
+			if remaining <= 0 {
+				return total, nil
+			}
+		}
+	}
+	return total, nil
+}
diff --git a/zboxcore/sdk/downloadworker_test.go b/zboxcore/sdk/downloadworker_test.go
index 7d9e9af23..30df15948 100644
--- a/zboxcore/sdk/downloadworker_test.go
+++ b/zboxcore/sdk/downloadworker_test.go
@@ -83,8 +83,7 @@ func TestDecodeEC(t *testing.T) {
 				test.setup(test)
 			}
 
-			data, err := test.req.decodeEC(test.shards)
-
+			err := test.req.decodeEC(test.shards)
 			if test.wantErr {
 				require.Error(t, err)
 				require.Contains(t, err.Error(), test.errMsg)
@@ -94,10 +93,12 @@
 			}
 
 			h := sha3.New256()
-			n, err := h.Write(data)
-			require.NoError(t, err)
-			require.Equal(t, len(data), n)
-
+			total := 0
+			for i := 0; i < test.req.datashards; i++ {
+				n, err := h.Write(test.shards[i])
+				require.NoError(t, err)
+				total += n
+			}
 			hash := hex.EncodeToString(h.Sum(nil))
 			require.Equal(t, test.contentHash, hash)
 
diff --git a/zboxcore/sdk/reader.go b/zboxcore/sdk/reader.go
index c42c05fc9..0ee6b911d 100644
--- a/zboxcore/sdk/reader.go
+++ b/zboxcore/sdk/reader.go
@@ -135,7 +135,7 @@ func (sd *StreamDownload) Read(b []byte) (int, error) {
 	wantBlocksPerShard := (wantSize + int64(sd.effectiveBlockSize) - 1) / int64(sd.effectiveBlockSize)
 	sd.blocksPerShard = wantBlocksPerShard
 
-	effectiveChunkSize := sd.effectiveBlockSize * sd.datashards
+	// effectiveChunkSize := sd.effectiveBlockSize * sd.datashards
 	n := 0
 	for startInd < endInd {
 		if startInd+numBlocks > endInd {
@@ -149,12 +149,12 @@
 			return 0, err
 		}
 
-		offset := sd.offset % int64(effectiveChunkSize)
+		// offset := sd.offset % int64(effectiveChunkSize)
 		// size of buffer `b` can be any number but we don't want to copy more than want size
 		// offset is important parameter because without it data will be corrupted.
 		// If previously set offset was 65536 + 1(block number 0) and we get data block with block number 1
 		// then we should not copy whole data to the buffer rather after offset.
-		n += copy(b[n:wantSize], data[offset:])
+		n += copy(b[n:wantSize], data[0][0])
 
 		startInd += numBlocks
 	}
@@ -179,7 +179,7 @@ func GetDStorageFileReader(alloc *Allocation, ref *ORef, sdo *StreamDownloadOpti
 		validationRootMap: make(map[string]*blobberFile),
 		shouldVerify:      sdo.VerifyDownload,
 		Consensus: Consensus{
-			RWMutex: &sync.RWMutex{},
+			RWMutex:         &sync.RWMutex{},
 			fullconsensus:   alloc.fullconsensus,
 			consensusThresh: alloc.consensusThreshold,
 		},
diff --git a/zboxcore/zboxutil/download_buffer.go b/zboxcore/zboxutil/download_buffer.go
new file mode 100644
index 000000000..78a0fe2d7
--- /dev/null
+++ b/zboxcore/zboxutil/download_buffer.go
@@ -0,0 +1,60 @@
+package zboxutil
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+type DownloadBuffer struct {
+	buf     []byte
+	length  int
+	reqSize int
+	mask    uint32
+	mu      sync.RWMutex
+}
+
+func NewDownloadBuffer(size, numBlocks, effectiveBlockSize int) *DownloadBuffer {
+	numBlocks++
+	return &DownloadBuffer{
+		buf:     make([]byte, size*numBlocks*effectiveBlockSize),
+		length:  size,
+		reqSize: effectiveBlockSize * numBlocks,
+		mask:    (1 << size) - 1,
+	}
+}
+
+func (r *DownloadBuffer) RequestChunk(ctx context.Context, num int) []byte {
+	num = num % r.length
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+		}
+		r.mu.RLock()
+		isSet := r.mask & (1 << num)
+		r.mu.RUnlock()
+		// already assigned
+		if isSet == 0 {
+			time.Sleep(500 * time.Millisecond)
+			continue
+		}
+		// assign the chunk by clearing the bit
+		r.mu.Lock()
+		r.mask &= ^(1 << num)
+		r.mu.Unlock()
+		return r.buf[num*r.reqSize : (num+1)*r.reqSize]
+	}
+}
+
+func (r *DownloadBuffer) ReleaseChunk(num int) {
+	num = num % r.length
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.mask |= 1 << num
+}
+
+func (r *DownloadBuffer) Stats() (int, int) {
+	return len(r.buf), cap(r.buf)
+}
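Reviewer note (not part of the patch): below is a minimal usage sketch of the new zboxutil.DownloadBuffer, illustrating the acquire/release contract that AddBlockDownloadReq and the writer goroutine in downloadworker.go rely on. The sizes are made up for illustration; real values come from the DownloadRequest. Keep in mind that NewDownloadBuffer increments numBlocks internally, so each slot holds (numBlocks+1)*effectiveBlockSize bytes.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/0chain/gosdk/zboxcore/zboxutil"
)

func main() {
	// 8 reusable slots, 10 blocks per request, 64 KiB effective block size
	// (illustrative values only).
	rb := zboxutil.NewDownloadBuffer(8, 10, 64*1024)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// RequestChunk polls every 500ms until slot 3 is free, clears its bit
	// in the mask and returns the backing sub-slice of the shared buffer.
	buf := rb.RequestChunk(ctx, 3)
	if len(buf) == 0 {
		// The context expired: fall back to a fresh allocation, exactly
		// as AddBlockDownloadReq does in this patch.
		buf = make([]byte, 10*64*1024)
	}
	fmt.Println("got chunk of", len(buf), "bytes")

	// Setting the bit again marks the slot as reusable.
	rb.ReleaseChunk(3)
}

Because slot indices are taken modulo the buffer length (blockNum/numBlocks wraps around the 8 slots here), a slot is only handed out again after ReleaseChunk, which is what bounds download memory to a fixed pool per blobber.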
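A second sketch, this time of the revised sys.MemChanFile semantics: with a non-zero ChunkWriteSize, Write now only accumulates bytes in the internal data buffer, and Sync flushes them to the Buffer channel in ChunkWriteSize pieces (assuming Buffer is a chan []byte, as the sends in the patch indicate).

package main

import (
	"fmt"

	"github.com/0chain/gosdk/core/sys"
)

func main() {
	f := &sys.MemChanFile{
		Buffer:         make(chan []byte, 10),
		ChunkWriteSize: 4,
	}

	// With this patch, Write only appends to the internal buffer...
	_, _ = f.Write([]byte("hello world"))

	// ...and Sync emits it in ChunkWriteSize pieces: "hell", "o wo", "rld".
	_ = f.Sync()

	close(f.Buffer)
	for chunk := range f.Buffer {
		fmt.Printf("%q\n", chunk)
	}
}

This pairs with the new toSync flag in processDownload, which calls Sync after each writeData when the file handler is a *sys.MemChanFile, so consumers of Buffer see whole, chunk-aligned pieces rather than one send per Write.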