Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix incorrect recover #5295

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 67 additions & 14 deletions AWSS3/AWSS3TransferUtility.m
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ - (instancetype)initWithConfiguration:(AWSServiceConfiguration *)serviceConfigur
}

//Create the NS URL session
NSURLSessionConfiguration *configuration = [NSURLSessionConfiguration backgroundSessionConfigurationWithIdentifier:_sessionIdentifier];
// Modify by Dark: don't use backgrouond mode, it will make timeout not working
NSURLSessionConfiguration *configuration = [NSURLSessionConfiguration defaultSessionConfiguration];
configuration.allowsCellularAccess = serviceConfiguration.allowsCellularAccess;
configuration.timeoutIntervalForResource = transferUtilityConfiguration.timeoutIntervalForResource;

Expand Down Expand Up @@ -410,12 +411,17 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary
AWSDDLogDebug(@"Found MultiPartUpload [%@] with Multipart ID [%@] and status [%@]",transferUtilityMultiPartUploadTask.transferID,transferUtilityMultiPartUploadTask.uploadID, @(transferUtilityMultiPartUploadTask.status) );
}
else if ([transferType isEqualToString:@"MULTI_PART_UPLOAD_SUB_TASK"]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self hydrateMultiPartUploadSubTask:task sessionTaskID:sessionTaskID];
AWSDDLogDebug(@"Found MultiPartUpload SubTask [%@] with taskNumber [%@] and status [%@]",subTask.transferID,@(subTask.taskIdentifier), @(subTask.status) );
// Modify by Dart: new created subTask couldn't assign taskIdentifier,
// otherwise it will make a mistake when removing task from self.taskDictionary
// coz when subTask associate with a real NSURLSessionTask, it's taskIdentifier will be duplicated by an existing taskIdentifier recovered from DB.
// AWSS3TransferUtilityUploadSubTask *subTask = [self hydrateMultiPartUploadSubTask:task sessionTaskID:sessionTaskID];
AWSS3TransferUtilityUploadSubTask *subTask = [self hydrateMultiPartUploadSubTask:task sessionTaskID:0];
AWSDDLogDebug(@"Found MultiPartUpload SubTask [%@] with taskNumber [%@] and status [%@]",subTask.transferID, @(sessionTaskID), @(subTask.status) );

//Get the Master MultiPart record from the Dictionary.
AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:subTask.uploadID];
if ( !multiPartUploadTask ) {
AWSDDLogError(@"Couldn't find the multipart upload master record by uploadID:%@", subTask.uploadID);
//Couldn't find the multipart upload master record. Must be an orphan part record. Clean up the DB and continue.
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:subTask.transferID databaseQueue:self->_databaseQueue];
continue;
Expand Down Expand Up @@ -546,6 +552,8 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster
//Handle any stragglers.
[self handleUnlinkedTransfers:tempMultiPartMasterTaskDictionary tempTransferDictionary:tempTransferDictionary];

AWSDDLogDebug(@"Recover result:%@", [self.taskDictionary allKeys]);

//Call completion handler if one was provided.
if (completionHandler) {
completionHandler(nil);
Expand All @@ -568,6 +576,7 @@ - (void) markTransferAsCompleted:(AWSS3TransferUtilityTask *) transferUtilityTas
}
[self.completedTaskDictionary setObject:transferUtilityTask forKey:transferUtilityTask.transferID];
[self.taskDictionary removeObjectForKey:@(transferUtilityTask.taskIdentifier)];
AWSDDLogDebug(@"A>Remove task:%@ from taskDictionary, allKeys:%@", @(transferUtilityTask.taskIdentifier), self.taskDictionary.allKeys);
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityTask.transferID databaseQueue:self->_databaseQueue];
}

Expand Down Expand Up @@ -610,8 +619,9 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask
AWSS3TransferUtilityUploadSubTask *subTask = obj;
AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:subTask.uploadID];
[self retryUploadSubTask: multiPartUploadTask subTask:subTask startTransfer:NO];
subTask.status = AWSS3TransferUtilityTransferStatusWaiting;
[multiPartUploadTask.waitingPartsDictionary setObject:subTask forKey:@(subTask.taskIdentifier)];
// Modify by Dart: [retryUploadSubTask] has set the appropriate dictionary (Waiting or inProgress)
// subTask.status = AWSS3TransferUtilityTransferStatusWaiting;
// [multiPartUploadTask.waitingPartsDictionary setObject:subTask forKey:@(subTask.taskIdentifier)];
}
else if ([obj isKindOfClass:[AWSS3TransferUtilityDownloadTask class]]) {

Expand Down Expand Up @@ -646,10 +656,10 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask

AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:uploadID];
[self.taskDictionary setObject:multiPartUploadTask forKey:multiPartUploadTask.uploadID];

