diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-03-13 15:21:33 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-03-13 15:21:33 +0100 |
commit | 0bb77ab8d744f57d70db846d3746c5e5878e56ee (patch) | |
tree | f6ca6988f2260dfc44d8dce3440d1fc60c415b82 | |
parent | 37bf47d4ede655df5bda73882a2c15f98b39e820 (diff) | |
download | Kamon-0bb77ab8d744f57d70db846d3746c5e5878e56ee.tar.gz Kamon-0bb77ab8d744f57d70db846d3746c5e5878e56ee.tar.bz2 Kamon-0bb77ab8d744f57d70db846d3746c5e5878e56ee.zip |
! akka: prefix actors, routers and dispatchers with the actor system name.
8 files changed, 74 insertions, 49 deletions
diff --git a/kamon-akka/src/main/resources/reference.conf b/kamon-akka/src/main/resources/reference.conf index cc2b6060..47ce073b 100644 --- a/kamon-akka/src/main/resources/reference.conf +++ b/kamon-akka/src/main/resources/reference.conf @@ -15,10 +15,21 @@ kamon { } metric.filters { - actor.includes = [] - actor.excludes = [ "", "user", "system**", "user/IO-**" ] - } + akka-actor { + includes = [] + excludes = [ "", "*/user", "*/system**", "*/user/IO-**" ] + } + + akka-router { + includes = [] + excludes = [] + } + akka-dispatcher { + includes = [] + excludes = [] + } + } modules { kamon-akka { diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala index d1d420fe..39bdcfe0 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala @@ -34,7 +34,7 @@ class ActorCellInstrumentation { @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - val actorEntity = Entity(ref.path.elements.mkString("/"), ActorMetrics.category) + val actorEntity = Entity(system.name + "/" + ref.path.elements.mkString("/"), ActorMetrics.category) if (Kamon.metrics.shouldTrack(actorEntity)) { val actorMetricsRecorder = Kamon.metrics.entity(ActorMetrics, actorEntity) @@ -60,19 +60,19 @@ class ActorCellInstrumentation { pjp.proceed() } } finally { - cellMetrics.recorder.map { am ⇒ - val processingTime = System.nanoTime() - timestampBeforeProcessing - val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + val processingTime = System.nanoTime() - timestampBeforeProcessing + val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + cellMetrics.recorder.map { am ⇒ am.processingTime.record(processingTime) am.timeInMailbox.record(timeInMailbox) am.mailboxSize.decrement() + } - // In case that this actor is behind a router, record the metrics for the router. - envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒ - rm.processingTime.record(processingTime) - rm.timeInMailbox.record(timeInMailbox) - } + // In case that this actor is behind a router, record the metrics for the router. + envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒ + rm.processingTime.record(processingTime) + rm.timeInMailbox.record(timeInMailbox) } } } @@ -127,7 +127,7 @@ class RoutedActorCellInstrumentation { @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)") def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = { - val routerEntity = Entity(ref.path.elements.mkString("/"), RouterMetrics.category) + val routerEntity = Entity(system.name + "/" + ref.path.elements.mkString("/"), RouterMetrics.category) if (Kamon.metrics.shouldTrack(routerEntity)) { val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics] diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala index 4567d442..931d8a45 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -59,16 +59,16 @@ class DispatcherInstrumentation { private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit = executorService match { case fjp: AkkaForkJoinPool ⇒ - val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category) + val dispatcherEntity = Entity(system.name + "/" + dispatcherName, AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "fork-join-pool")) if (Kamon.metrics.shouldTrack(dispatcherEntity)) - Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName, Map("dispatcher-type" -> "fork-join-pool")) + Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherEntity) case tpe: ThreadPoolExecutor ⇒ - val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category) + val dispatcherEntity = Entity(system.name + "/" + dispatcherName, AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "thread-pool-executor")) if (Kamon.metrics.shouldTrack(dispatcherEntity)) - Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName, Map("dispatcher-type" -> "thread-pool-executor")) + Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherEntity) case others ⇒ // Currently not interested in other kinds of dispatchers. } @@ -126,7 +126,17 @@ class DispatcherInstrumentation { import lazyExecutor.lookupData if (lookupData.actorSystem != null) - Kamon.metrics.removeEntity(Entity(lookupData.dispatcherName, AkkaDispatcherMetrics.Category)) + lazyExecutor.asInstanceOf[ExecutorServiceDelegate].executor match { + case fjp: AkkaForkJoinPool ⇒ + Kamon.metrics.removeEntity(Entity(lookupData.actorSystem.name + "/" + lookupData.dispatcherName, + AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "fork-join-pool"))) + + case tpe: ThreadPoolExecutor ⇒ + Kamon.metrics.removeEntity(Entity(lookupData.actorSystem.name + "/" + lookupData.dispatcherName, + AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "thread-pool-executor"))) + + case other ⇒ // nothing to remove. + } } } diff --git a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala index f4db8b8c..cef0071b 100644 --- a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala @@ -38,8 +38,8 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") { | | filters { | akka-actor { - | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded", "user/non-tracked-actor" ] + | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/clean-after-collect", "*/user/stop" ] + | excludes = [ "*/user/tracked-explicitly-excluded", "*/user/non-tracked-actor" ] | } | } | @@ -162,7 +162,7 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") { val buffer: LongBuffer = LongBuffer.allocate(10000) } - def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") + def actorRecorderName(ref: ActorRef): String = system.name + "/" + ref.path.elements.mkString("/") def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetrics] = Kamon.metrics.find(actorRecorderName(ref), ActorMetrics.category).map(_.asInstanceOf[ActorMetrics]) diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala index dd5cfa45..02a5a0d4 100644 --- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala @@ -36,8 +36,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { | | filters = { | akka-dispatcher { - | includes = [ "*" ] - | excludes = [ "explicitly-excluded" ] + | includes = [ "**" ] + | excludes = [ "*/explicitly-excluded" ] | } | } | @@ -84,16 +84,16 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe")) val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded")) - findDispatcherRecorder(defaultDispatcher) shouldNot be(empty) - findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) - findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) - findDispatcherRecorder(excludedDispatcher) should be(empty) + findDispatcherRecorder(defaultDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty) + findDispatcherRecorder(excludedDispatcher, "fork-join-pool") should be(empty) } "record metrics for a dispatcher with thread-pool-executor" in { implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") - refreshDispatcherInstruments(tpeDispatcher) - collectDispatcherMetrics(tpeDispatcher) + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") Await.result({ Future.sequence { @@ -101,8 +101,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { } }, 5 seconds) - refreshDispatcherInstruments(tpeDispatcher) - val snapshot = collectDispatcherMetrics(tpeDispatcher) + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + val snapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") snapshot.gauge("active-threads") should not be empty snapshot.gauge("pool-size").get.min should be >= 7L @@ -112,13 +112,13 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { snapshot.gauge("processed-tasks").get.max should be(102L +- 5L) // The processed tasks should be reset to 0 if no more tasks are submitted. - val secondSnapshot = collectDispatcherMetrics(tpeDispatcher) + val secondSnapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") secondSnapshot.gauge("processed-tasks").get.max should be(0) } "record metrics for a dispatcher with fork-join-executor" in { implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") - collectDispatcherMetrics(fjpDispatcher) + collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") Await.result({ Future.sequence { @@ -126,8 +126,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { } }, 5 seconds) - refreshDispatcherInstruments(fjpDispatcher) - val snapshot = collectDispatcherMetrics(fjpDispatcher) + refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool") + val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") snapshot.minMaxCounter("parallelism").get.max should be(22) snapshot.gauge("pool-size").get.min should be >= 0L @@ -142,28 +142,28 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") - findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) - findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) + findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty) shutdownDispatcher(tpeDispatcher) shutdownDispatcher(fjpDispatcher) - findDispatcherRecorder(fjpDispatcher) should be(empty) - findDispatcherRecorder(tpeDispatcher) should be(empty) + findDispatcherRecorder(fjpDispatcher, "fork-join-pool") should be(empty) + findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") should be(empty) } } def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] = - Kamon.metrics.find(dispatcher.id, "akka-dispatcher") + def findDispatcherRecorder(dispatcher: MessageDispatcher, dispatcherType: String): Option[EntityRecorder] = + Kamon.metrics.find(system.name + "/" + dispatcher.id, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType)) - def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot = - findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get + def collectDispatcherMetrics(dispatcher: MessageDispatcher, dispatcherType: String): EntitySnapshot = + findDispatcherRecorder(dispatcher, dispatcherType).map(_.collect(collectionContext)).get - def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = { - findDispatcherRecorder(dispatcher) match { + def refreshDispatcherInstruments(dispatcher: MessageDispatcher, dispatcherType: String): Unit = { + findDispatcherRecorder(dispatcher, dispatcherType) match { case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒ tpe.processedTasks.refreshValue() tpe.activeThreads.refreshValue() diff --git a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala index 41c329d6..4128c9ef 100644 --- a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala @@ -39,8 +39,8 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") { | | filters = { | akka-router { - | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] - | excludes = [ "user/tracked-explicitly-excluded-*"] + | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/stop-*" ] + | excludes = [ "*/user/tracked-explicitly-excluded-*"] | } | } |} @@ -205,7 +205,7 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") { } def routerMetricsRecorderOf(routerName: String): Option[RouterMetrics] = - Kamon.metrics.find(routerName, RouterMetrics.category).map(_.asInstanceOf[RouterMetrics]) + Kamon.metrics.find(system.name + "/" + routerName, RouterMetrics.category).map(_.asInstanceOf[RouterMetrics]) def collectMetricsOf(routerName: String): Option[EntitySnapshot] = { Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala index 58e16bbd..05f333ab 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -205,6 +205,9 @@ trait Metrics { def find(name: String, category: String): Option[EntityRecorder] = find(Entity(name, category)) + def find(name: String, category: String, tags: Map[String, String]): Option[EntityRecorder] = + find(Entity(name, category, tags)) + def find(entity: Entity): Option[EntityRecorder] def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index 4922a5ba..463894b2 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -22,7 +22,8 @@ kamon { metric { filters { trace.includes = [ "**" ] - actor.includes = [ "**" ] + akka-actor.includes = [ "**" ] + akka-dispatcher.includes = [ "**" ] } } |