From 36ca84c0505c65e7c4947d0b0a7edf12fcdec48e Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 12 Aug 2013 19:00:49 -0300 Subject: fixed the instrumentation to work nicely with spray --- kamon-core/src/main/scala/kamon/Kamon.scala | 36 +--------------- kamon-core/src/main/scala/kamon/TraceContext.scala | 21 ++++++---- .../src/main/scala/kamon/executor/eventbus.scala | 12 +++--- .../ActorRefTellInstrumentation.scala | 49 +++++++++++++++------- 4 files changed, 54 insertions(+), 64 deletions(-) (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index c3080909..07773c55 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -8,33 +8,8 @@ import scala.concurrent.duration.{FiniteDuration, Duration} import com.newrelic.api.agent.NewRelic object Kamon { - - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } - implicit lazy val actorSystem = ActorSystem("kamon") - - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } - - def newTraceContext(): TraceContext = TraceContext() - - - val publisher = actorSystem.actorOf(Props[TransactionPublisher]) - - def publish(tx: FullTransaction) = publisher ! tx - - - object Metric { val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala @@ -44,21 +19,12 @@ object Kamon { def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) } - - val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") } - - - - - - - object Tracer { val ctx = new ThreadLocal[Option[TraceContext]] { override def initialValue() = None @@ -74,7 +40,7 @@ object Tracer { case None => } - //def newTraceContext(): TraceContext = TraceContext() + def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) } diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 6b32550f..62d7f57e 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -1,31 +1,34 @@ package kamon import java.util.UUID -import akka.actor.{ActorSystem, ActorPath} +import akka.actor._ import akka.agent.Agent import java.util.concurrent.TimeUnit import scala.util.{Failure, Success} import akka.util.Timeout -case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { +case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) { implicit val timeout = Timeout(30, TimeUnit.SECONDS) implicit val as = Kamon.actorSystem.dispatcher - def append(entry: TraceEntry) = entries send (entry :: _) - def close = entries.future.onComplete({ - case Success(list) => Kamon.publish(FullTransaction(id, list)) - case Failure(t) => println("WTF!") - }) + def append(entry: TraceEntry) = entries ! entry + def close = entries ! "Close" // TODO type this thing!. } object TraceContext { - implicit val as2 = Kamon.actorSystem.dispatcher - def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) + def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer } +class TraceAccumulator extends Actor { + def receive = { + case a => println("Trace Accumulated: "+a) + } +} + + trait TraceEntry case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala index 599f2a7a..33ff4a4e 100644 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -5,7 +5,7 @@ import akka.event.LookupClassification import akka.actor._ import java.util.concurrent.TimeUnit -import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} +import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout import scala.util.{Random, Success, Failure} import scala.concurrent.Future @@ -66,14 +66,14 @@ object TryAkka extends App{ } })) - Kamon.start + Tracer.start for(i <- 1 to 4) { val ping = system.actorOf(Props[PingActor]) ping ! Pong() } - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -86,13 +86,13 @@ object TryAkka extends App{ - Kamon.start + Tracer.start - Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) + Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) threadPrintln("Before doing it") val f = Future { threadPrintln("This is happening inside the future body") } - Kamon.stop + Tracer.stop //Thread.sleep(3000) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index c543123c..f3e1828d 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -17,7 +17,7 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* (akka.actor.ScalaActorRef+ && !akka.event.Logging$StandardOutLogger).$bang(..)) && target(actor) && args(message, sender)") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") @@ -25,16 +25,8 @@ class ActorRefTellInstrumentation { val actorName = MetricDirectory.nameForActor(actor) val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") - if(!actor.toString().contains("StandardOutLogger")) { - println("Skipped the actor") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) - } - else { - println("Got the standardLogger!!") - pjp.proceed() - } } } @@ -55,7 +47,7 @@ class ActorCellInvokeInstrumentation { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - println("=====> Created ActorCell for: "+ref.toString()) + //println("=====> Created ActorCell for: "+ref.toString()) /** TODO: Find a better way to filter the things we don't want to measure. */ //if(system.name != "kamon" && actorName.startsWith("/user")) { processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") @@ -64,14 +56,14 @@ class ActorCellInvokeInstrumentation { } - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - println("ENVELOPE --------------------->"+envelope) + //println("ENVELOPE --------------------->"+envelope) envelope match { case Envelope(TraceableMessage(ctx, msg, timer), sender) => { timer.stop() @@ -83,7 +75,36 @@ class ActorCellInvokeInstrumentation { ctx match { case Some(c) => { Tracer.set(c) - println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Tracer.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } + + + @Pointcut("execution(* spray.can.server.ResponseReceiverRef.handle(*)) && args(message)") + def sprayResponderHandle(message: AnyRef) = {} + + @Around("sprayResponderHandle(message)") + def sprayInvokeAround(pjp: ProceedingJoinPoint, message: AnyRef): Unit = { + import ProceedingJoinPointPimp._ + message match { + case TraceableMessage(ctx, msg, timer) => { + timer.stop() + + val originalEnvelope: AnyRef = msg.asInstanceOf[AnyRef] + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Tracer.set(c) pjp.proceedWith(originalEnvelope) Tracer.clear } -- cgit v1.2.3