From 50d89e2a25b331e953a03ad8d91a18b9e8c0b121 Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 29 Aug 2014 02:33:04 -0300 Subject: + core: provide metrics for routers * processing-time * errors * time-in-mailbox closes #62 --- .../scala/kamon/metric/RouterMetricsSpec.scala | 163 +++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala (limited to 'kamon-core/src/test') diff --git a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala new file mode 100644 index 00000000..37b3d169 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala @@ -0,0 +1,163 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.LongBuffer + +import akka.actor._ +import akka.routing.RoundRobinPool +import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.RouterMetrics._ +import kamon.metric.RouterMetricsTestActor._ +import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.instrument.{ Counter, Histogram } +import org.scalatest.{ Matchers, WordSpecLike } + +import scala.concurrent.duration._ + +class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 second + | default-collection-context-buffer-size = 10 + | + | filters = [ + | { + | router { + | includes = [ "user/tracked-*", "user/measuring-*", "user/stop" ] + | excludes = [ "user/tracked-explicitly-excluded"] + | } + | } + | ] + | precision { + | default-histogram-precision { + | highest-trackable-value = 3600000000000 + | significant-value-digits = 2 + | } + | } + |} + """.stripMargin)) + + "the Kamon router metrics" should { + "respect the configured include and exclude filters" in new RouterMetricsFixtures { + createTestRouter("tracked-router") + createTestRouter("non-tracked-router") + createTestRouter("tracked-explicitly-excluded") + + Kamon(Metrics).subscribe(RouterMetrics, "*", testActor, permanently = true) + expectMsgType[TickMetricSnapshot] + + within(2 seconds) { + val tickSnapshot = expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.keys should contain(RouterMetrics("user/tracked-router")) + tickSnapshot.metrics.keys should not contain (RouterMetrics("user/non-tracked-router")) + tickSnapshot.metrics.keys should not contain (RouterMetrics("user/tracked-explicitly-excluded")) + } + } + + "record the processing-time of the receive function" in new RouterMetricsFixtures { + val metricsListener = TestProbe() + val trackedRouter = createTestRouter("measuring-processing-time") + + trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) + val timings = metricsListener.expectMsgType[RouterTrackedTimings] + + val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics + tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) + tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) + // tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + } + + "record the number of errors" in new RouterMetricsFixtures { + val metricsListener = TestProbe() + val trackedRouter = createTestRouter("measuring-errors") + + for (i ← 1 to 10) { + trackedRouter.tell(Fail, metricsListener.ref) + } + val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics + tickSnapshot(RouterMetrics("user/measuring-errors")).metrics(Errors).asInstanceOf[Counter.Snapshot].count should be(10L) + } + + "record the time-in-mailbox" in new RouterMetricsFixtures { + val metricsListener = TestProbe() + val trackedRouter = createTestRouter("measuring-time-in-mailbox") + + trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) + val timings = metricsListener.expectMsgType[RouterTrackedTimings] + val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics + + tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) + tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) + tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + } + + "clean up the associated recorder when the actor is stopped" in new RouterMetricsFixtures { + val trackedRouter = createTestRouter("stop") + trackedRouter ! Ping + Kamon(Metrics).storage.toString() // force to be initialized + Kamon(Metrics).storage.get(RouterMetrics("user/stop")) should not be empty + + val deathWatcher = TestProbe() + deathWatcher.watch(trackedRouter) + trackedRouter ! PoisonPill + deathWatcher.expectTerminated(trackedRouter) + + Kamon(Metrics).storage.get(RouterMetrics("user/stop")) shouldBe empty + } + } + + trait RouterMetricsFixtures { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + def createTestRouter(name: String): ActorRef = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), name) + + def takeSnapshotOf(amr: RouterMetricsRecorder): RouterMetricSnapshot = amr.collect(collectionContext) + } +} + +class RouterMetricsTestActor extends Actor { + def receive = { + case Discard ⇒ + case Fail ⇒ 1 / 0 + case Ping ⇒ sender ! Pong + case RouterTrackTimings(sendTimestamp, sleep) ⇒ { + val dequeueTimestamp = System.nanoTime() + sleep.map(s ⇒ Thread.sleep(s.toMillis)) + val afterReceiveTimestamp = System.nanoTime() + + sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) + } + } +} + +object RouterMetricsTestActor { + case object Ping + case object Pong + case object Fail + case object Discard + + case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) + case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { + def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp + def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp + } +} -- cgit v1.2.3