Skip to content

Commit

Permalink
[WIP] Implement Distributed Tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
slashmo committed Dec 30, 2024
1 parent f77cc00 commit 5e7ddf1
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 34 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.13.0"),
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.19.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"),
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-service-context.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
.package(url: "https://github.com/apple/swift-algorithms.git", from: "1.0.0"),
],
Expand All @@ -53,6 +55,8 @@ let package = Package(
.product(name: "NIOSOCKS", package: "swift-nio-extras"),
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Tracing", package: "swift-distributed-tracing"),
.product(name: "ServiceContextModule", package: "swift-service-context"),
.product(name: "Atomics", package: "swift-atomics"),
.product(name: "Algorithms", package: "swift-algorithms"),
]
Expand Down
112 changes: 78 additions & 34 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import Logging
import NIOCore
import NIOHTTP1
import ServiceContextModule
import Tracing

import struct Foundation.URL

Expand All @@ -30,18 +32,20 @@ extension HTTPClient {
public func execute(
_ request: HTTPClientRequest,
deadline: NIODeadline,
logger: Logger? = nil
logger: Logger? = nil,
context: ServiceContext? = nil
) async throws -> HTTPClientResponse {
try await self.executeAndFollowRedirectsIfNeeded(
request,
deadline: deadline,
logger: logger ?? Self.loggingDisabled,
context: context ?? .current ?? .topLevel,
redirectState: RedirectState(self.configuration.redirectConfiguration.mode, initialURL: request.url)
)
}
}

// MARK: Connivence methods
// MARK: Convenience methods

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClient {
Expand All @@ -55,12 +59,14 @@ extension HTTPClient {
public func execute(
_ request: HTTPClientRequest,
timeout: TimeAmount,
logger: Logger? = nil
logger: Logger? = nil,
context: ServiceContext? = nil
) async throws -> HTTPClientResponse {
try await self.execute(
request,
deadline: .now() + timeout,
logger: logger
logger: logger,
context: context
)
}
}
Expand All @@ -71,15 +77,23 @@ extension HTTPClient {
_ request: HTTPClientRequest,
deadline: NIODeadline,
logger: Logger,
context: ServiceContext,
redirectState: RedirectState?
) async throws -> HTTPClientResponse {
var currentRequest = request
var currentRedirectState = redirectState
var resendCount = 0

// this loop is there to follow potential redirects
while true {
let preparedRequest = try HTTPClientRequest.Prepared(currentRequest, dnsOverride: configuration.dnsOverride)
let response = try await self.executeCancellable(preparedRequest, deadline: deadline, logger: logger)
let response = try await self.executeCancellable(
preparedRequest,
deadline: deadline,
logger: logger,
context: context,
resendCount: resendCount > 0 ? resendCount : nil
)

guard var redirectState = currentRedirectState else {
// a `nil` redirectState means we should not follow redirects
Expand Down Expand Up @@ -112,46 +126,76 @@ extension HTTPClient {
return response
}

resendCount += 1
currentRequest = newRequest
}
}

private func executeCancellable(
_ request: HTTPClientRequest.Prepared,
deadline: NIODeadline,
logger: Logger
logger: Logger,
context: ServiceContext,
resendCount: Int?
) async throws -> HTTPClientResponse {
let cancelHandler = TransactionCancelHandler()

return try await withTaskCancellationHandler(
operation: { () async throws -> HTTPClientResponse in
let eventLoop = self.eventLoopGroup.any()
let deadlineTask = eventLoop.scheduleTask(deadline: deadline) {
cancelHandler.cancel(reason: .deadlineExceeded)
}
defer {
deadlineTask.cancel()
}
return try await withCheckedThrowingContinuation {
(continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
let transaction = Transaction(
request: request,
requestOptions: .fromClientConfiguration(self.configuration),
logger: logger,
connectionDeadline: .now() + (self.configuration.timeout.connectionCreationTimeout),
preferredEventLoop: eventLoop,
responseContinuation: continuation
)

cancelHandler.registerTransaction(transaction)

self.poolManager.executeRequest(transaction)
}
},
onCancel: {
cancelHandler.cancel(reason: .taskCanceled)
return try await withSpan(request.head.method.rawValue, context: context, ofKind: .client) { span in
var request = request
request.head.headers.propagate(span.context)
span.updateAttributes { attributes in
attributes["http.request.method"] = request.head.method.rawValue
attributes["server.address"] = request.poolKey.connectionTarget.host
attributes["server.port"] = request.poolKey.connectionTarget.port
attributes["url.full"] = request.url.absoluteString
attributes["http.request.resend_count"] = resendCount
}
)

do {
let response = try await withTaskCancellationHandler(
operation: { () async throws -> HTTPClientResponse in
let eventLoop = self.eventLoopGroup.any()
let deadlineTask = eventLoop.scheduleTask(deadline: deadline) {
cancelHandler.cancel(reason: .deadlineExceeded)
}
defer {
deadlineTask.cancel()
}
let response = try await withCheckedThrowingContinuation {
(continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
let transaction = Transaction(
request: request,
requestOptions: .fromClientConfiguration(self.configuration),
logger: logger,
connectionDeadline: .now() + (self.configuration.timeout.connectionCreationTimeout),
preferredEventLoop: eventLoop,
responseContinuation: continuation
)

cancelHandler.registerTransaction(transaction)

self.poolManager.executeRequest(transaction)
}
if response.status.code >= 400 {
span.setStatus(.init(code: .error))
span.attributes["error.type"] = "\(response.status.code)"
}
span.attributes["http.response.status_code"] = "\(response.status.code)"
return response
},
onCancel: {
span.setStatus(.init(code: .error))
span.attributes["error.type"] = "\(CancellationError.self)"
cancelHandler.cancel(reason: .taskCanceled)
}
)
return response
} catch {
span.setStatus(.init(code: .error))
span.attributes["error.type"] = "\(type(of: error))"
throw error
}
}
}
}

Expand Down
32 changes: 32 additions & 0 deletions Sources/AsyncHTTPClient/ServiceContextPropagation.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2024 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Instrumentation
import NIOHTTP1
import ServiceContextModule

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPHeaders {
mutating func propagate(_ context: ServiceContext) {
InstrumentationSystem.instrument.inject(context, into: &self, using: Self.injector)
}

private static let injector = HTTPHeadersInjector()
}

private struct HTTPHeadersInjector: Injector {
func inject(_ value: String, forKey name: String, into headers: inout HTTPHeaders) {
headers.add(name: name, value: value)
}
}

0 comments on commit 5e7ddf1

Please sign in to comment.