Skip to content

Commit

Permalink
Improve rate limiting behavior (#6)
Browse files Browse the repository at this point in the history
* Make some fields and inits publicly accessible

* Retry failed requests where every partial error is serviceUnavailable

* Increase min throttle duration to 6 seconds

* Use PID to control throttling

* Run swiftlint

* Use shouldRateLimit

* Rename File

* Fix build

* Fix build

* Try setting deadline and tweak PID

* Reimplement min/max throttle durations, tweak shouldRateLimit

* Don't reset PID when retryAfterSeconds is given

* Remove unused variable

* Add log, rename variables, use partial errors for retryAfterSeconds

* Decrease target success rate and min throttle duration

* Fix initial rate limit to 2

* More precision in logs

* Rename file and move

* Revert "Make some fields and inits publicly accessible"

5de0125

* Upgrade to Swift 5.9 to fix CI

* Add back actions/checkout
  • Loading branch information
ryanashcraft authored Jan 11, 2024
1 parent 5389fb6 commit 94c2d57
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 51 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ on:

jobs:
build:

runs-on: macos-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: swift-actions/setup-swift@v1
- name: Build
run: swift build -v
- name: Run tests
Expand Down
23 changes: 23 additions & 0 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"pins" : [
{
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-collections.git",
"state" : {
"revision" : "d029d9d39c87bed85b1c50adee7c41795261a192",
"version" : "1.0.6"
}
},
{
"identity" : "swift-pid",
"kind" : "remoteSourceControl",
"location" : "https://github.com/ryanashcraft/swift-pid.git",
"state" : {
"revision" : "3f524ecc12bd519f27cbbc73b986be4d60351e91",
"version" : "0.0.3"
}
}
],
"version" : 2
}
13 changes: 10 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version: 5.6
// swift-tools-version: 5.9
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription
Expand All @@ -16,11 +16,18 @@ let package = Package(
targets: ["CloudSyncSession"]
),
],
dependencies: [],
dependencies: [
.package(
url: "https://github.com/ryanashcraft/swift-pid.git",
.upToNextMajor(from: "0.0.1")
),
],
targets: [
.target(
name: "CloudSyncSession",
dependencies: []
dependencies: [
.product(name: "PID", package: "swift-pid"),
]
),
.testTarget(
name: "CloudSyncSessionTests",
Expand Down
66 changes: 46 additions & 20 deletions Sources/CloudSyncSession/CloudKitOperationHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@

import CloudKit
import os.log
import PID

/// An object that handles all of the key operations (fetch, modify, create zone, and create subscription) using the standard CloudKit APIs.
public class CloudKitOperationHandler: OperationHandler {
static let minThrottleDuration: TimeInterval = 2
static let minThrottleDuration: TimeInterval = 1
static let maxThrottleDuration: TimeInterval = 60 * 10

let database: CKDatabase
Expand All @@ -35,6 +36,15 @@ public class CloudKitOperationHandler: OperationHandler {
let log: OSLog
let savePolicy: CKModifyRecordsOperation.RecordSavePolicy = .ifServerRecordUnchanged
let qos: QualityOfService = .userInitiated
var rateLimitController = RateLimitPIDController(
kp: 2,
ki: 0.05,
kd: 0.02,
errorWindowSize: 20,
targetSuccessRate: 0.96,
initialRateLimit: 2,
outcomeWindowSize: 1
)

private let operationQueue: OperationQueue = {
let queue = OperationQueue()
Expand All @@ -45,17 +55,19 @@ public class CloudKitOperationHandler: OperationHandler {

var throttleDuration: TimeInterval {
didSet {
nextOperationDeadline = DispatchTime.now() + throttleDuration

if throttleDuration > oldValue {
os_log(
"Increasing throttle duration from %{public}.0f seconds to %{public}.0f seconds",
"Increasing throttle duration from %{public}.1f seconds to %{public}.1f seconds",
log: log,
type: .default,
oldValue,
throttleDuration
)
} else if throttleDuration < oldValue {
os_log(
"Decreasing throttle duration from %{public}.0f seconds to %{public}.0f seconds",
"Decreasing throttle duration from %{public}.1f seconds to %{public}.1f seconds",
log: log,
type: .default,
oldValue,
Expand All @@ -65,23 +77,45 @@ public class CloudKitOperationHandler: OperationHandler {
}
}

var lastOperationTime: DispatchTime?
var nextOperationDeadline: DispatchTime?

public init(database: CKDatabase, zoneID: CKRecordZone.ID, subscriptionID: String, log: OSLog) {
self.database = database
self.zoneID = zoneID
self.subscriptionID = subscriptionID
self.log = log
throttleDuration = Self.minThrottleDuration
throttleDuration = rateLimitController.rateLimit
}

private func queueOperation(_ operation: Operation) {
let deadline: DispatchTime = (lastOperationTime ?? DispatchTime.now()) + throttleDuration
let deadline: DispatchTime = nextOperationDeadline ?? DispatchTime.now()

DispatchQueue.main.asyncAfter(deadline: deadline) {
self.operationQueue.addOperation(operation)
self.operationQueue.addOperation {
self.lastOperationTime = DispatchTime.now()
}
}

/// Records a successful operation outcome with the PID rate-limit controller
/// and adopts the controller's updated rate limit as the new throttle duration.
private func onOperationSuccess() {
rateLimitController.record(outcome: .success)
// Clamp the PID output so throttling never leaves the configured min/max bounds.
throttleDuration = min(Self.maxThrottleDuration, max(Self.minThrottleDuration, rateLimitController.rateLimit))
}

/// Feeds a failed operation's outcome into the rate-limit controller and
/// recomputes `throttleDuration`.
///
/// - Parameter error: The error returned by a CloudKit operation. Non-CKError
///   values are ignored (no throttle adjustment is made).
///
/// A CKError counts as a failure only when it indicates the client should back
/// off; other CKErrors (e.g. conflicts) are recorded as successes so they do
/// not inflate the throttle. If CloudKit supplies an explicit retry delay,
/// that delay overrides the PID controller's output for the next operation.
private func onOperationError(_ error: Error) {
if let ckError = error as? CKError {
rateLimitController.record(outcome: ckError.indicatesShouldBackoff ? .failure : .success)

if let suggestedBackoffSeconds = ckError.suggestedBackoffSeconds {
os_log(
"CloudKit error suggests retrying after %{public}.1f seconds",
log: log,
type: .default,
suggestedBackoffSeconds
)

// Respect the amount suggested for the next operation
throttleDuration = suggestedBackoffSeconds
} else {
// No server-suggested delay: fall back to the PID controller's rate
// limit, clamped to the configured min/max throttle bounds.
throttleDuration = min(Self.maxThrottleDuration, max(Self.minThrottleDuration, rateLimitController.rateLimit))
}
}
}
Expand All @@ -106,15 +140,11 @@ public class CloudKitOperationHandler: OperationHandler {

operation.modifyRecordsCompletionBlock = { serverRecords, deletedRecordIDs, error in
if let error = error {
if let ckError = error as? CKError {
// Use the suggested retry delay, or exponentially increase throttle duration if not provided
self.throttleDuration = min(Self.maxThrottleDuration, ckError.retryAfterSeconds ?? (self.throttleDuration * 2))
}
self.onOperationError(error)

completion(.failure(error))
} else {
// On success, back off of the throttle duration by 66%. Backing off too quickly can result in thrashing.
self.throttleDuration = max(Self.minThrottleDuration, self.throttleDuration * 2 / 3)
self.onOperationSuccess()

completion(.success(ModifyOperation.Response(savedRecords: serverRecords ?? [], deletedRecordIDs: deletedRecordIDs ?? [])))
}
Expand Down Expand Up @@ -199,17 +229,13 @@ public class CloudKitOperationHandler: OperationHandler {
String(describing: error)
)

if let ckError = error as? CKError {
// Use the suggested retry delay, or exponentially increase throttle duration if not provided
self.throttleDuration = min(Self.maxThrottleDuration, ckError.retryAfterSeconds ?? (self.throttleDuration * 2))
}
onOperationError(error)

completion(.failure(error))
} else {
os_log("Finished fetching record zone changes", log: self.log, type: .info)

// On success, back off of the throttle duration by 66%. Backing off too quickly can result in thrashing.
self.throttleDuration = max(Self.minThrottleDuration, self.throttleDuration * 2 / 3)
onOperationSuccess()

completion(
.success(
Expand Down
2 changes: 1 addition & 1 deletion Sources/CloudSyncSession/CloudSyncSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class CloudSyncSession {
dispatch(event: .doWork(.modify(operation)))
}

internal func dispatch(event: SyncEvent) {
func dispatch(event: SyncEvent) {
dispatchQueue.async {
func next(event: SyncEvent, middlewaresToRun: [AnyMiddleware]) -> SyncEvent {
self.eventsPublisher.send(event)
Expand Down
38 changes: 38 additions & 0 deletions Sources/CloudSyncSession/Extensions/CKErrorExtensions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import CloudKit

extension CKError {
    /// The retry delay to honor for this error, if any.
    ///
    /// Prefers the error's own `retryAfterSeconds`; otherwise falls back to
    /// the largest `retryAfterSeconds` found among its partial errors.
    var suggestedBackoffSeconds: TimeInterval? {
        if let retryAfterSeconds {
            return retryAfterSeconds
        }

        let partialDelays = partialErrorsByItemID?
            .values
            .compactMap { ($0 as? CKError)?.retryAfterSeconds }

        return partialDelays?.max()
    }

    /// Whether this error signals that the client should slow down.
    ///
    /// True when a retry delay is attached, when the error code is a known
    /// rate-limiting code, or when a partial failure consists solely of
    /// partial errors that themselves indicate backing off.
    var indicatesShouldBackoff: Bool {
        guard retryAfterSeconds == nil else {
            return true
        }

        switch code {
        case .serviceUnavailable, .zoneBusy, .requestRateLimited:
            return true
        case .partialFailure:
            guard let errorsByRecordID = partialErrorsByItemID as? [CKRecord.ID: Error] else {
                return false
            }

            // Non-CKError partial errors are skipped, matching the original
            // compactMap-then-allSatisfy behavior (vacuously true when empty).
            return errorsByRecordID.values
                .compactMap { $0 as? CKError }
                .allSatisfy(\.indicatesShouldBackoff)
        default:
            return false
        }
    }
}
26 changes: 14 additions & 12 deletions Sources/CloudSyncSession/Middleware/ErrorMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,20 @@ struct ErrorMiddleware: Middleware {
case let .modify(operation):
// Supported modify partial failures: batchRequestFailed and serverRecordChanged

guard let partialErrors = ckError.partialErrorsByItemID as? [CKRecord.ID: Error] else {
guard let partialErrorsByRecordID = ckError.partialErrorsByItemID as? [CKRecord.ID: Error] else {
return .halt(error)
}

let recordIDsNotSavedOrDeleted = Set(partialErrors.keys)
let recordIDsNotSavedOrDeleted = Set(partialErrorsByRecordID.keys)

let partialErrors = partialErrorsByRecordID.compactMap { $0.value as? CKError }

if ckError.indicatesShouldBackoff {
return .retry(work, error, ckError.suggestedBackoffSeconds)
}

let unhandleableErrorsByItemID = partialErrors
.filter { _, error in
guard let error = error as? CKError else {
return true
}
.filter { error in

switch error.code {
case .batchRequestFailed, .serverRecordChanged, .unknownItem:
Expand All @@ -145,7 +148,7 @@ struct ErrorMiddleware: Middleware {

// All IDs for records that are unknown by the container (probably deleted by another client)
let unknownItemRecordIDs = Set(
partialErrors
partialErrorsByRecordID
.filter { _, error in
if let error = error as? CKError, error.code == .unknownItem {
return true
Expand All @@ -158,7 +161,7 @@ struct ErrorMiddleware: Middleware {

// All IDs for records that failed to be modified due to some other error in the batch modify operation
let batchRequestFailedRecordIDs = Set(
partialErrors
partialErrorsByRecordID
.filter { _, error in
if let error = error as? CKError, error.code == .batchRequestFailed {
return true
Expand All @@ -171,14 +174,13 @@ struct ErrorMiddleware: Middleware {

// All errors for records that failed because there was a conflict
let serverRecordChangedErrors = partialErrors
.filter { _, error in
if let error = error as? CKError, error.code == .serverRecordChanged {
.filter { error in
if error.code == .serverRecordChanged {
return true
}

return false
}
.values

// Resolved records
let resolvedConflictsToSave = serverRecordChangedErrors
Expand Down Expand Up @@ -330,7 +332,7 @@ struct ErrorMiddleware: Middleware {
}
}

internal extension CKRecord {
extension CKRecord {
func removeAllFields() {
let encryptedKeys = Set(encryptedValues.allKeys())

Expand Down
Loading

0 comments on commit 94c2d57

Please sign in to comment.