Skip to content

Commit

Permalink
Implement (some of) Room Lifecycle Monitoring spec
Browse files Browse the repository at this point in the history
Implements points CHA-RL4* from same spec as referenced in e70ee44.

TODO btw i haven't put action descriptions in all of the test names because
they might end up really long

In addition to the TODOs added in the code (all of which refer either to
existing GitHub issues or questions on the spec, for which we have #66
as a catch-all issue), I’ve also not done:

- CHA-RL4a2 — I don’t understand the meaning of “has not yet
  successfully managed to attach its Realtime Channel”, asked about it
  in [1]

- CHA-RL4b2 — seems redundant, asked about it in [2]

- CHA-RL4b3, CHA-RL4b4 — seem redundant, asked about it in [3]

- CHA-RL4b5, CHA-RL4b6, CHA-RL4b7 — these relate to transient disconnect
  timeouts, so will do them in #48

Resolves #53. (TODO move to next commit with the discontinuity points)

[1] https://github.com/ably/specification/pull/200/files#r1775552624
[2] https://github.com/ably/specification/pull/200/files#r1777212960
[3] https://github.com/ably/specification/pull/200/files#r1777365677
  • Loading branch information
lawrence-forooghian committed Sep 26, 2024
1 parent 2070846 commit ce1abeb
Show file tree
Hide file tree
Showing 4 changed files with 433 additions and 6 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ let package = Package(
name: "Ably",
package: "ably-cocoa"
),
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
]
),
.testTarget(
Expand Down
170 changes: 167 additions & 3 deletions Sources/AblyChat/RoomLifecycleManager.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Ably
@preconcurrency import Ably
import AsyncAlgorithms

/// The interface that the lifecycle manager expects its contributing realtime channels to conform to.
///
Expand All @@ -11,6 +12,13 @@ internal protocol RoomLifecycleContributorChannel: Sendable {

var state: ARTRealtimeChannelState { get async }
var errorReason: ARTErrorInfo? { get async }

// TODO: consider the consequences of this async (right now, it's just to make it easy to write a mock using an actor), but from a semantics point of view does it make sense in the same way as the above ones?
func subscribeToState() async -> Subscription<ARTChannelStateChange>

// TODO: this really isn't the right place for this to go, move elsewhere
// TODO: again, this `async` is a bit dodgy, it's just there so we can use an actor to manage some subscription state
func emitDiscontinuity(_ error: ARTErrorInfo) async
}

internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
Expand All @@ -22,8 +30,22 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
internal var channel: Channel
}

// TODO: something intelligent to say about this beyond that it's a term used in the spec. Exposed for tests
internal struct PendingDiscontinuityEvent {
internal var error: ARTErrorInfo
}

/// Stores manager state relating to a given contributor.
private struct ContributorAnnotation {
var pendingDiscontinuityEvents: [PendingDiscontinuityEvent] = []
}

internal private(set) var current: RoomLifecycle
internal private(set) var error: ARTErrorInfo?
// TODO: link this to what the manager is actually doing
private var hasOperationInProgress: Bool
/// The annotation at a given index belongs to the element of ``contributors`` at the same index.
private var contributorAnnotations: [ContributorAnnotation]

