diff options
Diffstat (limited to 'kamon-akka')
-rw-r--r-- | kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala | 28 | ||||
-rw-r--r-- | kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala | 8 |
2 files changed, 23 insertions, 13 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala index 0d504343..691cae13 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,7 @@ package akka.kamon.instrumentation +import java.lang.reflect.Method import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } import akka.actor.{ ActorContext, Props, ActorSystem, ActorSystemImpl } @@ -47,15 +48,26 @@ class DispatcherInstrumentation { // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup. val defaultDispatcher = system.dispatcher - val executorServiceDelegateField = defaultDispatcher.getClass.getDeclaredField("executorServiceDelegate") - executorServiceDelegateField.setAccessible(true) + val defaultDispatcherExecutor = extractExecutor(defaultDispatcher.asInstanceOf[MessageDispatcher]) + registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + } - val lazyExecutorServiceDelegate = executorServiceDelegateField.get(defaultDispatcher) - val executorField = lazyExecutorServiceDelegate.getClass.getMethod("executor") - executorField.setAccessible(true) + private def extractExecutor(dispatcher: MessageDispatcher): ExecutorService = { + val executorServiceMethod: Method = { + // executorService is protected + val method = classOf[Dispatcher].getDeclaredMethod("executorService") + method.setAccessible(true) + method + } - val defaultDispatcherExecutor = executorField.invoke(lazyExecutorServiceDelegate).asInstanceOf[ExecutorService] - registerDispatcher(Dispatchers.DefaultDispatcherId, defaultDispatcherExecutor, system) + dispatcher match { + case x: Dispatcher ⇒ + val executor = executorServiceMethod.invoke(x) match { + case delegate: ExecutorServiceDelegate ⇒ delegate.executor + case other ⇒ other + } + executor.asInstanceOf[ExecutorService] + } } private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit = diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala index 3fbb10fd..05165ca5 100644 --- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala @@ -15,13 +15,12 @@ package kamon.akka -import akka.actor.{ Props, ActorRef } +import akka.actor.{ ActorRef, Props } import akka.dispatch.MessageDispatcher import akka.routing.BalancingPool import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.RouterMetricsTestActor.{ Pong, Ping } +import kamon.akka.RouterMetricsTestActor.{ Ping, Pong } import kamon.metric.{ EntityRecorder, EntitySnapshot } import kamon.testkit.BaseKamonSpec import kamon.util.executors.{ ForkJoinPoolMetrics, ThreadPoolExecutorMetrics } @@ -91,7 +90,7 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { } - "clean up the metrics recorders after a dispatcher is shut down" in { + "clean up the metrics recorders after a dispatcher is shutdown" in { implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") @@ -113,7 +112,6 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { findDispatcherRecorder("BalancingPool-/test-balancing-pool", "fork-join-pool") shouldNot be(empty) } - } def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") |