From a0a57b110a3ee4876797ab51c4758525d166796f Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 30 Jan 2014 18:23:33 -0300 Subject: wip --- kamon-core/src/main/resources/reference.conf | 24 +++- .../ActorMessagePassingTracing.scala | 24 ++-- .../ActorSystemMessagePassingTracing.scala | 24 ++-- .../instrumentation/ActorLoggingTracing.scala | 10 +- .../kamon/instrumentation/FutureTracing.scala | 14 +-- .../main/scala/kamon/metrics/ActorMetrics.scala | 8 +- .../src/main/scala/kamon/metrics/Metrics.scala | 8 +- .../scala/kamon/metrics/MetricsExtension.scala | 6 +- .../main/scala/kamon/metrics/TraceMetrics.scala | 65 +++++++++++ .../src/main/scala/kamon/trace/Segments.scala | 5 +- kamon-core/src/main/scala/kamon/trace/Trace.scala | 22 ++-- .../src/main/scala/kamon/trace/TraceContext.scala | 17 +-- .../src/main/scala/kamon/trace/TraceCtx.scala | 125 +++++++++++++++++++++ .../trace/logging/LogbackTraceTokenConverter.scala | 4 +- .../test/scala/akka/testkit/TestProbeTracing.scala | 12 +- 15 files changed, 278 insertions(+), 90 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/TraceCtx.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index f05c11f6..e6124365 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -9,6 +9,12 @@ kamon { excludes = [ "system/*", "user/IO-*" ] } }, + { + trace { + includes = [ "**" ] + excludes = [] + } + }, { dispatcher { includes = [ "default-dispatcher" ] @@ -19,8 +25,8 @@ kamon { - actors { - hdr-settings { + precision { + actor { processing-time { highest-trackable-value = 3600000000000 significant-value-digits = 2 @@ -34,7 +40,21 @@ kamon { significant-value-digits = 2 } } + + trace { + elapsed-time { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + segments { + highest-trackable-value = 3600000000000 + significant-value-digits = 2 + } + } } + + + } trace { diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index d43de311..98700974 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -19,10 +19,9 @@ import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } -import kamon.trace.{ TraceContext, ContextAware, Trace } +import kamon.trace._ import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon -import kamon.metrics.ActorMetrics.ActorMetricRecorder import kamon.trace.TraceContext import kamon.metrics.ActorMetrics.ActorMetricRecorder @@ -48,15 +47,15 @@ class BehaviourInvokeTracing { @Around("invokingActorBehaviourAtActorCell(cell, envelope)") def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Unit = { val timestampBeforeProcessing = System.nanoTime() - val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware] + val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] - Trace.withContext(contextAndTimestamp.traceContext) { + TraceRecorder.withContext(contextAndTimestamp.traceContext) { pjp.proceed() } actorMetrics.map { am ⇒ am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.timestamp) + am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureMark) am.mailboxSize.record(cell.numberOfMessages) } } @@ -74,21 +73,14 @@ class BehaviourInvokeTracing { class EnvelopeTraceContextMixin { @DeclareMixin("akka.dispatch.Envelope") - def mixin: ContextAndTimestampAware = new ContextAndTimestampAware { - val traceContext: Option[TraceContext] = Trace.context() - val timestamp: Long = System.nanoTime() - } + def mixin: TraceContextAware = new TraceContextAware {} @Pointcut("execution(akka.dispatch.Envelope.new(..)) && this(ctx)") - def envelopeCreation(ctx: ContextAware): Unit = {} + def envelopeCreation(ctx: TraceContextAware): Unit = {} @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: ContextAware): Unit = { + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } -} - -trait ContextAndTimestampAware extends ContextAware { - def timestamp: Long -} +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala index 7d26016e..d4f8f769 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorSystemMessagePassingTracing.scala @@ -1,21 +1,21 @@ package akka.instrumentation import org.aspectj.lang.annotation._ -import kamon.trace.{ Trace, ContextAware } import akka.dispatch.sysmsg.EarliestFirstSystemMessageList import org.aspectj.lang.ProceedingJoinPoint +import kamon.trace.{TraceRecorder, TraceContextAware} @Aspect class SystemMessageTraceContextMixin { @DeclareMixin("akka.dispatch.sysmsg.SystemMessage+") - def mixin: ContextAware = ContextAware.default + def mixin: TraceContextAware = new TraceContextAware {} @Pointcut("execution(akka.dispatch.sysmsg.SystemMessage+.new(..)) && this(ctx)") - def envelopeCreation(ctx: ContextAware): Unit = {} + def envelopeCreation(ctx: TraceContextAware): Unit = {} @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: ContextAware): Unit = { + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } @@ -25,23 +25,23 @@ class SystemMessageTraceContextMixin { class RepointableActorRefTraceContextMixin { @DeclareMixin("akka.actor.RepointableActorRef") - def mixin: ContextAware = ContextAware.default + def mixin: TraceContextAware = new TraceContextAware {} @Pointcut("execution(akka.actor.RepointableActorRef.new(..)) && this(ctx)") - def envelopeCreation(ctx: ContextAware): Unit = {} + def envelopeCreation(ctx: TraceContextAware): Unit = {} @After("envelopeCreation(ctx)") - def afterEnvelopeCreation(ctx: ContextAware): Unit = { + def afterEnvelopeCreation(ctx: TraceContextAware): Unit = { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } @Pointcut("execution(* akka.actor.RepointableActorRef.point(..)) && this(repointableActorRef)") - def repointableActorRefCreation(repointableActorRef: ContextAware): Unit = {} + def repointableActorRefCreation(repointableActorRef: TraceContextAware): Unit = {} @Around("repointableActorRefCreation(repointableActorRef)") - def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: ContextAware): Any = { - Trace.withContext(repointableActorRef.traceContext) { + def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { + TraceRecorder.withContext(repointableActorRef.traceContext) { pjp.proceed() } } @@ -57,8 +57,8 @@ class ActorSystemMessagePassingTracing { @Around("systemMessageProcessing(messages)") def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { if (messages.nonEmpty) { - val ctx = messages.head.asInstanceOf[ContextAware].traceContext - Trace.withContext(ctx)(pjp.proceed()) + val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext + TraceRecorder.withContext(ctx)(pjp.proceed()) } else pjp.proceed() } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala index 297017cf..abd3514e 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ActorLoggingTracing.scala @@ -17,20 +17,20 @@ package kamon.instrumentation import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ ContextAware, Trace } +import kamon.trace.{ TraceContextAware, TraceRecorder } @Aspect class ActorLoggingTracing { @DeclareMixin("akka.event.Logging.LogEvent+") - def mixin: ContextAware = ContextAware.default + def mixin: TraceContextAware = new TraceContextAware {} @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {} + def withMdcInvocation(logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = {} @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = { - Trace.withContext(logEvent.traceContext) { + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { + TraceRecorder.withContext(logEvent.traceContext) { pjp.proceed() } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala index 5600d582..b8725dd7 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/FutureTracing.scala @@ -17,29 +17,29 @@ package kamon.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ ContextAware, TraceContext, Trace } +import kamon.trace.{ TraceContextAware, TraceRecorder } @Aspect class FutureTracing { @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") - def mixin: ContextAware = ContextAware.default + def mixin: TraceContextAware = new TraceContextAware {} @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") - def futureRelatedRunnableCreation(runnable: ContextAware): Unit = {} + def futureRelatedRunnableCreation(runnable: TraceContextAware): Unit = {} @After("futureRelatedRunnableCreation(runnable)") - def afterCreation(runnable: ContextAware): Unit = { + def afterCreation(runnable: TraceContextAware): Unit = { // Force traceContext initialization. runnable.traceContext } @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") - def futureRelatedRunnableExecution(runnable: ContextAware) = {} + def futureRelatedRunnableExecution(runnable: TraceContextAware) = {} @Around("futureRelatedRunnableExecution(runnable)") - def aroundExecution(pjp: ProceedingJoinPoint, runnable: ContextAware): Any = { - Trace.withContext(runnable.traceContext) { + def aroundExecution(pjp: ProceedingJoinPoint, runnable: TraceContextAware): Any = { + TraceRecorder.withContext(runnable.traceContext) { pjp.proceed() } } diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala index 96d2cd48..e588449e 100644 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -20,8 +20,8 @@ import com.typesafe.config.Config import org.HdrHistogram.HighDynamicRangeRecorder object ActorMetrics extends MetricGroupIdentity.Category with MetricGroupFactory { - val name: String = "actor" - type Group = ActorMetricRecorder + type GroupRecorder = ActorMetricRecorder + val entityName = "actor" case object ProcessingTime extends MetricIdentity { val name = "ProcessingTime" } case object MailboxSize extends MetricIdentity { val name = "MailboxSize" } @@ -53,12 +53,12 @@ object ActorMetrics extends MetricGroupIdentity.Category with MetricGroupFactory def create(config: Config): ActorMetricRecorder = { import HighDynamicRangeRecorder.Configuration - val settings = config.getConfig("kamon.metrics.actors.hdr-settings") + val settings = config.getConfig("kamon.metrics.precision.actor") val processingTimeHdrConfig = Configuration.fromConfig(settings.getConfig("processing-time")) val mailboxSizeHdrConfig = Configuration.fromConfig(settings.getConfig("mailbox-size")) val timeInMailboxHdrConfig = Configuration.fromConfig(settings.getConfig("time-in-mailbox")) - ActorMetricRecorder( + new ActorMetricRecorder( HighDynamicRangeRecorder(processingTimeHdrConfig), HighDynamicRangeRecorder(mailboxSizeHdrConfig), HighDynamicRangeRecorder(timeInMailboxHdrConfig)) diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala index 61f79a29..a3d7af87 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala @@ -96,17 +96,17 @@ case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: object MetricGroupIdentity { trait Category { - def name: String + def entityName: String } val AnyCategory = new Category { - def name: String = "match-all" + val entityName: String = "match-all" override def equals(that: Any): Boolean = that.isInstanceOf[Category] } } trait MetricGroupFactory { - type Group <: MetricGroupRecorder - def create(config: Config): Group + type GroupRecorder <: MetricGroupRecorder + def create(config: Config): GroupRecorder } diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala index 4d7ff354..a717e25a 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala @@ -33,9 +33,9 @@ class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension val filters = loadFilters(config) lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") - def register(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Option[category.Group] = { + def register(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Option[category.GroupRecorder] = { if (shouldTrack(name, category)) - Some(storage.getOrElseUpdate(MetricGroupIdentity(name, category), category.create(config)).asInstanceOf[category.Group]) + Some(storage.getOrElseUpdate(MetricGroupIdentity(name, category), category.create(config)).asInstanceOf[category.GroupRecorder]) else None } @@ -53,7 +53,7 @@ class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension } private def shouldTrack(name: String, category: MetricGroupIdentity.Category): Boolean = { - filters.get(category.name).map(filter ⇒ filter.accept(name)).getOrElse(false) + filters.get(category.entityName).map(filter ⇒ filter.accept(name)).getOrElse(false) } def loadFilters(config: Config): Map[String, MetricGroupFilter] = { diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala new file mode 100644 index 00000000..25ebce00 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala @@ -0,0 +1,65 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.HdrHistogram.HighDynamicRangeRecorder +import scala.collection.concurrent.TrieMap +import com.typesafe.config.Config + +object TraceMetrics extends MetricGroupIdentity.Category with MetricGroupFactory { + type GroupRecorder = TraceMetricRecorder + val entityName = "trace" + + case object ElapsedTime extends MetricIdentity { + val name = "ElapsedTime" + } + + case class HttpClientRequest(name: String) extends MetricIdentity + + class TraceMetricRecorder(val elapsedTime: HighDynamicRangeRecorder, private val segmentRecorderFactory: () ⇒ HighDynamicRangeRecorder) + extends MetricGroupRecorder { + + private val segments = TrieMap[MetricIdentity, HighDynamicRangeRecorder]() + + def record(identity: MetricIdentity, value: Long): Unit = identity match { + case ElapsedTime ⇒ elapsedTime.record(value) + case id: MetricIdentity ⇒ segments.getOrElseUpdate(id, segmentRecorderFactory.apply()).record(value) + } + + def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(), + segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap) + } + + case class TraceMetricSnapshot(elapsedTime: MetricSnapshot, segments: Map[MetricIdentity, MetricSnapshot]) + extends MetricGroupSnapshot { + + def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + } + + def create(config: Config): TraceMetricRecorder = { + import HighDynamicRangeRecorder.Configuration + + val settings = config.getConfig("kamon.metrics.precision.trace") + val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time")) + val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment")) + + new TraceMetricRecorder( + HighDynamicRangeRecorder(elapsedTimeHdrConfig), + () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) + } + +} diff --git a/kamon-core/src/main/scala/kamon/trace/Segments.scala b/kamon-core/src/main/scala/kamon/trace/Segments.scala index 0bc68ee7..e6d9745b 100644 --- a/kamon-core/src/main/scala/kamon/trace/Segments.scala +++ b/kamon-core/src/main/scala/kamon/trace/Segments.scala @@ -16,7 +16,8 @@ package kamon.trace -import kamon.trace.Trace.SegmentCompletionHandle + +import kamon.trace.TraceOld.SegmentCompletionHandle object Segments { @@ -34,5 +35,5 @@ object Segments { var completionHandle: Option[SegmentCompletionHandle] } - trait ContextAndSegmentCompletionAware extends ContextAware with SegmentCompletionHandleAware + trait ContextAndSegmentCompletionAware extends TraceContextAware with SegmentCompletionHandleAware } diff --git a/kamon-core/src/main/scala/kamon/trace/Trace.scala b/kamon-core/src/main/scala/kamon/trace/Trace.scala index 7dd3a6f8..bdfd6aa3 100644 --- a/kamon-core/src/main/scala/kamon/trace/Trace.scala +++ b/kamon-core/src/main/scala/kamon/trace/Trace.scala @@ -18,14 +18,14 @@ package kamon.trace import kamon.Kamon import akka.actor._ import scala.Some -import kamon.trace.Trace.Register +import kamon.trace.TraceOld.Register import scala.concurrent.duration._ import java.util.concurrent.atomic.AtomicLong import scala.util.Try import java.net.InetAddress -object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: Extension] = Trace +object TraceOld extends ExtensionId[TraceExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: Extension] = TraceOld def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) /*** Protocol */ @@ -33,16 +33,16 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { /** User API */ //private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None) - private[trace] val traceContext = new ThreadLocal[Option[TraceContext]] { - override def initialValue(): Option[TraceContext] = None + private[trace] val traceContext = new ThreadLocal[Option[TraceContextOld]] { + override def initialValue(): Option[TraceContextOld] = None } private[trace] val tranid = new AtomicLong() def context() = traceContext.get - private def set(ctx: Option[TraceContext]) = traceContext.set(ctx) + private def set(ctx: Option[TraceContextOld]) = traceContext.set(ctx) def clear: Unit = traceContext.remove() - def start(name: String, token: Option[String])(implicit system: ActorSystem): TraceContext = { + def start(name: String, token: Option[String])(implicit system: ActorSystem): TraceContextOld = { val ctx = newTraceContext(name, token.getOrElse(TraceToken.generate())) ctx.start(name) set(Some(ctx)) @@ -50,7 +50,7 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { ctx } - def withContext[T](ctx: Option[TraceContext])(thunk: ⇒ T): T = { + def withContext[T](ctx: Option[TraceContextOld])(thunk: ⇒ T): T = { val oldval = context set(ctx) @@ -58,11 +58,11 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { finally set(oldval) } - def transformContext(f: TraceContext ⇒ TraceContext): Unit = { + def transformContext(f: TraceContextOld ⇒ TraceContextOld): Unit = { context.map(f).foreach(ctx ⇒ set(Some(ctx))) } - def finish(): Option[TraceContext] = { + def finish(): Option[TraceContextOld] = { val ctx = context() ctx.map(_.finish) clear @@ -70,7 +70,7 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { } // TODO: FIX - def newTraceContext(name: String, token: String)(implicit system: ActorSystem): TraceContext = TraceContext(Kamon(Trace).api, tranid.getAndIncrement, name, token) + def newTraceContext(name: String, token: String)(implicit system: ActorSystem): TraceContextOld = TraceContextOld(Kamon(TraceOld).api, tranid.getAndIncrement, name, token) def startSegment(category: Segments.Category, description: String = "", attributes: Map[String, String] = Map()): SegmentCompletionHandle = { val start = Segments.Start(category, description, attributes) diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 5780b749..95a3a8b2 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -23,7 +23,7 @@ import kamon.Kamon import kamon.trace.UowTracing.{ Finish, Start } // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. -case class TraceContext(private val collector: ActorRef, id: Long, name: String, token: String, userContext: Option[Any] = None) { +case class TraceContextOld(private val collector: ActorRef, id: Long, name: String, token: String, userContext: Option[Any] = None) { def start(name: String) = { collector ! Start(id, name) @@ -34,18 +34,3 @@ case class TraceContext(private val collector: ActorRef, id: Long, name: String, } } - -trait ContextAware { - def traceContext: Option[TraceContext] -} - -object ContextAware { - def default: ContextAware = new ContextAware { - val traceContext: Option[TraceContext] = Trace.context() - } -} - -trait TimedContextAware { - def timestamp: Long - def traceContext: Option[TraceContext] -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala b/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala new file mode 100644 index 00000000..1e552563 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceCtx.scala @@ -0,0 +1,125 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import akka.actor.{ExtendedActorSystem, ActorSystem} +import akka.dispatch.AbstractNodeQueue +import kamon.Kamon +import kamon.metrics.{TraceMetrics, Metrics} +import java.util.concurrent.atomic.AtomicLong + +sealed trait TracingLevelOfDetail +case object OnlyMetrics extends TracingLevelOfDetail +case object SimpleTrace extends TracingLevelOfDetail +case object FullTrace extends TracingLevelOfDetail + +trait TraceContext { + def token: String + def name: String + def rename(newName: String): Unit + def levelOfDetail: TracingLevelOfDetail + def startSegment + def finish(metadata: Map[String, String]) + +} + +trait TraceContextAware { + def captureMark: Long + def traceContext: Option[TraceContext] +} + +object TraceContextAware { + def default: TraceContextAware = new TraceContextAware { + val captureMark = System.nanoTime() + val traceContext = TraceRecorder.currentContext + } +} + +object TraceContext { + +} + +class SimpleMetricCollectionContext(val token: String, @volatile private var _name: String, val system: ActorSystem, + metadata: Map[String, String]) extends TraceContext { + val levelOfDetail = OnlyMetrics + + @volatile private var _isOpen = true + + val startMark = System.nanoTime() + + def name: String = _name + + def rename(newName: String): Unit = _name = newName + + def isOpen(): Boolean = _isOpen + + def finish(metadata: Map[String, String]): Unit = { + val finishMark = System.nanoTime() + + // Store all metrics! + val metricsExtension = Kamon(Metrics)(system) + val metricRecorder = metricsExtension.register(name, TraceMetrics) + + metricRecorder.map { traceMetrics => + traceMetrics.elapsedTime.record(finishMark - startMark) + } + + } + + override def startSegment: Unit = ??? + + +} + +private[kamon] class SegmentRecordingQueue extends AbstractNodeQueue[String] + + + + +class TraceRecorder(system: ExtendedActorSystem) { + +} + +object TraceRecorder { + private val tokenCounter = new AtomicLong + private val traceContextStorage = new ThreadLocal[Option[TraceContext]] { + override def initialValue(): Option[TraceContext] = None + } + + private def newTraceContext(name: String, token: Option[String], metadata: Map[String, String], system: ActorSystem): TraceContext = ??? + + def setContext(context: Option[TraceContext]): Unit = traceContextStorage.set(context) + + def clearContext: Unit = traceContextStorage.set(None) + + def currentContext: Option[TraceContext] = traceContextStorage.get() + + def start(name: String, token: Option[String] = None, metadata: Map[String, String] = Map.empty)(implicit system: ActorSystem) = { + val ctx = newTraceContext(name, token, metadata, system) + traceContextStorage.set(Some(ctx)) + } + + def withContext[T](context: Option[TraceContext])(thunk: => T): T = { + val oldContext = currentContext + setContext(context) + + try thunk finally setContext(oldContext) + } + + def finish(metadata: Map[String, String] = Map.empty): Unit = currentContext.map(_.finish(metadata)) + +} diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala index 403e4ee7..4b7dbb28 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -17,8 +17,8 @@ package kamon.trace.logging import ch.qos.logback.classic.pattern.ClassicConverter import ch.qos.logback.classic.spi.ILoggingEvent -import kamon.trace.Trace +import kamon.trace.TraceRecorder class LogbackTraceTokenConverter extends ClassicConverter { - def convert(event: ILoggingEvent): String = Trace.context().map(_.token).getOrElse("undefined") + def convert(event: ILoggingEvent): String = TraceRecorder.currentContext.map(_.token).getOrElse("undefined") } diff --git a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala index dbbf1246..c681e921 100644 --- a/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala +++ b/kamon-core/src/test/scala/akka/testkit/TestProbeTracing.scala @@ -17,7 +17,7 @@ package akka.testkit import org.aspectj.lang.annotation._ -import kamon.trace.{ ContextAware, Trace } +import kamon.trace.{ TraceContextAware, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import akka.testkit.TestActor.RealMessage @@ -25,13 +25,13 @@ import akka.testkit.TestActor.RealMessage class TestProbeTracing { @DeclareMixin("akka.testkit.TestActor.RealMessage") - def mixin: ContextAware = ContextAware.default + def mixin: TraceContextAware = TraceContextAware.default @Pointcut("execution(akka.testkit.TestActor.RealMessage.new(..)) && this(ctx)") - def realMessageCreation(ctx: ContextAware): Unit = {} + def realMessageCreation(ctx: TraceContextAware): Unit = {} @After("realMessageCreation(ctx)") - def afterRealMessageCreation(ctx: ContextAware): Unit = { + def afterRealMessageCreation(ctx: TraceContextAware): Unit = { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } @@ -42,11 +42,11 @@ class TestProbeTracing { @Around("testProbeReply(testProbe)") def aroundTestProbeReply(pjp: ProceedingJoinPoint, testProbe: TestProbe): Any = { val traceContext = testProbe.lastMessage match { - case msg: RealMessage ⇒ msg.asInstanceOf[ContextAware].traceContext + case msg: RealMessage ⇒ msg.asInstanceOf[TraceContextAware].traceContext case _ ⇒ None } - Trace.withContext(traceContext) { + TraceRecorder.withContext(traceContext) { pjp.proceed } } -- cgit v1.2.3