AWSDDLogDebug(@"Multipart transfer status is [%@]", @(multiPartUploadTask.status));

if (multiPartUploadTask.status == AWSS3TransferUtilityTransferStatusPaused) {
AWSDDLogDebug(@"Skipped for paused status");
continue;
}

Expand All @@ -665,8 +675,9 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask
//Remove it from the waitingList
[multiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)];
AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), multiPartUploadTask.uploadID);

[nextSubTask.sessionTask resume];

numberOfPartsInProgress++;
continue;
}
Expand Down Expand Up @@ -1013,7 +1024,8 @@ - (NSURLSessionUploadTask *)getURLSessionUploadTaskWithRequest:(NSURLRequest *)
- (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTask {
//Remove from taskDictionary
[self.taskDictionary removeObjectForKey:@(transferUtilityUploadTask.taskIdentifier)];

AWSDDLogDebug(@"B>Remove task:%@ from taskDictionary, allKeys:%@", @(transferUtilityUploadTask.taskIdentifier), self.taskDictionary.allKeys);

//Remove from Database
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID taskIdentifier:transferUtilityUploadTask.taskIdentifier databaseQueue:_databaseQueue ];

Expand Down Expand Up @@ -1242,13 +1254,17 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa
if(!subTaskCreationError) {
subTask.status = AWSS3TransferUtilityTransferStatusInProgress;
AWSDDLogDebug(@"Added task for part [%@] to inProgress list", subTask.partNumber);
} else {
AWSDDLogError(@"Create inProgress upload sub task failed:%@", subTaskCreationError);
}
}
else {
subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:NO internalDictionaryToAddSubTaskTo:transferUtilityMultiPartUploadTask.waitingPartsDictionary];
if (!subTaskCreationError ) {
subTask.status = AWSS3TransferUtilityTransferStatusWaiting;
AWSDDLogDebug(@"Added task for part [%@] to Waiting list", subTask.partNumber);
} else {
AWSDDLogError(@"Create Waiting upload sub task failed:%@", subTaskCreationError);
}
}

Expand Down Expand Up @@ -1500,8 +1516,8 @@ -(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) tra
}

//Register transferUtilityMultiPartUploadTask into the taskDictionary for easy lookup in the NSURLCallback
[self->_taskDictionary setObject:transferUtilityMultiPartUploadTask forKey:@(subTask.taskIdentifier)];

[self.taskDictionary setObject:transferUtilityMultiPartUploadTask forKey:@(subTask.taskIdentifier)];
AWSDDLogDebug(@"Register subTask %@ to master:%@", @(subTask.taskIdentifier), transferUtilityMultiPartUploadTask);
//Add to required internal dictionary
[internalDictionaryToAddSubTaskTo setObject:subTask forKey:@(subTask.taskIdentifier)];

Expand All @@ -1524,19 +1540,37 @@ -(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) tra
return error;
}