private let logger: InternalLogger
private let clock: SimpleClock
Expand All @@ -32,30 +54,68 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
internal init(contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async {
await self.init(
current: nil,
hasOperationInProgress: nil,
contributors: contributors,
logger: logger,
clock: clock
)
}

#if DEBUG
internal init(testsOnly_current current: RoomLifecycle? = nil, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async {
internal init(testsOnly_current current: RoomLifecycle? = nil, testsOnly_hasOperationInProgress hasOperationInProgress: Bool? = nil, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async {
await self.init(
current: current,
hasOperationInProgress: hasOperationInProgress,
contributors: contributors,
logger: logger,
clock: clock
)
}
#endif

private init(current: RoomLifecycle?, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async {
private init(current: RoomLifecycle?, hasOperationInProgress: Bool?, contributors: [Contributor], logger: InternalLogger, clock: SimpleClock) async {
self.current = current ?? .initialized
self.hasOperationInProgress = hasOperationInProgress ?? false
self.contributors = contributors
contributorAnnotations = Array(repeating: .init(), count: contributors.count)
self.logger = logger
self.clock = clock

// The idea here is to make sure that, before the initializer completes, we are already listening for state changes, so that e.g. tests don’t miss a state change.
let subscriptions = await withTaskGroup(of: Subscription<ARTChannelStateChange>.self) { group in
for contributor in contributors {
group.addTask {
await contributor.channel.subscribeToState()
}
}

return await Array(group)
}

// TODO: who owns this task? how does it get cancelled? how do we know when it's started and that the manager is "ready to go"? Do we need an async initializer?
Task {
await withTaskGroup(of: Void.self) { group in
for (index, subscription) in subscriptions.enumerated() {
// TODO: this capture
// TODO: is await what we want? is there a way to make the manager's initializer isolated to the manager?
// TODO: this @Sendable was to make a mysterious compiler error go away
group.addTask { @Sendable in
for await stateChange in subscription {
// TODO: why does this not inherit the actor isolation? (I mean, now that I have @Sendable, I get it)
await self.didReceiveStateChange(stateChange, forContributorAt: index)
}
}
}
}
}
}

#if DEBUG
internal func testsOnly_pendingDiscontinuityEventsForContributor(at index: Int) -> [PendingDiscontinuityEvent] {
contributorAnnotations[index].pendingDiscontinuityEvents
}
#endif

// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
private var subscriptions: [Subscription<RoomStatusChange>] = []

Expand All @@ -65,6 +125,110 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
return subscription
}

#if DEBUG
// TODO: explain — these are to let the tests know that it's handled a state change
private var stateChangeHandledSubscriptions: [Subscription<ARTChannelStateChange>] = []

internal func testsOnly_subscribeToHandledContributorStateChanges() -> Subscription<ARTChannelStateChange> {
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
stateChangeHandledSubscriptions.append(subscription)
return subscription
}
#endif

// TODO: this is only async because it needs to call `emitDiscontinuity`; that's not great. update: now it's also calling detach on channels but that probably shouldn't be inline, need clarification)
/// Implements CHA-RL4b’s contributor state change handling.
private func didReceiveStateChange(_ stateChange: ARTChannelStateChange, forContributorAt index: Int) async {
logger.log(message: "Got state change \(stateChange) for contributor at index \(index)", level: .info)

switch stateChange.event {
case .update:
// CHA-RL4a1 — if RESUMED then no-op
guard !stateChange.resumed else {
break
}

guard let reason = stateChange.reason else {
// TODO: is this OK? would be good if ably-cocoa could communicate this in types
preconditionFailure("State change event with resumed == false should have a reason")
}

if hasOperationInProgress {
// CHA-RL4a3
logger.log(message: "Recording pending discontinuity event for contributor at index \(index)", level: .info)

contributorAnnotations[index].pendingDiscontinuityEvents.append(
.init(error: reason)
)
} else {
// CHA-RL4a4
logger.log(message: "Emitting discontinuity event for contributor at index \(index)", level: .info)

let contributor = contributors[index]
await contributor.channel.emitDiscontinuity(reason)
}
case .attached:
if hasOperationInProgress {
if !stateChange.resumed {
// CHA-RL4b1
logger.log(message: "Recording pending discontinuity event for contributor at index \(index)", level: .info)

guard let reason = stateChange.reason else {
// TODO: same question as above about whether this is OK
preconditionFailure("State change event with resumed == false should have a reason")
}

contributorAnnotations[index].pendingDiscontinuityEvents.append(
.init(error: reason)
)
}
} else if current != .attached {
if await (contributors.async.map { await $0.channel.state }.allSatisfy { $0 == .attached }) {
// CHA-RL4b8
logger.log(message: "Now that all contributors are ATTACHED, transitioning room to ATTACHED", level: .info)
changeStatus(to: .attached)
}
}
case .failed:
if !hasOperationInProgress {
// CHA-RL4b5
guard let reason = stateChange.reason else {
// TODO: same question as above about whether this is OK
preconditionFailure("FAILED state change event should have a reason")
}

changeStatus(to: .failed, error: reason)

// TODO: CHA-RL4b5 is a bit unclear about how to handle failure, and whether they can be detached concurrently (asked in https://github.com/ably/specification/pull/200/files#r1777471810)
for contributor in contributors {
do {
try await contributor.channel.detach()
} catch {
logger.log(message: "Failed to detach contributor \(contributor), error \(error)", level: .info)
}
}
}
case .suspended:
if !hasOperationInProgress {
// CHA-RL4b9
guard let reason = stateChange.reason else {
// TODO: same question as above about whether this is OK
preconditionFailure("SUSPENDED state change event should have a reason")
}

changeStatus(to: .suspended, error: reason)
}
default:
break
}

#if DEBUG
for subscription in stateChangeHandledSubscriptions {
subscription.emit(stateChange)
}
#endif
}

/// Updates ``current`` and ``error`` and emits a status change event.
private func changeStatus(to new: RoomLifecycle, error: ARTErrorInfo? = nil) {
logger.log(message: "Transitioning from \(current) to \(new), error \(String(describing: error))", level: .info)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Ably
@preconcurrency import Ably
@testable import AblyChat

final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel {
Expand All @@ -7,9 +7,12 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel

var state: ARTRealtimeChannelState
var errorReason: ARTErrorInfo?
// TODO: clean up
private var subscriptions: [Subscription<ARTChannelStateChange>] = []

private(set) var attachCallCount = 0
private(set) var detachCallCount = 0
private(set) var emitDiscontinuityArguments: [ARTErrorInfo] = []

init(
initialState: ARTRealtimeChannelState,
Expand Down Expand Up @@ -92,4 +95,20 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel
throw error
}
}

func subscribeToState() -> Subscription<ARTChannelStateChange> {
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
subscriptions.append(subscription)
return subscription
}

func emitStateChange(_ stateChange: ARTChannelStateChange) {
for subscription in subscriptions {
subscription.emit(stateChange)
}
}

func emitDiscontinuity(_ error: ARTErrorInfo) async {
emitDiscontinuityArguments.append(error)
}
}
Loading

0 comments on commit ce1abeb

Please sign in to comment.