Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download buffer #1363

Merged
merged 11 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions core/sys/fs_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type MemChanFile struct {
ChunkWriteSize int // 0 value means no limit
Sys interface{} // FileInfo.Sys
reader io.Reader
data []byte
}

func (f *MemChanFile) Stat() (fs.FileInfo, error) {
Expand All @@ -287,24 +288,29 @@ func (f *MemChanFile) Read(p []byte) (int, error) {
n := copy(p, recieveData)
return n, nil
}

func (f *MemChanFile) Write(p []byte) (n int, err error) {
if f.ChunkWriteSize == 0 {
f.Buffer <- p
} else {
current := 0
for ; current < len(p); current += f.ChunkWriteSize {
end := current + f.ChunkWriteSize
if end > len(p) {
end = len(p)
}
f.Buffer <- p[current:end]
if cap(f.data) == 0 {
f.data = make([]byte, 0, f.ChunkWriteSize)
}
f.data = append(f.data, p...)
}
return len(p), nil

}

func (f *MemChanFile) Sync() error {
current := 0
for ; current < len(f.data); current += f.ChunkWriteSize {
end := current + f.ChunkWriteSize
if end > len(f.data) {
end = len(f.data)
}
f.Buffer <- f.data[current:end]
}
f.data = make([]byte, 0, f.ChunkWriteSize)
return nil
}
func (f *MemChanFile) Seek(offset int64, whence int) (ret int64, err error) {
Expand Down
22 changes: 11 additions & 11 deletions wasmsdk/jsbridge/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ var jsFileWriterMutex sync.Mutex

type FileWriter struct {
writableStream js.Value
uint8Array js.Value
fileHandle js.Value
bufLen int
}

func (w *FileWriter) Write(p []byte) (int, error) {
//js.Value doesn't work in parallel invoke
jsFileWriterMutex.Lock()
defer jsFileWriterMutex.Unlock()

uint8Array := js.Global().Get("Uint8Array").New(len(p))
js.CopyBytesToJS(uint8Array, p)
_, err := Await(w.writableStream.Call("write", uint8Array))
if w.bufLen != len(p) {
w.bufLen = len(p)
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
}
js.CopyBytesToJS(w.uint8Array, p)
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_writer: " + err[0].String())
}
Expand Down Expand Up @@ -55,14 +61,7 @@ func (w *FileWriter) Stat() (fs.FileInfo, error) {
}

func NewFileWriter(filename string) (*FileWriter, error) {
// writableStream := js.Global().Get("writableStream")
// stream, err := Await(writableStream.Call("create", filename))
// if len(err) > 0 && !err[0].IsNull() {
// return nil, errors.New("file_writer: " + err[0].String())
// }
// return &FileWriter{
// writableStream: stream[0],
// }, nil

if !js.Global().Get("window").Get("showSaveFilePicker").Truthy() || !js.Global().Get("window").Get("WritableStream").Truthy() {
return nil, errors.New("file_writer: not supported")
}
Expand All @@ -84,5 +83,6 @@ func NewFileWriter(filename string) (*FileWriter, error) {
}
return &FileWriter{
writableStream: writableStream[0],
fileHandle: fileHandle[0],
}, nil
}
2 changes: 1 addition & 1 deletion zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (a *Allocation) GetBlobberStats() map[string]*BlobberAllocationStats {
return result
}

const downloadWorkerCount = 10
const downloadWorkerCount = 6

func (a *Allocation) InitAllocation() {
a.downloadChan = make(chan *DownloadRequest, 100)
Expand Down
94 changes: 56 additions & 38 deletions zboxcore/sdk/blockdownloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"

Expand All @@ -19,6 +18,7 @@ import (
zlogger "github.com/0chain/gosdk/zboxcore/logger"
"github.com/0chain/gosdk/zboxcore/marker"
"github.com/0chain/gosdk/zboxcore/zboxutil"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -45,6 +45,7 @@ type BlockDownloadRequest struct {
result chan *downloadBlock
shouldVerify bool
connectionID string
respBuf []byte
}

type downloadResponse struct {
Expand Down Expand Up @@ -74,21 +75,26 @@ func InitBlockDownloader(blobbers []*blockchain.StorageNode, workerCount int) {
for _, blobber := range blobbers {
if _, ok := downloadBlockChan[blobber.ID]; !ok {
downloadBlockChan[blobber.ID] = make(chan *BlockDownloadRequest, workerCount)
for i := 0; i < workerCount; i++ {
blobberChan := downloadBlockChan[blobber.ID]
go startBlockDownloadWorker(blobberChan)
}
go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount)
}
}
}

func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest) {
func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int) {
sem := semaphore.NewWeighted(int64(workers))
for {
blockDownloadReq, open := <-blobberChan
if !open {
break
}
blockDownloadReq.downloadBlobberBlock()
if err := sem.Acquire(blockDownloadReq.ctx, 1); err != nil {
blockDownloadReq.result <- &downloadBlock{Success: false, idx: blockDownloadReq.blobberIdx, err: err}
continue
}
go func() {
blockDownloadReq.downloadBlobberBlock()
sem.Release(1)
}()
}
}

Expand Down Expand Up @@ -155,45 +161,49 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {
if req.chunkSize == 0 {
req.chunkSize = CHUNK_SIZE
}

if resp.StatusCode == http.StatusTooManyRequests {
shouldRetry = true
time.Sleep(time.Second * 2)
return errors.New(RateLimitError, "Rate limit error")
}

if resp.StatusCode == http.StatusInternalServerError {
shouldRetry = true
return errors.New("internal_server_error", "Internal server error")
}

var rspData downloadBlock
respLen := resp.Header.Get("Content-Length")
var respBody []byte
if respLen != "" {
len, err := strconv.Atoi(respLen)
zlogger.Logger.Info("respLen", len)
if err != nil {
zlogger.Logger.Error("respLen convert error: ", err)
return err
}
respBody, err = readBody(resp.Body, len)
if req.shouldVerify {
req.respBuf, err = io.ReadAll(resp.Body)
if err != nil {
zlogger.Logger.Error("respBody read error: ", err)
return err
}
} else {
respBody, err = readBody(resp.Body, int(req.numBlocks)*req.chunkSize)
req.respBuf, err = readBody(resp.Body, req.respBuf)
if err != nil {
zlogger.Logger.Error("respBody read error: ", err)
return err
}
}
if resp.StatusCode != http.StatusOK {
zlogger.Logger.Debug(fmt.Sprintf("downloadBlobberBlock FAIL - blobberID: %v, clientID: %v, blockNum: %d, retry: %d, response: %v", req.blobber.ID, client.GetClientID(), header.BlockNum, retry, string(respBody)))
if err = json.Unmarshal(respBody, &rspData); err == nil {
zlogger.Logger.Debug(fmt.Sprintf("downloadBlobberBlock FAIL - blobberID: %v, clientID: %v, blockNum: %d, retry: %d, response: %v", req.blobber.ID, client.GetClientID(), header.BlockNum, retry, string(req.respBuf)))
if err = json.Unmarshal(req.respBuf, &rspData); err == nil {
return errors.New("download_error", fmt.Sprintf("Response status: %d, Error: %v,", resp.StatusCode, rspData.err))
}
return errors.New("response_error", string(respBody))
return errors.New("response_error", string(req.respBuf))
}

dR := downloadResponse{}
contentType := resp.Header.Get("Content-Type")
if contentType == "application/json" {
err = json.Unmarshal(respBody, &dR)
err = json.Unmarshal(req.respBuf, &dR)
if err != nil {
return err
}
} else {
dR.Data = respBody
dR.Data = req.respBuf
}
if req.contentMode == DOWNLOAD_CONTENT_FULL && req.shouldVerify {

Expand Down Expand Up @@ -235,17 +245,17 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {

if err != nil {
if shouldRetry {
retry = 0
if retry >= 3 {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
return
}
shouldRetry = false
zlogger.Logger.Debug("Retrying for Error occurred: ", err)
retry++
continue
}
if retry >= 3 {
} else {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
return
}
retry++
continue
}
return
}
Expand All @@ -254,22 +264,30 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {

}

func AddBlockDownloadReq(req *BlockDownloadRequest) {
func AddBlockDownloadReq(ctx context.Context, req *BlockDownloadRequest, rb *zboxutil.DownloadBuffer, effectiveBlockSize int) {
if rb != nil {
reqCtx, cncl := context.WithTimeout(ctx, (time.Second * 10))
defer cncl()
req.respBuf = rb.RequestChunk(reqCtx, int(req.blockNum/req.numBlocks))
if len(req.respBuf) == 0 {
req.respBuf = make([]byte, int(req.numBlocks)*effectiveBlockSize)
}
}
downloadBlockChan[req.blobber.ID] <- req
}

func readBody(r io.Reader, size int) ([]byte, error) {
b := make([]byte, 0, size)
func readBody(r io.Reader, b []byte) ([]byte, error) {
start := 0
if len(b) == 0 {
return nil, fmt.Errorf("readBody: empty buffer")
}
for {
if len(b) == cap(b) {
// Add more capacity (let append pick how much).
b = append(b, 0)[:len(b)]
}
n, err := r.Read(b[len(b):cap(b)])
b = b[:len(b)+n]
n, err := r.Read(b[start:])
start += n
if err != nil {
if err == io.EOF {
err = nil
b = b[:start]
}
return b, err
}
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/downloader_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sdk

import "github.com/0chain/gosdk/core/sys"

const DefaultBlocksPerMarker int = 10
const DefaultBlocksPerMarker int = 100

// DownloadOption set download option
type DownloadOption func(do *DownloadOptions)
Expand Down
Loading
Loading