aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-akka')
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala28
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala8
2 files changed, 23 insertions, 13 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
index 0d504343..691cae13 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,7 @@
package akka.kamon.instrumentation
+import java.lang.reflect.Method
import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor }
import akka.actor.{ ActorContext, Props, ActorSystem, ActorSystemImpl }
@@ -47,15 +48,26 @@ class DispatcherInstrumentation {
// Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup.
val defaultDispatcher = system.dispatcher
- val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate")
- executorServiceDelegateField.setAccessible(true)
+ val defaultDispatcherExecutor = extractExecutor(defaultDispatcher.asInstanceOf[MessageDispatcher])
+ registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system)
+ }
- val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher)
- val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor")
- executorField.setAccessible(true)
+ private def extractExecutor(dispatcher: MessageDispatcher): ExecutorService = {
+ val executorServiceMethod: Method = {
+ // executorService is protected
+ val method = classOf[Dispatcher].getDeclaredMethod("executorService")
+ method.setAccessible(true)
+ method
+ }
- val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService]
- registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system)
+ dispatcher match {
+ case x: Dispatcher ⇒
+ val executor = executorServiceMethod.invoke(x) match {
+ case delegate: ExecutorServiceDelegate ⇒ delegate.executor
+ case other ⇒ other
+ }
+ executor.asInstanceOf[ExecutorService]
+ }
}
private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit =
diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
index 3fbb10fd..05165ca5 100644
--- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
@@ -15,13 +15,12 @@
package kamon.akka
-import akka.actor.{ Props, ActorRef }
+import akka.actor.{ ActorRef, Props }
import akka.dispatch.MessageDispatcher
import akka.routing.BalancingPool
import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
import kamon.Kamon
-import kamon.akka.RouterMetricsTestActor.{ Pong, Ping }
+import kamon.akka.RouterMetricsTestActor.{ Ping, Pong }
import kamon.metric.{ EntityRecorder, EntitySnapshot }
import kamon.testkit.BaseKamonSpec
import kamon.util.executors.{ ForkJoinPoolMetrics, ThreadPoolExecutorMetrics }
@@ -91,7 +90,7 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
}
- "clean up the metrics recorders after a dispatcher is shut down" in {
+ "clean up the metrics recorders after a dispatcher is shutdown" in {
implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
@@ -113,7 +112,6 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
findDispatcherRecorder("BalancingPool-/test-balancing-pool", "fork-join-pool") shouldNot be(empty)
}
-
}
def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")