Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support for OkHttp thread metrics #1022

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ okio-version = "3.6.0"
otel-version = "1.32.0"
slf4j-version = "2.0.9"
slf4j-v1x-version = "1.7.36"
crt-kotlin-version = "0.8.2"
crt-kotlin-version = "0.8.3"

# codegen
smithy-version = "1.42.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import aws.smithy.kotlin.runtime.http.HttpCall
import aws.smithy.kotlin.runtime.http.config.EngineFactory
import aws.smithy.kotlin.runtime.http.engine.*
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.engine.internal.ThreadState
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.net.TlsVersion
import aws.smithy.kotlin.runtime.operation.ExecutionContext
import aws.smithy.kotlin.runtime.time.Instant
import aws.smithy.kotlin.runtime.time.fromEpochMilliseconds
import kotlinx.coroutines.job
import okhttp3.*
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import kotlin.time.toJavaDuration
import aws.smithy.kotlin.runtime.net.TlsVersion as SdkTlsVersion
Expand All @@ -40,8 +42,13 @@ public class OkHttpEngine(
override val engineConstructor: (OkHttpEngineConfig.Builder.() -> Unit) -> OkHttpEngine = ::invoke
}

private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider)
private val client = config.buildClient(metrics)
private val dispatcher = config.buildDispatcher()
private val metrics = HttpClientMetrics(
scope = TELEMETRY_SCOPE,
provider = config.telemetryProvider,
threadStateCallback = dispatcher::getThreadState,
)
private val client = config.buildClient(dispatcher, metrics)

override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
val callContext = callContext()
Expand All @@ -68,10 +75,19 @@ public class OkHttpEngine(
}
}

private fun Dispatcher.getThreadState(): ThreadState? = (executorService as? ThreadPoolExecutor)?.let { executor ->
ThreadState(executor.poolSize.toLong(), executor.activeCount.toLong())
}

private fun OkHttpEngineConfig.buildDispatcher() = Dispatcher().apply {
maxRequests = maxConcurrency.toInt()
maxRequestsPerHost = maxConcurrencyPerHost.toInt()
}

