From 5e7ddf19810e26f9d23b9f448bfde0c2d0a6a6fd Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Mon, 30 Dec 2024 22:31:19 +0100 Subject: [PATCH] [WIP] Implement Distributed Tracing --- Package.swift | 4 + .../AsyncAwait/HTTPClient+execute.swift | 112 ++++++++++++------ .../ServiceContextPropagation.swift | 32 +++++ 3 files changed, 114 insertions(+), 34 deletions(-) create mode 100644 Sources/AsyncHTTPClient/ServiceContextPropagation.swift diff --git a/Package.swift b/Package.swift index e4cccb6de..2d2c2b54d 100644 --- a/Package.swift +++ b/Package.swift @@ -27,6 +27,8 @@ let package = Package( .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.13.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.19.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"), + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-service-context.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"), .package(url: "https://github.com/apple/swift-algorithms.git", from: "1.0.0"), ], @@ -53,6 +55,8 @@ let package = Package( .product(name: "NIOSOCKS", package: "swift-nio-extras"), .product(name: "NIOTransportServices", package: "swift-nio-transport-services"), .product(name: "Logging", package: "swift-log"), + .product(name: "Tracing", package: "swift-distributed-tracing"), + .product(name: "ServiceContextModule", package: "swift-service-context"), .product(name: "Atomics", package: "swift-atomics"), .product(name: "Algorithms", package: "swift-algorithms"), ] diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift index fc1dbc209..1eebb3ee5 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift @@ -15,6 +15,8 @@ import Logging import NIOCore import NIOHTTP1 +import ServiceContextModule +import Tracing import struct Foundation.URL @@ -30,18 +32,20 @@ extension HTTPClient { public func execute( _ request: HTTPClientRequest, deadline: NIODeadline, - logger: Logger? = nil + logger: Logger? = nil, + context: ServiceContext? = nil ) async throws -> HTTPClientResponse { try await self.executeAndFollowRedirectsIfNeeded( request, deadline: deadline, logger: logger ?? Self.loggingDisabled, + context: context ?? .current ?? .topLevel, redirectState: RedirectState(self.configuration.redirectConfiguration.mode, initialURL: request.url) ) } } -// MARK: Connivence methods +// MARK: Convenience methods @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension HTTPClient { @@ -55,12 +59,14 @@ extension HTTPClient { public func execute( _ request: HTTPClientRequest, timeout: TimeAmount, - logger: Logger? = nil + logger: Logger? = nil, + context: ServiceContext? = nil ) async throws -> HTTPClientResponse { try await self.execute( request, deadline: .now() + timeout, - logger: logger + logger: logger, + context: context ) } } @@ -71,15 +77,23 @@ extension HTTPClient { _ request: HTTPClientRequest, deadline: NIODeadline, logger: Logger, + context: ServiceContext, redirectState: RedirectState? ) async throws -> HTTPClientResponse { var currentRequest = request var currentRedirectState = redirectState + var resendCount = 0 // this loop is there to follow potential redirects while true { let preparedRequest = try HTTPClientRequest.Prepared(currentRequest, dnsOverride: configuration.dnsOverride) - let response = try await self.executeCancellable(preparedRequest, deadline: deadline, logger: logger) + let response = try await self.executeCancellable( + preparedRequest, + deadline: deadline, + logger: logger, + context: context, + resendCount: resendCount > 0 ? resendCount : nil + ) guard var redirectState = currentRedirectState else { // a `nil` redirectState means we should not follow redirects @@ -112,6 +126,7 @@ extension HTTPClient { return response } + resendCount += 1 currentRequest = newRequest } } @@ -119,39 +134,68 @@ extension HTTPClient { private func executeCancellable( _ request: HTTPClientRequest.Prepared, deadline: NIODeadline, - logger: Logger + logger: Logger, + context: ServiceContext, + resendCount: Int? ) async throws -> HTTPClientResponse { let cancelHandler = TransactionCancelHandler() - return try await withTaskCancellationHandler( - operation: { () async throws -> HTTPClientResponse in - let eventLoop = self.eventLoopGroup.any() - let deadlineTask = eventLoop.scheduleTask(deadline: deadline) { - cancelHandler.cancel(reason: .deadlineExceeded) - } - defer { - deadlineTask.cancel() - } - return try await withCheckedThrowingContinuation { - (continuation: CheckedContinuation) -> Void in - let transaction = Transaction( - request: request, - requestOptions: .fromClientConfiguration(self.configuration), - logger: logger, - connectionDeadline: .now() + (self.configuration.timeout.connectionCreationTimeout), - preferredEventLoop: eventLoop, - responseContinuation: continuation - ) - - cancelHandler.registerTransaction(transaction) - - self.poolManager.executeRequest(transaction) - } - }, - onCancel: { - cancelHandler.cancel(reason: .taskCanceled) + return try await withSpan(request.head.method.rawValue, context: context, ofKind: .client) { span in + var request = request + request.head.headers.propagate(span.context) + span.updateAttributes { attributes in + attributes["http.request.method"] = request.head.method.rawValue + attributes["server.address"] = request.poolKey.connectionTarget.host + attributes["server.port"] = request.poolKey.connectionTarget.port + attributes["url.full"] = request.url.absoluteString + attributes["http.request.resend_count"] = resendCount } - ) + + do { + let response = try await withTaskCancellationHandler( + operation: { () async throws -> HTTPClientResponse in + let eventLoop = self.eventLoopGroup.any() + let deadlineTask = eventLoop.scheduleTask(deadline: deadline) { + cancelHandler.cancel(reason: .deadlineExceeded) + } + defer { + deadlineTask.cancel() + } + let response = try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) -> Void in + let transaction = Transaction( + request: request, + requestOptions: .fromClientConfiguration(self.configuration), + logger: logger, + connectionDeadline: .now() + (self.configuration.timeout.connectionCreationTimeout), + preferredEventLoop: eventLoop, + responseContinuation: continuation + ) + + cancelHandler.registerTransaction(transaction) + + self.poolManager.executeRequest(transaction) + } + if response.status.code >= 400 { + span.setStatus(.init(code: .error)) + span.attributes["error.type"] = "\(response.status.code)" + } + span.attributes["http.response.status_code"] = "\(response.status.code)" + return response + }, + onCancel: { + span.setStatus(.init(code: .error)) + span.attributes["error.type"] = "\(CancellationError.self)" + cancelHandler.cancel(reason: .taskCanceled) + } + ) + return response + } catch { + span.setStatus(.init(code: .error)) + span.attributes["error.type"] = "\(type(of: error))" + throw error + } + } } } diff --git a/Sources/AsyncHTTPClient/ServiceContextPropagation.swift b/Sources/AsyncHTTPClient/ServiceContextPropagation.swift new file mode 100644 index 000000000..e4d283601 --- /dev/null +++ b/Sources/AsyncHTTPClient/ServiceContextPropagation.swift @@ -0,0 +1,32 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2024 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Instrumentation +import NIOHTTP1 +import ServiceContextModule + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension HTTPHeaders { + mutating func propagate(_ context: ServiceContext) { + InstrumentationSystem.instrument.inject(context, into: &self, using: Self.injector) + } + + private static let injector = HTTPHeadersInjector() +} + +private struct HTTPHeadersInjector: Injector { + func inject(_ value: String, forKey name: String, into headers: inout HTTPHeaders) { + headers.add(name: name, value: value) + } +}