From 7208963c97d89c2b9835dae4c400611d222f94bb Mon Sep 17 00:00:00 2001 From: Diego Date: Sat, 13 Dec 2014 17:14:41 -0300 Subject: + core: introduce an alternative way for tracing the Akka.ask timeouts --- kamon-core/src/main/resources/reference.conf | 3 ++ .../akka/AskPatternInstrumentation.scala | 53 +++++++++++++--------- .../main/scala/kamon/trace/TraceExtension.scala | 2 + .../akka/AskPatternInstrumentationSpec.scala | 2 +- .../metric/TickMetricSnapshotBufferSpec.scala | 2 +- 5 files changed, 38 insertions(+), 24 deletions(-) (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index b9039b73..b968af36 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -176,6 +176,9 @@ kamon { # pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment # the future was created. ask-pattern-tracing = off + + # Default dispatcher for all trace operations + dispatcher = ${kamon.default-dispatcher} } kamon-dispatcher { diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala index ebc204a5..5bfa2467 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala @@ -16,40 +16,49 @@ package akka.kamon.instrumentation -import akka.actor.ActorRefProvider -import akka.event.Logging.Warning -import akka.pattern.{ AskTimeoutException, PromiseActorRef } import kamon.Kamon -import kamon.trace.Trace -import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut } - -import scala.compat.Platform.EOL +import kamon.trace.{ Trace, TraceContextAware } +import akka.actor.ActorRef +import akka.event.Logging.Warning +import akka.pattern.AskTimeoutException +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ +import org.aspectj.lang.reflect.SourceLocation +import scala.concurrent.Future @Aspect class AskPatternInstrumentation { - class StackTraceCaptureException extends Throwable + @DeclareMixin("akka.pattern.AskableActorRef$") + def mixinContextAwareToAskableActorRef: TraceContextAware = TraceContextAware.default - @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *, *)", argNames = "provider") - def promiseActorRefApply(provider: ActorRefProvider): Unit = {} + @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && target(traceContext) && args(actor, *, *)") + def askableActorRefAsk(traceContext: TraceContextAware, actor: ActorRef): Unit = {} - @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor") - def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = { - val system = promiseActor.provider.guardian.underlying.system + @Around("askableActorRefAsk(traceContext,actor)") + def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, traceContext: TraceContextAware, actor: ActorRef): AnyRef = { + val callInfo = CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation) + val system = traceContext.traceContext.system val traceExtension = Kamon(Trace)(system) - if (traceExtension.enableAskPatternTracing) { - val future = promiseActor.result.future - implicit val ec = system.dispatcher - val stack = new StackTraceCaptureException + val future = pjp.proceed().asInstanceOf[Future[AnyRef]] - future onFailure { + if (traceExtension.enableAskPatternTracing) { + future.onFailure { case timeout: AskTimeoutException ⇒ - val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) - system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation], - "Timeout triggered for ask pattern registered at: " + stackString)) - } + s"Timeout triggered for ask pattern registered at: ${callInfo.getMessage}")) + }(traceExtension.defaultDispatcher) + } + future + } + + case class CallInfo(name: String, sourceLocation: SourceLocation) { + def getMessage: String = { + def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("") + def line: String = s"$name @ $locationInfo" + s"$line" } } } +//"""Timeout triggered for actor [actor-que-pregunta] asking [actor-al-que-le-pregunta] @ [source location]"?""" \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala index 7d734b8c..e58c9369 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala @@ -28,6 +28,7 @@ import kamon.util.GlobPathFilter class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config.getConfig("kamon.trace") val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing") + val defaultDispatcher = system.dispatchers.lookup(config.getString("dispatcher")) val detailLevel: LevelOfDetail = config.getString("level") match { case "metrics-only" ⇒ LevelOfDetail.MetricsOnly @@ -75,6 +76,7 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { incubator ! trace else subscriptions ! trace.generateTraceInfo + } object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala index 17312ba3..e712e3ea 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala @@ -41,7 +41,7 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M "log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in { implicit val ec = system.dispatcher implicit val timeout = Timeout(10 milliseconds) - val noReply = system.actorOf(Props[NoReply]) + val noReply = system.actorOf(Props[NoReply], "NoReply") system.eventStream.subscribe(testActor, classOf[Warning]) val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") { diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala index 44d19f98..a9197ab5 100644 --- a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala @@ -17,7 +17,7 @@ package kamon.metric import com.typesafe.config.ConfigFactory -import kamon.{MilliTimestamp, Kamon} +import kamon.{ MilliTimestamp, Kamon } import kamon.metric.instrument.Histogram import kamon.metric.instrument.Histogram.MutableRecord import org.scalatest.{ Matchers, WordSpecLike } -- cgit v1.2.3