aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-04-18 16:11:23 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2015-04-18 16:11:23 +0200
commitf476985fd26b4be2281bb7e68dc916190c681467 (patch)
treec9519cd05c46b27d8c78fb1387784da0f9c32d26 /kamon-akka/src/main/scala/kamon/akka/instrumentation
parentbf04eba8d967eb432c3ff825228a523f5393c2f1 (diff)
downloadKamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.gz
Kamon-f476985fd26b4be2281bb7e68dc916190c681467.tar.bz2
Kamon-f476985fd26b4be2281bb7e68dc916190c681467.zip
= akka: clean up the ask pattern warning instrumentation and tests.
Diffstat (limited to 'kamon-akka/src/main/scala/kamon/akka/instrumentation')
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala77
1 files changed, 36 insertions, 41 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala
index 42edf4db..e64b241e 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/AskPatternInstrumentation.scala
@@ -18,11 +18,12 @@ package akka.kamon.instrumentation
import akka.util.Timeout
import kamon.Kamon
-import kamon.akka.Akka
-import kamon.trace.Tracer
-import akka.actor.{ InternalActorRef, ActorSystem, ActorRef }
-import akka.event.Logging.Warning
+import kamon.akka.{ AkkaExtension, Akka }
+import kamon.akka.AskPatternTimeoutWarningSettings.{ Heavyweight, Lightweight, Off }
+import akka.actor.{ InternalActorRef, ActorRef }
import akka.pattern.AskTimeoutException
+import kamon.trace.Tracer
+import kamon.util.SameThreadExecutionContext
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
import org.aspectj.lang.reflect.SourceLocation
@@ -39,53 +40,47 @@ class AskPatternInstrumentation {
@Around("askableActorRefAsk(actor, timeout)")
def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, actor: ActorRef, timeout: Timeout): AnyRef =
- Tracer.currentContext.collect { ctx ⇒
- actor match {
- // the AskPattern will only work for InternalActorRef's with these conditions.
- case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 ⇒
- val akkaExtension = Kamon.extension(Akka)
- val future = pjp.proceed().asInstanceOf[Future[AnyRef]]
- val system = ref.provider.guardian.underlying.system
+ actor match {
+ // the AskPattern will only work for InternalActorRef's with these conditions.
+ case ref: InternalActorRef if !ref.isTerminated && timeout.duration.length > 0 && Tracer.currentContext.nonEmpty ⇒
+ val akkaExtension = Kamon.extension(Akka)
+ val future = pjp.proceed().asInstanceOf[Future[AnyRef]]
- val handler = akkaExtension.askPatternTimeoutWarning match {
- case "off" ⇒ None
- case "lightweight" ⇒ Some(errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)))(system))
- case "heavyweight" ⇒ Some(errorHandler(stack = Some(new StackTraceCaptureException))(system))
- }
+ akkaExtension.askPatternTimeoutWarning match {
+ case Off ⇒
+ case Lightweight ⇒ hookLightweightWarning(future, pjp.getSourceLocation, actor, akkaExtension)
+ case Heavyweight ⇒ hookHeavyweightWarning(future, new StackTraceCaptureException, actor, akkaExtension)
+ }
- handler.map(future.onFailure(_)(akkaExtension.dispatcher))
- future
+ future
- case _ ⇒ pjp.proceed() //
- }
+ case _ ⇒ pjp.proceed().asInstanceOf[Future[AnyRef]] //
+ }
+
+ def ifAskTimeoutException(code: ⇒ Unit): PartialFunction[Throwable, Unit] = {
+ case tmo: AskTimeoutException ⇒ code
+ case _ ⇒
+ }
- } getOrElse (pjp.proceed())
+ def hookLightweightWarning(future: Future[AnyRef], sourceLocation: SourceLocation, actor: ActorRef, akkaExtension: AkkaExtension): Unit = {
+ val locationString = Option(sourceLocation)
+ .map(location ⇒ s"${location.getFileName}:${location.getLine}")
+ .getOrElse("<unknown position>")
- def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = {
- case e: AskTimeoutException ⇒
- val message = {
- if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL))
- else callInfo.map(_.message)
- }
- publish(message)
+ future.onFailure(ifAskTimeoutException {
+ akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString)
+ })(SameThreadExecutionContext)
}
- def publish(message: Option[String])(implicit system: ActorSystem) = message map { msg ⇒
- system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation],
- s"Timeout triggered for ask pattern registered at: $msg"))
+ def hookHeavyweightWarning(future: Future[AnyRef], captureException: StackTraceCaptureException, actor: ActorRef, akkaExtension: AkkaExtension): Unit = {
+ val locationString = captureException.getStackTrace.drop(3).mkString("", EOL, EOL)
+
+ future.onFailure(ifAskTimeoutException {
+ akkaExtension.log.warning("Timeout triggered for ask pattern to actor [{}] at [{}]", actor.path.name, locationString)
+ })(SameThreadExecutionContext)
}
}
object AskPatternInstrumentation {
- type ErrorHandler = PartialFunction[Throwable, Unit]
-
class StackTraceCaptureException extends Throwable
-
- case class CallInfo(name: String, sourceLocation: SourceLocation) {
- def message: String = {
- def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("<unknown position>")
- def line: String = s"$name @ $locationInfo"
- s"$line"
- }
- }
} \ No newline at end of file