Skip to content

Commit

Permalink
Merge pull request #275 from tumblr/metrics-tick-order
Browse files Browse the repository at this point in the history
fixing bug where reported metrics lagged by one tick interval
  • Loading branch information
DanSimon committed Oct 21, 2015
2 parents 9d8a27c + 1ca0a60 commit 60676d3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ trait ActorMetrics extends Actor with ActorLogging {
case InvalidEvent => log.error(s"Invalid event $m")
}
case Tick(v, interval) => {
val agg = metrics.aggregate(interval)
metrics.tick(interval)
val agg = metrics.tick(interval)
sender() ! Tock(agg, v)
}
}
Expand Down
18 changes: 8 additions & 10 deletions colossus-metrics/src/main/scala/colossus/metrics/Collection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,16 @@ class LocalCollection(
localProducers += producer
}

def aggregate(interval: FiniteDuration): MetricMap = {
/**
* Ticks all of the metric collectors in this collection and returns their aggregated metric map.
*
*/
def tick(interval: FiniteDuration): MetricMap = {
//do not move this import unless you want to have a bad time
import scala.collection.JavaConversions._
val now = System.currentTimeMillis
metrics.values.collect{
case t: TickedCollector => t.tick(interval)
}
val context = CollectionContext(globalTags, interval)
val eventMetrics = metrics.values.map{_.metrics(context)}
val producerMetrics = localProducers.map{_.metrics(context)}
Expand All @@ -251,14 +257,6 @@ class LocalCollection(
.filter{! _._2.isEmpty}
}

//TODO this method is probably not needed, can be merged with aggregate
def tick(tickPeriod: FiniteDuration) {
import scala.collection.JavaConversions._
metrics.values.collect{
case t: TickedCollector => t.tick(tickPeriod)
}
}

def subCollection(subSpace: MetricAddress = MetricAddress.Root, subTags: TagMap = TagMap.Empty) = {
new LocalCollection(subSpace, subTags, intervals, metrics, Some(this))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import akka.actor._
import akka.testkit._

import scala.language.higherKinds
import scala.concurrent.duration._

import org.scalatest._

class NewSpec extends WordSpec with MustMatchers with BeforeAndAfterAll{
import MetricValues._

class CollectionSpec extends WordSpec with MustMatchers with BeforeAndAfterAll{

implicit val sys = ActorSystem("test")

Expand Down Expand Up @@ -71,6 +74,21 @@ class NewSpec extends WordSpec with MustMatchers with BeforeAndAfterAll{
}

}

"properly tick during aggregation" in {
val c = new LocalCollection(intervals = Seq(1.second, 1.minute))//(TestProbe().ref)
val r: Rate = c.getOrAdd(Rate("/foo"))
r.hit()
r.hit()
r.hit()
val m = c.tick(1.second)
m("/foo")(Map()) must equal(SumValue(3))
r.hit()
r.hit()
val n = c.tick(1.minute)
n("/foo")(Map()) must equal(SumValue(5))

}

}

Expand Down

0 comments on commit 60676d3

Please sign in to comment.