diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
13 files changed, 719 insertions, 238 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala new file mode 100644 index 00000000..19ea4f39 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala @@ -0,0 +1,97 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.TimeUnit + +import akka.actor.{ ActorLogging, Props, Actor, ActorRef } +import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp } +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import scala.concurrent.duration._ + +class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { + import kamon.util.ConfigTools.Syntax + import context.dispatcher + val config = context.system.settings.config.getConfig("kamon.trace.incubator") + + val minIncubationTime = new NanoInterval(config.getFiniteDuration("min-incubation-time").toNanos) + val maxIncubationTime = new NanoInterval(config.getFiniteDuration("max-incubation-time").toNanos) + val checkInterval = config.getFiniteDuration("check-interval") + + val checkSchedule = context.system.scheduler.schedule(checkInterval, checkInterval, self, CheckForCompletedTraces) + var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] + var waitingForIncubationFinish = List.empty[IncubatingTrace] + + def receive = { + case tc: TracingContext ⇒ incubate(tc) + case CheckForCompletedTraces ⇒ + checkWaitingForMinimumIncubation() + checkWaitingForIncubationFinish() + } + + def incubate(tc: TracingContext): Unit = + waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) + + @tailrec private def checkWaitingForMinimumIncubation(): Unit = { + if (waitingForMinimumIncubation.nonEmpty) { + val it = waitingForMinimumIncubation.head + if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { + waitingForMinimumIncubation = waitingForMinimumIncubation.tail + + if (it.tc.shouldIncubate) + waitingForIncubationFinish = it :: waitingForIncubationFinish + else + dispatchTraceInfo(it.tc) + + checkWaitingForMinimumIncubation() + } + } + } + + private def checkWaitingForIncubationFinish(): Unit = { + waitingForIncubationFinish = waitingForIncubationFinish.filter { + case IncubatingTrace(context, incubationStart) ⇒ + if (!context.shouldIncubate) { + dispatchTraceInfo(context) + false + } else { + if (NanoInterval.since(incubationStart) >= maxIncubationTime) { + log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) + dispatchTraceInfo(context); + false + } else true + } + } + } + + def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo + + override def postStop(): Unit = { + super.postStop() + checkSchedule.cancel() + } +} + +object Incubator { + + def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) + + case object CheckForCompletedTraces + case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) +} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala new file mode 100644 index 00000000..5f7fdff5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala @@ -0,0 +1,120 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.event.LoggingAdapter +import kamon.metric.{ MetricsExtension, TraceMetrics } +import kamon.util.{ NanoInterval, RelativeNanoTimestamp } + +import scala.annotation.tailrec + +private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, + val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension) + extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _isOpen = izOpen + @volatile protected var _elapsedTime = NanoInterval.default + + private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() + private val _traceLocalStorage = new TraceLocalStorage + + def rename(newName: String): Unit = + if (isOpen) + _name = newName + else if (log.isWarningEnabled) + log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + + def name: String = _name + def isEmpty: Boolean = false + def isOpen: Boolean = _isOpen + def addMetadata(key: String, value: String): Unit = {} + + def finish(): Unit = { + _isOpen = false + val traceElapsedTime = NanoInterval.since(startTimestamp) + _elapsedTime = traceElapsedTime + + metricsExtension.register(TraceMetrics, name).map { registration ⇒ + registration.recorder.ElapsedTime.record(traceElapsedTime.nanos) + drainFinishedSegments(registration.recorder) + } + } + + def startSegment(segmentName: String, category: String, library: String): Segment = + new MetricsOnlySegment(segmentName, category, library) + + @tailrec private def drainFinishedSegments(recorder: TraceMetrics): Unit = { + val segment = _finishedSegments.poll() + if (segment != null) { + recorder.segment(segment.name, segment.category, segment.library).record(segment.duration.nanos) + drainFinishedSegments(recorder) + } + } + + protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration)) + + if (isClosed) { + metricsExtension.register(TraceMetrics, name).map { registration ⇒ + drainFinishedSegments(registration.recorder) + } + } + } + + // Should only be used by the TraceLocal utilities. + def traceLocalStorage: TraceLocalStorage = _traceLocalStorage + + // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default + // will be returned. + def elapsedTime: NanoInterval = _elapsedTime + + class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment { + private val _startTimestamp = RelativeNanoTimestamp.now + @volatile private var _segmentName = segmentName + @volatile private var _elapsedTime = NanoInterval.default + @volatile private var _isOpen = true + + def name: String = _segmentName + def isEmpty: Boolean = false + def addMetadata(key: String, value: String): Unit = {} + def isOpen: Boolean = _isOpen + + def rename(newName: String): Unit = + if (isOpen) + _segmentName = newName + else if (log.isWarningEnabled) + log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + + def finish: Unit = { + _isOpen = false + val segmentElapsedTime = NanoInterval.since(_startTimestamp) + _elapsedTime = segmentElapsedTime + + finishSegment(name, category, library, segmentElapsedTime) + } + + // Handle with care and make sure that the segment is closed before calling this method, otherwise + // NanoInterval.default will be returned. + def elapsedTime: NanoInterval = _elapsedTime + def startTimestamp: RelativeNanoTimestamp = _startTimestamp + } +} + +case class SegmentLatencyData(name: String, category: String, library: String, duration: NanoInterval) diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala new file mode 100644 index 00000000..827840d7 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -0,0 +1,73 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import kamon.util.{ NanoInterval, Sequencer } +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.forkjoin.ThreadLocalRandom + +trait Sampler { + def shouldTrace: Boolean + def shouldReport(traceElapsedTime: NanoInterval): Boolean +} + +object NoSampling extends Sampler { + def shouldTrace: Boolean = false + def shouldReport(traceElapsedTime: NanoInterval): Boolean = false +} + +object SampleAll extends Sampler { + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class RandomSampler(chance: Int) extends Sampler { + require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0") + require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") + + def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class OrderedSampler(interval: Int) extends Sampler { + import OrderedSampler._ + + require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0") + assume(interval isPowerOfTwo, "kamon.trace.ordered-sampler.interval must be power of two") + + private val sequencer = Sequencer() + + def shouldTrace: Boolean = (sequencer.next() fastMod interval) == 0 + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +object OrderedSampler { + implicit class EnhancedInt(i: Int) { + def isPowerOfTwo = (i & (i - 1)) == 0 + } + + implicit class EnhancedLong(dividend: Long) { + def fastMod(divisor: Int) = dividend & (divisor - 1) + } +} + +class ThresholdSampler(threshold: FiniteDuration) extends Sampler { + + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= threshold.toNanos +} + diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index 5b74e6b2..48e56153 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -17,146 +17,113 @@ package kamon.trace import java.io.ObjectStreamException - -import akka.actor.ActorSystem -import kamon.Kamon -import kamon.metric._ -import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.metric.TraceMetrics.TraceMetricRecorder - -import scala.annotation.tailrec +import kamon.util.RelativeNanoTimestamp -sealed trait TraceContext { +trait TraceContext { def name: String def token: String - def rename(name: String): Unit - def finish(): Unit - def origin: TraceContextOrigin - def isOpen: Boolean - def isClosed: Boolean = !isOpen def isEmpty: Boolean def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + + def finish(): Unit + def rename(newName: String): Unit + def startSegment(segmentName: String, category: String, library: String): Segment - def nanoTimestamp: Long + def addMetadata(key: String, value: String) + + def startTimestamp: RelativeNanoTimestamp } -sealed trait Segment { +object TraceContext { + private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] { + override def initialValue(): TraceContext = EmptyTraceContext + } + + def currentContext: TraceContext = + _traceContextStorage.get() + + def setCurrentContext(context: TraceContext): Unit = + _traceContextStorage.set(context) + + def clearCurrentContext: Unit = + _traceContextStorage.remove() + + def withContext[T](context: TraceContext)(code: ⇒ T): T = { + val oldContext = _traceContextStorage.get() + _traceContextStorage.set(context) + + try code finally _traceContextStorage.set(oldContext) + } + + def map[T](f: TraceContext ⇒ T): Option[T] = { + val current = currentContext + if (current.nonEmpty) + Some(f(current)) + else None + } + +} + +trait Segment { def name: String - def rename(newName: String): Unit def category: String def library: String - def finish(): Unit def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty + def isOpen: Boolean + def isClosed: Boolean = !isOpen + + def finish(): Unit + def rename(newName: String): Unit + def addMetadata(key: String, value: String) } case object EmptyTraceContext extends TraceContext { def name: String = "empty-trace" def token: String = "" - def rename(name: String): Unit = {} - def finish(): Unit = {} - def origin: TraceContextOrigin = TraceContextOrigin.Local - def isOpen: Boolean = false def isEmpty: Boolean = true + def isOpen: Boolean = false + + def finish(): Unit = {} + def rename(name: String): Unit = {} def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment - def nanoTimestamp: Long = 0L + def addMetadata(key: String, value: String): Unit = {} + def startTimestamp = new RelativeNanoTimestamp(0L) case object EmptySegment extends Segment { val name: String = "empty-segment" val category: String = "empty-category" val library: String = "empty-library" def isEmpty: Boolean = true - def rename(newName: String): Unit = {} - def finish: Unit = {} - } -} - -class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, - val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext { - - val isEmpty: Boolean = false - @volatile private var _name = traceName - @volatile private var _isOpen = izOpen - - private val _nanoTimestamp = nanoTimeztamp - private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]() - private val metricsExtension = Kamon(Metrics)(system) - private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage - - def name: String = _name - def rename(newName: String): Unit = - if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace. - - def isOpen: Boolean = _isOpen - def nanoTimestamp: Long = _nanoTimestamp - - def finish(): Unit = { - _isOpen = false - val elapsedNanoTime = System.nanoTime() - _nanoTimestamp - val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory) - - metricRecorder.map { traceMetrics ⇒ - traceMetrics.elapsedTime.record(elapsedNanoTime) - drainFinishedSegments(traceMetrics) - } - } - - def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library) + def isOpen: Boolean = false - @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { - val segment = finishedSegments.poll() - if (segment != null) { - metricRecorder.segmentRecorder(segment.identity).record(segment.duration) - drainFinishedSegments(metricRecorder) - } - } - - private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = { - finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration)) - - if (isClosed) { - metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒ - drainFinishedSegments(traceMetrics) - } - } - } - - class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment { - private val _segmentStartNanoTime = System.nanoTime() - @volatile private var _segmentName = segmentName - @volatile private var _isOpen = true - - def name: String = _segmentName - def rename(newName: String): Unit = _segmentName = newName - def isEmpty: Boolean = false - - def finish: Unit = { - val segmentFinishNanoTime = System.nanoTime() - finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime)) - } + def finish: Unit = {} + def rename(newName: String): Unit = {} + def addMetadata(key: String, value: String): Unit = {} } } -case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity -case class SegmentData(identity: SegmentMetricIdentity, duration: Long) - object SegmentCategory { val HttpClient = "http-client" + val Database = "database" +} + +class LOD private[trace] (val level: Int) extends AnyVal +object LOD { + val MetricsOnly = new LOD(1) + val SimpleTrace = new LOD(2) } sealed trait LevelOfDetail object LevelOfDetail { - case object OnlyMetrics extends LevelOfDetail + case object MetricsOnly extends LevelOfDetail case object SimpleTrace extends LevelOfDetail case object FullTrace extends LevelOfDetail } -sealed trait TraceContextOrigin -object TraceContextOrigin { - case object Local extends TraceContextOrigin - case object Remote extends TraceContextOrigin -} - trait TraceContextAware extends Serializable { def traceContext: TraceContext } @@ -165,7 +132,7 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - @transient val traceContext = TraceRecorder.currentContext + @transient val traceContext = TraceContext.currentContext // // Beware of this hack, it might bite us in the future! diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala deleted file mode 100644 index a59abc18..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } -import akka.actor -import kamon.util.GlobPathFilter -import kamon.Kamon - -class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config.getConfig("kamon.trace") - val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing") -} - -object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Trace - def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala index 0766af74..057f564e 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala @@ -16,23 +16,43 @@ package kamon.trace -import scala.collection.concurrent.TrieMap import kamon.trace.TraceLocal.TraceLocalKey +import scala.collection.concurrent.TrieMap + object TraceLocal { + trait TraceLocalKey { type ValueType } - def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value) - case EmptyTraceContext ⇒ // Can't store in the empty context. + trait AvailableToMdc extends TraceLocalKey { + override type ValueType = String + def mdcKey: String + } + + object AvailableToMdc { + case class DefaultKeyAvailableToMdc(mdcKey: String) extends AvailableToMdc + + def fromKey(mdcKey: String): AvailableToMdc = DefaultKeyAvailableToMdc(mdcKey) + def apply(mdcKey: String): AvailableToMdc = fromKey(mdcKey) + } + + case class HttpContext(agent: String, uri: String, xforwarded: String) + + object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext } + + def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceContext.currentContext match { + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) + case EmptyTraceContext ⇒ // Can't store in the empty context. } - def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match { - case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key) - case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. + def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceContext.currentContext match { + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) + case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. } + + def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value) } class TraceLocalStorage { diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala deleted file mode 100644 index 8da187cb..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import scala.language.experimental.macros -import java.util.concurrent.atomic.AtomicLong -import kamon.macros.InlineTraceContextMacro - -import scala.util.Try -import java.net.InetAddress -import akka.actor.ActorSystem - -object TraceRecorder { - private val traceContextStorage = new ThreadLocal[TraceContext] { - override def initialValue(): TraceContext = EmptyTraceContext - } - - private val tokenCounter = new AtomicLong - private val hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - - def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet()) - - private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = { - new DefaultTraceContext( - name, - token.getOrElse(newToken), - izOpen = true, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Local, - nanoTimeztamp = System.nanoTime, - system) - } - - def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = { - val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000) - new DefaultTraceContext( - traceName, - traceToken, - isOpen, - LevelOfDetail.OnlyMetrics, - TraceContextOrigin.Remote, - equivalentNanotime, - system) - } - - def setContext(context: TraceContext): Unit = traceContextStorage.set(context) - - def clearContext: Unit = traceContextStorage.set(EmptyTraceContext) - - def currentContext: TraceContext = traceContextStorage.get() - - def start(name: String, token: Option[String] = None)(implicit system: ActorSystem) = { - val ctx = newTraceContext(name, token, system) - traceContextStorage.set(ctx) - } - - def rename(name: String): Unit = currentContext.rename(name) - - def withNewTraceContext[T](name: String, token: Option[String] = None)(thunk: ⇒ T)(implicit system: ActorSystem): T = - withTraceContext(newTraceContext(name, token, system))(thunk) - - def withTraceContext[T](context: TraceContext)(thunk: ⇒ T): T = { - val oldContext = currentContext - setContext(context) - - try thunk finally setContext(oldContext) - } - - def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match { - case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system)) - case EmptyTraceContext ⇒ None - } - - def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext] - - def finish(): Unit = currentContext.finish() - -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala new file mode 100644 index 00000000..f2da404c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala @@ -0,0 +1,45 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import akka.actor.{ Terminated, ActorRef, Actor } + +class TraceSubscriptions extends Actor { + import TraceSubscriptions._ + + var subscribers: List[ActorRef] = Nil + + def receive = { + case Subscribe(newSubscriber) ⇒ + if (!subscribers.contains(newSubscriber)) + subscribers = context.watch(newSubscriber) :: subscribers + + case Unsubscribe(leavingSubscriber) ⇒ + subscribers = subscribers.filter(_ == leavingSubscriber) + + case Terminated(terminatedSubscriber) ⇒ + subscribers = subscribers.filter(_ == terminatedSubscriber) + + case trace: TraceInfo ⇒ + subscribers.foreach(_ ! trace) + } +} + +object TraceSubscriptions { + case class Subscribe(subscriber: ActorRef) + case class Unsubscribe(subscriber: ActorRef) +} diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala new file mode 100644 index 00000000..be565154 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracerExtension.scala @@ -0,0 +1,110 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.InetAddress +import java.util.concurrent.atomic.AtomicLong + +import akka.actor._ +import com.typesafe.config.Config +import kamon.metric.MetricsExtension +import kamon.util._ + +import scala.util.Try + +trait TracerExtension { + def newContext(name: String): TraceContext + def newContext(name: String, token: String): TraceContext + def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext + + def subscribe(subscriber: ActorRef): Unit + def unsubscribe(subscriber: ActorRef): Unit +} + +private[kamon] class TracerExtensionImpl(metricsExtension: MetricsExtension, config: Config) extends TracerExtension { + private val _settings = TraceSettings(config) + private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + private val _tokenCounter = new AtomicLong + + private val _subscriptions = new LazyActorRef + private val _incubator = new LazyActorRef + + private def newToken: String = + _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet()) + + def newContext(name: String): TraceContext = + createTraceContext(name) + + def newContext(name: String, token: String): TraceContext = + createTraceContext(name, token) + + def newContext(name: String, token: String, timestamp: RelativeNanoTimestamp, isOpen: Boolean, isLocal: Boolean): TraceContext = + createTraceContext(name, token, timestamp, isOpen, isLocal) + + private def createTraceContext(traceName: String, token: String = newToken, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, + isOpen: Boolean = true, isLocal: Boolean = true): TraceContext = { + + def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, _settings.levelOfDetail, startTimestamp, null, metricsExtension) + + if (_settings.levelOfDetail == LevelOfDetail.MetricsOnly || !isLocal) + newMetricsOnlyContext + else { + if (!_settings.sampler.shouldTrace) + newMetricsOnlyContext + else + new TracingContext(traceName, token, true, _settings.levelOfDetail, isLocal, startTimestamp, null, metricsExtension, this, dispatchTracingContext) + } + } + + def subscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber)) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber)) + + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (_settings.sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + _incubator.tell(trace) + else + _subscriptions.tell(trace.generateTraceInfo) + + /** + * Tracer Extension initialization. + */ + private var _system: ActorSystem = null + private lazy val _start = { + val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + _subscriptions.point(subscriptions) + _incubator.point(_system.actorOf(Incubator.props(subscriptions))) + } + + def start(system: ActorSystem): Unit = synchronized { + _system = system + _start + _system = null + } +} + +private[kamon] object TracerExtensionImpl { + + def apply(metricsExtension: MetricsExtension, config: Config) = + new TracerExtensionImpl(metricsExtension, config) +} + +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo]) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String])
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala new file mode 100644 index 00000000..79f30f23 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracerExtensionSettings.scala @@ -0,0 +1,46 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import com.typesafe.config.Config + +case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler) + +object TraceSettings { + import kamon.util.ConfigTools.Syntax + + def apply(config: Config): TraceSettings = { + val tracerConfig = config.getConfig("kamon.trace") + + val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match { + case "metrics-only" ⇒ LevelOfDetail.MetricsOnly + case "simple-trace" ⇒ LevelOfDetail.SimpleTrace + case other ⇒ sys.error(s"Unknown tracer level of detail [$other] present in the configuration file.") + } + + val sampler: Sampler = + if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling + else tracerConfig.getString("sampling") match { + case "all" ⇒ SampleAll + case "random" ⇒ new RandomSampler(tracerConfig.getInt("random-sampler.chance")) + case "ordered" ⇒ new OrderedSampler(tracerConfig.getInt("ordered-sampler.interval")) + case "threshold" ⇒ new ThresholdSampler(tracerConfig.getFiniteDuration("threshold-sampler.minimum-elapsed-time")) + } + + TraceSettings(detailLevel, sampler) + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala new file mode 100644 index 00000000..3d324886 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala @@ -0,0 +1,92 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger + +import akka.event.LoggingAdapter +import kamon.util.{ NanoInterval, RelativeNanoTimestamp, NanoTimestamp } +import kamon.metric.MetricsExtension + +import scala.collection.concurrent.TrieMap + +private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, + isLocal: Boolean, startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, + traceExtension: TracerExtensionImpl, traceInfoSink: TracingContext ⇒ Unit) + extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, startTimeztamp, log, metricsExtension) { + + private val _openSegments = new AtomicInteger(0) + private val _startTimestamp = NanoTimestamp.now + private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val _metadata = TrieMap.empty[String, String] + + override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) + + override def startSegment(segmentName: String, category: String, library: String): Segment = { + _openSegments.incrementAndGet() + val newSegment = new TracingSegment(segmentName, category, library) + _allSegments.add(newSegment) + newSegment + } + + override def finish(): Unit = { + super.finish() + traceInfoSink(this) + } + + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = { + _openSegments.decrementAndGet() + super.finishSegment(segmentName, category, library, duration) + } + + def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0 + + // Handle with care, should only be used after a trace is finished. + def generateTraceInfo: TraceInfo = { + require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.") + + val currentSegments = _allSegments.iterator() + var segmentsInfo = List.newBuilder[SegmentInfo] + + while (currentSegments.hasNext()) { + val segment = currentSegments.next() + if (segment.isClosed) + segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp) + else + log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) + } + + TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result()) + } + + class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) { + private val metadata = TrieMap.empty[String, String] + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + // Handle with care, should only be used after the segment has finished. + def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = { + require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.") + + // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both + // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. + val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) + + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap) + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala index f052f009..961c3099 100644 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala +++ b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -17,11 +17,11 @@ package kamon.trace.logging import ch.qos.logback.classic.pattern.ClassicConverter import ch.qos.logback.classic.spi.ILoggingEvent -import kamon.trace.TraceRecorder +import kamon.trace.TraceContext class LogbackTraceTokenConverter extends ClassicConverter { def convert(event: ILoggingEvent): String = { - val ctx = TraceRecorder.currentContext + val ctx = TraceContext.currentContext if (ctx.isEmpty) "undefined" else diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala new file mode 100644 index 00000000..4970d97e --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -0,0 +1,39 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace.logging + +import kamon.trace.TraceLocal.AvailableToMdc +import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext } + +import org.slf4j.MDC + +trait MdcKeysSupport { + + def withMdc[A](thunk: ⇒ A): A = { + val keys = copyToMdc(TraceContext.currentContext) + try thunk finally keys.foreach(key ⇒ MDC.remove(key)) + } + + private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { + case ctx: MetricsOnlyContext ⇒ + ctx.traceLocalStorage.underlyingStorage.collect { + case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value)) + }.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten + + case EmptyTraceContext ⇒ Iterable.empty[String] + } +} |