diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-04-24 13:54:40 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-04-24 13:54:40 +0200 |
commit | 4d828e1a3195e55365c865aa3a78af9668742643 (patch) | |
tree | 07fff2683933c96297a8ba577bbdc89888da16e1 /kamon-core/src/main/scala/kamon/trace | |
parent | 469c11dc1ddb140f407a33f48033e533bf60611c (diff) | |
download | Kamon-4d828e1a3195e55365c865aa3a78af9668742643.tar.gz Kamon-4d828e1a3195e55365c865aa3a78af9668742643.tar.bz2 Kamon-4d828e1a3195e55365c865aa3a78af9668742643.zip |
Prepare for the major cleanup
Moved all the original files from src/main to src/legacy-main, same with test files. Also
removed the autoweave module, examples and bench as I'm planning to have them in separate
repositories.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
11 files changed, 0 insertions, 1133 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala deleted file mode 100644 index fe5ad569..00000000 --- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import akka.actor.{ActorLogging, Props, Actor, ActorRef} -import kamon.trace.Incubator.{CheckForCompletedTraces, IncubatingTrace} -import kamon.util.{NanoInterval, RelativeNanoTimestamp} -import scala.annotation.tailrec -import scala.collection.immutable.Queue -import kamon.util.ConfigTools.Syntax - -class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { - import context.dispatcher - val config = context.system.settings.config.getConfig("kamon.trace.incubator") - - val minIncubationTime = new NanoInterval(config.getFiniteDuration("min-incubation-time").toNanos) - val maxIncubationTime = new NanoInterval(config.getFiniteDuration("max-incubation-time").toNanos) - val checkInterval = config.getFiniteDuration("check-interval") - - val checkSchedule = context.system.scheduler.schedule(checkInterval, checkInterval, self, CheckForCompletedTraces) - var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] - var waitingForIncubationFinish = List.empty[IncubatingTrace] - - def receive = { - case tc: TracingContext ⇒ incubate(tc) - case CheckForCompletedTraces ⇒ - checkWaitingForMinimumIncubation() - checkWaitingForIncubationFinish() - } - - def incubate(tc: TracingContext): Unit = - waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) - - @tailrec private def checkWaitingForMinimumIncubation(): Unit = { - if (waitingForMinimumIncubation.nonEmpty) { - val it = waitingForMinimumIncubation.head - if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { - waitingForMinimumIncubation = waitingForMinimumIncubation.tail - - if (it.tc.shouldIncubate) - waitingForIncubationFinish = it :: waitingForIncubationFinish - else - dispatchTraceInfo(it.tc) - - checkWaitingForMinimumIncubation() - } - } - } - - private def checkWaitingForIncubationFinish(): Unit = { - waitingForIncubationFinish = waitingForIncubationFinish.filter { - case IncubatingTrace(context, incubationStart) ⇒ - if (!context.shouldIncubate) { - dispatchTraceInfo(context) - false - } else { - if (NanoInterval.since(incubationStart) >= maxIncubationTime) { - log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) - dispatchTraceInfo(context); - false - } else true - } - } - } - - def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo - - override def postStop(): Unit = { - super.postStop() - checkSchedule.cancel() - } -} - -object Incubator { - - def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) - - case object CheckForCompletedTraces - case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) -} diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala deleted file mode 100644 index bc0bedba..00000000 --- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala +++ /dev/null @@ -1,186 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2016 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.util.concurrent.ConcurrentLinkedQueue - -import akka.event.LoggingAdapter -import kamon.Kamon -import kamon.metric.{SegmentMetrics, TraceMetrics} -import kamon.util.{NanoInterval, RelativeNanoTimestamp} - -import scala.annotation.tailrec -import scala.collection.concurrent.TrieMap - -private[kamon] class MetricsOnlyContext( - traceName: String, - val token: String, - traceTags: Map[String, String], - currentStatus: Status, - val levelOfDetail: LevelOfDetail, - val startTimestamp: RelativeNanoTimestamp, - log: LoggingAdapter -) extends TraceContext { - - @volatile private var _name = traceName - @volatile private var _status = currentStatus - @volatile protected var _elapsedTime = NanoInterval.default - - private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() - private val _traceLocalStorage = new TraceLocalStorage - private val _tags = TrieMap.empty[String, String] ++= traceTags - - def rename(newName: String): Unit = - if (Status.Open == status) - _name = newName - else - log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) - - def name: String = _name - def tags: Map[String, String] = _tags.toMap - def isEmpty: Boolean = false - def status: Status = _status - def addMetadata(key: String, value: String): Unit = {} - def addTag(key: String, value: String): Unit = _tags.put(key, value) - def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) - - private def finish(withError: Boolean): Unit = { - _status = if (withError) Status.FinishedWithError else Status.FinishedSuccessfully - val traceElapsedTime = NanoInterval.since(startTimestamp) - _elapsedTime = traceElapsedTime - - if (Kamon.metrics.shouldTrack(name, TraceMetrics.category)) { - val traceEntity = Kamon.metrics.entity(TraceMetrics, name, _tags.toMap) - traceEntity.elapsedTime.record(traceElapsedTime.nanos) - if (withError) traceEntity.errors.increment() - } - drainFinishedSegments() - } - - def finish(): Unit = finish(withError = false) - - def finishWithError(cause: Throwable): Unit = { - //we should do something with the Throwable in a near future - finish(withError = true) - } - - def startSegment(segmentName: String, category: String, library: String): Segment = - startSegment(segmentName, category, library, Map.empty[String, String]) - - def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = - new MetricsOnlySegment(segmentName, category, library, tags) - - @tailrec private def drainFinishedSegments(): Unit = { - val segment = _finishedSegments.poll() - if (segment != null) { - val defaultTags = Map( - "trace" → name, - "category" → segment.category, - "library" → segment.library - ) - - if (Kamon.metrics.shouldTrack(segment.name, SegmentMetrics.category)) { - val segmentEntity = Kamon.metrics.entity(SegmentMetrics, segment.name, defaultTags ++ segment.tags) - segmentEntity.elapsedTime.record(segment.duration.nanos) - if (segment.isFinishedWithError) segmentEntity.errors.increment() - } - drainFinishedSegments() - } - } - - protected def finishSegment( - segmentName: String, - category: String, - library: String, - duration: NanoInterval, - segmentTags: Map[String, String], - isFinishedWithError: Boolean - ): Unit = { - - _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration, segmentTags, isFinishedWithError)) - - if (isClosed) { - drainFinishedSegments() - } - } - - // Should only be used by the TraceLocal utilities. - def traceLocalStorage: TraceLocalStorage = _traceLocalStorage - - // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default - // will be returned. - def elapsedTime: NanoInterval = _elapsedTime - - class MetricsOnlySegment( - segmentName: String, - val category: String, - val library: String, - segmentTags: Map[String, String] - ) extends Segment { - - private val _startTimestamp = RelativeNanoTimestamp.now - private val _tags = TrieMap.empty[String, String] ++= segmentTags - - @volatile private var _segmentName = segmentName - @volatile private var _elapsedTime = NanoInterval.default - @volatile private var _status: Status = Status.Open - - def name: String = _segmentName - def tags: Map[String, String] = _tags.toMap - def isEmpty: Boolean = false - def status: Status = _status - def addMetadata(key: String, value: String): Unit = {} - def addTag(key: String, value: String): Unit = _tags.put(key, value) - def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) - - def rename(newName: String): Unit = - if (Status.Open == status) - _segmentName = newName - else - log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) - - private def finish(withError: Boolean): Unit = { - _status = if (withError) Status.FinishedWithError else Status.FinishedSuccessfully - val segmentElapsedTime = NanoInterval.since(_startTimestamp) - _elapsedTime = segmentElapsedTime - - finishSegment(name, category, library, segmentElapsedTime, _tags.toMap, withError) - } - - def finishWithError(cause: Throwable): Unit = { - //we should do something with the Throwable in a near future - finish(withError = true) - } - - def finish(): Unit = finish(withError = false) - - // Handle with care and make sure that the segment is closed before calling this method, otherwise - // NanoInterval.default will be returned. - def elapsedTime: NanoInterval = _elapsedTime - def startTimestamp: RelativeNanoTimestamp = _startTimestamp - - } -} - -case class SegmentLatencyData( - name: String, - category: String, - library: String, - duration: NanoInterval, - tags: Map[String, String], - isFinishedWithError: Boolean -)
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala deleted file mode 100644 index 234c67bb..00000000 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.net.InetAddress -import java.util.concurrent.atomic.AtomicLong -import kamon.util.{NanoTimestamp, NanoInterval, Sequencer} -import scala.concurrent.forkjoin.ThreadLocalRandom - -import scala.util.Try - -trait Sampler { - def shouldTrace: Boolean - def shouldReport(traceElapsedTime: NanoInterval): Boolean -} - -object NoSampling extends Sampler { - def shouldTrace: Boolean = false - def shouldReport(traceElapsedTime: NanoInterval): Boolean = false -} - -object SampleAll extends Sampler { - def shouldTrace: Boolean = true - def shouldReport(traceElapsedTime: NanoInterval): Boolean = true -} - -class RandomSampler(chance: Int) extends Sampler { - require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0") - require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") - - def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance - def shouldReport(traceElapsedTime: NanoInterval): Boolean = true -} - -class OrderedSampler(interval: Int) extends Sampler { - import OrderedSampler._ - - require(interval > 0, "kamon.trace.ordered-sampler.sample-interval cannot be <= 0") - assume(interval isPowerOfTwo, "kamon.trace.ordered-sampler.sample-interval must be power of two") - - private val sequencer = Sequencer() - - def shouldTrace: Boolean = (sequencer.next() fastMod interval) == 0 - def shouldReport(traceElapsedTime: NanoInterval): Boolean = true -} - -object OrderedSampler { - implicit class EnhancedInt(i: Int) { - def isPowerOfTwo = (i & (i - 1)) == 0 - } - - implicit class EnhancedLong(dividend: Long) { - def fastMod(divisor: Int) = dividend & (divisor - 1) - } -} - -class ThresholdSampler(thresholdInNanoseconds: NanoInterval) extends Sampler { - require(thresholdInNanoseconds.nanos > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0") - - def shouldTrace: Boolean = true - def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime >= thresholdInNanoseconds -} - -class ClockSampler(pauseInNanoseconds: NanoInterval) extends Sampler { - require(pauseInNanoseconds.nanos > 0, "kamon.trace.clock-sampler.pause cannot be <= 0") - - private val timer: AtomicLong = new AtomicLong(0L) - - def shouldTrace: Boolean = { - val now = NanoTimestamp.now.nanos - val lastTimer = timer.get() - if ((lastTimer + pauseInNanoseconds.nanos) < now) - timer.compareAndSet(lastTimer, now) - else - false - } - def shouldReport(traceElapsedTime: NanoInterval): Boolean = true -} - -class DefaultTokenGenerator extends Function0[String] { - private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") - private val _tokenCounter = new AtomicLong - - def apply(): String = { - _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet()) - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala deleted file mode 100644 index bbf40d8d..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2016 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.io.ObjectStreamException -import java.util - -import kamon.trace.Status.Closed -import kamon.trace.TraceContextAware.DefaultTraceContextAware -import kamon.util.{Function, RelativeNanoTimestamp, SameThreadExecutionContext, Supplier} - -import scala.concurrent.Future - -trait TraceContext { - def name: String - def token: String - def tags: Map[String, String] - def isEmpty: Boolean - def nonEmpty: Boolean = !isEmpty - def isClosed: Boolean = !(Status.Open == status) - def status: Status - def finish(): Unit - def finishWithError(cause: Throwable): Unit - def rename(newName: String): Unit - def startSegment(segmentName: String, category: String, library: String): Segment - def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment - def addMetadata(key: String, value: String): Unit - def addTag(key: String, value: String): Unit - def removeTag(key: String, value: String): Boolean - def startTimestamp: RelativeNanoTimestamp - - def collect[T](f: TraceContext ⇒ T): Option[T] = - if (nonEmpty) - Some(f(this)) - else None - - def collect[T](f: Function[TraceContext, T]): Option[T] = - if (nonEmpty) - Some(f(this)) - else None - - def withNewSegment[T](segmentName: String, category: String, library: String)(code: ⇒ T): T = { - withNewSegment(segmentName, category, library, Map.empty[String, String])(code) - } - - def withNewSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ T): T = { - val segment = startSegment(segmentName, category, library, tags) - try code finally segment.finish() - } - - def withNewAsyncSegment[T](segmentName: String, category: String, library: String)(code: ⇒ Future[T]): Future[T] = { - withNewAsyncSegment(segmentName, category, library, Map.empty[String, String])(code) - } - - def withNewAsyncSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ Future[T]): Future[T] = { - val segment = startSegment(segmentName, category, library, tags) - val result = code - result.onComplete(_ ⇒ segment.finish())(SameThreadExecutionContext) - result - } - - // Java variant. - def withNewSegment[T](segmentName: String, category: String, library: String, code: Supplier[T]): T = - withNewSegment(segmentName, category, library)(code.get) - - def withNewSegment[T](segmentName: String, category: String, library: String, tags: util.Map[String, String], code: Supplier[T]): T = { - import scala.collection.JavaConverters._ - withNewSegment(segmentName, category, library, tags.asScala.toMap)(code.get) - } -} - -trait Segment { - def name: String - def category: String - def library: String - def tags: Map[String, String] - def isEmpty: Boolean - def nonEmpty: Boolean = !isEmpty - def isClosed: Boolean = !(Status.Open == status) - def status: Status - def finish(): Unit - def finishWithError(cause: Throwable): Unit - def rename(newName: String): Unit - def addMetadata(key: String, value: String): Unit - def addTag(key: String, value: String): Unit - def removeTag(key: String, value: String): Boolean -} - -case object EmptyTraceContext extends TraceContext { - def name: String = "empty-trace" - def token: String = "" - def tags: Map[String, String] = Map.empty - def isEmpty: Boolean = true - def status: Status = Closed - def finish(): Unit = {} - def finishWithError(cause: Throwable): Unit = {} - def rename(name: String): Unit = {} - def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment - def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = EmptySegment - def addMetadata(key: String, value: String): Unit = {} - def startTimestamp = new RelativeNanoTimestamp(0L) - def addTag(key: String, value: String): Unit = {} - def removeTag(key: String, value: String): Boolean = false - - case object EmptySegment extends Segment { - val name: String = "empty-segment" - val category: String = "empty-category" - val library: String = "empty-library" - def tags: Map[String, String] = Map.empty - def isEmpty: Boolean = true - def status: Status = Closed - def finish(): Unit = {} - def finishWithError(cause: Throwable): Unit = {} - def rename(newName: String): Unit = {} - def addMetadata(key: String, value: String): Unit = {} - def addTag(key: String, value: String): Unit = {} - def removeTag(key: String, value: String): Boolean = false - } -} - -object SegmentCategory { - val HttpClient = "http-client" - val Database = "database" -} - -class LOD private[trace] (val level: Int) extends AnyVal -object LOD { - val MetricsOnly = new LOD(1) - val SimpleTrace = new LOD(2) -} - -sealed trait LevelOfDetail -object LevelOfDetail { - case object MetricsOnly extends LevelOfDetail - case object SimpleTrace extends LevelOfDetail - case object FullTrace extends LevelOfDetail -} - -sealed trait Status -object Status { - case object Open extends Status - case object Closed extends Status - case object FinishedWithError extends Status - case object FinishedSuccessfully extends Status -} - -trait TraceContextAware extends Serializable { - def traceContext: TraceContext -} - -object TraceContextAware { - def default: TraceContextAware = new DefaultTraceContextAware - - class DefaultTraceContextAware extends TraceContextAware { - @transient val traceContext = Tracer.currentContext - - // - // Beware of this hack, it might bite us in the future! - // - // When using remoting/cluster all messages carry the TraceContext in the envelope in which they - // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is - // available (if any) when the system messages are read and this will make sure that it is correctly - // captured and propagated. - @throws[ObjectStreamException] - private def readResolve: AnyRef = { - new DefaultTraceContextAware - } - } -} - -trait TimestampedTraceContextAware extends TraceContextAware { - def captureNanoTime: Long -} - -object TimestampedTraceContextAware { - def default: TimestampedTraceContextAware = new DefaultTraceContextAware with TimestampedTraceContextAware { - @transient val captureNanoTime = System.nanoTime() - } -} - -trait SegmentAware { - @volatile @transient var segment: Segment = EmptyTraceContext.EmptySegment -} - -object SegmentAware { - def default: SegmentAware = new DefaultSegmentAware - class DefaultSegmentAware extends DefaultTraceContextAware with SegmentAware {} -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala deleted file mode 100644 index 460e4b22..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import kamon.trace.TraceLocal.TraceLocalKey -import kamon.util.Supplier -import scala.collection.concurrent.TrieMap - -object TraceLocal { - - trait TraceLocalKey[T] - - trait AvailableToMdc extends TraceLocalKey[String] { - def mdcKey: String - } - - object AvailableToMdc { - case class DefaultKeyAvailableToMdc(mdcKey: String) extends AvailableToMdc - - def fromKey(mdcKey: String): AvailableToMdc = DefaultKeyAvailableToMdc(mdcKey) - def apply(mdcKey: String): AvailableToMdc = fromKey(mdcKey) - } - - def store[T](key: TraceLocalKey[T])(value: Any): Unit = Tracer.currentContext match { - case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) - case EmptyTraceContext ⇒ // Can't store in the empty context. - } - - def retrieve[T](key: TraceLocalKey[T]): Option[T] = Tracer.currentContext match { - case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) - case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. - } - - // Java variant - @throws(classOf[NoSuchElementException]) - def get[T](key: TraceLocalKey[T]): T = retrieve(key).get - - def getOrElse[T](key: TraceLocalKey[T], code: Supplier[T]): T = retrieve(key).getOrElse(code.get) - - def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value) - - def newTraceLocalKey[T]: TraceLocalKey[T] = new TraceLocalKey[T] {} -} - -class TraceLocalStorage { - val underlyingStorage = TrieMap[TraceLocalKey[_], Any]() - - def store[T](key: TraceLocalKey[T])(value: Any): Unit = underlyingStorage.put(key, value) - def retrieve[T](key: TraceLocalKey[T]): Option[T] = underlyingStorage.get(key).map(_.asInstanceOf[T]) -} diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSettings.scala b/kamon-core/src/main/scala/kamon/trace/TraceSettings.scala deleted file mode 100644 index c3a83e93..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceSettings.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2015 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import kamon.util.ConfigTools.Syntax -import com.typesafe.config.Config -import kamon.util.NanoInterval - -case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler, tokenGenerator: () ⇒ String) - -object TraceSettings { - def apply(config: Config): TraceSettings = { - val tracerConfig = config.getConfig("kamon.trace") - - val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match { - case "metrics-only" ⇒ LevelOfDetail.MetricsOnly - case "simple-trace" ⇒ LevelOfDetail.SimpleTrace - case other ⇒ sys.error(s"Unknown tracer level of detail [$other] present in the configuration file.") - } - - val sampler: Sampler = - if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling - else tracerConfig.getString("sampling") match { - case "all" ⇒ SampleAll - case "random" ⇒ new RandomSampler(tracerConfig.getInt("random-sampler.chance")) - case "ordered" ⇒ new OrderedSampler(tracerConfig.getInt("ordered-sampler.sample-interval")) - case "threshold" ⇒ new ThresholdSampler(new NanoInterval(tracerConfig.getFiniteDuration("threshold-sampler.minimum-elapsed-time").toNanos)) - case "clock" ⇒ new ClockSampler(new NanoInterval(tracerConfig.getFiniteDuration("clock-sampler.pause").toNanos)) - } - - val dynamic = new akka.actor.ReflectiveDynamicAccess(getClass.getClassLoader) - val tokenGeneratorFQN = tracerConfig.getString("token-generator") - val tokenGenerator = dynamic.createInstanceFor[() ⇒ String](tokenGeneratorFQN, Nil).get // let's bubble up any problems. - - TraceSettings(detailLevel, sampler, tokenGenerator) - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala deleted file mode 100644 index 47455ec5..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import akka.actor.{Terminated, ActorRef, Actor} - -class TraceSubscriptions extends Actor { - import TraceSubscriptions._ - - var subscribers: List[ActorRef] = Nil - - def receive = { - case Subscribe(newSubscriber) ⇒ - if (!subscribers.contains(newSubscriber)) - subscribers = context.watch(newSubscriber) :: subscribers - - case Unsubscribe(leavingSubscriber) ⇒ - subscribers = subscribers.filter(_ == leavingSubscriber) - - case Terminated(terminatedSubscriber) ⇒ - subscribers = subscribers.filter(_ == terminatedSubscriber) - - case trace: TraceInfo ⇒ - subscribers.foreach(_ ! trace) - } -} - -object TraceSubscriptions { - case class Subscribe(subscriber: ActorRef) - case class Unsubscribe(subscriber: ActorRef) -} diff --git a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala b/kamon-core/src/main/scala/kamon/trace/TracerModule.scala deleted file mode 100644 index 552962eb..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TracerModule.scala +++ /dev/null @@ -1,197 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2016 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.util - -import akka.actor._ -import akka.event.{Logging, LoggingAdapter} -import com.typesafe.config.Config -import kamon.Kamon -import kamon.metric.MetricsModule -import kamon.util._ - -import scala.collection.JavaConverters._ - -trait TracerModule { - def newContext(name: String): TraceContext - def newContext(name: String, token: Option[String]): TraceContext - def newContext(name: String, token: Option[String], tags: Map[String, String]): TraceContext - def newContext(name: String, token: Option[String], tags: Map[String, String], timestamp: RelativeNanoTimestamp, state: Status, isLocal: Boolean): TraceContext - - def subscribe(subscriber: ActorRef): Unit - def unsubscribe(subscriber: ActorRef): Unit -} - -object Tracer { - private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] { - override def initialValue(): TraceContext = EmptyTraceContext - } - - def currentContext: TraceContext = - _traceContextStorage.get() - - def setCurrentContext(context: TraceContext): Unit = - _traceContextStorage.set(context) - - def clearCurrentContext: Unit = - _traceContextStorage.remove() - - def withContext[T](context: TraceContext)(code: ⇒ T): T = { - val oldContext = _traceContextStorage.get() - _traceContextStorage.set(context) - - try code finally _traceContextStorage.set(oldContext) - } - - // Java variant. - def withContext[T](context: TraceContext, code: Supplier[T]): T = - withContext(context)(code.get) - - def withNewContext[T](traceName: String, traceToken: Option[String], tags: Map[String, String], autoFinish: Boolean)(code: ⇒ T): T = { - withContext(Kamon.tracer.newContext(traceName, traceToken, tags)) { - val codeResult = code - if (autoFinish) - currentContext.finish() - - codeResult - } - } - - def withNewContext[T](traceName: String)(code: ⇒ T): T = - withNewContext(traceName, None)(code) - - def withNewContext[T](traceName: String, tags: Map[String, String])(code: ⇒ T): T = - withNewContext(traceName, None, tags)(code) - - def withNewContext[T](traceName: String, traceToken: Option[String])(code: ⇒ T): T = - withNewContext(traceName, traceToken, Map.empty[String, String])(code) - - def withNewContext[T](traceName: String, traceToken: Option[String], tags: Map[String, String])(code: ⇒ T): T = - withNewContext(traceName, traceToken, tags, autoFinish = false)(code) - - def withNewContext[T](traceName: String, autoFinish: Boolean)(code: ⇒ T): T = - withNewContext(traceName, None, Map.empty[String, String], autoFinish)(code) - - def withNewContext[T](traceName: String, tags: Map[String, String], autoFinish: Boolean)(code: ⇒ T): T = - withNewContext(traceName, None, tags, autoFinish)(code) - - // Java variants. - def withNewContext[T](traceName: String, traceToken: Option[String], autoFinish: Boolean, code: Supplier[T]): T = - withNewContext(traceName, traceToken, Map.empty[String, String], autoFinish)(code.get) - - def withNewContext[T](traceName: String, traceToken: Option[String], tags: util.Map[String, String], autoFinish: Boolean, code: Supplier[T]): T = - withNewContext(traceName, traceToken, tags.asScala.toMap, autoFinish)(code.get) - - def withNewContext[T](traceName: String, code: Supplier[T]): T = - withNewContext(traceName, None, Map.empty[String, String], autoFinish = false)(code.get) - - def withNewContext[T](traceName: String, tags: util.Map[String, String], code: Supplier[T]): T = - withNewContext(traceName, None, tags.asScala.toMap, autoFinish = false)(code.get) - - def withNewContext[T](traceName: String, traceToken: Option[String], code: Supplier[T]): T = - withNewContext(traceName, traceToken, Map.empty[String, String], autoFinish = false)(code.get) - - def withNewContext[T](traceName: String, traceToken: Option[String], tags: util.Map[String, String], code: Supplier[T]): T = - withNewContext(traceName, traceToken, tags.asScala.toMap, autoFinish = false)(code.get) - - def withNewContext[T](traceName: String, autoFinish: Boolean, code: Supplier[T]): T = - withNewContext(traceName, None, Map.empty[String, String], autoFinish)(code.get) - - def withNewContext[T](traceName: String, tags: util.Map[String, String], autoFinish: Boolean, code: Supplier[T]): T = - withNewContext(traceName, None, tags.asScala.toMap, autoFinish)(code.get) -} - -private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: Config) extends TracerModule { - @volatile private var _settings = TraceSettings(config) - - private val _subscriptions = new LazyActorRef - private val _incubator = new LazyActorRef - - private def newToken: String = _settings.tokenGenerator() - - def newContext(name: String): TraceContext = - createTraceContext(name, None) - - def newContext(name: String, token: Option[String]): TraceContext = - createTraceContext(name, token) - - def newContext(name: String, token: Option[String], tags: Map[String, String]): TraceContext = - createTraceContext(name, token, tags) - - def newContext(name: String, token: Option[String], tags: Map[String, String], timestamp: RelativeNanoTimestamp, status: Status, isLocal: Boolean): TraceContext = - createTraceContext(name, token, tags, timestamp, status, isLocal) - - private def createTraceContext(traceName: String, token: Option[String], tags: Map[String, String] = Map.empty, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, - status: Status = Status.Open, isLocal: Boolean = true): TraceContext = { - - def newMetricsOnlyContext(token: String): TraceContext = - new MetricsOnlyContext(traceName, token, tags, status, _settings.levelOfDetail, startTimestamp, _logger) - - val traceToken = token.getOrElse(newToken) - - _settings.levelOfDetail match { - case LevelOfDetail.MetricsOnly ⇒ - newMetricsOnlyContext(traceToken) - case _ if !isLocal || !_settings.sampler.shouldTrace ⇒ - newMetricsOnlyContext(traceToken) - case _ ⇒ - new TracingContext(traceName, traceToken, tags, currentStatus = Status.Open, _settings.levelOfDetail, isLocal, startTimestamp, _logger, dispatchTracingContext) - } - } - - def subscribe(subscriber: ActorRef): Unit = - _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber)) - - def unsubscribe(subscriber: ActorRef): Unit = - _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber)) - - private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = - if (_settings.sampler.shouldReport(trace.elapsedTime)) - if (trace.shouldIncubate) - _incubator.tell(trace) - else - _subscriptions.tell(trace.generateTraceInfo) - - /** - * Tracer Extension initialization. - */ - private var _system: ActorSystem = null - private var _logger: LoggingAdapter = null - private lazy val _start = { - val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") - _subscriptions.point(subscriptions) - _incubator.point(_system.actorOf(Incubator.props(subscriptions))) - } - - def start(system: ActorSystem, newConfig: Config): Unit = synchronized { - _settings = TraceSettings(newConfig) - _system = system - _logger = Logging(_system, "TracerModule") - _start - _system = null - } -} - -private[kamon] object TracerModuleImpl { - - def apply(metricsExtension: MetricsModule, config: Config) = - new TracerModuleImpl(metricsExtension, config) -} - -case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], tags: Map[String, String], segments: List[SegmentInfo], status: Status) -case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], tags: Map[String, String], status: Status)
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala deleted file mode 100644 index ff128f85..00000000 --- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2016 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicInteger - -import akka.event.LoggingAdapter -import kamon.util.{NanoInterval, NanoTimestamp, RelativeNanoTimestamp} - -import scala.collection.concurrent.TrieMap - -private[trace] class TracingContext( - traceName: String, - token: String, - traceTags: Map[String, String], - currentStatus: Status, - levelOfDetail: LevelOfDetail, - isLocal: Boolean, - startTimeztamp: RelativeNanoTimestamp, - log: LoggingAdapter, - traceInfoSink: TracingContext ⇒ Unit -) extends MetricsOnlyContext(traceName, token, traceTags, currentStatus, levelOfDetail, startTimeztamp, log) { - - private val _openSegments = new AtomicInteger(0) - private val _startTimestamp = NanoTimestamp.now - private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]() - private val _metadata = TrieMap.empty[String, String] - - override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) - - override def startSegment(segmentName: String, category: String, library: String): Segment = { - startSegment(segmentName, category, library, Map.empty[String, String]) - } - - override def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = { - _openSegments.incrementAndGet() - val newSegment = new TracingSegment(segmentName, category, library, tags) - _allSegments.add(newSegment) - newSegment - } - - override def finish(): Unit = { - super.finish() - traceInfoSink(this) - } - - override def finishWithError(cause: Throwable): Unit = { - super.finishWithError(cause) - traceInfoSink(this) - } - - override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval, tags: Map[String, String], isFinishedWithError: Boolean = false): Unit = { - _openSegments.decrementAndGet() - super.finishSegment(segmentName, category, library, duration, tags, isFinishedWithError) - } - - def shouldIncubate: Boolean = (Status.Open == status) || _openSegments.get() > 0 - - // Handle with care, should only be used after a trace is finished. - def generateTraceInfo: TraceInfo = { - require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.") - - val currentSegments = _allSegments.iterator() - var segmentsInfo = List.newBuilder[SegmentInfo] - - while (currentSegments.hasNext) { - val segment = currentSegments.next() - if (segment.isClosed) - segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp) - else - log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) - } - - TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, tags, segmentsInfo.result(), status) - } - - class TracingSegment( - segmentName: String, - category: String, - library: String, - segmentTags: Map[String, String] - ) extends MetricsOnlySegment(segmentName, category, library, segmentTags) { - - private val metadata = TrieMap.empty[String, String] - override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) - - // Handle with care, should only be used after the segment has finished. - def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = { - require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.") - - // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both - // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. - val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) - - SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap, tags, status) - } - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala deleted file mode 100644 index 8177ed14..00000000 --- a/kamon-core/src/main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* =================================================== - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================================================== */ -package kamon.trace.logging - -import ch.qos.logback.classic.pattern.ClassicConverter -import ch.qos.logback.classic.spi.ILoggingEvent -import kamon.trace.Tracer - -class LogbackTraceTokenConverter extends ClassicConverter { - - def convert(event: ILoggingEvent): String = - Tracer.currentContext.collect(_.token).getOrElse("undefined") -} diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala deleted file mode 100644 index 556366b0..00000000 --- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2016 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace.logging - -import kamon.trace.TraceLocal.AvailableToMdc -import kamon.trace.{EmptyTraceContext, MetricsOnlyContext, TraceContext, Tracer} -import kamon.util.Supplier -import org.slf4j.MDC - -trait MdcKeysSupport { - - val traceTokenKey = "traceToken" - val traceNameKey = "traceName" - - private val defaultKeys = Seq(traceTokenKey, traceNameKey) - - def withMdc[A](thunk: ⇒ A): A = { - val keys = copyToMdc(Tracer.currentContext) - try thunk finally keys.foreach(key ⇒ MDC.remove(key)) - } - - // Java variant. - def withMdc[A](thunk: Supplier[A]): A = withMdc(thunk.get) - - private[kamon] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { - case ctx: MetricsOnlyContext ⇒ - - // Add the default key value pairs for the trace token and trace name. - MDC.put(traceTokenKey, ctx.token) - MDC.put(traceNameKey, ctx.name) - - defaultKeys ++ ctx.traceLocalStorage.underlyingStorage.collect { - case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey → String.valueOf(value)) - }.flatMap { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } } - - case EmptyTraceContext ⇒ Iterable.empty[String] - } -} - -object MdcKeysSupport extends MdcKeysSupport
\ No newline at end of file |