aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-03-24 00:27:02 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-03-24 00:27:02 -0300
commitf6e9cb16798abf5eecada300df44339d3f09592d (patch)
treed7c47b891a11959681f3e5688c42314133dcac19
parentb0c40681d8d6a7f5096fd6d70f37eda23336b0a6 (diff)
downloadKamon-f6e9cb16798abf5eecada300df44339d3f09592d.tar.gz
Kamon-f6e9cb16798abf5eecada300df44339d3f09592d.tar.bz2
Kamon-f6e9cb16798abf5eecada300df44339d3f09592d.zip
record mailbox size when sending a message to an actor
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala11
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala98
2 files changed, 97 insertions, 12 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index 60266461..dcdf6f94 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -60,6 +60,17 @@ class BehaviourInvokeTracing {
}
}
+ @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)")
+ def sendingMessageToActorCell(cell: ActorCell): Unit = {}
+
+ @After("sendingMessageToActorCell(cell)")
+ def afterSendMessageToActorCell(cell: ActorCell): Unit = {
+ val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
+ cellWithMetrics.actorMetricsRecorder.map { am ⇒
+ am.mailboxSize.record(cell.numberOfMessages)
+ }
+ }
+
@Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
def actorStop(cell: ActorCell): Unit = {}
diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
index ad9fd13f..646e4159 100644
--- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
@@ -16,12 +16,14 @@
package kamon.metrics
import org.scalatest.{ WordSpecLike, Matchers }
-import akka.testkit.TestKitBase
-import akka.actor.{ Actor, Props, ActorSystem }
+import akka.testkit.{ TestProbe, TestKitBase }
+import akka.actor.{ ActorRef, Actor, Props, ActorSystem }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import kamon.Kamon
import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.ActorMetrics.ActorMetricSnapshot
+import kamon.metrics.MetricSnapshot.Measurement
class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
@@ -30,8 +32,8 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
| filters = [
| {
| actor {
- | includes = [ "user/*" ]
- | excludes = [ ]
+ | includes = [ "user/tracked-*" ]
+ | excludes = [ "user/tracked-explicitly-excluded"]
| }
| }
| ]
@@ -39,18 +41,90 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
""".stripMargin))
"the Kamon actor metrics" should {
- "track configured actors" in {
- Kamon(Metrics).subscribe(ActorMetrics, "user/test-tracked-actor", testActor)
+ "respect the configured include and exclude filters" in new DelayableActorFixture {
+ val tracked = system.actorOf(Props[DelayableActor], "tracked-actor")
+ val nonTracked = system.actorOf(Props[DelayableActor], "non-tracked-actor")
+ val trackedExplicitlyExcluded = system.actorOf(Props[DelayableActor], "tracked-explicitly-excluded")
- system.actorOf(Props[Discard], "test-tracked-actor") ! "nothing"
+ Kamon(Metrics).subscribe(ActorMetrics, "*", testActor, permanently = true)
+ expectMsgType[TickMetricSnapshot]
- println(within(5 seconds) {
- expectMsgType[TickMetricSnapshot]
- })
+ tracked ! Discard
+ nonTracked ! Discard
+ trackedExplicitlyExcluded ! Discard
+
+ within(2 seconds) {
+ val tickSnapshot = expectMsgType[TickMetricSnapshot]
+ tickSnapshot.metrics.keys should contain(ActorMetrics("user/tracked-actor"))
+ tickSnapshot.metrics.keys should not contain (ActorMetrics("user/non-tracked-actor"))
+ tickSnapshot.metrics.keys should not contain (ActorMetrics("user/tracked-explicitly-excluded"))
+ }
+ }
+
+ "record mailbox-size, processing-time and time-in-mailbox metrics under regular conditions" in new DelayableActorFixture {
+ val (delayable, metricsListener) = delayableActor("tracked-normal-conditions")
+
+ for (_ ← 1 to 10) {
+ delayable ! Discard
+ }
+
+ val actorMetrics = expectActorMetrics("user/tracked-normal-conditions", metricsListener, 3 seconds)
+ // Mailbox size is measured twice, on queue and dequeue, plus the automatic last-value recording.
+ actorMetrics.mailboxSize.numberOfMeasurements should be(21)
+ actorMetrics.mailboxSize.max should be < 10L
+
+ actorMetrics.processingTime.numberOfMeasurements should be(10L)
+
+ actorMetrics.timeInMailbox.numberOfMeasurements should be(10L)
+ }
+
+ "keep a correct mailbox-size even if the actor is blocked processing a message" in new DelayableActorFixture {
+ val (delayable, metricsListener) = delayableActor("tracked-mailbox-size-queueing-up")
+
+ delayable ! Delay(2500 milliseconds)
+ for (_ ← 1 to 9) {
+ delayable ! Discard
+ }
+
+ // let the first snapshot pass
+ metricsListener.expectMsgType[TickMetricSnapshot]
+
+ val actorMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 3 seconds)
+ actorMetrics.mailboxSize.numberOfMeasurements should equal(1)
+ actorMetrics.mailboxSize.measurements should contain only (Measurement(9, 1)) // only the automatic last-value recording
+ actorMetrics.mailboxSize.max should equal(9)
+ }
+ }
+
+ def expectActorMetrics(actorPath: String, listener: TestProbe, waitTime: FiniteDuration): ActorMetricSnapshot = {
+ val tickSnapshot = within(waitTime) {
+ listener.expectMsgType[TickMetricSnapshot]
+ }
+ val actorMetricsOption = tickSnapshot.metrics.get(ActorMetrics(actorPath))
+ actorMetricsOption should not be empty
+ actorMetricsOption.get.asInstanceOf[ActorMetricSnapshot]
+ }
+
+ trait DelayableActorFixture {
+ def delayableActor(name: String): (ActorRef, TestProbe) = {
+ val actor = system.actorOf(Props[DelayableActor], name)
+ val metricsListener = TestProbe()
+
+ Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true)
+ // Wait for one empty snapshot before proceeding to the test.
+ metricsListener.expectMsgType[TickMetricSnapshot]
+
+ (actor, metricsListener)
}
}
}
-class Discard extends Actor {
- def receive = { case a ⇒ }
+class DelayableActor extends Actor {
+ def receive = {
+ case Delay(time) ⇒ Thread.sleep(time.toMillis)
+ case Discard ⇒
+ }
}
+
+case object Discard
+case class Delay(time: FiniteDuration)