// Add by Dart:
- (void)removeSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask {
id obj = [self.taskDictionary objectForKey:@(subTask.taskIdentifier)];
if ([obj isKindOfClass:AWSS3TransferUtilityUploadSubTask.class]) {
AWSDDLogDebug(@"C>Remove task:%@ from taskDictionary, allKeys:%@", @(subTask.taskIdentifier), self.taskDictionary.allKeys);
[self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
} else {
if (obj != nil) {
AWSDDLogDebug(@"taskDictionary has subTask taskIdentifier:%@, but not subTask:%@", @(subTask.taskIdentifier), obj);
}
}
}

-(void) retryUploadSubTask: (AWSS3TransferUtilityMultiPartUploadTask *) transferUtilityMultiPartUploadTask
subTask: (AWSS3TransferUtilityUploadSubTask *) subTask
startTransfer: (BOOL) startTransfer {

//Track if the task to be retried is in the waiting or inprogress list
BOOL inWaitingPartsDictionary = NO;

[self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
// Modify by Dart:
// [self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
[self removeSubTask:subTask];

if ([transferUtilityMultiPartUploadTask.inProgressPartsDictionary objectForKey:@(subTask.taskIdentifier)] ) {
AWSDDLogDebug(@"SubTask %@ already inProgress, remove it", @(subTask.taskIdentifier));
[transferUtilityMultiPartUploadTask.inProgressPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)];
transferUtilityMultiPartUploadTask.retryCount = transferUtilityMultiPartUploadTask.retryCount + 1;
}
else if ([transferUtilityMultiPartUploadTask.waitingPartsDictionary objectForKey:@(subTask.taskIdentifier)] ) {
AWSDDLogDebug(@"SubTask %@ already waiting, remove it", @(subTask.taskIdentifier));
[transferUtilityMultiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)];
inWaitingPartsDictionary = YES;
}
Expand All @@ -1557,6 +1591,7 @@ -(void) retryUploadSubTask: (AWSS3TransferUtilityMultiPartUploadTask *) transfer
}

if ( subTaskCreationError ) {
AWSDDLogInfo(@"SubTask %@ create error:%@", @(subTask.taskIdentifier), subTaskCreationError);
//cancel the multipart transfer
[transferUtilityMultiPartUploadTask cancel];
transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusError;
Expand Down Expand Up @@ -1831,6 +1866,10 @@ - (AWSS3TransferUtilityTask *)findTransferUtilityTask:(NSURLSessionTask *)task {
return transferUtilityTask;
}

- (AWSS3TransferUtilityMultiPartUploadTask *)findMasterTask:(NSURLSessionTask *)task {
return nil;
}

- (AWSTask *)getAllTasks {
AWSTaskCompletionSource *completionSource = [AWSTaskCompletionSource new];

Expand Down Expand Up @@ -1954,7 +1993,6 @@ - (AWSS3TransferUtilityDownloadTask *)getDownloadTask:(NSURLSessionDownloadTask
}

#pragma mark - UIApplicationDelegate interceptor

+ (void)interceptApplication:(UIApplication *)application
handleEventsForBackgroundURLSession:(NSString *)identifier
completionHandler:(void (^)(void))completionHandler {
Expand All @@ -1975,7 +2013,6 @@ + (void)interceptApplication:(UIApplication *)application
}
}
}

#pragma mark - NSURLSessionDelegate

- (void)URLSessionDidFinishEventsForBackgroundURLSession:(NSURLSession *)session {
Expand Down Expand Up @@ -2056,6 +2093,12 @@ - (void)URLSession:(NSURLSession *)session
return;
}

// Modify by Dart:
if (uploadTask.stopped) {
AWSDDLogDebug(@"Stopped");
return ;
}

