Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced to async method for URLSessionWebSocketTask.receive(). #2145

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion IceCubesApp/App/Main/IceCubesApp+Scene.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ extension IceCubesApp {
.onChange(of: appAccountsManager.currentClient) { _, newValue in
setNewClientsInEnv(client: newValue)
if newValue.isAuth {
watcher.watch(streams: [.user, .direct])
Task {
await watcher.watch(streams: [.user, .direct])
}
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions IceCubesApp/App/Main/IceCubesApp.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ struct IceCubesApp: App {
userPreferences.setClient(client: client)
Task {
await currentInstance.fetchCurrentInstance()
watcher.setClient(client: client, instanceStreamingURL: currentInstance.instance?.urls?.streamingApi)
watcher.watch(streams: [.user, .direct])
await watcher.setClient(client: client, instanceStreamingURL: currentInstance.instance?.urls?.streamingApi)
await watcher.watch(streams: [.user, .direct])
}
}

Expand All @@ -54,10 +54,10 @@ struct IceCubesApp: App {
case .background:
watcher.stopWatching()
case .active:
watcher.watch(streams: [.user, .direct])
UNUserNotificationCenter.current().setBadgeCount(0)
userPreferences.reloadNotificationsCount(tokens: appAccountsManager.availableAccounts.compactMap(\.oauthToken))
Task {
await watcher.watch(streams: [.user, .direct])
try? await UNUserNotificationCenter.current().setBadgeCount(0)
userPreferences.reloadNotificationsCount(tokens: appAccountsManager.availableAccounts.compactMap(\.oauthToken))
await userPreferences.refreshServerPreferences()
}
default:
Expand Down
80 changes: 37 additions & 43 deletions Packages/Env/Sources/Env/StreamWatcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ import OSLog
decoder.keyDecodingStrategy = .convertFromSnakeCase
}

public func setClient(client: Client, instanceStreamingURL: URL?) {
public func setClient(client: Client, instanceStreamingURL: URL?) async {
if self.client != nil {
stopWatching()
}
self.client = client
self.instanceStreamingURL = instanceStreamingURL
connect()
await connect()
}

private func connect() {
private func connect() async {
guard let task = try? client?.makeWebSocketTask(
endpoint: Streaming.streaming,
instanceStreamingURL: instanceStreamingURL
Expand All @@ -54,15 +54,15 @@ import OSLog
}
self.task = task
self.task?.resume()
receiveMessage()
await receiveMessage()
}

public func watch(streams: [Stream]) {
public func watch(streams: [Stream]) async {
if client?.isAuth == false {
return
}
if task == nil {
connect()
await connect()
}
watchedStreams = streams
for stream in streams {
Expand All @@ -83,48 +83,42 @@ import OSLog
}
}

private func receiveMessage() {
task?.receive(completionHandler: { [weak self] result in
guard let self else { return }
switch result {
case let .success(message):
switch message {
case let .string(string):
do {
guard let data = string.data(using: .utf8) else {
logger.error("Error decoding streaming event string")
return
}
let rawEvent = try decoder.decode(RawStreamEvent.self, from: data)
logger.info("Stream update: \(rawEvent.event)")
if let event = rawEventToEvent(rawEvent: rawEvent) {
Task { @MainActor in
self.events.append(event)
self.latestEvent = event
if let event = event as? StreamEventNotification, event.notification.status?.visibility != .direct {
self.unreadNotificationsCount += 1
}
}
private func receiveMessage() async {
do {
guard let message = try await task?.receive() else { return }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you should return here as it could break the next receiveMessage loop.


switch message {
case let .string(string):
do {
guard let data = string.data(using: .utf8) else {
logger.error("Error decoding streaming event string")
return
}
let rawEvent = try decoder.decode(RawStreamEvent.self, from: data)
logger.info("Stream update: \(rawEvent.event)")
if let event = rawEventToEvent(rawEvent: rawEvent) {
events.append(event)
latestEvent = event
if let event = event as? StreamEventNotification, event.notification.status?.visibility != .direct {
unreadNotificationsCount += 1
}
} catch {
logger.error("Error decoding streaming event: \(error.localizedDescription)")
}

default:
break
} catch {
logger.error("Error decoding streaming event: \(error.localizedDescription)")
}

receiveMessage()

case .failure:
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(retryDelay)) {
self.retryDelay += 30
self.stopWatching()
self.connect()
self.watch(streams: self.watchedStreams)
}
default:
break
}
})

await receiveMessage()
} catch {
try? await Task.sleep(nanoseconds: UInt64(retryDelay * 1000 * 1000 * 1000))
retryDelay += 30
stopWatching()
await connect()
await watch(streams: watchedStreams)
}
}

private func rawEventToEvent(rawEvent: RawStreamEvent) -> (any StreamEvent)? {
Expand Down
Loading