aboutsummaryrefslogtreecommitdiff
path: root/kamon-trace
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-11-05 18:38:39 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-11-05 18:38:39 -0300
commit2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1 (patch)
tree56c4ad1f025c9144376cd4463ad4d4a23e37b571 /kamon-trace
parent5127c3bb83cd6fe90e071720d995cfb53d913e6a (diff)
downloadKamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.gz
Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.tar.bz2
Kamon-2b63540e5fffab545d0846cfb3dab5c0e1d0c9e1.zip
basic separation of concerns between sub-projects
Diffstat (limited to 'kamon-trace')
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/Trace.scala (renamed from kamon-trace/src/main/scala/kamon/trace/Tracer.scala)36
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/TraceContext.scala54
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala31
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala49
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala6
-rw-r--r--kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala94
-rw-r--r--kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala85
-rw-r--r--kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala37
8 files changed, 337 insertions, 55 deletions
diff --git a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala b/kamon-trace/src/main/scala/kamon/trace/Trace.scala
index 4ea89850..232b7420 100644
--- a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-trace/src/main/scala/kamon/trace/Trace.scala
@@ -6,6 +6,7 @@ import akka.actor._
import scala.Some
import kamon.trace.Trace.Register
import scala.concurrent.duration._
+import java.util.concurrent.atomic.AtomicLong
object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: Extension] = Trace
@@ -14,10 +15,31 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
/*** Protocol */
case object Register
+
+
+
+ /** User API */
+ private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None)
+ private[trace] val tranid = new AtomicLong()
+
+
+ def context() = traceContext.value
+ def set(ctx: TraceContext) = traceContext.value = Some(ctx)
+
+ def start(name: String)(implicit system: ActorSystem) = set(newTraceContext)
+
+ def finish(): Option[TraceContext] = {
+ val ctx = context()
+ ctx.map(_.finish)
+ ctx
+ }
+
+ // TODO: FIX
+ def newTraceContext()(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace), tranid.getAndIncrement)
}
class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
- def manager: ActorRef = ???
+ def manager: ActorRef = system.actorOf(Props[TraceManager])
}
class TraceManager extends Actor {
@@ -35,15 +57,3 @@ class TraceManager extends Actor {
listeners foreach(_ ! trace)
}
}
-
-
-object Tracer {
- val traceContext = new DynamicVariable[Option[TraceContext]](None)
-
-
- def context() = traceContext.value
- def set(ctx: TraceContext) = traceContext.value = Some(ctx)
-
- def start = set(newTraceContext)
- def newTraceContext(): TraceContext = TraceContext()(Kamon.actorSystem)
-}
diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala
index c3f1f2c2..f8491c12 100644
--- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala
@@ -4,56 +4,32 @@ import java.util.UUID
import akka.actor._
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
+import kamon.Kamon
+import kamon.trace.UowTracing.{Finish, Start}
// TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary.
-case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None)
+protected[kamon] case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) {
+ collector ! Start(id)
-object TraceContext {
-
- def apply()(implicit system: ActorSystem) = {
- val n = traceIdCounter.incrementAndGet()
- val actor = system.actorOf(UowTraceAggregator.props(reporter, 30 seconds), s"tracer-${n}")
- actor ! Start()
-
- new TraceContext(n, actor) // TODO: Move to a kamon specific supervisor, like /user/kamon/tracer
+ def finish: Unit = {
+ collector ! Finish(id)
}
-}
-
-class TraceAccumulator extends Actor {
- def receive = {
- case a => println("Trace Accumulated: "+a)
- }
}
-trait TraceEntry
-case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
-case class TransactionTrace(id: UUID, start: Long, end: Long, entries: Seq[TraceEntry])
-
-object Collector
-
-trait TraceEntryStorage {
- def store(entry: TraceEntry): Boolean
+trait ContextAware {
+ def traceContext: Option[TraceContext]
}
-class TransactionContext(val id: UUID, private val storage: TraceEntryStorage) {
- def store(entry: TraceEntry) = storage.store(entry)
-}
-
-object ThreadLocalTraceEntryStorage extends TraceEntryStorage {
-
- private val storage = new ThreadLocal[List[TraceEntry]] {
- override def initialValue(): List[TraceEntry] = Nil
- }
-
- def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get)
-
- def store(entry: TraceEntry): Boolean = {
- update(entry :: _)
- true
+object ContextAware {
+ def default: ContextAware = new ContextAware {
+ val traceContext: Option[TraceContext] = Trace.context()
}
}
-
+trait TimedContextAware {
+ def timestamp: Long
+ def traceContext: Option[TraceContext]
+}
diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala
new file mode 100644
index 00000000..77993cdd
--- /dev/null
+++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingInstrumentation.scala
@@ -0,0 +1,31 @@
+package kamon.trace.instrumentation
+
+import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect}
+import org.aspectj.lang.ProceedingJoinPoint
+import org.slf4j.MDC
+import kamon.trace.{TraceContext, ContextAware, Trace}
+
+@Aspect
+class ActorLoggingInstrumentation {
+
+
+ @DeclareMixin("akka.event.Logging.LogEvent+")
+ def traceContextMixin: ContextAware = new ContextAware {
+ def traceContext: Option[TraceContext] = Trace.context()
+ }
+
+ @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)")
+ def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {}
+
+ @Around("withMdcInvocation(logSource, logEvent, logStatement)")
+ def putTraceContextInMDC(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {
+ logEvent.traceContext match {
+ case Some(ctx) =>
+ MDC.put("uow", ctx.uow)
+ pjp.proceed()
+ MDC.remove("uow")
+
+ case None => pjp.proceed()
+ }
+ }
+}
diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala
new file mode 100644
index 00000000..3caba77c
--- /dev/null
+++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorRefTellInstrumentation.scala
@@ -0,0 +1,49 @@
+package kamon.trace.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import akka.actor.{Props, ActorSystem, ActorRef}
+import akka.dispatch.{Envelope, MessageDispatcher}
+import com.codahale.metrics.Timer
+import kamon.trace.{ContextAware, TraceContext, Trace}
+
+case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)
+case class DefaultTracingAwareEnvelopeContext(traceContext: Option[TraceContext] = Trace.traceContext.value, timestamp: Long = System.nanoTime) extends ContextAware
+
+@Aspect
+class ActorCellInvokeInstrumentation {
+
+ @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
+ def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
+
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+
+ @Around("invokingActorBehaviourAtActorCell(envelope)")
+ def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ //safe cast
+ val msgContext = envelope.asInstanceOf[ContextAware].traceContext
+
+ Trace.traceContext.withValue(msgContext) {
+ pjp.proceed()
+ }
+ }
+}
+
+@Aspect
+class EnvelopeTracingContext {
+
+ @DeclareMixin("akka.dispatch.Envelope")
+ def mixin: ContextAware = new ContextAware {
+ val traceContext: Option[TraceContext] = Trace.context()
+ }
+
+ @Pointcut("execution(akka.dispatch.ContextAware.new(..)) && this(ctx)")
+ def requestRecordInit(ctx: ContextAware): Unit = {}
+
+ @After("requestRecordInit(ctx)")
+ def whenCreatedRequestRecord(ctx: ContextAware): Unit = {
+ // Necessary to force the initialization of TracingAwareRequestContext at the moment of creation.
+ ctx.traceContext
+ }
+}
diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala
index 236fd4fc..3e5a7cce 100644
--- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala
+++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala
@@ -2,7 +2,7 @@ package kamon.trace.instrumentation
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import kamon.trace.TraceContext
+import kamon.trace.{TraceContext, Trace}
@Aspect
class RunnableTracing {
@@ -13,7 +13,7 @@ class RunnableTracing {
*/
@DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable {
- val traceContext: Option[TraceContext] = Tracer.traceContext.value
+ val traceContext: Option[TraceContext] = Trace.traceContext.value
}
@@ -40,7 +40,7 @@ class RunnableTracing {
def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = {
import pjp._
- Tracer.traceContext.withValue(runnable.traceContext) {
+ Trace.traceContext.withValue(runnable.traceContext) {
proceed()
}
}
diff --git a/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala
new file mode 100644
index 00000000..f5d88f06
--- /dev/null
+++ b/kamon-trace/src/test/scala/kamon/ActorInstrumentationSpec.scala
@@ -0,0 +1,94 @@
+package kamon
+
+import org.scalatest.{WordSpecLike, Matchers}
+import akka.actor.{ActorRef, Actor, Props, ActorSystem}
+
+import akka.testkit.{ImplicitSender, TestKit}
+import kamon.trace.Trace
+import akka.pattern.{pipe, ask}
+import akka.util.Timeout
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import akka.routing.RoundRobinRouter
+import kamon.trace.TraceContext
+
+
+class ActorInstrumentationSpec extends TestKit(ActorSystem("ActorInstrumentationSpec")) with WordSpecLike with Matchers with ImplicitSender {
+ implicit val executionContext = system.dispatcher
+
+ "an instrumented actor ref" when {
+ "used inside the context of a transaction" should {
+ "propagate the trace context using bang" in new TraceContextEchoFixture {
+ echo ! "test"
+
+ expectMsg(Some(testTraceContext))
+ }
+
+ "propagate the trace context using tell" in new TraceContextEchoFixture {
+ echo.tell("test", testActor)
+
+ expectMsg(Some(testTraceContext))
+ }
+
+ "propagate the trace context using ask" in new TraceContextEchoFixture {
+ implicit val timeout = Timeout(1 seconds)
+ (echo ? "test") pipeTo(testActor)
+
+ expectMsg(Some(testTraceContext))
+ }
+
+ "propagate the trace context to actors behind a router" in new RoutedTraceContextEchoFixture {
+ val contexts: Seq[Option[TraceContext]] = for(_ <- 1 to 10) yield Some(tellWithNewContext(echo, "test"))
+
+ expectMsgAllOf(contexts: _*)
+ }
+
+ /*"propagate with many asks" in {
+ val echo = system.actorOf(Props[TraceContextEcho])
+ val iterations = 50000
+ implicit val timeout = Timeout(10 seconds)
+
+ val futures = for(_ <- 1 to iterations) yield {
+ Tracer.start
+ val result = (echo ? "test")
+ Tracer.clear
+
+ result
+ }
+
+ val allResults = Await.result(Future.sequence(futures), 10 seconds)
+ assert(iterations == allResults.collect {
+ case Some(_) => 1
+ }.sum)
+ }*/
+ }
+ }
+
+ trait TraceContextEchoFixture {
+ val testTraceContext = Trace.newTraceContext()
+ val echo = system.actorOf(Props[TraceContextEcho])
+
+ Trace.set(testTraceContext)
+ }
+
+ trait RoutedTraceContextEchoFixture extends TraceContextEchoFixture {
+ override val echo = system.actorOf(Props[TraceContextEcho].withRouter(RoundRobinRouter(nrOfInstances = 10)))
+
+ def tellWithNewContext(target: ActorRef, message: Any): TraceContext = {
+ val context = Trace.newTraceContext()
+ Trace.set(context)
+
+ target ! message
+ context
+ }
+ }
+
+}
+
+class TraceContextEcho extends Actor {
+ def receive = {
+ case msg: String ⇒ sender ! Trace.context()
+ }
+}
+
+
diff --git a/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala
new file mode 100644
index 00000000..f968fa83
--- /dev/null
+++ b/kamon-trace/src/test/scala/kamon/RunnableInstrumentationSpec.scala
@@ -0,0 +1,85 @@
+package kamon
+
+import scala.concurrent.{Await, Promise, Future}
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+import org.scalatest.concurrent.{ScalaFutures, PatienceConfiguration}
+import java.util.UUID
+import scala.util.Success
+import scala.concurrent.duration._
+import java.util.concurrent.TimeUnit
+import akka.actor.{Actor, ActorSystem}
+import kamon.trace.{Trace, TraceContext}
+
+
+class RunnableInstrumentationSpec extends WordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues {
+
+ "a instrumented runnable" when {
+ "created in a thread that does have a TraceContext" must {
+ "preserve the TraceContext" which {
+ "should be available during the run method execution" in new FutureWithContextFixture {
+
+/* whenReady(futureWithContext) { result =>
+ result.value should equal(testContext)
+ }*/
+ }
+
+ "should be available during the execution of onComplete callbacks" in new FutureWithContextFixture {
+
+ val onCompleteContext = Promise[Option[TraceContext]]()
+
+/* Tracer.traceContext.withValue(Some(testContext)) {
+ futureWithContext.onComplete({
+ case _ => println("Completing second promise from: "+Thread.currentThread().getName + " With Context: " + Tracer.traceContext.value); onCompleteContext.complete(Success(Tracer.traceContext.value))
+ })
+ }*/
+
+ whenReady(onCompleteContext.future) { result =>
+ result should equal(Some(testContext))
+ }
+ }
+ }
+ }
+
+ "created in a thread that doest have a TraceContext" must {
+ "not capture any TraceContext for the body execution" in new FutureWithoutContextFixture{
+ whenReady(futureWithoutContext) { result =>
+ result should equal(None)
+ }
+ }
+
+ "not make any TraceContext available during the onComplete callback" in new FutureWithoutContextFixture {
+ val onCompleteContext = Promise[Option[TraceContext]]()
+
+ futureWithoutContext.onComplete {
+ case _ => onCompleteContext.complete(Success(Trace.context()))
+ }
+
+ whenReady(onCompleteContext.future) { result =>
+ result should equal(None)
+ }
+ }
+ }
+ }
+
+
+ /**
+ * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have.
+ */
+ implicit val testActorSystem = ActorSystem("test-actorsystem")
+ implicit val execContext = testActorSystem.dispatcher
+
+ class FutureWithContextFixture {
+ val testContext = TraceContext(Actor.noSender, 1)
+
+/* var futureWithContext: Future[Option[TraceContext]] = _
+ Tracer.context.withValue(Some(testContext)) {
+ futureWithContext = Future { Tracer.traceContext.value }
+ }*/
+ }
+
+ trait FutureWithoutContextFixture {
+ val futureWithoutContext = Future { Trace.context.value }
+ }
+}
+
+
diff --git a/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala
new file mode 100644
index 00000000..a8e736ae
--- /dev/null
+++ b/kamon-trace/src/test/scala/kamon/TraceAggregatorSpec.scala
@@ -0,0 +1,37 @@
+package kamon
+
+import org.scalatest.{WordSpecLike, WordSpec}
+import akka.testkit.{TestKitBase, TestKit}
+import akka.actor.ActorSystem
+import scala.concurrent.duration._
+import kamon.trace.UowTracing.{Finish, Rename, Start}
+import kamon.trace.{UowTrace, UowTraceAggregator}
+
+class TraceAggregatorSpec extends TestKit(ActorSystem("TraceAggregatorSpec")) with WordSpecLike {
+
+ "a TraceAggregator" should {
+ "send a UowTrace message out after receiving a Finish message" in new AggregatorFixture {
+ within(1 second) {
+ aggregator ! Start(1)
+ aggregator ! Finish(1)
+
+ expectMsg(UowTrace("UNKNOWN", Seq(Start(1), Finish(1))))
+ }
+ }
+
+ "change the uow name after receiving a Rename message" in new AggregatorFixture {
+ within(1 second) {
+ aggregator ! Start(1)
+ aggregator ! Rename(1, "test-uow")
+ aggregator ! Finish(1)
+
+ expectMsg(UowTrace("test-uow", Seq(Start(1), Finish(1))))
+ }
+ }
+ }
+
+
+ trait AggregatorFixture {
+ val aggregator = system.actorOf(UowTraceAggregator.props(testActor, 10 seconds))
+ }
+}