aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-05-30 18:32:41 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-05-30 18:32:41 -0300
commit148827486f116c4196888022f04ad053f4fb6e99 (patch)
treee0cdf5982e778b2e49790bda1572a861884d83f2 /src
parent38316a2b0b3a58b81cf7458b0a719980136bbb97 (diff)
downloadKamon-148827486f116c4196888022f04ad053f4fb6e99.tar.gz
Kamon-148827486f116c4196888022f04ad053f4fb6e99.tar.bz2
Kamon-148827486f116c4196888022f04ad053f4fb6e99.zip
WIP - first functional implementation of TraceContext
Diffstat (limited to 'src')
-rw-r--r--src/main/resources/META-INF/aop.xml2
-rw-r--r--src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala14
-rw-r--r--src/main/scala/kamon/Aggregator.scala18
-rw-r--r--src/main/scala/kamon/Kamon.scala26
-rw-r--r--src/main/scala/kamon/TraceContext.scala37
-rw-r--r--src/main/scala/kamon/TraceContextSwap.scala26
-rw-r--r--src/main/scala/kamon/TransactionPublisher.scala15
-rw-r--r--src/main/scala/kamon/actor/TraceableActor.scala44
-rw-r--r--src/main/scala/kamon/executor/eventbus.scala12
-rw-r--r--src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala (renamed from src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala)16
-rw-r--r--src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala (renamed from src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala)27
11 files changed, 123 insertions, 114 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index d97a00ea..41e9395f 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -12,7 +12,7 @@
<aspect name="akka.PoolMonitorAspect"/>-->
<aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
<aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
- <aspect name="kamon.instrumentation.PromiseCompletingRunnableInstrumentation" />
+ <aspect name="kamon.instrumentation.RunnableInstrumentation" />
<include within="*"/>
<exclude within="javax..*"/>
diff --git a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
index 9e816d11..00e4e066 100644
--- a/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
+++ b/src/main/scala/akka/instrumentation/ActorRefTellInstrumentation.scala
@@ -3,10 +3,12 @@ package akka.instrumentation
import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{ActorRef}
-import kamon.TraceContext
-import kamon.actor.TraceableMessage
+import kamon.{Kamon, TraceContext}
import akka.dispatch.Envelope
+case class TraceableMessage(traceContext: TraceContext, message: Any)
+
+
@Aspect
class ActorRefTellInstrumentation {
println("Created ActorAspect")
@@ -18,9 +20,9 @@ class ActorRefTellInstrumentation {
def around(pjp: ProceedingJoinPoint, message: Any, sender: ActorRef): Unit = {
import pjp._
- TraceContext.current match {
+ Kamon.context match {
case Some(ctx) => {
- val traceableMessage = TraceableMessage(ctx.fork, message)
+ val traceableMessage = TraceableMessage(ctx, message)
proceed(getArgs.updated(0, traceableMessage))
}
case None => proceed
@@ -42,12 +44,12 @@ class ActorCellInvokeInstrumentation {
envelope match {
case Envelope(TraceableMessage(ctx, msg), sender) => {
- TraceContext.set(ctx)
+ Kamon.set(ctx)
val originalEnvelope = envelope.copy(message = msg)
proceed(getArgs.updated(0, originalEnvelope))
- TraceContext.clear
+ Kamon.clear
}
case _ => proceed
}
diff --git a/src/main/scala/kamon/Aggregator.scala b/src/main/scala/kamon/Aggregator.scala
deleted file mode 100644
index 441178df..00000000
--- a/src/main/scala/kamon/Aggregator.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package kamon
-
-import akka.actor.Actor
-import scala.collection.mutable
-
-class Aggregator extends Actor {
-
- val parts = mutable.LinkedList[TraceEntry]()
-
- def receive = {
- case ContextPart(ctx) => println("registering context information")
- case FinishAggregation() => println("report to newrelic")
- }
-
-}
-
-case class ContextPart(context: TraceContext)
-case class FinishAggregation()
diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala
new file mode 100644
index 00000000..b5998f81
--- /dev/null
+++ b/src/main/scala/kamon/Kamon.scala
@@ -0,0 +1,26 @@
+package kamon
+
+import akka.actor.{Props, ActorSystem}
+
+object Kamon {
+
+ implicit val actorSystem = ActorSystem("kamon")
+
+ private val ctx = new ThreadLocal[Option[TraceContext]] {
+ override def initialValue() = None
+ }
+
+ def context = ctx.get()
+ def clear = ctx.remove()
+ def set(traceContext: TraceContext) = ctx.set(Some(traceContext))
+
+ def start: Unit = set(newTraceContext)
+
+ def newTraceContext(): TraceContext = TraceContext()
+
+
+ val publisher = actorSystem.actorOf(Props[TransactionPublisher])
+
+ def publish(tx: FullTransaction) = publisher ! tx
+
+}
diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala
index e3582c60..19ebc578 100644
--- a/src/main/scala/kamon/TraceContext.scala
+++ b/src/main/scala/kamon/TraceContext.scala
@@ -1,29 +1,30 @@
package kamon
import java.util.UUID
-import akka.actor.ActorPath
-
-
-case class TraceContext(id: UUID, entries: List[TraceEntry]) {
- def fork = this.copy(entries = Nil)
- def withEntry(entry: TraceEntry) = this.copy(entries = entry :: entries)
+import akka.actor.{ActorSystem, ActorPath}
+import akka.agent.Agent
+import java.util.concurrent.TimeUnit
+import scala.util.{Failure, Success}
+import akka.util.Timeout
+
+
+case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], userContext: Option[Any] = None) {
+ implicit val timeout = Timeout(30, TimeUnit.SECONDS)
+ implicit val as = Kamon.actorSystem.dispatcher
+
+ def append(entry: TraceEntry) = entries send (entry :: _)
+ def close = entries.future.onComplete({
+ case Success(list) => Kamon.publish(FullTransaction(id, list))
+ case Failure(t) => println("WTF!")
+ })
}
object TraceContext {
- private val context = new ThreadLocal[Option[TraceContext]] {
- override def initialValue(): Option[TraceContext] = None
- }
-
- def current = context.get()
-
- def clear = context.remove()
+ def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil))
+}
- def set(ctx: TraceContext) = context.set(Some(ctx))
- def start = set(TraceContext(UUID.randomUUID(), Nil))
-}
trait TraceEntry
-case class MessageExecutionTime(actorPath: ActorPath, initiated: Long, ended: Long)
-case class CodeBlockExecutionTime(blockName: String, begin: Long, end: Long) extends TraceEntry
+case class CodeBlockExecutionTime(name: String, begin: Long, end: Long) extends TraceEntry
diff --git a/src/main/scala/kamon/TraceContextSwap.scala b/src/main/scala/kamon/TraceContextSwap.scala
new file mode 100644
index 00000000..68ee808b
--- /dev/null
+++ b/src/main/scala/kamon/TraceContextSwap.scala
@@ -0,0 +1,26 @@
+package kamon
+
+/**
+ * Provides support for making a TraceContext available as ThreadLocal and cleanning up afterwards.
+ */
+trait TraceContextSwap {
+
+ def withContext[A](ctx: Option[TraceContext], body: => A): A = withContext(ctx, body, body)
+
+ def withContext[A](ctx: Option[TraceContext], primary: => A, fallback: => A): A = {
+ ctx match {
+ case Some(context) => {
+ Kamon.set(context)
+ val bodyResult = primary
+ Kamon.clear
+
+ bodyResult
+ }
+ case None => fallback
+ }
+
+ }
+
+}
+
+object TraceContextSwap extends TraceContextSwap
diff --git a/src/main/scala/kamon/TransactionPublisher.scala b/src/main/scala/kamon/TransactionPublisher.scala
new file mode 100644
index 00000000..0626b91d
--- /dev/null
+++ b/src/main/scala/kamon/TransactionPublisher.scala
@@ -0,0 +1,15 @@
+package kamon
+
+import akka.actor.Actor
+import java.util.UUID
+
+class TransactionPublisher extends Actor {
+
+ def receive = {
+ case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries")
+ }
+
+}
+
+
+case class FullTransaction(id: UUID, entries: List[TraceEntry])
diff --git a/src/main/scala/kamon/actor/TraceableActor.scala b/src/main/scala/kamon/actor/TraceableActor.scala
deleted file mode 100644
index 3acbd293..00000000
--- a/src/main/scala/kamon/actor/TraceableActor.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package kamon.actor
-
-import akka.actor.{ActorRef, Actor}
-import kamon.TraceContext
-
-trait TraceableActor extends Actor with TracingImplicitConversions {
-
- final def receive = {
- case a: Any => {
- a match {
- case TraceableMessage(ctx, message) => {
- //TraceContext.current.set(ctx)
-
- tracedReceive(message)
-
- //TraceContext.current.remove()
-
- /** Publish the partial context information to the EventStream */
- context.system.eventStream.publish(ctx)
- }
- case message: Any => tracedReceive(message)
- }
- }
- }
-
- def tracedReceive: Receive
-
-}
-
-class TraceableActorRef(val target: ActorRef) {
- def !! (message: Any)(implicit sender: ActorRef) = {
- val traceableMessage = TraceableMessage(TraceContext.current.get.fork, message)
- target.tell(traceableMessage, sender)
- }
-}
-
-
-
-trait TracingImplicitConversions {
- implicit def fromActorRefToTraceableActorRef(actorRef: ActorRef) = new TraceableActorRef(actorRef)
-}
-
-case class TraceableMessage(traceContext: TraceContext, message: Any)
-
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
index ebaff7eb..d83f2ac6 100644
--- a/src/main/scala/kamon/executor/eventbus.scala
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -5,7 +5,7 @@ import akka.event.LookupClassification
import akka.actor._
import java.util.concurrent.TimeUnit
-import kamon.{TraceContext}
+import kamon.{Kamon, TraceContext}
import akka.util.Timeout
import scala.util.Success
import scala.util.Failure
@@ -41,7 +41,7 @@ class PingActor(val target: ActorRef) extends Actor with ActorLogging {
def receive = {
case Pong() => {
- log.info(s"pong with context ${TraceContext.current}")
+ log.info(s"pong with context ${Kamon.context}")
Thread.sleep(1000)
sender ! Ping()
}
@@ -57,7 +57,7 @@ class PongActor extends Actor with ActorLogging {
case Ping() => {
Thread.sleep(3000)
sender ! Pong()
- log.info(s"ping with context ${TraceContext.current}")
+ log.info(s"ping with context ${Kamon.context}")
}
case a: Any => println(s"Got ${a} in PONG")
}
@@ -78,7 +78,7 @@ object TryAkka extends App{
- def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${TraceContext.current}] : $body")
+ def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
/*
val newRelicReporter = new NewRelicReporter(registry)
@@ -109,10 +109,12 @@ object TryAkka extends App{
})*/
//}
- TraceContext.start
+ Kamon.start
threadPrintln("Before doing it")
val f = Future { threadPrintln("This is happening inside the future body") }
+ Kamon.context.get.close
+
/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
diff --git a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
index ce19a7e6..ef908625 100644
--- a/src/main/scala/kamon/instrumentation/PromiseCompletingRunnableInstrumentation.scala
+++ b/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala
@@ -1,7 +1,7 @@
package kamon.instrumentation
import org.aspectj.lang.annotation._
-import kamon.TraceContext
+import kamon.{Kamon, TraceContext}
import org.aspectj.lang.ProceedingJoinPoint
import scala.Some
@@ -12,7 +12,7 @@ trait TraceContextAwareRunnable extends Runnable {}
@Aspect("perthis(instrumentedRunnableCreation())")
-class PromiseCompletingRunnableInstrumentation {
+class RunnableInstrumentation {
/**
* These are the Runnables that need to be instrumented and make the TraceContext available
@@ -37,25 +37,19 @@ class PromiseCompletingRunnableInstrumentation {
* Aspect members
*/
- private val traceContext = TraceContext.current
+ private val traceContext = Kamon.context
/**
* Advices
*/
+ import kamon.TraceContextSwap.withContext
@Around("runnableExecution()")
def around(pjp: ProceedingJoinPoint) = {
import pjp._
- traceContext match {
- case Some(ctx) => {
- TraceContext.set(ctx)
- proceed()
- TraceContext.clear
- }
- case None => proceed()
- }
+ withContext(traceContext, proceed())
}
}
diff --git a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
index 44f92148..f2e83824 100644
--- a/src/test/scala/kamon/instrumentation/FutureInstrumentationSpec.scala
+++ b/src/test/scala/kamon/instrumentation/RunnableInstrumentationSpec.scala
@@ -5,19 +5,20 @@ import scala.concurrent.ExecutionContext.Implicits.global
import org.scalatest.{OptionValues, WordSpec}
import org.scalatest.matchers.MustMatchers
import org.scalatest.concurrent.PatienceConfiguration
-import kamon.TraceContext
+import kamon.{Kamon, TraceContext}
import java.util.UUID
import scala.util.Success
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
+import akka.actor.ActorSystem
-class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues {
+class RunnableInstrumentationSpec extends WordSpec with MustMatchers with ScalaFutures with PatienceConfiguration with OptionValues {
- "a instrumented Future" when {
+ "a instrumented runnable" when {
"created in a thread that does have a TraceContext" must {
"preserve the TraceContext" which {
- "should be available during the body's execution" in { new FutureWithContext {
+ "should be available during the run method execution" in { new FutureWithContext {
whenReady(futureWithContext) { result =>
result.value must be === testContext
@@ -28,7 +29,7 @@ class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFut
val onCompleteContext = Promise[TraceContext]()
futureWithContext.onComplete({
- case _ => onCompleteContext.complete(Success(TraceContext.current.get))
+ case _ => onCompleteContext.complete(Success(Kamon.context.get))
})
whenReady(onCompleteContext.future) { result =>
@@ -50,7 +51,7 @@ class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFut
val onCompleteContext = Promise[Option[TraceContext]]()
futureWithoutContext.onComplete({
- case _ => onCompleteContext.complete(Success(TraceContext.current))
+ case _ => onCompleteContext.complete(Success(Kamon.context))
})
whenReady(onCompleteContext.future) { result =>
@@ -61,18 +62,22 @@ class FutureInstrumentationSpec extends WordSpec with MustMatchers with ScalaFut
}
+ /**
+ * We are using Futures for the test since they exercise Runnables in the back and also resemble the real use case we have.
+ */
trait FutureWithContext {
- val testContext = TraceContext(UUID.randomUUID(), Nil)
- TraceContext.set(testContext)
+ implicit val as = ActorSystem("test-actorsystem")
+ val testContext = TraceContext()
+ Kamon.set(testContext)
- val futureWithContext = Future { TraceContext.current }
+ val futureWithContext = Future { Kamon.context}
}
trait FutureWithoutContext {
- TraceContext.clear // Make sure no TraceContext is available
- val futureWithoutContext = Future { TraceContext.current }
+ Kamon.clear // Make sure no TraceContext is available
+ val futureWithoutContext = Future { Kamon.context }
}
}