From 36ca84c0505c65e7c4947d0b0a7edf12fcdec48e Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 12 Aug 2013 19:00:49 -0300 Subject: fixed the instrumentation to work nicely with spray --- kamon-core/src/main/resources/application.conf | 3 ++ kamon-core/src/main/scala/kamon/Kamon.scala | 36 +--------------- kamon-core/src/main/scala/kamon/TraceContext.scala | 21 ++++++---- .../src/main/scala/kamon/executor/eventbus.scala | 12 +++--- .../ActorRefTellInstrumentation.scala | 49 +++++++++++++++------- .../instrumentation/ActorInstrumentationSpec.scala | 42 +++++++++++++++---- .../RunnableInstrumentationSpec.scala | 14 +++---- 7 files changed, 99 insertions(+), 78 deletions(-) (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index fb69ecd4..06bdf13a 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -19,6 +19,9 @@ akka { throughput = 100 } + debug { + unhandled = on + } } } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index c3080909..07773c55 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -8,33 +8,8 @@ import scala.concurrent.duration.{FiniteDuration, Duration} import com.newrelic.api.agent.NewRelic object Kamon { - - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } - implicit lazy val actorSystem = ActorSystem("kamon") - - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } - - def newTraceContext(): TraceContext = TraceContext() - - - val publisher = actorSystem.actorOf(Props[TransactionPublisher]) - - def publish(tx: FullTransaction) = publisher ! tx - - - object Metric { val actorSystems = new ConcurrentHashMap[String, ActorSystemMetrics] asScala @@ -44,21 +19,12 @@ object Kamon { def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) } - - val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") } - - - - - - - object Tracer { val ctx = new ThreadLocal[Option[TraceContext]] { override def initialValue() = None @@ -74,7 +40,7 @@ object Tracer { case None => } - //def newTraceContext(): TraceContext = TraceContext() + def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) } diff --git a/kamon-core/src/main/scala/kamon/TraceContext.scala b/kamon-core/src/main/scala/kamon/TraceContext.scala index 6b32550f..62d7f57e 100644 --- a/kamon-core/src/main/scala/kamon/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/TraceContext.scala @@ -1,31 +1,34 @@ package kamon import java.util.UUID -import akka.actor.{ActorSystem, ActorPath} +import akka.actor._ import akka.agent.Agent import java.util.concurrent.TimeUnit import scala.util.{Failure, Success} import akka.util.Timeout -case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) { +case class TraceContext(id: UUID, entries: ActorRef, userContext: Option[Any] = None) { implicit val timeout = Timeout(30, TimeUnit.SECONDS) implicit val as = Kamon.actorSystem.dispatcher - def append(entry: TraceEntry) = entries send (entry :: _) - def close = entries.future.onComplete({ - case Success(list) => Kamon.publish(FullTransaction(id, list)) - case Failure(t) => println("WTF!") - }) + def append(entry: TraceEntry) = entries ! entry + def close = entries ! "Close" // TODO type this thing!. } object TraceContext { - implicit val as2 = Kamon.actorSystem.dispatcher - def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) + def apply()(implicit system: ActorSystem) = new TraceContext(UUID.randomUUID(), system.actorOf(Props[TraceAccumulator])) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer } +class TraceAccumulator extends Actor { + def receive = { + case a => println("Trace Accumulated: "+a) + } +} + + trait TraceEntry case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala index 599f2a7a..33ff4a4e 100644 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -5,7 +5,7 @@ import akka.event.LookupClassification import akka.actor._ import java.util.concurrent.TimeUnit -import kamon.{CodeBlockExecutionTime, Kamon, TraceContext} +import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext} import akka.util.Timeout import scala.util.{Random, Success, Failure} import scala.concurrent.Future @@ -66,14 +66,14 @@ object TryAkka extends App{ } })) - Kamon.start + Tracer.start for(i <- 1 to 4) { val ping = system.actorOf(Props[PingActor]) ping ! Pong() } - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body") + def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body") /* val newRelicReporter = new NewRelicReporter(registry) @@ -86,13 +86,13 @@ object TryAkka extends App{ - Kamon.start + Tracer.start - Kamon.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) + Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) threadPrintln("Before doing it") val f = Future { threadPrintln("This is happening inside the future body") } - Kamon.stop + Tracer.stop //Thread.sleep(3000) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index c543123c..f3e1828d 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -17,7 +17,7 @@ case class TraceableMessage(traceContext: Option[TraceContext], message: Any, ti class ActorRefTellInstrumentation { import ProceedingJoinPointPimp._ - @Pointcut("execution(* (akka.actor.ScalaActorRef+ && !akka.event.Logging$StandardOutLogger).$bang(..)) && target(actor) && args(message, sender)") + @Pointcut("execution(* akka.actor.ScalaActorRef+.$bang(..)) && !within(akka.event.Logging.StandardOutLogger) && !within(akka.pattern.PromiseActorRef) && target(actor) && args(message, sender)") def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {} @Around("sendingMessageToActorRef(actor, message, sender)") @@ -25,16 +25,8 @@ class ActorRefTellInstrumentation { val actorName = MetricDirectory.nameForActor(actor) val t = Metrics.registry.timer(actorName + "LATENCY") - //println(s"About to proceed with: $actor $message $sender ${Kamon.context}") - if(!actor.toString().contains("StandardOutLogger")) { - println("Skipped the actor") - pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) + pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender) - } - else { - println("Got the standardLogger!!") - pjp.proceed() - } } } @@ -55,7 +47,7 @@ class ActorCellInvokeInstrumentation { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) - println("=====> Created ActorCell for: "+ref.toString()) + //println("=====> Created ActorCell for: "+ref.toString()) /** TODO: Find a better way to filter the things we don't want to measure. */ //if(system.name != "kamon" && actorName.startsWith("/user")) { processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME") @@ -64,14 +56,14 @@ class ActorCellInvokeInstrumentation { } - @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)") + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { import ProceedingJoinPointPimp._ - println("ENVELOPE --------------------->"+envelope) + //println("ENVELOPE --------------------->"+envelope) envelope match { case Envelope(TraceableMessage(ctx, msg, timer), sender) => { timer.stop() @@ -83,7 +75,36 @@ class ActorCellInvokeInstrumentation { ctx match { case Some(c) => { Tracer.set(c) - println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + //println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope) + pjp.proceedWith(originalEnvelope) + Tracer.clear + } + case None => pjp.proceedWith(originalEnvelope) + } + pt.stop() + } + case _ => pjp.proceed + } + } + + + @Pointcut("execution(* spray.can.server.ResponseReceiverRef.handle(*)) && args(message)") + def sprayResponderHandle(message: AnyRef) = {} + + @Around("sprayResponderHandle(message)") + def sprayInvokeAround(pjp: ProceedingJoinPoint, message: AnyRef): Unit = { + import ProceedingJoinPointPimp._ + message match { + case TraceableMessage(ctx, msg, timer) => { + timer.stop() + + val originalEnvelope: AnyRef = msg.asInstanceOf[AnyRef] + + //println("PROCESSING TIME TIMER: "+processingTimeTimer) + val pt = processingTimeTimer.time() + ctx match { + case Some(c) => { + Tracer.set(c) pjp.proceedWith(originalEnvelope) Tracer.clear } diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala index 0026d953..ccc7740b 100644 --- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -1,13 +1,18 @@ package akka.instrumentation import org.scalatest.{WordSpecLike, Matchers} -import akka.actor.{Actor, Props, ActorSystem} +import akka.actor.{ActorRef, Actor, Props, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} -import kamon.{TraceContext, Kamon} +import kamon.{TraceContext, Tracer} +import akka.pattern.{pipe, ask} +import akka.util.Timeout +import scala.concurrent.duration._ +import akka.routing.RoundRobinRouter class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { + implicit val executionContext = system.dispatcher "an instrumented actor ref" when { "used inside the context of a transaction" should { @@ -17,28 +22,51 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation expectMsg(Some(testTraceContext)) } - "propagate the trace context using tell" in { + "propagate the trace context using tell" in new TraceContextEchoFixture { + echo.tell("test", testActor) + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using ask" in new TraceContextEchoFixture { + implicit val timeout = Timeout(1 seconds) + (echo ? "test") pipeTo(testActor) + + expectMsg(Some(testTraceContext)) } - "propagate the trace context using ask" in { + "propagate the trace context to actors behind a rounter" in new RoutedTraceContextEchoFixture { + val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) + expectMsgAllOf(contexts: _*) } } } trait TraceContextEchoFixture { - val testTraceContext = Kamon.newTraceContext() + val testTraceContext = Tracer.newTraceContext() val echo = system.actorOf(Props[TraceContextEcho]) - Kamon.set(testTraceContext) + Tracer.set(testTraceContext) + } + + trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10))) + + def tellWithNewContext(target: ActorRef, message: Any): TraceContext = { + val context = Tracer.newTraceContext() + Tracer.set(context) + + target ! message + context + } } } class TraceContextEcho extends Actor { def receive = { - case msg ⇒ sender ! Kamon.context() + case msg: String ⇒ sender ! Tracer.context() } } diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index de65aaca..fe89695b 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -3,7 +3,7 @@ package kamon.instrumentation import scala.concurrent.{Await, Promise, Future} import org.scalatest.{Matchers, OptionValues, WordSpec} import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} -import kamon.{Kamon, TraceContext} +import kamon.{Tracer, Kamon, TraceContext} import java.util.UUID import scala.util.Success import scala.concurrent.duration._ @@ -27,7 +27,7 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur val onCompleteContext = Promise[TraceContext]() futureWithContext.onComplete({ - case _ => onCompleteContext.complete(Success(Kamon.context.get)) + case _ => onCompleteContext.complete(Success(Tracer.context.get)) }) whenReady(onCompleteContext.future) { result => @@ -49,7 +49,7 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur val onCompleteContext = Promise[Option[TraceContext]]() futureWithoutContext.onComplete({ - case _ => onCompleteContext.complete(Success(Kamon.context)) + case _ => onCompleteContext.complete(Success(Tracer.context)) }) whenReady(onCompleteContext.future) { result => @@ -68,14 +68,14 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur class FutureWithContextFixture { val testContext = TraceContext() - Kamon.set(testContext) + Tracer.set(testContext) - val futureWithContext = Future { Kamon.context} + val futureWithContext = Future { Tracer.context} } trait FutureWithoutContextFixture { - Kamon.clear // Make sure no TraceContext is available - val futureWithoutContext = Future { Kamon.context } + Tracer.clear // Make sure no TraceContext is available + val futureWithoutContext = Future { Tracer.context } } } -- cgit v1.2.3