Skip to content

Commit

Permalink
fix incorrect recover
Browse files Browse the repository at this point in the history
1. fix fix incorrect recover of multi-part upload
2. add stop method to task, it can pause current upload and recover later
  • Loading branch information
dawen on M2 committed Apr 19, 2024
1 parent 2d643a5 commit d7ba6f8
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 16 deletions.
81 changes: 67 additions & 14 deletions AWSS3/AWSS3TransferUtility.m
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ - (instancetype)initWithConfiguration:(AWSServiceConfiguration *)serviceConfigur
}

//Create the NS URL session
NSURLSessionConfiguration *configuration = [NSURLSessionConfiguration backgroundSessionConfigurationWithIdentifier:_sessionIdentifier];
// Modify by Dark: don't use backgrouond mode, it will make timeout not working
NSURLSessionConfiguration *configuration = [NSURLSessionConfiguration defaultSessionConfiguration];
configuration.allowsCellularAccess = serviceConfiguration.allowsCellularAccess;
configuration.timeoutIntervalForResource = transferUtilityConfiguration.timeoutIntervalForResource;

Expand Down Expand Up @@ -410,12 +411,17 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary
AWSDDLogDebug(@"Found MultiPartUpload [%@] with Multipart ID [%@] and status [%@]",transferUtilityMultiPartUploadTask.transferID,transferUtilityMultiPartUploadTask.uploadID, @(transferUtilityMultiPartUploadTask.status) );
}
else if ([transferType isEqualToString:@"MULTI_PART_UPLOAD_SUB_TASK"]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self hydrateMultiPartUploadSubTask:task sessionTaskID:sessionTaskID];
AWSDDLogDebug(@"Found MultiPartUpload SubTask [%@] with taskNumber [%@] and status [%@]",subTask.transferID,@(subTask.taskIdentifier), @(subTask.status) );
// Modify by Dart: new created subTask couldn't assign taskIdentifier,
// otherwise it will make a mistake when removing task from self.taskDictionary
// coz when subTask associate with a real NSURLSessionTask, it's taskIdentifier will be duplicated by an existing taskIdentifier recovered from DB.
// AWSS3TransferUtilityUploadSubTask *subTask = [self hydrateMultiPartUploadSubTask:task sessionTaskID:sessionTaskID];
AWSS3TransferUtilityUploadSubTask *subTask = [self hydrateMultiPartUploadSubTask:task sessionTaskID:0];
AWSDDLogDebug(@"Found MultiPartUpload SubTask [%@] with taskNumber [%@] and status [%@]",subTask.transferID, @(sessionTaskID), @(subTask.status) );

//Get the Master MultiPart record from the Dictionary.
AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:subTask.uploadID];
if ( !multiPartUploadTask ) {
AWSDDLogError(@"Couldn't find the multipart upload master record by uploadID:%@", subTask.uploadID);
//Couldn't find the multipart upload master record. Must be an orphan part record. Clean up the DB and continue.
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:subTask.transferID databaseQueue:self->_databaseQueue];
continue;
Expand Down Expand Up @@ -546,6 +552,8 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster
//Handle any stragglers.
[self handleUnlinkedTransfers:tempMultiPartMasterTaskDictionary tempTransferDictionary:tempTransferDictionary];

AWSDDLogDebug(@"Recover result:%@", [self.taskDictionary allKeys]);

//Call completion handler if one was provided.
if (completionHandler) {
completionHandler(nil);
Expand All @@ -568,6 +576,7 @@ - (void) markTransferAsCompleted:(AWSS3TransferUtilityTask *) transferUtilityTas
}
[self.completedTaskDictionary setObject:transferUtilityTask forKey:transferUtilityTask.transferID];
[self.taskDictionary removeObjectForKey:@(transferUtilityTask.taskIdentifier)];
AWSDDLogDebug(@"A>Remove task:%@ from taskDictionary, allKeys:%@", @(transferUtilityTask.taskIdentifier), self.taskDictionary.allKeys);
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityTask.transferID databaseQueue:self->_databaseQueue];
}

