diff options
Diffstat (limited to 'kamon-akka/src/test/scala/kamon')
-rw-r--r-- | kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala | 78 | ||||
-rw-r--r-- | kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala | 45 |
2 files changed, 120 insertions, 3 deletions
diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala index 3fbb10fd..9a5ba101 100644 --- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala @@ -15,11 +15,10 @@ package kamon.akka -import akka.actor.{ Props, ActorRef } +import akka.actor.{ ActorSystem, Props, ActorRef } import akka.dispatch.MessageDispatcher import akka.routing.BalancingPool import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.akka.RouterMetricsTestActor.{ Pong, Ping } import kamon.metric.{ EntityRecorder, EntitySnapshot } @@ -35,11 +34,15 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher")) val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp")) val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe")) + val customForkJoinPoolBasedDispatcher = forceInit(system.dispatchers.lookup("custom-fjp-based-dispatcher")) + val customThreadPoolExecutorBasedDispatcher = forceInit(system.dispatchers.lookup("custom-tpe-based-dispatcher")) val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded")) findDispatcherRecorder(defaultDispatcher, "fork-join-pool") shouldNot be(empty) findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty) findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty) + findDispatcherRecorder(customForkJoinPoolBasedDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(customThreadPoolExecutorBasedDispatcher, "thread-pool-executor") shouldNot be(empty) findDispatcherRecorder(excludedDispatcher, "fork-join-pool") should be(empty) } @@ -91,7 +94,7 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { } - "clean up the metrics recorders after a dispatcher is shut down" in { + "clean up the metrics recorders after a dispatcher is shutdown" in { implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") @@ -114,6 +117,75 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { findDispatcherRecorder("BalancingPool-/test-balancing-pool", "fork-join-pool") shouldNot be(empty) } + "record metrics for a custom dispatcher with a based fork-join-executor" in { + implicit val fjpDispatcher = system.dispatchers.lookup("custom-fjp-based-dispatcher") + collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(fjpDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool") + val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + snapshot.minMaxCounter("parallelism").get.max should be(10) + snapshot.gauge("pool-size").get.min should be >= 0L + snapshot.gauge("pool-size").get.max should be <= 10L + snapshot.gauge("active-threads").get.max should be >= 0L + snapshot.gauge("running-threads").get.max should be >= 0L + snapshot.gauge("queued-task-count").get.max should be(0) + + } + + "record metrics for a custom dispatcher with a based thread-pool-executor" in { + implicit val tpeDispatcher = system.dispatchers.lookup("custom-tpe-based-dispatcher") + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(tpeDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + val snapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") + + snapshot.gauge("active-threads") should not be empty + snapshot.gauge("pool-size").get.min should be >= 7L + snapshot.gauge("pool-size").get.max should be <= 21L + snapshot.gauge("max-pool-size").get.max should be(21) + snapshot.gauge("core-pool-size").get.max should be(21) + snapshot.gauge("processed-tasks").get.max should be(102L +- 5L) + + // The processed tasks should be reset to 0 if no more tasks are submitted. + val secondSnapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") + secondSnapshot.gauge("processed-tasks").get.max should be(0) + } + + "record metrics for a custom default dispatcher with a based fork-join-pool" in { + implicit val fjpDispatcher = ActorSystem("system-with-fjp-based-dispatcher", config.getConfig("actor-system-with-default-custom-fjp-based-dispatcher")).dispatcher.asInstanceOf[MessageDispatcher] + collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(fjpDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool") + val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + snapshot.minMaxCounter("parallelism").get.max should be(12) + snapshot.gauge("pool-size").get.min should be >= 0L + snapshot.gauge("pool-size").get.max should be <= 10L + snapshot.gauge("active-threads").get.max should be >= 0L + snapshot.gauge("running-threads").get.max should be >= 0L + snapshot.gauge("queued-task-count").get.max should be(0) + + } } def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") diff --git a/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala b/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala new file mode 100644 index 00000000..8b7660d6 --- /dev/null +++ b/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala @@ -0,0 +1,45 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.akka.dispatch + +import java.util.concurrent.TimeUnit + +import akka.dispatch._ +import com.typesafe.config.Config +import scala.concurrent.duration.{ Duration, FiniteDuration } + +class CustomDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends MessageDispatcherConfigurator(config, prerequisites) { + + private val instance = new AwesomeDispatcher( + this, + config.getString("id"), + config.getInt("throughput"), + FiniteDuration(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS), + configureExecutor(), + FiniteDuration(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)) + + override def dispatcher(): MessageDispatcher = instance +} + +class AwesomeDispatcher(_configurator: MessageDispatcherConfigurator, + id: String, + throughput: Int, + throughputDeadlineTime: Duration, + executorServiceFactoryProvider: ExecutorServiceFactoryProvider, + shutdownTimeout: FiniteDuration) + extends Dispatcher(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout) { +} |