diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-02-12 11:30:06 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-02-13 05:15:30 +0100 |
commit | c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8 (patch) | |
tree | d7dbe6a1007b168998f167ac74a98744542c6fa8 /kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala | |
parent | 6729c9632245328a007332cdcce7d362584d735a (diff) | |
download | Kamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.tar.gz Kamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.tar.bz2 Kamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.zip |
! all: Kamon now works as a single instance in a companion object.
Diffstat (limited to 'kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala')
-rw-r--r-- | kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala | 277 |
1 files changed, 277 insertions, 0 deletions
diff --git a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala new file mode 100644 index 00000000..ec55648b --- /dev/null +++ b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala @@ -0,0 +1,277 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.akka + +import java.nio.LongBuffer + +import akka.actor._ +import akka.routing._ +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.akka.RouterMetricsTestActor._ +import kamon.metric.EntitySnapshot +import kamon.metric.instrument.CollectionContext +import kamon.testkit.BaseKamonSpec + +import scala.concurrent.duration._ + +class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + | default-collection-context-buffer-size = 10 + | + | filters = { + | akka-router { + | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] + | excludes = [ "user/tracked-explicitly-excluded-*"] + | } + | } + |} + | + |akka.loglevel = OFF + | + """.stripMargin) + + "the Kamon router metrics" should { + "respect the configured include and exclude filters" in new RouterMetricsFixtures { + createTestPoolRouter("tracked-pool-router") + createTestGroupRouter("tracked-group-router") + createTestPoolRouter("non-tracked-pool-router") + createTestGroupRouter("non-tracked-group-router") + createTestPoolRouter("tracked-explicitly-excluded-pool-router") + createTestGroupRouter("tracked-explicitly-excluded-group-router") + + routerMetricsRecorderOf("user/tracked-pool-router") should not be empty + routerMetricsRecorderOf("user/tracked-group-router") should not be empty + routerMetricsRecorderOf("user/non-tracked-pool-router") shouldBe empty + routerMetricsRecorderOf("user/non-tracked-group-router") shouldBe empty + routerMetricsRecorderOf("user/tracked-explicitly-excluded-pool-router") shouldBe empty + routerMetricsRecorderOf("user/tracked-explicitly-excluded-group-router") shouldBe empty + } + + "record the routing-time of the receive function for pool routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestPoolRouter("measuring-routing-time-in-pool-router") + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-pool-router").get + + routerSnapshot.histogram("routing-time").get.numberOfMeasurements should be(1L) + } + + "record the routing-time of the receive function for group routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestGroupRouter("measuring-routing-time-in-group-router") + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-group-router").get + + routerSnapshot.histogram("routing-time").get.numberOfMeasurements should be(1L) + } + + "record the processing-time of the receive function for pool routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestPoolRouter("measuring-processing-time-in-pool-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-pool-router").get + + routerSnapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + } + + "record the processing-time of the receive function for group routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestGroupRouter("measuring-processing-time-in-group-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-group-router").get + + routerSnapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + } + + "record the number of errors for pool routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestPoolRouter("measuring-errors-in-pool-router") + + for (i ← 1 to 10) { + router.tell(Fail, listener.ref) + } + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + + val routerSnapshot = collectMetricsOf("user/measuring-errors-in-pool-router").get + routerSnapshot.counter("errors").get.count should be(10L) + } + + "record the number of errors for group routers" in new RouterMetricsFixtures { + val listener = TestProbe() + val router = createTestGroupRouter("measuring-errors-in-group-router") + + for (i ← 1 to 10) { + router.tell(Fail, listener.ref) + } + + router.tell(Ping, listener.ref) + listener.expectMsg(Pong) + + val routerSnapshot = collectMetricsOf("user/measuring-errors-in-group-router").get + routerSnapshot.counter("errors").get.count should be(10L) + } + + "record the time-in-mailbox for pool routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestPoolRouter("measuring-time-in-mailbox-in-pool-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-pool-router").get + + routerSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + } + + "record the time-in-mailbox for group routers" in new RouterMetricsFixtures { + val timingsListener = TestProbe() + val router = createTestGroupRouter("measuring-time-in-mailbox-in-group-router") + + router.tell(RouterTrackTimings(sleep = Some(1 second)), timingsListener.ref) + val timings = timingsListener.expectMsgType[RouterTrackedTimings] + val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-group-router").get + + routerSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + } + + "clean up the associated recorder when the pool router is stopped" in new RouterMetricsFixtures { + val trackedRouter = createTestPoolRouter("stop-in-pool-router") + val firstRecorder = routerMetricsRecorderOf("user/stop-in-pool-router").get + + // Killing the router should remove it's RouterMetrics and registering again bellow should create a new one. + val deathWatcher = TestProbe() + deathWatcher.watch(trackedRouter) + trackedRouter ! PoisonPill + deathWatcher.expectTerminated(trackedRouter) + + routerMetricsRecorderOf("user/stop-in-pool-router").get shouldNot be theSameInstanceAs (firstRecorder) + } + + "clean up the associated recorder when the group router is stopped" in new RouterMetricsFixtures { + val trackedRouter = createTestPoolRouter("stop-in-group-router") + val firstRecorder = routerMetricsRecorderOf("user/stop-in-group-router").get + + // Killing the router should remove it's RouterMetrics and registering again bellow should create a new one. + val deathWatcher = TestProbe() + deathWatcher.watch(trackedRouter) + trackedRouter ! PoisonPill + deathWatcher.expectTerminated(trackedRouter) + + routerMetricsRecorderOf("user/stop-in-group-router").get shouldNot be theSameInstanceAs (firstRecorder) + } + } + + override protected def afterAll(): Unit = shutdown() + + trait RouterMetricsFixtures { + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } + + def routerMetricsRecorderOf(routerName: String): Option[RouterMetrics] = + Kamon.metrics.register(RouterMetrics, routerName).map(_.recorder) + + def collectMetricsOf(routerName: String): Option[EntitySnapshot] = { + Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. + routerMetricsRecorderOf(routerName).map(_.collect(collectionContext)) + } + + def createTestGroupRouter(routerName: String): ActorRef = { + val routees = Vector.fill(5) { + system.actorOf(Props[RouterMetricsTestActor]) + } + + val group = system.actorOf(RoundRobinGroup(routees.map(_.path.toStringWithoutAddress)).props(), routerName) + + //val router = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), routerName) + val initialiseListener = TestProbe() + + // Ensure that the router has been created before returning. + group.tell(Ping, initialiseListener.ref) + initialiseListener.expectMsg(Pong) + + // Cleanup all the metric recording instruments: + collectMetricsOf("user/" + routerName) + + group + } + + def createTestPoolRouter(routerName: String): ActorRef = { + val router = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), routerName) + val initialiseListener = TestProbe() + + // Ensure that the router has been created before returning. + router.tell(Ping, initialiseListener.ref) + initialiseListener.expectMsg(Pong) + + // Cleanup all the metric recording instruments: + collectMetricsOf("user/" + routerName) + + router + } + } +} + +class RouterMetricsTestActor extends Actor { + def receive = { + case Discard ⇒ + case Fail ⇒ throw new ArithmeticException("Division by zero.") + case Ping ⇒ sender ! Pong + case RouterTrackTimings(sendTimestamp, sleep) ⇒ { + val dequeueTimestamp = System.nanoTime() + sleep.map(s ⇒ Thread.sleep(s.toMillis)) + val afterReceiveTimestamp = System.nanoTime() + + sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) + } + } +} + +object RouterMetricsTestActor { + case object Ping + case object Pong + case object Fail + case object Discard + + case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) + case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { + def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp + def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp + } +} |