diff options
Diffstat (limited to 'kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala')
-rw-r--r-- | kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala | 213 |
1 files changed, 158 insertions, 55 deletions
diff --git a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala index 55af3f2e..2c530da9 100644 --- a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala @@ -15,96 +15,199 @@ package kamon.metric -import akka.actor.{ ActorRef, ActorSystem, Props } +import java.nio.LongBuffer + +import akka.actor.{ PoisonPill, Props, ActorRef, ActorSystem } +import akka.dispatch.MessageDispatcher import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.DispatcherMetrics -import DispatcherMetrics.DispatcherMetricSnapshot -import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.akka.{ ForkJoinPoolDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics } +import kamon.metric.ActorMetricsTestActor.{ Pong, Ping } +import kamon.metric.instrument.CollectionContext +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { +class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics { - | tick-interval = 1 second + |kamon.metric { + | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | - | filters = [ - | { - | dispatcher { - | includes = ["*"] - | excludes = ["dispatcher-explicitly-excluded"] - | } + | filters = { + | akka-dispatcher { + | includes = [ "*" ] + | excludes = [ "explicitly-excluded" ] | } - | ] + | } + | + | default-instrument-settings { + | gauge.refresh-interval = 1 hour + | min-max-counter.refresh-interval = 1 hour + | } + |} + | + |explicitly-excluded { + | type = "Dispatcher" + | executor = "fork-join-executor" |} | - |dispatcher-explicitly-excluded { - | type = "Dispatcher" - | executor = "fork-join-executor" + |tracked-fjp { + | type = "Dispatcher" + | executor = "fork-join-executor" + | + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 100.0 + | parallelism-max = 22 + | } |} | - |tracked-dispatcher { - | type = "Dispatcher" - | executor = "thread-pool-executor" + |tracked-tpe { + | type = "Dispatcher" + | executor = "thread-pool-executor" + | + | thread-pool-executor { + | core-pool-size-min = 7 + | core-pool-size-factor = 100.0 + | max-pool-size-factor = 100.0 + | max-pool-size-max = 21 + | } |} | """.stripMargin)) "the Kamon dispatcher metrics" should { "respect the configured include and exclude filters" in { - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher") - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher") + val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher")) + val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp")) + val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe")) + val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded")) + + findDispatcherRecorder(defaultDispatcher) shouldNot be(empty) + findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) + findDispatcherRecorder(excludedDispatcher) should be(empty) + } - Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] + "record metrics for a dispatcher with thread-pool-executor" in { + implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") + collectDispatcherMetrics(tpeDispatcher) - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher")) - tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded")) - } + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(tpeDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(tpeDispatcher) + val snapshot = collectDispatcherMetrics(tpeDispatcher) + + snapshot.gauge("active-threads") should not be empty + snapshot.gauge("pool-size").get.min should be >= 7L + snapshot.gauge("pool-size").get.max should be <= 21L + snapshot.gauge("max-pool-size").get.max should be(21) + snapshot.gauge("core-pool-size").get.max should be(21) + snapshot.gauge("processed-tasks").get.max should be(102L +- 5L) + + // The processed tasks should be reset to 0 if no more tasks are submitted. + val secondSnapshot = collectDispatcherMetrics(tpeDispatcher) + secondSnapshot.gauge("processed-tasks").get.max should be(0) } - "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher") + "record metrics for a dispatcher with fork-join-executor" in { + implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") + collectDispatcherMetrics(fjpDispatcher) + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(fjpDispatcher) + } + }, 5 seconds) - for (_ ← 1 to 100) { - //delayable ! Discard - } + refreshDispatcherInstruments(fjpDispatcher) + val snapshot = collectDispatcherMetrics(fjpDispatcher) + + snapshot.minMaxCounter("parallelism").get.max should be(22) + snapshot.gauge("pool-size").get.min should be >= 0L + snapshot.gauge("pool-size").get.max should be <= 22L + snapshot.gauge("active-threads").get.max should be >= 0L + snapshot.gauge("running-threads").get.max should be >= 0L + snapshot.gauge("queued-task-count").get.max should be(0) - val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds) - dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis - dispatcherMetrics.poolSize.max should be <= 22L //fail in travis - dispatcherMetrics.queueTaskCount.max should be(0L) - dispatcherMetrics.runningThreadCount.max should be(0L) } - } + "clean up the metrics recorders after a dispatcher is shut down" in { + implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") + implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") + + findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) - def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] + shutdownDispatcher(tpeDispatcher) + shutdownDispatcher(fjpDispatcher) + + findDispatcherRecorder(fjpDispatcher) should be(empty) + findDispatcherRecorder(tpeDispatcher) should be(empty) } - val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId)) - dispatcherMetricsOption should not be empty - dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot] + + } + + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) } - trait DelayableActorFixture { - def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name) - val metricsListener = TestProbe() + def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] + def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] = + Kamon(Metrics)(system).find(dispatcher.id, "akka-dispatcher") - (actor, metricsListener) + def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot = + findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get + + def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = { + findDispatcherRecorder(dispatcher) match { + case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒ + tpe.processedTasks.refreshValue() + tpe.activeThreads.refreshValue() + tpe.maxPoolSize.refreshValue() + tpe.poolSize.refreshValue() + tpe.corePoolSize.refreshValue() + + case Some(fjp: ForkJoinPoolDispatcherMetrics) ⇒ + fjp.activeThreads.refreshValue() + fjp.poolSize.refreshValue() + fjp.queuedTaskCount.refreshValue() + fjp.paralellism.refreshValues() + fjp.runningThreads.refreshValue() + + case other ⇒ } } + + def forceInit(dispatcher: MessageDispatcher): MessageDispatcher = { + val listener = TestProbe() + Future { + listener.ref ! "init done" + }(dispatcher) + listener.expectMsg("init done") + + dispatcher + } + + def submit(dispatcher: MessageDispatcher): Future[String] = Future { + "hello" + }(dispatcher) + + def shutdownDispatcher(dispatcher: MessageDispatcher): Unit = { + val shutdownMethod = dispatcher.getClass.getDeclaredMethod("shutdown") + shutdownMethod.setAccessible(true) + shutdownMethod.invoke(dispatcher) + } + + override protected def afterAll(): Unit = system.shutdown() } + |