Skip to content

Commit

Permalink
Merge pull request #1421 from 0chain/feat/list-thresh
Browse files Browse the repository at this point in the history
strict timeouts and break when consensus thresh is met
  • Loading branch information
dabasov authored Mar 10, 2024
2 parents cfe9bea + d38ff81 commit 14c6161
Show file tree
Hide file tree
Showing 11 changed files with 24 additions and 57 deletions.
5 changes: 0 additions & 5 deletions zboxcore/sdk/chunked_upload_blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
}

respbody := resp.Body()
if err != nil {
logger.Logger.Error("Error: Resp ", err)
return fmt.Errorf("Error while reading body. Error %s", err), false
}

if resp.StatusCode() == http.StatusTooManyRequests {
logger.Logger.Error("Got too many request error")
var r int
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/dirworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (req *DirRequest) createDirInBlobber(blobber *blockchain.StorageNode, pos u

for i := 0; i < 3; i++ {
err, shouldContinue = func() (err error, shouldContinue bool) {
ctx, cncl := context.WithTimeout(req.ctx, (time.Second * 30))
ctx, cncl := context.WithTimeout(req.ctx, (time.Second * 10))
resp, err = zboxutil.Client.Do(httpreq.WithContext(ctx))
cncl()
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,11 +1082,8 @@ func GetFileRefFromBlobber(allocationID, blobberId, remotePath string) (fRef *fi
listReq.ctx = ctx
listReq.remotefilepath = remotePath

listReq.wg = &sync.WaitGroup{}
listReq.wg.Add(1)
rspCh := make(chan *fileMetaResponse, 1)
go listReq.getFileMetaInfoFromBlobber(listReq.blobbers[0], 0, rspCh)
listReq.wg.Wait()
resp := <-rspCh
return resp.fileref, resp.err
}
Expand Down
5 changes: 0 additions & 5 deletions zboxcore/sdk/filemetaworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"mime/multipart"
"net/http"
"sync"
"time"

"github.com/0chain/errors"
Expand All @@ -25,7 +24,6 @@ type fileMetaResponse struct {
}

func (req *ListRequest) getFileMetaInfoFromBlobber(blobber *blockchain.StorageNode, blobberIdx int, rspCh chan<- *fileMetaResponse) {
defer req.wg.Done()
body := new(bytes.Buffer)
formWriter := multipart.NewWriter(body)

Expand Down Expand Up @@ -92,13 +90,10 @@ func (req *ListRequest) getFileMetaInfoFromBlobber(blobber *blockchain.StorageNo

func (req *ListRequest) getFileMetaFromBlobbers() []*fileMetaResponse {
numList := len(req.blobbers)
req.wg = &sync.WaitGroup{}
req.wg.Add(numList)
rspCh := make(chan *fileMetaResponse, numList)
for i := 0; i < numList; i++ {
go req.getFileMetaInfoFromBlobber(req.blobbers[i], i, rspCh)
}
req.wg.Wait()
fileInfos := make([]*fileMetaResponse, len(req.blobbers))
for i := 0; i < numList; i++ {
ch := <-rspCh
Expand Down
4 changes: 0 additions & 4 deletions zboxcore/sdk/filemetaworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,9 @@ func TestListRequest_getFileMetaInfoFromBlobber(t *testing.T) {
authToken: &marker.AuthTicket{
Signature: mockSignature,
},
wg: &sync.WaitGroup{},
}
rspCh := make(chan *fileMetaResponse, 1)
req.wg.Add(1)
go req.getFileMetaInfoFromBlobber(blobber, 73, rspCh)
req.wg.Wait()

var resp *fileMetaResponse
select {
Expand Down Expand Up @@ -280,7 +277,6 @@ func TestListRequest_getFileConsensusFromBlobbers(t *testing.T) {
allocationTx: mockAllocationTxId,
ctx: context.TODO(),
blobbers: []*blockchain.StorageNode{},
wg: &sync.WaitGroup{},
Consensus: tt.consensus, //nolint
}
for i := 0; i < tt.numBlobbers; i++ {
Expand Down
5 changes: 0 additions & 5 deletions zboxcore/sdk/filestatsworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"mime/multipart"
"net/http"
"strings"
"sync"
"time"

"github.com/0chain/errors"
Expand Down Expand Up @@ -45,7 +44,6 @@ type fileStatsResponse struct {
}

func (req *ListRequest) getFileStatsInfoFromBlobber(blobber *blockchain.StorageNode, blobberIdx int, rspCh chan<- *fileStatsResponse) {
defer req.wg.Done()
body := new(bytes.Buffer)
formWriter := multipart.NewWriter(body)

Expand Down Expand Up @@ -111,13 +109,10 @@ func (req *ListRequest) getFileStatsInfoFromBlobber(blobber *blockchain.StorageN
func (req *ListRequest) getFileStatsFromBlobbers() map[string]*FileStats {
numList := len(req.blobbers)
//fmt.Printf("%v\n", req.blobbers)
req.wg = &sync.WaitGroup{}
req.wg.Add(numList)
rspCh := make(chan *fileStatsResponse, numList)
for i := 0; i < numList; i++ {
go req.getFileStatsInfoFromBlobber(req.blobbers[i], i, rspCh)
}
req.wg.Wait()
fileInfos := make(map[string]*FileStats)
for i := 0; i < numList; i++ {
ch := <-rspCh
Expand Down
5 changes: 0 additions & 5 deletions zboxcore/sdk/filestatsworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"testing"

"github.com/0chain/errors"
Expand Down Expand Up @@ -157,12 +156,9 @@ func TestListRequest_getFileStatsInfoFromBlobber(t *testing.T) {
allocationTx: mockAllocationTxId,
remotefilepath: mockRemoteFilePath,
ctx: context.Background(),
wg: &sync.WaitGroup{},
}
rspCh := make(chan *fileStatsResponse, 1)
req.wg.Add(1)
go req.getFileStatsInfoFromBlobber(&blobber, mockBlobberIndex, rspCh)
req.wg.Wait()
resp := <-rspCh
require.EqualValues(t, tt.wantErr, resp.err != nil)
if resp.err != nil {
Expand Down Expand Up @@ -257,7 +253,6 @@ func TestListRequest_getFileStatsFromBlobbers(t *testing.T) {
allocationTx: mockAllocationTxId,
ctx: context.TODO(),
blobbers: []*blockchain.StorageNode{},
wg: &sync.WaitGroup{},
}
for i := 0; i < tt.numBlobbers; i++ {
req.blobbers = append(req.blobbers, &blockchain.StorageNode{
Expand Down
41 changes: 19 additions & 22 deletions zboxcore/sdk/listworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type ListRequest struct {
remotefilepath string
authToken *marker.AuthTicket
ctx context.Context
wg *sync.WaitGroup
forRepair bool
listOnly bool
offset int
Expand Down Expand Up @@ -91,7 +90,6 @@ func WithListRequestForRepair(forRepair bool) ListRequestOptions {
}

func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode, blobberIdx int, rspCh chan<- *listResponse) {
defer req.wg.Done()
//body := new(bytes.Buffer)
//formWriter := multipart.NewWriter(body)

Expand Down Expand Up @@ -130,7 +128,7 @@ func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode,
}

//httpreq.Header.Add("Content-Type", formWriter.FormDataContentType())
ctx, cncl := context.WithTimeout(req.ctx, (time.Second * 30))
ctx, cncl := context.WithTimeout(req.ctx, (time.Second * 10))
err = zboxutil.HttpDo(ctx, cncl, httpreq, func(resp *http.Response, err error) error {
if err != nil {
l.Logger.Error("List : ", err)
Expand Down Expand Up @@ -162,46 +160,45 @@ func (req *ListRequest) getListInfoFromBlobber(blobber *blockchain.StorageNode,

func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) {
numList := len(req.blobbers)
req.wg = &sync.WaitGroup{}
req.wg.Add(numList)
rspCh := make(chan *listResponse, numList)
for i := 0; i < numList; i++ {
go req.getListInfoFromBlobber(req.blobbers[i], i, rspCh)
}
req.wg.Wait()
listInfos := make([]*listResponse, numList)
consensusMap := make(map[string][]*blockchain.StorageNode)
var consensusHash string
for i := 0; i < numList; i++ {
listInfos[i] = <-rspCh
if !req.forRepair {
if listInfos[i].err != nil || listInfos[i].ref == nil {
continue
}
hash := listInfos[i].ref.FileMetaHash
consensusMap[hash] = append(consensusMap[hash], req.blobbers[listInfos[i].blobberIdx])
if len(consensusMap[hash]) >= req.consensusThresh {
consensusHash = hash
break
}
}
}
if req.listOnly {
return listInfos, nil
}
consensusMap := make(map[string][]*blockchain.StorageNode)
var consensusHash string
for i := 0; i < numList; i++ {
if listInfos[i].err != nil || listInfos[i].ref == nil {
continue
}
hash := listInfos[i].ref.FileMetaHash
consensusMap[hash] = append(consensusMap[hash], req.blobbers[listInfos[i].blobberIdx])
if len(consensusMap[hash]) >= req.consensusThresh {
consensusHash = hash
}
}

var err error
req.listOnly = true
listLen := len(consensusMap[consensusHash])
if listLen < req.consensusThresh {
return listInfos, listInfos[0].err
}
listInfos = listInfos[:1]
listOnlyRespCh := make(chan *listResponse, 1)
for i := 0; i < listLen; i++ {
var rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
num := rnd.Intn(listLen)
randomBlobber := consensusMap[consensusHash][num]
req.wg.Add(1)
go req.getListInfoFromBlobber(randomBlobber, 0, rspCh)
req.wg.Wait()
listInfos[0] = <-rspCh
go req.getListInfoFromBlobber(randomBlobber, 0, listOnlyRespCh)
listInfos[0] = <-listOnlyRespCh
if listInfos[0].err == nil {
return listInfos, nil
}
Expand Down
4 changes: 0 additions & 4 deletions zboxcore/sdk/listworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,9 @@ func TestListRequest_getListInfoFromBlobber(t *testing.T) {
authToken: &marker.AuthTicket{
Signature: mockSignature,
},
wg: &sync.WaitGroup{},
}
rspCh := make(chan *listResponse, 1)
req.wg.Add(1)
go req.getListInfoFromBlobber(blobber, 41, rspCh)
req.wg.Wait()
resp := <-rspCh
require.EqualValues(tt.wantErr, resp.err != nil)
if resp.err != nil {
Expand Down Expand Up @@ -288,7 +285,6 @@ func TestListRequest_GetListFromBlobbers(t *testing.T) {
allocationTx: mockAllocationTxId,
ctx: context.TODO(),
blobbers: []*blockchain.StorageNode{},
wg: &sync.WaitGroup{},
Consensus: Consensus{
consensusThresh: tt.consensusThresh,
fullconsensus: tt.fullconsensus,
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/multi_operation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
DefaultCreateConnectionTimeOut = 2 * time.Minute
DefaultCreateConnectionTimeOut = 10 * time.Second
)

var BatchSize = 6
Expand Down
5 changes: 3 additions & 2 deletions zboxcore/sdk/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ func GetWritemarker(allocID, allocTx, id, baseUrl string) (*LatestPrevWriteMarke
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for retries := 0; retries < 3; retries++ {

resp, err := zboxutil.Client.Do(req)
resp, err := zboxutil.Client.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 14c6161

Please sign in to comment.