Skip to content

Commit

Permalink
download buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Jan 15, 2024
1 parent abaa1b2 commit 94fd40b
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 94 deletions.
2 changes: 1 addition & 1 deletion zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (a *Allocation) GetBlobberStats() map[string]*BlobberAllocationStats {
return result
}

const downloadWorkerCount = 10
const downloadWorkerCount = 6

func (a *Allocation) InitAllocation() {
a.downloadChan = make(chan *DownloadRequest, 100)
Expand Down
116 changes: 78 additions & 38 deletions zboxcore/sdk/blockdownloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"

Expand All @@ -19,6 +18,7 @@ import (
zlogger "github.com/0chain/gosdk/zboxcore/logger"
"github.com/0chain/gosdk/zboxcore/marker"
"github.com/0chain/gosdk/zboxcore/zboxutil"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -45,6 +45,7 @@ type BlockDownloadRequest struct {
result chan *downloadBlock
shouldVerify bool
connectionID string
respBuf []byte
}

type downloadResponse struct {
Expand Down Expand Up @@ -74,21 +75,26 @@ func InitBlockDownloader(blobbers []*blockchain.StorageNode, workerCount int) {
for _, blobber := range blobbers {
if _, ok := downloadBlockChan[blobber.ID]; !ok {
downloadBlockChan[blobber.ID] = make(chan *BlockDownloadRequest, workerCount)
for i := 0; i < workerCount; i++ {
blobberChan := downloadBlockChan[blobber.ID]
go startBlockDownloadWorker(blobberChan)
}
go startBlockDownloadWorker(downloadBlockChan[blobber.ID], workerCount)
}
}
}

func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest) {
func startBlockDownloadWorker(blobberChan chan *BlockDownloadRequest, workers int) {
sem := semaphore.NewWeighted(int64(workers))
for {
blockDownloadReq, open := <-blobberChan
if !open {
break
}
blockDownloadReq.downloadBlobberBlock()
if err := sem.Acquire(blockDownloadReq.ctx, 1); err != nil {
blockDownloadReq.result <- &downloadBlock{Success: false, idx: blockDownloadReq.blobberIdx, err: err}
continue
}
go func() {
blockDownloadReq.downloadBlobberBlock()
sem.Release(1)
}()
}
}

Expand All @@ -105,6 +111,25 @@ func (req *BlockDownloadRequest) splitData(buf []byte, lim int) [][]byte {
return chunks
}

func splitShards(shards [][]byte, lim int) [][][]byte {

Check failure on line 114 in zboxcore/sdk/blockdownloadworker.go

View workflow job for this annotation

GitHub Actions / lint

func `splitShards` is unused (unused)
newShards := make([][][]byte, 0, common.MustAddInt(len(shards[0])/lim, 1))
for i := range newShards {
newShards[i] = make([][]byte, len(shards))
}
offset := 0
for i := range newShards {
for j := range newShards[i] {
if offset+lim <= len(shards[j]) {
newShards[i][j] = shards[j][offset : lim+offset]
} else if len(shards[j]) > 0 {
newShards[i][j] = shards[j][offset:]
}
}
offset += lim
}
return newShards
}

func (req *BlockDownloadRequest) downloadBlobberBlock() {
if req.numBlocks <= 0 {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: errors.New("invalid_request", "Invalid number of blocks for download")}
Expand Down Expand Up @@ -155,45 +180,49 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {
if req.chunkSize == 0 {
req.chunkSize = CHUNK_SIZE
}

if resp.StatusCode == http.StatusTooManyRequests {
shouldRetry = true
time.Sleep(time.Second * 2)
return errors.New(RateLimitError, "Rate limit error")
}

if resp.StatusCode == http.StatusInternalServerError {
shouldRetry = true
return errors.New("internal_server_error", "Internal server error")
}

var rspData downloadBlock
respLen := resp.Header.Get("Content-Length")
var respBody []byte
if respLen != "" {
len, err := strconv.Atoi(respLen)
zlogger.Logger.Info("respLen", len)
if err != nil {
zlogger.Logger.Error("respLen convert error: ", err)
return err
}
respBody, err = readBody(resp.Body, len)
if req.shouldVerify {
req.respBuf, err = io.ReadAll(resp.Body)
if err != nil {
zlogger.Logger.Error("respBody read error: ", err)
return err
}
} else {
respBody, err = readBody(resp.Body, int(req.numBlocks)*req.chunkSize)
req.respBuf, err = readBody(resp.Body, req.respBuf)
if err != nil {
zlogger.Logger.Error("respBody read error: ", err)
return err
}
}
if resp.StatusCode != http.StatusOK {
zlogger.Logger.Debug(fmt.Sprintf("downloadBlobberBlock FAIL - blobberID: %v, clientID: %v, blockNum: %d, retry: %d, response: %v", req.blobber.ID, client.GetClientID(), header.BlockNum, retry, string(respBody)))
if err = json.Unmarshal(respBody, &rspData); err == nil {
zlogger.Logger.Debug(fmt.Sprintf("downloadBlobberBlock FAIL - blobberID: %v, clientID: %v, blockNum: %d, retry: %d, response: %v", req.blobber.ID, client.GetClientID(), header.BlockNum, retry, string(req.respBuf)))
if err = json.Unmarshal(req.respBuf, &rspData); err == nil {
return errors.New("download_error", fmt.Sprintf("Response status: %d, Error: %v,", resp.StatusCode, rspData.err))
}
return errors.New("response_error", string(respBody))
return errors.New("response_error", string(req.respBuf))
}

dR := downloadResponse{}
contentType := resp.Header.Get("Content-Type")
if contentType == "application/json" {
err = json.Unmarshal(respBody, &dR)
err = json.Unmarshal(req.respBuf, &dR)
if err != nil {
return err
}
} else {
dR.Data = respBody
dR.Data = req.respBuf
}
if req.contentMode == DOWNLOAD_CONTENT_FULL && req.shouldVerify {

Expand Down Expand Up @@ -235,17 +264,17 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {

if err != nil {
if shouldRetry {
retry = 0
if retry >= 3 {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
return
}
shouldRetry = false
zlogger.Logger.Debug("Retrying for Error occurred: ", err)
retry++
continue
}
if retry >= 3 {
} else {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: err}
return
}
retry++
continue
}
return
}
Expand All @@ -254,24 +283,35 @@ func (req *BlockDownloadRequest) downloadBlobberBlock() {

}

func AddBlockDownloadReq(req *BlockDownloadRequest) {
func AddBlockDownloadReq(ctx context.Context, req *BlockDownloadRequest, rb *zboxutil.DownloadBuffer, effectiveBlockSize int) {
if rb != nil {
reqCtx, cncl := context.WithTimeout(ctx, (time.Second * 10))
defer cncl()
req.respBuf = rb.RequestChunk(reqCtx, int(req.blockNum/req.numBlocks))
if len(req.respBuf) == 0 {
req.respBuf = make([]byte, int(req.numBlocks)*effectiveBlockSize)
}
}
downloadBlockChan[req.blobber.ID] <- req
}

func readBody(r io.Reader, size int) ([]byte, error) {
b := make([]byte, 0, size)
func readBody(r io.Reader, b []byte) ([]byte, error) {
start := 0
if len(b) == 0 {
return nil, fmt.Errorf("readBody: empty buffer")
}
for {
if len(b) == cap(b) {
// Add more capacity (let append pick how much).
b = append(b, 0)[:len(b)]
}
n, err := r.Read(b[len(b):cap(b)])
b = b[:len(b)+n]
n, err := r.Read(b[start:])
start += n
if err != nil {
if err == io.EOF {
err = nil
b = b[:start]
}
return b, err
}
if start == len(b) {
return nil, fmt.Errorf("readBody: buffer full (len=%d) and start %v", len(b), start)
}
}
}
Loading

0 comments on commit 94fd40b

Please sign in to comment.