aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-03-13 15:21:33 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-03-13 15:21:33 +0100
commit0bb77ab8d744f57d70db846d3746c5e5878e56ee (patch)
treef6ca6988f2260dfc44d8dce3440d1fc60c415b82
parent37bf47d4ede655df5bda73882a2c15f98b39e820 (diff)
downloadKamon-0bb77ab8d744f57d70db846d3746c5e5878e56ee.tar.gz
Kamon-0bb77ab8d744f57d70db846d3746c5e5878e56ee.tar.bz2
Kamon-0bb77ab8d744f57d70db846d3746c5e5878e56ee.zip
! akka: prefix actors, routers and dispatchers with the actor system name.
-rw-r--r--kamon-akka/src/main/resources/reference.conf17
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala20
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala20
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala6
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala48
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Metrics.scala3
-rw-r--r--kamon-playground/src/main/resources/application.conf3
8 files changed, 74 insertions, 49 deletions
diff --git a/kamon-akka/src/main/resources/reference.conf b/kamon-akka/src/main/resources/reference.conf
index cc2b6060..47ce073b 100644
--- a/kamon-akka/src/main/resources/reference.conf
+++ b/kamon-akka/src/main/resources/reference.conf
@@ -15,10 +15,21 @@ kamon {
}
metric.filters {
- actor.includes = []
- actor.excludes = [ "", "user", "system**", "user/IO-**" ]
- }
+ akka-actor {
+ includes = []
+ excludes = [ "", "*/user", "*/system**", "*/user/IO-**" ]
+ }
+
+ akka-router {
+ includes = []
+ excludes = []
+ }
+ akka-dispatcher {
+ includes = []
+ excludes = []
+ }
+ }
modules {
kamon-akka {
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala
index d1d420fe..39bdcfe0 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorCellInstrumentation.scala
@@ -34,7 +34,7 @@ class ActorCellInstrumentation {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- val actorEntity = Entity(ref.path.elements.mkString("/"), ActorMetrics.category)
+ val actorEntity = Entity(system.name + "/" + ref.path.elements.mkString("/"), ActorMetrics.category)
if (Kamon.metrics.shouldTrack(actorEntity)) {
val actorMetricsRecorder = Kamon.metrics.entity(ActorMetrics, actorEntity)
@@ -60,19 +60,19 @@ class ActorCellInstrumentation {
pjp.proceed()
}
} finally {
- cellMetrics.recorder.map { am ⇒
- val processingTime = System.nanoTime() - timestampBeforeProcessing
- val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime
+ val processingTime = System.nanoTime() - timestampBeforeProcessing
+ val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime
+ cellMetrics.recorder.map { am ⇒
am.processingTime.record(processingTime)
am.timeInMailbox.record(timeInMailbox)
am.mailboxSize.decrement()
+ }
- // In case that this actor is behind a router, record the metrics for the router.
- envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒
- rm.processingTime.record(processingTime)
- rm.timeInMailbox.record(timeInMailbox)
- }
+ // In case that this actor is behind a router, record the metrics for the router.
+ envelope.asInstanceOf[RouterAwareEnvelope].routerMetricsRecorder.map { rm ⇒
+ rm.processingTime.record(processingTime)
+ rm.timeInMailbox.record(timeInMailbox)
}
}
}
@@ -127,7 +127,7 @@ class RoutedActorCellInstrumentation {
@After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)")
def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {
- val routerEntity = Entity(ref.path.elements.mkString("/"), RouterMetrics.category)
+ val routerEntity = Entity(system.name + "/" + ref.path.elements.mkString("/"), RouterMetrics.category)
if (Kamon.metrics.shouldTrack(routerEntity)) {
val cellMetrics = cell.asInstanceOf[RoutedActorCellMetrics]
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
index 4567d442..931d8a45 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
@@ -59,16 +59,16 @@ class DispatcherInstrumentation {
private def registerDispatcher(dispatcherName: String, executorService: ExecutorService, system: ActorSystem): Unit =
executorService match {
case fjp: AkkaForkJoinPool ⇒
- val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category)
+ val dispatcherEntity = Entity(system.name + "/" + dispatcherName, AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "fork-join-pool"))
if (Kamon.metrics.shouldTrack(dispatcherEntity))
- Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName, Map("dispatcher-type" -> "fork-join-pool"))
+ Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherEntity)
case tpe: ThreadPoolExecutor ⇒
- val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category)
+ val dispatcherEntity = Entity(system.name + "/" + dispatcherName, AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "thread-pool-executor"))
if (Kamon.metrics.shouldTrack(dispatcherEntity))
- Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName, Map("dispatcher-type" -> "thread-pool-executor"))
+ Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherEntity)
case others ⇒ // Currently not interested in other kinds of dispatchers.
}
@@ -126,7 +126,17 @@ class DispatcherInstrumentation {
import lazyExecutor.lookupData
if (lookupData.actorSystem != null)
- Kamon.metrics.removeEntity(Entity(lookupData.dispatcherName, AkkaDispatcherMetrics.Category))
+ lazyExecutor.asInstanceOf[ExecutorServiceDelegate].executor match {
+ case fjp: AkkaForkJoinPool ⇒
+ Kamon.metrics.removeEntity(Entity(lookupData.actorSystem.name + "/" + lookupData.dispatcherName,
+ AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "fork-join-pool")))
+
+ case tpe: ThreadPoolExecutor ⇒
+ Kamon.metrics.removeEntity(Entity(lookupData.actorSystem.name + "/" + lookupData.dispatcherName,
+ AkkaDispatcherMetrics.Category, tags = Map("dispatcher-type" -> "thread-pool-executor")))
+
+ case other ⇒ // nothing to remove.
+ }
}
}
diff --git a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala
index f4db8b8c..cef0071b 100644
--- a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala
@@ -38,8 +38,8 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") {
|
| filters {
| akka-actor {
- | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ]
- | excludes = [ "user/tracked-explicitly-excluded", "user/non-tracked-actor" ]
+ | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/clean-after-collect", "*/user/stop" ]
+ | excludes = [ "*/user/tracked-explicitly-excluded", "*/user/non-tracked-actor" ]
| }
| }
|
@@ -162,7 +162,7 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") {
val buffer: LongBuffer = LongBuffer.allocate(10000)
}
- def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
+ def actorRecorderName(ref: ActorRef): String = system.name + "/" + ref.path.elements.mkString("/")
def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetrics] =
Kamon.metrics.find(actorRecorderName(ref), ActorMetrics.category).map(_.asInstanceOf[ActorMetrics])
diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
index dd5cfa45..02a5a0d4 100644
--- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
@@ -36,8 +36,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
|
| filters = {
| akka-dispatcher {
- | includes = [ "*" ]
- | excludes = [ "explicitly-excluded" ]
+ | includes = [ "**" ]
+ | excludes = [ "*/explicitly-excluded" ]
| }
| }
|
@@ -84,16 +84,16 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe"))
val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded"))
- findDispatcherRecorder(defaultDispatcher) shouldNot be(empty)
- findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
- findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
- findDispatcherRecorder(excludedDispatcher) should be(empty)
+ findDispatcherRecorder(defaultDispatcher, "fork-join-pool") shouldNot be(empty)
+ findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty)
+ findDispatcherRecorder(excludedDispatcher, "fork-join-pool") should be(empty)
}
"record metrics for a dispatcher with thread-pool-executor" in {
implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
- refreshDispatcherInstruments(tpeDispatcher)
- collectDispatcherMetrics(tpeDispatcher)
+ refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor")
+ collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
Await.result({
Future.sequence {
@@ -101,8 +101,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
}
}, 5 seconds)
- refreshDispatcherInstruments(tpeDispatcher)
- val snapshot = collectDispatcherMetrics(tpeDispatcher)
+ refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor")
+ val snapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
snapshot.gauge("active-threads") should not be empty
snapshot.gauge("pool-size").get.min should be >= 7L
@@ -112,13 +112,13 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
snapshot.gauge("processed-tasks").get.max should be(102L +- 5L)
// The processed tasks should be reset to 0 if no more tasks are submitted.
- val secondSnapshot = collectDispatcherMetrics(tpeDispatcher)
+ val secondSnapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
secondSnapshot.gauge("processed-tasks").get.max should be(0)
}
"record metrics for a dispatcher with fork-join-executor" in {
implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
- collectDispatcherMetrics(fjpDispatcher)
+ collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
Await.result({
Future.sequence {
@@ -126,8 +126,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
}
}, 5 seconds)
- refreshDispatcherInstruments(fjpDispatcher)
- val snapshot = collectDispatcherMetrics(fjpDispatcher)
+ refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool")
+ val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
snapshot.minMaxCounter("parallelism").get.max should be(22)
snapshot.gauge("pool-size").get.min should be >= 0L
@@ -142,28 +142,28 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
- findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
- findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty)
shutdownDispatcher(tpeDispatcher)
shutdownDispatcher(fjpDispatcher)
- findDispatcherRecorder(fjpDispatcher) should be(empty)
- findDispatcherRecorder(tpeDispatcher) should be(empty)
+ findDispatcherRecorder(fjpDispatcher, "fork-join-pool") should be(empty)
+ findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") should be(empty)
}
}
def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
- def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] =
- Kamon.metrics.find(dispatcher.id, "akka-dispatcher")
+ def findDispatcherRecorder(dispatcher: MessageDispatcher, dispatcherType: String): Option[EntityRecorder] =
+ Kamon.metrics.find(system.name + "/" + dispatcher.id, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType))
- def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot =
- findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get
+ def collectDispatcherMetrics(dispatcher: MessageDispatcher, dispatcherType: String): EntitySnapshot =
+ findDispatcherRecorder(dispatcher, dispatcherType).map(_.collect(collectionContext)).get
- def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = {
- findDispatcherRecorder(dispatcher) match {
+ def refreshDispatcherInstruments(dispatcher: MessageDispatcher, dispatcherType: String): Unit = {
+ findDispatcherRecorder(dispatcher, dispatcherType) match {
case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒
tpe.processedTasks.refreshValue()
tpe.activeThreads.refreshValue()
diff --git a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
index 41c329d6..4128c9ef 100644
--- a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
@@ -39,8 +39,8 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") {
|
| filters = {
| akka-router {
- | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ]
- | excludes = [ "user/tracked-explicitly-excluded-*"]
+ | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/stop-*" ]
+ | excludes = [ "*/user/tracked-explicitly-excluded-*"]
| }
| }
|}
@@ -205,7 +205,7 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") {
}
def routerMetricsRecorderOf(routerName: String): Option[RouterMetrics] =
- Kamon.metrics.find(routerName, RouterMetrics.category).map(_.asInstanceOf[RouterMetrics])
+ Kamon.metrics.find(system.name + "/" + routerName, RouterMetrics.category).map(_.asInstanceOf[RouterMetrics])
def collectMetricsOf(routerName: String): Option[EntitySnapshot] = {
Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested.
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
index 58e16bbd..05f333ab 100644
--- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
@@ -205,6 +205,9 @@ trait Metrics {
def find(name: String, category: String): Option[EntityRecorder] =
find(Entity(name, category))
+ def find(name: String, category: String, tags: Map[String, String]): Option[EntityRecorder] =
+ find(Entity(name, category, tags))
+
def find(entity: Entity): Option[EntityRecorder]
def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit =
diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf
index 4922a5ba..463894b2 100644
--- a/kamon-playground/src/main/resources/application.conf
+++ b/kamon-playground/src/main/resources/application.conf
@@ -22,7 +22,8 @@ kamon {
metric {
filters {
trace.includes = [ "**" ]
- actor.includes = [ "**" ]
+ akka-actor.includes = [ "**" ]
+ akka-dispatcher.includes = [ "**" ]
}
}