Skip to content

Commit

Permalink
add module for monitoring akka-http client internals
Browse files Browse the repository at this point in the history
  • Loading branch information
jypma committed Jun 29, 2017
1 parent a901a85 commit f1f470a
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 1 deletion.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import sbtprotobuf.{ProtobufPlugin=>PB}
lazy val projectSettings = PB.protobufSettings ++ Seq(
licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT"))),
organization := "com.tradeshift",
version := "0.0.25-SNAPSHOT",
version := "0.0.26",
scalaVersion := "2.11.8",
publishMavenStyle := true,
javacOptions ++= Seq("-source", "1.8"),
Expand Down Expand Up @@ -92,6 +92,8 @@ lazy val `ts-reaktive-kamon-log4j` = project.settings(commonSettings: _*)

lazy val `ts-reaktive-kamon-akka` = project.settings(commonSettings: _*)

lazy val `ts-reaktive-kamon-akka-client` = project.settings(commonSettings: _*)

lazy val `ts-reaktive-kamon-akka-cluster` = project.settings(commonSettings: _*)

lazy val root = (project in file(".")).settings(publish := { }, publishLocal := { }).aggregate(
Expand All @@ -109,6 +111,7 @@ lazy val root = (project in file(".")).settings(publish := { }, publishLocal :=
`ts-reaktive-testkit-assertj`,
`ts-reaktive-kamon-log4j`,
`ts-reaktive-kamon-akka`,
`ts-reaktive-kamon-akka-client`,
`ts-reaktive-kamon-akka-cluster`)

// Don't publish the root artifact; only publish sub-projects
Expand Down
2 changes: 2 additions & 0 deletions ts-reaktive-kamon-akka-client/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

description := "Kamon module for akka http client internals monitoring"
12 changes: 12 additions & 0 deletions ts-reaktive-kamon-akka-client/src/main/resources/META-INF/aop.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">

<aspectj>
<aspects>
<aspect name="akka.http.impl.engine.client.PoolConductorMonitoring"/>
<aspect name="akka.http.impl.engine.client.PoolInterfaceActorMonitoring"/>
</aspects>

<weaver>
<include within="akka..*"/>
</weaver>
</aspectj>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

kamon {
modules {
ts-akka-http-client {
requires-aspectj = yes
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package akka.http.impl.engine.client

import org.aspectj.lang.annotation.After
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.annotation.Pointcut

import akka.event.BusLogging
import kamon.Kamon

@Aspect
class PoolConductorMonitoring {
val slotStatesField = Class.forName("akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1").getDeclaredField("slotStates")
val slotSelectorField = Class.forName("akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1").getDeclaredField("$outer")
val logField = Class.forName("akka.http.impl.engine.client.PoolConductor$SlotSelector").getDeclaredField("log")

@Pointcut("execution(akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1.new(..)) && this(logic)")
def newGraphStageLogic(logic: AnyRef): Unit = {}

private val HOST = raw"//(.+):".r
private val PORT = raw":([0-9]+)".r

private def getTags(graphStageLogic: AnyRef): Map[String,String] = {
slotSelectorField.setAccessible(true)
val slotSelector = slotSelectorField.get(graphStageLogic)

logField.setAccessible(true)
val log = logField.get(slotSelector).asInstanceOf[BusLogging]
val host = HOST.findFirstMatchIn(log.logSource).map(_.group(1)).getOrElse("unknown")
val port = PORT.findAllMatchIn(log.logSource).toVector.lastOption.map(_.group(1)).getOrElse("80")

Map("target_host" -> host, "target_port" -> port)
}

private val states: Map[String,Class[_]] = Map(
"idle" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Idle$"),
"unconnected" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Unconnected$"),
"loaded" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Loaded"),
"busy" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Busy"))

@After("newGraphStageLogic(logic)")
def afterNewGraphStageLogic(logic: AnyRef): Unit = {
slotStatesField.setAccessible(true)
val slotStates = slotStatesField.get(logic).asInstanceOf[Array[_]]

val tags = getTags(logic)
for ((name, stateClass) <- states) {
// note: don't use "host" as a tag, since for Datadog, it will cause dogstatsd to swallow ALL _other_ default tags.
Kamon.metrics.gauge(s"http-client.pool.connections.${name}", tags) {
() => slotStates.count(s => s.getClass() eq stateClass).toLong
}
}
}

@Pointcut("execution(akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1.postStop(..)) && this(logic)")
def postStopGraphStageLogic(logic: AnyRef): Unit = {}

@After("postStopGraphStageLogic(logic)")
def afterPostStopGraphStageLogic(logic: AnyRef): Unit = {
val tags = getTags(logic)
for ((name, _) <- states) {
Kamon.metrics.removeGauge(s"http-client.pool.connections.${name}", tags)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package akka.http.impl.engine.client

import org.aspectj.lang.annotation.After
import org.aspectj.lang.annotation.Aspect
import org.aspectj.lang.annotation.Pointcut

import akka.http.impl.engine.client.PoolInterfaceActor.PoolRequest
import akka.http.impl.settings.HostConnectionPoolSetup
import akka.stream.impl.Buffer
import kamon.Kamon

@Aspect
class PoolInterfaceActorMonitoring {
val inputBufferField = classOf[PoolInterfaceActor].getDeclaredField("akka$http$impl$engine$client$PoolInterfaceActor$$inputBuffer")
val hcpsField = classOf[PoolInterfaceActor].getDeclaredField("akka$http$impl$engine$client$PoolInterfaceActor$$hcps")

@Pointcut("execution(akka.http.impl.engine.client.PoolInterfaceActor.new(..)) && this(actor)")
def create(actor: PoolInterfaceActor): Unit = {}

@After("create(actor)")
def afterCreate(actor: PoolInterfaceActor): Unit = {
inputBufferField.setAccessible(true)
val buffer = inputBufferField.get(actor).asInstanceOf[Buffer[PoolRequest]]
hcpsField.setAccessible(true)
val hcps = hcpsField.get(actor).asInstanceOf[HostConnectionPoolSetup]

val tags = Map("target_host" -> hcps.host, "target_port" -> hcps.port.toString)
Kamon.metrics.gauge("http-client.pool.queue.used", tags) { () => buffer.used.toLong }
Kamon.metrics.gauge("http-client.pool.queue.capacity", tags) { () => buffer.capacity.toLong }
}

@Pointcut("execution(* akka.http.impl.engine.client.PoolInterfaceActor.afterStop(..)) && this(actor)")
def stop(actor: PoolInterfaceActor): Unit = {}

@After("stop(actor)")
def afterStop(actor: PoolInterfaceActor): Unit = {
hcpsField.setAccessible(true)
val hcps = hcpsField.get(actor).asInstanceOf[HostConnectionPoolSetup]

val tags = Map("target_host" -> hcps.host, "target_port" -> hcps.port.toString)
Kamon.metrics.removeGauge("http-client.pool.queue.used", tags)
Kamon.metrics.removeGauge("http-client.pool.queue.capacity", tags)
}
}

0 comments on commit f1f470a

Please sign in to comment.