diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 6794add..9f64a4a 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -11,11 +11,10 @@ on: jobs: build: - runs-on: macos-latest - steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + - uses: swift-actions/setup-swift@v1 - name: Build run: swift build -v - name: Run tests diff --git a/Package.resolved b/Package.resolved new file mode 100644 index 0000000..b1ca9e7 --- /dev/null +++ b/Package.resolved @@ -0,0 +1,23 @@ +{ + "pins" : [ + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "d029d9d39c87bed85b1c50adee7c41795261a192", + "version" : "1.0.6" + } + }, + { + "identity" : "swift-pid", + "kind" : "remoteSourceControl", + "location" : "https://github.com/ryanashcraft/swift-pid.git", + "state" : { + "revision" : "3f524ecc12bd519f27cbbc73b986be4d60351e91", + "version" : "0.0.3" + } + } + ], + "version" : 2 +} diff --git a/Package.swift b/Package.swift index b58fab5..ee970c0 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version: 5.6 +// swift-tools-version: 5.9 // The swift-tools-version declares the minimum version of Swift required to build this package. import PackageDescription @@ -16,11 +16,18 @@ let package = Package( targets: ["CloudSyncSession"] ), ], - dependencies: [], + dependencies: [ + .package( + url: "https://github.com/ryanashcraft/swift-pid.git", + .upToNextMajor(from: "0.0.1") + ), + ], targets: [ .target( name: "CloudSyncSession", - dependencies: [] + dependencies: [ + .product(name: "PID", package: "swift-pid"), + ] ), .testTarget( name: "CloudSyncSessionTests", diff --git a/Sources/CloudSyncSession/CloudKitOperationHandler.swift b/Sources/CloudSyncSession/CloudKitOperationHandler.swift index e70c4d2..e9f5a97 100644 --- a/Sources/CloudSyncSession/CloudKitOperationHandler.swift +++ b/Sources/CloudSyncSession/CloudKitOperationHandler.swift @@ -23,10 +23,11 @@ import CloudKit import os.log +import PID /// An object that handles all of the key operations (fetch, modify, create zone, and create subscription) using the standard CloudKit APIs. public class CloudKitOperationHandler: OperationHandler { - static let minThrottleDuration: TimeInterval = 2 + static let minThrottleDuration: TimeInterval = 1 static let maxThrottleDuration: TimeInterval = 60 * 10 let database: CKDatabase @@ -35,6 +36,15 @@ public class CloudKitOperationHandler: OperationHandler { let log: OSLog let savePolicy: CKModifyRecordsOperation.RecordSavePolicy = .ifServerRecordUnchanged let qos: QualityOfService = .userInitiated + var rateLimitController = RateLimitPIDController( + kp: 2, + ki: 0.05, + kd: 0.02, + errorWindowSize: 20, + targetSuccessRate: 0.96, + initialRateLimit: 2, + outcomeWindowSize: 1 + ) private let operationQueue: OperationQueue = { let queue = OperationQueue() @@ -45,9 +55,11 @@ public class CloudKitOperationHandler: OperationHandler { var throttleDuration: TimeInterval { didSet { + nextOperationDeadline = DispatchTime.now() + throttleDuration + if throttleDuration > oldValue { os_log( - "Increasing throttle duration from %{public}.0f seconds to %{public}.0f seconds", + "Increasing throttle duration from %{public}.1f seconds to %{public}.1f seconds", log: log, type: .default, oldValue, @@ -55,7 +67,7 @@ public class CloudKitOperationHandler: OperationHandler { ) } else if throttleDuration < oldValue { os_log( - "Decreasing throttle duration from %{public}.0f seconds to %{public}.0f seconds", + "Decreasing throttle duration from %{public}.1f seconds to %{public}.1f seconds", log: log, type: .default, oldValue, @@ -65,23 +77,45 @@ public class CloudKitOperationHandler: OperationHandler { } } - var lastOperationTime: DispatchTime? + var nextOperationDeadline: DispatchTime? public init(database: CKDatabase, zoneID: CKRecordZone.ID, subscriptionID: String, log: OSLog) { self.database = database self.zoneID = zoneID self.subscriptionID = subscriptionID self.log = log - throttleDuration = Self.minThrottleDuration + throttleDuration = rateLimitController.rateLimit } private func queueOperation(_ operation: Operation) { - let deadline: DispatchTime = (lastOperationTime ?? DispatchTime.now()) + throttleDuration + let deadline: DispatchTime = nextOperationDeadline ?? DispatchTime.now() DispatchQueue.main.asyncAfter(deadline: deadline) { self.operationQueue.addOperation(operation) - self.operationQueue.addOperation { - self.lastOperationTime = DispatchTime.now() + } + } + + private func onOperationSuccess() { + rateLimitController.record(outcome: .success) + throttleDuration = min(Self.maxThrottleDuration, max(Self.minThrottleDuration, rateLimitController.rateLimit)) + } + + private func onOperationError(_ error: Error) { + if let ckError = error as? CKError { + rateLimitController.record(outcome: ckError.indicatesShouldBackoff ? .failure : .success) + + if let suggestedBackoffSeconds = ckError.suggestedBackoffSeconds { + os_log( + "CloudKit error suggests retrying after %{public}.1f seconds", + log: log, + type: .default, + suggestedBackoffSeconds + ) + + // Respect the amount suggested for the next operation + throttleDuration = suggestedBackoffSeconds + } else { + throttleDuration = min(Self.maxThrottleDuration, max(Self.minThrottleDuration, rateLimitController.rateLimit)) } } } @@ -106,15 +140,11 @@ public class CloudKitOperationHandler: OperationHandler { operation.modifyRecordsCompletionBlock = { serverRecords, deletedRecordIDs, error in if let error = error { - if let ckError = error as? CKError { - // Use the suggested retry delay, or exponentially increase throttle duration if not provided - self.throttleDuration = min(Self.maxThrottleDuration, ckError.retryAfterSeconds ?? (self.throttleDuration * 2)) - } + self.onOperationError(error) completion(.failure(error)) } else { - // On success, back off of the throttle duration by 66%. Backing off too quickly can result in thrashing. - self.throttleDuration = max(Self.minThrottleDuration, self.throttleDuration * 2 / 3) + self.onOperationSuccess() completion(.success(ModifyOperation.Response(savedRecords: serverRecords ?? [], deletedRecordIDs: deletedRecordIDs ?? []))) } @@ -199,17 +229,13 @@ public class CloudKitOperationHandler: OperationHandler { String(describing: error) ) - if let ckError = error as? CKError { - // Use the suggested retry delay, or exponentially increase throttle duration if not provided - self.throttleDuration = min(Self.maxThrottleDuration, ckError.retryAfterSeconds ?? (self.throttleDuration * 2)) - } + onOperationError(error) completion(.failure(error)) } else { os_log("Finished fetching record zone changes", log: self.log, type: .info) - // On success, back off of the throttle duration by 66%. Backing off too quickly can result in thrashing. - self.throttleDuration = max(Self.minThrottleDuration, self.throttleDuration * 2 / 3) + onOperationSuccess() completion( .success( diff --git a/Sources/CloudSyncSession/CloudSyncSession.swift b/Sources/CloudSyncSession/CloudSyncSession.swift index 6527512..18a5e08 100644 --- a/Sources/CloudSyncSession/CloudSyncSession.swift +++ b/Sources/CloudSyncSession/CloudSyncSession.swift @@ -108,7 +108,7 @@ public class CloudSyncSession { dispatch(event: .doWork(.modify(operation))) } - internal func dispatch(event: SyncEvent) { + func dispatch(event: SyncEvent) { dispatchQueue.async { func next(event: SyncEvent, middlewaresToRun: [AnyMiddleware]) -> SyncEvent { self.eventsPublisher.send(event) diff --git a/Sources/CloudSyncSession/Extensions/CKErrorExtensions.swift b/Sources/CloudSyncSession/Extensions/CKErrorExtensions.swift new file mode 100644 index 0000000..457adfe --- /dev/null +++ b/Sources/CloudSyncSession/Extensions/CKErrorExtensions.swift @@ -0,0 +1,38 @@ +import CloudKit + +extension CKError { + var suggestedBackoffSeconds: TimeInterval? { + if let retryAfterSeconds { + return retryAfterSeconds + } + + return partialErrorsByItemID? + .values + .compactMap { ($0 as? CKError)?.retryAfterSeconds } + .max() + } + + var indicatesShouldBackoff: Bool { + if retryAfterSeconds != nil { + return true + } + + switch self.code { + case .serviceUnavailable, + .zoneBusy, + .requestRateLimited: + return true + case .partialFailure: + guard let partialErrorsByRecordID = self.partialErrorsByItemID as? [CKRecord.ID: Error] else { + return false + } + + let partialErrors = partialErrorsByRecordID.compactMap { $0.value as? CKError } + let allErrorsAreRetryable = partialErrors.allSatisfy(\.indicatesShouldBackoff) + + return allErrorsAreRetryable + default: + return false + } + } +} diff --git a/Sources/CloudSyncSession/Middleware/ErrorMiddleware.swift b/Sources/CloudSyncSession/Middleware/ErrorMiddleware.swift index 59263f3..de00d3e 100644 --- a/Sources/CloudSyncSession/Middleware/ErrorMiddleware.swift +++ b/Sources/CloudSyncSession/Middleware/ErrorMiddleware.swift @@ -118,17 +118,20 @@ struct ErrorMiddleware: Middleware { case let .modify(operation): // Supported modify partial failures: batchRequestFailed and serverRecordChanged - guard let partialErrors = ckError.partialErrorsByItemID as? [CKRecord.ID: Error] else { + guard let partialErrorsByRecordID = ckError.partialErrorsByItemID as? [CKRecord.ID: Error] else { return .halt(error) } - let recordIDsNotSavedOrDeleted = Set(partialErrors.keys) + let recordIDsNotSavedOrDeleted = Set(partialErrorsByRecordID.keys) + + let partialErrors = partialErrorsByRecordID.compactMap { $0.value as? CKError } + + if ckError.indicatesShouldBackoff { + return .retry(work, error, ckError.suggestedBackoffSeconds) + } let unhandleableErrorsByItemID = partialErrors - .filter { _, error in - guard let error = error as? CKError else { - return true - } + .filter { error in switch error.code { case .batchRequestFailed, .serverRecordChanged, .unknownItem: @@ -145,7 +148,7 @@ struct ErrorMiddleware: Middleware { // All IDs for records that are unknown by the container (probably deleted by another client) let unknownItemRecordIDs = Set( - partialErrors + partialErrorsByRecordID .filter { _, error in if let error = error as? CKError, error.code == .unknownItem { return true @@ -158,7 +161,7 @@ struct ErrorMiddleware: Middleware { // All IDs for records that failed to be modified due to some other error in the batch modify operation let batchRequestFailedRecordIDs = Set( - partialErrors + partialErrorsByRecordID .filter { _, error in if let error = error as? CKError, error.code == .batchRequestFailed { return true @@ -171,14 +174,13 @@ struct ErrorMiddleware: Middleware { // All errors for records that failed because there was a conflict let serverRecordChangedErrors = partialErrors - .filter { _, error in - if let error = error as? CKError, error.code == .serverRecordChanged { + .filter { error in + if error.code == .serverRecordChanged { return true } return false } - .values // Resolved records let resolvedConflictsToSave = serverRecordChangedErrors @@ -330,7 +332,7 @@ struct ErrorMiddleware: Middleware { } } -internal extension CKRecord { +extension CKRecord { func removeAllFields() { let encryptedKeys = Set(encryptedValues.allKeys()) diff --git a/Sources/CloudSyncSession/SyncState.swift b/Sources/CloudSyncSession/SyncState.swift index c87d5fc..5db9d5c 100644 --- a/Sources/CloudSyncSession/SyncState.swift +++ b/Sources/CloudSyncSession/SyncState.swift @@ -11,16 +11,16 @@ public struct SyncState { } /// The queue of modification requests to be handled. - internal var modifyQueue = [ModifyOperation]() + var modifyQueue = [ModifyOperation]() /// The queue of fetch requests to be handled. - internal var fetchQueue = [FetchOperation]() + var fetchQueue = [FetchOperation]() /// The queue of create zone requests to be handled. - internal var createZoneQueue = [CreateZoneOperation]() + var createZoneQueue = [CreateZoneOperation]() /// The queue of create subscription requests to be handled. - internal var createSubscriptionQueue = [CreateSubscriptionOperation]() + var createSubscriptionQueue = [CreateSubscriptionOperation]() /// Indicates whether the CloudKit status is available. The value is nil if the account status is yet to be deteremined. public var hasGoodAccountStatus: Bool? = nil @@ -68,7 +68,7 @@ public struct SyncState { } /// Indicates what kind of work is allowed at this time. - internal var allowedOperationModes: Set { + var allowedOperationModes: Set { var allowedModes: Set = [nil] if isHalted { @@ -91,7 +91,7 @@ public struct SyncState { } /// An ordered list of the the kind of work that is allowed at this time. - internal var preferredOperationModes: [OperationMode?] { + var preferredOperationModes: [OperationMode?] { [.createZone, .createSubscription, .modify, .fetch, nil] .filter { allowedOperationModes.contains($0) } .filter { mode in @@ -128,7 +128,7 @@ public struct SyncState { } /// The current work that is, or is to be, worked on. - internal var currentWork: SyncWork? { + var currentWork: SyncWork? { guard allowedOperationModes.contains(operationMode) else { return nil } @@ -158,7 +158,7 @@ public struct SyncState { } /// Transition to a new operation mode (i.e. fetching, modifying creating a zone or subscription) - internal mutating func updateOperationMode() { + mutating func updateOperationMode() { if isHalted { operationMode = nil } @@ -169,7 +169,7 @@ public struct SyncState { } /// Add work to the end of the appropriate queue - internal mutating func pushWork(_ work: SyncWork) { + mutating func pushWork(_ work: SyncWork) { switch work { case let .fetch(operation): fetchQueue.append(operation) @@ -183,7 +183,7 @@ public struct SyncState { } /// Add work to the beginning of the appropriate queue. - internal mutating func prioritizeWork(_ work: SyncWork) { + mutating func prioritizeWork(_ work: SyncWork) { switch work { case let .fetch(operation): fetchQueue = [operation] + fetchQueue @@ -197,7 +197,7 @@ public struct SyncState { } /// Remove work from the corresponding queue. - internal mutating func popWork(work: SyncWork) { + mutating func popWork(work: SyncWork) { switch work { case let .fetch(operation): fetchQueue = fetchQueue.filter { $0.id != operation.id } @@ -211,7 +211,7 @@ public struct SyncState { } /// Update based on a sync event. - internal func reduce(event: SyncEvent) -> SyncState { + func reduce(event: SyncEvent) -> SyncState { var state = self switch event {