aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-07-29 19:19:41 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2015-07-29 19:19:41 +0200
commit2a51ba02383eaf31037935b466468fc2da8319b7 (patch)
tree013f64843c3b4a6c3c3ad7f1a03100c3bfea4593
parentca1c1e93b1ced1ae24f2a072cb8cabc59971d6fa (diff)
downloadKamon-2a51ba02383eaf31037935b466468fc2da8319b7.tar.gz
Kamon-2a51ba02383eaf31037935b466468fc2da8319b7.tar.bz2
Kamon-2a51ba02383eaf31037935b466468fc2da8319b7.zip
= akka: correctly set LookupData for routers used by BalancingPool, fixes #199.
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala14
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala16
2 files changed, 28 insertions, 2 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
index 0b963c16..e90838ad 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
@@ -18,7 +18,7 @@ package akka.kamon.instrumentation
import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor }
-import akka.actor.{ ActorSystem, ActorSystemImpl }
+import akka.actor.{ ActorContext, Props, ActorSystem, ActorSystemImpl }
import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
import akka.dispatch._
import akka.kamon.instrumentation.LookupDataAware.LookupData
@@ -139,6 +139,18 @@ class DispatcherInstrumentation {
}
}
+ @Pointcut("execution(* akka.routing.BalancingPool.newRoutee(..)) && args(props, context)")
+ def createNewRouteeOnBalancingPool(props: Props, context: ActorContext): Unit = {}
+
+ @Around("createNewRouteeOnBalancingPool(props, context)")
+ def aroundCreateNewRouteeOnBalancingPool(pjp: ProceedingJoinPoint, props: Props, context: ActorContext): Any = {
+ val deployPath = context.self.path.elements.drop(1).mkString("/", "/", "")
+ val dispatcherId = s"BalancingPool-$deployPath"
+
+ LookupDataAware.withLookupData(LookupData(dispatcherId, context.system)) {
+ pjp.proceed()
+ }
+ }
}
@Aspect
diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
index 02a5a0d4..cdd1dd12 100644
--- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
@@ -15,11 +15,13 @@
package kamon.akka
-import akka.actor.ActorRef
+import akka.actor.{ Props, ActorRef }
import akka.dispatch.MessageDispatcher
+import akka.routing.BalancingPool
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import kamon.Kamon
+import kamon.akka.RouterMetricsTestActor.{ Pong, Ping }
import kamon.metric.{ EntityRecorder, EntitySnapshot }
import kamon.testkit.BaseKamonSpec
@@ -152,6 +154,15 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") should be(empty)
}
+ "play nicely when dispatchers are looked up from a BalancingPool router" in {
+ val balancingPoolRouter = system.actorOf(BalancingPool(5).props(Props[RouterMetricsTestActor]), "test-balancing-pool")
+
+ balancingPoolRouter ! Ping
+ expectMsg(Pong)
+
+ findDispatcherRecorder("BalancingPool-/test-balancing-pool", "fork-join-pool") shouldNot be(empty)
+ }
+
}
def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
@@ -159,6 +170,9 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
def findDispatcherRecorder(dispatcher: MessageDispatcher, dispatcherType: String): Option[EntityRecorder] =
Kamon.metrics.find(system.name + "/" + dispatcher.id, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType))
+ def findDispatcherRecorder(dispatcherID: String, dispatcherType: String): Option[EntityRecorder] =
+ Kamon.metrics.find(system.name + "/" + dispatcherID, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType))
+
def collectDispatcherMetrics(dispatcher: MessageDispatcher, dispatcherType: String): EntitySnapshot =
findDispatcherRecorder(dispatcher, dispatcherType).map(_.collect(collectionContext)).get