aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-04-18 16:11:23 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2015-04-18 16:11:23 +0200
commitf476985fd26b4be2281bb7e68dc916190c681467 (patch)
treec9519cd05c46b27d8c78fb1387784da0f9c32d26
parentbf04eba8d967eb432c3ff825228a523f5393c2f1 (diff)
downloadKamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.gz
Kamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.bz2
Kamon-f476985fd26b4be2281bb7e68dc916190c681467.zip
= akka: clean up the ask pattern warning instrumentation and tests.
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala20
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala77
-rw-r--r--kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala80
3 files changed, 78 insertions, 99 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala
index 5b3d19d4..e84a1030 100644
--- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala
@@ -18,17 +18,33 @@ package kamon.akka
import _root_.akka.actor
import _root_.akka.actor._
+import _root_.akka.event.Logging
+import com.typesafe.config.Config
import kamon._
class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ val log = Logging(system, classOf[AkkaExtension])
val config = system.settings.config.getConfig("kamon.akka")
- val askPatternTimeoutWarning = config.getString("ask-pattern-timeout-warning")
- val dispatcher = system.dispatcher
+
+ val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(config)
}
object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: actor.Extension] = Akka
def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system)
+}
+
+sealed trait AskPatternTimeoutWarningSetting
+object AskPatternTimeoutWarningSettings {
+ case object Off extends AskPatternTimeoutWarningSetting
+ case object Lightweight extends AskPatternTimeoutWarningSetting
+ case object Heavyweight extends AskPatternTimeoutWarningSetting
+ def fromConfig(config: Config): AskPatternTimeoutWarningSetting = config.getString("ask-pattern-timeout-warning") match {
+ case "off" ⇒ Off
+ case "lightweight" ⇒ Lightweight
+ case "heavyweight" ⇒ Heavyweight
+ case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.ask-pattern-timeout-warning config.")
+ }
}
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala
index 42edf4db..e64b241e 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala
@@ -18,11 +18,12 @@ package akka.kamon.instrumentation
import akka.util.Timeout
import kamon.Kamon
-import kamon.akka.Akka
-import kamon.trace.Tracer
-import akka.actor.{ InternalActorRef, ActorSystem, ActorRef }
-import akka.event.Logging.Warning
+import kamon.akka.{ AkkaExtension, Akka }
+import kamon.akka.AskPatternTimeoutWarningSettings.{ Heavyweight, Lightweight, Off }
+import akka.actor.{ InternalActorRef, ActorRef }
import akka.pattern.AskTimeoutException
+import kamon.trace.Tracer
+import kamon.util.SameThreadExecutionContext
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
import org.aspectj.lang.reflect.SourceLocation
@@ -39,53 +40,47 @@ class AskPatternInstrumentation {
@Around("askableActorRefAsk(actor, timeout)")
def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef, timeout: Timeout): AnyRef =
- Tracer.currentContext.collect { ctx ⇒
- actor match {
- // the AskPattern will only work for InternalActorRef's with these conditions.
- case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 ⇒
- val akkaExtension = Kamon.extension(Akka)
- val future = pjp.proceed().asInstanceOf[Future[AnyRef]]
- val system = ref.provider.guardian.underlying.system
+ actor match {
+ // the AskPattern will only work for InternalActorRef's with these conditions.
+ case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 && Tracer.currentContext.nonEmpty ⇒
+ val akkaExtension = Kamon.extension(Akka)
+ val future = pjp.proceed().asInstanceOf[Future[AnyRef]]
- val handler = akkaExtension.askPatternTimeoutWarning match {
- case "off" ⇒ None
- case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system))
- case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system))
- }
+ akkaExtension.askPatternTimeoutWarning match {
+ case Off ⇒
+ case Lightweight ⇒ hookLightweightWarning(future, pjp.getSourceLocation, actor, akkaExtension)
+ case Heavyweight ⇒ hookHeavyweightWarning(future, new StackTraceCaptureException, actor, akkaExtension)
+ }
- handler.map(future.onFailure(_)(akkaExtension.dispatcher))
- future
+ future
- case _ ⇒ pjp.proceed() //
- }
+ case _ ⇒ pjp.proceed().asInstanceOf[Future[AnyRef]] //
+ }
+
+ def ifAskTimeoutException(code: ⇒ Unit): PartialFunction[Throwable, Unit] = {
+ case tmo: AskTimeoutException ⇒ code
+ case _ ⇒
+ }
- } getOrElse (pjp.proceed())
+ def hookLightweightWarning(future: Future[AnyRef], sourceLocation: SourceLocation, actor: ActorRef, akkaExtension: AkkaExtension): Unit = {
+ val locationString = Option(sourceLocation)
+ .map(location ⇒ s"${location.getFileName}:${location.getLine}")
+ .getOrElse("<unknown position>")
- def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = {
- case e: AskTimeoutException ⇒
- val message = {
- if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL))
- else callInfo.map(_.message)
- }
- publish(message)
+ future.onFailure(ifAskTimeoutException {
+ akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString)
+ })(SameThreadExecutionContext)
}
- def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒
- system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation],
- s"Timeout triggered for ask pattern registered at: $msg"))
+ def hookHeavyweightWarning(future: Future[AnyRef], captureException: StackTraceCaptureException, actor: ActorRef, akkaExtension: AkkaExtension): Unit = {
+ val locationString = captureException.getStackTrace.drop(3).mkString("", EOL, EOL)
+
+ future.onFailure(ifAskTimeoutException {
+ akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString)
+ })(SameThreadExecutionContext)
}
}
object AskPatternInstrumentation {
- type ErrorHandler = PartialFunction[Throwable, Unit]
-
class StackTraceCaptureException extends Throwable
-
- case class CallInfo(name: String, sourceLocation: SourceLocation) {
- def message: String = {
- def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("<unknown position>")
- def line: String = s"$name @ $locationInfo"
- s"$line"
- }
- }
} \ No newline at end of file
diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
index 4268e53d..d9fbc2df 100644
--- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
+++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala
@@ -16,18 +16,16 @@
package kamon.instrumentation.akka
-import java.util.concurrent.atomic.AtomicInteger
-
import akka.actor._
-import akka.event.Logging.Warning
import akka.pattern.ask
-import akka.testkit.TestProbe
+import akka.testkit.EventFilter
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import kamon.Kamon
-import kamon.akka.Akka
+import kamon.akka.AskPatternTimeoutWarningSettings.{ Off, Lightweight, Heavyweight }
+import kamon.akka.{ AskPatternTimeoutWarningSetting, Akka }
import kamon.testkit.BaseKamonSpec
-import kamon.trace.{ Tracer, TraceContext, TraceContextAware }
+import kamon.trace.Tracer
import scala.concurrent.duration._
@@ -37,6 +35,7 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s
"""
|akka {
| loglevel = OFF
+ | loggers = [akka.testkit.TestEventListener]
|}
""".stripMargin)
@@ -47,10 +46,11 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s
"the AskPatternInstrumentation" when {
"configured in heavyweight mode" should {
- "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture {
- setAskPatternTimeoutWarningMode("heavyweight")
+ "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in {
+ val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-1")
+ setAskPatternTimeoutWarningMode(Heavyweight)
- expectTimeoutWarning() {
+ EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-1] at").intercept {
Tracer.withContext(newContext("ask-timeout-warning")) {
noReplyActorRef ? "hello"
Tracer.currentContext
@@ -60,10 +60,11 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s
}
"configured in lightweight mode" should {
- "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture {
- setAskPatternTimeoutWarningMode("lightweight")
+ "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in {
+ val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-2")
+ setAskPatternTimeoutWarningMode(Lightweight)
- expectTimeoutWarning(messageSizeLimit = Some(1)) {
+ EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-2] at").intercept {
Tracer.withContext(newContext("ask-timeout-warning")) {
noReplyActorRef ? "hello"
Tracer.currentContext
@@ -73,13 +74,16 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s
}
"configured in off mode" should {
- "should not log any warning messages" in new NoReplyFixture {
- setAskPatternTimeoutWarningMode("off")
-
- expectTimeoutWarning(expectWarning = false) {
- Tracer.withContext(newContext("ask-timeout-warning")) {
- noReplyActorRef ? "hello"
- Tracer.currentContext
+ "should not log any warning messages" in {
+ val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-3")
+ setAskPatternTimeoutWarningMode(Off)
+
+ intercept[AssertionError] { // No message will be logged and the event filter will fail.
+ EventFilter.warning(start = "Timeout triggered for ask pattern to actor", occurrences = 1).intercept {
+ Tracer.withContext(newContext("ask-timeout-warning")) {
+ noReplyActorRef ? "hello"
+ Tracer.currentContext
+ }
}
}
}
@@ -88,48 +92,12 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s
override protected def afterAll(): Unit = shutdown()
- def expectTimeoutWarning(messageSizeLimit: Option[Int] = None, expectWarning: Boolean = true)(thunk: ⇒ TraceContext): Unit = {
- val listener = warningListener()
- val testTraceContext = thunk
-
- if (expectWarning) {
- val warning = listener.fishForMessage() {
- case Warning(_, _, msg) if msg.toString.startsWith("Timeout triggered for ask pattern registered at") ⇒ true
- case others ⇒ false
- }.asInstanceOf[Warning]
-
- warning.asInstanceOf[TraceContextAware].traceContext should equal(testTraceContext)
- messageSizeLimit.map { messageLimit ⇒
- warning.message.toString.lines.size should be(messageLimit)
- }
- } else {
- listener.expectNoMsg()
- }
- }
-
- def warningListener(): TestProbe = {
- val listener = TestProbe()
- system.eventStream.subscribe(listener.ref, classOf[Warning])
- listener
- }
-
- def setAskPatternTimeoutWarningMode(mode: String): Unit = {
+ def setAskPatternTimeoutWarningMode(mode: AskPatternTimeoutWarningSetting): Unit = {
val target = Kamon(Akka)
val field = target.getClass.getDeclaredField("askPatternTimeoutWarning")
field.setAccessible(true)
field.set(target, mode)
}
-
- val fixtureCounter = new AtomicInteger(0)
-
- trait NoReplyFixture {
- def noReplyActorRef: ActorRef = system.actorOf(Props[NoReply], "no-reply-" + fixtureCounter.incrementAndGet())
-
- def noReplyActorSelection: ActorSelection = {
- val target = noReplyActorRef
- system.actorSelection(target.path)
- }
- }
}
class NoReply extends Actor {