From 2bde052cc3a3a6eb97b72d3504cd7a4a40a5c2a8 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 30 Sep 2013 11:21:25 -0300 Subject: wip --- kamon-core/src/main/resources/application.conf | 6 ++--- .../ActorRefTellInstrumentation.scala | 14 +++++++--- .../instrumentation/RunnableInstrumentation.scala | 12 ++++++--- .../SprayServerInstrumentation.scala | 8 +++--- kamon-core/src/main/scala/test/PingPong.scala | 31 +++++++++++++++++++++- .../instrumentation/ActorInstrumentationSpec.scala | 22 ++++++++++++++- 6 files changed, 78 insertions(+), 15 deletions(-) (limited to 'kamon-core') diff --git a/kamon-core/src/main/resources/application.conf b/kamon-core/src/main/resources/application.conf index d51f6b15..c87c0ced 100644 --- a/kamon-core/src/main/resources/application.conf +++ b/kamon-core/src/main/resources/application.conf @@ -1,7 +1,7 @@ akka { - loglevel = DEBUG - stdout-loglevel = DEBUG - log-dead-letters = on + loglevel = INFO + stdout-loglevel = INFO + log-dead-letters = off #extensions = ["kamon.dashboard.DashboardExtension"] diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index d92d7f6c..df124f41 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -64,8 +64,11 @@ class ActorCellInvokeInstrumentation { Tracer.clear //MDC.remove("uow") } - case None => pjp.proceedWith(originalEnvelope) + case None => + assert(Tracer.context() == None) + pjp.proceedWith(originalEnvelope) } + Tracer.clear } } @@ -79,6 +82,7 @@ class UnregisteredActorRefInstrumentation { def sprayInvokeAround(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = { import ProceedingJoinPointPimp._ //println("Handling unregistered actor ref message: "+message) + message match { case SimpleTraceMessage(msg, ctx) => { ctx match { @@ -87,10 +91,14 @@ class UnregisteredActorRefInstrumentation { pjp.proceedWith(msg.asInstanceOf[AnyRef]) // TODO: define if we should use Any or AnyRef and unify with the rest of the instrumentation. Tracer.clear } - case None => pjp.proceedWith(msg.asInstanceOf[AnyRef]) + case None => + assert(Tracer.context() == None) + pjp.proceedWith(msg.asInstanceOf[AnyRef]) } } - case _ => pjp.proceed + case _ => + //assert(Tracer.context() == None) + pjp.proceed } } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index 30041321..393293f1 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -37,7 +37,7 @@ class RunnableInstrumentation { * Aspect members */ - private val traceContext = Tracer.context + private var traceContext = Tracer.context /** @@ -47,15 +47,21 @@ class RunnableInstrumentation { @Before("instrumentedRunnableCreation()") def beforeCreation = { - //println((new Throwable).getStackTraceString) + traceContext = Tracer.context + /* if(traceContext.isEmpty) + println("NO TRACE CONTEXT FOR RUNNABLE at: [[[%s]]]", (new Throwable).getStackTraceString)//println((new Throwable).getStackTraceString) + else + println("SUPER TRACE CONTEXT FOR RUNNABLE at: [[[%s]]]", (new Throwable).getStackTraceString)*/ } @Around("runnableExecution()") def around(pjp: ProceedingJoinPoint) = { import pjp._ - + /*if(traceContext.isEmpty) + println("OOHHH NOOOOO")*/ withContext(traceContext, proceed()) } } + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 9422a9f7..4eafcebe 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -32,7 +32,7 @@ class SprayServerInstrumentation { //def afterInit(): Unit = { Tracer.start //openRequest.traceContext - println("Created the context: " + Tracer.context() + " for the transaction: " + request) + //println("Created the context: " + Tracer.context() + " for the transaction: " + request) Tracer.context().map(_.entries ! Rename(request.uri.path.toString())) } @@ -41,13 +41,13 @@ class SprayServerInstrumentation { @After("openRequestCreation()") def afterFinishingRequest(): Unit = { - println("Finishing a request: " + Tracer.context()) + //println("Finishing a request: " + Tracer.context()) Tracer.context().map(_.entries ! Finish()) - +/* if(Tracer.context().isEmpty) { println("WOOOOOPAAAAAAAAA") - } + }*/ } diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala index 93aa322d..c942c6ab 100644 --- a/kamon-core/src/main/scala/test/PingPong.scala +++ b/kamon-core/src/main/scala/test/PingPong.scala @@ -1,11 +1,12 @@ package test -import akka.actor.{Deploy, Props, Actor, ActorSystem} +import akka.actor._ import java.util.concurrent.atomic.AtomicLong import kamon.Tracer import spray.routing.SimpleRoutingApp import akka.util.Timeout import spray.httpx.RequestBuilding +import scala.concurrent.Future object PingPong extends App { import scala.concurrent.duration._ @@ -54,6 +55,7 @@ class Ponger extends Actor { object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding { import scala.concurrent.duration._ import spray.client.pipelining._ + import akka.pattern.ask implicit val system = ActorSystem("test") import system.dispatcher @@ -61,6 +63,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil implicit val timeout = Timeout(30 seconds) val pipeline = sendReceive + val replier = system.actorOf(Props[Replier]) startServer(interface = "localhost", port = 9090) { get { @@ -68,8 +71,34 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil complete { pipeline(Get("http://www.despegar.com.ar")).map(r => "Ok") } + } ~ + path("reply") { + complete { + if (Tracer.context().isEmpty) + println("ROUTE NO CONTEXT") + + (replier ? "replytome").mapTo[String] + } + } ~ + path("ok") { + complete("ok") + } ~ + path("future") { + dynamic { + complete(Future { "OK" }) + } } } } } + +class Replier extends Actor with ActorLogging { + def receive = { + case _ => + if(Tracer.context.isEmpty) + log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") + + sender ! "Ok" + } +} diff --git a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala index ccc7740b..454b4514 100644 --- a/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/akka/instrumentation/ActorInstrumentationSpec.scala @@ -8,6 +8,7 @@ import kamon.{TraceContext, Tracer} import akka.pattern.{pipe, ask} import akka.util.Timeout import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import akka.routing.RoundRobinRouter @@ -35,11 +36,30 @@ class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentation expectMsg(Some(testTraceContext)) } - "propagate the trace context to actors behind a rounter" in new RoutedTraceContextEchoFixture { + "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture { val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) expectMsgAllOf(contexts: _*) } + + "propagate with many asks" in { + val echo = system.actorOf(Props[TraceContextEcho]) + val iterations = 50000 + implicit val timeout = Timeout(10 seconds) + + val futures = for(_ <- 1 to iterations) yield { + Tracer.start + val result = (echo ? "test") + Tracer.clear + + result + } + + val allResults = Await.result(Future.sequence(futures), 10 seconds) + assert(iterations == allResults.collect { + case Some(_) => 1 + }.sum) + } } } -- cgit v1.2.3