diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-04-18 16:11:23 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-04-18 16:11:23 +0200 |
commit | f476985fd26b4be2281bb7e68dc916190c681467 (patch) | |
tree | c9519cd05c46b27d8c78fb1387784da0f9c32d26 /kamon-akka/src/main/scala | |
parent | bf04eba8d967eb432c3ff825228a523f5393c2f1 (diff) | |
download | Kamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.gz Kamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.bz2 Kamon-f476985fd26b4be2281bb7e68dc916190c681467.zip |
= akka: clean up the ask pattern warning instrumentation and tests.
Diffstat (limited to 'kamon-akka/src/main/scala')
-rw-r--r-- | kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala | 20 | ||||
-rw-r--r-- | kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala | 77 |
2 files changed, 54 insertions, 43 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala index 5b3d19d4..e84a1030 100644 --- a/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala +++ b/kamon-akka/src/main/scala/kamon/akka/AkkaExtension.scala @@ -18,17 +18,33 @@ package kamon.akka import _root_.akka.actor import _root_.akka.actor._ +import _root_.akka.event.Logging +import com.typesafe.config.Config import kamon._ class AkkaExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val log = Logging(system, classOf[AkkaExtension]) val config = system.settings.config.getConfig("kamon.akka") - val askPatternTimeoutWarning = config.getString("ask-pattern-timeout-warning") - val dispatcher = system.dispatcher + + val askPatternTimeoutWarning = AskPatternTimeoutWarningSettings.fromConfig(config) } object Akka extends ExtensionId[AkkaExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Akka def createExtension(system: ExtendedActorSystem): AkkaExtension = new AkkaExtension(system) +} + +sealed trait AskPatternTimeoutWarningSetting +object AskPatternTimeoutWarningSettings { + case object Off extends AskPatternTimeoutWarningSetting + case object Lightweight extends AskPatternTimeoutWarningSetting + case object Heavyweight extends AskPatternTimeoutWarningSetting + def fromConfig(config: Config): AskPatternTimeoutWarningSetting = config.getString("ask-pattern-timeout-warning") match { + case "off" ⇒ Off + case "lightweight" ⇒ Lightweight + case "heavyweight" ⇒ Heavyweight + case other ⇒ sys.error(s"Unrecognized option [$other] for the kamon.akka.ask-pattern-timeout-warning config.") + } } diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala index 42edf4db..e64b241e 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala @@ -18,11 +18,12 @@ package akka.kamon.instrumentation import akka.util.Timeout import kamon.Kamon -import kamon.akka.Akka -import kamon.trace.Tracer -import akka.actor.{ InternalActorRef, ActorSystem, ActorRef } -import akka.event.Logging.Warning +import kamon.akka.{ AkkaExtension, Akka } +import kamon.akka.AskPatternTimeoutWarningSettings.{ Heavyweight, Lightweight, Off } +import akka.actor.{ InternalActorRef, ActorRef } import akka.pattern.AskTimeoutException +import kamon.trace.Tracer +import kamon.util.SameThreadExecutionContext import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.aspectj.lang.reflect.SourceLocation @@ -39,53 +40,47 @@ class AskPatternInstrumentation { @Around("askableActorRefAsk(actor, timeout)") def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef, timeout: Timeout): AnyRef = - Tracer.currentContext.collect { ctx ⇒ - actor match { - // the AskPattern will only work for InternalActorRef's with these conditions. - case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 ⇒ - val akkaExtension = Kamon.extension(Akka) - val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - val system = ref.provider.guardian.underlying.system + actor match { + // the AskPattern will only work for InternalActorRef's with these conditions. + case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 && Tracer.currentContext.nonEmpty ⇒ + val akkaExtension = Kamon.extension(Akka) + val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - val handler = akkaExtension.askPatternTimeoutWarning match { - case "off" ⇒ None - case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system)) - case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system)) - } + akkaExtension.askPatternTimeoutWarning match { + case Off ⇒ + case Lightweight ⇒ hookLightweightWarning(future, pjp.getSourceLocation, actor, akkaExtension) + case Heavyweight ⇒ hookHeavyweightWarning(future, new StackTraceCaptureException, actor, akkaExtension) + } - handler.map(future.onFailure(_)(akkaExtension.dispatcher)) - future + future - case _ ⇒ pjp.proceed() // - } + case _ ⇒ pjp.proceed().asInstanceOf[Future[AnyRef]] // + } + + def ifAskTimeoutException(code: ⇒ Unit): PartialFunction[Throwable, Unit] = { + case tmo: AskTimeoutException ⇒ code + case _ ⇒ + } - } getOrElse (pjp.proceed()) + def hookLightweightWarning(future: Future[AnyRef], sourceLocation: SourceLocation, actor: ActorRef, akkaExtension: AkkaExtension): Unit = { + val locationString = Option(sourceLocation) + .map(location ⇒ s"${location.getFileName}:${location.getLine}") + .getOrElse("<unknown position>") - def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = { - case e: AskTimeoutException ⇒ - val message = { - if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL)) - else callInfo.map(_.message) - } - publish(message) + future.onFailure(ifAskTimeoutException { + akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString) + })(SameThreadExecutionContext) } - def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒ - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], - s"Timeout triggered for ask pattern registered at: $msg")) + def hookHeavyweightWarning(future: Future[AnyRef], captureException: StackTraceCaptureException, actor: ActorRef, akkaExtension: AkkaExtension): Unit = { + val locationString = captureException.getStackTrace.drop(3).mkString("", EOL, EOL) + + future.onFailure(ifAskTimeoutException { + akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString) + })(SameThreadExecutionContext) } } object AskPatternInstrumentation { - type ErrorHandler = PartialFunction[Throwable, Unit] - class StackTraceCaptureException extends Throwable - - case class CallInfo(name: String, sourceLocation: SourceLocation) { - def message: String = { - def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("<unknown position>") - def line: String = s"$name @ $locationInfo" - s"$line" - } - } }
\ No newline at end of file |