Skip to content

Commit

Permalink
Enhancement/lp refactor download (#753)
Browse files Browse the repository at this point in the history
* Add initial code of refactor download

* Add similar fields

* Add functions for getting data and reconstructing

* WIP: Separate out get data to vertical and horizontal

Vertical can have multiple blocks per read marker

Horizontal can have only one block per read marker

* WIP: Add downloadData function
Define errors

* WIP: Add decrypt function
Define more errors
Refactor StreamDownld structure

* Get refs with path hash and auth ticket

* Get stream downloader
Get refs with path hash and authticket as well

* Fix WIP code/logic.

* Fix authticket missing decoding

* Add decryption for shared encrypted file

* Separate out readCounter sync map in separate file

* Fix with set counter

* Change read counter to read size

* Delete read counter logic implementation

* Update with latest changes in readmarker
Few fixes and optimization
Remove print statements

* Remove print statements

* Add function to download file from where it left

* Uncomment actual code

* Modify function parameter

* Modify w.r.t. rm changes and handle errors

* Fix function return value

* Remove unrequired function parameter

* Add unit test to refactor download

* Add test case for refactor download test

* Fix/Rename functions

* Add test case for refactor download

* Add unit test cases

* Add fixes

* Add context to downloader

* Remove rx_pay

* Add support for thumbnail download
Remove rx pay

* Add support for thumbnail download
Remove rx-pay

* Add comment

* Remove download type as chunk size is fixed.

* Rename function

* Rename field

* Modify reader

* Fix issues

* Comply reader with io.ReadSeekerCloser interface

* Add comment and remove unused variables and constants

---------

Co-authored-by: Kishan Dhakan <[email protected]>
  • Loading branch information
lpoli and Kishan-Dhakan authored May 1, 2023
1 parent f4760c5 commit 0885b6e
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 42 deletions.
161 changes: 154 additions & 7 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
Expand Down Expand Up @@ -678,19 +679,17 @@ func (a *Allocation) ListDir(path string) (*ListResult, error) {
return nil, errors.New("list_request_failed", "Failed to get list response from the blobbers")
}

// This function will retrieve paginated objectTree and will handle consensus; Required tree should be made in application side.
// TODO use allocation context
func (a *Allocation) GetRefs(path, offsetPath, updatedDate, offsetDate, fileType, refType string, level, pageLimit int) (*ObjectTreeResult, error) {
if len(path) == 0 || !zboxutil.IsRemoteAbs(path) {
return nil, errors.New("invalid_path", fmt.Sprintf("Absolute path required. Path provided: %v", path))
}
func (a *Allocation) getRefs(path, pathHash, authToken, offsetPath, updatedDate, offsetDate, fileType, refType string, level, pageLimit int) (*ObjectTreeResult, error) {
if !a.isInitialized() {
return nil, notInitialized
}

oTreeReq := &ObjectTreeRequest{
allocationID: a.ID,
allocationTx: a.Tx,
blobbers: a.Blobbers,
authToken: authToken,
pathHash: pathHash,
remotefilepath: path,
pageLimit: pageLimit,
level: level,
Expand All @@ -704,10 +703,46 @@ func (a *Allocation) GetRefs(path, offsetPath, updatedDate, offsetDate, fileType
}
oTreeReq.fullconsensus = a.fullconsensus
oTreeReq.consensusThresh = a.consensusThreshold

return oTreeReq.GetRefs()
}

// GetRefsWithAuthTicket gets refs that are children of a shared remote path.
// The base64-encoded auth ticket is decoded and validated, and the refs are
// looked up by the ticket's file path hash.
func (a *Allocation) GetRefsWithAuthTicket(authToken, offsetPath, updatedDate, offsetDate, fileType, refType string, level, pageLimit int) (*ObjectTreeResult, error) {
	if authToken == "" {
		return nil, errors.New("empty_auth_token", "auth token cannot be empty")
	}
	sEnc, err := base64.StdEncoding.DecodeString(authToken)
	if err != nil {
		return nil, errors.New("auth_ticket_decode_error", "Error decoding the auth ticket."+err.Error())
	}

	authTicket := new(marker.AuthTicket)
	if err := json.Unmarshal(sEnc, authTicket); err != nil {
		return nil, errors.New("json_unmarshall_error", err.Error())
	}

	// Re-marshal the parsed ticket before forwarding it — presumably to
	// normalize the JSON form; TODO confirm blobbers require this shape.
	// Marshalling a struct that was just unmarshalled cannot realistically
	// fail, but the error is surfaced instead of being silently discarded.
	at, err := json.Marshal(authTicket)
	if err != nil {
		return nil, errors.New("json_marshall_error", err.Error())
	}
	return a.getRefs("", authTicket.FilePathHash, string(at), offsetPath, updatedDate, offsetDate, fileType, refType, level, pageLimit)
}

// GetRefs retrieves a paginated objectTree and handles consensus; the
// required tree should be assembled on the application side.
func (a *Allocation) GetRefs(path, offsetPath, updatedDate, offsetDate, fileType, refType string, level, pageLimit int) (*ObjectTreeResult, error) {
	// Only absolute remote paths are accepted here; other addressing modes
	// have their own entry points (auth ticket, lookup hash).
	if path == "" || !zboxutil.IsRemoteAbs(path) {
		return nil, errors.New("invalid_path", fmt.Sprintf("Absolute path required. Path provided: %v", path))
	}
	return a.getRefs(path, "", "", offsetPath, updatedDate, offsetDate, fileType, refType, level, pageLimit)
}

// GetRefsFromLookupHash retrieves paginated refs addressed by their lookup hash.
func (a *Allocation) GetRefsFromLookupHash(pathHash, offsetPath, updatedDate, offsetDate, fileType, refType string, level, pageLimit int) (*ObjectTreeResult, error) {
	if len(pathHash) == 0 {
		return nil, errors.New("invalid_lookup_hash", "lookup hash cannot be empty")
	}
	return a.getRefs("", pathHash, "", offsetPath, updatedDate, offsetDate, fileType, refType, level, pageLimit)
}

func (a *Allocation) GetRecentlyAddedRefs(page int, fromDate int64, pageLimit int) (*RecentlyAddedRefResult, error) {
if !a.isInitialized() {
return nil, notInitialized
Expand Down Expand Up @@ -1208,6 +1243,118 @@ func (a *Allocation) CancelDownload(remotepath string) error {
return errors.New("remote_path_not_found", "Invalid path. No download in progress for the path "+remotepath)
}

// DownloadFromReader downloads a remote file (or its thumbnail, when
// contentMode is DOWNLOAD_CONTENT_THUMB) into the directory localPath using
// the allocation's streaming reader. If a partial local copy already exists,
// the download resumes by seeking the remote reader to the local file's size
// and appending.
func (a *Allocation) DownloadFromReader(
	remotePath, localPath, pathHash, authToken, contentMode string,
	verifyDownload bool, blocksPerMarker uint) error {

	finfo, err := os.Stat(localPath)
	if err != nil {
		return err
	}
	if !finfo.IsDir() {
		return errors.New("invalid_path", "local path must be directory")
	}

	r, err := a.GetAllocationFileReader(
		remotePath, pathHash, authToken, contentMode, verifyDownload, blocksPerMarker)
	if err != nil {
		return err
	}

	sd := r.(*StreamDownload)

	fileName := filepath.Base(sd.remotefilepath)
	var localFPath string
	if contentMode == DOWNLOAD_CONTENT_THUMB {
		// Append ".thumb" as a suffix to the file name. Passing it as a
		// separate Join component would produce "<fileName>/.thumb", i.e. a
		// sub-directory path, which is wrong.
		localFPath = filepath.Join(localPath, fileName+".thumb")
	} else {
		localFPath = filepath.Join(localPath, fileName)
	}

	finfo, err = os.Stat(localFPath)

	var f *os.File
	switch {
	case errors.Is(err, os.ErrNotExist):
		f, err = os.Create(localFPath)
	case err != nil:
		// Any other stat failure: finfo is nil, so we cannot resume.
		return err
	default:
		// Resume: position the remote reader at the already-downloaded size.
		if _, err = r.Seek(finfo.Size(), io.SeekStart); err != nil {
			return err
		}
		f, err = os.OpenFile(localFPath, os.O_WRONLY|os.O_APPEND, 0644)
	}
	if err != nil {
		return err
	}
	defer f.Close()

	buf := make([]byte, 1024*KB)
	for {
		n, rErr := r.Read(buf)
		if rErr != nil && !errors.Is(rErr, io.EOF) {
			// A non-EOF read error must abort the download; looping on it
			// would spin forever and/or write garbage.
			return rErr
		}
		if n > 0 {
			if _, wErr := f.Write(buf[:n]); wErr != nil {
				return wErr
			}
		}
		if rErr != nil { // io.EOF: the remaining bytes were flushed above
			return nil
		}
	}
}

// GetAllocationFileReader will check file ref existence and returns an
// instance that provides the io.ReadSeekCloser interface for streaming the
// file's content. The file may be addressed by auth ticket, remote path or
// lookup hash — checked in that order of precedence.
func (a *Allocation) GetAllocationFileReader(
	remotePath,
	pathHash,
	authToken,
	contentMode string,
	verifyDownload bool,
	blocksPerMarker uint) (io.ReadSeekCloser, error) {

	if !a.isInitialized() {
		return nil, notInitialized
	}

	remotePath = filepath.Clean(remotePath)

	// Resolve exactly one file ref (pageLimit 1) through whichever
	// addressing mode was supplied.
	var res *ObjectTreeResult
	var err error
	switch {
	case authToken != "":
		res, err = a.GetRefsWithAuthTicket(authToken, "", "", "", "", "regular", 0, 1)
	case remotePath != "":
		res, err = a.GetRefs(remotePath, "", "", "", "", "regular", 0, 1)
	case pathHash != "":
		res, err = a.GetRefsFromLookupHash(pathHash, "", "", "", "", "regular", 0, 1)
	default:
		return nil, errors.New("invalid_path", "remote path or authticket is required")
	}

	if err != nil {
		return nil, err
	}

	if len(res.Refs) == 0 {
		return nil, errors.New("file_does_not_exist", "")
	}
	ref := &res.Refs[0]
	// Only regular files can be streamed; directories are rejected.
	if ref.Type != fileref.FILE {
		return nil, errors.New("operation_not_supported", "downloading other than file is not supported")
	}

	sdo := &StreamDownloadOption{
		ContentMode:     contentMode,
		AuthTicket:      authToken,
		VerifyDownload:  verifyDownload,
		BlocksPerMarker: blocksPerMarker,
	}

	return GetDStorageFileReader(a, ref, sdo)
}

func (a *Allocation) DownloadThumbnailFromAuthTicket(localPath string,
authTicket string, remoteLookupHash string, remoteFilename string, verifyDownload bool,
status StatusCallback) error {
Expand Down
3 changes: 1 addition & 2 deletions zboxcore/sdk/blockdownloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import (
)

const (
NotEnoughTokens = "not_enough_tokens"
LockExists = "lock_exists"
LockExists = "lock_exists"
)

type BlockDownloadRequest struct {
Expand Down
20 changes: 11 additions & 9 deletions zboxcore/sdk/downloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type DownloadRequest struct {
completedCallback func(remotepath string, remotepathhash string)
contentMode string
Consensus
effectiveChunkSize int
effectiveBlockSize int // blocksize - encryptionOverHead
ecEncoder reedsolomon.Encoder
maskMu *sync.Mutex
encScheme encryption.EncryptionScheme
Expand Down Expand Up @@ -112,8 +112,8 @@ func (req *DownloadRequest) getBlocksData(startBlock, totalBlock int64) ([]byte,

// erasure decoding
// Can we benefit from goroutine for erasure decoding??
c := req.datashards * req.effectiveChunkSize
data := make([]byte, req.datashards*req.effectiveChunkSize*int(totalBlock))
c := req.datashards * req.effectiveBlockSize
data := make([]byte, req.datashards*req.effectiveBlockSize*int(totalBlock))
var isValid bool
for i := range shards {
var d []byte
Expand Down Expand Up @@ -395,7 +395,7 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
startBlock, endBlock, numBlocks := req.startBlock, req.endBlock, req.numBlocks
// remainingSize should be calculated based on startBlock number
// otherwise end data will have null bytes.
remainingSize := size - startBlock*int64(req.effectiveChunkSize)
remainingSize := size - startBlock*int64(req.effectiveBlockSize)
if remainingSize <= 0 {
logger.Logger.Error("Nothing to download")
req.errorCB(
Expand Down Expand Up @@ -487,11 +487,12 @@ func (req *DownloadRequest) processDownload(ctx context.Context) {
}
}

// initEC will initialize erasure encoder/decoder
func (req *DownloadRequest) initEC() error {
var err error
req.ecEncoder, err = reedsolomon.New(
req.datashards, req.parityshards,
reedsolomon.WithAutoGoroutines(int(req.effectiveChunkSize)))
reedsolomon.WithAutoGoroutines(int(req.effectiveBlockSize)))

if err != nil {
return errors.New("init_ec",
Expand All @@ -500,6 +501,7 @@ func (req *DownloadRequest) initEC() error {
return nil
}

// initEncryption will initialize encScheme with client's keys
func (req *DownloadRequest) initEncryption() error {
req.encScheme = encryption.NewEncryptionScheme()
mnemonic := client.GetClient().Mnemonic
Expand Down Expand Up @@ -558,14 +560,14 @@ func (req *DownloadRequest) calculateShardsParams(
// fRef.ActualFileSize is size of file that does not include encryption bytes.
// that is why, actualPerShard will have different value for encrypted file.
effectivePerShardSize := (size + int64(req.datashards) - 1) / int64(req.datashards)
effectiveChunkSize := fRef.ChunkSize
effectiveBlockSize := fRef.ChunkSize
if fRef.EncryptedKey != "" {
effectiveChunkSize -= EncryptionHeaderSize + EncryptedDataPaddingSize
effectiveBlockSize -= EncryptionHeaderSize + EncryptedDataPaddingSize
}

req.effectiveChunkSize = int(effectiveChunkSize)
req.effectiveBlockSize = int(effectiveBlockSize)

chunksPerShard = (effectivePerShardSize + effectiveChunkSize - 1) / effectiveChunkSize
chunksPerShard = (effectivePerShardSize + effectiveBlockSize - 1) / effectiveBlockSize
actualPerShard = chunksPerShard * fRef.ChunkSize
if req.endBlock == 0 || req.endBlock > chunksPerShard {
req.endBlock = chunksPerShard
Expand Down
6 changes: 3 additions & 3 deletions zboxcore/sdk/downloadworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestDecodeEC(t *testing.T) {
req := DownloadRequest{}
req.datashards = 4
req.parityshards = 2
req.effectiveChunkSize = 64 * 1024
req.effectiveBlockSize = 64 * 1024

err := req.initEC()
require.NoError(t, err)
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestDecodeEC(t *testing.T) {
req := DownloadRequest{}
req.datashards = 4
req.parityshards = 2
req.effectiveChunkSize = 64 * 1024
req.effectiveBlockSize = 64 * 1024

err := req.initEC()
require.NoError(t, err)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestDecodeEC(t *testing.T) {
req := DownloadRequest{}
req.datashards = 4
req.parityshards = 2
req.effectiveChunkSize = 64 * 1024
req.effectiveBlockSize = 64 * 1024

err := req.initEC()
require.NoError(t, err)
Expand Down
55 changes: 36 additions & 19 deletions zboxcore/sdk/filerefsworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type ObjectTreeRequest struct {
allocationID string
allocationTx string
blobbers []*blockchain.StorageNode
authToken string
pathHash string
remotefilepath string
pageLimit int // numbers of refs that will be returned by blobber at max
level int
Expand Down Expand Up @@ -111,7 +113,20 @@ func (o *ObjectTreeRequest) GetRefs() (*ObjectTreeResult, error) {

func (o *ObjectTreeRequest) getFileRefs(oTR *oTreeResponse, bUrl string) {
defer o.wg.Done()
oReq, err := zboxutil.NewRefsRequest(bUrl, o.allocationID, o.remotefilepath, o.offsetPath, o.updatedDate, o.offsetDate, o.fileType, o.refType, o.level, o.pageLimit)
oReq, err := zboxutil.NewRefsRequest(
bUrl,
o.allocationID,
o.remotefilepath,
o.pathHash,
o.authToken,
o.offsetPath,
o.updatedDate,
o.offsetDate,
o.fileType,
o.refType,
o.level,
o.pageLimit,
)
if err != nil {
oTR.err = err
return
Expand Down Expand Up @@ -151,27 +166,29 @@ func (o *ObjectTreeRequest) getFileRefs(oTR *oTreeResponse, bUrl string) {
// i.e. we cannot calculate hash of response and have consensus on it
// ORef wraps the consensus-relevant ref fields (SimilarField) together with
// fields that may differ from blobber to blobber (ID, timestamps) and so
// cannot take part in a hash-based consensus on the response.
type ORef struct {
	SimilarField
	ID        int64            `json:"id"`         // blobber-local row id
	CreatedAt common.Timestamp `json:"created_at"` // may differ per blobber
	UpdatedAt common.Timestamp `json:"updated_at"` // may differ per blobber
}

// SimilarField holds the ref fields that are expected to be identical across
// blobbers, making them usable for consensus on a ref response.
type SimilarField struct {
	FileID              string `json:"file_id"`
	FileMetaHash        string `json:"file_meta_hash"`
	Type                string `json:"type"` // file or directory
	AllocationID        string `json:"allocation_id"`
	LookupHash          string `json:"lookup_hash"`
	Name                string `json:"name"`
	Path                string `json:"path"`
	PathHash            string `json:"path_hash"`
	ParentPath          string `json:"parent_path"`
	PathLevel           int    `json:"level"`
	Size                int64  `json:"size"`
	EncryptedKey        string `json:"encrypted_key"` // empty for unencrypted files
	ActualFileSize      int64  `json:"actual_file_size"`
	ActualFileHash      string `json:"actual_file_hash"`
	MimeType            string `json:"mimetype"`
	ActualThumbnailSize int64  `json:"actual_thumbnail_size"`
	ActualThumbnailHash string `json:"actual_thumbnail_hash"`
}

type RecentlyAddedRefRequest struct {
Expand Down
Loading

0 comments on commit 0885b6e

Please sign in to comment.