Expand Down Expand Up @@ -610,8 +619,9 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask
AWSS3TransferUtilityUploadSubTask *subTask = obj;
AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:subTask.uploadID];
[self retryUploadSubTask: multiPartUploadTask subTask:subTask startTransfer:NO];
subTask.status = AWSS3TransferUtilityTransferStatusWaiting;
[multiPartUploadTask.waitingPartsDictionary setObject:subTask forKey:@(subTask.taskIdentifier)];
// Modify by Dart: [retryUploadSubTask] has set the appropriate dictionary (Waiting or inProgress)
// subTask.status = AWSS3TransferUtilityTransferStatusWaiting;
// [multiPartUploadTask.waitingPartsDictionary setObject:subTask forKey:@(subTask.taskIdentifier)];
}
else if ([obj isKindOfClass:[AWSS3TransferUtilityDownloadTask class]]) {

Expand Down Expand Up @@ -646,10 +656,10 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask

AWSS3TransferUtilityMultiPartUploadTask *multiPartUploadTask = [tempMultiPartMasterTaskDictionary objectForKey:uploadID];
[self.taskDictionary setObject:multiPartUploadTask forKey:multiPartUploadTask.uploadID];

AWSDDLogDebug(@"Multipart transfer status is [%@]", @(multiPartUploadTask.status));

if (multiPartUploadTask.status == AWSS3TransferUtilityTransferStatusPaused) {
AWSDDLogDebug(@"Skipped for paused status");
continue;
}

Expand All @@ -665,8 +675,9 @@ - (void) handleUnlinkedTransfers:(NSMutableDictionary *) tempMultiPartMasterTask
//Remove it from the waitingList
[multiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)];
AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), multiPartUploadTask.uploadID);

[nextSubTask.sessionTask resume];

numberOfPartsInProgress++;
continue;
}
Expand Down Expand Up @@ -1013,7 +1024,8 @@ - (NSURLSessionUploadTask *)getURLSessionUploadTaskWithRequest:(NSURLRequest *)
- (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTask {
//Remove from taskDictionary
[self.taskDictionary removeObjectForKey:@(transferUtilityUploadTask.taskIdentifier)];

AWSDDLogDebug(@"B>Remove task:%@ from taskDictionary, allKeys:%@", @(transferUtilityUploadTask.taskIdentifier), self.taskDictionary.allKeys);

//Remove from Database
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:transferUtilityUploadTask.transferID taskIdentifier:transferUtilityUploadTask.taskIdentifier databaseQueue:_databaseQueue ];

Expand Down Expand Up @@ -1242,13 +1254,17 @@ - (void) retryUpload: (AWSS3TransferUtilityUploadTask *) transferUtilityUploadTa
if(!subTaskCreationError) {
subTask.status = AWSS3TransferUtilityTransferStatusInProgress;
AWSDDLogDebug(@"Added task for part [%@] to inProgress list", subTask.partNumber);
} else {
AWSDDLogError(@"Create inProgress upload sub task failed:%@", subTaskCreationError);
}
}
else {
subTaskCreationError = [self createUploadSubTask:transferUtilityMultiPartUploadTask subTask:subTask startTransfer:NO internalDictionaryToAddSubTaskTo:transferUtilityMultiPartUploadTask.waitingPartsDictionary];
if (!subTaskCreationError ) {
subTask.status = AWSS3TransferUtilityTransferStatusWaiting;
AWSDDLogDebug(@"Added task for part [%@] to Waiting list", subTask.partNumber);
} else {
AWSDDLogError(@"Create Waiting upload sub task failed:%@", subTaskCreationError);
}
}

Expand Down Expand Up @@ -1500,8 +1516,8 @@ -(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) tra
}

//Register transferUtilityMultiPartUploadTask into the taskDictionary for easy lookup in the NSURLCallback
[self->_taskDictionary setObject:transferUtilityMultiPartUploadTask forKey:@(subTask.taskIdentifier)];

[self.taskDictionary setObject:transferUtilityMultiPartUploadTask forKey:@(subTask.taskIdentifier)];
AWSDDLogDebug(@"Register subTask %@ to master:%@", @(subTask.taskIdentifier), transferUtilityMultiPartUploadTask);
//Add to required internal dictionary
[internalDictionaryToAddSubTaskTo setObject:subTask forKey:@(subTask.taskIdentifier)];

Expand All @@ -1524,19 +1540,37 @@ -(NSError *) createUploadSubTask:(AWSS3TransferUtilityMultiPartUploadTask *) tra
return error;
}

