diff options
Diffstat (limited to 'kamon-akka')
4 files changed, 185 insertions, 12 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala index 0d504343..691cae13 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,7 @@ package akka.kamon.instrumentation +import java.lang.reflect.Method import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } import akka.actor.{ ActorContext, Props, ActorSystem, ActorSystemImpl } @@ -47,15 +48,26 @@ class DispatcherInstrumentation { // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup. val defaultDispatcher = system.dispatcher - val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate") - executorServiceDelegateField.setAccessible(true) + val defaultDispatcherExecutor = extractExecutor(defaultDispatcher.asInstanceOf[MessageDispatcher]) + registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + } - val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher) - val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor") - executorField.setAccessible(true) + private def extractExecutor(dispatcher: MessageDispatcher): ExecutorService = { + val executorServiceMethod: Method = { + // executorService is protected + val method = classOf[Dispatcher].getDeclaredMethod("executorService") + method.setAccessible(true) + method + } - val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService] - registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + dispatcher match { + case x: Dispatcher ⇒ + val executor = executorServiceMethod.invoke(x) match { + case delegate: ExecutorServiceDelegate ⇒ delegate.executor + case other ⇒ other + } + executor.asInstanceOf[ExecutorService] + } } private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit = diff --git a/kamon-akka/src/test/resources/application.conf b/kamon-akka/src/test/resources/application.conf index 5407ccfe..de2cf1e5 100644 --- a/kamon-akka/src/test/resources/application.conf +++ b/kamon-akka/src/test/resources/application.conf @@ -60,4 +60,48 @@ tracked-tpe { max-pool-size-factor = 100.0 max-pool-size-max = 21 } -}
\ No newline at end of file +} + +custom-fjp-based-dispatcher = { + type = "kamon.akka.dispatch.CustomDispatcherConfigurator" + executor = "fork-join-executor" + + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 100.0 + parallelism-max = 10 + } +} + +custom-tpe-based-dispatcher = { + type = "kamon.akka.dispatch.CustomDispatcherConfigurator" + executor = "thread-pool-executor" + + thread-pool-executor { + core-pool-size-min = 7 + core-pool-size-factor = 100.0 + max-pool-size-factor = 100.0 + max-pool-size-max = 21 + } +} + +actor-system-with-default-custom-fjp-based-dispatcher { + akka { + loglevel = INFO + loggers = [ "akka.event.slf4j.Slf4jLogger" ] + + actor { + default-dispatcher = { + type = "kamon.akka.dispatch.CustomDispatcherConfigurator" + executor = "fork-join-executor" + + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 100.0 + parallelism-max = 10 + } + } + } + } +} + diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala index 3fbb10fd..9a5ba101 100644 --- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala @@ -15,11 +15,10 @@ package kamon.akka -import akka.actor.{ Props, ActorRef } +import akka.actor.{ ActorSystem, Props, ActorRef } import akka.dispatch.MessageDispatcher import akka.routing.BalancingPool import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.akka.RouterMetricsTestActor.{ Pong, Ping } import kamon.metric.{ EntityRecorder, EntitySnapshot } @@ -35,11 +34,15 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher")) val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp")) val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe")) + val customForkJoinPoolBasedDispatcher = forceInit(system.dispatchers.lookup("custom-fjp-based-dispatcher")) + val customThreadPoolExecutorBasedDispatcher = forceInit(system.dispatchers.lookup("custom-tpe-based-dispatcher")) val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded")) findDispatcherRecorder(defaultDispatcher, "fork-join-pool") shouldNot be(empty) findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty) findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty) + findDispatcherRecorder(customForkJoinPoolBasedDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(customThreadPoolExecutorBasedDispatcher, "thread-pool-executor") shouldNot be(empty) findDispatcherRecorder(excludedDispatcher, "fork-join-pool") should be(empty) } @@ -91,7 +94,7 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { } - "clean up the metrics recorders after a dispatcher is shut down" in { + "clean up the metrics recorders after a dispatcher is shutdown" in { implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") @@ -114,6 +117,75 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { findDispatcherRecorder("BalancingPool-/test-balancing-pool", "fork-join-pool") shouldNot be(empty) } + "record metrics for a custom dispatcher with a based fork-join-executor" in { + implicit val fjpDispatcher = system.dispatchers.lookup("custom-fjp-based-dispatcher") + collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(fjpDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool") + val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + snapshot.minMaxCounter("parallelism").get.max should be(10) + snapshot.gauge("pool-size").get.min should be >= 0L + snapshot.gauge("pool-size").get.max should be <= 10L + snapshot.gauge("active-threads").get.max should be >= 0L + snapshot.gauge("running-threads").get.max should be >= 0L + snapshot.gauge("queued-task-count").get.max should be(0) + + } + + "record metrics for a custom dispatcher with a based thread-pool-executor" in { + implicit val tpeDispatcher = system.dispatchers.lookup("custom-tpe-based-dispatcher") + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(tpeDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + val snapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") + + snapshot.gauge("active-threads") should not be empty + snapshot.gauge("pool-size").get.min should be >= 7L + snapshot.gauge("pool-size").get.max should be <= 21L + snapshot.gauge("max-pool-size").get.max should be(21) + snapshot.gauge("core-pool-size").get.max should be(21) + snapshot.gauge("processed-tasks").get.max should be(102L +- 5L) + + // The processed tasks should be reset to 0 if no more tasks are submitted. + val secondSnapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") + secondSnapshot.gauge("processed-tasks").get.max should be(0) + } + + "record metrics for a custom default dispatcher with a based fork-join-pool" in { + implicit val fjpDispatcher = ActorSystem("system-with-fjp-based-dispatcher", config.getConfig("actor-system-with-default-custom-fjp-based-dispatcher")).dispatcher.asInstanceOf[MessageDispatcher] + collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(fjpDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool") + val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") + + snapshot.minMaxCounter("parallelism").get.max should be(12) + snapshot.gauge("pool-size").get.min should be >= 0L + snapshot.gauge("pool-size").get.max should be <= 10L + snapshot.gauge("active-threads").get.max should be >= 0L + snapshot.gauge("running-threads").get.max should be >= 0L + snapshot.gauge("queued-task-count").get.max should be(0) + + } } def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") diff --git a/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala b/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala new file mode 100644 index 00000000..8b7660d6 --- /dev/null +++ b/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala @@ -0,0 +1,45 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.akka.dispatch + +import java.util.concurrent.TimeUnit + +import akka.dispatch._ +import com.typesafe.config.Config +import scala.concurrent.duration.{ Duration, FiniteDuration } + +class CustomDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends MessageDispatcherConfigurator(config, prerequisites) { + + private val instance = new AwesomeDispatcher( + this, + config.getString("id"), + config.getInt("throughput"), + FiniteDuration(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS), + configureExecutor(), + FiniteDuration(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)) + + override def dispatcher(): MessageDispatcher = instance +} + +class AwesomeDispatcher(_configurator: MessageDispatcherConfigurator, + id: String, + throughput: Int, + throughputDeadlineTime: Duration, + executorServiceFactoryProvider: ExecutorServiceFactoryProvider, + shutdownTimeout: FiniteDuration) + extends Dispatcher(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout) { +} |