diff --git a/build.sbt b/build.sbt index 1198b12b..44381e13 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ import sbtprotobuf.{ProtobufPlugin=>PB} lazy val projectSettings = PB.protobufSettings ++ Seq( licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT"))), organization := "com.tradeshift", - version := "0.0.25-SNAPSHOT", + version := "0.0.26", scalaVersion := "2.11.8", publishMavenStyle := true, javacOptions ++= Seq("-source", "1.8"), @@ -92,6 +92,8 @@ lazy val `ts-reaktive-kamon-log4j` = project.settings(commonSettings: _*) lazy val `ts-reaktive-kamon-akka` = project.settings(commonSettings: _*) +lazy val `ts-reaktive-kamon-akka-client` = project.settings(commonSettings: _*) + lazy val `ts-reaktive-kamon-akka-cluster` = project.settings(commonSettings: _*) lazy val root = (project in file(".")).settings(publish := { }, publishLocal := { }).aggregate( @@ -109,6 +111,7 @@ lazy val root = (project in file(".")).settings(publish := { }, publishLocal := `ts-reaktive-testkit-assertj`, `ts-reaktive-kamon-log4j`, `ts-reaktive-kamon-akka`, + `ts-reaktive-kamon-akka-client`, `ts-reaktive-kamon-akka-cluster`) // Don't publish the root artifact; only publish sub-projects diff --git a/ts-reaktive-kamon-akka-client/build.sbt b/ts-reaktive-kamon-akka-client/build.sbt new file mode 100644 index 00000000..cf7936d9 --- /dev/null +++ b/ts-reaktive-kamon-akka-client/build.sbt @@ -0,0 +1,2 @@ + +description := "Kamon module for akka http client internals monitoring" diff --git a/ts-reaktive-kamon-akka-client/src/main/resources/META-INF/aop.xml b/ts-reaktive-kamon-akka-client/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..c84c258e --- /dev/null +++ b/ts-reaktive-kamon-akka-client/src/main/resources/META-INF/aop.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/ts-reaktive-kamon-akka-client/src/main/resources/reference.conf b/ts-reaktive-kamon-akka-client/src/main/resources/reference.conf new file mode 100644 index 00000000..f63fc8d1 --- /dev/null +++ b/ts-reaktive-kamon-akka-client/src/main/resources/reference.conf @@ -0,0 +1,8 @@ + +kamon { + modules { + ts-akka-http-client { + requires-aspectj = yes + } + } +} diff --git a/ts-reaktive-kamon-akka-client/src/main/scala/akka/http/impl/engine/client/PoolConductorMonitoring.scala b/ts-reaktive-kamon-akka-client/src/main/scala/akka/http/impl/engine/client/PoolConductorMonitoring.scala new file mode 100644 index 00000000..64e5784a --- /dev/null +++ b/ts-reaktive-kamon-akka-client/src/main/scala/akka/http/impl/engine/client/PoolConductorMonitoring.scala @@ -0,0 +1,64 @@ +package akka.http.impl.engine.client + +import org.aspectj.lang.annotation.After +import org.aspectj.lang.annotation.Aspect +import org.aspectj.lang.annotation.Pointcut + +import akka.event.BusLogging +import kamon.Kamon + +@Aspect +class PoolConductorMonitoring { + val slotStatesField = Class.forName("akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1").getDeclaredField("slotStates") + val slotSelectorField = Class.forName("akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1").getDeclaredField("$outer") + val logField = Class.forName("akka.http.impl.engine.client.PoolConductor$SlotSelector").getDeclaredField("log") + + @Pointcut("execution(akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1.new(..)) && this(logic)") + def newGraphStageLogic(logic: AnyRef): Unit = {} + + private val HOST = raw"//(.+):".r + private val PORT = raw":([0-9]+)".r + + private def getTags(graphStageLogic: AnyRef): Map[String,String] = { + slotSelectorField.setAccessible(true) + val slotSelector = slotSelectorField.get(graphStageLogic) + + logField.setAccessible(true) + val log = logField.get(slotSelector).asInstanceOf[BusLogging] + val host = HOST.findFirstMatchIn(log.logSource).map(_.group(1)).getOrElse("unknown") + val port = PORT.findAllMatchIn(log.logSource).toVector.lastOption.map(_.group(1)).getOrElse("80") + + Map("target_host" -> host, "target_port" -> port) + } + + private val states: Map[String,Class[_]] = Map( + "idle" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Idle$"), + "unconnected" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Unconnected$"), + "loaded" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Loaded"), + "busy" -> Class.forName("akka.http.impl.engine.client.PoolConductor$Busy")) + + @After("newGraphStageLogic(logic)") + def afterNewGraphStageLogic(logic: AnyRef): Unit = { + slotStatesField.setAccessible(true) + val slotStates = slotStatesField.get(logic).asInstanceOf[Array[_]] + + val tags = getTags(logic) + for ((name, stateClass) <- states) { + // note: don't use "host" as a tag, since for Datadog, it will cause dogstatsd to swallow ALL _other_ default tags. + Kamon.metrics.gauge(s"http-client.pool.connections.${name}", tags) { + () => slotStates.count(s => s.getClass() eq stateClass).toLong + } + } + } + + @Pointcut("execution(akka.http.impl.engine.client.PoolConductor$SlotSelector$$anon$1.postStop(..)) && this(logic)") + def postStopGraphStageLogic(logic: AnyRef): Unit = {} + + @After("postStopGraphStageLogic(logic)") + def afterPostStopGraphStageLogic(logic: AnyRef): Unit = { + val tags = getTags(logic) + for ((name, _) <- states) { + Kamon.metrics.removeGauge(s"http-client.pool.connections.${name}", tags) + } + } +} \ No newline at end of file diff --git a/ts-reaktive-kamon-akka-client/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActorMonitoring.scala b/ts-reaktive-kamon-akka-client/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActorMonitoring.scala new file mode 100644 index 00000000..d5fc8b34 --- /dev/null +++ b/ts-reaktive-kamon-akka-client/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActorMonitoring.scala @@ -0,0 +1,44 @@ +package akka.http.impl.engine.client + +import org.aspectj.lang.annotation.After +import org.aspectj.lang.annotation.Aspect +import org.aspectj.lang.annotation.Pointcut + +import akka.http.impl.engine.client.PoolInterfaceActor.PoolRequest +import akka.http.impl.settings.HostConnectionPoolSetup +import akka.stream.impl.Buffer +import kamon.Kamon + +@Aspect +class PoolInterfaceActorMonitoring { + val inputBufferField = classOf[PoolInterfaceActor].getDeclaredField("akka$http$impl$engine$client$PoolInterfaceActor$$inputBuffer") + val hcpsField = classOf[PoolInterfaceActor].getDeclaredField("akka$http$impl$engine$client$PoolInterfaceActor$$hcps") + + @Pointcut("execution(akka.http.impl.engine.client.PoolInterfaceActor.new(..)) && this(actor)") + def create(actor: PoolInterfaceActor): Unit = {} + + @After("create(actor)") + def afterCreate(actor: PoolInterfaceActor): Unit = { + inputBufferField.setAccessible(true) + val buffer = inputBufferField.get(actor).asInstanceOf[Buffer[PoolRequest]] + hcpsField.setAccessible(true) + val hcps = hcpsField.get(actor).asInstanceOf[HostConnectionPoolSetup] + + val tags = Map("target_host" -> hcps.host, "target_port" -> hcps.port.toString) + Kamon.metrics.gauge("http-client.pool.queue.used", tags) { () => buffer.used.toLong } + Kamon.metrics.gauge("http-client.pool.queue.capacity", tags) { () => buffer.capacity.toLong } + } + + @Pointcut("execution(* akka.http.impl.engine.client.PoolInterfaceActor.afterStop(..)) && this(actor)") + def stop(actor: PoolInterfaceActor): Unit = {} + + @After("stop(actor)") + def afterStop(actor: PoolInterfaceActor): Unit = { + hcpsField.setAccessible(true) + val hcps = hcpsField.get(actor).asInstanceOf[HostConnectionPoolSetup] + + val tags = Map("target_host" -> hcps.host, "target_port" -> hcps.port.toString) + Kamon.metrics.removeGauge("http-client.pool.queue.used", tags) + Kamon.metrics.removeGauge("http-client.pool.queue.capacity", tags) + } +} \ No newline at end of file