From 5127c3bb83cd6fe90e071720d995cfb53d913e6a Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 4 Nov 2013 18:11:16 -0300 Subject: wip --- .../src/main/scala/kamon/trace/TraceContext.scala | 2 - .../src/main/scala/kamon/trace/Tracer.scala | 33 ++++++++++++ .../src/main/scala/kamon/trace/UowTracing.scala | 58 ++++++++++++++++++++++ .../trace/instrumentation/RunnableTracing.scala | 55 ++++++++++++++++++++ 4 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 kamon-trace/src/main/scala/kamon/trace/UowTracing.scala create mode 100644 kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala (limited to 'kamon-trace') diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala index 78db911d..c3f1f2c2 100644 --- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala @@ -9,8 +9,6 @@ import scala.concurrent.duration._ case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None) object TraceContext { - val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) - val traceIdCounter = new AtomicLong def apply()(implicit system: ActorSystem) = { val n = traceIdCounter.incrementAndGet() diff --git a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala b/kamon-trace/src/main/scala/kamon/trace/Tracer.scala index e64cfaa6..4ea89850 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-trace/src/main/scala/kamon/trace/Tracer.scala @@ -1,7 +1,40 @@ package kamon.trace +import kamon.Kamon import scala.util.DynamicVariable +import akka.actor._ +import scala.Some +import kamon.trace.Trace.Register +import scala.concurrent.duration._ +object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: Extension] = Trace + def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) + + + /*** Protocol */ + case object Register +} + +class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { + def manager: ActorRef = ??? +} + +class TraceManager extends Actor { + var listeners: Seq[ActorRef] = Seq.empty + + def receive = { + case Register => listeners = sender +: listeners + case segment: UowSegment => + context.child(segment.id.toString) match { + case Some(agreggator) => agreggator ! segment + case None => context.actorOf(UowTraceAggregator.props(self, 30 seconds)) + } + + case trace: UowTrace => + listeners foreach(_ ! trace) + } +} object Tracer { diff --git a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala new file mode 100644 index 00000000..c7dd1fb1 --- /dev/null +++ b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala @@ -0,0 +1,58 @@ +package kamon.trace + +import akka.actor._ +import scala.concurrent.duration.Duration +import kamon.trace.UowTracing._ + +sealed trait UowSegment { + def id: Long + def timestamp: Long +} + +trait AutoTimestamp extends UowSegment { + val timestamp = System.nanoTime +} + +object UowTracing { + case class Start(id: Long) extends AutoTimestamp + case class Finish(id: Long) extends AutoTimestamp + case class Rename(id: Long, name: String) extends AutoTimestamp + case class WebExternalStart(id: Long, host: String) extends AutoTimestamp + case class WebExternalFinish(id: Long) extends AutoTimestamp + case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp +} + +case class UowTrace(name: String, segments: Seq[UowSegment]) + + +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { + context.setReceiveTimeout(aggregationTimeout) + + var name: Option[String] = None + var segments: Seq[UowSegment] = Nil + + var pendingExternal = List[WebExternalStart]() + + def receive = { + case finish: Finish => segments = segments :+ finish; finishTracing() + case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes + case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => { + segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host) + }) + case Rename(id, newName) => name = Some(newName) + case segment: UowSegment => segments = segments :+ segment + case ReceiveTimeout => + log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) + context.stop(self) + } + + def finishTracing(): Unit = { + reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments) + println("Recorded Segments: " + segments) + context.stop(self) + } +} + +object UowTraceAggregator { + def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) +} \ No newline at end of file diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala new file mode 100644 index 00000000..236fd4fc --- /dev/null +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala @@ -0,0 +1,55 @@ +package kamon.trace.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.TraceContext + +@Aspect +class RunnableTracing { + + /** + * These are the Runnables that need to be instrumented and make the TraceContext available + * while their run method is executed. + */ + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable { + val traceContext: Option[TraceContext] = Tracer.traceContext.value + } + + + /** + * Pointcuts + */ + + @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..)) && this(runnable)") + def instrumentedRunnableCreation(runnable: TraceContextAwareRunnable): Unit = {} + + @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable+.run()) && this(runnable)") + def runnableExecution(runnable: TraceContextAwareRunnable) = {} + + + + @After("instrumentedRunnableCreation(runnable)") + def beforeCreation(runnable: TraceContextAwareRunnable): Unit = { + // Force traceContext initialization. + runnable.traceContext + } + + + @Around("runnableExecution(runnable)") + def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = { + import pjp._ + + Tracer.traceContext.withValue(runnable.traceContext) { + proceed() + } + } + +} + +/** + * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. + */ +trait TraceContextAwareRunnable { + def traceContext: Option[TraceContext] +} \ No newline at end of file -- cgit v1.2.3