diff options
Diffstat (limited to 'kamon-akka/src/test/scala')
7 files changed, 308 insertions, 227 deletions
diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala index 06a232bd..8f7ae613 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala @@ -15,35 +15,32 @@ * ========================================================== */ package kamon.instrumentation.akka -import akka.actor.{ Actor, ActorSystem, Props } +import akka.actor.{ Actor, Props } import akka.pattern.{ ask, pipe } import akka.routing._ -import akka.testkit.{ TestKitBase, ImplicitSender, TestKit } import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.trace.TraceRecorder -import org.scalatest.{ BeforeAndAfterAll, WordSpecLike } +import kamon.testkit.BaseKamonSpec +import kamon.trace.TraceContext import scala.concurrent.duration._ -class ActorCellInstrumentationSpec extends TestKitBase with WordSpecLike with ImplicitSender with BeforeAndAfterAll { - implicit lazy val system: ActorSystem = ActorSystem("actor-cell-instrumentation-spec") - implicit val executionContext = system.dispatcher +class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumentation-spec") { + implicit lazy val executionContext = system.dispatcher "the message passing instrumentation" should { "propagate the TraceContext using bang" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { + val testTraceContext = TraceContext.withContext(newContext("bang-reply")) { ctxEchoActor ! "test" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext using tell" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { + val testTraceContext = TraceContext.withContext(newContext("tell-reply")) { ctxEchoActor.tell("test", testActor) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) @@ -51,37 +48,37 @@ class ActorCellInstrumentationSpec extends TestKitBase with WordSpecLike with Im "propagate the TraceContext using ask" in new EchoActorFixture { implicit val timeout = Timeout(1 seconds) - val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { + val testTraceContext = TraceContext.withContext(newContext("ask-reply")) { // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. (ctxEchoActor ? "test") pipeTo (testActor) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext to actors behind a simple router" in new EchoSimpleRouterFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + val testTraceContext = TraceContext.withContext(newContext("router-reply")) { router.route("test", testActor) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext to actors behind a pool router" in new EchoPoolRouterFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + val testTraceContext = TraceContext.withContext(newContext("router-reply")) { pool ! "test" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext to actors behind a group router" in new EchoGroupRouterFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { + val testTraceContext = TraceContext.withContext(newContext("router-reply")) { group ! "test" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) @@ -119,7 +116,7 @@ class ActorCellInstrumentationSpec extends TestKitBase with WordSpecLike with Im class TraceContextEcho extends Actor { def receive = { - case msg: String ⇒ sender ! TraceRecorder.currentContext + case msg: String ⇒ sender ! TraceContext.currentContext } } diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala index 598e9327..21706af9 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala @@ -15,28 +15,33 @@ * ========================================================== */ package kamon.instrumentation.akka -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } +import akka.actor.{ Actor, ActorLogging, Props } import akka.event.Logging.LogEvent -import akka.testkit.TestKitBase import com.typesafe.config.ConfigFactory +import kamon.testkit.BaseKamonSpec import kamon.trace.TraceLocal.AvailableToMdc import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceLocal, TraceRecorder } -import org.scalatest.{ BeforeAndAfterAll, Inspectors, Matchers, WordSpecLike } +import kamon.trace.{ TraceContextAware, TraceLocal, TraceContext } +import org.scalatest.Inspectors import org.slf4j.MDC -class ActorLoggingInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with Inspectors with MdcKeysSupport with BeforeAndAfterAll { - implicit lazy val system: ActorSystem = ActorSystem("actor-logging-instrumentation-spec", - ConfigFactory.parseString("""akka.loggers = ["akka.event.slf4j.Slf4jLogger"]""")) +class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instrumentation-spec") with Inspectors with MdcKeysSupport { + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loggers = ["akka.event.slf4j.Slf4jLogger"] + |} + """.stripMargin) "the ActorLogging instrumentation" should { "attach the TraceContext (if available) to log events" in { val loggerActor = system.actorOf(Props[LoggerActor]) system.eventStream.subscribe(testActor, classOf[LogEvent]) - val testTraceContext = TraceRecorder.withNewTraceContext("logging") { + val testTraceContext = TraceContext.withContext(newContext("logging")) { loggerActor ! "info" - TraceRecorder.currentContext + TraceContext.currentContext } fishForMessage() { @@ -50,7 +55,7 @@ class ActorLoggingInstrumentationSpec extends TestKitBase with WordSpecLike with "allow retrieve a value from the MDC when was created a key of type AvailableToMdc" in { val testString = "Hello World" - TraceRecorder.withNewTraceContext("logging-with-mdc") { + TraceContext.withContext(newContext("logging-with-mdc")) { TraceLocal.store(AvailableToMdc("some-cool-key"))(testString) withMdc { @@ -66,6 +71,6 @@ class ActorLoggingInstrumentationSpec extends TestKitBase with WordSpecLike with class LoggerActor extends Actor with ActorLogging { def receive = { - case "info" ⇒ log.info("TraceContext(name = {}, token = {})", TraceRecorder.currentContext.name, TraceRecorder.currentContext.token) + case "info" ⇒ log.info("TraceContext(name = {}, token = {})", TraceContext.currentContext.name, TraceContext.currentContext.token) } } diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala index 0e9025af..8c1033ae 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala @@ -2,49 +2,53 @@ package kamon.instrumentation.akka import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop } import akka.actor._ -import akka.testkit.{ TestKitBase, ImplicitSender } +import akka.testkit.ImplicitSender import com.typesafe.config.ConfigFactory -import kamon.trace.{ EmptyTraceContext, TraceRecorder } +import kamon.testkit.BaseKamonSpec +import kamon.trace.{ EmptyTraceContext, TraceContext } import org.scalatest.WordSpecLike import scala.concurrent.duration._ import scala.util.control.NonFatal -class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLike with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("actor-system-message-instrumentation-spec", ConfigFactory.parseString( - """ - |akka.loglevel = OFF - """.stripMargin)) +class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-message-instrumentation-spec") with WordSpecLike with ImplicitSender { + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loglevel = OFF + |} + """.stripMargin) - implicit val executionContext = system.dispatcher + implicit lazy val executionContext = system.dispatcher "the system message passing instrumentation" should { "keep the TraceContext while processing the Create message in top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { + val testTraceContext = TraceContext.withContext(newContext("creating-top-level-actor")) { system.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext + testActor ! TraceContext.currentContext def receive: Actor.Receive = { case any ⇒ } })) - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) } "keep the TraceContext while processing the Create message in non top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { + val testTraceContext = TraceContext.withContext(newContext("creating-non-top-level-actor")) { system.actorOf(Props(new Actor { def receive: Actor.Receive = { case any ⇒ context.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext + testActor ! TraceContext.currentContext def receive: Actor.Receive = { case any ⇒ } })) } })) ! "any" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) @@ -54,9 +58,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the actor is resumed" in { val supervisor = supervisorWithDirective(Resume) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-resume")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -69,9 +73,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the actor is restarted" in { val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-restart")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -86,9 +90,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the actor is stopped" in { val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-stop")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -99,9 +103,9 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik "the failure is escalated" in { val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { + val testTraceContext = TraceContext.withContext(newContext("fail-and-escalate")) { supervisor ! "fail" - TraceRecorder.currentContext + TraceContext.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -119,7 +123,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik val child = context.actorOf(Props(new Parent)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop + case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; Stop } def receive = { @@ -131,7 +135,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik val child = context.actorOf(Props(new Child)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive + case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; directive } def receive: Actor.Receive = { @@ -139,7 +143,7 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik } override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext + if (sendPostStop) testActor ! TraceContext.currentContext super.postStop() } } @@ -147,26 +151,26 @@ class ActorSystemMessageInstrumentationSpec extends TestKitBase with WordSpecLik class Child extends Actor { def receive = { case "fail" ⇒ throw new ArithmeticException("Division by zero.") - case "context" ⇒ sender ! TraceRecorder.currentContext + case "context" ⇒ sender ! TraceContext.currentContext } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! TraceRecorder.currentContext + if (sendPreRestart) testActor ! TraceContext.currentContext super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! TraceRecorder.currentContext + if (sendPostRestart) testActor ! TraceContext.currentContext super.postRestart(reason) } override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext + if (sendPostStop) testActor ! TraceContext.currentContext super.postStop() } override def preStart(): Unit = { - if (sendPreStart) testActor ! TraceRecorder.currentContext + if (sendPreStart) testActor ! TraceContext.currentContext super.preStart() } } diff --git a/kamon-akka/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala index 5c9905ba..0d63a19e 100644 --- a/kamon-akka/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala @@ -21,21 +21,26 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import akka.event.Logging.Warning import akka.pattern.ask -import akka.testkit.{ TestProbe, TestKitBase } +import akka.testkit.TestProbe import akka.util.Timeout import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.akka.Akka -import kamon.trace.{ TraceContext, TraceContextAware, TraceRecorder } -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import kamon.testkit.BaseKamonSpec +import kamon.trace.{ TraceContext, TraceContextAware } import scala.concurrent.duration._ -class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { - implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", - ConfigFactory.parseString("""akka.loggers = ["akka.event.slf4j.Slf4jLogger"]""")) +class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |akka { + | loglevel = OFF + |} + """.stripMargin) - implicit val ec = system.dispatcher + implicit lazy val ec = system.dispatcher implicit val askTimeout = Timeout(10 millis) // TODO: Make this work with ActorSelections @@ -46,9 +51,9 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M setAskPatternTimeoutWarningMode("heavyweight") expectTimeoutWarning() { - TraceRecorder.withNewTraceContext("ask-timeout-warning") { + TraceContext.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceRecorder.currentContext + TraceContext.currentContext } } } @@ -59,9 +64,9 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M setAskPatternTimeoutWarningMode("lightweight") expectTimeoutWarning(messageSizeLimit = Some(1)) { - TraceRecorder.withNewTraceContext("ask-timeout-warning") { + TraceContext.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceRecorder.currentContext + TraceContext.currentContext } } } @@ -72,9 +77,9 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M setAskPatternTimeoutWarningMode("off") expectTimeoutWarning(expectWarning = false) { - TraceRecorder.withNewTraceContext("ask-timeout-warning") { + TraceContext.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceRecorder.currentContext + TraceContext.currentContext } } } diff --git a/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala index 6d16386b..322abed2 100644 --- a/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/ActorMetricsSpec.scala @@ -20,44 +20,29 @@ import java.nio.LongBuffer import kamon.Kamon import kamon.akka.ActorMetrics import kamon.metric.ActorMetricsTestActor._ +import kamon.metric.instrument.CollectionContext import org.scalatest.{ BeforeAndAfterAll, WordSpecLike, Matchers } import akka.testkit.{ ImplicitSender, TestProbe, TestKitBase } import akka.actor._ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ -import ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot } class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics { + |kamon.metric { | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | - | filters = [ - | { - | actor { - | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - | precision.actor { - | processing-time { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | time-in-mailbox { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 + | filters { + | akka-actor { + | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] + | excludes = [ "user/tracked-explicitly-excluded", "user/non-tracked-actor" ] | } + | } | - | mailbox-size { - | refresh-interval = 1 hour - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } + | instrument-settings { + | akka-actor.mailbox-size.refresh-interval = 1 hour | } |} | @@ -89,16 +74,16 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with expectMsg(Pong) val firstSnapshot = collectMetricsOf(trackedActor).get - firstSnapshot.errors.count should be(1L) - firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L - firstSnapshot.processingTime.numberOfMeasurements should be(102L) // 102 examples - firstSnapshot.timeInMailbox.numberOfMeasurements should be(102L) // 102 examples + firstSnapshot.counter("errors").get.count should be(1L) + firstSnapshot.minMaxCounter("mailbox-size").get.numberOfMeasurements should be > 0L + firstSnapshot.histogram("processing-time").get.numberOfMeasurements should be(102L) // 102 examples + firstSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(102L) // 102 examples val secondSnapshot = collectMetricsOf(trackedActor).get // Ensure that the recorders are clean - secondSnapshot.errors.count should be(0L) - secondSnapshot.mailboxSize.numberOfMeasurements should be(3L) // min, max and current - secondSnapshot.processingTime.numberOfMeasurements should be(0L) - secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) + secondSnapshot.counter("errors").get.count should be(0L) + secondSnapshot.minMaxCounter("mailbox-size").get.numberOfMeasurements should be(3L) // min, max and current + secondSnapshot.histogram("processing-time").get.numberOfMeasurements should be(0L) + secondSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(0L) } } @@ -109,9 +94,9 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = expectMsgType[TrackedTimings] val snapshot = collectMetricsOf(trackedActor).get - snapshot.processingTime.numberOfMeasurements should be(1L) - snapshot.processingTime.recordsIterator.next().count should be(1L) - snapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + snapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + snapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + snapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) } "record the number of errors" in new ActorMetricsFixtures { @@ -122,7 +107,7 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with expectMsg(Pong) val snapshot = collectMetricsOf(trackedActor).get - snapshot.errors.count should be(10) + snapshot.counter("errors").get.count should be(10) } "record the mailbox-size" in new ActorMetricsFixtures { @@ -138,8 +123,8 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with expectMsg(Pong) val snapshot = collectMetricsOf(trackedActor).get - snapshot.mailboxSize.min should be(0L) - snapshot.mailboxSize.max should be(11L +- 1L) + snapshot.minMaxCounter("mailbox-size").get.min should be(0L) + snapshot.minMaxCounter("mailbox-size").get.max should be(11L +- 1L) } "record the time-in-mailbox" in new ActorMetricsFixtures { @@ -149,20 +134,22 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = expectMsgType[TrackedTimings] val snapshot = collectMetricsOf(trackedActor).get - snapshot.timeInMailbox.numberOfMeasurements should be(1L) - snapshot.timeInMailbox.recordsIterator.next().count should be(1L) - snapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + snapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + snapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + snapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) } "clean up the associated recorder when the actor is stopped" in new ActorMetricsFixtures { val trackedActor = createTestActor("stop") + val firstRecorder = actorMetricsRecorderOf(trackedActor).get + // Killing the actor should remove it's ActorMetrics and registering again bellow should create a new one. val deathWatcher = TestProbe() deathWatcher.watch(trackedActor) trackedActor ! PoisonPill deathWatcher.expectTerminated(trackedActor) - actorMetricsRecorderOf(trackedActor) shouldBe empty + actorMetricsRecorderOf(trackedActor).get shouldNot be theSameInstanceAs (firstRecorder) } } @@ -175,10 +162,10 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = - Kamon(Metrics)(system).storage.get(ActorMetrics(actorRecorderName(ref))).map(_.asInstanceOf[ActorMetricsRecorder]) + def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetrics] = + Kamon(Metrics)(system).register(ActorMetrics, actorRecorderName(ref)).map(_.recorder) - def collectMetricsOf(ref: ActorRef): Option[ActorMetricSnapshot] = { + def collectMetricsOf(ref: ActorRef): Option[EntitySnapshot] = { Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. actorMetricsRecorderOf(ref).map(_.collect(collectionContext)) } diff --git a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala index 55af3f2e..2c530da9 100644 --- a/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala @@ -15,96 +15,199 @@ package kamon.metric -import akka.actor.{ ActorRef, ActorSystem, Props } +import java.nio.LongBuffer + +import akka.actor.{ PoisonPill, Props, ActorRef, ActorSystem } +import akka.dispatch.MessageDispatcher import akka.testkit.{ TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.DispatcherMetrics -import DispatcherMetrics.DispatcherMetricSnapshot -import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.akka.{ ForkJoinPoolDispatcherMetrics, ThreadPoolExecutorDispatcherMetrics } +import kamon.metric.ActorMetricsTestActor.{ Pong, Ping } +import kamon.metric.instrument.CollectionContext +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { +class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics { - | tick-interval = 1 second + |kamon.metric { + | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | - | filters = [ - | { - | dispatcher { - | includes = ["*"] - | excludes = ["dispatcher-explicitly-excluded"] - | } + | filters = { + | akka-dispatcher { + | includes = [ "*" ] + | excludes = [ "explicitly-excluded" ] | } - | ] + | } + | + | default-instrument-settings { + | gauge.refresh-interval = 1 hour + | min-max-counter.refresh-interval = 1 hour + | } + |} + | + |explicitly-excluded { + | type = "Dispatcher" + | executor = "fork-join-executor" |} | - |dispatcher-explicitly-excluded { - | type = "Dispatcher" - | executor = "fork-join-executor" + |tracked-fjp { + | type = "Dispatcher" + | executor = "fork-join-executor" + | + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 100.0 + | parallelism-max = 22 + | } |} | - |tracked-dispatcher { - | type = "Dispatcher" - | executor = "thread-pool-executor" + |tracked-tpe { + | type = "Dispatcher" + | executor = "thread-pool-executor" + | + | thread-pool-executor { + | core-pool-size-min = 7 + | core-pool-size-factor = 100.0 + | max-pool-size-factor = 100.0 + | max-pool-size-max = 21 + | } |} | """.stripMargin)) "the Kamon dispatcher metrics" should { "respect the configured include and exclude filters" in { - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher") - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher") + val defaultDispatcher = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher")) + val fjpDispatcher = forceInit(system.dispatchers.lookup("tracked-fjp")) + val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe")) + val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded")) + + findDispatcherRecorder(defaultDispatcher) shouldNot be(empty) + findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) + findDispatcherRecorder(excludedDispatcher) should be(empty) + } - Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] + "record metrics for a dispatcher with thread-pool-executor" in { + implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") + collectDispatcherMetrics(tpeDispatcher) - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher")) - tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded")) - } + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(tpeDispatcher) + } + }, 5 seconds) + + refreshDispatcherInstruments(tpeDispatcher) + val snapshot = collectDispatcherMetrics(tpeDispatcher) + + snapshot.gauge("active-threads") should not be empty + snapshot.gauge("pool-size").get.min should be >= 7L + snapshot.gauge("pool-size").get.max should be <= 21L + snapshot.gauge("max-pool-size").get.max should be(21) + snapshot.gauge("core-pool-size").get.max should be(21) + snapshot.gauge("processed-tasks").get.max should be(102L +- 5L) + + // The processed tasks should be reset to 0 if no more tasks are submitted. + val secondSnapshot = collectDispatcherMetrics(tpeDispatcher) + secondSnapshot.gauge("processed-tasks").get.max should be(0) } - "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher") + "record metrics for a dispatcher with fork-join-executor" in { + implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") + collectDispatcherMetrics(fjpDispatcher) + + Await.result({ + Future.sequence { + for (_ ← 1 to 100) yield submit(fjpDispatcher) + } + }, 5 seconds) - for (_ ← 1 to 100) { - //delayable ! Discard - } + refreshDispatcherInstruments(fjpDispatcher) + val snapshot = collectDispatcherMetrics(fjpDispatcher) + + snapshot.minMaxCounter("parallelism").get.max should be(22) + snapshot.gauge("pool-size").get.min should be >= 0L + snapshot.gauge("pool-size").get.max should be <= 22L + snapshot.gauge("active-threads").get.max should be >= 0L + snapshot.gauge("running-threads").get.max should be >= 0L + snapshot.gauge("queued-task-count").get.max should be(0) - val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds) - dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis - dispatcherMetrics.poolSize.max should be <= 22L //fail in travis - dispatcherMetrics.queueTaskCount.max should be(0L) - dispatcherMetrics.runningThreadCount.max should be(0L) } - } + "clean up the metrics recorders after a dispatcher is shut down" in { + implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") + implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") + + findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) - def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] + shutdownDispatcher(tpeDispatcher) + shutdownDispatcher(fjpDispatcher) + + findDispatcherRecorder(fjpDispatcher) should be(empty) + findDispatcherRecorder(tpeDispatcher) should be(empty) } - val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId)) - dispatcherMetricsOption should not be empty - dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot] + + } + + val collectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) } - trait DelayableActorFixture { - def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name) - val metricsListener = TestProbe() + def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] + def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] = + Kamon(Metrics)(system).find(dispatcher.id, "akka-dispatcher") - (actor, metricsListener) + def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot = + findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get + + def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = { + findDispatcherRecorder(dispatcher) match { + case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒ + tpe.processedTasks.refreshValue() + tpe.activeThreads.refreshValue() + tpe.maxPoolSize.refreshValue() + tpe.poolSize.refreshValue() + tpe.corePoolSize.refreshValue() + + case Some(fjp: ForkJoinPoolDispatcherMetrics) ⇒ + fjp.activeThreads.refreshValue() + fjp.poolSize.refreshValue() + fjp.queuedTaskCount.refreshValue() + fjp.paralellism.refreshValues() + fjp.runningThreads.refreshValue() + + case other ⇒ } } + + def forceInit(dispatcher: MessageDispatcher): MessageDispatcher = { + val listener = TestProbe() + Future { + listener.ref ! "init done" + }(dispatcher) + listener.expectMsg("init done") + + dispatcher + } + + def submit(dispatcher: MessageDispatcher): Future[String] = Future { + "hello" + }(dispatcher) + + def shutdownDispatcher(dispatcher: MessageDispatcher): Unit = { + val shutdownMethod = dispatcher.getClass.getDeclaredMethod("shutdown") + shutdownMethod.setAccessible(true) + shutdownMethod.invoke(dispatcher) + } + + override protected def afterAll(): Unit = system.shutdown() } + diff --git a/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala index abc195ba..5f6bbb4f 100644 --- a/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/metric/RouterMetricsSpec.scala @@ -18,17 +18,13 @@ package kamon.metric import java.nio.LongBuffer import akka.actor._ -import akka.kamon.instrumentation.ActorCellMetrics import akka.routing._ import akka.testkit.{ ImplicitSender, TestKitBase, TestProbe } import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.{ RouterMetrics, ActorMetrics } -import ActorMetrics.{ ActorMetricSnapshot, ActorMetricsRecorder } -import RouterMetrics._ +import kamon.akka.RouterMetrics import kamon.metric.RouterMetricsTestActor._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.instrument.{ Counter, Histogram } +import kamon.metric.instrument.CollectionContext import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } import scala.concurrent.duration._ @@ -36,22 +32,14 @@ import scala.concurrent.duration._ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender with BeforeAndAfterAll { implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics { + |kamon.metric { | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | - | filters = [ - | { - | router { - | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] - | excludes = [ "user/tracked-explicitly-excluded-*"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 + | filters = { + | akka-router { + | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] + | excludes = [ "user/tracked-explicitly-excluded-*"] | } | } |} @@ -85,7 +73,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-pool-router").get - routerSnapshot.routingTime.numberOfMeasurements should be(1L) + routerSnapshot.histogram("routing-time").get.numberOfMeasurements should be(1L) } "record the routing-time of the receive function for group routers" in new RouterMetricsFixtures { @@ -96,7 +84,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-routing-time-in-group-router").get - routerSnapshot.routingTime.numberOfMeasurements should be(1L) + routerSnapshot.histogram("routing-time").get.numberOfMeasurements should be(1L) } "record the processing-time of the receive function for pool routers" in new RouterMetricsFixtures { @@ -107,9 +95,9 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-pool-router").get - routerSnapshot.processingTime.numberOfMeasurements should be(1L) - routerSnapshot.processingTime.recordsIterator.next().count should be(1L) - routerSnapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + routerSnapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) } "record the processing-time of the receive function for group routers" in new RouterMetricsFixtures { @@ -120,9 +108,9 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-processing-time-in-group-router").get - routerSnapshot.processingTime.numberOfMeasurements should be(1L) - routerSnapshot.processingTime.recordsIterator.next().count should be(1L) - routerSnapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) + routerSnapshot.histogram("processing-time").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("processing-time").get.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) } "record the number of errors for pool routers" in new RouterMetricsFixtures { @@ -137,7 +125,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-errors-in-pool-router").get - routerSnapshot.errors.count should be(10L) + routerSnapshot.counter("errors").get.count should be(10L) } "record the number of errors for group routers" in new RouterMetricsFixtures { @@ -152,7 +140,7 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with listener.expectMsg(Pong) val routerSnapshot = collectMetricsOf("user/measuring-errors-in-group-router").get - routerSnapshot.errors.count should be(10L) + routerSnapshot.counter("errors").get.count should be(10L) } "record the time-in-mailbox for pool routers" in new RouterMetricsFixtures { @@ -163,9 +151,9 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-pool-router").get - routerSnapshot.timeInMailbox.numberOfMeasurements should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().count should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + routerSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) } "record the time-in-mailbox for group routers" in new RouterMetricsFixtures { @@ -176,33 +164,35 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val timings = timingsListener.expectMsgType[RouterTrackedTimings] val routerSnapshot = collectMetricsOf("user/measuring-time-in-mailbox-in-group-router").get - routerSnapshot.timeInMailbox.numberOfMeasurements should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().count should be(1L) - routerSnapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) + routerSnapshot.histogram("time-in-mailbox").get.numberOfMeasurements should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().count should be(1L) + routerSnapshot.histogram("time-in-mailbox").get.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) } "clean up the associated recorder when the pool router is stopped" in new RouterMetricsFixtures { val trackedRouter = createTestPoolRouter("stop-in-pool-router") - routerMetricsRecorderOf("user/stop-in-pool-router") should not be empty + val firstRecorder = routerMetricsRecorderOf("user/stop-in-pool-router").get + // Killing the router should remove it's RouterMetrics and registering again bellow should create a new one. val deathWatcher = TestProbe() deathWatcher.watch(trackedRouter) trackedRouter ! PoisonPill deathWatcher.expectTerminated(trackedRouter) - routerMetricsRecorderOf("user/stop-in-pool-router") shouldBe empty + routerMetricsRecorderOf("user/stop-in-pool-router").get shouldNot be theSameInstanceAs (firstRecorder) } "clean up the associated recorder when the group router is stopped" in new RouterMetricsFixtures { val trackedRouter = createTestPoolRouter("stop-in-group-router") - routerMetricsRecorderOf("user/stop-in-group-router") should not be empty + val firstRecorder = routerMetricsRecorderOf("user/stop-in-group-router").get + // Killing the router should remove it's RouterMetrics and registering again bellow should create a new one. val deathWatcher = TestProbe() deathWatcher.watch(trackedRouter) trackedRouter ! PoisonPill deathWatcher.expectTerminated(trackedRouter) - routerMetricsRecorderOf("user/stop-in-group-router") shouldBe empty + routerMetricsRecorderOf("user/stop-in-group-router").get shouldNot be theSameInstanceAs (firstRecorder) } } @@ -213,10 +203,10 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with val buffer: LongBuffer = LongBuffer.allocate(10000) } - def routerMetricsRecorderOf(routerName: String): Option[RouterMetricsRecorder] = - Kamon(Metrics)(system).storage.get(RouterMetrics(routerName)).map(_.asInstanceOf[RouterMetricsRecorder]) + def routerMetricsRecorderOf(routerName: String): Option[RouterMetrics] = + Kamon(Metrics)(system).register(RouterMetrics, routerName).map(_.recorder) - def collectMetricsOf(routerName: String): Option[RouterMetricSnapshot] = { + def collectMetricsOf(routerName: String): Option[EntitySnapshot] = { Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. routerMetricsRecorderOf(routerName).map(_.collect(collectionContext)) } @@ -255,16 +245,6 @@ class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with router } } - - trait ActorMetricsFixtures { - val collectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) - } - - def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name) - - def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext) - } } class RouterMetricsTestActor extends Actor { |