Skip to content

Commit

Permalink
Add "debug initializer" hook for channels
Browse files Browse the repository at this point in the history
Motivation:

As requested in #596, it can be handy to have a lower-level access to channels (HTTP/1 connection, HTTP/2 connection, or HTTP/2 stream) to enable a more fine-grained interaction for, say, observability, testing, etc.

Modifications:

- Add 3 new properties (`http1_1ConnectionDebugInitializer`, `http2ConnectionDebugInitializer` and `http2StreamChannelDebugInitializer`) to `HTTPClient.Configuration` with access to the respective channels. These properties are of `Optional` type `@Sendable (Channel) -> EventLoopFuture<Void>` and are called when creating a connection/stream.

Result:

Provides APIs for a lower-level access to channels.
  • Loading branch information
clintonpi committed Jan 21, 2025
1 parent e69318d commit 9e26b5e
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 21 deletions.
30 changes: 24 additions & 6 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,21 @@ final class HTTP2Connection {
return connection._start0().map { maxStreams in (connection, maxStreams) }
}

func executeRequest(_ request: HTTPExecutableRequest) {
func executeRequest(
_ request: HTTPExecutableRequest,
streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) {
if self.channel.eventLoop.inEventLoop {
self.executeRequest0(request)
self.executeRequest0(
request,
streamChannelDebugInitializer: streamChannelDebugInitializer
)
} else {
self.channel.eventLoop.execute {
self.executeRequest0(request)
self.executeRequest0(
request,
streamChannelDebugInitializer: streamChannelDebugInitializer
)
}
}
}
Expand Down Expand Up @@ -218,7 +227,10 @@ final class HTTP2Connection {
return readyToAcceptConnectionsPromise.futureResult
}

private func executeRequest0(_ request: HTTPExecutableRequest) {
private func executeRequest0(
_ request: HTTPExecutableRequest,
streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
) {
self.channel.eventLoop.assertInEventLoop()

switch self.state {
Expand Down Expand Up @@ -259,8 +271,14 @@ final class HTTP2Connection {
self.openStreams.remove(box)
}

channel.write(request, promise: nil)
return channel.eventLoop.makeSucceededVoidFuture()
if let streamChannelDebugInitializer {
return streamChannelDebugInitializer(channel).map { _ in
channel.write(request, promise: nil)
}
} else {
channel.write(request, promise: nil)
return channel.eventLoop.makeSucceededVoidFuture()
}
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,21 @@ extension HTTPConnectionPool.ConnectionFactory {
decompression: self.clientConfiguration.decompression,
logger: logger
)
requester.http1ConnectionCreated(connection)

if let debugInitializer
= self.clientConfiguration.http1_1ConnectionDebugInitializer
{
debugInitializer(channel).whenComplete { debugInitializerResult in
switch debugInitializerResult {
case .success:
requester.http1ConnectionCreated(connection)
case .failure(let error):
requester.failedToCreateHTTPConnection(connectionID, error: error)
}
}
} else {
requester.http1ConnectionCreated(connection)
}
} catch {
requester.failedToCreateHTTPConnection(connectionID, error: error)
}
Expand All @@ -99,7 +113,29 @@ extension HTTPConnectionPool.ConnectionFactory {
).whenComplete { result in
switch result {
case .success((let connection, let maximumStreams)):
requester.http2ConnectionCreated(connection, maximumStreams: maximumStreams)
if let debugInitializer
= self.clientConfiguration.http2ConnectionDebugInitializer
{
debugInitializer(channel).whenComplete { debugInitializerResult in
switch debugInitializerResult {
case .success:
requester.http2ConnectionCreated(
connection,
maximumStreams: maximumStreams
)
case .failure(let error):
requester.failedToCreateHTTPConnection(
connectionID,
error: error
)
}
}
} else {
requester.http2ConnectionCreated(
connection,
maximumStreams: maximumStreams
)
}
case .failure(let error):
requester.failedToCreateHTTPConnection(connectionID, error: error)
}
Expand Down
24 changes: 20 additions & 4 deletions Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,20 @@ final class HTTPConnectionPool:
private func runUnlockedRequestAction(_ action: Actions.RequestAction.Unlocked) {
switch action {
case .executeRequest(let request, let connection):
connection.executeRequest(request.req)
connection.executeRequest(
request.req,
http2StreamChannelDebugInitializer:
self.clientConfiguration.http2StreamChannelDebugInitializer
)

case .executeRequests(let requests, let connection):
for request in requests { connection.executeRequest(request.req) }
for request in requests {
connection.executeRequest(
request.req,
http2StreamChannelDebugInitializer:
self.clientConfiguration.http2StreamChannelDebugInitializer
)
}

case .failRequest(let request, let error):
request.req.fail(error)
Expand Down Expand Up @@ -651,12 +661,18 @@ extension HTTPConnectionPool {
}
}

fileprivate func executeRequest(_ request: HTTPExecutableRequest) {
fileprivate func executeRequest(
_ request: HTTPExecutableRequest,
http2StreamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
) {
switch self._ref {
case .http1_1(let connection):
return connection.executeRequest(request)
case .http2(let connection):
return connection.executeRequest(request)
return connection.executeRequest(
request,
streamChannelDebugInitializer: http2StreamChannelDebugInitializer
)
case .__testOnly_connection:
break
}
Expand Down
75 changes: 66 additions & 9 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -847,14 +847,32 @@ public class HTTPClient {
/// By default, don't use it
public var enableMultipath: Bool

/// A method with access to the HTTP/1 connection channel that is called when creating the connection.
public var http1_1ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)?

/// A method with access to the HTTP/2 connection channel that is called when creating the connection.
public var http2ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)?

/// A method with access to the HTTP/2 stream channel that is called when creating the stream.
public var http2StreamChannelDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)?

public init(
tlsConfiguration: TLSConfiguration? = nil,
redirectConfiguration: RedirectConfiguration? = nil,
timeout: Timeout = Timeout(),
connectionPool: ConnectionPool = ConnectionPool(),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled
decompression: Decompression = .disabled,
http1_1ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2StreamChannelDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) {
self.tlsConfiguration = tlsConfiguration
self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration()
Expand All @@ -865,6 +883,9 @@ public class HTTPClient {
self.httpVersion = .automatic
self.networkFrameworkWaitForConnectivity = true
self.enableMultipath = false
self.http1_1ConnectionDebugInitializer = http1_1ConnectionDebugInitializer
self.http2ConnectionDebugInitializer = http2ConnectionDebugInitializer
self.http2StreamChannelDebugInitializer = http2StreamChannelDebugInitializer
}

public init(
Expand All @@ -873,7 +894,13 @@ public class HTTPClient {
timeout: Timeout = Timeout(),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled
decompression: Decompression = .disabled,
http1_1ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2StreamChannelDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) {
self.init(
tlsConfiguration: tlsConfiguration,
Expand All @@ -882,7 +909,10 @@ public class HTTPClient {
connectionPool: ConnectionPool(),
proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
decompression: decompression,
http1_1ConnectionDebugInitializer: http1_1ConnectionDebugInitializer,
http2ConnectionDebugInitializer: http2ConnectionDebugInitializer,
http2StreamChannelDebugInitializer: http2StreamChannelDebugInitializer
)
}

Expand All @@ -893,7 +923,13 @@ public class HTTPClient {
maximumAllowedIdleTimeInConnectionPool: TimeAmount = .seconds(60),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled
decompression: Decompression = .disabled,
http1_1ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2StreamChannelDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) {
var tlsConfig = TLSConfiguration.makeClientConfiguration()
tlsConfig.certificateVerification = certificateVerification
Expand All @@ -904,7 +940,10 @@ public class HTTPClient {
connectionPool: ConnectionPool(idleTimeout: maximumAllowedIdleTimeInConnectionPool),
proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
decompression: decompression,
http1_1ConnectionDebugInitializer: http1_1ConnectionDebugInitializer,
http2ConnectionDebugInitializer: http2ConnectionDebugInitializer,
http2StreamChannelDebugInitializer: http2StreamChannelDebugInitializer
)
}

Expand All @@ -916,7 +955,13 @@ public class HTTPClient {
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled,
backgroundActivityLogger: Logger?
backgroundActivityLogger: Logger?,
http1_1ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2StreamChannelDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) {
var tlsConfig = TLSConfiguration.makeClientConfiguration()
tlsConfig.certificateVerification = certificateVerification
Expand All @@ -927,7 +972,10 @@ public class HTTPClient {
connectionPool: ConnectionPool(idleTimeout: connectionPool),
proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
decompression: decompression,
http1_1ConnectionDebugInitializer: http1_1ConnectionDebugInitializer,
http2ConnectionDebugInitializer: http2ConnectionDebugInitializer,
http2StreamChannelDebugInitializer: http2StreamChannelDebugInitializer
)
}

Expand All @@ -937,7 +985,13 @@ public class HTTPClient {
timeout: Timeout = Timeout(),
proxy: Proxy? = nil,
ignoreUncleanSSLShutdown: Bool = false,
decompression: Decompression = .disabled
decompression: Decompression = .disabled,
http1_1ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2ConnectionDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
http2StreamChannelDebugInitializer:
(@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) {
self.init(
certificateVerification: certificateVerification,
Expand All @@ -946,7 +1000,10 @@ public class HTTPClient {
maximumAllowedIdleTimeInConnectionPool: .seconds(60),
proxy: proxy,
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
decompression: decompression
decompression: decompression,
http1_1ConnectionDebugInitializer: http1_1ConnectionDebugInitializer,
http2ConnectionDebugInitializer: http2ConnectionDebugInitializer,
http2StreamChannelDebugInitializer: http2StreamChannelDebugInitializer
)
}
}
Expand Down
85 changes: 85 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4306,4 +4306,89 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass {
request.setBasicAuth(username: "foo", password: "bar")
XCTAssertEqual(request.headers.first(name: "Authorization"), "Basic Zm9vOmJhcg==")
}

func testHTTP1ConnectionDebugInitializer() {
let connectionDebugInitializerUtil = DebugInitializerUtil()

var config = HTTPClient.Configuration()
config.tlsConfiguration = .clientDefault
config.tlsConfiguration?.certificateVerification = .none
config.httpVersion = .http1Only
config.http1_1ConnectionDebugInitializer = connectionDebugInitializerUtil.operation

let client = HTTPClient(
eventLoopGroupProvider: .singleton,
configuration: config,
backgroundActivityLogger: Logger(
label: "HTTPClient",
factory: StreamLogHandler.standardOutput(label:)
)
)
defer { XCTAssertNoThrow(client.shutdown()) }

let bin = HTTPBin(.http1_1(ssl: true, compress: false))
defer { XCTAssertNoThrow(try bin.shutdown()) }

for _ in 0..<3 {
XCTAssertNoThrow(try client.get(url: "https://localhost:\(bin.port)/get").wait())
}

// Even though multiple requests were made, the connection debug initializer must be called
// only once.
XCTAssertEqual(connectionDebugInitializerUtil.executionCount, 1)
}

func testHTTP2ConnectionAndStreamChannelDebugInitializers() {
let connectionDebugInitializerUtil = DebugInitializerUtil()
let streamChannelDebugInitializerUtil = DebugInitializerUtil()

var config = HTTPClient.Configuration()
config.tlsConfiguration = .clientDefault
config.tlsConfiguration?.certificateVerification = .none
config.httpVersion = .automatic
config.http2ConnectionDebugInitializer = connectionDebugInitializerUtil.operation
config.http2StreamChannelDebugInitializer = streamChannelDebugInitializerUtil.operation

let client = HTTPClient(
eventLoopGroupProvider: .singleton,
configuration: config,
backgroundActivityLogger: Logger(
label: "HTTPClient",
factory: StreamLogHandler.standardOutput(label:)
)
)
defer { XCTAssertNoThrow(client.shutdown()) }

let bin = HTTPBin(.http2(compress: false))
defer { XCTAssertNoThrow(try bin.shutdown()) }

let numberOfRequests = 3

for _ in 0..<numberOfRequests {
XCTAssertNoThrow(try client.get(url: "https://localhost:\(bin.port)/get").wait())
}

// Even though multiple requests were made, the connection debug initializer must be called
// only once.
XCTAssertEqual(connectionDebugInitializerUtil.executionCount, 1)

// The stream channel debug initializer must be called only as much as the number of
// requests made.
XCTAssertEqual(streamChannelDebugInitializerUtil.executionCount, numberOfRequests)
}
}

class DebugInitializerUtil {
var executionCount: Int

@Sendable
func operation(channel: Channel) -> EventLoopFuture<Void> {
self.executionCount += 1

return channel.eventLoop.makeSucceededVoidFuture()
}

init() {
self.executionCount = 0
}
}

0 comments on commit 9e26b5e

Please sign in to comment.