aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-12-13 17:14:41 -0300
committerDiego <diegolparra@gmail.com>2014-12-13 17:14:41 -0300
commit7e0de7001c6b86168cef1607bce198b9c5c81caa (patch)
treea3f6c7b61254170d698e34217d6500c45d25ba8b
parent7671d4cf77c64db47e4e6cf59f6337d6d483eee3 (diff)
downloadKamon-7e0de7001c6b86168cef1607bce198b9c5c81caa.tar.gz
Kamon-7e0de7001c6b86168cef1607bce198b9c5c81caa.tar.bz2
Kamon-7e0de7001c6b86168cef1607bce198b9c5c81caa.zip
+ core: introduce an alternative way for tracing the Akka.ask timeouts
-rw-r--r--kamon-core/src/main/resources/reference.conf3
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala53
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceExtension.scala2
-rw-r--r--kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala2
-rw-r--r--kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala2
5 files changed, 38 insertions, 24 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index b9039b73..b968af36 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -176,6 +176,9 @@ kamon {
# pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment
# the future was created.
ask-pattern-tracing = off
+
+ # Default dispatcher for all trace operations
+ dispatcher = ${kamon.default-dispatcher}
}
kamon-dispatcher {
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
index ebc204a5..5bfa2467 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/AskPatternInstrumentation.scala
@@ -16,40 +16,49 @@
package akka.kamon.instrumentation
-import akka.actor.ActorRefProvider
-import akka.event.Logging.Warning
-import akka.pattern.{ AskTimeoutException, PromiseActorRef }
import kamon.Kamon
-import kamon.trace.Trace
-import org.aspectj.lang.annotation.{ AfterReturning, Aspect, Pointcut }
-
-import scala.compat.Platform.EOL
+import kamon.trace.{ Trace, TraceContextAware }
+import akka.actor.ActorRef
+import akka.event.Logging.Warning
+import akka.pattern.AskTimeoutException
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.reflect.SourceLocation
+import scala.concurrent.Future
@Aspect
class AskPatternInstrumentation {
- class StackTraceCaptureException extends Throwable
+ @DeclareMixin("akka.pattern.AskableActorRef$")
+ def mixinContextAwareToAskableActorRef: TraceContextAware = TraceContextAware.default
- @Pointcut(value = "execution(* akka.pattern.PromiseActorRef$.apply(..)) && args(provider, *, *)", argNames = "provider")
- def promiseActorRefApply(provider: ActorRefProvider): Unit = {}
+ @Pointcut("call(* akka.pattern.AskableActorRef$.$qmark$extension(..)) && target(traceContext) && args(actor, *, *)")
+ def askableActorRefAsk(traceContext: TraceContextAware, actor: ActorRef): Unit = {}
- @AfterReturning(pointcut = "promiseActorRefApply(provider)", returning = "promiseActor")
- def hookAskTimeoutWarning(provider: ActorRefProvider, promiseActor: PromiseActorRef): Unit = {
- val system = promiseActor.provider.guardian.underlying.system
+ @Around("askableActorRefAsk(traceContext,actor)")
+ def hookAskTimeoutWarning(pjp: ProceedingJoinPoint, traceContext: TraceContextAware, actor: ActorRef): AnyRef = {
+ val callInfo = CallInfo(s"${actor.path.name} ?", pjp.getSourceLocation)
+ val system = traceContext.traceContext.system
val traceExtension = Kamon(Trace)(system)
- if (traceExtension.enableAskPatternTracing) {
- val future = promiseActor.result.future
- implicit val ec = system.dispatcher
- val stack = new StackTraceCaptureException
+ val future = pjp.proceed().asInstanceOf[Future[AnyRef]]
- future onFailure {
+ if (traceExtension.enableAskPatternTracing) {
+ future.onFailure {
case timeout: AskTimeoutException ⇒
- val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL)
-
system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternInstrumentation],
- "Timeout triggered for ask pattern registered at: " + stackString))
- }
+ s"Timeout triggered for ask pattern registered at: ${callInfo.getMessage}"))
+ }(traceExtension.defaultDispatcher)
+ }
+ future
+ }
+
+ case class CallInfo(name: String, sourceLocation: SourceLocation) {
+ def getMessage: String = {
+ def locationInfo: String = Option(sourceLocation).map(location ⇒ s"${location.getFileName}:${location.getLine}").getOrElse("<unknown position>")
+ def line: String = s"$name @ $locationInfo"
+ s"$line"
}
}
}
+//"""Timeout triggered for actor [actor-que-pregunta] asking [actor-al-que-le-pregunta] @ [source location]"?""" \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
index 7d734b8c..e58c9369 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
@@ -28,6 +28,7 @@ import kamon.util.GlobPathFilter
class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val config = system.settings.config.getConfig("kamon.trace")
val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing")
+ val defaultDispatcher = system.dispatchers.lookup(config.getString("dispatcher"))
val detailLevel: LevelOfDetail = config.getString("level") match {
case "metrics-only" ⇒ LevelOfDetail.MetricsOnly
@@ -75,6 +76,7 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
incubator ! trace
else
subscriptions ! trace.generateTraceInfo
+
}
object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
diff --git a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
index 17312ba3..e712e3ea 100644
--- a/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
+++ b/kamon-core/src/test/scala/kamon/instrumentation/akka/AskPatternInstrumentationSpec.scala
@@ -41,7 +41,7 @@ class AskPatternInstrumentationSpec extends TestKitBase with WordSpecLike with M
"log a warning with a stack trace and TraceContext taken from the moment the ask was triggered" in {
implicit val ec = system.dispatcher
implicit val timeout = Timeout(10 milliseconds)
- val noReply = system.actorOf(Props[NoReply])
+ val noReply = system.actorOf(Props[NoReply], "NoReply")
system.eventStream.subscribe(testActor, classOf[Warning])
val testTraceContext = TraceRecorder.withNewTraceContext("ask-timeout-warning") {
diff --git a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
index 44d19f98..a9197ab5 100644
--- a/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/TickMetricSnapshotBufferSpec.scala
@@ -17,7 +17,7 @@
package kamon.metric
import com.typesafe.config.ConfigFactory
-import kamon.{MilliTimestamp, Kamon}
+import kamon.{ MilliTimestamp, Kamon }
import kamon.metric.instrument.Histogram
import kamon.metric.instrument.Histogram.MutableRecord
import org.scalatest.{ Matchers, WordSpecLike }