Skip to content

Commit

Permalink
fix(datastore): sync pending mutation events with latest synced metad…
Browse files Browse the repository at this point in the history
…ata (#3377)

* apply metadata of incoming mutation event which has pending mutations

* sync mutation event with latest synced version

* add doc for reconcile

* resolve comments

* remove redundant logic

* update test case to verify the latest synced version is applied to api request

* resolve comments
  • Loading branch information
5d authored Jan 2, 2024
1 parent 756548b commit a0ee257
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 677 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
func resolveConflictsThenSave(mutationEvent: MutationEvent,
storageAdapter: StorageEngineAdapter,
completion: @escaping (Result<MutationEvent, DataStoreError>)->Void) {

// We don't want to query MutationSync<AnyModel> because a) we already have the model, and b) delete mutations
// are submitted *after* the delete has already been applied to the local data store, meaning there is no model
// to query.
var mutationEvent = mutationEvent
do {
// TODO: Refactor this so that it's clear that the storage engine is not responsible for setting the version
// perhaps as simple as renaming to `submit(unversionedMutationEvent:)` or similar
let syncMetadata = try storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId,
modelName: mutationEvent.modelName)
mutationEvent.version = syncMetadata?.version
} catch {
completion(.failure(DataStoreError(error: error)))
}

MutationEvent.pendingMutationEvents(
forMutationEvent: mutationEvent,
storageAdapter: storageAdapter) { result in
Expand Down Expand Up @@ -208,8 +193,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
}
resolvedEvent.mutationType = updatedMutationType

resolvedEvent.version = candidate.version

return resolvedEvent
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
Task {
let syncMutationToCloudOperation = await SyncMutationToCloudOperation(
mutationEvent: mutationEvent,
getLatestSyncMetadata: { try? self.storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId, modelName: mutationEvent.modelName) },
api: api,
authModeStrategy: authModeStrategy
) { [weak self] result in
Expand Down Expand Up @@ -257,12 +258,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
return
}
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
MutationEvent.reconcilePendingMutationEventsVersion(
sent: mutationEvent,
received: mutationSync,
storageAdapter: storageAdapter) { _ in
self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
}
completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
} else {
completeProcessingEvent(mutationEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {

private weak var api: APICategoryGraphQLBehaviorExtended?
private let mutationEvent: MutationEvent
private let getLatestSyncMetadata: () -> MutationSyncMetadata?
private let completion: GraphQLOperation<MutationSync<AnyModel>>.ResultListener
private let requestRetryablePolicy: RequestRetryablePolicy

Expand All @@ -31,13 +32,15 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
private var authTypesIterator: AWSAuthorizationTypeIterator?

init(mutationEvent: MutationEvent,
getLatestSyncMetadata: @escaping () -> MutationSyncMetadata?,
api: APICategoryGraphQLBehaviorExtended,
authModeStrategy: AuthModeStrategy,
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, Never>? = nil,
currentAttemptNumber: Int = 1,
requestRetryablePolicy: RequestRetryablePolicy? = RequestRetryablePolicy(),
completion: @escaping GraphQLOperation<MutationSync<AnyModel>>.ResultListener) async {
self.mutationEvent = mutationEvent
self.getLatestSyncMetadata = getLatestSyncMetadata
self.api = api
self.networkReachabilityPublisher = networkReachabilityPublisher
self.completion = completion
Expand All @@ -57,6 +60,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {

override func main() {
log.verbose(#function)

sendMutationToCloud(withAuthType: authTypesIterator?.next())
}

Expand Down Expand Up @@ -108,6 +112,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
mutationType: GraphQLMutationType,
authType: AWSAuthorizationType? = nil
) -> GraphQLRequest<MutationSync<AnyModel>>? {
let latestSyncMetadata = getLatestSyncMetadata()
var request: GraphQLRequest<MutationSync<AnyModel>>

do {
Expand All @@ -128,7 +133,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.deleteMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: mutationEvent.version)
version: latestSyncMetadata?.version)
case .update:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -140,7 +145,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.updateMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: mutationEvent.version)
version: latestSyncMetadata?.version)
case .create:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -151,7 +156,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
}
request = GraphQLRequest<MutationSyncResult>.createMutation(of: model,
modelSchema: modelSchema,
version: mutationEvent.version)
version: latestSyncMetadata?.version)
}
} catch {
let apiError = APIError.unknown("Couldn't decode model", "", error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {

// MARK: - Responder methods

/// The reconcile function incorporates incoming mutation events into the local database through the following steps:
/// 1. Retrieve the local metadata of the models.
/// 2. Generate dispositions based on incoming mutation events and local metadata.
/// 3. Categorize dispositions into:
/// 3.1 Apply metadata only for those with existing pending mutations.
/// 3.1.1 Notify the count of these incoming mutation events as dropped items.
/// 3.2 Apply incoming mutation and metadata for those without existing pending mutations.
/// 4. Notify the final result.
func reconcile(remoteModels: [RemoteModel]) {
guard !isCancelled else {
log.info("\(#function) - cancelled, aborting")
Expand All @@ -133,16 +141,21 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {

do {
try storageAdapter.transaction {
queryPendingMutations(withModels: remoteModels.map(\.model))
self.queryLocalMetadata(remoteModels)
.subscribe(on: workQueue)
.flatMap { mutationEvents -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> in
let remoteModelsToApply = self.reconcile(remoteModels, pendingMutations: mutationEvents)
return self.queryLocalMetadata(remoteModelsToApply)
.map { (remoteModels, localMetadatas) in
self.getDispositions(for: remoteModels, localMetadatas: localMetadatas)
}
.flatMap { (remoteModelsToApply, localMetadatas) -> Future<Void, DataStoreError> in
let dispositions = self.getDispositions(for: remoteModelsToApply,
localMetadatas: localMetadatas)
return self.applyRemoteModelsDispositions(dispositions)
.flatMap { dispositions in
self.queryPendingMutations(withModels: dispositions.map(\.remoteModel.model))
.map { pendingMutations in (pendingMutations, dispositions) }
}
.map { (pendingMutations, dispositions) in
self.separateDispositions(pendingMutations: pendingMutations, dispositions: dispositions)
}
.flatMap { (dispositions, dispositionOnlyApplyMetadata) in
self.waitAllPublisherFinishes(publishers: dispositionOnlyApplyMetadata.map(self.saveMetadata(disposition:)))
.flatMap { _ in self.applyRemoteModelsDispositions(dispositions) }
}
.sink(
receiveCompletion: {
Expand Down Expand Up @@ -203,15 +216,27 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

func reconcile(_ remoteModels: [RemoteModel], pendingMutations: [MutationEvent]) -> [RemoteModel] {
guard !remoteModels.isEmpty else {
return []
func separateDispositions(
pendingMutations: [MutationEvent],
dispositions: [RemoteSyncReconciler.Disposition]
) -> ([RemoteSyncReconciler.Disposition], [RemoteSyncReconciler.Disposition]) {
guard !dispositions.isEmpty else {
return ([], [])
}

let remoteModelsToApply = RemoteSyncReconciler.filter(remoteModels,
pendingMutations: pendingMutations)
notifyDropped(count: remoteModels.count - remoteModelsToApply.count)
return remoteModelsToApply

let pendingMutationModelIds = Set(pendingMutations.map(\.modelId))

let dispositionsToApply = dispositions.filter {
!pendingMutationModelIds.contains($0.remoteModel.model.identifier)
}

let dispositionsOnlyApplyMetadata = dispositions.filter {
pendingMutationModelIds.contains($0.remoteModel.model.identifier)
}

notifyDropped(count: dispositionsOnlyApplyMetadata.count)
return (dispositionsToApply, dispositionsOnlyApplyMetadata)
}

func queryLocalMetadata(_ remoteModels: [RemoteModel]) -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> {
Expand Down Expand Up @@ -269,24 +294,16 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
disposition: RemoteSyncReconciler.Disposition
) -> AnyPublisher<Result<Void, DataStoreError>, Never> {
let operation: Future<ApplyRemoteModelResult, DataStoreError>
let mutationType: MutationEvent.MutationType
switch disposition {
case .create(let remoteModel):
operation = self.save(storageAdapter: storageAdapter, remoteModel: remoteModel)
mutationType = .create
case .update(let remoteModel):
operation = self.save(storageAdapter: storageAdapter, remoteModel: remoteModel)
mutationType = .update
case .delete(let remoteModel):
operation = self.delete(storageAdapter: storageAdapter, remoteModel: remoteModel)
mutationType = .delete
case .create, .update:
operation = self.save(storageAdapter: storageAdapter, remoteModel: disposition.remoteModel)
case .delete:
operation = self.delete(storageAdapter: storageAdapter, remoteModel: disposition.remoteModel)
}

return operation
.flatMap { applyResult in
self.saveMetadata(storageAdapter: storageAdapter, applyResult: applyResult, mutationType: mutationType)
}
.map {_ in Result.success(()) }
.flatMap { self.saveMetadata(storageAdapter: storageAdapter, result: $0, mutationType: disposition.mutationType) }
.map { _ in Result.success(()) }
.catch { Just<Result<Void, DataStoreError>>(.failure($0))}
.eraseToAnyPublisher()
}
Expand Down Expand Up @@ -315,15 +332,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
applyRemoteModelsDisposition(storageAdapter: storageAdapter, disposition: $0)
}

return Future { promise in
Publishers.MergeMany(publishers)
.collect()
.sink { _ in
// This stream will never fail, as we wrapped error in the result type.
promise(.successfulVoid)
} receiveValue: { _ in }
.store(in: &self.cancellables)
}
return self.waitAllPublisherFinishes(publishers: publishers)
}

enum ApplyRemoteModelResult {
Expand Down Expand Up @@ -359,8 +368,10 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func save(storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel) -> Future<ApplyRemoteModelResult, DataStoreError> {
private func save(
storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel
) -> Future<ApplyRemoteModelResult, DataStoreError> {
Future<ApplyRemoteModelResult, DataStoreError> { promise in
storageAdapter.save(untypedModel: remoteModel.model.instance, eagerLoad: self.isEagerLoad) { response in
switch response {
Expand Down Expand Up @@ -388,27 +399,50 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func saveMetadata(storageAdapter: StorageEngineAdapter,
applyResult: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType) -> Future<Void, DataStoreError> {
Future<Void, DataStoreError> { promise in
guard case let .applied(inProcessModel) = applyResult else {
promise(.successfulVoid)
return
}
private func saveMetadata(
disposition: RemoteSyncReconciler.Disposition
) -> AnyPublisher<Void, Never> {
guard let storageAdapter = self.storageAdapter else {
return Just(()).eraseToAnyPublisher()
}
return saveMetadata(storageAdapter: storageAdapter, remoteModel: disposition.remoteModel, mutationType: disposition.mutationType)
.map { _ in () }
.catch { _ in Just(()) }
.eraseToAnyPublisher()
}

storageAdapter.save(inProcessModel.syncMetadata,
condition: nil,
eagerLoad: self.isEagerLoad) { result in
switch result {
case .failure(let dataStoreError):
self.notifyDropped(error: dataStoreError)
promise(.failure(dataStoreError))
case .success(let syncMetadata):
private func saveMetadata(
storageAdapter: StorageEngineAdapter,
result: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType
) -> AnyPublisher<Void, DataStoreError> {
if case let .applied(inProcessModel) = result {
return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: inProcessModel, mutationType: mutationType)
.handleEvents( receiveOutput: { syncMetadata in
let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata)
self.notify(savedModel: appliedModel, mutationType: mutationType)
promise(.successfulVoid)
}
}, receiveCompletion: { completion in
if case .failure(let error) = completion {
self.notifyDropped(error: error)
}
})
.map { _ in () }
.eraseToAnyPublisher()

}
return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher()
}

private func saveMetadata(
storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel,
mutationType: MutationEvent.MutationType
) -> Future<MutationSyncMetadata, DataStoreError> {
Future { promise in
storageAdapter.save(remoteModel.syncMetadata,
condition: nil,
eagerLoad: self.isEagerLoad) { result in
promise(result)
}
}
}
Expand Down Expand Up @@ -454,6 +488,17 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
private static func unfulfilledDataStoreError(name: String = #function) -> DataStoreError {
.unknown("\(name) did not fulfill promise", AmplifyErrorMessages.shouldNotHappenReportBugToAWS(), nil)
}

private func waitAllPublisherFinishes<T>(publishers: [AnyPublisher<T, Never>]) -> Future<Void, DataStoreError> {
Future { promise in
Publishers.MergeMany(publishers)
.collect()
.sink(receiveCompletion: { _ in
promise(.successfulVoid)
}, receiveValue: { _ in })
.store(in: &self.cancellables)
}
}
}

extension ReconcileAndLocalSaveOperation: DefaultLogger {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,24 @@ struct RemoteSyncReconciler {
case create(RemoteModel)
case update(RemoteModel)
case delete(RemoteModel)
}

/// Filter the incoming `remoteModels` against the pending mutations.
/// If there is a matching pending mutation, drop the remote model.
///
/// - Parameters:
/// - remoteModels: models retrieved from the remote store
/// - pendingMutations: pending mutations from the outbox
/// - Returns: remote models to be applied
static func filter(_ remoteModels: [RemoteModel],
pendingMutations: [MutationEvent]) -> [RemoteModel] {
guard !pendingMutations.isEmpty else {
return remoteModels
var remoteModel: RemoteModel {
switch self {
case .create(let model), .update(let model), .delete(let model):
return model
}
}

let pendingMutationModelIdsArr = pendingMutations.map { mutationEvent in
mutationEvent.modelId
}
let pendingMutationModelIds = Set(pendingMutationModelIdsArr)

return remoteModels.filter { remoteModel in
!pendingMutationModelIds.contains(remoteModel.model.identifier)
var mutationType: MutationEvent.MutationType {
switch self {
case .create: return .create
case .update: return .update
case .delete: return .delete
}
}
}


/// Reconciles the incoming `remoteModels` against the local metadata to get the disposition
///
/// - Parameters:
Expand Down
Loading

0 comments on commit a0ee257

Please sign in to comment.