From 488abc3d6c1816284373d94eed11c746a0a1977c Mon Sep 17 00:00:00 2001 From: Victor Mozgin Date: Wed, 22 Feb 2023 05:25:02 -0500 Subject: [PATCH] Multithreaded chunk listing for OneDrive (with batching and configuration) --- src/duplicacy_oneclient.go | 212 ++++++++++++++++++++++++++++++++++-- src/duplicacy_onestorage.go | 174 +++++++++++++++++++++++++++-- src/duplicacy_storage.go | 23 +++- 3 files changed, 389 insertions(+), 20 deletions(-) diff --git a/src/duplicacy_oneclient.go b/src/duplicacy_oneclient.go index 0f25bb91..e166feb3 100644 --- a/src/duplicacy_oneclient.go +++ b/src/duplicacy_oneclient.go @@ -46,12 +46,24 @@ type OneDriveClient struct { IsConnected bool TestMode bool - IsBusiness bool + IsBusiness bool + MaxBatchReqs int + RefreshTokenURL string APIURL string + + BatchURL string + StripBatchPrefixURL string } -func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, client_secret string, drive_id string) (*OneDriveClient, error) { +func NewOneDriveClient( + tokenFile string, + isBusiness bool, + max_batch_requests int, + client_id string, + client_secret string, + drive_id string, +) (*OneDriveClient, error) { description, err := ioutil.ReadFile(tokenFile) if err != nil { @@ -64,12 +76,13 @@ func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, clie } client := &OneDriveClient{ - HTTPClient: http.DefaultClient, - TokenFile: tokenFile, - Token: token, - OAConfig: nil, - TokenLock: &sync.Mutex{}, - IsBusiness: isBusiness, + HTTPClient: http.DefaultClient, + TokenFile: tokenFile, + Token: token, + OAConfig: nil, + TokenLock: &sync.Mutex{}, + IsBusiness: isBusiness, + MaxBatchReqs: max_batch_requests, } if (client_id != "") { @@ -92,9 +105,12 @@ func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, clie if drive_id != "" { client.APIURL = "https://graph.microsoft.com/v1.0/drives/"+drive_id } + client.BatchURL = "https://graph.microsoft.com/v1.0/$batch" + client.StripBatchPrefixURL = "https://graph.microsoft.com/v1.0/" } else { client.RefreshTokenURL = "https://duplicacy.com/one_refresh" client.APIURL = "https://api.onedrive.com/v1.0/drive" + client.BatchURL = "" } client.RefreshToken(false) @@ -272,6 +288,12 @@ func (client *OneDriveClient) RefreshToken(force bool) (err error) { return nil } + +type OneDriveListReqItem struct { + Path string + URL string +} + type OneDriveEntry struct { ID string Name string @@ -279,9 +301,15 @@ type OneDriveEntry struct { Size int64 } +type ErrorResponse struct { + Code string `json:"code"` + Message string `json:"message"` +} + type OneDriveListEntriesOutput struct { Entries []OneDriveEntry `json:"value"` NextLink string `json:"@odata.nextLink"` + Error ErrorResponse `json:"error"` } func (client *OneDriveClient) ListEntries(path string) ([]OneDriveEntry, error) { @@ -324,6 +352,174 @@ func (client *OneDriveClient) ListEntries(path string) ([]OneDriveEntry, error) return entries, nil } +func (client *OneDriveClient) ListPathToURL(path string) (url string) { + + url = client.APIURL + "/root:/" + path + ":/children" + if path == "" { + url = client.APIURL + "/root/children" + } + if client.TestMode { + url += "?top=8" + } else { + url += "?top=1000" + } + url += "&select=name,size,folder" + + return url +} + + +type BatchRequestItem struct { + Id string `json:"id"` + Method string `json:"method"` + URL string `json:"url"` +} + +type BatchRequest struct { + Requests []BatchRequestItem `json:"requests"` +} + +type BatchResponseItem struct { + Id string `json:"id"` + Status int `json:"status"` + Headers map[string]string `json:"headers"` + Body OneDriveListEntriesOutput `json:"body"` +} + +type BatchResponse struct { + Responses []BatchResponseItem `json:"responses"` +} + + +func (client *OneDriveClient) ListEntriesBatch( + prefix string, + batchReqs []OneDriveListReqItem, +) ( + entriesPerReq []OneDriveListEntriesOutput, + newReqs []OneDriveListReqItem, + err error, +) { + nReqs := len(batchReqs) + + entriesPerReq = make([]OneDriveListEntriesOutput, nReqs, nReqs) + newReqs = make([]OneDriveListReqItem, 0, 0) + + for i, req := range batchReqs { + if req.URL == "" { + batchReqs[i].URL = client.ListPathToURL(prefix + "/" + batchReqs[i].Path) + } + } + + if client.IsBusiness && nReqs > 1 { + // OneDrive Business uses Graph API which supports batching + breq := BatchRequest{} + breq.Requests = make([]BatchRequestItem, len(batchReqs), len(batchReqs)) + for i, req := range batchReqs { + breq.Requests[i].Id = strconv.Itoa(i+1) + breq.Requests[i].Method = "GET" + breq.Requests[i].URL = req.URL[len(client.StripBatchPrefixURL):] + } + tracestr := fmt.Sprintf("Batch payload: %d requests", len(breq.Requests)) + for _, req := range breq.Requests { + tracestr = tracestr + fmt.Sprintf("\n\t\t%s %s", req.Method, req.URL) + } + LOG_TRACE("ONEDRIVE_BATCH", tracestr) + + readCloser, _, err := client.call(client.BatchURL, "POST", breq, "application/json") + if err != nil { + return nil, nil, err + } + + defer readCloser.Close() + + bresp := &BatchResponse{} + + if err = json.NewDecoder(readCloser).Decode(&bresp); err != nil { + return nil, nil, err + } + if len(bresp.Responses) != len(batchReqs) { + return nil, nil, fmt.Errorf("Batch response length mismatch with requests length: %d != %d", len(bresp.Responses), len(batchReqs)) + } + throttleDelay := 0 + + for _, resp := range bresp.Responses { + nresp, err := strconv.Atoi(resp.Id) + if err != nil { + return nil, nil, err + } + nresp-- // ids are 1-based in the response payload + + if resp.Status == 200 { // success + entriesPerReq[nresp] = resp.Body + if entriesPerReq[nresp].NextLink != "" { + newReqs = append( + newReqs, + OneDriveListReqItem{ + Path: batchReqs[nresp].Path, + URL: entriesPerReq[nresp].NextLink, + }, + ) + } + } else if resp.Status == 429 { // throttled + var backoff int + backoffList, found := resp.Headers["Retry-After"] + if found && len(backoffList)>0 { + backoff, _ = strconv.Atoi(backoffList) + backoff *= 1000 // s to ms + } else { + backoff = 300000 // 5 minutes by default + } + if backoff > throttleDelay { + throttleDelay = backoff + } + LOG_INFO("ONEDRIVE_RETRY", "Batch item response code: %d; suggested retry is %d milliseconds", resp.Status, backoff) + // Retry the same URL + newReqs = append(newReqs, batchReqs[nresp]) + } else if resp.Status == 400 || resp.Status == 401 { + // Some errors are expected, e.g. unauthorized / expired token + // Retry the same URL + newReqs = append(newReqs, batchReqs[nresp]) + } else { // unexpected error + errmsg := resp.Body.Error.Message + //LOG_TRACE("ONEDRIVE_BATCH", "Unexpected batch response error %d: %s / %s", resp.Status, http.StatusText(resp.Status), errmsg) + return nil, nil, fmt.Errorf("Unexpected batch response error %d: %s / %s", resp.Status, http.StatusText(resp.Status), errmsg) + } + } + + if throttleDelay > 0 { + LOG_INFO("ONEDRIVE_RETRY", "Batch request throttled; retry after %d milliseconds", throttleDelay) + time.Sleep(time.Duration(throttleDelay) * time.Millisecond) + throttleDelay = 0 + } + + } else { + // Run without batching + for i, req := range batchReqs { + readCloser, _, err := client.call(req.URL, "GET", 0, "") + if err != nil { + return nil, nil, err + } + + defer readCloser.Close() + + if err = json.NewDecoder(readCloser).Decode(&entriesPerReq[i]); err != nil { + return nil, nil, err + } + if entriesPerReq[i].NextLink != "" { + newReqs = append( + newReqs, + OneDriveListReqItem{ + Path: batchReqs[i].Path, + URL: entriesPerReq[i].NextLink, + }, + ) + } + } + } + + return entriesPerReq, newReqs, nil +} + func (client *OneDriveClient) GetFileInfo(path string) (string, bool, int64, error) { url := client.APIURL + "/root:/" + path diff --git a/src/duplicacy_onestorage.go b/src/duplicacy_onestorage.go index fb0fc804..e7ad4f8d 100644 --- a/src/duplicacy_onestorage.go +++ b/src/duplicacy_onestorage.go @@ -8,24 +8,41 @@ import ( "fmt" "path" "strings" + "sync" ) type OneDriveStorage struct { StorageBase - client *OneDriveClient - storageDir string - numberOfThread int + client *OneDriveClient + storageDir string + numberOfThreads int } // CreateOneDriveStorage creates an OneDrive storage object. -func CreateOneDriveStorage(tokenFile string, isBusiness bool, storagePath string, threads int, client_id string, client_secret string, drive_id string) (storage *OneDriveStorage, err error) { +func CreateOneDriveStorage( + tokenFile string, + isBusiness bool, + storagePath string, + threads int, + max_batch_requests int, + client_id string, + client_secret string, + drive_id string, +) (storage *OneDriveStorage, err error) { for len(storagePath) > 0 && storagePath[len(storagePath)-1] == '/' { storagePath = storagePath[:len(storagePath)-1] } - client, err := NewOneDriveClient(tokenFile, isBusiness, client_id, client_secret, drive_id) + client, err := NewOneDriveClient( + tokenFile, + isBusiness, + max_batch_requests, + client_id, + client_secret, + drive_id, + ) if err != nil { return nil, err } @@ -44,9 +61,9 @@ func CreateOneDriveStorage(tokenFile string, isBusiness bool, storagePath string } storage = &OneDriveStorage{ - client: client, - storageDir: storagePath, - numberOfThread: threads, + client: client, + storageDir: storagePath, + numberOfThreads: threads, } for _, path := range []string{"chunks", "fossils", "snapshots"} { @@ -78,8 +95,9 @@ func (storage *OneDriveStorage) convertFilePath(filePath string) string { return filePath } + // ListFiles return the list of files and subdirectories under 'dir' (non-recursively) -func (storage *OneDriveStorage) ListFiles(threadIndex int, dir string) ([]string, []int64, error) { +func (storage *OneDriveStorage) ListFilesNotThreaded(threadIndex int, dir string) ([]string, []int64, error) { for len(dir) > 0 && dir[len(dir)-1] == '/' { dir = dir[:len(dir)-1] @@ -145,6 +163,140 @@ func (storage *OneDriveStorage) ListFiles(threadIndex int, dir string) ([]string } +// ListFiles return the list of files and subdirectories under 'dir' (non-recursively) +func (storage *OneDriveStorage) ListFiles(threadIndex int, dir string) ([]string, []int64, error) { + + for len(dir) > 0 && dir[len(dir)-1] == '/' { + dir = dir[:len(dir)-1] + } + + if dir == "snapshots" { + // Not threaded + entries, err := storage.client.ListEntries(storage.storageDir + "/" + dir) + if err != nil { + return nil, nil, err + } + + subDirs := []string{} + for _, entry := range entries { + if len(entry.Folder) > 0 { + subDirs = append(subDirs, entry.Name+"/") + } + } + return subDirs, nil, nil + } else if strings.HasPrefix(dir, "snapshots/") || strings.HasPrefix(dir, "benchmark") { + // Not threaded + entries, err := storage.client.ListEntries(storage.storageDir + "/" + dir) + if err != nil { + return nil, nil, err + } + + files := []string{} + + for _, entry := range entries { + if len(entry.Folder) == 0 { + files = append(files, entry.Name) + } + } + return files, nil, nil + } else { + // Batched and threaded + lock := sync.Mutex {} + allFiles := []string{} + allSizes := []int64{} + + errorChannel := make(chan error) + requestChannel := make(chan OneDriveListReqItem) + activeWorkers := 0 + + requests := []OneDriveListReqItem{ + {Path:"chunks", URL:""}, + {Path:"fossils", URL:""}, + } + + maxRequestsPerThread := 1 + if storage.client.MaxBatchReqs > 1 { + // OneDrive Business works through Graph API + // which supports batch requests (20 is the max) + maxRequestsPerThread = storage.client.MaxBatchReqs + } + + for len(requests) > 0 || activeWorkers > 0 { + if len(requests) > 0 && activeWorkers < storage.numberOfThreads { + n_batchReqs := len(requests) + if n_batchReqs > maxRequestsPerThread { + n_batchReqs = maxRequestsPerThread + } + // Dequeue n_batch_reqs from the request queue + batchReqs := requests[:n_batchReqs] + requests = requests[n_batchReqs:] + activeWorkers++ + + go func(batchReqs []OneDriveListReqItem) { + // Will do non-batching if disabled / not supported + entriesPerReq, newReqs, err := storage.client.ListEntriesBatch(storage.storageDir, batchReqs) + if err != nil { + errorChannel <- err + return + } + + // send paging requests first + for _, pageReq := range newReqs { + requestChannel <- pageReq + } + + files := []string {} + sizes := []int64 {} + + for i, entries := range entriesPerReq { + LOG_DEBUG("ONE_STORAGE", "Listing %s; %d items returned", batchReqs[i].Path, len(entries.Entries)) + for _, entry := range entries.Entries { + if len(entry.Folder) == 0 { + name := entry.Name + if strings.HasPrefix(batchReqs[i].Path, "fossils") { + name = batchReqs[i].Path + "/" + name + ".fsl" + name = name[len("fossils/"):] + } else { + name = batchReqs[i].Path + "/" + name + name = name[len("chunks/"):] + } + files = append(files, name) + sizes = append(sizes, entry.Size) + } else { + recurseDirRequest := OneDriveListReqItem{ + Path: batchReqs[i].Path + "/" + entry.Name, + URL: "", + } + requestChannel <- recurseDirRequest + } + } + } + lock.Lock() + allFiles = append(allFiles, files...) + allSizes = append(allSizes, sizes...) + lock.Unlock() + requestChannel <- OneDriveListReqItem{Path:"", URL:""} + } (batchReqs) + } + + if activeWorkers > 0 { + select { + case err := <- errorChannel: + return nil, nil, err + case request := <- requestChannel: + if request.Path == "" { + activeWorkers-- + } else { + requests = append(requests, request) + } + } + } + } + + return allFiles, allSizes, nil + } +} + // DeleteFile deletes the file or directory at 'filePath'. func (storage *OneDriveStorage) DeleteFile(threadIndex int, filePath string) (err error) { filePath = storage.convertFilePath(filePath) @@ -211,13 +363,13 @@ func (storage *OneDriveStorage) DownloadFile(threadIndex int, filePath string, c defer readCloser.Close() - _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/storage.numberOfThread) + _, err = RateLimitedCopy(chunk, readCloser, storage.DownloadRateLimit/storage.numberOfThreads) return err } // UploadFile writes 'content' to the file at 'filePath'. func (storage *OneDriveStorage) UploadFile(threadIndex int, filePath string, content []byte) (err error) { - err = storage.client.UploadFile(storage.storageDir+"/"+filePath, content, storage.UploadRateLimit/storage.numberOfThread) + err = storage.client.UploadFile(storage.storageDir+"/"+filePath, content, storage.UploadRateLimit/storage.numberOfThreads) if e, ok := err.(OneDriveError); ok && e.Status == 409 { LOG_TRACE("ONEDRIVE_UPLOAD", "File %s already exists", filePath) diff --git a/src/duplicacy_storage.go b/src/duplicacy_storage.go index 0ad6891d..4a3457f5 100644 --- a/src/duplicacy_storage.go +++ b/src/duplicacy_storage.go @@ -669,7 +669,28 @@ func CreateStorage(preference Preference, resetPassword bool, threads int) (stor client_secret = GetPassword(preference, matched[1] + "_client_secret", prompt, true, resetPassword) } - oneDriveStorage, err := CreateOneDriveStorage(tokenFile, matched[1] == "odb", storagePath, threads, client_id, client_secret, drive_id) + // OneDrive Business uses Graph API which supports request batching + // "disabled" - (-1) disabled + // "max" - ( 0) enabled, max requests per batch (normally, 20) + // "" - (nn) enabled, specified requests per batch + max_batch_requests := -1 + + if matched[1] == "odb" { + max_batch_requests_str := GetPasswordFromPreference(preference, matched[1] + "_max_batch_requests") + if max_batch_requests_str == "max" { + max_batch_requests = 20 + } else if max_batch_requests_str != "" { + n, err := strconv.Atoi(max_batch_requests_str) + if err == nil { + max_batch_requests = n + if max_batch_requests > 20 { + max_batch_requests = 20 + } + } + } + } + + oneDriveStorage, err := CreateOneDriveStorage(tokenFile, matched[1] == "odb", storagePath, threads, max_batch_requests, client_id, client_secret, drive_id) if err != nil { LOG_ERROR("STORAGE_CREATE", "Failed to load the OneDrive storage at %s: %v", storageURL, err) return nil