
Commit cee36cb
fix write to file
Hitenjain14 committed Jan 15, 2024
1 parent 94fd40b commit cee36cb
Showing 3 changed files with 24 additions and 47 deletions.
19 changes: 0 additions & 19 deletions zboxcore/sdk/blockdownloadworker.go
@@ -111,25 +111,6 @@ func (req *BlockDownloadRequest) splitData(buf []byte, lim int) [][]byte {
return chunks
}

func splitShards(shards [][]byte, lim int) [][][]byte {
newShards := make([][][]byte, 0, common.MustAddInt(len(shards[0])/lim, 1))
for i := range newShards {
newShards[i] = make([][]byte, len(shards))
}
offset := 0
for i := range newShards {
for j := range newShards[i] {
if offset+lim <= len(shards[j]) {
newShards[i][j] = shards[j][offset : lim+offset]
} else if len(shards[j]) > 0 {
newShards[i][j] = shards[j][offset:]
}
}
offset += lim
}
return newShards
}

func (req *BlockDownloadRequest) downloadBlobberBlock() {
if req.numBlocks <= 0 {
req.result <- &downloadBlock{Success: false, idx: req.blobberIdx, err: errors.New("invalid_request", "Invalid number of blocks for download")}
48 changes: 24 additions & 24 deletions zboxcore/sdk/downloadworker.go
@@ -277,7 +277,7 @@ func (req *DownloadRequest) decodeEC(shards [][]byte) (err error) {
// fillShards will fill `shards` with data from blobbers that belongs to specific
// blockNumber and blobber's position index in an allocation
func (req *DownloadRequest) fillShards(shards [][][]byte, result *downloadBlock) (err error) {
fmt.Println("fillShards: ", req.startBlock, req.numBlocks)

for i := 0; i < len(result.BlockChunks); i++ {
var data []byte
if req.encryptedKey != "" {
@@ -475,7 +475,7 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
hashWg := &sync.WaitGroup{}
if isPREAndWholeFile {
if i == n-1 {
writeData(actualFileHasher, data, req.datashards) //nolint
writeData(actualFileHasher, data, req.datashards, int(remainingSize)) //nolint
if calculatedFileHash, ok := checkHash(actualFileHasher, fRef, req.contentMode); !ok {
req.errorCB(fmt.Errorf("Expected actual file hash %s, calculated file hash %s",
fRef.ActualFileHash, calculatedFileHash), remotePathCB)
@@ -484,13 +484,13 @@
} else {
hashWg.Add(1)
go func() {
writeData(actualFileHasher, data, req.datashards) //nolint
writeData(actualFileHasher, data, req.datashards, int(remainingSize)) //nolint
hashWg.Done()
}()
}
}

totalWritten, err := writeData(req.fileHandler, data, req.datashards)
totalWritten, err := writeData(req.fileHandler, data, req.datashards, int(remainingSize))
if err != nil {
req.errorCB(errors.Wrap(err, "Write file failed"), remotePathCB)
return
@@ -502,7 +502,6 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
for _, rb := range req.bufferMap {
rb.ReleaseChunk(i)
}
fmt.Println("Written block: ", i, totalWritten/MB)
downloaded = downloaded + totalWritten
remainingSize -= int64(totalWritten)

@@ -520,7 +519,7 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
hashWg := &sync.WaitGroup{}
if isPREAndWholeFile {
if i == n-1 {
writeData(actualFileHasher, block.data, req.datashards) //nolint
writeData(actualFileHasher, block.data, req.datashards, int(remainingSize)) //nolint
if calculatedFileHash, ok := checkHash(actualFileHasher, fRef, req.contentMode); !ok {
req.errorCB(fmt.Errorf("Expected actual file hash %s, calculated file hash %s",
fRef.ActualFileHash, calculatedFileHash), remotePathCB)
@@ -529,18 +528,17 @@
} else {
hashWg.Add(1)
go func() {
writeData(actualFileHasher, block.data, req.datashards) //nolint
writeData(actualFileHasher, block.data, req.datashards, int(remainingSize)) //nolint
hashWg.Done()
}()
}
}

totalWritten, err := writeData(req.fileHandler, block.data, req.datashards)
totalWritten, err := writeData(req.fileHandler, block.data, req.datashards, int(remainingSize))
if err != nil {
req.errorCB(errors.Wrap(err, "Write file failed"), remotePathCB)
return
}
fmt.Println("Written block: ", i, totalWritten/MB)

if isPREAndWholeFile {
hashWg.Wait()
@@ -585,7 +583,6 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
if err != nil {
return errors.Wrap(err, fmt.Sprintf("Download failed for block %d. ", startBlock+int64(j)*numBlocks))
}
fmt.Println("Downloaded block: ", j)
blocks <- blockData{blockNum: j, data: data}

return nil
@@ -598,7 +595,6 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
}

close(blocks)
fmt.Println("download complete waiting for write")
wg.Wait()
elapsedGetBlocksAndWrite := time.Since(now) - elapsedInitEC - elapsedInitEncryption
l.Logger.Info(fmt.Sprintf("[processDownload] Timings:\n allocation_id: %s,\n remotefilepath: %s,\n initEC: %d ms,\n initEncryption: %d ms,\n getBlocks and writes: %d ms",
@@ -624,13 +620,6 @@ func checkHash(actualFileHasher hash.Hash, fref *fileref.FileRef, contentMode st
}
}

func processHashData(hashDataChan chan []byte, hashWg *sync.WaitGroup, actualFileHasher hash.Hash) {
defer hashWg.Done()
for data := range hashDataChan {
actualFileHasher.Write(data)
}
}

func (req *DownloadRequest) submitReadMarker(blobber *blockchain.StorageNode, readCount int64) (err error) {
var retryCount = 3
for retryCount > 0 {
@@ -1143,15 +1132,26 @@ func (req *DownloadRequest) Seek(offset int64, whence int) (int64, error) {
return req.offset, nil
}

func writeData(dest io.Writer, data [][][]byte, dataShards int) (int, error) {
func writeData(dest io.Writer, data [][][]byte, dataShards, remaining int) (int, error) {
total := 0
fmt.Println("writeData: ", len(data))
for i := 0; i < len(data); i++ {
for j := 0; j < dataShards; j++ {
n, err := dest.Write(data[i][j])
total += n
if err != nil {
return total, err
if len(data[i][j]) <= remaining {
n, err := dest.Write(data[i][j])
total += n
if err != nil {
return total, err
}
} else {
n, err := dest.Write(data[i][j][:remaining])
total += n
if err != nil {
return total, err
}
}
remaining -= len(data[i][j])
if remaining <= 0 {
return total, nil
}
}
}
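The substance of this fix is the new remaining parameter on writeData: each write is capped at the number of bytes still expected for the file, so data beyond that point (for example, padding in the final block) is not written to the destination. The following is a condensed, self-contained sketch of that truncation behavior; the writeDataSketch name, the bytes.Buffer destination, and the sample shard layout are illustrative assumptions rather than SDK code.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeDataSketch condenses the truncation logic added to writeData in this
// commit: write at most `remaining` bytes across the data shards, so bytes
// past the expected file size are dropped instead of being written out.
func writeDataSketch(dest io.Writer, data [][][]byte, dataShards, remaining int) (int, error) {
	total := 0
	for i := 0; i < len(data); i++ {
		for j := 0; j < dataShards; j++ {
			chunk := data[i][j]
			if len(chunk) > remaining {
				chunk = chunk[:remaining] // cap the final write at the remaining file size
			}
			n, err := dest.Write(chunk)
			total += n
			if err != nil {
				return total, err
			}
			remaining -= len(data[i][j])
			if remaining <= 0 {
				return total, nil
			}
		}
	}
	return total, nil
}

func main() {
	// One block of two 4-byte data shards, but only 6 bytes of real file data;
	// the last 2 bytes are padding and must not reach the writer.
	data := [][][]byte{{[]byte("abcd"), []byte("ef\x00\x00")}}
	var out bytes.Buffer
	n, err := writeDataSketch(&out, data, 2, 6)
	fmt.Println(n, err, out.String()) // 6 <nil> abcdef
}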
4 changes: 0 additions & 4 deletions zboxcore/zboxutil/download_buffer.go
@@ -2,7 +2,6 @@ package zboxutil

import (
"context"
"fmt"
"sync"
"time"
)
@@ -28,7 +27,6 @@ func NewDownloadBuffer(size, numBlocks, effectiveBlockSize int) *DownloadBuffer
func (r *DownloadBuffer) RequestChunk(ctx context.Context, num int) []byte {
num = num % r.length
for {
fmt.Println("Requested chunk: ", num)
select {
case <-ctx.Done():
return nil
@@ -45,15 +43,13 @@ func (r *DownloadBuffer) RequestChunk(ctx context.Context, num int) []byte {
// assign the chunk by clearing the bit
r.mu.Lock()
r.mask &= ^(1 << num)
fmt.Println("Acquired chunk: ", num)
r.mu.Unlock()
return r.buf[num*r.reqSize : (num+1)*r.reqSize]
}
}

func (r *DownloadBuffer) ReleaseChunk(num int) {
num = num % r.length
fmt.Println("ReleaseChunk: ", num)
r.mu.Lock()
defer r.mu.Unlock()
r.mask |= 1 << num
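For orientation, the RequestChunk and ReleaseChunk methods touched above implement a bitmask-guarded chunk pool: a set bit marks a free chunk, RequestChunk clears the bit before handing out its slice, and ReleaseChunk sets it again once the chunk is no longer needed. The standalone chunkPool below is a simplified sketch of that protocol under those assumptions and is not the zboxutil implementation; in particular, acquire returns immediately instead of waiting on the context the way the real RequestChunk does.

package main

import (
	"fmt"
	"sync"
)

// chunkPool is a simplified stand-in for zboxutil.DownloadBuffer: one
// contiguous buffer split into fixed-size chunks, with a bitmask recording
// which chunks are free (bit set) or handed out (bit cleared).
type chunkPool struct {
	mu      sync.Mutex
	buf     []byte
	reqSize int
	length  int
	mask    uint32 // supports up to 32 chunks in this sketch
}

func newChunkPool(numChunks, chunkSize int) *chunkPool {
	p := &chunkPool{
		buf:     make([]byte, numChunks*chunkSize),
		reqSize: chunkSize,
		length:  numChunks,
	}
	for i := 0; i < numChunks; i++ {
		p.mask |= 1 << i // every chunk starts out free
	}
	return p
}

// acquire hands out chunk num if its bit is set, clearing the bit; the real
// RequestChunk instead loops until the bit is set or the context is cancelled.
func (p *chunkPool) acquire(num int) ([]byte, bool) {
	num %= p.length
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.mask&(1<<num) == 0 {
		return nil, false // chunk still in use
	}
	p.mask &= ^(1 << num)
	return p.buf[num*p.reqSize : (num+1)*p.reqSize], true
}

// release marks chunk num as free again by setting its bit.
func (p *chunkPool) release(num int) {
	num %= p.length
	p.mu.Lock()
	p.mask |= 1 << num
	p.mu.Unlock()
}

func main() {
	pool := newChunkPool(4, 8)
	chunk, ok := pool.acquire(1)
	fmt.Println(ok, len(chunk)) // true 8
	_, ok = pool.acquire(1)
	fmt.Println(ok) // false: chunk 1 has not been released yet
	pool.release(1)
	_, ok = pool.acquire(1)
	fmt.Println(ok) // true again
}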
