Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(IRC): add WebsocketKit support #17

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
12 changes: 9 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import PackageDescription
let package = Package(
name: "swift-twitch-client",
platforms: [.macOS(.v13), .iOS(.v16), .tvOS(.v16), .watchOS(.v9)],
products: [.library(name: "Twitch", targets: ["Twitch"])],
products: [
.library(name: "Twitch", targets: ["Twitch"]),
.library(name: "TwitchWebsocketKit", targets: ["TwitchWebsocketKit"]),
],
dependencies: [
.package(
url: "https://github.com/WeTransfer/Mocker.git", .upToNextMajor(from: "3.0.2")),
Expand All @@ -15,11 +18,14 @@ let package = Package(
targets: [
.target(
name: "Twitch",
dependencies: ["TwitchIRC"]),
.target(
name: "TwitchWebsocketKit",
dependencies: [
"TwitchIRC",
"Twitch",
.product(
name: "WebSocketKit", package: "websocket-kit",
condition: .when(platforms: [.linux])),
name: "WebSocketKit", package: "websocket-kit"),
]),
.testTarget(
name: "TwitchTests", dependencies: ["Twitch", "Mocker"],
Expand Down
3 changes: 2 additions & 1 deletion Sources/Twitch/IRC/Errors/WebSocketError.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
internal enum WebSocketError: Error {
package enum WebSocketError: Error {
case alreadyConnected
case unsupportedDataReceived
case connectionTimeout
}
62 changes: 30 additions & 32 deletions Sources/Twitch/IRC/IRCConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,33 @@ import TwitchIRC
import FoundationNetworking
#endif

internal actor IRCConnection {
public actor IRCConnection<WebsocketProvider: WebsocketTaskProvider> {
private let TMI: URL = URL(string: "wss://irc-ws.chat.twitch.tv:443")!

private let credentials: TwitchCredentials?
private let urlSession: URLSession
private let websocketProvider: WebsocketProvider

private var websocket: URLSessionWebSocketTask?
private(set) var joinedChannels: Set<String> = []
private var websocket: WebsocketProvider.Task?
private var _joinedChannels: Set<String> = []
public var joinedChannels: Set<String> { _joinedChannels }

init(credentials: TwitchCredentials? = nil, urlSession: URLSession) {
public init(credentials: TwitchCredentials? = nil, websocketProvider: WebsocketProvider)
{
self.credentials = credentials
self.urlSession = urlSession
self.websocketProvider = websocketProvider
}

deinit {
self.websocket?.cancel(with: .goingAway, reason: nil)
self.websocket?.cancel(with: .goingAway)
}

@discardableResult
internal func connect() async throws -> AsyncThrowingStream<
public func connect() async throws -> AsyncThrowingStream<
IncomingMessage, Error
> {
guard self.websocket == nil else { throw WebSocketError.alreadyConnected }
self.websocket = urlSession.webSocketTask(with: TMI)
self.websocket?.resume()
self.websocket = websocketProvider.task(with: TMI)
try self.websocket?.resume()

try await self.requestCapabilities()
let globalUserState = try await self.authenticate()
Expand All @@ -41,16 +43,14 @@ internal actor IRCConnection {
if let globalUserState { continuation.yield(.globalUserState(globalUserState)) }

do {
while let message = try await self.websocket?.receive() {
if case .string(let messageText) = message {
let messages = IncomingMessage.parse(ircOutput: messageText)
.compactMap(\.message)
while let messageText = try await self.websocket?.receive() {
let messages = IncomingMessage.parse(ircOutput: messageText)
.compactMap(\.message)

for message in messages {
guard try await !self.handleMessage(message) else { continue }
for message in messages {
guard try await !self.handleMessage(message) else { continue }

continuation.yield(message)
}
continuation.yield(message)
}
}
} catch {
Expand All @@ -66,38 +66,37 @@ internal actor IRCConnection {
continuation.finish(throwing: error)
}

self.disconnect()
try? await self.disconnect()
}
}

return stream
}

internal func send(_ message: OutgoingMessage) async throws {
try await self.websocket?.send(.string(message.serialize()))
public func send(_ message: OutgoingMessage) async throws {
try await self.websocket?.send(message.serialize())
}

internal func join(to channel: String) async throws {
public func join(to channel: String) async throws {
try await self.send(.join(to: channel))
}

internal func part(from channel: String) async throws {
public func part(from channel: String) async throws {
try await self.send(.part(from: channel))
}

internal func disconnect() {
self.websocket?.cancel(with: .goingAway, reason: nil)
public func disconnect() async throws {
self.websocket?.cancel(with: .goingAway)
self.websocket = nil

self.joinedChannels.removeAll()
self._joinedChannels.removeAll()
}

private func requestCapabilities() async throws {
try await self.send(.capabilities([.commands, .tags]))

// verify that we receive the capabilities message
let nextMessage = try await websocket?.receive()
guard case .string(let messageText) = nextMessage else {
guard let messageText = try await websocket?.receive() else {
throw WebSocketError.unsupportedDataReceived
}

Expand All @@ -120,8 +119,7 @@ internal actor IRCConnection {
try await self.send(.nick(name: credentials?.userLogin ?? "justinfan12345"))

// verify that we receive the connection message
let nextMessage = try await self.websocket?.receive()
guard case .string(let messageText) = nextMessage else {
guard let messageText = try await self.websocket?.receive() else {
throw WebSocketError.unsupportedDataReceived
}

Expand Down Expand Up @@ -155,9 +153,9 @@ internal actor IRCConnection {
try await self.send(.pong)
return true
case .join(let join):
joinedChannels.insert(join.channel)
_joinedChannels.insert(join.channel)
case .part(let part):
joinedChannels.remove(part.channel)
_joinedChannels.remove(part.channel)
default: break
}

Expand Down
25 changes: 14 additions & 11 deletions Sources/Twitch/IRC/IRCConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import TwitchIRC
import FoundationNetworking
#endif

public actor IRCConnectionPool {
private var connections: [IRCConnection] = []
public actor IRCConnectionPool<WebsocketProvider: WebsocketTaskProvider> {
typealias Connection = IRCConnection<WebsocketProvider>

private var connections: [Connection] = []

private let credentials: TwitchCredentials?
private let urlSession: URLSession
private let websocketProvider: WebsocketProvider

private var continuation: AsyncThrowingStream<IncomingMessage, Error>.Continuation?

init(with credentials: TwitchCredentials? = nil, urlSession: URLSession) {
init(with credentials: TwitchCredentials? = nil, websocketProvider: WebsocketProvider) {
self.credentials = credentials
self.urlSession = urlSession
self.websocketProvider = websocketProvider
}

internal func connect() async throws -> AsyncThrowingStream<IncomingMessage, Error> {
Expand All @@ -30,7 +32,7 @@ public actor IRCConnectionPool {
}

internal func disconnect() async {
for connection in self.connections { await connection.disconnect() }
for connection in self.connections { try? await connection.disconnect() }
self.connections.removeAll()

self.continuation?.finish()
Expand All @@ -52,15 +54,15 @@ public actor IRCConnectionPool {
}

guard await connection.joinedChannels.count >= 2 else {
await connection.disconnect()
try? await connection.disconnect()
self.connections.removeAll(where: { $0 === connection })
return
}

try await connection.part(from: channel)
}

private func getConnection(to channel: String) async -> IRCConnection? {
private func getConnection(to channel: String) async -> Connection? {
for connection in self.connections
where await connection.joinedChannels.contains(channel) {
return connection
Expand All @@ -69,7 +71,7 @@ public actor IRCConnectionPool {
return nil
}

private func getFreeConnection() async throws -> IRCConnection {
private func getFreeConnection() async throws -> Connection {
for connection in self.connections
where await connection.joinedChannels.count < 90 {
return connection
Expand All @@ -78,8 +80,9 @@ public actor IRCConnectionPool {
return try await self.createConnection()
}

@discardableResult private func createConnection() async throws -> IRCConnection {
let connection = IRCConnection(credentials: credentials, urlSession: urlSession)
@discardableResult private func createConnection() async throws -> Connection {
let connection = Connection(
credentials: credentials, websocketProvider: websocketProvider)
let messageStream = try await connection.connect()

Task {
Expand Down
19 changes: 15 additions & 4 deletions Sources/Twitch/IRC/TwitchClient+IRC.swift
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
import Foundation
import TwitchIRC

#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

#if canImport(Combine)
import Combine
#endif

extension TwitchClient {
public func ircClient(
with options: TwitchIRCClient.Options = .init()
) async throws -> TwitchIRCClient {
public func ircClient<WebsocketProvider: WebsocketTaskProvider>(
with options: TwitchIRCClient<WebsocketProvider>.Options = .init(),
websocketProvider: WebsocketProvider
) async throws -> TwitchIRCClient<WebsocketProvider> {
return try await TwitchIRCClient(
.authenticated(self.authentication),
options: options,
urlSession: self.urlSession
websocketProvider: websocketProvider
)
}

public func ircClient(
with options: TwitchIRCClient<URLSession>.Options = .init()
) async throws -> TwitchIRCClient<URLSession> {
return try await ircClient(with: options, websocketProvider: self.urlSession)
}
}
27 changes: 17 additions & 10 deletions Sources/Twitch/IRC/TwitchIRCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import TwitchIRC
import FoundationNetworking
#endif

public actor TwitchIRCClient {
public actor TwitchIRCClient<WebsocketProvider: WebsocketTaskProvider> {
public enum AuthenticationStyle {
case anonymous
case authenticated(_ credentials: TwitchCredentials)
Expand All @@ -19,14 +19,14 @@ public actor TwitchIRCClient {
}
}

private let writeConnection: IRCConnection?
private let readConnectionPool: IRCConnectionPool
private let writeConnection: IRCConnection<WebsocketProvider>?
private let readConnectionPool: IRCConnectionPool<WebsocketProvider>
private var handlers = [IRCMessageHandler]()

public init(
_ authenticationStyle: AuthenticationStyle,
options: Options = .init(),
urlSession: URLSession = URLSession(configuration: .default)
websocketProvider: WebsocketProvider
) async throws {
let credentials: TwitchCredentials? =
switch authenticationStyle {
Expand All @@ -36,17 +36,13 @@ public actor TwitchIRCClient {

if options.enableWriteConnection {
self.writeConnection = IRCConnection(
credentials: credentials,
urlSession: urlSession
)
credentials: credentials, websocketProvider: websocketProvider)
} else {
self.writeConnection = nil
}

self.readConnectionPool = IRCConnectionPool(
with: credentials,
urlSession: urlSession
)
with: credentials, websocketProvider: websocketProvider)

try await writeConnection?.connect()
let messageStream = try await readConnectionPool.connect()
Expand Down Expand Up @@ -115,6 +111,17 @@ public actor TwitchIRCClient {
}
}

extension TwitchIRCClient where WebsocketProvider == URLSession {
public init(
_ authenticationStyle: AuthenticationStyle,
options: Options = .init(),
urlSession: URLSession = URLSession(configuration: .default)
) async throws {
try await self.init(
authenticationStyle, options: options, websocketProvider: urlSession)
}
}

#if canImport(Combine)
import Combine

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Foundation

#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

extension WebsocketTaskCloseCode {
var urlSessionWebsocketCloseCode: URLSessionWebSocketTask.CloseCode {
switch self {
case .goingAway: .goingAway
case .serverError: .internalServerError
}
}
}

extension URLSessionWebSocketTask: WebsocketTask {
public func receive() async throws -> String? {
let message: Message = try await self.receive()
guard case .string(let text) = message else {
return nil
}

return text
}

public func send(_ text: String) async throws {
try await self.send(.string(text))
}

public func cancel(with code: WebsocketTaskCloseCode) {
self.cancel(with: code.urlSessionWebsocketCloseCode, reason: nil)
}
}

extension URLSession: WebsocketTaskProvider {
public typealias Task = URLSessionWebSocketTask

public func task(with url: URL) -> Task {
self.webSocketTask(with: url)
}
}
Loading
Loading