From f6e9cb16798abf5eecada300df44339d3f09592d Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 24 Mar 2014 00:27:02 -0300 Subject: record mailbox size when sending a message to an actor --- .../ActorMessagePassingTracing.scala | 11 +++ .../scala/kamon/metrics/ActorMetricsSpec.scala | 98 +++++++++++++++++++--- 2 files changed, 97 insertions(+), 12 deletions(-) (limited to 'kamon-core') diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 60266461..dcdf6f94 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -60,6 +60,17 @@ class BehaviourInvokeTracing { } } + @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell)") + def sendingMessageToActorCell(cell: ActorCell): Unit = {} + + @After("sendingMessageToActorCell(cell)") + def afterSendMessageToActorCell(cell: ActorCell): Unit = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] + cellWithMetrics.actorMetricsRecorder.map { am ⇒ + am.mailboxSize.record(cell.numberOfMessages) + } + } + @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)") def actorStop(cell: ActorCell): Unit = {} diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala index ad9fd13f..646e4159 100644 --- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala @@ -16,12 +16,14 @@ package kamon.metrics import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.TestKitBase -import akka.actor.{ Actor, Props, ActorSystem } +import akka.testkit.{ TestProbe, TestKitBase } +import akka.actor.{ ActorRef, Actor, Props, ActorSystem } import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.ActorMetrics.ActorMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( @@ -30,8 +32,8 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { | filters = [ | { | actor { - | includes = [ "user/*" ] - | excludes = [ ] + | includes = [ "user/tracked-*" ] + | excludes = [ "user/tracked-explicitly-excluded"] | } | } | ] @@ -39,18 +41,90 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { """.stripMargin)) "the Kamon actor metrics" should { - "track configured actors" in { - Kamon(Metrics).subscribe(ActorMetrics, "user/test-tracked-actor", testActor) + "respect the configured include and exclude filters" in new DelayableActorFixture { + val tracked = system.actorOf(Props[DelayableActor], "tracked-actor") + val nonTracked = system.actorOf(Props[DelayableActor], "non-tracked-actor") + val trackedExplicitlyExcluded = system.actorOf(Props[DelayableActor], "tracked-explicitly-excluded") - system.actorOf(Props[Discard], "test-tracked-actor") ! "nothing" + Kamon(Metrics).subscribe(ActorMetrics, "*", testActor, permanently = true) + expectMsgType[TickMetricSnapshot] - println(within(5 seconds) { - expectMsgType[TickMetricSnapshot] - }) + tracked ! Discard + nonTracked ! Discard + trackedExplicitlyExcluded ! Discard + + within(2 seconds) { + val tickSnapshot = expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.keys should contain(ActorMetrics("user/tracked-actor")) + tickSnapshot.metrics.keys should not contain (ActorMetrics("user/non-tracked-actor")) + tickSnapshot.metrics.keys should not contain (ActorMetrics("user/tracked-explicitly-excluded")) + } + } + + "record mailbox-size, processing-time and time-in-mailbox metrics under regular conditions" in new DelayableActorFixture { + val (delayable, metricsListener) = delayableActor("tracked-normal-conditions") + + for (_ ← 1 to 10) { + delayable ! Discard + } + + val actorMetrics = expectActorMetrics("user/tracked-normal-conditions", metricsListener, 3 seconds) + // Mailbox size is measured twice, on queue and dequeue, plus the automatic last-value recording. + actorMetrics.mailboxSize.numberOfMeasurements should be(21) + actorMetrics.mailboxSize.max should be < 10L + + actorMetrics.processingTime.numberOfMeasurements should be(10L) + + actorMetrics.timeInMailbox.numberOfMeasurements should be(10L) + } + + "keep a correct mailbox-size even if the actor is blocked processing a message" in new DelayableActorFixture { + val (delayable, metricsListener) = delayableActor("tracked-mailbox-size-queueing-up") + + delayable ! Delay(2500 milliseconds) + for (_ ← 1 to 9) { + delayable ! Discard + } + + // let the first snapshot pass + metricsListener.expectMsgType[TickMetricSnapshot] + + val actorMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 3 seconds) + actorMetrics.mailboxSize.numberOfMeasurements should equal(1) + actorMetrics.mailboxSize.measurements should contain only (Measurement(9, 1)) // only the automatic last-value recording + actorMetrics.mailboxSize.max should equal(9) + } + } + + def expectActorMetrics(actorPath: String, listener: TestProbe, waitTime: FiniteDuration): ActorMetricSnapshot = { + val tickSnapshot = within(waitTime) { + listener.expectMsgType[TickMetricSnapshot] + } + val actorMetricsOption = tickSnapshot.metrics.get(ActorMetrics(actorPath)) + actorMetricsOption should not be empty + actorMetricsOption.get.asInstanceOf[ActorMetricSnapshot] + } + + trait DelayableActorFixture { + def delayableActor(name: String): (ActorRef, TestProbe) = { + val actor = system.actorOf(Props[DelayableActor], name) + val metricsListener = TestProbe() + + Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true) + // Wait for one empty snapshot before proceeding to the test. + metricsListener.expectMsgType[TickMetricSnapshot] + + (actor, metricsListener) } } } -class Discard extends Actor { - def receive = { case a ⇒ } +class DelayableActor extends Actor { + def receive = { + case Delay(time) ⇒ Thread.sleep(time.toMillis) + case Discard ⇒ + } } + +case object Discard +case class Delay(time: FiniteDuration) -- cgit v1.2.3