+package kamon.akka
+import java.nio.LongBuffer
+import akka.actor.{ ActorRef, ActorSystem }
+import akka.dispatch.MessageDispatcher
+import akka.testkit.{ TestKitBase, TestProbe }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metric.instrument.CollectionContext
+import kamon.metric.{ EntityRecorder, EntitySnapshot }
+import kamon.testkit.BaseKamonSpec
+import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
+import scala.concurrent.duration._
+import scala.concurrent.{ Await, Future }
+class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ | default-collection-context-buffer-size = 10
+ |
+ | filters = {
+ | akka-dispatcher {
+ | includes = [ "*" ]
+ | excludes = [ "explicitly-excluded" ]
+ | }
+ | }
+ |
+ | default-instrument-settings {
+ | gauge.refresh-interval = 1 hour
+ | min-max-counter.refresh-interval = 1 hour
+ | }
+ |}
+ |
+ |explicitly-excluded {
+ | type = "Dispatcher"
+ | executor = "fork-join-executor"
+ |}
+ |
+ |tracked-fjp {
+ | type = "Dispatcher"
+ | executor = "fork-join-executor"
+ |
+ | fork-join-executor {
+ | parallelism-min = 8
+ | parallelism-factor = 100.0
+ | parallelism-max = 22
+ | }
+ |}
+ |
+ |tracked-tpe {
+ | type = "Dispatcher"
+ | executor = "thread-pool-executor"
+ |
+ | thread-pool-executor {
+ | core-pool-size-min = 7
+ | core-pool-size-factor = 100.0
+ | max-pool-size-factor = 100.0
+ | max-pool-size-max = 21
+ | }
+ |}
+ |
+ """.stripMargin)
+ "the Kamon dispatcher metrics" should {
+ "respect the configured include and exclude filters" in {
+ val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher"))
+ val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp"))
+ val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe"))
+ val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded"))
+ findDispatcherRecorder(defaultDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(excludedDispatcher) should be(empty)
+ }
+ "record metrics for a dispatcher with thread-pool-executor" in {
+ implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
+ refreshDispatcherInstruments(tpeDispatcher)
+ collectDispatcherMetrics(tpeDispatcher)
+ Await.result({
+ Future.sequence {
+ for (_ ← 1 to 100) yield submit(tpeDispatcher)
+ }
+ }, 5 seconds)
+ refreshDispatcherInstruments(tpeDispatcher)
+ val snapshot = collectDispatcherMetrics(tpeDispatcher)
+ snapshot.gauge("active-threads") should not be empty
+ snapshot.gauge("pool-size").get.min should be >= 7L
+ snapshot.gauge("pool-size").get.max should be <= 21L
+ snapshot.gauge("max-pool-size").get.max should be(21)
+ snapshot.gauge("core-pool-size").get.max should be(21)
+ snapshot.gauge("processed-tasks").get.max should be(102L +- 5L)
+ // The processed tasks should be reset to 0 if no more tasks are submitted.
+ val secondSnapshot = collectDispatcherMetrics(tpeDispatcher)
+ secondSnapshot.gauge("processed-tasks").get.max should be(0)
+ }
+ "record metrics for a dispatcher with fork-join-executor" in {
+ implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
+ collectDispatcherMetrics(fjpDispatcher)
+ Await.result({
+ Future.sequence {
+ for (_ ← 1 to 100) yield submit(fjpDispatcher)
+ }
+ }, 5 seconds)
+ refreshDispatcherInstruments(fjpDispatcher)
+ val snapshot = collectDispatcherMetrics(fjpDispatcher)
+ snapshot.minMaxCounter("parallelism").get.max should be(22)
+ snapshot.gauge("pool-size").get.min should be >= 0L
+ snapshot.gauge("pool-size").get.max should be <= 22L
+ snapshot.gauge("active-threads").get.max should be >= 0L
+ snapshot.gauge("running-threads").get.max should be >= 0L
+ snapshot.gauge("queued-task-count").get.max should be(0)
+ }
+ "clean up the metrics recorders after a dispatcher is shut down" in {
+ implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
+ implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
+ findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
+ shutdownDispatcher(tpeDispatcher)
+ shutdownDispatcher(fjpDispatcher)
+ findDispatcherRecorder(fjpDispatcher) should be(empty)
+ findDispatcherRecorder(tpeDispatcher) should be(empty)
+ }
+ }
+ def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
+ def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] =
+ Kamon.metrics.find(dispatcher.id, "akka-dispatcher")
+ def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot =
+ findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get
+ def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = {
+ findDispatcherRecorder(dispatcher) match {
+ case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒
+ tpe.processedTasks.refreshValue()
+ tpe.activeThreads.refreshValue()
+ tpe.maxPoolSize.refreshValue()
+ tpe.poolSize.refreshValue()
+ tpe.corePoolSize.refreshValue()
+ case Some(fjp: ForkJoinPoolDispatcherMetrics) ⇒
+ fjp.activeThreads.refreshValue()
+ fjp.poolSize.refreshValue()
+ fjp.queuedTaskCount.refreshValue()
+ fjp.paralellism.refreshValues()
+ fjp.runningThreads.refreshValue()
+ case other ⇒
+ }
+ }
+ def forceInit(dispatcher: MessageDispatcher): MessageDispatcher = {
+ val listener = TestProbe()
+ Future {
+ listener.ref ! "init done"
+ }(dispatcher)
+ listener.expectMsg("init done")
+ dispatcher
+ }
+ def submit(dispatcher: MessageDispatcher): Future[String] = Future {
+ "hello"
+ }(dispatcher)
+ def shutdownDispatcher(dispatcher: MessageDispatcher): Unit = {
+ val shutdownMethod = dispatcher.getClass.getDeclaredMethod("shutdown")
+ shutdownMethod.setAccessible(true)
+ shutdownMethod.invoke(dispatcher)
+ }
+ override protected def afterAll(): Unit = system.shutdown()