// Source: kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala (blob 7430e0f08624631195bdf8e0d114668d34715c99)
/* =========================================================================================
 * Copyright © 2013 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.metrics

import org.scalatest.{ WordSpecLike, Matchers }
import akka.testkit.{ TestProbe, TestKitBase }
import akka.actor.{ ActorRef, Actor, Props, ActorSystem }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import kamon.Kamon
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.ActorMetrics.ActorMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement

/**
 * Specification for the actor metrics recorded by Kamon.
 *
 * The spec starts its own actor system whose configuration only tracks actors matching
 * `user/tracked-*`, with `user/tracked-explicitly-excluded` explicitly left out. Each test
 * subscribes a listener to the [[ActorMetrics]] category and inspects the
 * [[TickMetricSnapshot]] messages it receives.
 */
class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
  // Must be an `implicit lazy val`: TestKitBase needs the system while the trait itself is being constructed.
  implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
    """
      |kamon.metrics {
      |  filters = [
      |    {
      |      actor {
      |        includes = [ "user/tracked-*" ]
      |        excludes = [ "user/tracked-explicitly-excluded"]
      |      }
      |    }
      |  ]
      |}
    """.stripMargin))

  "the Kamon actor metrics" should {
    "respect the configured include and exclude filters" in new DelayableActorFixture {
      val tracked = system.actorOf(Props[DelayableActor], "tracked-actor")
      val nonTracked = system.actorOf(Props[DelayableActor], "non-tracked-actor")
      val trackedExplicitlyExcluded = system.actorOf(Props[DelayableActor], "tracked-explicitly-excluded")

      // Subscribe the spec's own testActor to every actor metric group and skip the first snapshot.
      Kamon(Metrics).subscribe(ActorMetrics, "*", testActor, permanently = true)
      expectMsgType[TickMetricSnapshot]

      tracked ! Discard
      nonTracked ! Discard
      trackedExplicitlyExcluded ! Discard

      within(2.seconds) {
        val tickSnapshot = expectMsgType[TickMetricSnapshot]
        tickSnapshot.metrics.keys should contain(ActorMetrics("user/tracked-actor"))
        tickSnapshot.metrics.keys should not contain (ActorMetrics("user/non-tracked-actor"))
        tickSnapshot.metrics.keys should not contain (ActorMetrics("user/tracked-explicitly-excluded"))
      }
    }

    "record mailbox-size, processing-time and time-in-mailbox metrics under regular conditions" in new DelayableActorFixture {
      val (delayable, metricsListener) = delayableActor("tracked-normal-conditions")

      for (_ <- 1 to 10) {
        delayable ! Discard
      }

      val actorMetrics = expectActorMetrics("user/tracked-normal-conditions", metricsListener, 3.seconds)
      actorMetrics.mailboxSize.max should be <= 10L
      actorMetrics.processingTime.numberOfMeasurements should be(10L)
      actorMetrics.timeInMailbox.numberOfMeasurements should be(10L)
    }

    "keep a correct mailbox-size even if the actor is blocked processing a message" in new DelayableActorFixture {
      val (delayable, metricsListener) = delayableActor("tracked-mailbox-size-queueing-up")

      // The first message blocks the actor; the following nine pile up behind it.
      delayable ! Delay(2500.milliseconds)
      for (_ <- 1 to 9) {
        delayable ! Discard
      }

      // let the first snapshot pass
      metricsListener.expectMsgType[TickMetricSnapshot]

      // process the tick in which the actor is stalled.
      val stalledTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2.seconds)
      stalledTickMetrics.mailboxSize.numberOfMeasurements should equal(31)
      // only the automatic last-value recording should be taken, and includes the message being currently processed.
      stalledTickMetrics.mailboxSize.measurements should contain only (Measurement(10, 31))
      stalledTickMetrics.mailboxSize.min should equal(10)
      stalledTickMetrics.mailboxSize.max should equal(10)
      stalledTickMetrics.processingTime.numberOfMeasurements should be(0L)
      stalledTickMetrics.timeInMailbox.numberOfMeasurements should be(0L)

      // process the tick after the actor is unblocked.
      val afterStallTickMetrics = expectActorMetrics("user/tracked-mailbox-size-queueing-up", metricsListener, 2.seconds)
      afterStallTickMetrics.processingTime.numberOfMeasurements should be(10L)
      afterStallTickMetrics.timeInMailbox.numberOfMeasurements should be(10L)
      afterStallTickMetrics.processingTime.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
      afterStallTickMetrics.timeInMailbox.max should be(2500.milliseconds.toNanos +- 100.milliseconds.toNanos)
    }

    "track the number of errors" in new ErrorActorFixture {
      val (error, metricsListener) = failedActor("tracked-errors")

      for (_ <- 1 to 5) {
        error ! Error
      }

      val actorMetrics = expectActorMetrics("user/tracked-errors", metricsListener, 3.seconds)
      actorMetrics.errorCounter.numberOfMeasurements should be(5L)
    }
  }

  /**
   * Waits for the next [[TickMetricSnapshot]] on `listener` and extracts the metrics of one actor from it.
   *
   * @param actorPath path of the actor as used in the metrics identity, e.g. `user/tracked-actor`
   * @param listener  probe subscribed to the actor metrics
   * @param waitTime  time budget passed to `within` around the wait for the snapshot
   * @return the snapshot recorded for `actorPath`; the test fails if the tick has no
   *         [[ActorMetricSnapshot]] for that path
   */
  def expectActorMetrics(actorPath: String, listener: TestProbe, waitTime: FiniteDuration): ActorMetricSnapshot = {
    val tickSnapshot = within(waitTime) {
      listener.expectMsgType[TickMetricSnapshot]
    }

    // Pattern match instead of `.get.asInstanceOf` so that a missing or unexpected entry
    // is reported as a test failure with a readable message.
    tickSnapshot.metrics.get(ActorMetrics(actorPath)) match {
      case Some(snapshot: ActorMetricSnapshot) => snapshot
      case other                               => fail(s"Expected an ActorMetricSnapshot for [$actorPath] but found: $other")
    }
  }

  /**
   * Creates an actor from `props` under the given name and subscribes a fresh probe to its metrics.
   * Returns only after the probe has received a first snapshot.
   */
  private def subscribedActor(props: Props, name: String): (ActorRef, TestProbe) = {
    val actor = system.actorOf(props, name)
    val metricsListener = TestProbe()

    Kamon(Metrics).subscribe(ActorMetrics, "user/" + name, metricsListener.ref, permanently = true)
    // Wait for one empty snapshot before proceeding to the test.
    metricsListener.expectMsgType[TickMetricSnapshot]

    (actor, metricsListener)
  }

  /** Fixture providing [[DelayableActor]] instances together with a metrics listener. */
  trait DelayableActorFixture {
    /** Starts a [[DelayableActor]] called `name` and returns it with a probe subscribed to its metrics. */
    def delayableActor(name: String): (ActorRef, TestProbe) =
      subscribedActor(Props[DelayableActor], name)
  }

  /** Fixture providing [[FailedActor]] instances together with a metrics listener. */
  trait ErrorActorFixture {
    /** Starts a [[FailedActor]] called `name` and returns it with a probe subscribed to its metrics. */
    def failedActor(name: String): (ActorRef, TestProbe) =
      subscribedActor(Props[FailedActor], name)
  }
}

/**
 * Test actor that can be stalled on demand.
 *
 * A [[Delay]] message makes the actor sleep for the requested duration while processing it;
 * a [[Discard]] message is accepted and ignored.
 */
class DelayableActor extends Actor {
  def receive: Receive = {
    // Blocks the processing thread for the requested time.
    case Delay(time) => Thread.sleep(time.toMillis)
    case Discard     => ()
  }
}

/**
 * Test actor that fails on demand.
 *
 * Receiving `Error` evaluates `1 / 0`, which throws an ArithmeticException inside `receive`;
 * a [[Discard]] message is accepted and ignored.
 */
class FailedActor extends Actor {
  def receive: Receive = {
    // Integer division by zero: the thrown exception is the failure under test.
    case Error   => 1 / 0
    case Discard => ()
  }
}

/** Message that both [[DelayableActor]] and [[FailedActor]] accept and ignore. */
case object Discard

/** Message asking a [[DelayableActor]] to sleep for `time` while processing it. */
case class Delay(time: FiniteDuration)

/**
 * Failure trigger for [[FailedActor]].
 *
 * Note that the spec sends, and [[FailedActor]] matches on, the companion object `Error`
 * (a stable-identifier pattern), not an instance created with `Error()`.
 */
case class Error()