From 5127c3bb83cd6fe90e071720d995cfb53d913e6a Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Mon, 4 Nov 2013 18:11:16 -0300 Subject: wip --- kamon-core/src/main/resources/META-INF/aop.xml | 2 +- kamon-core/src/main/scala/kamon/Kamon.scala | 47 ----------------- .../instrumentation/RunnableInstrumentation.scala | 59 ---------------------- .../src/main/scala/kamon/trace/UowTracing.scala | 57 --------------------- kamon-metrics/src/main/scala/kamon/Metrics.scala | 12 +++-- .../src/main/scala/kamon/trace/TraceContext.scala | 2 - .../src/main/scala/kamon/trace/Tracer.scala | 33 ++++++++++++ .../src/main/scala/kamon/trace/UowTracing.scala | 58 +++++++++++++++++++++ .../trace/instrumentation/RunnableTracing.scala | 55 ++++++++++++++++++++ project/Build.scala | 3 ++ 10 files changed, 157 insertions(+), 171 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/UowTracing.scala create mode 100644 kamon-trace/src/main/scala/kamon/trace/UowTracing.scala create mode 100644 kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index f13effb9..c63e17e5 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -9,7 +9,7 @@ - + diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 75ef1efe..8c934f60 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -20,53 +20,6 @@ object Kamon { def apply[T <: Extension](key: ExtensionId[T])(implicit system: ActorSystem): ActorRef = key(system).manager - - - - - - implicit lazy val actorSystem = ActorSystem("kamon") - - object Metric { - - val actorSystems = TrieMap.empty[String, ActorSystemMetrics] - - def actorSystemNames: List[String] = actorSystems.keys.toList - def registerActorSystem(name: String) = actorSystems.getOrElseUpdate(name, ActorSystemMetrics(name)) - - def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) - } - - //val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") - //val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") - -} - - -class MetricManager extends Actor { - implicit val ec = context.system.dispatcher - - def receive = { - case RegisterForAllDispatchers(frequency) => { - val subscriber = sender - context.system.scheduler.schedule(frequency, frequency) { - Kamon.Metric.actorSystems.foreach { - case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach { - case (dispatcherName, dispatcherMetrics) => { - val activeThreads = dispatcherMetrics.activeThreadCount.snapshot - val poolSize = dispatcherMetrics.poolSize.snapshot - val queueSize = dispatcherMetrics.queueSize.snapshot - - subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize) - - } - } - } - } - } - } } -case class RegisterForAllDispatchers(frequency: FiniteDuration) -case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot) diff --git a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala deleted file mode 100644 index 2be6e5d1..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/RunnableInstrumentation.scala +++ /dev/null @@ -1,59 +0,0 @@ -package kamon.instrumentation - -import org.aspectj.lang.annotation._ -import kamon.{Tracer} -import org.aspectj.lang.ProceedingJoinPoint -import scala.Some -import kamon.trace.TraceContext - -/** - * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. - */ -trait TraceContextAwareRunnable { - def traceContext: Option[TraceContext] -} - - -@Aspect -class RunnableInstrumentation { - - /** - * These are the Runnables that need to be instrumented and make the TraceContext available - * while their run method is executed. - */ - @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable { - val traceContext: Option[TraceContext] = Tracer.traceContext.value - } - - - /** - * Pointcuts - */ - - @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..)) && this(runnable)") - def instrumentedRunnableCreation(runnable: TraceContextAwareRunnable): Unit = {} - - @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable+.run()) && this(runnable)") - def runnableExecution(runnable: TraceContextAwareRunnable) = {} - - - - @After("instrumentedRunnableCreation(runnable)") - def beforeCreation(runnable: TraceContextAwareRunnable): Unit = { - // Force traceContext initialization. - runnable.traceContext - } - - - @Around("runnableExecution(runnable)") - def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = { - import pjp._ - - Tracer.traceContext.withValue(runnable.traceContext) { - proceed() - } - } - -} - diff --git a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala b/kamon-core/src/main/scala/kamon/trace/UowTracing.scala deleted file mode 100644 index b09478cc..00000000 --- a/kamon-core/src/main/scala/kamon/trace/UowTracing.scala +++ /dev/null @@ -1,57 +0,0 @@ -package kamon.trace - -import akka.actor._ -import scala.concurrent.duration.Duration -import kamon.trace.UowTracing._ - -sealed trait UowSegment { - def timestamp: Long -} - -trait AutoTimestamp extends UowSegment { - val timestamp = System.nanoTime -} - -object UowTracing { - case class Start() extends AutoTimestamp - case class Finish() extends AutoTimestamp - case class Rename(name: String) extends AutoTimestamp - case class WebExternalStart(id: Long, host: String) extends AutoTimestamp - case class WebExternalFinish(id: Long) extends AutoTimestamp - case class WebExternal(start: Long, finish: Long, host: String) extends AutoTimestamp -} - -case class UowTrace(name: String, segments: Seq[UowSegment]) - - -class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { - context.setReceiveTimeout(aggregationTimeout) - - var name: Option[String] = None - var segments: Seq[UowSegment] = Nil - - var pendingExternal = List[WebExternalStart]() - - def receive = { - case finish: Finish => segments = segments :+ finish; finishTracing() - case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes - case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => { - segments = segments :+ WebExternal(start.timestamp, finish.timestamp, start.host) - }) - case Rename(newName) => name = Some(newName) - case segment: UowSegment => segments = segments :+ segment - case ReceiveTimeout => - log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) - context.stop(self) - } - - def finishTracing(): Unit = { - reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments) - println("Recorded Segments: " + segments) - context.stop(self) - } -} - -object UowTraceAggregator { - def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) -} \ No newline at end of file diff --git a/kamon-metrics/src/main/scala/kamon/Metrics.scala b/kamon-metrics/src/main/scala/kamon/Metrics.scala index 355c67c7..c3aedfd4 100644 --- a/kamon-metrics/src/main/scala/kamon/Metrics.scala +++ b/kamon-metrics/src/main/scala/kamon/Metrics.scala @@ -6,15 +6,17 @@ import akka.actor.{ActorRef, ExtendedActorSystem, ExtensionIdProvider, Extension import kamon.Kamon.Extension import akka.actor -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - def manager: ActorRef = ??? -} - object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = Metrics def createExtension(system: ExtendedActorSystem): Extension = new MetricsExtension(system) - val registry = new MetricRegistry +} +class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + def manager: ActorRef = ??? } + + + + diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala index 78db911d..c3f1f2c2 100644 --- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala @@ -9,8 +9,6 @@ import scala.concurrent.duration._ case class TraceContext(id: Long, tracer: ActorRef, uow: String = "", userContext: Option[Any] = None) object TraceContext { - val reporter = Kamon.actorSystem.actorOf(Props[NewRelicReporting]) - val traceIdCounter = new AtomicLong def apply()(implicit system: ActorSystem) = { val n = traceIdCounter.incrementAndGet() diff --git a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala b/kamon-trace/src/main/scala/kamon/trace/Tracer.scala index e64cfaa6..4ea89850 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-trace/src/main/scala/kamon/trace/Tracer.scala @@ -1,7 +1,40 @@ package kamon.trace +import kamon.Kamon import scala.util.DynamicVariable +import akka.actor._ +import scala.Some +import kamon.trace.Trace.Register +import scala.concurrent.duration._ +object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: Extension] = Trace + def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) + + + /*** Protocol */ + case object Register +} + +class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { + def manager: ActorRef = ??? +} + +class TraceManager extends Actor { + var listeners: Seq[ActorRef] = Seq.empty + + def receive = { + case Register => listeners = sender +: listeners + case segment: UowSegment => + context.child(segment.id.toString) match { + case Some(agreggator) => agreggator ! segment + case None => context.actorOf(UowTraceAggregator.props(self, 30 seconds)) + } + + case trace: UowTrace => + listeners foreach(_ ! trace) + } +} object Tracer { diff --git a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala new file mode 100644 index 00000000..c7dd1fb1 --- /dev/null +++ b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala @@ -0,0 +1,58 @@ +package kamon.trace + +import akka.actor._ +import scala.concurrent.duration.Duration +import kamon.trace.UowTracing._ + +sealed trait UowSegment { + def id: Long + def timestamp: Long +} + +trait AutoTimestamp extends UowSegment { + val timestamp = System.nanoTime +} + +object UowTracing { + case class Start(id: Long) extends AutoTimestamp + case class Finish(id: Long) extends AutoTimestamp + case class Rename(id: Long, name: String) extends AutoTimestamp + case class WebExternalStart(id: Long, host: String) extends AutoTimestamp + case class WebExternalFinish(id: Long) extends AutoTimestamp + case class WebExternal(id: Long, start: Long, finish: Long, host: String) extends AutoTimestamp +} + +case class UowTrace(name: String, segments: Seq[UowSegment]) + + +class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { + context.setReceiveTimeout(aggregationTimeout) + + var name: Option[String] = None + var segments: Seq[UowSegment] = Nil + + var pendingExternal = List[WebExternalStart]() + + def receive = { + case finish: Finish => segments = segments :+ finish; finishTracing() + case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes + case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => { + segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host) + }) + case Rename(id, newName) => name = Some(newName) + case segment: UowSegment => segments = segments :+ segment + case ReceiveTimeout => + log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) + context.stop(self) + } + + def finishTracing(): Unit = { + reporting ! UowTrace(name.getOrElse("UNKNOWN"), segments) + println("Recorded Segments: " + segments) + context.stop(self) + } +} + +object UowTraceAggregator { + def props(reporting: ActorRef, aggregationTimeout: Duration) = Props(classOf[UowTraceAggregator], reporting, aggregationTimeout) +} \ No newline at end of file diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala new file mode 100644 index 00000000..236fd4fc --- /dev/null +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/RunnableTracing.scala @@ -0,0 +1,55 @@ +package kamon.trace.instrumentation + +import org.aspectj.lang.annotation._ +import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.TraceContext + +@Aspect +class RunnableTracing { + + /** + * These are the Runnables that need to be instrumented and make the TraceContext available + * while their run method is executed. + */ + @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") + def onCompleteCallbacksRunnable: TraceContextAwareRunnable = new TraceContextAwareRunnable { + val traceContext: Option[TraceContext] = Tracer.traceContext.value + } + + + /** + * Pointcuts + */ + + @Pointcut("execution(kamon.instrumentation.TraceContextAwareRunnable+.new(..)) && this(runnable)") + def instrumentedRunnableCreation(runnable: TraceContextAwareRunnable): Unit = {} + + @Pointcut("execution(* kamon.instrumentation.TraceContextAwareRunnable+.run()) && this(runnable)") + def runnableExecution(runnable: TraceContextAwareRunnable) = {} + + + + @After("instrumentedRunnableCreation(runnable)") + def beforeCreation(runnable: TraceContextAwareRunnable): Unit = { + // Force traceContext initialization. + runnable.traceContext + } + + + @Around("runnableExecution(runnable)") + def around(pjp: ProceedingJoinPoint, runnable: TraceContextAwareRunnable): Any = { + import pjp._ + + Tracer.traceContext.withValue(runnable.traceContext) { + proceed() + } + } + +} + +/** + * Marker interface, just to make sure we don't instrument all the Runnables in the classpath. + */ +trait TraceContextAwareRunnable { + def traceContext: Option[TraceContext] +} \ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index a874302f..acdd1791 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -30,6 +30,7 @@ object Build extends Build { libraryDependencies ++= compile(akkaActor, aspectJ) ++ test(scalatest, akkaTestKit, sprayTestkit)) + .dependsOn(kamonCore) lazy val kamonMetrics = Project("kamon-metrics", file("kamon-metrics")) .settings(basicSettings: _*) @@ -48,6 +49,7 @@ object Build extends Build { libraryDependencies ++= compile(akkaActor, aspectJ, sprayCan, sprayClient, sprayRouting) ++ test(scalatest, akkaTestKit, sprayTestkit)) + .dependsOn(kamonTrace) lazy val kamonNewrelic = Project("kamon-newrelic", file("kamon-newrelic")) @@ -57,6 +59,7 @@ object Build extends Build { libraryDependencies ++= compile(aspectJ, sprayCan, sprayClient, sprayRouting, newrelic) ++ test(scalatest, akkaTestKit, sprayTestkit)) + .dependsOn(kamonTrace) lazy val kamonPlayground = Project("kamon-playground", file("kamon-playground")) .settings(basicSettings: _*) -- cgit v1.2.3