/* =========================================================================================
 * Copyright © 2013 the kamon project
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.metrics

import org.scalatest.{ BeforeAndAfterAll, WordSpecLike, Matchers }
import akka.testkit.{ TestKit, TestProbe, TestKitBase }
import akka.actor.{ ActorRef, Actor, Props, ActorSystem }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import kamon.Kamon
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.ActorMetrics.ActorMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement

/**
 *  Verifies the actor metrics published by Kamon: which actors get tracked according to the
 *  configured include/exclude filters, and the mailbox-size, processing-time, time-in-mailbox
 *  and error measurements reported in each [[TickMetricSnapshot]].
 */
class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll {

  /** Dedicated actor system whose configuration only tracks actors matching `user/tracked-*`. */
  implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
    """
      |kamon.metrics {
      |  filters = [
      |    {
      |      actor {
      |        includes = [ "user/tracked-*" ]
      |        excludes = [ "user/tracked-explicitly-excluded"]
      |      }
      |    }
      |  ]
      |}
    """.stripMargin))

  /** Shuts the spec's actor system down so its threads do not outlive the test run. */
  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  "the Kamon actor metrics" should {
    "respect the configured include and exclude filters" in new DelayableActorFixture {
      val tracked = system.actorOf(Props[DelayableActor], "tracked-actor")
      val nonTracked = system.actorOf(Props[DelayableActor], "non-tracked-actor")
      val trackedExplicitlyExcluded = system.actorOf(Props[DelayableActor], "tracked-explicitly-excluded")

      Kamon(Metrics).subscribe(ActorMetrics, "*", testActor, permanently = true)
      // Consume one tick before any message is sent to the actors.
      expectMsgType[TickMetricSnapshot]

      tracked ! Discard
      nonTracked ! Discard
      trackedExplicitlyExcluded ! Discard

      within(2.seconds) {
        val tickSnapshot = expectMsgType[TickMetricSnapshot]
        tickSnapshot.metrics.keys should contain(ActorMetrics("user/tracked-actor"))
        tickSnapshot.metrics.keys should not contain (ActorMetrics("user/non-tracked-actor"))
        tickSnapshot.metrics.keys should not contain (ActorMetrics("user/tracked-explicitly-excluded"))
      }
    }

    "record mailbox-size, processing-time and time-in-mailbox metrics under regular conditions" in new DelayableActorFixture {
      val (delayable, metricsListener) = delayableActor("tracked-normal-conditions")

      for (_ ← 1 to 10) {
        delayable ! Discard
      }

      val actorMetrics = expectActorMetrics("user/tracked-normal-conditions", metricsListener, 3.seconds)
      actorMetrics.mailboxSize.max should be <= 10L
      actorMetrics.processingTime.numberOfMeasurements should be(10L)
      actorMetrics.timeInMailbox.numberOfMeasurements should be(10L)
    }

    "keep a correct mailbox-size even if the actor is blocked processing a message" in new DelayableActorFixture {
      val (delayable, metricsListener) = delayableActor("tracked-mailbox-size-queueing-up")

      // The first message blocks the actor while the other nine pile up in its mailbox.
      delayable ! Delay(2500.milliseconds)
      for (_ ← 1 to 9) {
        delayable ! Discard
      }

      // let the first snapshot pass
      metricsListener.expectMsgType[TickMetricSnapshot]

      // process the tick in which the actor is stalled.
      val stalledTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2.seconds)
      stalledTickMetrics.mailboxSize.numberOfMeasurements should equal(1)
      // only the automatic last-value recording should be taken, and includes the message being currently processed.
      stalledTickMetrics.mailboxSize.measurements should contain only (Measurement(10, 1))
      stalledTickMetrics.mailboxSize.max should equal(10)
      stalledTickMetrics.processingTime.numberOfMeasurements should be(0L)
      stalledTickMetrics.timeInMailbox.numberOfMeasurements should be(0L)

      // process the tick after the actor is unblocked.
      val afterStallTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2.seconds)
      afterStallTickMetrics.processingTime.numberOfMeasurements should be(10L)
      afterStallTickMetrics.timeInMailbox.numberOfMeasurements should be(10L)
      afterStallTickMetrics.processingTime.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
      afterStallTickMetrics.timeInMailbox.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
    }

    "track the number of errors" in new ErrorActorFixture {
      val (error, metricsListener) = failedActor("tracked-errors")

      // Every Error() message makes the actor throw while processing it.
      for (_ ← 1 to 5) {
        error ! Error()
      }

      val actorMetrics = expectActorMetrics("user/tracked-errors", metricsListener, 3.seconds)
      actorMetrics.errorCounter.numberOfMeasurements should be(5L)
    }
  }

  /**
   *  Waits for the next [[TickMetricSnapshot]] delivered to `listener` and extracts the metrics
   *  recorded for a single actor, failing the test if they are missing or of an unexpected type.
   *
   *  @param actorPath path of the actor as used in the metrics identity, e.g. `user/my-actor`
   *  @param listener  probe subscribed to the actor metrics
   *  @param waitTime  maximum time to wait for the snapshot to arrive
   *  @return the actor's metrics contained in the received snapshot
   */
  def expectActorMetrics(actorPath: String, listener: TestProbe, waitTime: FiniteDuration): ActorMetricSnapshot = {
    // The deadline is set on the probe that actually receives the message, so that the
    // wait itself is bounded by `waitTime`.
    val tickSnapshot = listener.within(waitTime) {
      listener.expectMsgType[TickMetricSnapshot]
    }

    tickSnapshot.metrics.get(ActorMetrics(actorPath)) match {
      case Some(snapshot: ActorMetricSnapshot) ⇒ snapshot
      case Some(other)                         ⇒ fail(s"Expected an ActorMetricSnapshot for [$actorPath] but found: $other")
      case None                                ⇒ fail(s"No metrics were reported for [$actorPath] in the received tick snapshot")
    }
  }

  /**
   *  Creates an actor named `name` from `props` together with a probe permanently subscribed
   *  to that actor's metrics. Returns only after the probe has received a first snapshot.
   */
  private def subscribedActor(props: Props, name: String): (ActorRef, TestProbe) = {
    val actor = system.actorOf(props, name)
    val metricsListener = TestProbe()
    Kamon(Metrics).subscribe(ActorMetrics, s"user/$name", metricsListener.ref, permanently = true)

    // Wait for one empty snapshot before proceeding to the test.
    metricsListener.expectMsgType[TickMetricSnapshot]

    (actor, metricsListener)
  }

  /** Fixture providing [[DelayableActor]] instances with a metrics listener attached. */
  trait DelayableActorFixture {

    /** Starts a [[DelayableActor]] called `name` and returns it with its metrics listener. */
    def delayableActor(name: String): (ActorRef, TestProbe) =
      subscribedActor(Props[DelayableActor], name)
  }

  /** Fixture providing [[FailedActor]] instances with a metrics listener attached. */
  trait ErrorActorFixture {

    /** Starts a [[FailedActor]] called `name` and returns it with its metrics listener. */
    def failedActor(name: String): (ActorRef, TestProbe) =
      subscribedActor(Props[FailedActor], name)
  }
}

/** Test actor that blocks its thread on [[Delay]] and silently drops [[Discard]]. */
class DelayableActor extends Actor {
  def receive: Receive = {
    // Blocking on purpose: the spec needs messages to queue up behind this one.
    case Delay(time) ⇒ Thread.sleep(time.toMillis)
    case Discard     ⇒
  }
}

/** Test actor that throws when it receives an [[Error]] and silently drops [[Discard]]. */
class FailedActor extends Actor {
  def receive: Receive = {
    // Integer division by zero throws an ArithmeticException, failing the actor on purpose.
    case Error() ⇒ 1 / 0
    case Discard ⇒
  }
}

/** Message that the test actors accept and ignore. */
case object Discard

/** Message asking a [[DelayableActor]] to block for `time` while processing it. */
case class Delay(time: FiniteDuration)

/** Message that makes a [[FailedActor]] throw. Note: shadows `java.lang.Error` within this package. */
case class Error()