diff options
6 files changed, 47 insertions, 22 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index b968af36..df5534a4 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -173,8 +173,11 @@ kamon { } # If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask` - # pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment - # the future was created. + # pattern fails with a `AskTimeoutException` and the log message will contain information depending of the strategy selected. + # strategies: + # - off: nothing to do. + # - lightweight: logs the warning when a timeout is reached using org.aspectj.lang.reflect.SourceLocation. + # - heavyweight: logs the warning when a timeout is reached using a stack trace captured at the moment the future was created. ask-pattern-tracing = off # Default dispatcher for all trace operations diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala index 5bfa2467..efce169d 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala @@ -18,47 +18,69 @@ package akka.kamon.instrumentation import kamon.Kamon import kamon.trace.{ Trace, TraceContextAware } -import akka.actor.ActorRef +import akka.actor.{ ActorSystem, ActorRef } import akka.event.Logging.Warning import akka.pattern.AskTimeoutException import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ import org.aspectj.lang.reflect.SourceLocation import scala.concurrent.Future +import scala.compat.Platform.EOL @Aspect class AskPatternInstrumentation { + import AskPatternInstrumentation._ + @DeclareMixin("akka.pattern.AskableActorRef$") def mixinContextAwareToAskableActorRef: TraceContextAware = TraceContextAware.default - @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && target(traceContext) && args(actor, *, *)") - def askableActorRefAsk(traceContext: TraceContextAware, actor: ActorRef): Unit = {} + @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && target(ctx) && args(actor, *, *)") + def askableActorRefAsk(ctx: TraceContextAware, actor: ActorRef): Unit = {} - @Around("askableActorRefAsk(traceContext,actor)") - def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, traceContext: TraceContextAware, actor: ActorRef): AnyRef = { - val callInfo = CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation) - val system = traceContext.traceContext.system + @Around("askableActorRefAsk(ctx, actor)") + def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, ctx: TraceContextAware, actor: ActorRef): AnyRef = { + implicit val system = ctx.traceContext.system val traceExtension = Kamon(Trace)(system) val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - if (traceExtension.enableAskPatternTracing) { - future.onFailure { - case timeout: AskTimeoutException ⇒ - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], - s"Timeout triggered for ask pattern registered at: ${callInfo.getMessage}")) - }(traceExtension.defaultDispatcher) + val handler = traceExtension.askPatternTracing match { + case "off" ⇒ errorHandler() + case "lightweight" ⇒ errorHandler(callInfo = Some(CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation))) + case "heavyweight" ⇒ errorHandler(stack = Some(new StackTraceCaptureException)) } + + future.onFailure(handler)(traceExtension.defaultDispatcher) future } + def errorHandler(callInfo: Option[CallInfo] = None, stack: Option[StackTraceCaptureException] = None)(implicit system: ActorSystem): ErrorHandler = { + case e: AskTimeoutException ⇒ + val message = { + if (stack.isDefined) stack.map(s ⇒ s.getStackTrace.drop(3).mkString("", EOL, EOL)) + else callInfo.map(_.message) + } + publish(message) + } + + def publish(message: Option[String])(implicit system: ActorSystem) = message map { + msg ⇒ + system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], + s"Timeout triggered for ask pattern registered at: $msg")) + } +} + +object AskPatternInstrumentation { + type ErrorHandler = PartialFunction[Throwable, Unit] + + class StackTraceCaptureException extends Throwable + case class CallInfo(name: String, sourceLocation: SourceLocation) { - def getMessage: String = { + def message: String = { def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("<unknown position>") def line: String = s"$name @ $locationInfo" s"$line" } } -} -//"""Timeout triggered for actor [actor-que-pregunta] asking [actor-al-que-le-pregunta] @ [source location]"?"""
\ No newline at end of file +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index e58c9369..c227ce58 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -27,7 +27,7 @@ import kamon.util.GlobPathFilter class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.trace") - val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing") + val askPatternTracing = config.getString("ask-pattern-tracing") val defaultDispatcher = system.dispatchers.lookup(config.getString("dispatcher")) val detailLevel: LevelOfDetail = config.getString("level") match { diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala index e712e3ea..e0c861cb 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala @@ -32,7 +32,7 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M """ |kamon { | trace { - | ask-pattern-tracing = on + | ask-pattern-tracing = heavyweight | } |} """.stripMargin)) diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala index db2d7831..f910489a 100644 --- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala +++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala @@ -18,7 +18,7 @@ package kamon.datadog import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ Props, ActorRef, ActorSystem } -import kamon.{MilliTimestamp, Kamon} +import kamon.{ MilliTimestamp, Kamon } import kamon.metric.instrument.Histogram.Precision import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter } import org.scalatest.{ Matchers, WordSpecLike } diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index 608b5c58..6c77f321 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -18,7 +18,7 @@ package kamon.statsd import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ ActorRef, Props, ActorSystem } -import kamon.{MilliTimestamp, Kamon} +import kamon.{ MilliTimestamp, Kamon } import kamon.metric.instrument.Histogram.Precision import kamon.metric.instrument.Histogram import org.scalatest.{ Matchers, WordSpecLike } |