diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-05-09 15:36:16 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-05-09 15:36:16 +0200 |
commit | 76f503b8f954e1b149bea3adb8927704f7095876 (patch) | |
tree | 59cabc6055e855c006115c5847a85f879dc36dd9 /kamon-akka/src/test | |
parent | 520895a6a9a6b48b83efe01cf289708efd045b42 (diff) | |
parent | d69f14710b1d933d58412edd63b465b13a09a9d0 (diff) | |
download | Kamon-76f503b8f954e1b149bea3adb8927704f7095876.tar.gz Kamon-76f503b8f954e1b149bea3adb8927704f7095876.tar.bz2 Kamon-76f503b8f954e1b149bea3adb8927704f7095876.zip |
Merge branch 'master' into release-legacy-akka-2.2
Conflicts:
kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala
kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala
kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala
kamon-play/src/main/scala/kamon/play/action/KamonTraceActions.scala
kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala
kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala
project/Dependencies.scala
project/Settings.scala
Diffstat (limited to 'kamon-akka/src/test')
7 files changed, 103 insertions, 131 deletions
diff --git a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala index 0d8d41e3..36ea8065 100644 --- a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala @@ -38,8 +38,8 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") { | | filters { | akka-actor { - | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ] - | excludes = [ "user/tracked-explicitly-excluded", "user/non-tracked-actor" ] + | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/clean-after-collect", "*/user/stop", "*/" ] + | excludes = [ "*/user/tracked-explicitly-excluded", "*/user/non-tracked-actor" ] | } | } | @@ -64,6 +64,10 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") { actorMetricsRecorderOf(trackedButExplicitlyExcluded) shouldBe empty } + "not pick up the root supervisor" in { + Kamon.metrics.find("actor-metrics-spec/", ActorMetrics.category) shouldBe empty + } + "reset all recording instruments after taking a snapshot" in new ActorMetricsFixtures { val trackedActor = createTestActor("clean-after-collect") @@ -151,7 +155,7 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") { trackedActor ! PoisonPill deathWatcher.expectTerminated(trackedActor) - actorMetricsRecorderOf(trackedActor).get shouldNot be theSameInstanceAs (firstRecorder) + actorMetricsRecorderOf(trackedActor) shouldBe empty } } @@ -160,10 +164,10 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") { val buffer: LongBuffer = LongBuffer.allocate(10000) } - def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") + def actorRecorderName(ref: ActorRef): String = system.name + "/" + ref.path.elements.mkString("/") def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetrics] = - Kamon.metrics.register(ActorMetrics, actorRecorderName(ref)).map(_.recorder) + Kamon.metrics.find(actorRecorderName(ref), ActorMetrics.category).map(_.asInstanceOf[ActorMetrics]) def collectMetricsOf(ref: ActorRef): Option[EntitySnapshot] = { Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala index dd5cfa45..02a5a0d4 100644 --- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala @@ -36,8 +36,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { | | filters = { | akka-dispatcher { - | includes = [ "*" ] - | excludes = [ "explicitly-excluded" ] + | includes = [ "**" ] + | excludes = [ "*/explicitly-excluded" ] | } | } | @@ -84,16 +84,16 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe")) val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded")) - findDispatcherRecorder(defaultDispatcher) shouldNot be(empty) - findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) - findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) - findDispatcherRecorder(excludedDispatcher) should be(empty) + findDispatcherRecorder(defaultDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty) + findDispatcherRecorder(excludedDispatcher, "fork-join-pool") should be(empty) } "record metrics for a dispatcher with thread-pool-executor" in { implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") - refreshDispatcherInstruments(tpeDispatcher) - collectDispatcherMetrics(tpeDispatcher) + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") Await.result({ Future.sequence { @@ -101,8 +101,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { } }, 5 seconds) - refreshDispatcherInstruments(tpeDispatcher) - val snapshot = collectDispatcherMetrics(tpeDispatcher) + refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor") + val snapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") snapshot.gauge("active-threads") should not be empty snapshot.gauge("pool-size").get.min should be >= 7L @@ -112,13 +112,13 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { snapshot.gauge("processed-tasks").get.max should be(102L +- 5L) // The processed tasks should be reset to 0 if no more tasks are submitted. - val secondSnapshot = collectDispatcherMetrics(tpeDispatcher) + val secondSnapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor") secondSnapshot.gauge("processed-tasks").get.max should be(0) } "record metrics for a dispatcher with fork-join-executor" in { implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") - collectDispatcherMetrics(fjpDispatcher) + collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") Await.result({ Future.sequence { @@ -126,8 +126,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { } }, 5 seconds) - refreshDispatcherInstruments(fjpDispatcher) - val snapshot = collectDispatcherMetrics(fjpDispatcher) + refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool") + val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool") snapshot.minMaxCounter("parallelism").get.max should be(22) snapshot.gauge("pool-size").get.min should be >= 0L @@ -142,28 +142,28 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") { implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe") implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp") - findDispatcherRecorder(fjpDispatcher) shouldNot be(empty) - findDispatcherRecorder(tpeDispatcher) shouldNot be(empty) + findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty) + findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty) shutdownDispatcher(tpeDispatcher) shutdownDispatcher(fjpDispatcher) - findDispatcherRecorder(fjpDispatcher) should be(empty) - findDispatcherRecorder(tpeDispatcher) should be(empty) + findDispatcherRecorder(fjpDispatcher, "fork-join-pool") should be(empty) + findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") should be(empty) } } def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/") - def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] = - Kamon.metrics.find(dispatcher.id, "akka-dispatcher") + def findDispatcherRecorder(dispatcher: MessageDispatcher, dispatcherType: String): Option[EntityRecorder] = + Kamon.metrics.find(system.name + "/" + dispatcher.id, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType)) - def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot = - findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get + def collectDispatcherMetrics(dispatcher: MessageDispatcher, dispatcherType: String): EntitySnapshot = + findDispatcherRecorder(dispatcher, dispatcherType).map(_.collect(collectionContext)).get - def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = { - findDispatcherRecorder(dispatcher) match { + def refreshDispatcherInstruments(dispatcher: MessageDispatcher, dispatcherType: String): Unit = { + findDispatcherRecorder(dispatcher, dispatcherType) match { case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒ tpe.processedTasks.refreshValue() tpe.activeThreads.refreshValue() diff --git a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala index c4c1d9ad..9c452a90 100644 --- a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala @@ -39,8 +39,8 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") { | | filters = { | akka-router { - | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ] - | excludes = [ "user/tracked-explicitly-excluded-*"] + | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/stop-*" ] + | excludes = [ "*/user/tracked-explicitly-excluded-*"] | } | } |} @@ -122,7 +122,7 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") { trackedRouter ! PoisonPill deathWatcher.expectTerminated(trackedRouter) - routerMetricsRecorderOf("user/stop-in-router").get shouldNot be theSameInstanceAs (firstRecorder) + routerMetricsRecorderOf("user/stop-in-router") shouldBe empty } } @@ -132,7 +132,7 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") { } def routerMetricsRecorderOf(routerName: String): Option[RouterMetrics] = - Kamon.metrics.register(RouterMetrics, routerName).map(_.recorder) + Kamon.metrics.find(system.name + "/" + routerName, RouterMetrics.category).map(_.asInstanceOf[RouterMetrics]) def collectMetricsOf(routerName: String): Option[EntitySnapshot] = { Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested. diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala index 593a7baa..6b5904b6 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala @@ -20,7 +20,7 @@ import akka.pattern.{ ask, pipe } import akka.routing._ import akka.util.Timeout import kamon.testkit.BaseKamonSpec -import kamon.trace.TraceContext +import kamon.trace.Tracer import scala.concurrent.duration._ @@ -29,18 +29,18 @@ class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumenta "the message passing instrumentation" should { "propagate the TraceContext using bang" in new EchoActorFixture { - val testTraceContext = TraceContext.withContext(newContext("bang-reply")) { + val testTraceContext = Tracer.withContext(newContext("bang-reply")) { ctxEchoActor ! "test" - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext using tell" in new EchoActorFixture { - val testTraceContext = TraceContext.withContext(newContext("tell-reply")) { + val testTraceContext = Tracer.withContext(newContext("tell-reply")) { ctxEchoActor.tell("test", testActor) - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) @@ -48,19 +48,19 @@ class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumenta "propagate the TraceContext using ask" in new EchoActorFixture { implicit val timeout = Timeout(1 seconds) - val testTraceContext = TraceContext.withContext(newContext("ask-reply")) { + val testTraceContext = Tracer.withContext(newContext("ask-reply")) { // The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it. (ctxEchoActor ? "test") pipeTo (testActor) - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) } "propagate the TraceContext to actors behind a pool router" in new RoundRobinRouterFixture { - val testTraceContext = TraceContext.withContext(newContext("router-reply")) { + val testTraceContext = Tracer.withContext(newContext("router-reply")) { router ! "test" - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) @@ -81,7 +81,7 @@ class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumenta class TraceContextEcho extends Actor { def receive = { - case msg: String ⇒ sender ! TraceContext.currentContext + case msg: String ⇒ sender ! Tracer.currentContext } } diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala index 143c816d..a45a5ae9 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala @@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory import kamon.testkit.BaseKamonSpec import kamon.trace.TraceLocal.AvailableToMdc import kamon.trace.logging.MdcKeysSupport -import kamon.trace.{ TraceContextAware, TraceLocal, TraceContext } +import kamon.trace.{ Tracer, TraceContextAware, TraceLocal } import org.scalatest.Inspectors import org.slf4j.MDC @@ -39,9 +39,9 @@ class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instr val loggerActor = system.actorOf(Props[LoggerActor]) system.eventStream.subscribe(testActor, classOf[LogEvent]) - val testTraceContext = TraceContext.withContext(newContext("logging")) { + val testTraceContext = Tracer.withContext(newContext("logging")) { loggerActor ! "info" - TraceContext.currentContext + Tracer.currentContext } fishForMessage() { @@ -55,7 +55,7 @@ class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instr "allow retrieve a value from the MDC when was created a key of type AvailableToMdc" in { val testString = "Hello World" - TraceContext.withContext(newContext("logging-with-mdc")) { + Tracer.withContext(newContext("logging-with-mdc")) { TraceLocal.store(AvailableToMdc("some-cool-key"))(testString) withMdc { @@ -69,6 +69,6 @@ class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instr class LoggerActor extends Actor with ActorLogging { def receive = { - case "info" ⇒ log.info("TraceContext(name = {}, token = {})", TraceContext.currentContext.name, TraceContext.currentContext.token) + case "info" ⇒ log.info("TraceContext(name = {}, token = {})", Tracer.currentContext.name, Tracer.currentContext.token) } } diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala index cf5f1b5b..832e5b71 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala @@ -21,7 +21,7 @@ import akka.actor._ import akka.testkit.ImplicitSender import com.typesafe.config.ConfigFactory import kamon.testkit.BaseKamonSpec -import kamon.trace.{ EmptyTraceContext, TraceContext } +import kamon.trace.{ Tracer, EmptyTraceContext } import org.scalatest.WordSpecLike import scala.concurrent.duration._ @@ -40,31 +40,31 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- "the system message passing instrumentation" should { "keep the TraceContext while processing the Create message in top level actors" in { - val testTraceContext = TraceContext.withContext(newContext("creating-top-level-actor")) { + val testTraceContext = Tracer.withContext(newContext("creating-top-level-actor")) { system.actorOf(Props(new Actor { - testActor ! TraceContext.currentContext + testActor ! Tracer.currentContext def receive: Actor.Receive = { case any ⇒ } })) - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) } "keep the TraceContext while processing the Create message in non top level actors" in { - val testTraceContext = TraceContext.withContext(newContext("creating-non-top-level-actor")) { + val testTraceContext = Tracer.withContext(newContext("creating-non-top-level-actor")) { system.actorOf(Props(new Actor { def receive: Actor.Receive = { case any ⇒ context.actorOf(Props(new Actor { - testActor ! TraceContext.currentContext + testActor ! Tracer.currentContext def receive: Actor.Receive = { case any ⇒ } })) } })) ! "any" - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) @@ -74,9 +74,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- "the actor is resumed" in { val supervisor = supervisorWithDirective(Resume) - val testTraceContext = TraceContext.withContext(newContext("fail-and-resume")) { + val testTraceContext = Tracer.withContext(newContext("fail-and-resume")) { supervisor ! "fail" - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -89,9 +89,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- "the actor is restarted" in { val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true) - val testTraceContext = TraceContext.withContext(newContext("fail-and-restart")) { + val testTraceContext = Tracer.withContext(newContext("fail-and-restart")) { supervisor ! "fail" - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -106,9 +106,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- "the actor is stopped" in { val supervisor = supervisorWithDirective(Stop, sendPostStop = true) - val testTraceContext = TraceContext.withContext(newContext("fail-and-stop")) { + val testTraceContext = Tracer.withContext(newContext("fail-and-stop")) { supervisor ! "fail" - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -119,9 +119,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- "the failure is escalated" in { val supervisor = supervisorWithDirective(Escalate, sendPostStop = true) - val testTraceContext = TraceContext.withContext(newContext("fail-and-escalate")) { + val testTraceContext = Tracer.withContext(newContext("fail-and-escalate")) { supervisor ! "fail" - TraceContext.currentContext + Tracer.currentContext } expectMsg(testTraceContext) // From the parent executing the supervision strategy @@ -139,7 +139,7 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- val child = context.actorOf(Props(new Parent)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; Stop + case NonFatal(throwable) ⇒ testActor ! Tracer.currentContext; Stop } def receive = { @@ -151,7 +151,7 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- val child = context.actorOf(Props(new Child)) override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; directive + case NonFatal(throwable) ⇒ testActor ! Tracer.currentContext; directive } def receive: Actor.Receive = { @@ -159,7 +159,7 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- } override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceContext.currentContext + if (sendPostStop) testActor ! Tracer.currentContext super.postStop() } } @@ -167,26 +167,26 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system- class Child extends Actor { def receive = { case "fail" ⇒ throw new ArithmeticException("Division by zero.") - case "context" ⇒ sender ! TraceContext.currentContext + case "context" ⇒ sender ! Tracer.currentContext } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { - if (sendPreRestart) testActor ! TraceContext.currentContext + if (sendPreRestart) testActor ! Tracer.currentContext super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { - if (sendPostRestart) testActor ! TraceContext.currentContext + if (sendPostRestart) testActor ! Tracer.currentContext super.postRestart(reason) } override def postStop(): Unit = { - if (sendPostStop) testActor ! TraceContext.currentContext + if (sendPostStop) testActor ! Tracer.currentContext super.postStop() } override def preStart(): Unit = { - if (sendPreStart) testActor ! TraceContext.currentContext + if (sendPreStart) testActor ! Tracer.currentContext super.preStart() } } diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala index d925fbf6..af6ce76e 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala @@ -16,18 +16,16 @@ package kamon.instrumentation.akka -import java.util.concurrent.atomic.AtomicInteger - import akka.actor._ -import akka.event.Logging.Warning import akka.pattern.ask -import akka.testkit.TestProbe +import akka.testkit.EventFilter import akka.util.Timeout import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.Akka +import kamon.akka.AskPatternTimeoutWarningSettings.{ Off, Lightweight, Heavyweight } +import kamon.akka.{ AskPatternTimeoutWarningSetting, Akka } import kamon.testkit.BaseKamonSpec -import kamon.trace.{ TraceContext, TraceContextAware } +import kamon.trace.Tracer import scala.concurrent.duration._ @@ -37,6 +35,7 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s """ |akka { | loglevel = OFF + | loggers = [akka.testkit.TestEventListener] |} """.stripMargin) @@ -47,87 +46,56 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s "the AskPatternInstrumentation" when { "configured in heavyweight mode" should { - "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture { - setAskPatternTimeoutWarningMode("heavyweight") + "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in { + val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-1") + setAskPatternTimeoutWarningMode(Heavyweight) - expectTimeoutWarning() { - TraceContext.withContext(newContext("ask-timeout-warning")) { + EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-1] at").intercept { + Tracer.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceContext.currentContext + Tracer.currentContext } } } } "configured in lightweight mode" should { - "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture { - setAskPatternTimeoutWarningMode("lightweight") + "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in { + val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-2") + setAskPatternTimeoutWarningMode(Lightweight) - expectTimeoutWarning(messageSizeLimit = Some(1)) { - TraceContext.withContext(newContext("ask-timeout-warning")) { + EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-2] at").intercept { + Tracer.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" - TraceContext.currentContext + Tracer.currentContext } } } } "configured in off mode" should { - "should not log any warning messages" in new NoReplyFixture { - setAskPatternTimeoutWarningMode("off") - - expectTimeoutWarning(expectWarning = false) { - TraceContext.withContext(newContext("ask-timeout-warning")) { - noReplyActorRef ? "hello" - TraceContext.currentContext + "should not log any warning messages" in { + val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-3") + setAskPatternTimeoutWarningMode(Off) + + intercept[AssertionError] { // No message will be logged and the event filter will fail. + EventFilter.warning(start = "Timeout triggered for ask pattern to actor", occurrences = 1).intercept { + Tracer.withContext(newContext("ask-timeout-warning")) { + noReplyActorRef ? "hello" + Tracer.currentContext + } } } } } } - def expectTimeoutWarning(messageSizeLimit: Option[Int] = None, expectWarning: Boolean = true)(thunk: ⇒ TraceContext): Unit = { - val listener = warningListener() - val testTraceContext = thunk - - if (expectWarning) { - val warning = listener.fishForMessage() { - case Warning(_, _, msg) if msg.toString.startsWith("Timeout triggered for ask pattern registered at") ⇒ true - case others ⇒ false - }.asInstanceOf[Warning] - - warning.asInstanceOf[TraceContextAware].traceContext should equal(testTraceContext) - messageSizeLimit.map { messageLimit ⇒ - warning.message.toString.lines.size should be(messageLimit) - } - } else { - listener.expectNoMsg() - } - } - - def warningListener(): TestProbe = { - val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[Warning]) - listener - } - - def setAskPatternTimeoutWarningMode(mode: String): Unit = { + def setAskPatternTimeoutWarningMode(mode: AskPatternTimeoutWarningSetting): Unit = { val target = Kamon(Akka) val field = target.getClass.getDeclaredField("askPatternTimeoutWarning") field.setAccessible(true) field.set(target, mode) } - - val fixtureCounter = new AtomicInteger(0) - - trait NoReplyFixture { - def noReplyActorRef: ActorRef = system.actorOf(Props[NoReply], "no-reply-" + fixtureCounter.incrementAndGet()) - - def noReplyActorSelection: ActorSelection = { - val target = noReplyActorRef - system.actorSelection(target.path) - } - } } class NoReply extends Actor { |