Skip to content

Commit

Permalink
store property update requests that need auth
Browse files Browse the repository at this point in the history
When a requests fails with a 401 due to JWT or fails when preparing for execution we remove the request from the request queue and add it to the pending dictionary.

Once we get the onJWTUpdated callback for that externalId we requeue the pending requests and try again.
  • Loading branch information
emawby committed Sep 11, 2024
1 parent f3cb3d8 commit 6ebfcfc
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ private struct OSCombinedProperties {
class OSPropertyOperationExecutor: OSOperationExecutor {
var supportedDeltas: [String] = [OS_UPDATE_PROPERTIES_DELTA]
var deltaQueue: [OSDelta] = []
var pendingAuthRequests: [String: [OSRequestUpdateProperties]] = [String:[OSRequestUpdateProperties]]()
var updateRequestQueue: [OSRequestUpdateProperties] = []
let newRecordsState: OSNewRecordsState
let jwtConfig: OSUserJwtConfig
Expand Down Expand Up @@ -268,19 +269,44 @@ class OSPropertyOperationExecutor: OSOperationExecutor {
}
}

func handleUnauthorizedError(externalId: String, error: NSError) {
func handleUnauthorizedError(externalId: String, error: NSError, request: OSRequestUpdateProperties) {
if (jwtConfig.isRequired ?? false) {
self.pendRequestUntilAuthUpdated(request, externalId: externalId)
OneSignalUserManagerImpl.sharedInstance.invalidateJwtForExternalId(externalId: externalId, error: error)
}
}

func pendRequestUntilAuthUpdated(_ request: OSRequestUpdateProperties, externalId: String?) {
self.dispatchQueue.async {
self.updateRequestQueue.removeAll(where: { $0 == request})
guard let externalId = externalId else {
return
}
var requests = self.pendingAuthRequests[externalId] ?? []
let inQueue = requests.contains(where: {$0 == request})
guard !inQueue else {
return
}
requests.append(request)
self.pendingAuthRequests[externalId] = requests
}
}

func executeUpdatePropertiesRequest(_ request: OSRequestUpdateProperties, inBackground: Bool) {
guard !request.sentToClient else {
return
}

guard request.addJWTHeaderIsValid(identityModel: request.identityModel) else {
pendRequestUntilAuthUpdated(request, externalId:request.identityModel.externalId)
return
}

guard request.prepareForExecution(newRecordsState: newRecordsState) else {
return
}

print("ECM executing properties request: %@", request.identityModel.externalId)
request.sentToClient = true

let backgroundTaskIdentifier = PROPERTIES_EXECUTOR_BACKGROUND_TASK + UUID().uuidString
Expand Down Expand Up @@ -319,7 +345,7 @@ class OSPropertyOperationExecutor: OSOperationExecutor {
OneSignalUserManagerImpl.sharedInstance._logout()
} else if responseType == .unauthorized && (self.jwtConfig.isRequired ?? false) {
if let externalId = request.identityModel.externalId {
self.handleUnauthorizedError(externalId: externalId, error: nsError)
self.handleUnauthorizedError(externalId: externalId, error: nsError, request: request)
}
request.sentToClient = false
} else if responseType != .retryable {
Expand Down Expand Up @@ -347,6 +373,21 @@ extension OSPropertyOperationExecutor: OSUserJwtConfigListener {

func onJwtUpdated(externalId: String, token: String?) {
print("❌ OSPropertyOperationExecutor onJwtUpdated for \(externalId) to \(String(describing: token))")
reQueuePendingRequestsForExternalId(externalId: externalId)
}

private func reQueuePendingRequestsForExternalId(externalId: String) {
self.dispatchQueue.async {
guard let requests = self.pendingAuthRequests[externalId] else {
return
}
for request in requests {
self.updateRequestQueue.append(request)
}
self.pendingAuthRequests[externalId] = nil
OneSignalUserDefaults.initShared().saveCodeableData(forKey: OS_USER_EXECUTOR_USER_REQUEST_QUEUE_KEY, withValue: self.updateRequestQueue)
self.processRequestQueue(inBackground: false)
}
}

private func removeInvalidDeltasAndRequests() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,82 @@ final class PropertyExecutorTests: XCTestCase {
XCTAssertTrue(mocks.client.hasExecutedRequestOfType(OSRequestUpdateProperties.self))
XCTAssertTrue(invalidatedCallbackWasCalled)
}

func testUserRequests_Retry_OnTokenUpdate() {

/* Setup */
let mocks = Mocks()
mocks.setAuthRequired(true)
OneSignalUserManagerImpl.sharedInstance.operationRepo.paused = true

let user = mocks.setUserManagerInternalUser(externalId: userA_EUID, onesignalId: userA_OSID)
user.identityModel.jwtBearerToken = userA_InvalidJwtToken

// We need to use the user manager's executor because the onJWTUpdated callback won't fire on the mock executor
let executor = OneSignalUserManagerImpl.sharedInstance.propertyExecutor!

let tags = ["testUserA" : "true"]
MockUserRequests.setUnauthorizedUpdatePropertiesFailureResponses(with: mocks.client, tags: tags)
executor.enqueueDelta(OSDelta(name: OS_UPDATE_PROPERTIES_DELTA, identityModelId: user.identityModel.modelId, model: OSPropertiesModel(changeNotifier: OSEventProducer()), property: "tags", value:tags))

var invalidatedCallbackWasCalled = false
OneSignalUserManagerImpl.sharedInstance.User.onJwtInvalidated { event in
invalidatedCallbackWasCalled = true
MockUserRequests.setAddTagsResponse(with: mocks.client, tags: tags)
OneSignalUserManagerImpl.sharedInstance.updateUserJwt(externalId: userA_EUID, token: userA_ValidJwtToken)
}

/* When */
executor.processDeltaQueue(inBackground: false)
OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5)

/* Then */
XCTAssertTrue(mocks.client.hasExecutedRequestOfType(OSRequestUpdateProperties.self))
XCTAssertTrue(invalidatedCallbackWasCalled)
XCTAssertEqual(mocks.client.networkRequestCount, 2)
}

func testUserRequests_RetryRequests_OnTokenUpdate_ForOnlyUpdatedUser() {
/* Setup */
let mocks = Mocks()

mocks.setAuthRequired(true)

let userA = mocks.setUserManagerInternalUser(externalId: userA_EUID, onesignalId: userA_OSID)
userA.identityModel.jwtBearerToken = userA_InvalidJwtToken

let userB = mocks.setUserManagerInternalUser(externalId: userB_EUID, onesignalId: userB_OSID)
userB.identityModel.jwtBearerToken = userA_InvalidJwtToken
// We need to use the user manager's executor because the onJWTUpdated callback won't fire on the mock executor
let executor = OneSignalUserManagerImpl.sharedInstance.propertyExecutor!

let tags = ["testUserA" : "true"]
MockUserRequests.setUnauthorizedUpdatePropertiesFailureResponses(with: mocks.client, tags: tags)

executor.enqueueDelta(OSDelta(name: OS_UPDATE_PROPERTIES_DELTA, identityModelId: userA.identityModel.modelId, model: OSPropertiesModel(changeNotifier: OSEventProducer()), property: "tags", value:tags))
executor.enqueueDelta(OSDelta(name: OS_UPDATE_PROPERTIES_DELTA, identityModelId: userB.identityModel.modelId, model: OSPropertiesModel(changeNotifier: OSEventProducer()), property: "tags", value:tags))

var invalidatedCallbackWasCalled = false
OneSignalUserManagerImpl.sharedInstance.User.onJwtInvalidated { event in
invalidatedCallbackWasCalled = true
}

/* When */
executor.processDeltaQueue(inBackground: false)
OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5)

MockUserRequests.setAddTagsResponse(with: mocks.client, tags: tags)
OneSignalUserManagerImpl.sharedInstance.updateUserJwt(externalId: userB_EUID, token: userB_ValidJwtToken)

OneSignalCoreMocks.waitForBackgroundThreads(seconds: 0.5)

/* Then */
// The executor should execute this request since identity verification is required and the token was set
XCTAssertTrue(mocks.client.hasExecutedRequestOfType(OSRequestUpdateProperties.self))
XCTAssertTrue(invalidatedCallbackWasCalled)
let updateRequests = mocks.client.executedRequests.filter { request in
request.isKind(of: OSRequestUpdateProperties.self)
}
XCTAssertEqual(updateRequests.count, 3)
}
}

0 comments on commit 6ebfcfc

Please sign in to comment.