aboutsummaryrefslogtreecommitdiff
path: root/kamon-trace/src/main
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-11-04 18:11:16 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-11-04 18:11:16 -0300
commit5127c3bb83cd6fe90e071720d995cfb53d913e6a (patch)
treea29cc7be7f729566f3a5a2fc3bc8044c7f47f3cc /kamon-trace/src/main
parentca1e93621ddad4b9f2a9028ea183b1c2f4c25a27 (diff)
downloadKamon-5127c3bb83cd6fe90e071720d995cfb53d913e6a.tar.gz
Kamon-5127c3bb83cd6fe90e071720d995cfb53d913e6a.tar.bz2
Kamon-5127c3bb83cd6fe90e071720d995cfb53d913e6a.zip
wip
Diffstat (limited to 'kamon-trace/src/main')
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/TraceContext.scala2
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/Tracer.scala33
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/UowTracing.scala58
-rw-r--r--kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala55
4 files changed, 146 insertions, 2 deletions
diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala
index 78db911d..c3f1f2c2 100644
--- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala
@@ -9,8 +9,6 @@ import scala.concurrent.duration._
case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None)
object TraceContext {
- val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting])
- val traceIdCounter = new AtomicLong
def apply()(implicit system: ActorSystem) = {
val n = traceIdCounter.incrementAndGet()
diff --git a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala b/kamon-trace/src/main/scala/kamon/trace/Tracer.scala
index e64cfaa6..4ea89850 100644
--- a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-trace/src/main/scala/kamon/trace/Tracer.scala
@@ -1,7 +1,40 @@
package kamon.trace
+import kamon.Kamon
import scala.util.DynamicVariable
+import akka.actor._
+import scala.Some
+import kamon.trace.Trace.Register
+import scala.concurrent.duration._
+object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: Extension] = Trace
+ def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system)
+
+
+ /*** Protocol */
+ case object Register
+}
+
+class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ def manager: ActorRef = ???
+}
+
+class TraceManager extends Actor {
+ var listeners: Seq[ActorRef] = Seq.empty
+
+ def receive = {
+ case Register => listeners = sender +: listeners
+ case segment: UowSegment =>
+ context.child(segment.id.toString) match {
+ case Some(agreggator) => agreggator ! segment
+ case None => context.actorOf(UowTraceAggregator.props(self, 30 seconds))
+ }
+
+ case trace: UowTrace =>
+ listeners foreach(_ ! trace)
+ }
+}
object Tracer {
diff --git a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala
new file mode 100644
index 00000000..c7dd1fb1
--- /dev/null
+++ b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala
@@ -0,0 +1,58 @@
+package kamon.trace
+
+import akka.actor._
+import scala.concurrent.duration.Duration
+import kamon.trace.UowTracing._
+
+sealed trait UowSegment {
+ def id: Long
+ def timestamp: Long
+}
+
+trait AutoTimestamp extends UowSegment {
+ val timestamp = System.nanoTime
+}
+
+object UowTracing {
+ case class Start(id: Long) extends AutoTimestamp
+ case class Finish(id: Long) extends AutoTimestamp
+ case class Rename(id: Long, name: String) extends AutoTimestamp
+ case class WebExternalStart(id: Long, host: String) extends AutoTimestamp
+ case class WebExternalFinish(id: Long) extends AutoTimestamp
+ case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp
+}
+
+case class UowTrace(name: String, segments: Seq[UowSegment])
+
+
+class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging {
+ context.setReceiveTimeout(aggregationTimeout)
+
+ var name: Option[String] = None
+ var segments: Seq[UowSegment] = Nil
+
+ var pendingExternal = List[WebExternalStart]()
+
+ def receive = {
+ case finish: Finish => segments = segments :+ finish; finishTracing()
+ case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes
+ case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => {
+ segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host)
+ })
+ case Rename(id, newName) => name = Some(newName)
+ case segment: UowSegment => segments = segments :+ segment
+ case ReceiveTimeout =>
+ log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments)
+ context.stop(self)
+ }
+
+ def finishTracing(): Unit = {
+ reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments)
+ println("Recorded Segments: " + segments)
+ context.stop(self)
+ }
+}
+
+object UowTraceAggregator {
+ def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout)
+} \ No newline at end of file
diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala
new file mode 100644
index 00000000..236fd4fc
--- /dev/null
+++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala
@@ -0,0 +1,55 @@
+package kamon.trace.instrumentation
+
+import org.aspectj.lang.annotation._
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.trace.TraceContext
+
+@Aspect
+class RunnableTracing {
+
+ /**
+ * These are the Runnables that need to be instrumented and make the TraceContext available
+ * while their run method is executed.
+ */
+ @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable")
+ def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable {
+ val traceContext: Option[TraceContext] = Tracer.traceContext.value
+ }
+
+
+ /**
+ * Pointcuts
+ */
+
+ @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..)) && this(runnable)")
+ def instrumentedRunnableCreation(runnable: TraceContextAwareRunnable): Unit = {}
+
+ @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable+.run()) && this(runnable)")
+ def runnableExecution(runnable: TraceContextAwareRunnable) = {}
+
+
+
+ @After("instrumentedRunnableCreation(runnable)")
+ def beforeCreation(runnable: TraceContextAwareRunnable): Unit = {
+ // Force traceContext initialization.
+ runnable.traceContext
+ }
+
+
+ @Around("runnableExecution(runnable)")
+ def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = {
+ import pjp._
+
+ Tracer.traceContext.withValue(runnable.traceContext) {
+ proceed()
+ }
+ }
+
+}
+
+/**
+ * Marker interface, just to make sure we don't instrument all the Runnables in the classpath.
+ */
+trait TraceContextAwareRunnable {
+ def traceContext: Option[TraceContext]
+} \ No newline at end of file