// Add by Dart:
- (void)removeSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask {
id obj = [self.taskDictionary objectForKey:@(subTask.taskIdentifier)];
if ([obj isKindOfClass:AWSS3TransferUtilityUploadSubTask.class]) {
AWSDDLogDebug(@"C>Remove task:%@ from taskDictionary, allKeys:%@", @(subTask.taskIdentifier), self.taskDictionary.allKeys);
[self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
} else {
if (obj != nil) {
AWSDDLogDebug(@"taskDictionary has subTask taskIdentifier:%@, but not subTask:%@", @(subTask.taskIdentifier), obj);
}
}
}

-(void) retryUploadSubTask: (AWSS3TransferUtilityMultiPartUploadTask *) transferUtilityMultiPartUploadTask
subTask: (AWSS3TransferUtilityUploadSubTask *) subTask
startTransfer: (BOOL) startTransfer {

//Track if the task to be retried is in the waiting or inprogress list
BOOL inWaitingPartsDictionary = NO;

[self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
// Modify by Dart:
// [self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
[self removeSubTask:subTask];

if ([transferUtilityMultiPartUploadTask.inProgressPartsDictionary objectForKey:@(subTask.taskIdentifier)] ) {
AWSDDLogDebug(@"SubTask %@ already inProgress, remove it", @(subTask.taskIdentifier));
[transferUtilityMultiPartUploadTask.inProgressPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)];
transferUtilityMultiPartUploadTask.retryCount = transferUtilityMultiPartUploadTask.retryCount + 1;
}
else if ([transferUtilityMultiPartUploadTask.waitingPartsDictionary objectForKey:@(subTask.taskIdentifier)] ) {
AWSDDLogDebug(@"SubTask %@ already waiting, remove it", @(subTask.taskIdentifier));
[transferUtilityMultiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)];
inWaitingPartsDictionary = YES;
}
Expand All @@ -1557,6 +1591,7 @@ -(void) retryUploadSubTask: (AWSS3TransferUtilityMultiPartUploadTask *) transfer
}

if ( subTaskCreationError ) {
AWSDDLogInfo(@"SubTask %@ create error:%@", @(subTask.taskIdentifier), subTaskCreationError);
//cancel the multipart transfer
[transferUtilityMultiPartUploadTask cancel];
transferUtilityMultiPartUploadTask.status = AWSS3TransferUtilityTransferStatusError;
Expand Down Expand Up @@ -1831,6 +1866,10 @@ - (AWSS3TransferUtilityTask *)findTransferUtilityTask:(NSURLSessionTask *)task {
return transferUtilityTask;
}

- (AWSS3TransferUtilityMultiPartUploadTask *)findMasterTask:(NSURLSessionTask *)task {
return nil;
}

- (AWSTask *)getAllTasks {
AWSTaskCompletionSource *completionSource = [AWSTaskCompletionSource new];

Expand Down Expand Up @@ -1954,7 +1993,6 @@ - (AWSS3TransferUtilityDownloadTask *)getDownloadTask:(NSURLSessionDownloadTask
}

#pragma mark - UIApplicationDelegate interceptor

+ (void)interceptApplication:(UIApplication *)application
handleEventsForBackgroundURLSession:(NSString *)identifier
completionHandler:(void (^)(void))completionHandler {
Expand All @@ -1975,7 +2013,6 @@ + (void)interceptApplication:(UIApplication *)application
}
}
}

#pragma mark - NSURLSessionDelegate

- (void)URLSessionDidFinishEventsForBackgroundURLSession:(NSURLSession *)session {
Expand Down Expand Up @@ -2056,6 +2093,12 @@ - (void)URLSession:(NSURLSession *)session
return;
}

// Modify by Dart:
if (uploadTask.stopped) {
AWSDDLogDebug(@"Stopped");
return ;
}

