From ff7ac0ec79dd61849b4c76b10f74af72c7cceea9 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 28 Dec 2014 07:59:59 +0100 Subject: ! core,akka: separate all akka instrumentation into it's own kamon-akka module, related to #136. All Akka-related instrumentation and code has been moved to the kamon-akka module, including the filters for actor, dispatcher and router metrics. Also the following changes are included: - Router Metrics are now working properly, related to #139. - Cleanup the log output for this module, related to #142. - Some minor cleanups in various tests. This PR breaks the reporting modules which will need to wait for #141 to be ready to come back to life. --- .../akka/ActorCellInstrumentationSpec.scala | 87 -------- .../akka/ActorLoggingInstrumentationSpec.scala | 73 ------- .../ActorSystemMessageInstrumentationSpec.scala | 172 ---------------- .../akka/AskPatternInstrumentationSpec.scala | 66 ------ .../test/scala/kamon/metric/ActorMetricsSpec.scala | 224 --------------------- .../scala/kamon/metric/DispatcherMetricsSpec.scala | 108 ---------- .../scala/kamon/metric/RouterMetricsSpec.scala | 161 --------------- 7 files changed, 891 deletions(-) delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala delete mode 100644 kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala (limited to 'kamon-core/src/test/scala/kamon') diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala deleted file mode 100644 index 0f682500..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorCellInstrumentationSpec.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation.akka - -import akka.actor.{ Actor, ActorSystem, Props } -import akka.pattern.{ ask, pipe } -import akka.routing.RoundRobinPool -import akka.testkit.{ ImplicitSender, TestKit } -import akka.util.Timeout -import kamon.trace.TraceRecorder -import org.scalatest.WordSpecLike - -import scala.concurrent.duration._ - -class ActorCellInstrumentationSpec extends TestKit(ActorSystem("actor-cell-instrumentation-spec")) with WordSpecLike - with ImplicitSender { - - implicit val executionContext = system.dispatcher - - "the message passing instrumentation" should { - "propagate the TraceContext using bang" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("bang-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using tell" in new EchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("tell-reply") { - ctxEchoActor.tell("test", testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext using ask" in new EchoActorFixture { - implicit val timeout = Timeout(1 seconds) - val testTraceContext = TraceRecorder.withNewTraceContext("ask-reply") { - // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. - (ctxEchoActor ? "test") pipeTo (testActor) - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "propagate the TraceContext to actors behind a router" in new RoutedEchoActorFixture { - val testTraceContext = TraceRecorder.withNewTraceContext("router-reply") { - ctxEchoActor ! "test" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - } - - trait EchoActorFixture { - val ctxEchoActor = system.actorOf(Props[TraceContextEcho]) - } - - trait RoutedEchoActorFixture extends EchoActorFixture { - override val ctxEchoActor = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinPool(nrOfInstances = 1))) - } -} - -class TraceContextEcho extends Actor { - def receive = { - case msg: String ⇒ sender ! TraceRecorder.currentContext - } -} - diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala deleted file mode 100644 index 4b114d4f..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorLoggingInstrumentationSpec.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.instrumentation.akka - -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } -import akka.event.Logging.LogEvent -import akka.testkit.TestKit -import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceLocal, TraceContextAware, TraceRecorder } -import org.scalatest.{ Inspectors, Matchers, WordSpecLike } -import org.slf4j.MDC - -class ActorLoggingInstrumentationSpec extends TestKit(ActorSystem("actor-logging-instrumentation-spec")) with WordSpecLike - with Matchers with Inspectors with MdcKeysSupport { - - "the ActorLogging instrumentation" should { - "attach the TraceContext (if available) to log events" in { - val loggerActor = system.actorOf(Props[LoggerActor]) - system.eventStream.subscribe(testActor, classOf[LogEvent]) - - val testTraceContext = TraceRecorder.withNewTraceContext("logging") { - loggerActor ! "info" - TraceRecorder.currentContext - } - - fishForMessage() { - case event: LogEvent if event.message.toString contains "TraceContext =>" ⇒ - val ctxInEvent = event.asInstanceOf[TraceContextAware].traceContext - ctxInEvent === testTraceContext - - case event: LogEvent ⇒ false - } - } - - "allow retrieve a value from the MDC when was created a key of type AvailableToMdc" in { - val testString = "Hello World" - val SampleTraceLocalKeyAvailableToMDC = AvailableToMdc("some-cool-key") - - val loggerActor = system.actorOf(Props[LoggerActor]) - system.eventStream.subscribe(testActor, classOf[LogEvent]) - - TraceRecorder.withNewTraceContext("logging-with-mdc") { - TraceLocal.store(SampleTraceLocalKeyAvailableToMDC)(testString) - - loggerActor ! "info" - - withMdc { - MDC.get("some-cool-key") should equal(testString) - } - } - } - } -} - -class LoggerActor extends Actor with ActorLogging { - def receive = { - case "info" ⇒ log.info("TraceContext => {}", TraceRecorder.currentContext) - } -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala deleted file mode 100644 index d79ccbe0..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentationSpec.scala +++ /dev/null @@ -1,172 +0,0 @@ -package kamon.instrumentation.akka - -import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop } -import akka.actor._ -import akka.testkit.{ ImplicitSender, TestKit } -import kamon.trace.{ EmptyTraceContext, TraceRecorder } -import org.scalatest.WordSpecLike - -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-system-message-instrumentation-spec")) - with WordSpecLike with ImplicitSender { - - implicit val executionContext = system.dispatcher - - "the system message passing instrumentation" should { - "keep the TraceContext while processing the Create message in top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") { - system.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext while processing the Create message in non top level actors" in { - val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") { - system.actorOf(Props(new Actor { - def receive: Actor.Receive = { - case any ⇒ - context.actorOf(Props(new Actor { - testActor ! TraceRecorder.currentContext - def receive: Actor.Receive = { case any ⇒ } - })) - } - })) ! "any" - - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) - } - - "keep the TraceContext in the supervision cycle" when { - "the actor is resumed" in { - val supervisor = supervisorWithDirective(Resume) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(EmptyTraceContext) - } - - "the actor is restarted" in { - val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the preRestart hook - expectMsg(testTraceContext) // From the postRestart hook - - // Ensure we didn't tie the actor with the context - supervisor ! "context" - expectMsg(EmptyTraceContext) - } - - "the actor is stopped" in { - val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook - expectNoMsg(1 second) - } - - "the failure is escalated" in { - val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - - val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") { - supervisor ! "fail" - TraceRecorder.currentContext - } - - expectMsg(testTraceContext) // From the parent executing the supervision strategy - expectMsg(testTraceContext) // From the grandparent executing the supervision strategy - expectMsg(testTraceContext) // From the postStop hook in the child - expectMsg(testTraceContext) // From the postStop hook in the parent - expectNoMsg(1 second) - } - } - } - - def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false, - sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = { - class GrandParent extends Actor { - val child = context.actorOf(Props(new Parent)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop - } - - def receive = { - case any ⇒ child forward any - } - } - - class Parent extends Actor { - val child = context.actorOf(Props(new Child)) - - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive - } - - def receive: Actor.Receive = { - case any ⇒ child forward any - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - } - - class Child extends Actor { - def receive = { - case "fail" ⇒ throw new ArithmeticException("Division by zero.") - case "context" ⇒ sender ! TraceRecorder.currentContext - } - - override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! TraceRecorder.currentContext - super.preRestart(reason, message) - } - - override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! TraceRecorder.currentContext - super.postRestart(reason) - } - - override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceRecorder.currentContext - super.postStop() - } - - override def preStart(): Unit = { - if (sendPreStart) testActor ! TraceRecorder.currentContext - super.preStart() - } - } - - system.actorOf(Props(new GrandParent)) - } -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala deleted file mode 100644 index 471cbd4d..00000000 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.instrumentation.akka - -import akka.actor.{ Actor, ActorSystem, Props } -import akka.event.Logging.Warning -import akka.pattern.ask -import akka.testkit.TestKitBase -import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import kamon.trace.{ TraceContextAware, TraceRecorder } -import org.scalatest.{ Matchers, WordSpecLike } - -import scala.concurrent.duration._ - -class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("ask-pattern-tracing-spec", ConfigFactory.parseString( - """ - |kamon { - | akka { - | ask-pattern-timeout-warning = heavyweight - | } - |} - """.stripMargin)) - - "the AskPatternTracing" should { - "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { - implicit val ec = system.dispatcher - implicit val timeout = Timeout(10 milliseconds) - val noReply = system.actorOf(Props[NoReply], "NoReply") - system.eventStream.subscribe(testActor, classOf[Warning]) - - val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { - noReply ? "hello" - TraceRecorder.currentContext - } - - val warn = expectMsgPF() { - case warn: Warning if warn.message.toString.contains("Timeout triggered for ask pattern") ⇒ warn - } - val capturedCtx = warn.asInstanceOf[TraceContextAware].traceContext - - capturedCtx should equal(testTraceContext) - } - } -} - -class NoReply extends Actor { - def receive = { - case any ⇒ - } -} diff --git a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala deleted file mode 100644 index 97bcb0cf..00000000 --- a/kamon-core/src/test/scala/kamon/metric/ActorMetricsSpec.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import java.nio.LongBuffer - -import akka.kamon.instrumentation.ActorCellMetrics -import kamon.Kamon -import kamon.metric.ActorMetricsTestActor._ -import kamon.metric.instrument.Histogram.MutableRecord -import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.{ ImplicitSender, TestProbe, TestKitBase } -import akka.actor._ -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.ActorMetrics.{ ActorMetricsRecorder, ActorMetricSnapshot } - -class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | actor { - | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - | precision.actor { - | processing-time { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | time-in-mailbox { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | - | mailbox-size { - | refresh-interval = 1 hour - | highest-trackable-value = 999999999 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) - - "the Kamon actor metrics" should { - "respect the configured include and exclude filters" in new ActorMetricsFixtures { - val trackedActor = createTestActor("tracked-actor") - actorMetricsRecorderOf(trackedActor) should not be empty - - val nonTrackedActor = createTestActor("non-tracked-actor") - actorMetricsRecorderOf(nonTrackedActor) shouldBe empty - - val trackedButExplicitlyExcluded = createTestActor("tracked-explicitly-excluded") - actorMetricsRecorderOf(trackedButExplicitlyExcluded) shouldBe empty - } - - "reset all recording instruments after taking a snapshot" in new ActorMetricsFixtures { - val trackedActor = createTestActor("clean-after-collect") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - for (i ← 1 to 100) { - trackedActor ! Discard - } - trackedActor ! Fail - trackedActor ! TrackTimings(sleep = Some(1 second)) - expectMsgType[TrackedTimings] - - val firstSnapshot = takeSnapshotOf(trackedActorMetrics) - firstSnapshot.errors.count should be(1L) - firstSnapshot.mailboxSize.numberOfMeasurements should be > 0L - firstSnapshot.processingTime.numberOfMeasurements should be(103L) // 102 examples + Initialize message - firstSnapshot.timeInMailbox.numberOfMeasurements should be(103L) // 102 examples + Initialize message - - val secondSnapshot = takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - secondSnapshot.errors.count should be(0L) - secondSnapshot.mailboxSize.numberOfMeasurements should be(3L) // min, max and current - secondSnapshot.processingTime.numberOfMeasurements should be(0L) - secondSnapshot.timeInMailbox.numberOfMeasurements should be(0L) - } - - "record the processing-time of the receive function" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-processing-time") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - trackedActor ! TrackTimings(sleep = Some(1 second)) - val timings = expectMsgType[TrackedTimings] - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.processingTime.numberOfMeasurements should be(1L) - snapshot.processingTime.recordsIterator.next().count should be(1L) - snapshot.processingTime.recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) - } - - "record the number of errors" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-errors") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - for (i ← 1 to 10) { trackedActor ! Fail } - trackedActor ! Ping - expectMsg(Pong) - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.errors.count should be(10) - } - - "record the mailbox-size" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-mailbox-size") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - trackedActor ! TrackTimings(sleep = Some(1 second)) - for (i ← 1 to 10) { - trackedActor ! Discard - } - trackedActor ! Ping - - val timings = expectMsgType[TrackedTimings] - expectMsg(Pong) - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.mailboxSize.min should be(0L) - snapshot.mailboxSize.max should be(11L +- 1L) - } - - "record the time-in-mailbox" in new ActorMetricsFixtures { - val trackedActor = createTestActor("measuring-time-in-mailbox") - val trackedActorMetrics = actorMetricsRecorderOf(trackedActor).get - takeSnapshotOf(trackedActorMetrics) // Ensure that the recorders are clean - - trackedActor ! TrackTimings(sleep = Some(1 second)) - val timings = expectMsgType[TrackedTimings] - val snapshot = takeSnapshotOf(trackedActorMetrics) - - snapshot.timeInMailbox.numberOfMeasurements should be(1L) - snapshot.timeInMailbox.recordsIterator.next().count should be(1L) - snapshot.timeInMailbox.recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) - } - - "clean up the associated recorder when the actor is stopped" in new ActorMetricsFixtures { - val trackedActor = createTestActor("stop") - actorMetricsRecorderOf(trackedActor).get // force the actor to be initialized - Kamon(Metrics).storage.get(ActorMetrics("user/stop")) should not be empty - - val deathWatcher = TestProbe() - deathWatcher.watch(trackedActor) - trackedActor ! PoisonPill - deathWatcher.expectTerminated(trackedActor) - - Kamon(Metrics).storage.get(ActorMetrics("user/stop")) shouldBe empty - } - } - - trait ActorMetricsFixtures { - val collectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) - } - - def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetricsRecorder] = { - val initialisationListener = TestProbe() - ref.tell(Ping, initialisationListener.ref) - initialisationListener.expectMsg(Pong) - - val underlyingCellField = ref.getClass.getDeclaredMethod("underlying") - val cell = underlyingCellField.invoke(ref).asInstanceOf[ActorCellMetrics] - - cell.actorMetricsRecorder - } - - def createTestActor(name: String): ActorRef = system.actorOf(Props[ActorMetricsTestActor], name) - - def takeSnapshotOf(amr: ActorMetricsRecorder): ActorMetricSnapshot = amr.collect(collectionContext) - } -} - -class ActorMetricsTestActor extends Actor { - def receive = { - case Discard ⇒ - case Fail ⇒ throw new ArithmeticException("Division by zero.") - case Ping ⇒ sender ! Pong - case TrackTimings(sendTimestamp, sleep) ⇒ { - val dequeueTimestamp = System.nanoTime() - sleep.map(s ⇒ Thread.sleep(s.toMillis)) - val afterReceiveTimestamp = System.nanoTime() - - sender ! TrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) - } - } -} - -object ActorMetricsTestActor { - case object Ping - case object Pong - case object Fail - case object Discard - - case class TrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) - case class TrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { - def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp - def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp - } -} diff --git a/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala deleted file mode 100644 index ae324b73..00000000 --- a/kamon-core/src/test/scala/kamon/metric/DispatcherMetricsSpec.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import org.scalatest.{ WordSpecLike, Matchers } -import akka.testkit.{ TestProbe, TestKitBase } -import akka.actor.{ ActorRef, Props, ActorSystem } -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ -import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.DispatcherMetrics.DispatcherMetricSnapshot - -class DispatcherMetricsSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("dispatcher-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 second - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | dispatcher { - | includes = ["*"] - | excludes = ["dispatcher-explicitly-excluded"] - | } - | } - | ] - |} - | - |dispatcher-explicitly-excluded { - | type = "Dispatcher" - | executor = "fork-join-executor" - |} - | - |tracked-dispatcher { - | type = "Dispatcher" - | executor = "thread-pool-executor" - |} - | - """.stripMargin)) - - "the Kamon dispatcher metrics" should { - "respect the configured include and exclude filters" in { - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("tracked-dispatcher"), "actor-with-tracked-dispatcher") - system.actorOf(Props[ActorMetricsTestActor].withDispatcher("dispatcher-explicitly-excluded"), "actor-with-excluded-dispatcher") - - Kamon(Metrics).subscribe(DispatcherMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] - - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(DispatcherMetrics("tracked-dispatcher")) - tickSnapshot.metrics.keys should not contain (DispatcherMetrics("dispatcher-explicitly-excluded")) - } - } - - "record maximumPoolSize, runningThreadCount, queueTaskCount, poolSize metrics" in new DelayableActorFixture { - val (delayable, metricsListener) = delayableActor("worker-actor", "tracked-dispatcher") - - for (_ ← 1 to 100) { - //delayable ! Discard - } - - val dispatcherMetrics = expectDispatcherMetrics("tracked-dispatcher", metricsListener, 3 seconds) - dispatcherMetrics.maximumPoolSize.max should be <= 64L //fail in travis - dispatcherMetrics.poolSize.max should be <= 22L //fail in travis - dispatcherMetrics.queueTaskCount.max should be(0L) - dispatcherMetrics.runningThreadCount.max should be(0L) - } - - } - - def expectDispatcherMetrics(dispatcherId: String, listener: TestProbe, waitTime: FiniteDuration): DispatcherMetricSnapshot = { - val tickSnapshot = within(waitTime) { - listener.expectMsgType[TickMetricSnapshot] - } - val dispatcherMetricsOption = tickSnapshot.metrics.get(DispatcherMetrics(dispatcherId)) - dispatcherMetricsOption should not be empty - dispatcherMetricsOption.get.asInstanceOf[DispatcherMetricSnapshot] - } - - trait DelayableActorFixture { - def delayableActor(name: String, dispatcher: String): (ActorRef, TestProbe) = { - val actor = system.actorOf(Props[ActorMetricsTestActor].withDispatcher(dispatcher), name) - val metricsListener = TestProbe() - - Kamon(Metrics).subscribe(DispatcherMetrics, "*", metricsListener.ref, permanently = true) - // Wait for one empty snapshot before proceeding to the test. - metricsListener.expectMsgType[TickMetricSnapshot] - - (actor, metricsListener) - } - } -} diff --git a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala deleted file mode 100644 index ebc43091..00000000 --- a/kamon-core/src/test/scala/kamon/metric/RouterMetricsSpec.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import java.nio.LongBuffer - -import akka.actor._ -import akka.routing.RoundRobinPool -import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } -import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.RouterMetrics._ -import kamon.metric.RouterMetricsTestActor._ -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.instrument.{ Counter, Histogram } -import org.scalatest.{ Matchers, WordSpecLike } - -import scala.concurrent.duration._ - -class RouterMetricsSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("router-metrics-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 second - | default-collection-context-buffer-size = 10 - | - | filters = [ - | { - | router { - | includes = [ "user/tracked-*", "user/measuring-*", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded"] - | } - | } - | ] - | precision { - | default-histogram-precision { - | highest-trackable-value = 3600000000000 - | significant-value-digits = 2 - | } - | } - |} - """.stripMargin)) - - "the Kamon router metrics" should { - "respect the configured include and exclude filters" in new RouterMetricsFixtures { - createTestRouter("tracked-router") - createTestRouter("non-tracked-router") - createTestRouter("tracked-explicitly-excluded") - - Kamon(Metrics).subscribe(RouterMetrics, "*", testActor, permanently = true) - expectMsgType[TickMetricSnapshot] - - within(2 seconds) { - val tickSnapshot = expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.keys should contain(RouterMetrics("user/tracked-router")) - tickSnapshot.metrics.keys should not contain (RouterMetrics("user/non-tracked-router")) - tickSnapshot.metrics.keys should not contain (RouterMetrics("user/tracked-explicitly-excluded")) - } - } - - "record the processing-time of the receive function" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-processing-time") - - trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) - val timings = metricsListener.expectMsgType[RouterTrackedTimings] - - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) - tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) - // tickSnapshot(RouterMetrics("user/measuring-processing-time")).metrics(ProcessingTime).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateProcessingTime +- 10.millis.toNanos) - } - - "record the number of errors" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-errors") - - for (i ← 1 to 10) { - trackedRouter.tell(Fail, metricsListener.ref) - } - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - tickSnapshot(RouterMetrics("user/measuring-errors")).metrics(Errors).asInstanceOf[Counter.Snapshot].count should be(10L) - } - - "record the time-in-mailbox" in new RouterMetricsFixtures { - val metricsListener = TestProbe() - val trackedRouter = createTestRouter("measuring-time-in-mailbox") - - trackedRouter.tell(RouterTrackTimings(sleep = Some(1 second)), metricsListener.ref) - val timings = metricsListener.expectMsgType[RouterTrackedTimings] - val tickSnapshot = expectMsgType[TickMetricSnapshot].metrics - - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].numberOfMeasurements should be(1L) - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().count should be(1L) - tickSnapshot(RouterMetrics("user/measuring-time-in-mailbox")).metrics(TimeInMailbox).asInstanceOf[Histogram.Snapshot].recordsIterator.next().level should be(timings.approximateTimeInMailbox +- 10.millis.toNanos) - } - - "clean up the associated recorder when the actor is stopped" in new RouterMetricsFixtures { - val trackedRouter = createTestRouter("stop") - trackedRouter ! Ping - Kamon(Metrics).storage.toString() // force to be initialized - Kamon(Metrics).storage.get(RouterMetrics("user/stop")) should not be empty - - val deathWatcher = TestProbe() - deathWatcher.watch(trackedRouter) - trackedRouter ! PoisonPill - deathWatcher.expectTerminated(trackedRouter) - - Kamon(Metrics).storage.get(RouterMetrics("user/stop")) shouldBe empty - } - } - - trait RouterMetricsFixtures { - val collectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(10000) - } - - def createTestRouter(name: String): ActorRef = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), name) - } -} - -class RouterMetricsTestActor extends Actor { - def receive = { - case Discard ⇒ - case Fail ⇒ throw new ArithmeticException("Division by zero.") - case Ping ⇒ sender ! Pong - case RouterTrackTimings(sendTimestamp, sleep) ⇒ { - val dequeueTimestamp = System.nanoTime() - sleep.map(s ⇒ Thread.sleep(s.toMillis)) - val afterReceiveTimestamp = System.nanoTime() - - sender ! RouterTrackedTimings(sendTimestamp, dequeueTimestamp, afterReceiveTimestamp) - } - } -} - -object RouterMetricsTestActor { - case object Ping - case object Pong - case object Fail - case object Discard - - case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None) - case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) { - def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp - def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp - } -} -- cgit v1.2.3