/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
private fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpClient {
private fun OkHttpEngineConfig.buildClient(dispatcher: Dispatcher, metrics: HttpClientMetrics): OkHttpClient {
val config = this

return OkHttpClient.Builder().apply {
Expand Down Expand Up @@ -100,10 +116,6 @@ private fun OkHttpEngineConfig.buildClient(metrics: HttpClientMetrics): OkHttpCl
)
connectionPool(pool)

val dispatcher = Dispatcher().apply {
maxRequests = config.maxConcurrency.toInt()
maxRequestsPerHost = config.maxConcurrencyPerHost.toInt()
}
dispatcher(dispatcher)

// Log events coming from okhttp. Allocate a new listener per-call to facilitate dedicated trace spans.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ public object HttpClientMetricAttributes {
public val AcquiredConnection: Attributes = attributesOf { "state" to "acquired" }
public val QueuedRequest: Attributes = attributesOf { "state" to "queued" }
public val InFlightRequest: Attributes = attributesOf { "state" to "in-flight" }
public val TotalThread: Attributes = attributesOf { "state" to "total" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correctness: The state attribute for this should probably follow the usage convention where the sub states total to some limit.
e.g. thread state usage metric smithy.client.http.threads.usage with states of idle | consumed and a separate metric for the limit smithy.client.http.threads.limit.

Also I suppose we should probably get this defined in the SRA before codifying it in an implementation.

We may not know the total thread limit in every case or thread pools may be dynamic and spin down threads which would break the "should" part of the convention but I think that's fine. In the ideal case the convention applies and even when it doesn't it's still how you'd want it graphed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll change this to idle/active threads.

public val ActiveThread: Attributes = attributesOf { "state" to "active" }
}

@InternalApi
public data class ThreadState(val total: Long, val active: Long)

/**
* Container for common HTTP engine related metrics. Engine implementations can re-use this and update
* the various fields in whatever manner fits best (increment/decrement vs current absolute value).
Expand All @@ -39,6 +44,7 @@ public object HttpClientMetricAttributes {
public class HttpClientMetrics(
scope: String,
public val provider: TelemetryProvider,
threadStateCallback: (() -> ThreadState?)? = null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems different for no apparent reason. It's not wrong I guess but feels odd that we would special case it since each engine can adapt however they want to the current way metrics are exposed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I concede it's very different from what we have elsewhere and feels very specialized right now.

I think one problem is that we don't directly control the thread pool. We don't decide when new threads are created or when they swap from idle to active. All we can do is inspect the state at any given moment to know the current value. That sounds like what an AsyncUpDownCounter is intended to do via a callback.

We could start a background thread in the OkHttp engine to periodically inspect the thread pool and stuff the idle/active counts into HttpClientMetrics, which would then be read when the callback is run. But this means the value will be out of sync by whatever interval the background thread uses. If, for instance, the background thread runs every 10 seconds but the meter polls every 5 seconds, then up to two datapoints will be reported inaccurately.

Is your greater concern that this change exposes the concept of callbacks outside of HttpClientMetrics and inverts the collection model from a push to a pull? Is it the inconsistency of some metrics being exposed outside of HttpClientMetrics as callbacks while others are mutable properties inside? Or is it that this may not be flexible enough to adapt to other engines where thread updates are more deterministic?

) : Closeable {
private val meter = provider.meterProvider.getOrCreateMeter(scope)

Expand Down Expand Up @@ -76,34 +82,6 @@ public class HttpClientMetrics(
"The amount of time a connection has been open",
)

private val connectionLimitHandle = meter.createAsyncUpDownCounter(
"smithy.client.http.connections.limit",
{ it.record(_connectionsLimit.value) },
"{connection}",
"Max connections configured for the HTTP client",
)

private val connectionUsageHandle = meter.createAsyncUpDownCounter(
"smithy.client.http.connections.usage",
::recordConnectionState,
"{connection}",
"Current state of connections (idle, acquired)",
)

private val requestsConcurrencyLimitHandle = meter.createAsyncUpDownCounter(
"smithy.client.http.requests.limit",
{ it.record(_requestConcurrencyLimit.value) },
"{request}",
"Max concurrent requests configured for the HTTP client",
)

private val requestsHandle = meter.createAsyncUpDownCounter(
"smithy.client.http.requests.usage",
::recordRequestsState,
"{request}",
"The current state of HTTP client request concurrency (queued, in-flight)",
)

public val bytesSent: MonotonicCounter = meter.createMonotonicCounter(
"smithy.client.http.bytes_sent",
"By",
Expand All @@ -122,6 +100,41 @@ public class HttpClientMetrics(
"The amount of time after a request has been sent spent waiting on a response from the remote server",
)

private val asyncHandles = listOfNotNull(
meter.createAsyncUpDownCounter(
"smithy.client.http.connections.limit",
{ it.record(_connectionsLimit.value) },
"{connection}",
"Max connections configured for the HTTP client",
),
meter.createAsyncUpDownCounter(
"smithy.client.http.connections.usage",
::recordConnectionState,
"{connection}",
"Current state of connections (idle, acquired)",
),
meter.createAsyncUpDownCounter(
"smithy.client.http.requests.limit",
{ it.record(_requestConcurrencyLimit.value) },
"{request}",
"Max concurrent requests configured for the HTTP client",
),
meter.createAsyncUpDownCounter(
"smithy.client.http.requests.usage",
::recordRequestsState,
"{request}",
"The current state of HTTP client request concurrency (queued, in-flight)",
),
threadStateCallback?.let { callback ->
meter.createAsyncUpDownCounter(
"smithy.client.http.threads.count",
{ measurement -> recordThreadState(measurement, callback) },
"{thread}",
"Current state of HTTP engine threads (active, running)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are "running" threads different from "active" ones? I see only "total" and "active" are recorded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, "active" and "running" are not the correct terms here. I can change those to "total" and "active".

)
},
)

/**
* The maximum number of connections configured for the client
*/
Expand Down Expand Up @@ -186,13 +199,17 @@ public class HttpClientMetrics(
measurement.record(acquiredConnections, HttpClientMetricAttributes.AcquiredConnection)
}

private fun recordThreadState(measurement: LongAsyncMeasurement, callback: () -> ThreadState?) {
callback()?.let { threadState ->
measurement.record(threadState.total, HttpClientMetricAttributes.TotalThread)
measurement.record(threadState.active, HttpClientMetricAttributes.ActiveThread)
}
}

override fun close() {
val exceptions = listOf(
runCatching(connectionLimitHandle::stop),
runCatching(connectionUsageHandle::stop),
runCatching(requestsHandle::stop),
runCatching(requestsConcurrencyLimitHandle::stop),
).mapNotNull(Result<*>::exceptionOrNull)
val exceptions = asyncHandles
.map { handle -> runCatching { handle.stop() } }
.mapNotNull(Result<*>::exceptionOrNull)

exceptions.firstOrNull()?.let { first ->
exceptions.drop(1).forEach(first::addSuppressed)
Expand Down
Loading