diff --git a/Package.swift b/Package.swift index 69eb4d6..b75414c 100644 --- a/Package.swift +++ b/Package.swift @@ -5,7 +5,10 @@ import PackageDescription let package = Package( name: "swift-twitch-client", platforms: [.macOS(.v13), .iOS(.v16), .tvOS(.v16), .watchOS(.v9)], - products: [.library(name: "Twitch", targets: ["Twitch"])], + products: [ + .library(name: "Twitch", targets: ["Twitch"]), + .library(name: "TwitchWebsocketKit", targets: ["TwitchWebsocketKit"]), + ], dependencies: [ .package( url: "https://github.com/WeTransfer/Mocker.git", .upToNextMajor(from: "3.0.2")), @@ -15,11 +18,14 @@ let package = Package( targets: [ .target( name: "Twitch", + dependencies: ["TwitchIRC"]), + .target( + name: "TwitchWebsocketKit", dependencies: [ "TwitchIRC", + "Twitch", .product( - name: "WebSocketKit", package: "websocket-kit", - condition: .when(platforms: [.linux])), + name: "WebSocketKit", package: "websocket-kit"), ]), .testTarget( name: "TwitchTests", dependencies: ["Twitch", "Mocker"], diff --git a/Sources/Twitch/IRC/Errors/WebSocketError.swift b/Sources/Twitch/IRC/Errors/WebSocketError.swift index 151070d..8b3e366 100644 --- a/Sources/Twitch/IRC/Errors/WebSocketError.swift +++ b/Sources/Twitch/IRC/Errors/WebSocketError.swift @@ -1,4 +1,5 @@ -internal enum WebSocketError: Error { +package enum WebSocketError: Error { case alreadyConnected case unsupportedDataReceived + case connectionTimeout } diff --git a/Sources/Twitch/IRC/IRCConnection.swift b/Sources/Twitch/IRC/IRCConnection.swift index 9d218ba..b44f760 100644 --- a/Sources/Twitch/IRC/IRCConnection.swift +++ b/Sources/Twitch/IRC/IRCConnection.swift @@ -5,31 +5,33 @@ import TwitchIRC import FoundationNetworking #endif -internal actor IRCConnection { +public actor IRCConnection { private let TMI: URL = URL(string: "wss://irc-ws.chat.twitch.tv:443")! private let credentials: TwitchCredentials? - private let urlSession: URLSession + private let websocketProvider: WebsocketProvider - private var websocket: URLSessionWebSocketTask? - private(set) var joinedChannels: Set = [] + private var websocket: WebsocketProvider.Task? + private var _joinedChannels: Set = [] + public var joinedChannels: Set { _joinedChannels } - init(credentials: TwitchCredentials? = nil, urlSession: URLSession) { + public init(credentials: TwitchCredentials? = nil, websocketProvider: WebsocketProvider) + { self.credentials = credentials - self.urlSession = urlSession + self.websocketProvider = websocketProvider } deinit { - self.websocket?.cancel(with: .goingAway, reason: nil) + self.websocket?.cancel(with: .goingAway) } @discardableResult - internal func connect() async throws -> AsyncThrowingStream< + public func connect() async throws -> AsyncThrowingStream< IncomingMessage, Error > { guard self.websocket == nil else { throw WebSocketError.alreadyConnected } - self.websocket = urlSession.webSocketTask(with: TMI) - self.websocket?.resume() + self.websocket = websocketProvider.task(with: TMI) + try self.websocket?.resume() try await self.requestCapabilities() let globalUserState = try await self.authenticate() @@ -41,16 +43,14 @@ internal actor IRCConnection { if let globalUserState { continuation.yield(.globalUserState(globalUserState)) } do { - while let message = try await self.websocket?.receive() { - if case .string(let messageText) = message { - let messages = IncomingMessage.parse(ircOutput: messageText) - .compactMap(\.message) + while let messageText = try await self.websocket?.receive() { + let messages = IncomingMessage.parse(ircOutput: messageText) + .compactMap(\.message) - for message in messages { - guard try await !self.handleMessage(message) else { continue } + for message in messages { + guard try await !self.handleMessage(message) else { continue } - continuation.yield(message) - } + continuation.yield(message) } } } catch { @@ -66,38 +66,37 @@ internal actor IRCConnection { continuation.finish(throwing: error) } - self.disconnect() + try? await self.disconnect() } } return stream } - internal func send(_ message: OutgoingMessage) async throws { - try await self.websocket?.send(.string(message.serialize())) + public func send(_ message: OutgoingMessage) async throws { + try await self.websocket?.send(message.serialize()) } - internal func join(to channel: String) async throws { + public func join(to channel: String) async throws { try await self.send(.join(to: channel)) } - internal func part(from channel: String) async throws { + public func part(from channel: String) async throws { try await self.send(.part(from: channel)) } - internal func disconnect() { - self.websocket?.cancel(with: .goingAway, reason: nil) + public func disconnect() async throws { + self.websocket?.cancel(with: .goingAway) self.websocket = nil - self.joinedChannels.removeAll() + self._joinedChannels.removeAll() } private func requestCapabilities() async throws { try await self.send(.capabilities([.commands, .tags])) // verify that we receive the capabilities message - let nextMessage = try await websocket?.receive() - guard case .string(let messageText) = nextMessage else { + guard let messageText = try await websocket?.receive() else { throw WebSocketError.unsupportedDataReceived } @@ -120,8 +119,7 @@ internal actor IRCConnection { try await self.send(.nick(name: credentials?.userLogin ?? "justinfan12345")) // verify that we receive the connection message - let nextMessage = try await self.websocket?.receive() - guard case .string(let messageText) = nextMessage else { + guard let messageText = try await self.websocket?.receive() else { throw WebSocketError.unsupportedDataReceived } @@ -155,9 +153,9 @@ internal actor IRCConnection { try await self.send(.pong) return true case .join(let join): - joinedChannels.insert(join.channel) + _joinedChannels.insert(join.channel) case .part(let part): - joinedChannels.remove(part.channel) + _joinedChannels.remove(part.channel) default: break } diff --git a/Sources/Twitch/IRC/IRCConnectionPool.swift b/Sources/Twitch/IRC/IRCConnectionPool.swift index 36f9da3..e60f6f3 100644 --- a/Sources/Twitch/IRC/IRCConnectionPool.swift +++ b/Sources/Twitch/IRC/IRCConnectionPool.swift @@ -5,17 +5,19 @@ import TwitchIRC import FoundationNetworking #endif -public actor IRCConnectionPool { - private var connections: [IRCConnection] = [] +public actor IRCConnectionPool { + typealias Connection = IRCConnection + + private var connections: [Connection] = [] private let credentials: TwitchCredentials? - private let urlSession: URLSession + private let websocketProvider: WebsocketProvider private var continuation: AsyncThrowingStream.Continuation? - init(with credentials: TwitchCredentials? = nil, urlSession: URLSession) { + init(with credentials: TwitchCredentials? = nil, websocketProvider: WebsocketProvider) { self.credentials = credentials - self.urlSession = urlSession + self.websocketProvider = websocketProvider } internal func connect() async throws -> AsyncThrowingStream { @@ -30,7 +32,7 @@ public actor IRCConnectionPool { } internal func disconnect() async { - for connection in self.connections { await connection.disconnect() } + for connection in self.connections { try? await connection.disconnect() } self.connections.removeAll() self.continuation?.finish() @@ -52,7 +54,7 @@ public actor IRCConnectionPool { } guard await connection.joinedChannels.count >= 2 else { - await connection.disconnect() + try? await connection.disconnect() self.connections.removeAll(where: { $0 === connection }) return } @@ -60,7 +62,7 @@ public actor IRCConnectionPool { try await connection.part(from: channel) } - private func getConnection(to channel: String) async -> IRCConnection? { + private func getConnection(to channel: String) async -> Connection? { for connection in self.connections where await connection.joinedChannels.contains(channel) { return connection @@ -69,7 +71,7 @@ public actor IRCConnectionPool { return nil } - private func getFreeConnection() async throws -> IRCConnection { + private func getFreeConnection() async throws -> Connection { for connection in self.connections where await connection.joinedChannels.count < 90 { return connection @@ -78,8 +80,9 @@ public actor IRCConnectionPool { return try await self.createConnection() } - @discardableResult private func createConnection() async throws -> IRCConnection { - let connection = IRCConnection(credentials: credentials, urlSession: urlSession) + @discardableResult private func createConnection() async throws -> Connection { + let connection = Connection( + credentials: credentials, websocketProvider: websocketProvider) let messageStream = try await connection.connect() Task { diff --git a/Sources/Twitch/IRC/TwitchClient+IRC.swift b/Sources/Twitch/IRC/TwitchClient+IRC.swift index 387cb4b..b549d28 100644 --- a/Sources/Twitch/IRC/TwitchClient+IRC.swift +++ b/Sources/Twitch/IRC/TwitchClient+IRC.swift @@ -1,18 +1,29 @@ import Foundation import TwitchIRC +#if canImport(FoundationNetworking) + import FoundationNetworking +#endif + #if canImport(Combine) import Combine #endif extension TwitchClient { - public func ircClient( - with options: TwitchIRCClient.Options = .init() - ) async throws -> TwitchIRCClient { + public func ircClient( + with options: TwitchIRCClient.Options = .init(), + websocketProvider: WebsocketProvider + ) async throws -> TwitchIRCClient { return try await TwitchIRCClient( .authenticated(self.authentication), options: options, - urlSession: self.urlSession + websocketProvider: websocketProvider ) } + + public func ircClient( + with options: TwitchIRCClient.Options = .init() + ) async throws -> TwitchIRCClient { + return try await ircClient(with: options, websocketProvider: self.urlSession) + } } diff --git a/Sources/Twitch/IRC/TwitchIRCClient.swift b/Sources/Twitch/IRC/TwitchIRCClient.swift index a7128d3..4e66451 100644 --- a/Sources/Twitch/IRC/TwitchIRCClient.swift +++ b/Sources/Twitch/IRC/TwitchIRCClient.swift @@ -5,7 +5,7 @@ import TwitchIRC import FoundationNetworking #endif -public actor TwitchIRCClient { +public actor TwitchIRCClient { public enum AuthenticationStyle { case anonymous case authenticated(_ credentials: TwitchCredentials) @@ -19,14 +19,14 @@ public actor TwitchIRCClient { } } - private let writeConnection: IRCConnection? - private let readConnectionPool: IRCConnectionPool + private let writeConnection: IRCConnection? + private let readConnectionPool: IRCConnectionPool private var handlers = [IRCMessageHandler]() public init( _ authenticationStyle: AuthenticationStyle, options: Options = .init(), - urlSession: URLSession = URLSession(configuration: .default) + websocketProvider: WebsocketProvider ) async throws { let credentials: TwitchCredentials? = switch authenticationStyle { @@ -36,17 +36,13 @@ public actor TwitchIRCClient { if options.enableWriteConnection { self.writeConnection = IRCConnection( - credentials: credentials, - urlSession: urlSession - ) + credentials: credentials, websocketProvider: websocketProvider) } else { self.writeConnection = nil } self.readConnectionPool = IRCConnectionPool( - with: credentials, - urlSession: urlSession - ) + with: credentials, websocketProvider: websocketProvider) try await writeConnection?.connect() let messageStream = try await readConnectionPool.connect() @@ -115,6 +111,17 @@ public actor TwitchIRCClient { } } +extension TwitchIRCClient where WebsocketProvider == URLSession { + public init( + _ authenticationStyle: AuthenticationStyle, + options: Options = .init(), + urlSession: URLSession = URLSession(configuration: .default) + ) async throws { + try await self.init( + authenticationStyle, options: options, websocketProvider: urlSession) + } +} + #if canImport(Combine) import Combine diff --git a/Sources/Twitch/IRC/Websocket/URLSessionWebSocketTask+WebsocketTask.swift b/Sources/Twitch/IRC/Websocket/URLSessionWebSocketTask+WebsocketTask.swift new file mode 100644 index 0000000..3808492 --- /dev/null +++ b/Sources/Twitch/IRC/Websocket/URLSessionWebSocketTask+WebsocketTask.swift @@ -0,0 +1,41 @@ +import Foundation + +#if canImport(FoundationNetworking) + import FoundationNetworking +#endif + +extension WebsocketTaskCloseCode { + var urlSessionWebsocketCloseCode: URLSessionWebSocketTask.CloseCode { + switch self { + case .goingAway: .goingAway + case .serverError: .internalServerError + } + } +} + +extension URLSessionWebSocketTask: WebsocketTask { + public func receive() async throws -> String? { + let message: Message = try await self.receive() + guard case .string(let text) = message else { + return nil + } + + return text + } + + public func send(_ text: String) async throws { + try await self.send(.string(text)) + } + + public func cancel(with code: WebsocketTaskCloseCode) { + self.cancel(with: code.urlSessionWebsocketCloseCode, reason: nil) + } +} + +extension URLSession: WebsocketTaskProvider { + public typealias Task = URLSessionWebSocketTask + + public func task(with url: URL) -> Task { + self.webSocketTask(with: url) + } +} diff --git a/Sources/Twitch/IRC/Websocket/WebsocketTask.swift b/Sources/Twitch/IRC/Websocket/WebsocketTask.swift new file mode 100644 index 0000000..32b33a2 --- /dev/null +++ b/Sources/Twitch/IRC/Websocket/WebsocketTask.swift @@ -0,0 +1,18 @@ +import Foundation + +public enum WebsocketTaskCloseCode { + case goingAway + case serverError +} + +public protocol WebsocketTask { + func resume() throws + func receive() async throws -> String? + func send(_ text: String) async throws + func cancel(with code: WebsocketTaskCloseCode) +} + +public protocol WebsocketTaskProvider { + associatedtype Task: WebsocketTask + func task(with: URL) -> Task +} diff --git a/Sources/Twitch/Shared/Extensions/URLSessionWebSocketTask+receive.swift b/Sources/Twitch/Shared/Extensions/URLSessionWebSocketTask+receive.swift index 9cb741c..f2906b3 100644 --- a/Sources/Twitch/Shared/Extensions/URLSessionWebSocketTask+receive.swift +++ b/Sources/Twitch/Shared/Extensions/URLSessionWebSocketTask+receive.swift @@ -5,7 +5,7 @@ func receive(completionHandler: @escaping (Result) -> Void) { Task { do { - let message = try await self.receive() + let message: Message = try await self.receive() completionHandler(.success(message)) } catch { completionHandler(.failure(error)) diff --git a/Sources/Twitch/TwitchClient.swift b/Sources/Twitch/TwitchClient.swift index fd20125..fe3c12b 100644 --- a/Sources/Twitch/TwitchClient.swift +++ b/Sources/Twitch/TwitchClient.swift @@ -5,7 +5,7 @@ import Foundation #endif public actor TwitchClient { - internal let authentication: TwitchCredentials + package let authentication: TwitchCredentials internal let urlSession: URLSession internal let encoder = JSONEncoder() diff --git a/Sources/TwitchWebsocketKit/WebsocketKitTask.swift b/Sources/TwitchWebsocketKit/WebsocketKitTask.swift new file mode 100644 index 0000000..67d6cc4 --- /dev/null +++ b/Sources/TwitchWebsocketKit/WebsocketKitTask.swift @@ -0,0 +1,107 @@ +import Foundation +import NIO +import NIOWebSocket +import Twitch +import WebSocketKit + +public final class WebsocketKitTask: WebsocketTask { + private static let connectionTimeout: TimeInterval = 30 + private static let connectionCheckSleep: TimeInterval = 0.1 + + private let url: URL + private let eventLoopGroup: EventLoopGroup + + private var websocket: WebSocket? + private var stream: AsyncThrowingStream? + private var iterator: AsyncThrowingStream.Iterator? + private var continuation: AsyncThrowingStream.Continuation? + + private var connected: Bool = false + + init(url: URL, eventLoopGroup: EventLoopGroup) { + self.url = url + self.eventLoopGroup = eventLoopGroup + } + + public func resume() throws { + guard websocket == nil else { + throw WebSocketError.alreadyConnected + } + + let (stream, continuation) = AsyncThrowingStream.makeStream() + self.stream = stream + self.iterator = stream.makeAsyncIterator() + self.continuation = continuation + + try WebSocket.connect(to: url, on: eventLoopGroup) { [weak self] socket in + guard let self else { + _ = socket.close(code: .goingAway) + return + } + self.websocket = socket + + _ = socket.onClose.map { + self.continuation?.finish() + } + + socket.onPing { _, _ in + self.continuation?.yield("PING :") + } + + socket.onPong { _, _ in + // Skip first pong coming from our connection ping + guard self.connected else { + self.connected = true + return + } + self.continuation?.yield("PONG :") + } + + socket.onText { _, text in + self.continuation?.yield(text) + } + + // Kickoff the connectiong with a ping + socket.sendPing() + }.wait() + } + + public func receive() async throws -> String? { + try await ensureConnection() + return try await iterator?.next() + } + + public func send(_ text: String) async throws { + try await ensureConnection() + try await websocket?.send(text) + } + + public func cancel(with code: WebsocketTaskCloseCode) { + _ = websocket?.close(code: code.websoketKitCloseCode) + } + + private func ensureConnection() async throws { + if websocket != nil, connected { + return + } + + let start = Date.now + while websocket == nil, !connected { + try await Task.sleep(for: .seconds(Self.connectionCheckSleep)) + + let time = Date.now.timeIntervalSince(start) + if time > Self.connectionTimeout { + throw WebSocketError.connectionTimeout + } + } + } +} + +extension Twitch.WebsocketTaskCloseCode { + var websoketKitCloseCode: WebSocketErrorCode { + switch self { + case .goingAway: .goingAway + case .serverError: .unexpectedServerError + } + } +} diff --git a/Sources/TwitchWebsocketKit/WebsocketKitTaskProvider.swift b/Sources/TwitchWebsocketKit/WebsocketKitTaskProvider.swift new file mode 100644 index 0000000..5b81025 --- /dev/null +++ b/Sources/TwitchWebsocketKit/WebsocketKitTaskProvider.swift @@ -0,0 +1,37 @@ +import Foundation +import NIO +import Twitch +import TwitchIRC + +public struct WebsocketKitTaskProvider: WebsocketTaskProvider { + public typealias Task = WebsocketKitTask + + internal static let shared: WebsocketKitTaskProvider = .init() + private init() {} + + public func task(with: URL) -> Task { + WebsocketKitTask( + url: with, eventLoopGroup: MultiThreadedEventLoopGroup(numberOfThreads: 1)) + } +} + +extension TwitchIRCClient where WebsocketProvider == WebsocketKitTaskProvider { + public init( + _ authenticationStyle: AuthenticationStyle, + options: Options = .init() + ) async throws { + try await self.init(authenticationStyle, options: options, websocketProvider: .shared) + } +} + +extension TwitchClient { + public func ircClient( + with options: TwitchIRCClient.Options = .init() + ) async throws -> TwitchIRCClient { + return try await TwitchIRCClient( + .authenticated(self.authentication), + options: options, + websocketProvider: .shared + ) + } +}