aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/test
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-05-09 15:36:16 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2015-05-09 15:36:16 +0200
commit76f503b8f954e1b149bea3adb8927704f7095876 (patch)
tree59cabc6055e855c006115c5847a85f879dc36dd9 /kamon-akka/src/test
parent520895a6a9a6b48b83efe01cf289708efd045b42 (diff)
parentd69f14710b1d933d58412edd63b465b13a09a9d0 (diff)
downloadKamon-76f503b8f954e1b149bea3adb8927704f7095876.tar.gz
Kamon-76f503b8f954e1b149bea3adb8927704f7095876.tar.bz2
Kamon-76f503b8f954e1b149bea3adb8927704f7095876.zip
Merge branch 'master' into release-legacy-akka-2.2
Conflicts: kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala kamon-core/src/test/scala/kamon/metric/TraceMetricsSpec.scala kamon-core/src/test/scala/kamon/testkit/BaseKamonSpec.scala kamon-play/src/main/scala/kamon/play/action/KamonTraceActions.scala kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala kamon-play/src/test/scala/kamon/play/RequestInstrumentationSpec.scala project/Dependencies.scala project/Settings.scala
Diffstat (limited to 'kamon-akka/src/test')
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala14
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala48
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala8
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala20
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala10
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala46
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala88
7 files changed, 103 insertions, 131 deletions
diff --git a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala
index 0d8d41e3..36ea8065 100644
--- a/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/ActorMetricsSpec.scala
@@ -38,8 +38,8 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") {
|
| filters {
| akka-actor {
- | includes = [ "user/tracked-*", "user/measuring-*", "user/clean-after-collect", "user/stop" ]
- | excludes = [ "user/tracked-explicitly-excluded", "user/non-tracked-actor" ]
+ | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/clean-after-collect", "*/user/stop", "*/" ]
+ | excludes = [ "*/user/tracked-explicitly-excluded", "*/user/non-tracked-actor" ]
| }
| }
|
@@ -64,6 +64,10 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") {
actorMetricsRecorderOf(trackedButExplicitlyExcluded) shouldBe empty
}
+ "not pick up the root supervisor" in {
+ Kamon.metrics.find("actor-metrics-spec/", ActorMetrics.category) shouldBe empty
+ }
+
"reset all recording instruments after taking a snapshot" in new ActorMetricsFixtures {
val trackedActor = createTestActor("clean-after-collect")
@@ -151,7 +155,7 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") {
trackedActor ! PoisonPill
deathWatcher.expectTerminated(trackedActor)
- actorMetricsRecorderOf(trackedActor).get shouldNot be theSameInstanceAs (firstRecorder)
+ actorMetricsRecorderOf(trackedActor) shouldBe empty
}
}
@@ -160,10 +164,10 @@ class ActorMetricsSpec extends BaseKamonSpec("actor-metrics-spec") {
val buffer: LongBuffer = LongBuffer.allocate(10000)
}
- def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
+ def actorRecorderName(ref: ActorRef): String = system.name + "/" + ref.path.elements.mkString("/")
def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetrics] =
- Kamon.metrics.register(ActorMetrics, actorRecorderName(ref)).map(_.recorder)
+ Kamon.metrics.find(actorRecorderName(ref), ActorMetrics.category).map(_.asInstanceOf[ActorMetrics])
def collectMetricsOf(ref: ActorRef): Option[EntitySnapshot] = {
Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested.
diff --git a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
index dd5cfa45..02a5a0d4 100644
--- a/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/DispatcherMetricsSpec.scala
@@ -36,8 +36,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
|
| filters = {
| akka-dispatcher {
- | includes = [ "*" ]
- | excludes = [ "explicitly-excluded" ]
+ | includes = [ "**" ]
+ | excludes = [ "*/explicitly-excluded" ]
| }
| }
|
@@ -84,16 +84,16 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
val tpeDispatcher = forceInit(system.dispatchers.lookup("tracked-tpe"))
val excludedDispatcher = forceInit(system.dispatchers.lookup("explicitly-excluded"))
- findDispatcherRecorder(defaultDispatcher) shouldNot be(empty)
- findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
- findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
- findDispatcherRecorder(excludedDispatcher) should be(empty)
+ findDispatcherRecorder(defaultDispatcher, "fork-join-pool") shouldNot be(empty)
+ findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty)
+ findDispatcherRecorder(excludedDispatcher, "fork-join-pool") should be(empty)
}
"record metrics for a dispatcher with thread-pool-executor" in {
implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
- refreshDispatcherInstruments(tpeDispatcher)
- collectDispatcherMetrics(tpeDispatcher)
+ refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor")
+ collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
Await.result({
Future.sequence {
@@ -101,8 +101,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
}
}, 5 seconds)
- refreshDispatcherInstruments(tpeDispatcher)
- val snapshot = collectDispatcherMetrics(tpeDispatcher)
+ refreshDispatcherInstruments(tpeDispatcher, "thread-pool-executor")
+ val snapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
snapshot.gauge("active-threads") should not be empty
snapshot.gauge("pool-size").get.min should be >= 7L
@@ -112,13 +112,13 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
snapshot.gauge("processed-tasks").get.max should be(102L +- 5L)
// The processed tasks should be reset to 0 if no more tasks are submitted.
- val secondSnapshot = collectDispatcherMetrics(tpeDispatcher)
+ val secondSnapshot = collectDispatcherMetrics(tpeDispatcher, "thread-pool-executor")
secondSnapshot.gauge("processed-tasks").get.max should be(0)
}
"record metrics for a dispatcher with fork-join-executor" in {
implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
- collectDispatcherMetrics(fjpDispatcher)
+ collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
Await.result({
Future.sequence {
@@ -126,8 +126,8 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
}
}, 5 seconds)
- refreshDispatcherInstruments(fjpDispatcher)
- val snapshot = collectDispatcherMetrics(fjpDispatcher)
+ refreshDispatcherInstruments(fjpDispatcher, "fork-join-pool")
+ val snapshot = collectDispatcherMetrics(fjpDispatcher, "fork-join-pool")
snapshot.minMaxCounter("parallelism").get.max should be(22)
snapshot.gauge("pool-size").get.min should be >= 0L
@@ -142,28 +142,28 @@ class DispatcherMetricsSpec extends BaseKamonSpec("dispatcher-metrics-spec") {
implicit val tpeDispatcher = system.dispatchers.lookup("tracked-tpe")
implicit val fjpDispatcher = system.dispatchers.lookup("tracked-fjp")
- findDispatcherRecorder(fjpDispatcher) shouldNot be(empty)
- findDispatcherRecorder(tpeDispatcher) shouldNot be(empty)
+ findDispatcherRecorder(fjpDispatcher, "fork-join-pool") shouldNot be(empty)
+ findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") shouldNot be(empty)
shutdownDispatcher(tpeDispatcher)
shutdownDispatcher(fjpDispatcher)
- findDispatcherRecorder(fjpDispatcher) should be(empty)
- findDispatcherRecorder(tpeDispatcher) should be(empty)
+ findDispatcherRecorder(fjpDispatcher, "fork-join-pool") should be(empty)
+ findDispatcherRecorder(tpeDispatcher, "thread-pool-executor") should be(empty)
}
}
def actorRecorderName(ref: ActorRef): String = ref.path.elements.mkString("/")
- def findDispatcherRecorder(dispatcher: MessageDispatcher): Option[EntityRecorder] =
- Kamon.metrics.find(dispatcher.id, "akka-dispatcher")
+ def findDispatcherRecorder(dispatcher: MessageDispatcher, dispatcherType: String): Option[EntityRecorder] =
+ Kamon.metrics.find(system.name + "/" + dispatcher.id, "akka-dispatcher", tags = Map("dispatcher-type" -> dispatcherType))
- def collectDispatcherMetrics(dispatcher: MessageDispatcher): EntitySnapshot =
- findDispatcherRecorder(dispatcher).map(_.collect(collectionContext)).get
+ def collectDispatcherMetrics(dispatcher: MessageDispatcher, dispatcherType: String): EntitySnapshot =
+ findDispatcherRecorder(dispatcher, dispatcherType).map(_.collect(collectionContext)).get
- def refreshDispatcherInstruments(dispatcher: MessageDispatcher): Unit = {
- findDispatcherRecorder(dispatcher) match {
+ def refreshDispatcherInstruments(dispatcher: MessageDispatcher, dispatcherType: String): Unit = {
+ findDispatcherRecorder(dispatcher, dispatcherType) match {
case Some(tpe: ThreadPoolExecutorDispatcherMetrics) ⇒
tpe.processedTasks.refreshValue()
tpe.activeThreads.refreshValue()
diff --git a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
index c4c1d9ad..9c452a90 100644
--- a/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/RouterMetricsSpec.scala
@@ -39,8 +39,8 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") {
|
| filters = {
| akka-router {
- | includes = [ "user/tracked-*", "user/measuring-*", "user/stop-*" ]
- | excludes = [ "user/tracked-explicitly-excluded-*"]
+ | includes = [ "*/user/tracked-*", "*/user/measuring-*", "*/user/stop-*" ]
+ | excludes = [ "*/user/tracked-explicitly-excluded-*"]
| }
| }
|}
@@ -122,7 +122,7 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") {
trackedRouter ! PoisonPill
deathWatcher.expectTerminated(trackedRouter)
- routerMetricsRecorderOf("user/stop-in-router").get shouldNot be theSameInstanceAs (firstRecorder)
+ routerMetricsRecorderOf("user/stop-in-router") shouldBe empty
}
}
@@ -132,7 +132,7 @@ class RouterMetricsSpec extends BaseKamonSpec("router-metrics-spec") {
}
def routerMetricsRecorderOf(routerName: String): Option[RouterMetrics] =
- Kamon.metrics.register(RouterMetrics, routerName).map(_.recorder)
+ Kamon.metrics.find(system.name + "/" + routerName, RouterMetrics.category).map(_.asInstanceOf[RouterMetrics])
def collectMetricsOf(routerName: String): Option[EntitySnapshot] = {
Thread.sleep(5) // Just in case the test advances a bit faster than the actor being tested.
diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala
index 593a7baa..6b5904b6 100644
--- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorCellInstrumentationSpec.scala
@@ -20,7 +20,7 @@ import akka.pattern.{ ask, pipe }
import akka.routing._
import akka.util.Timeout
import kamon.testkit.BaseKamonSpec
-import kamon.trace.TraceContext
+import kamon.trace.Tracer
import scala.concurrent.duration._
@@ -29,18 +29,18 @@ class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumenta
"the message passing instrumentation" should {
"propagate the TraceContext using bang" in new EchoActorFixture {
- val testTraceContext = TraceContext.withContext(newContext("bang-reply")) {
+ val testTraceContext = Tracer.withContext(newContext("bang-reply")) {
ctxEchoActor ! "test"
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext)
}
"propagate the TraceContext using tell" in new EchoActorFixture {
- val testTraceContext = TraceContext.withContext(newContext("tell-reply")) {
+ val testTraceContext = Tracer.withContext(newContext("tell-reply")) {
ctxEchoActor.tell("test", testActor)
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext)
@@ -48,19 +48,19 @@ class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumenta
"propagate the TraceContext using ask" in new EchoActorFixture {
implicit val timeout = Timeout(1 seconds)
- val testTraceContext = TraceContext.withContext(newContext("ask-reply")) {
+ val testTraceContext = Tracer.withContext(newContext("ask-reply")) {
// The pipe pattern use Futures internally, so FutureTracing test should cover the underpinnings of it.
(ctxEchoActor ? "test") pipeTo (testActor)
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext)
}
"propagate the TraceContext to actors behind a pool router" in new RoundRobinRouterFixture {
- val testTraceContext = TraceContext.withContext(newContext("router-reply")) {
+ val testTraceContext = Tracer.withContext(newContext("router-reply")) {
router ! "test"
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext)
@@ -81,7 +81,7 @@ class ActorCellInstrumentationSpec extends BaseKamonSpec("actor-cell-instrumenta
class TraceContextEcho extends Actor {
def receive = {
- case msg: String ⇒ sender ! TraceContext.currentContext
+ case msg: String ⇒ sender ! Tracer.currentContext
}
}
diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala
index 143c816d..a45a5ae9 100644
--- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorLoggingInstrumentationSpec.scala
@@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory
import kamon.testkit.BaseKamonSpec
import kamon.trace.TraceLocal.AvailableToMdc
import kamon.trace.logging.MdcKeysSupport
-import kamon.trace.{ TraceContextAware, TraceLocal, TraceContext }
+import kamon.trace.{ Tracer, TraceContextAware, TraceLocal }
import org.scalatest.Inspectors
import org.slf4j.MDC
@@ -39,9 +39,9 @@ class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instr
val loggerActor = system.actorOf(Props[LoggerActor])
system.eventStream.subscribe(testActor, classOf[LogEvent])
- val testTraceContext = TraceContext.withContext(newContext("logging")) {
+ val testTraceContext = Tracer.withContext(newContext("logging")) {
loggerActor ! "info"
- TraceContext.currentContext
+ Tracer.currentContext
}
fishForMessage() {
@@ -55,7 +55,7 @@ class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instr
"allow retrieve a value from the MDC when was created a key of type AvailableToMdc" in {
val testString = "Hello World"
- TraceContext.withContext(newContext("logging-with-mdc")) {
+ Tracer.withContext(newContext("logging-with-mdc")) {
TraceLocal.store(AvailableToMdc("some-cool-key"))(testString)
withMdc {
@@ -69,6 +69,6 @@ class ActorLoggingInstrumentationSpec extends BaseKamonSpec("actor-logging-instr
class LoggerActor extends Actor with ActorLogging {
def receive = {
- case "info" ⇒ log.info("TraceContext(name = {}, token = {})", TraceContext.currentContext.name, TraceContext.currentContext.token)
+ case "info" ⇒ log.info("TraceContext(name = {}, token = {})", Tracer.currentContext.name, Tracer.currentContext.token)
}
}
diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala
index cf5f1b5b..832e5b71 100644
--- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/ActorSystemMessageInstrumentationSpec.scala
@@ -21,7 +21,7 @@ import akka.actor._
import akka.testkit.ImplicitSender
import com.typesafe.config.ConfigFactory
import kamon.testkit.BaseKamonSpec
-import kamon.trace.{ EmptyTraceContext, TraceContext }
+import kamon.trace.{ Tracer, EmptyTraceContext }
import org.scalatest.WordSpecLike
import scala.concurrent.duration._
@@ -40,31 +40,31 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
"the system message passing instrumentation" should {
"keep the TraceContext while processing the Create message in top level actors" in {
- val testTraceContext = TraceContext.withContext(newContext("creating-top-level-actor")) {
+ val testTraceContext = Tracer.withContext(newContext("creating-top-level-actor")) {
system.actorOf(Props(new Actor {
- testActor ! TraceContext.currentContext
+ testActor ! Tracer.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext)
}
"keep the TraceContext while processing the Create message in non top level actors" in {
- val testTraceContext = TraceContext.withContext(newContext("creating-non-top-level-actor")) {
+ val testTraceContext = Tracer.withContext(newContext("creating-non-top-level-actor")) {
system.actorOf(Props(new Actor {
def receive: Actor.Receive = {
case any ⇒
context.actorOf(Props(new Actor {
- testActor ! TraceContext.currentContext
+ testActor ! Tracer.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
}
})) ! "any"
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext)
@@ -74,9 +74,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
"the actor is resumed" in {
val supervisor = supervisorWithDirective(Resume)
- val testTraceContext = TraceContext.withContext(newContext("fail-and-resume")) {
+ val testTraceContext = Tracer.withContext(newContext("fail-and-resume")) {
supervisor ! "fail"
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -89,9 +89,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
"the actor is restarted" in {
val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true)
- val testTraceContext = TraceContext.withContext(newContext("fail-and-restart")) {
+ val testTraceContext = Tracer.withContext(newContext("fail-and-restart")) {
supervisor ! "fail"
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -106,9 +106,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
"the actor is stopped" in {
val supervisor = supervisorWithDirective(Stop, sendPostStop = true)
- val testTraceContext = TraceContext.withContext(newContext("fail-and-stop")) {
+ val testTraceContext = Tracer.withContext(newContext("fail-and-stop")) {
supervisor ! "fail"
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -119,9 +119,9 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
"the failure is escalated" in {
val supervisor = supervisorWithDirective(Escalate, sendPostStop = true)
- val testTraceContext = TraceContext.withContext(newContext("fail-and-escalate")) {
+ val testTraceContext = Tracer.withContext(newContext("fail-and-escalate")) {
supervisor ! "fail"
- TraceContext.currentContext
+ Tracer.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
@@ -139,7 +139,7 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
val child = context.actorOf(Props(new Parent))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; Stop
+ case NonFatal(throwable) ⇒ testActor ! Tracer.currentContext; Stop
}
def receive = {
@@ -151,7 +151,7 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
val child = context.actorOf(Props(new Child))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case NonFatal(throwable) ⇒ testActor ! TraceContext.currentContext; directive
+ case NonFatal(throwable) ⇒ testActor ! Tracer.currentContext; directive
}
def receive: Actor.Receive = {
@@ -159,7 +159,7 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
}
override def postStop(): Unit = {
- if (sendPostStop) testActor ! TraceContext.currentContext
+ if (sendPostStop) testActor ! Tracer.currentContext
super.postStop()
}
}
@@ -167,26 +167,26 @@ class ActorSystemMessageInstrumentationSpec extends BaseKamonSpec("actor-system-
class Child extends Actor {
def receive = {
case "fail" ⇒ throw new ArithmeticException("Division by zero.")
- case "context" ⇒ sender ! TraceContext.currentContext
+ case "context" ⇒ sender ! Tracer.currentContext
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
- if (sendPreRestart) testActor ! TraceContext.currentContext
+ if (sendPreRestart) testActor ! Tracer.currentContext
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
- if (sendPostRestart) testActor ! TraceContext.currentContext
+ if (sendPostRestart) testActor ! Tracer.currentContext
super.postRestart(reason)
}
override def postStop(): Unit = {
- if (sendPostStop) testActor ! TraceContext.currentContext
+ if (sendPostStop) testActor ! Tracer.currentContext
super.postStop()
}
override def preStart(): Unit = {
- if (sendPreStart) testActor ! TraceContext.currentContext
+ if (sendPreStart) testActor ! Tracer.currentContext
super.preStart()
}
}
diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
index d925fbf6..af6ce76e 100644
--- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
@@ -16,18 +16,16 @@
package kamon.instrumentation.akka
-import java.util.concurrent.atomic.AtomicInteger
-
import akka.actor._
-import akka.event.Logging.Warning
import akka.pattern.ask
-import akka.testkit.TestProbe
+import akka.testkit.EventFilter
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import kamon.Kamon
-import kamon.akka.Akka
+import kamon.akka.AskPatternTimeoutWarningSettings.{ Off, Lightweight, Heavyweight }
+import kamon.akka.{ AskPatternTimeoutWarningSetting, Akka }
import kamon.testkit.BaseKamonSpec
-import kamon.trace.{ TraceContext, TraceContextAware }
+import kamon.trace.Tracer
import scala.concurrent.duration._
@@ -37,6 +35,7 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s
"""
|akka {
| loglevel = OFF
+ | loggers = [akka.testkit.TestEventListener]
|}
""".stripMargin)
@@ -47,87 +46,56 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s
"the AskPatternInstrumentation" when {
"configured in heavyweight mode" should {
- "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture {
- setAskPatternTimeoutWarningMode("heavyweight")
+ "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in {
+ val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-1")
+ setAskPatternTimeoutWarningMode(Heavyweight)
- expectTimeoutWarning() {
- TraceContext.withContext(newContext("ask-timeout-warning")) {
+ EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-1] at").intercept {
+ Tracer.withContext(newContext("ask-timeout-warning")) {
noReplyActorRef ? "hello"
- TraceContext.currentContext
+ Tracer.currentContext
}
}
}
}
"configured in lightweight mode" should {
- "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture {
- setAskPatternTimeoutWarningMode("lightweight")
+ "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in {
+ val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-2")
+ setAskPatternTimeoutWarningMode(Lightweight)
- expectTimeoutWarning(messageSizeLimit = Some(1)) {
- TraceContext.withContext(newContext("ask-timeout-warning")) {
+ EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-2] at").intercept {
+ Tracer.withContext(newContext("ask-timeout-warning")) {
noReplyActorRef ? "hello"
- TraceContext.currentContext
+ Tracer.currentContext
}
}
}
}
"configured in off mode" should {
- "should not log any warning messages" in new NoReplyFixture {
- setAskPatternTimeoutWarningMode("off")
-
- expectTimeoutWarning(expectWarning = false) {
- TraceContext.withContext(newContext("ask-timeout-warning")) {
- noReplyActorRef ? "hello"
- TraceContext.currentContext
+ "should not log any warning messages" in {
+ val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-3")
+ setAskPatternTimeoutWarningMode(Off)
+
+ intercept[AssertionError] { // No message will be logged and the event filter will fail.
+ EventFilter.warning(start = "Timeout triggered for ask pattern to actor", occurrences = 1).intercept {
+ Tracer.withContext(newContext("ask-timeout-warning")) {
+ noReplyActorRef ? "hello"
+ Tracer.currentContext
+ }
}
}
}
}
}
- def expectTimeoutWarning(messageSizeLimit: Option[Int] = None, expectWarning: Boolean = true)(thunk: ⇒ TraceContext): Unit = {
- val listener = warningListener()
- val testTraceContext = thunk
-
- if (expectWarning) {
- val warning = listener.fishForMessage() {
- case Warning(_, _, msg) if msg.toString.startsWith("Timeout triggered for ask pattern registered at") ⇒ true
- case others ⇒ false
- }.asInstanceOf[Warning]
-
- warning.asInstanceOf[TraceContextAware].traceContext should equal(testTraceContext)
- messageSizeLimit.map { messageLimit ⇒
- warning.message.toString.lines.size should be(messageLimit)
- }
- } else {
- listener.expectNoMsg()
- }
- }
-
- def warningListener(): TestProbe = {
- val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[Warning])
- listener
- }
-
- def setAskPatternTimeoutWarningMode(mode: String): Unit = {
+ def setAskPatternTimeoutWarningMode(mode: AskPatternTimeoutWarningSetting): Unit = {
val target = Kamon(Akka)
val field = target.getClass.getDeclaredField("askPatternTimeoutWarning")
field.setAccessible(true)
field.set(target, mode)
}
-
- val fixtureCounter = new AtomicInteger(0)
-
- trait NoReplyFixture {
- def noReplyActorRef: ActorRef = system.actorOf(Props[NoReply], "no-reply-" + fixtureCounter.incrementAndGet())
-
- def noReplyActorSelection: ActorSelection = {
- val target = noReplyActorRef
- system.actorSelection(target.path)
- }
- }
}
class NoReply extends Actor {