uploadTask.error = error;
if (error && HTTPResponse) {
if ([self isErrorRetriable:HTTPResponse.statusCode responseFromServer:uploadTask.responseData] ) {
Expand Down Expand Up @@ -2135,6 +2178,11 @@ - (void)URLSession:(NSURLSession *)session
return;
}

if (transferUtilityMultiPartUploadTask.stopped) {
AWSDDLogDebug(@"Stop task:%lu", (unsigned long)task.taskIdentifier);
return ;
}

//Check if there was an error.
if (error) {

Expand Down Expand Up @@ -2216,6 +2264,8 @@ - (void)URLSession:(NSURLSession *)session
//Remove it from the waitingList
[transferUtilityMultiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)];
AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), transferUtilityMultiPartUploadTask.uploadID);
[self.taskDictionary setObject:nextSubTask forKey:@(nextSubTask.taskIdentifier)];
AWSDDLogDebug(@"Register Task[%@] to taskDictionary", @(nextSubTask.taskIdentifier));
[nextSubTask.sessionTask resume];
numberOfPartsInProgress++;
continue;
Expand Down Expand Up @@ -2428,6 +2478,7 @@ - (void)completeTask:(AWSS3TransferUtilityTask *)task removeCompletedTask:(BOOL)
// complete task before removing from dictionaries
[self.completedTaskDictionary removeObjectForKey:task.transferID];
[self.taskDictionary removeObjectForKey:@(task.taskIdentifier)];
AWSDDLogDebug(@"D>Remove task:%@ from taskDictionary, allKeys:%@", @(task.taskIdentifier), self.taskDictionary.allKeys);
}

}
Expand All @@ -2440,6 +2491,7 @@ - (void) cleanupForMultiPartUploadTask: (AWSS3TransferUtilityMultiPartUploadTask
//Remove all entries from taskDictionary.
for ( AWSS3TransferUtilityUploadSubTask *subTask in [task.inProgressPartsDictionary allValues] ) {
[self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
AWSDDLogDebug(@"E>Remove task:%@ from taskDictionary, allKeys:%@", @(subTask.taskIdentifier), self.taskDictionary.allKeys);
[self removeFile:subTask.file];
}

Expand All @@ -2458,7 +2510,8 @@ - (void) cleanupForUploadTask: (AWSS3TransferUtilityUploadTask *) uploadTask {

//Remove entry from taskDictionary
[self.taskDictionary removeObjectForKey:@(uploadTask.taskIdentifier)];

AWSDDLogDebug(@"F>Remove task:%@ from taskDictionary, allKeys:%@", @(uploadTask.taskIdentifier), self.taskDictionary.allKeys);

//Remove temporary file if required.
if (uploadTask.temporaryFileCreated) {
[self removeFile:uploadTask.file];
Expand Down
3 changes: 3 additions & 0 deletions AWSS3/AWSS3TransferUtilityTasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ typedef void (^AWSS3TransferUtilityMultiPartProgressBlock) (AWSS3TransferUtility
*/
- (void)cancel;

// Modify by Dart: it will cancel NSURLSessionTask, but not remove task record from DB, in order to be recover later
- (void)stop;

/**
Resumes the task, if it is suspended.
*/
Expand Down
55 changes: 53 additions & 2 deletions AWSS3/AWSS3TransferUtilityTasks.m
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ -(void) cancel {
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:self.transferID databaseQueue:self.databaseQueue];
}

// Add by Dart:
- (void)stop {
if (self.status != AWSS3TransferUtilityTransferStatusInProgress ) {
// only stop inProgress task
return;
}

self.stopped = YES;
[self.sessionTask cancel];
self.status = AWSS3TransferUtilityTransferStatusPaused;
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID
partNumber:@0
taskIdentifier:self.taskIdentifier
eTag:@""
status:self.status
retry_count:self.retryCount
databaseQueue:self.databaseQueue];
}

-(void) setCompletionHandler:(AWSS3TransferUtilityUploadCompletionHandlerBlock)completionHandler {

self.expression.completionHandler = completionHandler;
Expand Down Expand Up @@ -155,13 +174,45 @@ - (void)cancel {
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:self.transferID databaseQueue:self.databaseQueue];
}

// Add by Dart:
- (void)stop {
if (self.status != AWSS3TransferUtilityTransferStatusInProgress) {
// only stop inProgress task
return;
}

self.stopped = YES;
for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];

[subTask.sessionTask cancel];
subTask.status = AWSS3TransferUtilityTransferStatusPaused;

[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
partNumber:subTask.partNumber
taskIdentifier:subTask.taskIdentifier
eTag:subTask.eTag
status:subTask.status
retry_count:self.retryCount
databaseQueue:self.databaseQueue];
}
self.status = AWSS3TransferUtilityTransferStatusPaused;
//Update the Master Record
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID
partNumber:@0
taskIdentifier:0
eTag:@""
status:self.status
retry_count:self.retryCount
databaseQueue:self.databaseQueue];}

- (void)resume {
if (self.status != AWSS3TransferUtilityTransferStatusPaused ) {
//Resume called on a transfer that hasn't been paused. No op.
return;
}

for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
AWSDDLogDebug(@"Resume %@ inProgress sub task:%@", self.uploadID, key);
AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];
subTask.status = AWSS3TransferUtilityTransferStatusInProgress;
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
Expand Down Expand Up @@ -189,9 +240,9 @@ - (void)suspend {
//Pause called on a transfer that is not in progresss. No op.
return;
}

for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];

[subTask.sessionTask suspend];
subTask.status = AWSS3TransferUtilityTransferStatusPaused;

Expand Down
Loading