aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2015-11-27 14:46:32 -0300
committerDiego <diegolparra@gmail.com>2015-11-27 14:46:32 -0300
commitb019fb4beb90397c8498a3f126d14392eefe0ca3 (patch)
tree8e9258ec9c9f35a27aba3d963027dcc5eb4dd270 /kamon-akka/src
parent394206e0eeeea09f22d50a261f0c231dcf2975fc (diff)
downloadKamon-b019fb4beb90397c8498a3f126d14392eefe0ca3.tar.gz
Kamon-b019fb4beb90397c8498a3f126d14392eefe0ca3.tar.bz2
Kamon-b019fb4beb90397c8498a3f126d14392eefe0ca3.zip
! akka: avoid error thrown in dispatcher instrumentation when using custom dispatchers and close #290
Diffstat (limited to 'kamon-akka/src')
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala28
-rw-r--r--kamon-akka/src/test/resources/application.conf46
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala78
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala45
4 files changed, 185 insertions, 12 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
index 0d504343..691cae13 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,7 @@
package akka.kamon.instrumentation
+import java.lang.reflect.Method
import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor }
import akka.actor.{ ActorContext, Props, ActorSystem, ActorSystemImpl }
@@ -47,15 +48,26 @@ class DispatcherInstrumentation {
// Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup.
val defaultDispatcher = system.dispatcher
- val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate")
- executorServiceDelegateField.setAccessible(true)
+ val defaultDispatcherExecutor = extractExecutor(defaultDispatcher.asInstanceOf[MessageDispatcher])
+ registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system)
+ }
- val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher)
- val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor")
- executorField.setAccessible(true)
+ private def extractExecutor(dispatcher: MessageDispatcher): ExecutorService = {
+ val executorServiceMethod: Method = {
+ // executorService is protected
+ val method = classOf[Dispatcher].getDeclaredMethod("executorService")
+ method.setAccessible(true)
+ method
+ }
- val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService]
- registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system)
+ dispatcher match {
+ case x: Dispatcher ⇒
+ val executor = executorServiceMethod.invoke(x) match {
+ case delegate: ExecutorServiceDelegate ⇒ delegate.executor
+ case other ⇒ other
+ }
+ executor.asInstanceOf[ExecutorService]
+ }
}
private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit =
diff --git a/kamon-akka/src/test/resources/application.conf b/kamon-akka/src/test/resources/application.conf
index 5407ccfe..de2cf1e5 100644
--- a/kamon-akka/src/test/resources/application.conf
+++ b/kamon-akka/src/test/resources/application.conf
@@ -60,4 +60,48 @@ tracked-tpe {
max-pool-size-factor = 100.0
max-pool-size-max = 21
}
-} \ No newline at end of file
+}
+
+custom-fjp-based-dispatcher = {
+ type = "kamon.akka.dispatch.CustomDispatcherConfigurator"
+ executor = "fork-join-executor"
+
+ fork-join-executor {
+ parallelism-min = 2
+ parallelism-factor = 100.0
+ parallelism-max = 10
+ }
+}
+
+custom-tpe-based-dispatcher = {
+ type = "kamon.akka.dispatch.CustomDispatcherConfigurator"
+ executor = "thread-pool-executor"
+
+ thread-pool-executor {
+ core-pool-size-min = 7
+ core-pool-size-factor = 100.0
+ max-pool-size-factor = 100.0
+ max-pool-size-max = 21
+ }
+}
+
+actor-system-with-default-custom-fjp-based-dispatcher {
+ akka {
+ loglevel = INFO
+ loggers = [ "akka.event.slf4j.Slf4jLogger" ]
+
+ actor {
+ default-dispatcher = {
+ type = "kamon.akka.dispatch.CustomDispatcherConfigurator"
+ executor = "fork-join-executor"
+
+ fork-join-executor {
+ parallelism-min = 2
+ parallelism-factor = 100.0
+ parallelism-max = 10
+ }
+ }
+ }
+ }
+}
+
diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
index 3fbb10fd..9a5ba101 100644
--- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
@@ -15,11 +15,10 @@
package kamon.akka
-import akka.actor.{ Props, ActorRef }
+import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.dispatch.MessageDispatcher
import akka.routing.BalancingPool
import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.akka.RouterMetricsTestActor.{ Pong, Ping }
import kamon.metric.{ EntityRecorder, EntitySnapshot }
@@ -35,11 +34,15 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher"))
val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp"))
val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe"))
+ val customForkJoinPoolBasedDispatcher = forceInit(system.dispatchers.lookup("custom-fjp-based-dispatcher"))
+ val customThreadPoolExecutorBasedDispatcher = forceInit(system.dispatchers.lookup("custom-tpe-based-dispatcher"))
val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded"))
findDispatcherRecorder(defaultDispatcher, "fork-join-pool") shouldNot be(empty)
findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty)
findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty)
+ findDispatcherRecorder(customForkJoinPoolBasedDispatcher, "fork-join-pool") shouldNot be(empty)
+ findDispatcherRecorder(customThreadPoolExecutorBasedDispatcher, "thread-pool-executor") shouldNot be(empty)
findDispatcherRecorder(excludedDispatcher, "fork-join-pool") should be(empty)
}
@@ -91,7 +94,7 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
}
- "clean up the metrics recorders after a dispatcher is shut down" in {
+ "clean up the metrics recorders after a dispatcher is shutdown" in {
implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
@@ -114,6 +117,75 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
findDispatcherRecorder("BalancingPool-/test-balancing-pool", "fork-join-pool") shouldNot be(empty)
}
+ "record metrics for a custom dispatcher with a based fork-join-executor" in {
+ implicit val fjpDispatcher = system.dispatchers.lookup("custom-fjp-based-dispatcher")
+ collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
+
+ Await.result({
+ Future.sequence {
+ for (_ ← 1 to 100) yield submit(fjpDispatcher)
+ }
+ }, 5 seconds)
+
+ refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool")
+ val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
+
+ snapshot.minMaxCounter("parallelism").get.max should be(10)
+ snapshot.gauge("pool-size").get.min should be >= 0L
+ snapshot.gauge("pool-size").get.max should be <= 10L
+ snapshot.gauge("active-threads").get.max should be >= 0L
+ snapshot.gauge("running-threads").get.max should be >= 0L
+ snapshot.gauge("queued-task-count").get.max should be(0)
+
+ }
+
+ "record metrics for a custom dispatcher with a based thread-pool-executor" in {
+ implicit val tpeDispatcher = system.dispatchers.lookup("custom-tpe-based-dispatcher")
+ refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor")
+ collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
+
+ Await.result({
+ Future.sequence {
+ for (_ ← 1 to 100) yield submit(tpeDispatcher)
+ }
+ }, 5 seconds)
+
+ refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor")
+ val snapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
+
+ snapshot.gauge("active-threads") should not be empty
+ snapshot.gauge("pool-size").get.min should be >= 7L
+ snapshot.gauge("pool-size").get.max should be <= 21L
+ snapshot.gauge("max-pool-size").get.max should be(21)
+ snapshot.gauge("core-pool-size").get.max should be(21)
+ snapshot.gauge("processed-tasks").get.max should be(102L +- 5L)
+
+ // The processed tasks should be reset to 0 if no more tasks are submitted.
+ val secondSnapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
+ secondSnapshot.gauge("processed-tasks").get.max should be(0)
+ }
+
+ "record metrics for a custom default dispatcher with a based fork-join-pool" in {
+ implicit val fjpDispatcher = ActorSystem("system-with-fjp-based-dispatcher", config.getConfig("actor-system-with-default-custom-fjp-based-dispatcher")).dispatcher.asInstanceOf[MessageDispatcher]
+ collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
+
+ Await.result({
+ Future.sequence {
+ for (_ ← 1 to 100) yield submit(fjpDispatcher)
+ }
+ }, 5 seconds)
+
+ refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool")
+ val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
+
+ snapshot.minMaxCounter("parallelism").get.max should be(12)
+ snapshot.gauge("pool-size").get.min should be >= 0L
+ snapshot.gauge("pool-size").get.max should be <= 10L
+ snapshot.gauge("active-threads").get.max should be >= 0L
+ snapshot.gauge("running-threads").get.max should be >= 0L
+ snapshot.gauge("queued-task-count").get.max should be(0)
+
+ }
}
def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
diff --git a/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala b/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala
new file mode 100644
index 00000000..8b7660d6
--- /dev/null
+++ b/kamon-akka/src/test/scala/kamon/akka/dispatch/CustomDispatcherConfigurator.scala
@@ -0,0 +1,45 @@
+/* =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.akka.dispatch
+
+import java.util.concurrent.TimeUnit
+
+import akka.dispatch._
+import com.typesafe.config.Config
+import scala.concurrent.duration.{ Duration, FiniteDuration }
+
+class CustomDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
+ extends MessageDispatcherConfigurator(config, prerequisites) {
+
+ private val instance = new AwesomeDispatcher(
+ this,
+ config.getString("id"),
+ config.getInt("throughput"),
+ FiniteDuration(config.getDuration("throughput-deadline-time", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS),
+ configureExecutor(),
+ FiniteDuration(config.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS))
+
+ override def dispatcher(): MessageDispatcher = instance
+}
+
+class AwesomeDispatcher(_configurator: MessageDispatcherConfigurator,
+ id: String,
+ throughput: Int,
+ throughputDeadlineTime: Duration,
+ executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
+ shutdownTimeout: FiniteDuration)
+ extends Dispatcher(_configurator, id, throughput, throughputDeadlineTime, executorServiceFactoryProvider, shutdownTimeout) {
+}