diff options
author | Ivan Topolnak <ivantopo@gmail.com> | 2013-11-05 18:38:39 -0300 |
---|---|---|
committer | Ivan Topolnak <ivantopo@gmail.com> | 2013-11-05 18:38:39 -0300 |
commit | 2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 (patch) | |
tree | 56c4ad1f025c9144376cd4463ad4d4a23e37b571 /kamon-trace | |
parent | 5127c3bb83cd6fe90e071720d995cfb53d913e6a (diff) | |
download | Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.gz Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.bz2 Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.zip |
basic separation of concerns between sub-projects
Diffstat (limited to 'kamon-trace')
-rw-r--r-- | kamon-trace/src/main/scala/kamon/trace/Trace.scala (renamed from kamon-trace/src/main/scala/kamon/trace/Tracer.scala) | 36 | ||||
-rw-r--r-- | kamon-trace/src/main/scala/kamon/trace/TraceContext.scala | 54 | ||||
-rw-r--r-- | kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala | 31 | ||||
-rw-r--r-- | kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala | 49 | ||||
-rw-r--r-- | kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala | 6 | ||||
-rw-r--r-- | kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala | 94 | ||||
-rw-r--r-- | kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala | 85 | ||||
-rw-r--r-- | kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala | 37 |
8 files changed, 337 insertions, 55 deletions
diff --git a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala b/kamon-trace/src/main/scala/kamon/trace/Trace.scala index 4ea89850..232b7420 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-trace/src/main/scala/kamon/trace/Trace.scala @@ -6,6 +6,7 @@ import akka.actor._ import scala.Some import kamon.trace.Trace.Register import scala.concurrent.duration._ +import java.util.concurrent.atomic.AtomicLong object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: Extension] = Trace @@ -14,10 +15,31 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { /*** Protocol */ case object Register + + + + /** User API */ + private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None) + private[trace] val tranid = new AtomicLong() + + + def context() = traceContext.value + def set(ctx: TraceContext) = traceContext.value = Some(ctx) + + def start(name: String)(implicit system: ActorSystem) = set(newTraceContext) + + def finish(): Option[TraceContext] = { + val ctx = context() + ctx.map(_.finish) + ctx + } + + // TODO: FIX + def newTraceContext()(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace), tranid.getAndIncrement) } class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - def manager: ActorRef = ??? + def manager: ActorRef = system.actorOf(Props[TraceManager]) } class TraceManager extends Actor { @@ -35,15 +57,3 @@ class TraceManager extends Actor { listeners foreach(_ ! trace) } } - - -object Tracer { - val traceContext = new DynamicVariable[Option[TraceContext]](None) - - - def context() = traceContext.value - def set(ctx: TraceContext) = traceContext.value = Some(ctx) - - def start = set(newTraceContext) - def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem) -} diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala index c3f1f2c2..f8491c12 100644 --- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala @@ -4,56 +4,32 @@ import java.util.UUID import akka.actor._ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ +import kamon.Kamon +import kamon.trace.UowTracing.{Finish, Start} // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. -case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None) +protected[kamon] case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { + collector ! Start(id) -object TraceContext { - - def apply()(implicit system: ActorSystem) = { - val n = traceIdCounter.incrementAndGet() - val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}") - actor ! Start() - - new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer + def finish: Unit = { + collector ! Finish(id) } -} - -class TraceAccumulator extends Actor { - def receive = { - case a => println("Trace Accumulated: "+a) - } } -trait TraceEntry -case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry -case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry]) - -object Collector - -trait TraceEntryStorage { - def store(entry: TraceEntry): Boolean +trait ContextAware { + def traceContext: Option[TraceContext] } -class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) { - def store(entry: TraceEntry) = storage.store(entry) -} - -object ThreadLocalTraceEntryStorage extends TraceEntryStorage { - - private val storage = new ThreadLocal[List[TraceEntry]] { - override def initialValue(): List[TraceEntry] = Nil - } - - def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get) - - def store(entry: TraceEntry): Boolean = { - update(entry :: _) - true +object ContextAware { + def default: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() } } - +trait TimedContextAware { + def timestamp: Long + def traceContext: Option[TraceContext] +} diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala new file mode 100644 index 00000000..77993cdd --- /dev/null +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala @@ -0,0 +1,31 @@ +package kamon.trace.instrumentation + +import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import org.slf4j.MDC +import kamon.trace.{TraceContext, ContextAware, Trace} + +@Aspect +class ActorLoggingInstrumentation { + + + @DeclareMixin("akka.event.Logging.LogEvent+") + def traceContextMixin: ContextAware = new ContextAware { + def traceContext: Option[TraceContext] = Trace.context() + } + + @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") + def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {} + + @Around("withMdcInvocation(logSource, logEvent, logStatement)") + def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = { + logEvent.traceContext match { + case Some(ctx) => + MDC.put("uow", ctx.uow) + pjp.proceed() + MDC.remove("uow") + + case None => pjp.proceed() + } + } +} diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala new file mode 100644 index 00000000..3caba77c --- /dev/null +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala @@ -0,0 +1,49 @@ +package kamon.trace.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import akka.actor.{Props, ActorSystem, ActorRef} +import akka.dispatch.{Envelope, MessageDispatcher} +import com.codahale.metrics.Timer +import kamon.trace.{ContextAware, TraceContext, Trace} + +case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context) +case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Trace.traceContext.value, timestamp: Long = System.nanoTime) extends ContextAware + +@Aspect +class ActorCellInvokeInstrumentation { + + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") + def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + + @Around("invokingActorBehaviourAtActorCell(envelope)") + def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + //safe cast + val msgContext = envelope.asInstanceOf[ContextAware].traceContext + + Trace.traceContext.withValue(msgContext) { + pjp.proceed() + } + } +} + +@Aspect +class EnvelopeTracingContext { + + @DeclareMixin("akka.dispatch.Envelope") + def mixin: ContextAware = new ContextAware { + val traceContext: Option[TraceContext] = Trace.context() + } + + @Pointcut("execution(akka.dispatch.ContextAware.new(..)) && this(ctx)") + def requestRecordInit(ctx: ContextAware): Unit = {} + + @After("requestRecordInit(ctx)") + def whenCreatedRequestRecord(ctx: ContextAware): Unit = { + // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation. + ctx.traceContext + } +} diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala index 236fd4fc..3e5a7cce 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala @@ -2,7 +2,7 @@ package kamon.trace.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.TraceContext +import kamon.trace.{TraceContext, Trace} @Aspect class RunnableTracing { @@ -13,7 +13,7 @@ class RunnableTracing { */ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable { - val traceContext: Option[TraceContext] = Tracer.traceContext.value + val traceContext: Option[TraceContext] = Trace.traceContext.value } @@ -40,7 +40,7 @@ class RunnableTracing { def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = { import pjp._ - Tracer.traceContext.withValue(runnable.traceContext) { + Trace.traceContext.withValue(runnable.traceContext) { proceed() } } diff --git a/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala new file mode 100644 index 00000000..f5d88f06 --- /dev/null +++ b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala @@ -0,0 +1,94 @@ +package kamon + +import org.scalatest.{WordSpecLike, Matchers} +import akka.actor.{ActorRef, Actor, Props, ActorSystem} + +import akka.testkit.{ImplicitSender, TestKit} +import kamon.trace.Trace +import akka.pattern.{pipe, ask} +import akka.util.Timeout +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import akka.routing.RoundRobinRouter +import kamon.trace.TraceContext + + +class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender { + implicit val executionContext = system.dispatcher + + "an instrumented actor ref" when { + "used inside the context of a transaction" should { + "propagate the trace context using bang" in new TraceContextEchoFixture { + echo ! "test" + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using tell" in new TraceContextEchoFixture { + echo.tell("test", testActor) + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context using ask" in new TraceContextEchoFixture { + implicit val timeout = Timeout(1 seconds) + (echo ? "test") pipeTo(testActor) + + expectMsg(Some(testTraceContext)) + } + + "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture { + val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test")) + + expectMsgAllOf(contexts: _*) + } + + /*"propagate with many asks" in { + val echo = system.actorOf(Props[TraceContextEcho]) + val iterations = 50000 + implicit val timeout = Timeout(10 seconds) + + val futures = for(_ <- 1 to iterations) yield { + Tracer.start + val result = (echo ? "test") + Tracer.clear + + result + } + + val allResults = Await.result(Future.sequence(futures), 10 seconds) + assert(iterations == allResults.collect { + case Some(_) => 1 + }.sum) + }*/ + } + } + + trait TraceContextEchoFixture { + val testTraceContext = Trace.newTraceContext() + val echo = system.actorOf(Props[TraceContextEcho]) + + Trace.set(testTraceContext) + } + + trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture { + override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10))) + + def tellWithNewContext(target: ActorRef, message: Any): TraceContext = { + val context = Trace.newTraceContext() + Trace.set(context) + + target ! message + context + } + } + +} + +class TraceContextEcho extends Actor { + def receive = { + case msg: String ⇒ sender ! Trace.context() + } +} + + diff --git a/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala new file mode 100644 index 00000000..f968fa83 --- /dev/null +++ b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala @@ -0,0 +1,85 @@ +package kamon + +import scala.concurrent.{Await, Promise, Future} +import org.scalatest.{Matchers, OptionValues, WordSpec} +import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration} +import java.util.UUID +import scala.util.Success +import scala.concurrent.duration._ +import java.util.concurrent.TimeUnit +import akka.actor.{Actor, ActorSystem} +import kamon.trace.{Trace, TraceContext} + + +class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues { + + "a instrumented runnable" when { + "created in a thread that does have a TraceContext" must { + "preserve the TraceContext" which { + "should be available during the run method execution" in new FutureWithContextFixture { + +/* whenReady(futureWithContext) { result => + result.value should equal(testContext) + }*/ + } + + "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture { + + val onCompleteContext = Promise[Option[TraceContext]]() + +/* Tracer.traceContext.withValue(Some(testContext)) { + futureWithContext.onComplete({ + case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value)) + }) + }*/ + + whenReady(onCompleteContext.future) { result => + result should equal(Some(testContext)) + } + } + } + } + + "created in a thread that doest have a TraceContext" must { + "not capture any TraceContext for the body execution" in new FutureWithoutContextFixture{ + whenReady(futureWithoutContext) { result => + result should equal(None) + } + } + + "not make any TraceContext available during the onComplete callback" in new FutureWithoutContextFixture { + val onCompleteContext = Promise[Option[TraceContext]]() + + futureWithoutContext.onComplete { + case _ => onCompleteContext.complete(Success(Trace.context())) + } + + whenReady(onCompleteContext.future) { result => + result should equal(None) + } + } + } + } + + + /** + * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have. + */ + implicit val testActorSystem = ActorSystem("test-actorsystem") + implicit val execContext = testActorSystem.dispatcher + + class FutureWithContextFixture { + val testContext = TraceContext(Actor.noSender, 1) + +/* var futureWithContext: Future[Option[TraceContext]] = _ + Tracer.context.withValue(Some(testContext)) { + futureWithContext = Future { Tracer.traceContext.value } + }*/ + } + + trait FutureWithoutContextFixture { + val futureWithoutContext = Future { Trace.context.value } + } +} + + diff --git a/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala new file mode 100644 index 00000000..a8e736ae --- /dev/null +++ b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala @@ -0,0 +1,37 @@ +package kamon + +import org.scalatest.{WordSpecLike, WordSpec} +import akka.testkit.{TestKitBase, TestKit} +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.trace.UowTracing.{Finish, Rename, Start} +import kamon.trace.{UowTrace, UowTraceAggregator} + +class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike { + + "a TraceAggregator" should { + "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1) + aggregator ! Finish(1) + + expectMsg(UowTrace("UNKNOWN", Seq(Start(1), Finish(1)))) + } + } + + "change the uow name after receiving a Rename message" in new AggregatorFixture { + within(1 second) { + aggregator ! Start(1) + aggregator ! Rename(1, "test-uow") + aggregator ! Finish(1) + + expectMsg(UowTrace("test-uow", Seq(Start(1), Finish(1)))) + } + } + } + + + trait AggregatorFixture { + val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds)) + } +} |