/* ========================================================================================= * Copyright © 2013-2014 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific language governing permissions * and limitations under the License. * ========================================================================================= */ package kamon.metric import java.nio.LongBuffer import akka.actor._ import akka.routing.RoundRobinPool import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.metric.RouterMetrics._ import kamon.metric.RouterMetricsTestActor._ import kamon.metric.Subscriptions.TickMetricSnapshot import kamon.metric.instrument.{ Counter, Histogram } import org.scalatest.{ Matchers, WordSpecLike } import scala.concurrent.duration._ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( """ |kamon.metrics { | tick-interval = 1 second | default-collection-context-buffer-size = 10 | | filters = [ | { | router { | includes = [ "user/tracked-*", "user/measuring-*", "user/stop" ] | excludes = [ "user/tracked-explicitly-excluded"] | } | } | ] | precision { | default-histogram-precision { | highest-trackable-value = 3600000000000 | significant-value-digits = 2 | } | } |} """.stripMargin)) "the Kamon router metrics" should { "respect the configured include and exclude filters" in new RouterMetricsFixtures { createTestRouter("tracked-router") createTestRouter("non-tracked-router") createTestRouter("tracked-explicitly-excluded") Kamon(Metrics).subscribe(RouterMetrics, "*", testActor, permanently = true) expectMsgType[TickMetricSnapshot] within(2 seconds) { val tickSnapshot = expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.keys should contain(RouterMetrics("user/tracked-router")) tickSnapshot.metrics.keys should not contain (RouterMetrics("user/non-tracked-router")) tickSnapshot.metrics.keys should not contain (RouterMetrics("user/tracked-explicitly-excluded")) } } "record the processing-time of the receive function" in new RouterMetricsFixtures { val metricsListener = TestProbe() val trackedRouter = createTestRouter("measuring-processing-time") trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) val timings = metricsListener.expectMsgType[RouterTrackedTimings] val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) // tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) } "record the number of errors" in new RouterMetricsFixtures { val metricsListener = TestProbe() val trackedRouter = createTestRouter("measuring-errors") for (i ← 1 to 10) { trackedRouter.tell(Fail, metricsListener.ref) } val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics tickSnapshot(RouterMetrics("user/measuring-errors")).metrics(Errors).asInstanceOf[Counter.Snapshot].count should be(10L) } "record the time-in-mailbox" in new RouterMetricsFixtures { val metricsListener = TestProbe() val trackedRouter = createTestRouter("measuring-time-in-mailbox") trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) val timings = metricsListener.expectMsgType[RouterTrackedTimings] val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) } "clean up the associated recorder when the actor is stopped" in new RouterMetricsFixtures { val trackedRouter = createTestRouter("stop") trackedRouter ! Ping Kamon(Metrics).storage.toString() // force to be initialized Kamon(Metrics).storage.get(RouterMetrics("user/stop")) should not be empty val deathWatcher = TestProbe() deathWatcher.watch(trackedRouter) trackedRouter ! PoisonPill deathWatcher.expectTerminated(trackedRouter) Kamon(Metrics).storage.get(RouterMetrics("user/stop")) shouldBe empty } } trait RouterMetricsFixtures { val collectionContext = new CollectionContext { val buffer: LongBuffer = LongBuffer.allocate(10000) } def createTestRouter(name: String): ActorRef = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), name) } } class RouterMetricsTestActor extends Actor { def receive = { case Discard ⇒ case Fail ⇒ throw new ArithmeticException("Division by zero.") case Ping ⇒ sender ! Pong case RouterTrackTimings(sendTimestamp, sleep) ⇒ { val dequeueTimestamp = System.nanoTime() sleep.map(s ⇒ Thread.sleep(s.toMillis)) val afterReceiveTimestamp = System.nanoTime() sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) } } } object RouterMetricsTestActor { case object Ping case object Pong case object Fail case object Discard case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp } }