Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded chunk file listing for OneDrive (with batching support) #645

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 204 additions & 8 deletions src/duplicacy_oneclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,24 @@ type OneDriveClient struct {
IsConnected bool
TestMode bool

IsBusiness bool
IsBusiness bool
MaxBatchReqs int

RefreshTokenURL string
APIURL string

BatchURL string
StripBatchPrefixURL string
}

func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, client_secret string, drive_id string) (*OneDriveClient, error) {
func NewOneDriveClient(
tokenFile string,
isBusiness bool,
max_batch_requests int,
client_id string,
client_secret string,
drive_id string,
) (*OneDriveClient, error) {

description, err := ioutil.ReadFile(tokenFile)
if err != nil {
Expand All @@ -64,12 +76,13 @@ func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, clie
}

client := &OneDriveClient{
HTTPClient: http.DefaultClient,
TokenFile: tokenFile,
Token: token,
OAConfig: nil,
TokenLock: &sync.Mutex{},
IsBusiness: isBusiness,
HTTPClient: http.DefaultClient,
TokenFile: tokenFile,
Token: token,
OAConfig: nil,
TokenLock: &sync.Mutex{},
IsBusiness: isBusiness,
MaxBatchReqs: max_batch_requests,
}

if (client_id != "") {
Expand All @@ -92,9 +105,12 @@ func NewOneDriveClient(tokenFile string, isBusiness bool, client_id string, clie
if drive_id != "" {
client.APIURL = "https://graph.microsoft.com/v1.0/drives/"+drive_id
}
client.BatchURL = "https://graph.microsoft.com/v1.0/$batch"
client.StripBatchPrefixURL = "https://graph.microsoft.com/v1.0/"
} else {
client.RefreshTokenURL = "https://duplicacy.com/one_refresh"
client.APIURL = "https://api.onedrive.com/v1.0/drive"
client.BatchURL = "<Batching is only supported by Graph API>"
}

client.RefreshToken(false)
Expand Down Expand Up @@ -272,16 +288,28 @@ func (client *OneDriveClient) RefreshToken(force bool) (err error) {
return nil
}


// OneDriveListReqItem is one pending listing request handed to
// ListEntriesBatch.
type OneDriveListReqItem struct {
	// Path is the directory to list. ListEntriesBatch joins it to its prefix
	// argument when it has to build the request URL itself.
	Path string

	// URL is the address to fetch. An empty URL is filled in by
	// ListEntriesBatch; follow-up requests carry the next-page link here.
	URL string
}

// OneDriveEntry is a single item of a directory listing, as found in the
// "value" array decoded into OneDriveListEntriesOutput.
type OneDriveEntry struct {
	ID   string
	Name string

	// Folder holds the raw "folder" object of the item.
	// NOTE(review): presumably only present for directories - confirm against callers.
	Folder map[string]interface{}

	Size int64
}

// ErrorResponse is the "error" object of a listing response body. Its Message
// is what ListEntriesBatch reports when a batch item fails unexpectedly.
type ErrorResponse struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

// OneDriveListEntriesOutput is the decoded body of one listing response.
type OneDriveListEntriesOutput struct {
	// Entries are the items of this page ("value" in the payload).
	Entries []OneDriveEntry `json:"value"`

	// NextLink is the "@odata.nextLink" URL; ListEntriesBatch treats a
	// non-empty value as "another page must be requested".
	NextLink string `json:"@odata.nextLink"`

	// Error is the "error" object of the payload, zero-valued when absent.
	Error ErrorResponse `json:"error"`
}

func (client *OneDriveClient) ListEntries(path string) ([]OneDriveEntry, error) {
Expand Down Expand Up @@ -324,6 +352,174 @@ func (client *OneDriveClient) ListEntries(path string) ([]OneDriveEntry, error)
return entries, nil
}

// ListPathToURL returns the URL that lists the children of path under
// client.APIURL. An empty path addresses the drive root. The page size is 8
// when client.TestMode is set and 1000 otherwise, and only the name, size and
// folder properties are selected.
func (client *OneDriveClient) ListPathToURL(path string) (url string) {
	children := "/root/children"
	if path != "" {
		children = "/root:/" + path + ":/children"
	}

	pageSize := "?top=1000"
	if client.TestMode {
		pageSize = "?top=8"
	}

	return client.APIURL + children + pageSize + "&select=name,size,folder"
}


// BatchRequestItem is one sub-request inside a BatchRequest payload.
type BatchRequestItem struct {
	// Id identifies the sub-request; the matching response item echoes it.
	Id string `json:"id"`

	// Method is the HTTP method of the sub-request.
	Method string `json:"method"`

	// URL is the sub-request URL. ListEntriesBatch stores it with
	// client.StripBatchPrefixURL removed from the front.
	URL string `json:"url"`
}

// BatchRequest is the JSON payload ListEntriesBatch posts to client.BatchURL:
// a list of sub-requests under the "requests" key.
type BatchRequest struct {
	Requests []BatchRequestItem `json:"requests"`
}

// BatchResponseItem is the outcome of one sub-request of a batch call.
type BatchResponseItem struct {
	// Id repeats the id of the sub-request this item answers.
	Id string `json:"id"`

	// Status is the HTTP status code of the sub-request.
	Status int `json:"status"`

	// Headers are the sub-response headers; ListEntriesBatch reads
	// "Retry-After" from them when Status is 429.
	Headers map[string]string `json:"headers"`

	// Body is the sub-response payload decoded as a listing result.
	Body OneDriveListEntriesOutput `json:"body"`
}

// BatchResponse is the decoded reply to a BatchRequest: one item per
// sub-request under the "responses" key.
type BatchResponse struct {
	Responses []BatchResponseItem `json:"responses"`
}


// ListEntriesBatch fetches one page of children for every request in
// batchReqs.
//
// A request whose URL is empty gets one built by ListPathToURL from
// prefix + "/" + Path; the URL is written back into batchReqs. When
// client.IsBusiness is set and there is more than one request, all of them are
// sent as a single POST to client.BatchURL; otherwise each request is fetched
// with its own GET.
//
// entriesPerReq has one element per request, at the same index as in
// batchReqs. newReqs lists the requests that must be issued next: the
// next-page link of every page that has one and, in batch mode, every item
// that answered 429, 400 or 401 (retried with the same URL). If any batch item
// was throttled (429), the call sleeps for the longest requested delay before
// returning.
//
// On error all results are nil. In batch mode an error is returned when a
// request URL is not under client.StripBatchPrefixURL, when the response
// count or ids do not match the requests, or when an item reports any other
// status.
func (client *OneDriveClient) ListEntriesBatch(
	prefix string,
	batchReqs []OneDriveListReqItem,
) (
	entriesPerReq []OneDriveListEntriesOutput,
	newReqs []OneDriveListReqItem,
	err error,
) {
	nReqs := len(batchReqs)

	entriesPerReq = make([]OneDriveListEntriesOutput, nReqs)
	newReqs = make([]OneDriveListReqItem, 0)

	for i := range batchReqs {
		if batchReqs[i].URL == "" {
			batchReqs[i].URL = client.ListPathToURL(prefix + "/" + batchReqs[i].Path)
		}
	}

	if !client.IsBusiness || nReqs <= 1 {
		// Run without batching.
		for i := range batchReqs {
			if err = client.listEntriesUnbatched(batchReqs[i].URL, &entriesPerReq[i]); err != nil {
				return nil, nil, err
			}
			if entriesPerReq[i].NextLink != "" {
				newReqs = append(newReqs, OneDriveListReqItem{
					Path: batchReqs[i].Path,
					URL:  entriesPerReq[i].NextLink,
				})
			}
		}
		return entriesPerReq, newReqs, nil
	}

	// OneDrive Business uses Graph API which supports batching.
	breq := BatchRequest{Requests: make([]BatchRequestItem, nReqs)}
	prefixLen := len(client.StripBatchPrefixURL)
	for i := range batchReqs {
		reqURL := batchReqs[i].URL
		// Sub-request URLs are sent relative to the batch endpoint; refuse a URL
		// that is not under the expected prefix instead of slicing it blindly.
		if len(reqURL) < prefixLen || reqURL[:prefixLen] != client.StripBatchPrefixURL {
			return nil, nil, fmt.Errorf("Batch request URL %q does not start with %q", reqURL, client.StripBatchPrefixURL)
		}
		breq.Requests[i] = BatchRequestItem{
			Id:     strconv.Itoa(i + 1),
			Method: "GET",
			URL:    reqURL[prefixLen:],
		}
	}

	tracestr := fmt.Sprintf("Batch payload: %d requests", len(breq.Requests))
	for _, req := range breq.Requests {
		tracestr += fmt.Sprintf("\n\t\t%s %s", req.Method, req.URL)
	}
	// The URLs may contain '%' escapes, so the text must be passed as an
	// argument and never as the format string.
	LOG_TRACE("ONEDRIVE_BATCH", "%s", tracestr)

	readCloser, _, err := client.call(client.BatchURL, "POST", breq, "application/json")
	if err != nil {
		return nil, nil, err
	}
	defer readCloser.Close()

	var bresp BatchResponse
	if err = json.NewDecoder(readCloser).Decode(&bresp); err != nil {
		return nil, nil, err
	}
	if len(bresp.Responses) != nReqs {
		return nil, nil, fmt.Errorf("Batch response length mismatch with requests length: %d != %d", len(bresp.Responses), nReqs)
	}

	answered := make([]bool, nReqs)
	throttleDelay := 0

	for i := range bresp.Responses {
		resp := &bresp.Responses[i]

		id, convErr := strconv.Atoi(resp.Id)
		if convErr != nil {
			return nil, nil, convErr
		}
		idx := id - 1 // ids are 1-based in the response payload
		if idx < 0 || idx >= nReqs || answered[idx] {
			return nil, nil, fmt.Errorf("Batch response contains unexpected or duplicate id %q", resp.Id)
		}
		answered[idx] = true

		switch resp.Status {
		case 200: // success
			entriesPerReq[idx] = resp.Body
			if resp.Body.NextLink != "" {
				newReqs = append(newReqs, OneDriveListReqItem{
					Path: batchReqs[idx].Path,
					URL:  resp.Body.NextLink,
				})
			}
		case 429: // throttled
			backoff := batchRetryDelayMs(resp.Headers)
			if backoff > throttleDelay {
				throttleDelay = backoff
			}
			LOG_INFO("ONEDRIVE_RETRY", "Batch item response code: %d; suggested retry is %d milliseconds", resp.Status, backoff)
			// Retry the same URL
			newReqs = append(newReqs, batchReqs[idx])
		case 400, 401:
			// Some errors are expected, e.g. unauthorized / expired token
			// Retry the same URL
			newReqs = append(newReqs, batchReqs[idx])
		default: // unexpected error
			return nil, nil, fmt.Errorf("Unexpected batch response error %d: %s / %s", resp.Status, http.StatusText(resp.Status), resp.Body.Error.Message)
		}
	}

	if throttleDelay > 0 {
		LOG_INFO("ONEDRIVE_RETRY", "Batch request throttled; retry after %d milliseconds", throttleDelay)
		time.Sleep(time.Duration(throttleDelay) * time.Millisecond)
	}

	return entriesPerReq, newReqs, nil
}

// listEntriesUnbatched issues a single GET for requestURL and decodes the
// response into output. The response body is closed before returning, so a
// caller looping over many URLs never holds more than one body open.
func (client *OneDriveClient) listEntriesUnbatched(requestURL string, output *OneDriveListEntriesOutput) error {
	readCloser, _, err := client.call(requestURL, "GET", 0, "")
	if err != nil {
		return err
	}
	defer readCloser.Close()

	return json.NewDecoder(readCloser).Decode(output)
}

// batchRetryDelayMs returns, in milliseconds, how long to wait after a
// throttled batch item. It uses the "Retry-After" header when that holds a
// non-negative number of seconds and falls back to 5 minutes when the header
// is missing or cannot be parsed.
func batchRetryDelayMs(headers map[string]string) int {
	const defaultDelayMs = 300000 // 5 minutes

	if value, found := headers["Retry-After"]; found {
		if seconds, err := strconv.Atoi(value); err == nil && seconds >= 0 {
			return seconds * 1000 // s to ms
		}
	}
	return defaultDelayMs
}

func (client *OneDriveClient) GetFileInfo(path string) (string, bool, int64, error) {

url := client.APIURL + "/root:/" + path
Expand Down
Loading