aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala')
-rw-r--r--kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala213
1 files changed, 158 insertions, 55 deletions
diff --git a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala
index 55af3f2e..2c530da9 100644
--- a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala
@@ -15,96 +15,199 @@
package kamon.metric
-import akka.actor.{ ActorRef, ActorSystem, Props }
+import java.nio.LongBuffer
+
+import akka.actor.{ PoisonPill, Props, ActorRef, ActorSystem }
+import akka.dispatch.MessageDispatcher
import akka.testkit.{ TestKitBase, TestProbe }
import com.typesafe.config.ConfigFactory
import kamon.Kamon
-import kamon.akka.DispatcherMetrics
-import DispatcherMetrics.DispatcherMetricSnapshot
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.akka.{ ForkJoinPoolDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics }
+import kamon.metric.ActorMetricsTestActor.{ Pong, Ping }
+import kamon.metric.instrument.CollectionContext
+import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
+import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
-class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
+class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll {
implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString(
"""
- |kamon.metrics {
- | tick-interval = 1 second
+ |kamon.metric {
+ | tick-interval = 1 hour
| default-collection-context-buffer-size = 10
|
- | filters = [
- | {
- | dispatcher {
- | includes = ["*"]
- | excludes = ["dispatcher-explicitly-excluded"]
- | }
+ | filters = {
+ | akka-dispatcher {
+ | includes = [ "*" ]
+ | excludes = [ "explicitly-excluded" ]
| }
- | ]
+ | }
+ |
+ | default-instrument-settings {
+ | gauge.refresh-interval = 1 hour
+ | min-max-counter.refresh-interval = 1 hour
+ | }
+ |}
+ |
+ |explicitly-excluded {
+ | type = "Dispatcher"
+ | executor = "fork-join-executor"
|}
|
- |dispatcher-explicitly-excluded {
- | type = "Dispatcher"
- | executor = "fork-join-executor"
+ |tracked-fjp {
+ | type = "Dispatcher"
+ | executor = "fork-join-executor"
+ |
+ | fork-join-executor {
+ | parallelism-min = 8
+ | parallelism-factor = 100.0
+ | parallelism-max = 22
+ | }
|}
|
- |tracked-dispatcher {
- | type = "Dispatcher"
- | executor = "thread-pool-executor"
+ |tracked-tpe {
+ | type = "Dispatcher"
+ | executor = "thread-pool-executor"
+ |
+ | thread-pool-executor {
+ | core-pool-size-min = 7
+ | core-pool-size-factor = 100.0
+ | max-pool-size-factor = 100.0
+ | max-pool-size-max = 21
+ | }
|}
|
""".stripMargin))
"the Kamon dispatcher metrics" should {
"respect the configured include and exclude filters" in {
- system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher")
- system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher")
+ val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher"))
+ val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp"))
+ val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe"))
+ val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded"))
+
+ findDispatcherRecorder(defaultDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(excludedDispatcher) should be(empty)
+ }
- Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true)
- expectMsgType[TickMetricSnapshot]
+ "record metrics for a dispatcher with thread-pool-executor" in {
+ implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
+ collectDispatcherMetrics(tpeDispatcher)
- within(2 seconds) {
- val tickSnapshot = expectMsgType[TickMetricSnapshot]
- tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher"))
- tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded"))
- }
+ Await.result({
+ Future.sequence {
+ for (_ ← 1 to 100) yield submit(tpeDispatcher)
+ }
+ }, 5 seconds)
+
+ refreshDispatcherInstruments(tpeDispatcher)
+ val snapshot = collectDispatcherMetrics(tpeDispatcher)
+
+ snapshot.gauge("active-threads") should not be empty
+ snapshot.gauge("pool-size").get.min should be >= 7L
+ snapshot.gauge("pool-size").get.max should be <= 21L
+ snapshot.gauge("max-pool-size").get.max should be(21)
+ snapshot.gauge("core-pool-size").get.max should be(21)
+ snapshot.gauge("processed-tasks").get.max should be(102L +- 5L)
+
+ // The processed tasks should be reset to 0 if no more tasks are submitted.
+ val secondSnapshot = collectDispatcherMetrics(tpeDispatcher)
+ secondSnapshot.gauge("processed-tasks").get.max should be(0)
}
- "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture {
- val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher")
+ "record metrics for a dispatcher with fork-join-executor" in {
+ implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
+ collectDispatcherMetrics(fjpDispatcher)
+
+ Await.result({
+ Future.sequence {
+ for (_ ← 1 to 100) yield submit(fjpDispatcher)
+ }
+ }, 5 seconds)
- for (_ ← 1 to 100) {
- //delayable ! Discard
- }
+ refreshDispatcherInstruments(fjpDispatcher)
+ val snapshot = collectDispatcherMetrics(fjpDispatcher)
+
+ snapshot.minMaxCounter("parallelism").get.max should be(22)
+ snapshot.gauge("pool-size").get.min should be >= 0L
+ snapshot.gauge("pool-size").get.max should be <= 22L
+ snapshot.gauge("active-threads").get.max should be >= 0L
+ snapshot.gauge("running-threads").get.max should be >= 0L
+ snapshot.gauge("queued-task-count").get.max should be(0)
- val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds)
- dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis
- dispatcherMetrics.poolSize.max should be <= 22L //fail in travis
- dispatcherMetrics.queueTaskCount.max should be(0L)
- dispatcherMetrics.runningThreadCount.max should be(0L)
}
- }
+ "clean up the metrics recorders after a dispatcher is shut down" in {
+ implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
+ implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
+
+ findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
- def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = {
- val tickSnapshot = within(waitTime) {
- listener.expectMsgType[TickMetricSnapshot]
+ shutdownDispatcher(tpeDispatcher)
+ shutdownDispatcher(fjpDispatcher)
+
+ findDispatcherRecorder(fjpDispatcher) should be(empty)
+ findDispatcherRecorder(tpeDispatcher) should be(empty)
}
- val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId))
- dispatcherMetricsOption should not be empty
- dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot]
+
+ }
+
+ val collectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(10000)
}
- trait DelayableActorFixture {
- def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = {
- val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name)
- val metricsListener = TestProbe()
+ def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
- Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true)
- // Wait for one empty snapshot before proceeding to the test.
- metricsListener.expectMsgType[TickMetricSnapshot]
+ def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] =
+ Kamon(Metrics)(system).find(dispatcher.id, "akka-dispatcher")
- (actor, metricsListener)
+ def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot =
+ findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get
+
+ def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = {
+ findDispatcherRecorder(dispatcher) match {
+ case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒
+ tpe.processedTasks.refreshValue()
+ tpe.activeThreads.refreshValue()
+ tpe.maxPoolSize.refreshValue()
+ tpe.poolSize.refreshValue()
+ tpe.corePoolSize.refreshValue()
+
+ case Some(fjp: ForkJoinPoolDispatcherMetrics) ⇒
+ fjp.activeThreads.refreshValue()
+ fjp.poolSize.refreshValue()
+ fjp.queuedTaskCount.refreshValue()
+ fjp.paralellism.refreshValues()
+ fjp.runningThreads.refreshValue()
+
+ case other ⇒
}
}
+
+ def forceInit(dispatcher: MessageDispatcher): MessageDispatcher = {
+ val listener = TestProbe()
+ Future {
+ listener.ref ! "init done"
+ }(dispatcher)
+ listener.expectMsg("init done")
+
+ dispatcher
+ }
+
+ def submit(dispatcher: MessageDispatcher): Future[String] = Future {
+ "hello"
+ }(dispatcher)
+
+ def shutdownDispatcher(dispatcher: MessageDispatcher): Unit = {
+ val shutdownMethod = dispatcher.getClass.getDeclaredMethod("shutdown")
+ shutdownMethod.setAccessible(true)
+ shutdownMethod.invoke(dispatcher)
+ }
+
+ override protected def afterAll(): Unit = system.shutdown()
}
+