diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-07-29 19:19:41 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-07-29 19:19:41 +0200 |
commit | 2a51ba02383eaf31037935b466468fc2da8319b7 (patch) | |
tree | 013f64843c3b4a6c3c3ad7f1a03100c3bfea4593 | |
parent | ca1c1e93b1ced1ae24f2a072cb8cabc59971d6fa (diff) | |
download | Kamon-2a51ba02383eaf31037935b466468fc2da8319b7.tar.gz Kamon-2a51ba02383eaf31037935b466468fc2da8319b7.tar.bz2 Kamon-2a51ba02383eaf31037935b466468fc2da8319b7.zip |
= akka: correctly set LookupData for routers used by BalancingPool, fixes #199.
-rw-r--r-- | kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala | 14 | ||||
-rw-r--r-- | kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala | 16 |
2 files changed, 28 insertions, 2 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala index 0b963c16..e90838ad 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -18,7 +18,7 @@ package akka.kamon.instrumentation import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor } -import akka.actor.{ ActorSystem, ActorSystemImpl } +import akka.actor.{ ActorContext, Props, ActorSystem, ActorSystemImpl } import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool import akka.dispatch._ import akka.kamon.instrumentation.LookupDataAware.LookupData @@ -139,6 +139,18 @@ class DispatcherInstrumentation { } } + @Pointcut("execution(* akka.routing.BalancingPool.newRoutee(..)) && args(props, context)") + def createNewRouteeOnBalancingPool(props: Props, context: ActorContext): Unit = {} + + @Around("createNewRouteeOnBalancingPool(props, context)") + def aroundCreateNewRouteeOnBalancingPool(pjp: ProceedingJoinPoint, props: Props, context: ActorContext): Any = { + val deployPath = context.self.path.elements.drop(1).mkString("/", "/", "") + val dispatcherId = s"BalancingPool-$deployPath" + + LookupDataAware.withLookupData(LookupData(dispatcherId, context.system)) { + pjp.proceed() + } + } } @Aspect diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala index 02a5a0d4..cdd1dd12 100644 --- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala @@ -15,11 +15,13 @@ package kamon.akka -import akka.actor.ActorRef +import akka.actor.{ Props, ActorRef } import akka.dispatch.MessageDispatcher +import akka.routing.BalancingPool import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import kamon.Kamon +import kamon.akka.RouterMetricsTestActor.{ Pong, Ping } import kamon.metric.{ EntityRecorder, EntitySnapshot } import kamon.testkit.BaseKamonSpec @@ -152,6 +154,15 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") should be(empty) } + "play nicely when dispatchers are looked up from a BalancingPool router" in { + val balancingPoolRouter = system.actorOf(BalancingPool(5).props(Props[RouterMetricsTestActor]), "test-balancing-pool") + + balancingPoolRouter ! Ping + expectMsg(Pong) + + findDispatcherRecorder("BalancingPool-/test-balancing-pool", "fork-join-pool") shouldNot be(empty) + } + } def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") @@ -159,6 +170,9 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { def findDispatcherRecorder(dispatcher: MessageDispatcher, dispatcherType: String): Option[EntityRecorder] = Kamon.metrics.find(system.name + "/" + dispatcher.id, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType)) + def findDispatcherRecorder(dispatcherID: String, dispatcherType: String): Option[EntityRecorder] = + Kamon.metrics.find(system.name + "/" + dispatcherID, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType)) + def collectDispatcherMetrics(dispatcher: MessageDispatcher, dispatcherType: String): EntitySnapshot = findDispatcherRecorder(dispatcher, dispatcherType).map(_.collect(collectionContext)).get |