uploadTask.error = error;
if (error && HTTPResponse) {
if ([self isErrorRetriable:HTTPResponse.statusCode responseFromServer:uploadTask.responseData] ) {
Expand Down Expand Up @@ -2135,6 +2178,11 @@ - (void)URLSession:(NSURLSession *)session
return;
}

if (transferUtilityMultiPartUploadTask.stopped) {
AWSDDLogDebug(@"Stop task:%lu", (unsigned long)task.taskIdentifier);
return ;
}

//Check if there was an error.
if (error) {

Expand Down Expand Up @@ -2216,6 +2264,8 @@ - (void)URLSession:(NSURLSession *)session
//Remove it from the waitingList
[transferUtilityMultiPartUploadTask.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)];
AWSDDLogDebug(@"Moving Task[%@] to progress for Multipart[%@]", @(nextSubTask.taskIdentifier), transferUtilityMultiPartUploadTask.uploadID);
[self.taskDictionary setObject:nextSubTask forKey:@(nextSubTask.taskIdentifier)];
AWSDDLogDebug(@"Register Task[%@] to taskDictionary", @(nextSubTask.taskIdentifier));
[nextSubTask.sessionTask resume];
numberOfPartsInProgress++;
continue;
Expand Down Expand Up @@ -2428,6 +2478,7 @@ - (void)completeTask:(AWSS3TransferUtilityTask *)task removeCompletedTask:(BOOL)
// complete task before removing from dictionaries
[self.completedTaskDictionary removeObjectForKey:task.transferID];
[self.taskDictionary removeObjectForKey:@(task.taskIdentifier)];
AWSDDLogDebug(@"D>Remove task:%@ from taskDictionary, allKeys:%@", @(task.taskIdentifier), self.taskDictionary.allKeys);
}

}
Expand All @@ -2440,6 +2491,7 @@ - (void) cleanupForMultiPartUploadTask: (AWSS3TransferUtilityMultiPartUploadTask
//Remove all entries from taskDictionary.
for ( AWSS3TransferUtilityUploadSubTask *subTask in [task.inProgressPartsDictionary allValues] ) {
[self.taskDictionary removeObjectForKey:@(subTask.taskIdentifier)];
AWSDDLogDebug(@"E>Remove task:%@ from taskDictionary, allKeys:%@", @(subTask.taskIdentifier), self.taskDictionary.allKeys);
[self removeFile:subTask.file];
}

Expand All @@ -2458,7 +2510,8 @@ - (void) cleanupForUploadTask: (AWSS3TransferUtilityUploadTask *) uploadTask {

//Remove entry from taskDictionary
[self.taskDictionary removeObjectForKey:@(uploadTask.taskIdentifier)];

AWSDDLogDebug(@"F>Remove task:%@ from taskDictionary, allKeys:%@", @(uploadTask.taskIdentifier), self.taskDictionary.allKeys);

//Remove temporary file if required.
if (uploadTask.temporaryFileCreated) {
[self removeFile:uploadTask.file];
Expand Down
3 changes: 3 additions & 0 deletions AWSS3/AWSS3TransferUtilityTasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ typedef void (^AWSS3TransferUtilityMultiPartProgressBlock) (AWSS3TransferUtility
*/
- (void)cancel;

// Modify by Dart: it will cancel NSURLSessionTask, but not remove task record from DB, in order to be recover later
- (void)stop;

/**
Resumes the task, if it is suspended.
*/
Expand Down
55 changes: 53 additions & 2 deletions AWSS3/AWSS3TransferUtilityTasks.m
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ -(void) cancel {
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:self.transferID databaseQueue:self.databaseQueue];
}

// Add by Dart:
- (void)stop {
if (self.status != AWSS3TransferUtilityTransferStatusInProgress ) {
// only stop inProgress task
return;
}

self.stopped = YES;
[self.sessionTask cancel];
self.status = AWSS3TransferUtilityTransferStatusPaused;
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID
partNumber:@0
taskIdentifier:self.taskIdentifier
eTag:@""
status:self.status
retry_count:self.retryCount
databaseQueue:self.databaseQueue];
}

-(void) setCompletionHandler:(AWSS3TransferUtilityUploadCompletionHandlerBlock)completionHandler {

self.expression.completionHandler = completionHandler;
Expand Down Expand Up @@ -155,13 +174,45 @@ - (void)cancel {
[AWSS3TransferUtilityDatabaseHelper deleteTransferRequestFromDB:self.transferID databaseQueue:self.databaseQueue];
}

// Add by Dart:
- (void)stop {
if (self.status != AWSS3TransferUtilityTransferStatusInProgress) {
// only stop inProgress task
return;
}

self.stopped = YES;
for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];

[subTask.sessionTask cancel];
subTask.status = AWSS3TransferUtilityTransferStatusPaused;

[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
partNumber:subTask.partNumber
taskIdentifier:subTask.taskIdentifier
eTag:subTask.eTag
status:subTask.status
retry_count:self.retryCount
databaseQueue:self.databaseQueue];
}
self.status = AWSS3TransferUtilityTransferStatusPaused;
//Update the Master Record
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:self.transferID
partNumber:@0
taskIdentifier:0
eTag:@""
status:self.status
retry_count:self.retryCount
databaseQueue:self.databaseQueue];}

- (void)resume {
if (self.status != AWSS3TransferUtilityTransferStatusPaused ) {
//Resume called on a transfer that hasn't been paused. No op.
return;
}

for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
AWSDDLogDebug(@"Resume %@ inProgress sub task:%@", self.uploadID, key);
AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];
subTask.status = AWSS3TransferUtilityTransferStatusInProgress;
[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:subTask.transferID
Expand Down Expand Up @@ -189,9 +240,9 @@ - (void)suspend {
//Pause called on a transfer that is not in progresss. No op.
return;
}

for (NSNumber *key in [self.inProgressPartsDictionary allKeys]) {
AWSS3TransferUtilityUploadSubTask *subTask = [self.inProgressPartsDictionary objectForKey:key];

[subTask.sessionTask suspend];
subTask.status = AWSS3TransferUtilityTransferStatusPaused;

Expand Down
Loading

0 comments on commit d7ba6f8

Please sign in to comment.