Skip to content

Commit

Permalink
Add webstreaming to multiupload (#1190)
Browse files Browse the repository at this point in the history
* add webstreaming

* fix typos

* add webstreaming to wasm multiupload

* fix typo
  • Loading branch information
din-mukhammed authored and YarikRevich committed Aug 30, 2023
1 parent 9a51ef3 commit 47cfad4
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 36 deletions.
23 changes: 14 additions & 9 deletions mobilesdk/zbox/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ type MultiOperationOption struct {
}

type MultiUploadOption struct {
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`
}

type MultiDownloadOption struct {
Expand Down Expand Up @@ -336,6 +337,7 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str
encrypts := make([]bool, totalUploads)
chunkNumbers := make([]int, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)
for idx, option := range options {
filePaths[idx] = option.FilePath
fileNames[idx] = option.FileName
Expand All @@ -344,13 +346,14 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str
chunkNumbers[idx] = option.ChunkNumber
encrypts[idx] = option.Encrypt
isUpdates[idx] = false
isWebstreaming[idx] = option.IsWebstreaming
}

a, err := getAllocation(allocationID)
if err != nil {
return err
}
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb})
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb})

}

Expand All @@ -373,6 +376,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
encrypts := make([]bool, totalUploads)
chunkNumbers := make([]int, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)
for idx, option := range options {
filePaths[idx] = option.FilePath
fileNames[idx] = option.FileName
Expand All @@ -381,6 +385,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
chunkNumbers[idx] = option.ChunkNumber
encrypts[idx] = option.Encrypt
isUpdates[idx] = true
isWebstreaming[idx] = option.IsWebstreaming
}
if err != nil {
return err
Expand All @@ -390,7 +395,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str
if err != nil {
return err
}
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb})
return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb})

}

Expand Down
12 changes: 7 additions & 5 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ type BulkUploadOption struct {
Webstreaming bool `json:"webstreaming,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsRepair bool `json:"isRepair,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`

NumBlocks int `json:"numBlocks,omitempty"`
FileSize int64 `json:"fileSize,omitempty"`
Expand Down Expand Up @@ -671,11 +672,12 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
sdk.WithChunkNumber(numBlocks),
}
operationRequests[idx] = sdk.OperationRequest{
FileMeta: fileMeta,
FileReader: fileReader,
OperationType: FileOperationInsert,
Opts: options,
Workdir: "/",
FileMeta: fileMeta,
FileReader: fileReader,
OperationType: FileOperationInsert,
Opts: options,
Workdir: "/",
IsWebstreaming: option.IsWebstreaming,
}

}
Expand Down
7 changes: 4 additions & 3 deletions winsdk/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ type UploadFile struct {
Path string
ThumbnailPath string

RemotePath string
Encrypt bool
IsUpdate bool
RemotePath string
Encrypt bool
IsUpdate bool
IsWebstreaming bool

ChunkNumber int
}
19 changes: 11 additions & 8 deletions winsdk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,14 @@ type MultiOperationOption struct {
}

type MultiUploadOption struct {
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
FilePath string `json:"filePath,omitempty"`
FileName string `json:"fileName,omitempty"`
RemotePath string `json:"remotePath,omitempty"`
ThumbnailPath string `json:"thumbnailPath,omitempty"`
Encrypt bool `json:"encrypt,omitempty"`
ChunkNumber int `json:"chunkNumber,omitempty"`
IsUpdate bool `json:"isUpdate,omitempty"`
IsWebstreaming bool `json:"isWebstreaming,omitempty"`
}

// MultiOperation - do copy, move, delete and createdir operation together
Expand Down Expand Up @@ -323,6 +324,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {
chunkNumbers := make([]int, totalUploads)
encrypts := make([]bool, totalUploads)
isUpdates := make([]bool, totalUploads)
isWebstreaming := make([]bool, totalUploads)

statusBar := &StatusCallback{
status: make(map[string]*Status),
Expand All @@ -335,6 +337,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {
remotePaths[idx] = option.RemotePath
chunkNumbers[idx] = option.ChunkNumber
isUpdates[idx] = option.IsUpdate
isWebstreaming[idx] = option.IsWebstreaming
encrypts[idx] = option.Encrypt
statusBar.status[option.RemotePath+option.Name] = &Status{}
}
Expand All @@ -346,7 +349,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char {

statusCaches.Add(C.GoString(uploadID), statusBar)

err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, statusBar)
err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, statusBar)
if err != nil {
return WithJSON(nil, err)
}
Expand Down
22 changes: 13 additions & 9 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,13 @@ type Allocation struct {
}

type OperationRequest struct {
OperationType string
LocalPath string
RemotePath string
DestName string // Required only for rename operation
DestPath string // Required for copy and move operation
IsUpdate bool
OperationType string
LocalPath string
RemotePath string
DestName string // Required only for rename operation
DestPath string // Required for copy and move operation
IsUpdate bool
IsWebstreaming bool

// Required for uploads
Workdir string
Expand Down Expand Up @@ -495,7 +496,7 @@ func (a *Allocation) EncryptAndUploadFileWithThumbnail(
)
}

func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, status StatusCallback) error {
func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, isWebstreaming []bool, status StatusCallback) error {
if len(localPaths) != len(thumbnailPaths) {
return errors.New("invalid_value", "length of localpaths and thumbnailpaths must be equal")
}
Expand Down Expand Up @@ -588,6 +589,9 @@ func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileN
if isUpdate[idx] {
operationRequests[idx].OperationType = constants.FileOperationUpdate
}
if isWebstreaming[idx] {
operationRequests[idx].IsWebstreaming = true
}

}
err := a.DoMultiOperation(operationRequests)
Expand Down Expand Up @@ -873,13 +877,13 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error {
operation = NewMoveOperation(op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

case constants.FileOperationInsert:
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.Opts...)
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.Opts...)

case constants.FileOperationDelete:
operation = NewDeleteOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)

case constants.FileOperationUpdate:
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.Opts...)
operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.Opts...)

case constants.FileOperationCreateDir:
operation = NewDirOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx)
Expand Down
6 changes: 4 additions & 2 deletions zboxcore/sdk/upload_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ type UploadOperation struct {
opts []ChunkedUploadOption
refs []*fileref.FileRef
isUpdate bool
isWebstreaming bool
statusCallback StatusCallback
opCode int
}

func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) {
cu, err := CreateChunkedUpload(uo.workdir, allocObj, uo.fileMeta, uo.fileReader, uo.isUpdate, false, false, connectionID, uo.opts...)
cu, err := CreateChunkedUpload(uo.workdir, allocObj, uo.fileMeta, uo.fileReader, uo.isUpdate, false, uo.isWebstreaming, connectionID, uo.opts...)
if err != nil {
uploadMask := zboxutil.NewUint128(1).Lsh(uint64(len(allocObj.Blobbers))).Sub64(1)
return nil, uploadMask, err
Expand Down Expand Up @@ -123,12 +124,13 @@ func (uo *UploadOperation) Error(allocObj *Allocation, consensus int, err error)
}
}

func NewUploadOperation(workdir string, fileMeta FileMeta, fileReader io.Reader, isUpdate bool, opts ...ChunkedUploadOption) *UploadOperation {
func NewUploadOperation(workdir string, fileMeta FileMeta, fileReader io.Reader, isUpdate, isWebstreaming bool, opts ...ChunkedUploadOption) *UploadOperation {
uo := &UploadOperation{}
uo.workdir = workdir
uo.fileMeta = fileMeta
uo.fileReader = fileReader
uo.opts = opts
uo.isUpdate = isUpdate
uo.isWebstreaming = isWebstreaming
return uo
}

0 comments on commit 47cfad4

Please sign in to comment.