diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-04-18 16:11:23 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-04-18 16:11:23 +0200 |
commit | f476985fd26b4be2281bb7e68dc916190c681467 (patch) | |
tree | c9519cd05c46b27d8c78fb1387784da0f9c32d26 | |
parent | bf04eba8d967eb432c3ff825228a523f5393c2f1 (diff) | |
download | Kamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.gz Kamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.bz2 Kamon-f476985fd26b4be2281bb7e68dc916190c681467.zip |
= akka: clean up the ask pattern warning instrumentation and tests.
3 files changed, 78 insertions, 99 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index 5b3d19d4..e84a1030 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -18,17 +18,33 @@ package kamon.akka import _root_.akka.actor import _root_.akka.actor._ +import _root_.akka.event.Logging +import com.typesafe.config.Config import kamon._ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val log = Logging(system, classOf[AkkaExtension]) val config = system.settings.config.getConfig("kamon.akka") - val askPatternTimeoutWarning = config.getString("ask-pattern-timeout-warning") - val dispatcher = system.dispatcher + + val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(config) } object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Akka def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) +} + +sealed trait AskPatternTimeoutWarningSetting +object AskPatternTimeoutWarningSettings { + case object Off extends AskPatternTimeoutWarningSetting + case object Lightweight extends AskPatternTimeoutWarningSetting + case object Heavyweight extends AskPatternTimeoutWarningSetting + def fromConfig(config: Config): AskPatternTimeoutWarningSetting = config.getString("ask-pattern-timeout-warning") match { + case "off" ⇒ Off + case "lightweight" ⇒ Lightweight + case "heavyweight" ⇒ Heavyweight + case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.ask-pattern-timeout-warning config.") + } } diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala index 42edf4db..e64b241e 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala @@ -18,11 +18,12 @@ package akka.kamon.instrumentation import akka.util.Timeout import kamon.Kamon -import kamon.akka.Akka -import kamon.trace.Tracer -import akka.actor.{ InternalActorRef, ActorSystem, ActorRef } -import akka.event.Logging.Warning +import kamon.akka.{ AkkaExtension, Akka } +import kamon.akka.AskPatternTimeoutWarningSettings.{ Heavyweight, Lightweight, Off } +import akka.actor.{ InternalActorRef, ActorRef } import akka.pattern.AskTimeoutException +import kamon.trace.Tracer +import kamon.util.SameThreadExecutionContext import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.aspectj.lang.reflect.SourceLocation @@ -39,53 +40,47 @@ class AskPatternInstrumentation { @Around("askableActorRefAsk(actor, timeout)") def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef, timeout: Timeout): AnyRef = - Tracer.currentContext.collect { ctx ⇒ - actor match { - // the AskPattern will only work for InternalActorRef's with these conditions. - case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 ⇒ - val akkaExtension = Kamon.extension(Akka) - val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - val system = ref.provider.guardian.underlying.system + actor match { + // the AskPattern will only work for InternalActorRef's with these conditions. + case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 && Tracer.currentContext.nonEmpty ⇒ + val akkaExtension = Kamon.extension(Akka) + val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - val handler = akkaExtension.askPatternTimeoutWarning match { - case "off" ⇒ None - case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) - case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) - } + akkaExtension.askPatternTimeoutWarning match { + case Off ⇒ + case Lightweight ⇒ hookLightweightWarning(future, pjp.getSourceLocation, actor, akkaExtension) + case Heavyweight ⇒ hookHeavyweightWarning(future, new StackTraceCaptureException, actor, akkaExtension) + } - handler.map(future.onFailure(_)(akkaExtension.dispatcher)) - future + future - case _ ⇒ pjp.proceed() // - } + case _ ⇒ pjp.proceed().asInstanceOf[Future[AnyRef]] // + } + + def ifAskTimeoutException(code: ⇒ Unit): PartialFunction[Throwable, Unit] = { + case tmo: AskTimeoutException ⇒ code + case _ ⇒ + } - } getOrElse (pjp.proceed()) + def hookLightweightWarning(future: Future[AnyRef], sourceLocation: SourceLocation, actor: ActorRef, akkaExtension: AkkaExtension): Unit = { + val locationString = Option(sourceLocation) + .map(location ⇒ s"${location.getFileName}:${location.getLine}") + .getOrElse("<unknown position>") - def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = { - case e: AskTimeoutException ⇒ - val message = { - if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL)) - else callInfo.map(_.message) - } - publish(message) + future.onFailure(ifAskTimeoutException { + akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString) + })(SameThreadExecutionContext) } - def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒ - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], - s"Timeout triggered for ask pattern registered at: $msg")) + def hookHeavyweightWarning(future: Future[AnyRef], captureException: StackTraceCaptureException, actor: ActorRef, akkaExtension: AkkaExtension): Unit = { + val locationString = captureException.getStackTrace.drop(3).mkString("", EOL, EOL) + + future.onFailure(ifAskTimeoutException { + akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString) + })(SameThreadExecutionContext) } } object AskPatternInstrumentation { - type ErrorHandler = PartialFunction[Throwable, Unit] - class StackTraceCaptureException extends Throwable - - case class CallInfo(name: String, sourceLocation: SourceLocation) { - def message: String = { - def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("<unknown position>") - def line: String = s"$name @ $locationInfo" - s"$line" - } - } }
\ No newline at end of file diff --git a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala index 4268e53d..d9fbc2df 100644 --- a/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala +++ b/kamon-akka/src/test/scala/kamon/akka/instrumentation/AskPatternInstrumentationSpec.scala @@ -16,18 +16,16 @@ package kamon.instrumentation.akka -import java.util.concurrent.atomic.AtomicInteger - import akka.actor._ -import akka.event.Logging.Warning import akka.pattern.ask -import akka.testkit.TestProbe +import akka.testkit.EventFilter import akka.util.Timeout import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.akka.Akka +import kamon.akka.AskPatternTimeoutWarningSettings.{ Off, Lightweight, Heavyweight } +import kamon.akka.{ AskPatternTimeoutWarningSetting, Akka } import kamon.testkit.BaseKamonSpec -import kamon.trace.{ Tracer, TraceContext, TraceContextAware } +import kamon.trace.Tracer import scala.concurrent.duration._ @@ -37,6 +35,7 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s """ |akka { | loglevel = OFF + | loggers = [akka.testkit.TestEventListener] |} """.stripMargin) @@ -47,10 +46,11 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s "the AskPatternInstrumentation" when { "configured in heavyweight mode" should { - "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture { - setAskPatternTimeoutWarningMode("heavyweight") + "log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in { + val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-1") + setAskPatternTimeoutWarningMode(Heavyweight) - expectTimeoutWarning() { + EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-1] at").intercept { Tracer.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" Tracer.currentContext @@ -60,10 +60,11 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s } "configured in lightweight mode" should { - "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture { - setAskPatternTimeoutWarningMode("lightweight") + "log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in { + val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-2") + setAskPatternTimeoutWarningMode(Lightweight) - expectTimeoutWarning(messageSizeLimit = Some(1)) { + EventFilter.warning(start = "Timeout triggered for ask pattern to actor [no-reply-2] at").intercept { Tracer.withContext(newContext("ask-timeout-warning")) { noReplyActorRef ? "hello" Tracer.currentContext @@ -73,13 +74,16 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s } "configured in off mode" should { - "should not log any warning messages" in new NoReplyFixture { - setAskPatternTimeoutWarningMode("off") - - expectTimeoutWarning(expectWarning = false) { - Tracer.withContext(newContext("ask-timeout-warning")) { - noReplyActorRef ? "hello" - Tracer.currentContext + "should not log any warning messages" in { + val noReplyActorRef = system.actorOf(Props[NoReply], "no-reply-3") + setAskPatternTimeoutWarningMode(Off) + + intercept[AssertionError] { // No message will be logged and the event filter will fail. + EventFilter.warning(start = "Timeout triggered for ask pattern to actor", occurrences = 1).intercept { + Tracer.withContext(newContext("ask-timeout-warning")) { + noReplyActorRef ? "hello" + Tracer.currentContext + } } } } @@ -88,48 +92,12 @@ class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-s override protected def afterAll(): Unit = shutdown() - def expectTimeoutWarning(messageSizeLimit: Option[Int] = None, expectWarning: Boolean = true)(thunk: ⇒ TraceContext): Unit = { - val listener = warningListener() - val testTraceContext = thunk - - if (expectWarning) { - val warning = listener.fishForMessage() { - case Warning(_, _, msg) if msg.toString.startsWith("Timeout triggered for ask pattern registered at") ⇒ true - case others ⇒ false - }.asInstanceOf[Warning] - - warning.asInstanceOf[TraceContextAware].traceContext should equal(testTraceContext) - messageSizeLimit.map { messageLimit ⇒ - warning.message.toString.lines.size should be(messageLimit) - } - } else { - listener.expectNoMsg() - } - } - - def warningListener(): TestProbe = { - val listener = TestProbe() - system.eventStream.subscribe(listener.ref, classOf[Warning]) - listener - } - - def setAskPatternTimeoutWarningMode(mode: String): Unit = { + def setAskPatternTimeoutWarningMode(mode: AskPatternTimeoutWarningSetting): Unit = { val target = Kamon(Akka) val field = target.getClass.getDeclaredField("askPatternTimeoutWarning") field.setAccessible(true) field.set(target, mode) } - - val fixtureCounter = new AtomicInteger(0) - - trait NoReplyFixture { - def noReplyActorRef: ActorRef = system.actorOf(Props[NoReply], "no-reply-" + fixtureCounter.incrementAndGet()) - - def noReplyActorSelection: ActorSelection = { - val target = noReplyActorRef - system.actorSelection(target.path) - } - } } class NoReply extends Actor { |