From 0b62687fd294de343ae90824f4d570e4273586c1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 2 Oct 2013 19:01:00 -0300 Subject: Switched to DynamicVariables, solve context passing issue produced by runnable batching --- kamon-core/src/main/scala/kamon/Kamon.scala | 16 +++------ .../src/main/scala/kamon/TraceContextSwap.scala | 34 ------------------ .../src/main/scala/kamon/executor/eventbus.scala | 2 +- .../ActorRefTellInstrumentation.scala | 16 +++------ .../instrumentation/RunnableInstrumentation.scala | 37 ++++--------------- .../SprayServerInstrumentation.scala | 32 ++--------------- .../src/main/scala/kamon/metric/MetricFilter.scala | 6 ---- .../RunnableInstrumentationSpec.scala | 41 ++++++++++++---------- project/AspectJ.scala | 3 ++ project/Build.scala | 5 +-- project/Dependencies.scala | 2 +- project/Settings.scala | 2 +- 12 files changed, 50 insertions(+), 146 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/TraceContextSwap.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/MetricFilter.scala diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 118239f7..fb1b2393 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -6,6 +6,7 @@ import scala.concurrent.duration.FiniteDuration import com.newrelic.api.agent.NewRelic import scala.collection.concurrent.TrieMap import kamon.instrumentation.{SimpleContextPassingInstrumentation, ActorInstrumentationConfiguration} +import scala.util.DynamicVariable object Instrument { @@ -32,20 +33,13 @@ object Kamon { object Tracer { - val ctx = new ThreadLocal[Option[TraceContext]] { - override def initialValue() = None - } + val traceContext = new DynamicVariable[Option[TraceContext]](None) - def context() = ctx.get() - def clear = ctx.remove() - def set(traceContext: TraceContext) = ctx.set(Some(traceContext)) - def start = set(newTraceContext) - def stop = ctx.get match { - case Some(context) => context.close - case None => - } + def context() = traceContext.value + def set(ctx: TraceContext) = traceContext.value = Some(ctx) + def start = set(newTraceContext) def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) } diff --git a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala b/kamon-core/src/main/scala/kamon/TraceContextSwap.scala deleted file mode 100644 index 470b2f34..00000000 --- a/kamon-core/src/main/scala/kamon/TraceContextSwap.scala +++ /dev/null @@ -1,34 +0,0 @@ -package kamon - -import org.slf4j.MDC - -/** - * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards. - */ -trait TraceContextSwap { - - def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body) - - def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = { - - val previous = Tracer.context() - val r = ctx match { - case Some(context) => { - //MDC.put("uow", context.userContext.get.asInstanceOf[String]) - Tracer.set(context) - val bodyResult = primary - //Tracer.clear - //MDC.remove("uow") - - bodyResult - } - case None => fallback - } - previous.map(ctx => Tracer.set(ctx)) - - r - } - -} - -object TraceContextSwap extends TraceContextSwap diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala index a1c099d4..d51305a8 100644 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ b/kamon-core/src/main/scala/kamon/executor/eventbus.scala @@ -92,7 +92,7 @@ object TryAkka extends App{ threadPrintln("Before doing it") val f = Future { threadPrintln("This is happening inside the future body") } - Tracer.stop + //Thread.sleep(3000) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 6126d642..915f9635 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -10,7 +10,7 @@ import scala.Some import kamon.trace.context.TracingAwareContext case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) -case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext +case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Tracer.traceContext.value, timestamp: Long = System.nanoTime) extends TracingAwareContext @Aspect("perthis(actorCellCreation(akka.actor.ActorSystem, akka.actor.ActorRef, akka.actor.Props, akka.dispatch.MessageDispatcher, akka.actor.ActorRef))") class ActorCellInvokeInstrumentation { @@ -24,17 +24,11 @@ class ActorCellInvokeInstrumentation { @Around("invokingActorBehaviourAtActorCell(envelope)") def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { //safe cast - envelope.asInstanceOf[TracingAwareContext].traceContext match { - case Some(c) => { - Tracer.set(c) - pjp.proceed() - Tracer.clear - } - case None => - //assert(Tracer.context() == None) - pjp.proceed() + val msgContext = envelope.asInstanceOf[TracingAwareContext].traceContext + + Tracer.traceContext.withValue(msgContext) { + pjp.proceed() } - Tracer.clear } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala index 456917e0..02d74287 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala @@ -22,7 +22,7 @@ class RunnableInstrumentation { */ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable { - val traceContext: Option[TraceContext] = Tracer.context() + val traceContext: Option[TraceContext] = Tracer.traceContext.value } @@ -38,43 +38,20 @@ class RunnableInstrumentation { - import kamon.TraceContextSwap.withContext - @After("instrumentedRunnableCreation(runnable)") def beforeCreation(runnable: TraceContextAwareRunnable) = { - val x = runnable.traceContext - /*if(runnable.traceContext.isEmpty) - println("WTFWI from: " + (new Throwable).getStackTraceString) - else - println("NOWTF: " + (new Throwable).getStackTraceString)*/ - /* if(traceContext.isEmpty) - println("NO TRACE CONTEXT FOR RUNNABLE at: [[[%s]]]", (new Throwable).getStackTraceString)//println((new Throwable).getStackTraceString) - else - println("SUPER TRACE CONTEXT FOR RUNNABLE at: [[[%s]]]", (new Throwable).getStackTraceString)*/ + // Force traceContext initialization. + runnable.traceContext } @Around("runnableExecution(runnable)") - def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable) = { + def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = { import pjp._ - /*println("EXECUTING") - if(runnable.traceContext.isEmpty) - println("NOMONEY") - - runnable.traceContext match { - case Some(context) => { - //MDC.put("uow", context.userContext.get.asInstanceOf[String]) - Tracer.set(context) - val bodyResult = proceed() - Tracer.clear - //MDC.remove("uow") - - bodyResult - } - case None => proceed() - }*/ - withContext(runnable.traceContext, proceed()) + Tracer.traceContext.withValue(runnable.traceContext) { + proceed() + } } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index 06254739..32eabe71 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -17,7 +17,7 @@ trait ContextAware { class SprayOpenRequestContextTracing { @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") def mixinContextAwareToOpenRequest: ContextAware = new ContextAware { - val traceContext: Option[TraceContext] = Tracer.context() + val traceContext: Option[TraceContext] = Tracer.traceContext.value } } @@ -29,18 +29,9 @@ class SprayServerInstrumentation { @After("openRequestInit(openRequest, enclosing, request, closeAfterResponseCompletion, timestamp)") def afterInit(openRequest: OpenRequest, enclosing: OpenRequestComponent, request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = { - //@After("openRequestInit()") - //def afterInit(): Unit = { Tracer.start val discard = openRequest.asInstanceOf[ContextAware].traceContext - //println("Reply: %s - %s ", Tracer.context().get.id, request.uri.path.toString()) - -// if(discard.isEmpty || discard != Tracer.context()) { -// println("MEGA ERROR") -// } - //openRequest.traceContext - //println("Created the context: " + Tracer.context() + " for the transaction: " + request) Tracer.context().map(_.entries ! Rename(request.uri.path.toString())) } @@ -49,18 +40,13 @@ class SprayServerInstrumentation { @After("openRequestCreation(openRequest)") def afterFinishingRequest(openRequest: OpenRequest): Unit = { -// println("Finishing a request: " + Tracer.context()) val original = openRequest.asInstanceOf[ContextAware].traceContext - println("The original is: " + original + " - " + openRequest.request.uri.path) + Tracer.context().map(_.entries ! Finish()) if(Tracer.context() != original) { println(s"OMG DIFFERENT Original: [${original}] - Came in: [${Tracer.context}]") } - - if(Tracer.context().isEmpty) { - println("WOOOOOPAAAAAAAAA") - } } @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx)") @@ -71,20 +57,6 @@ class SprayServerInstrumentation { // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. ctx.traceContext } - - @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(ctx, msg)") - def requestRecordInit2(ctx: TracingAwareContext, msg: Any): Unit = {} - - @After("requestRecordInit2(ctx, msg)") - def whenCreatedRequestRecord2(ctx: TracingAwareContext, msg: Any): Unit = { - //println("=======> Spent in WEB External: " + (System.nanoTime() - ctx.timestamp)) - - // TODO: REMOVE THIS: -// val request = (ctx.asInstanceOf[RequestContext]).request - -// ctx.context.map(_.entries ! WebExternal(ctx.timestamp, System.nanoTime(), request.header[Host].map(_.host).getOrElse("UNKNOWN"))) - - } } case class DefaultTracingAwareRequestContext(traceContext: Option[TraceContext] = Tracer.context(), timestamp: Long = System.nanoTime) extends TracingAwareContext diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala deleted file mode 100644 index fb117968..00000000 --- a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala +++ /dev/null @@ -1,6 +0,0 @@ -package kamon.metric - -object MetricFilter { - def actorSystem(system: String): Boolean = !system.startsWith("kamon") - def actor(path: String, system: String): Boolean = true -} diff --git a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala index 789c7c77..6010a185 100644 --- a/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala +++ b/kamon-core/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala @@ -23,39 +23,41 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur } } - "should be available during the execution of onComplete callbacks" in { new FutureWithContextFixture { - val onCompleteContext = Promise[TraceContext]() + "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture { + val onCompleteContext = Promise[Option[TraceContext]]() + + Tracer.traceContext.withValue(Some(testContext)) { futureWithContext.onComplete({ - case _ => onCompleteContext.complete(Success(Tracer.context.get)) + case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value)) }) + } - whenReady(onCompleteContext.future) { result => - result should equal(testContext) - } - }} + whenReady(onCompleteContext.future) { result => + result should equal(Some(testContext)) + } + } } } "created in a thread that doest have a TraceContext" must { - "not capture any TraceContext for the body execution" in { new FutureWithoutContextFixture{ - + "not capture any TraceContext for the body execution" in new FutureWithoutContextFixture{ whenReady(futureWithoutContext) { result => result should equal(None) } - }} + } - "not make any TraceContext available during the onComplete callback" in { new FutureWithoutContextFixture { + "not make any TraceContext available during the onComplete callback" in new FutureWithoutContextFixture { val onCompleteContext = Promise[Option[TraceContext]]() - futureWithoutContext.onComplete({ - case _ => onCompleteContext.complete(Success(Tracer.context)) - }) + futureWithoutContext.onComplete { + case _ => onCompleteContext.complete(Success(Tracer.traceContext.value)) + } whenReady(onCompleteContext.future) { result => result should equal(None) } - }} + } } } @@ -68,14 +70,15 @@ class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutur class FutureWithContextFixture { val testContext = TraceContext() - Tracer.set(testContext) - val futureWithContext = Future { Tracer.context } + var futureWithContext: Future[Option[TraceContext]] = _ + Tracer.traceContext.withValue(Some(testContext)) { + futureWithContext = Future { Tracer.traceContext.value } + } } trait FutureWithoutContextFixture { - Tracer.clear // Make sure no TraceContext is available - val futureWithoutContext = Future { Tracer.context } + val futureWithoutContext = Future { Tracer.traceContext.value } } } diff --git a/project/AspectJ.scala b/project/AspectJ.scala index fb5d8bf9..b6c5037a 100644 --- a/project/AspectJ.scala +++ b/project/AspectJ.scala @@ -10,6 +10,9 @@ object AspectJ { compileOnly in Aspectj := true, fork in Test := true, javaOptions in Test <++= weaverOptions in Aspectj, + fork in run := true, + javaOptions in run <++= weaverOptions in Aspectj, + lintProperties in Aspectj += "invalidAbsoluteTypeName = ignore" // Add this line if we need to include some .aj aspects again in the project. diff --git a/project/Build.scala b/project/Build.scala index 80bc5d32..1e5c9a2f 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -19,11 +19,12 @@ object Build extends Build { .settings(basicSettings: _*) .settings(revolverSettings: _*) .settings(aspectJSettings: _*) - .settings(newrelicSettings: _*) + //.settings(newrelicSettings: _*) .settings( libraryDependencies ++= - compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, aspectJ, aspectJWeaver, metrics, sprayJson, newrelic) ++ + compile(akkaActor, akkaAgent, aspectJ, aspectJWeaver, metrics, newrelic, sprayJson) ++ + provided(sprayCan, sprayClient, sprayRouting) ++ test(scalatest, akkaTestKit, sprayTestkit)) //.dependsOn(kamonDashboard) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index bf139b19..66852801 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -14,7 +14,7 @@ object Dependencies { val sprayRouting = "io.spray" % "spray-routing" % sprayVersion val sprayTestkit = "io.spray" % "spray-testkit" % sprayVersion val sprayClient = "io.spray" % "spray-client" % sprayVersion - val sprayJson = "io.spray" %% "spray-json" % "1.2.3" + val sprayJson = "io.spray" %% "spray-json" % "1.2.5" val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1" val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion val akkaAgent = "com.typesafe.akka" %% "akka-agent" % akkaVersion diff --git a/project/Settings.scala b/project/Settings.scala index 6aca813f..9633b405 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -3,7 +3,7 @@ import Keys._ import spray.revolver.RevolverPlugin.Revolver object Settings { - val VERSION = "0.0.2" + val VERSION = "0.0.3" lazy val basicSettings = seq( version := VERSION, -- cgit v1.2.3