diff options
Diffstat (limited to 'kamon-core/src/legacy-main/scala/kamon/trace')
11 files changed, 1133 insertions, 0 deletions
diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala b/kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala new file mode 100644 index 00000000..fe5ad569 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/Incubator.scala @@ -0,0 +1,94 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import akka.actor.{ActorLogging, Props, Actor, ActorRef} +import kamon.trace.Incubator.{CheckForCompletedTraces, IncubatingTrace} +import kamon.util.{NanoInterval, RelativeNanoTimestamp} +import scala.annotation.tailrec +import scala.collection.immutable.Queue +import kamon.util.ConfigTools.Syntax + +class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging { + import context.dispatcher + val config = context.system.settings.config.getConfig("kamon.trace.incubator") + + val minIncubationTime = new NanoInterval(config.getFiniteDuration("min-incubation-time").toNanos) + val maxIncubationTime = new NanoInterval(config.getFiniteDuration("max-incubation-time").toNanos) + val checkInterval = config.getFiniteDuration("check-interval") + + val checkSchedule = context.system.scheduler.schedule(checkInterval, checkInterval, self, CheckForCompletedTraces) + var waitingForMinimumIncubation = Queue.empty[IncubatingTrace] + var waitingForIncubationFinish = List.empty[IncubatingTrace] + + def receive = { + case tc: TracingContext ⇒ incubate(tc) + case CheckForCompletedTraces ⇒ + checkWaitingForMinimumIncubation() + checkWaitingForIncubationFinish() + } + + def incubate(tc: TracingContext): Unit = + waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now)) + + @tailrec private def checkWaitingForMinimumIncubation(): Unit = { + if (waitingForMinimumIncubation.nonEmpty) { + val it = waitingForMinimumIncubation.head + if (NanoInterval.since(it.incubationStart) >= minIncubationTime) { + waitingForMinimumIncubation = waitingForMinimumIncubation.tail + + if (it.tc.shouldIncubate) + waitingForIncubationFinish = it :: waitingForIncubationFinish + else + dispatchTraceInfo(it.tc) + + checkWaitingForMinimumIncubation() + } + } + } + + private def checkWaitingForIncubationFinish(): Unit = { + waitingForIncubationFinish = waitingForIncubationFinish.filter { + case IncubatingTrace(context, incubationStart) ⇒ + if (!context.shouldIncubate) { + dispatchTraceInfo(context) + false + } else { + if (NanoInterval.since(incubationStart) >= maxIncubationTime) { + log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token) + dispatchTraceInfo(context); + false + } else true + } + } + } + + def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo + + override def postStop(): Unit = { + super.postStop() + checkSchedule.cancel() + } +} + +object Incubator { + + def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions)) + + case object CheckForCompletedTraces + case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp) +} diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/legacy-main/scala/kamon/trace/MetricsOnlyContext.scala new file mode 100644 index 00000000..bc0bedba --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/MetricsOnlyContext.scala @@ -0,0 +1,186 @@ +/* + * ========================================================================================= + * Copyright © 2013-2016 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.event.LoggingAdapter +import kamon.Kamon +import kamon.metric.{SegmentMetrics, TraceMetrics} +import kamon.util.{NanoInterval, RelativeNanoTimestamp} + +import scala.annotation.tailrec +import scala.collection.concurrent.TrieMap + +private[kamon] class MetricsOnlyContext( + traceName: String, + val token: String, + traceTags: Map[String, String], + currentStatus: Status, + val levelOfDetail: LevelOfDetail, + val startTimestamp: RelativeNanoTimestamp, + log: LoggingAdapter +) extends TraceContext { + + @volatile private var _name = traceName + @volatile private var _status = currentStatus + @volatile protected var _elapsedTime = NanoInterval.default + + private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]() + private val _traceLocalStorage = new TraceLocalStorage + private val _tags = TrieMap.empty[String, String] ++= traceTags + + def rename(newName: String): Unit = + if (Status.Open == status) + _name = newName + else + log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName) + + def name: String = _name + def tags: Map[String, String] = _tags.toMap + def isEmpty: Boolean = false + def status: Status = _status + def addMetadata(key: String, value: String): Unit = {} + def addTag(key: String, value: String): Unit = _tags.put(key, value) + def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) + + private def finish(withError: Boolean): Unit = { + _status = if (withError) Status.FinishedWithError else Status.FinishedSuccessfully + val traceElapsedTime = NanoInterval.since(startTimestamp) + _elapsedTime = traceElapsedTime + + if (Kamon.metrics.shouldTrack(name, TraceMetrics.category)) { + val traceEntity = Kamon.metrics.entity(TraceMetrics, name, _tags.toMap) + traceEntity.elapsedTime.record(traceElapsedTime.nanos) + if (withError) traceEntity.errors.increment() + } + drainFinishedSegments() + } + + def finish(): Unit = finish(withError = false) + + def finishWithError(cause: Throwable): Unit = { + //we should do something with the Throwable in a near future + finish(withError = true) + } + + def startSegment(segmentName: String, category: String, library: String): Segment = + startSegment(segmentName, category, library, Map.empty[String, String]) + + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = + new MetricsOnlySegment(segmentName, category, library, tags) + + @tailrec private def drainFinishedSegments(): Unit = { + val segment = _finishedSegments.poll() + if (segment != null) { + val defaultTags = Map( + "trace" → name, + "category" → segment.category, + "library" → segment.library + ) + + if (Kamon.metrics.shouldTrack(segment.name, SegmentMetrics.category)) { + val segmentEntity = Kamon.metrics.entity(SegmentMetrics, segment.name, defaultTags ++ segment.tags) + segmentEntity.elapsedTime.record(segment.duration.nanos) + if (segment.isFinishedWithError) segmentEntity.errors.increment() + } + drainFinishedSegments() + } + } + + protected def finishSegment( + segmentName: String, + category: String, + library: String, + duration: NanoInterval, + segmentTags: Map[String, String], + isFinishedWithError: Boolean + ): Unit = { + + _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration, segmentTags, isFinishedWithError)) + + if (isClosed) { + drainFinishedSegments() + } + } + + // Should only be used by the TraceLocal utilities. + def traceLocalStorage: TraceLocalStorage = _traceLocalStorage + + // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default + // will be returned. + def elapsedTime: NanoInterval = _elapsedTime + + class MetricsOnlySegment( + segmentName: String, + val category: String, + val library: String, + segmentTags: Map[String, String] + ) extends Segment { + + private val _startTimestamp = RelativeNanoTimestamp.now + private val _tags = TrieMap.empty[String, String] ++= segmentTags + + @volatile private var _segmentName = segmentName + @volatile private var _elapsedTime = NanoInterval.default + @volatile private var _status: Status = Status.Open + + def name: String = _segmentName + def tags: Map[String, String] = _tags.toMap + def isEmpty: Boolean = false + def status: Status = _status + def addMetadata(key: String, value: String): Unit = {} + def addTag(key: String, value: String): Unit = _tags.put(key, value) + def removeTag(key: String, value: String): Boolean = _tags.remove(key, value) + + def rename(newName: String): Unit = + if (Status.Open == status) + _segmentName = newName + else + log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName) + + private def finish(withError: Boolean): Unit = { + _status = if (withError) Status.FinishedWithError else Status.FinishedSuccessfully + val segmentElapsedTime = NanoInterval.since(_startTimestamp) + _elapsedTime = segmentElapsedTime + + finishSegment(name, category, library, segmentElapsedTime, _tags.toMap, withError) + } + + def finishWithError(cause: Throwable): Unit = { + //we should do something with the Throwable in a near future + finish(withError = true) + } + + def finish(): Unit = finish(withError = false) + + // Handle with care and make sure that the segment is closed before calling this method, otherwise + // NanoInterval.default will be returned. + def elapsedTime: NanoInterval = _elapsedTime + def startTimestamp: RelativeNanoTimestamp = _startTimestamp + + } +} + +case class SegmentLatencyData( + name: String, + category: String, + library: String, + duration: NanoInterval, + tags: Map[String, String], + isFinishedWithError: Boolean +)
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/Sampler.scala b/kamon-core/src/legacy-main/scala/kamon/trace/Sampler.scala new file mode 100644 index 00000000..234c67bb --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/Sampler.scala @@ -0,0 +1,101 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.InetAddress +import java.util.concurrent.atomic.AtomicLong +import kamon.util.{NanoTimestamp, NanoInterval, Sequencer} +import scala.concurrent.forkjoin.ThreadLocalRandom + +import scala.util.Try + +trait Sampler { + def shouldTrace: Boolean + def shouldReport(traceElapsedTime: NanoInterval): Boolean +} + +object NoSampling extends Sampler { + def shouldTrace: Boolean = false + def shouldReport(traceElapsedTime: NanoInterval): Boolean = false +} + +object SampleAll extends Sampler { + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class RandomSampler(chance: Int) extends Sampler { + require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0") + require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100") + + def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class OrderedSampler(interval: Int) extends Sampler { + import OrderedSampler._ + + require(interval > 0, "kamon.trace.ordered-sampler.sample-interval cannot be <= 0") + assume(interval isPowerOfTwo, "kamon.trace.ordered-sampler.sample-interval must be power of two") + + private val sequencer = Sequencer() + + def shouldTrace: Boolean = (sequencer.next() fastMod interval) == 0 + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +object OrderedSampler { + implicit class EnhancedInt(i: Int) { + def isPowerOfTwo = (i & (i - 1)) == 0 + } + + implicit class EnhancedLong(dividend: Long) { + def fastMod(divisor: Int) = dividend & (divisor - 1) + } +} + +class ThresholdSampler(thresholdInNanoseconds: NanoInterval) extends Sampler { + require(thresholdInNanoseconds.nanos > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0") + + def shouldTrace: Boolean = true + def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime >= thresholdInNanoseconds +} + +class ClockSampler(pauseInNanoseconds: NanoInterval) extends Sampler { + require(pauseInNanoseconds.nanos > 0, "kamon.trace.clock-sampler.pause cannot be <= 0") + + private val timer: AtomicLong = new AtomicLong(0L) + + def shouldTrace: Boolean = { + val now = NanoTimestamp.now.nanos + val lastTimer = timer.get() + if ((lastTimer + pauseInNanoseconds.nanos) < now) + timer.compareAndSet(lastTimer, now) + else + false + } + def shouldReport(traceElapsedTime: NanoInterval): Boolean = true +} + +class DefaultTokenGenerator extends Function0[String] { + private val _hostnamePrefix = Try(InetAddress.getLocalHost.getHostName).getOrElse("unknown-localhost") + private val _tokenCounter = new AtomicLong + + def apply(): String = { + _hostnamePrefix + "-" + String.valueOf(_tokenCounter.incrementAndGet()) + } +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/legacy-main/scala/kamon/trace/TraceContext.scala new file mode 100644 index 00000000..bbf40d8d --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/TraceContext.scala @@ -0,0 +1,202 @@ +/* + * ========================================================================================= + * Copyright © 2013-2016 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.io.ObjectStreamException +import java.util + +import kamon.trace.Status.Closed +import kamon.trace.TraceContextAware.DefaultTraceContextAware +import kamon.util.{Function, RelativeNanoTimestamp, SameThreadExecutionContext, Supplier} + +import scala.concurrent.Future + +trait TraceContext { + def name: String + def token: String + def tags: Map[String, String] + def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty + def isClosed: Boolean = !(Status.Open == status) + def status: Status + def finish(): Unit + def finishWithError(cause: Throwable): Unit + def rename(newName: String): Unit + def startSegment(segmentName: String, category: String, library: String): Segment + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment + def addMetadata(key: String, value: String): Unit + def addTag(key: String, value: String): Unit + def removeTag(key: String, value: String): Boolean + def startTimestamp: RelativeNanoTimestamp + + def collect[T](f: TraceContext ⇒ T): Option[T] = + if (nonEmpty) + Some(f(this)) + else None + + def collect[T](f: Function[TraceContext, T]): Option[T] = + if (nonEmpty) + Some(f(this)) + else None + + def withNewSegment[T](segmentName: String, category: String, library: String)(code: ⇒ T): T = { + withNewSegment(segmentName, category, library, Map.empty[String, String])(code) + } + + def withNewSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ T): T = { + val segment = startSegment(segmentName, category, library, tags) + try code finally segment.finish() + } + + def withNewAsyncSegment[T](segmentName: String, category: String, library: String)(code: ⇒ Future[T]): Future[T] = { + withNewAsyncSegment(segmentName, category, library, Map.empty[String, String])(code) + } + + def withNewAsyncSegment[T](segmentName: String, category: String, library: String, tags: Map[String, String])(code: ⇒ Future[T]): Future[T] = { + val segment = startSegment(segmentName, category, library, tags) + val result = code + result.onComplete(_ ⇒ segment.finish())(SameThreadExecutionContext) + result + } + + // Java variant. + def withNewSegment[T](segmentName: String, category: String, library: String, code: Supplier[T]): T = + withNewSegment(segmentName, category, library)(code.get) + + def withNewSegment[T](segmentName: String, category: String, library: String, tags: util.Map[String, String], code: Supplier[T]): T = { + import scala.collection.JavaConverters._ + withNewSegment(segmentName, category, library, tags.asScala.toMap)(code.get) + } +} + +trait Segment { + def name: String + def category: String + def library: String + def tags: Map[String, String] + def isEmpty: Boolean + def nonEmpty: Boolean = !isEmpty + def isClosed: Boolean = !(Status.Open == status) + def status: Status + def finish(): Unit + def finishWithError(cause: Throwable): Unit + def rename(newName: String): Unit + def addMetadata(key: String, value: String): Unit + def addTag(key: String, value: String): Unit + def removeTag(key: String, value: String): Boolean +} + +case object EmptyTraceContext extends TraceContext { + def name: String = "empty-trace" + def token: String = "" + def tags: Map[String, String] = Map.empty + def isEmpty: Boolean = true + def status: Status = Closed + def finish(): Unit = {} + def finishWithError(cause: Throwable): Unit = {} + def rename(name: String): Unit = {} + def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment + def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = EmptySegment + def addMetadata(key: String, value: String): Unit = {} + def startTimestamp = new RelativeNanoTimestamp(0L) + def addTag(key: String, value: String): Unit = {} + def removeTag(key: String, value: String): Boolean = false + + case object EmptySegment extends Segment { + val name: String = "empty-segment" + val category: String = "empty-category" + val library: String = "empty-library" + def tags: Map[String, String] = Map.empty + def isEmpty: Boolean = true + def status: Status = Closed + def finish(): Unit = {} + def finishWithError(cause: Throwable): Unit = {} + def rename(newName: String): Unit = {} + def addMetadata(key: String, value: String): Unit = {} + def addTag(key: String, value: String): Unit = {} + def removeTag(key: String, value: String): Boolean = false + } +} + +object SegmentCategory { + val HttpClient = "http-client" + val Database = "database" +} + +class LOD private[trace] (val level: Int) extends AnyVal +object LOD { + val MetricsOnly = new LOD(1) + val SimpleTrace = new LOD(2) +} + +sealed trait LevelOfDetail +object LevelOfDetail { + case object MetricsOnly extends LevelOfDetail + case object SimpleTrace extends LevelOfDetail + case object FullTrace extends LevelOfDetail +} + +sealed trait Status +object Status { + case object Open extends Status + case object Closed extends Status + case object FinishedWithError extends Status + case object FinishedSuccessfully extends Status +} + +trait TraceContextAware extends Serializable { + def traceContext: TraceContext +} + +object TraceContextAware { + def default: TraceContextAware = new DefaultTraceContextAware + + class DefaultTraceContextAware extends TraceContextAware { + @transient val traceContext = Tracer.currentContext + + // + // Beware of this hack, it might bite us in the future! + // + // When using remoting/cluster all messages carry the TraceContext in the envelope in which they + // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is + // available (if any) when the system messages are read and this will make sure that it is correctly + // captured and propagated. + @throws[ObjectStreamException] + private def readResolve: AnyRef = { + new DefaultTraceContextAware + } + } +} + +trait TimestampedTraceContextAware extends TraceContextAware { + def captureNanoTime: Long +} + +object TimestampedTraceContextAware { + def default: TimestampedTraceContextAware = new DefaultTraceContextAware with TimestampedTraceContextAware { + @transient val captureNanoTime = System.nanoTime() + } +} + +trait SegmentAware { + @volatile @transient var segment: Segment = EmptyTraceContext.EmptySegment +} + +object SegmentAware { + def default: SegmentAware = new DefaultSegmentAware + class DefaultSegmentAware extends DefaultTraceContextAware with SegmentAware {} +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/legacy-main/scala/kamon/trace/TraceLocal.scala new file mode 100644 index 00000000..460e4b22 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/TraceLocal.scala @@ -0,0 +1,64 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import kamon.trace.TraceLocal.TraceLocalKey +import kamon.util.Supplier +import scala.collection.concurrent.TrieMap + +object TraceLocal { + + trait TraceLocalKey[T] + + trait AvailableToMdc extends TraceLocalKey[String] { + def mdcKey: String + } + + object AvailableToMdc { + case class DefaultKeyAvailableToMdc(mdcKey: String) extends AvailableToMdc + + def fromKey(mdcKey: String): AvailableToMdc = DefaultKeyAvailableToMdc(mdcKey) + def apply(mdcKey: String): AvailableToMdc = fromKey(mdcKey) + } + + def store[T](key: TraceLocalKey[T])(value: Any): Unit = Tracer.currentContext match { + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value) + case EmptyTraceContext ⇒ // Can't store in the empty context. + } + + def retrieve[T](key: TraceLocalKey[T]): Option[T] = Tracer.currentContext match { + case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key) + case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context. + } + + // Java variant + @throws(classOf[NoSuchElementException]) + def get[T](key: TraceLocalKey[T]): T = retrieve(key).get + + def getOrElse[T](key: TraceLocalKey[T], code: Supplier[T]): T = retrieve(key).getOrElse(code.get) + + def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value) + + def newTraceLocalKey[T]: TraceLocalKey[T] = new TraceLocalKey[T] {} +} + +class TraceLocalStorage { + val underlyingStorage = TrieMap[TraceLocalKey[_], Any]() + + def store[T](key: TraceLocalKey[T])(value: Any): Unit = underlyingStorage.put(key, value) + def retrieve[T](key: TraceLocalKey[T]): Option[T] = underlyingStorage.get(key).map(_.asInstanceOf[T]) +} diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/TraceSettings.scala b/kamon-core/src/legacy-main/scala/kamon/trace/TraceSettings.scala new file mode 100644 index 00000000..c3a83e93 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/TraceSettings.scala @@ -0,0 +1,51 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import kamon.util.ConfigTools.Syntax +import com.typesafe.config.Config +import kamon.util.NanoInterval + +case class TraceSettings(levelOfDetail: LevelOfDetail, sampler: Sampler, tokenGenerator: () ⇒ String) + +object TraceSettings { + def apply(config: Config): TraceSettings = { + val tracerConfig = config.getConfig("kamon.trace") + + val detailLevel: LevelOfDetail = tracerConfig.getString("level-of-detail") match { + case "metrics-only" ⇒ LevelOfDetail.MetricsOnly + case "simple-trace" ⇒ LevelOfDetail.SimpleTrace + case other ⇒ sys.error(s"Unknown tracer level of detail [$other] present in the configuration file.") + } + + val sampler: Sampler = + if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling + else tracerConfig.getString("sampling") match { + case "all" ⇒ SampleAll + case "random" ⇒ new RandomSampler(tracerConfig.getInt("random-sampler.chance")) + case "ordered" ⇒ new OrderedSampler(tracerConfig.getInt("ordered-sampler.sample-interval")) + case "threshold" ⇒ new ThresholdSampler(new NanoInterval(tracerConfig.getFiniteDuration("threshold-sampler.minimum-elapsed-time").toNanos)) + case "clock" ⇒ new ClockSampler(new NanoInterval(tracerConfig.getFiniteDuration("clock-sampler.pause").toNanos)) + } + + val dynamic = new akka.actor.ReflectiveDynamicAccess(getClass.getClassLoader) + val tokenGeneratorFQN = tracerConfig.getString("token-generator") + val tokenGenerator = dynamic.createInstanceFor[() ⇒ String](tokenGeneratorFQN, Nil).get // let's bubble up any problems. + + TraceSettings(detailLevel, sampler, tokenGenerator) + } +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/legacy-main/scala/kamon/trace/TraceSubscriptions.scala new file mode 100644 index 00000000..47455ec5 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/TraceSubscriptions.scala @@ -0,0 +1,45 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import akka.actor.{Terminated, ActorRef, Actor} + +class TraceSubscriptions extends Actor { + import TraceSubscriptions._ + + var subscribers: List[ActorRef] = Nil + + def receive = { + case Subscribe(newSubscriber) ⇒ + if (!subscribers.contains(newSubscriber)) + subscribers = context.watch(newSubscriber) :: subscribers + + case Unsubscribe(leavingSubscriber) ⇒ + subscribers = subscribers.filter(_ == leavingSubscriber) + + case Terminated(terminatedSubscriber) ⇒ + subscribers = subscribers.filter(_ == terminatedSubscriber) + + case trace: TraceInfo ⇒ + subscribers.foreach(_ ! trace) + } +} + +object TraceSubscriptions { + case class Subscribe(subscriber: ActorRef) + case class Unsubscribe(subscriber: ActorRef) +} diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/TracerModule.scala b/kamon-core/src/legacy-main/scala/kamon/trace/TracerModule.scala new file mode 100644 index 00000000..552962eb --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/TracerModule.scala @@ -0,0 +1,197 @@ +/* + * ========================================================================================= + * Copyright © 2013-2016 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util + +import akka.actor._ +import akka.event.{Logging, LoggingAdapter} +import com.typesafe.config.Config +import kamon.Kamon +import kamon.metric.MetricsModule +import kamon.util._ + +import scala.collection.JavaConverters._ + +trait TracerModule { + def newContext(name: String): TraceContext + def newContext(name: String, token: Option[String]): TraceContext + def newContext(name: String, token: Option[String], tags: Map[String, String]): TraceContext + def newContext(name: String, token: Option[String], tags: Map[String, String], timestamp: RelativeNanoTimestamp, state: Status, isLocal: Boolean): TraceContext + + def subscribe(subscriber: ActorRef): Unit + def unsubscribe(subscriber: ActorRef): Unit +} + +object Tracer { + private[kamon] val _traceContextStorage = new ThreadLocal[TraceContext] { + override def initialValue(): TraceContext = EmptyTraceContext + } + + def currentContext: TraceContext = + _traceContextStorage.get() + + def setCurrentContext(context: TraceContext): Unit = + _traceContextStorage.set(context) + + def clearCurrentContext: Unit = + _traceContextStorage.remove() + + def withContext[T](context: TraceContext)(code: ⇒ T): T = { + val oldContext = _traceContextStorage.get() + _traceContextStorage.set(context) + + try code finally _traceContextStorage.set(oldContext) + } + + // Java variant. + def withContext[T](context: TraceContext, code: Supplier[T]): T = + withContext(context)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], tags: Map[String, String], autoFinish: Boolean)(code: ⇒ T): T = { + withContext(Kamon.tracer.newContext(traceName, traceToken, tags)) { + val codeResult = code + if (autoFinish) + currentContext.finish() + + codeResult + } + } + + def withNewContext[T](traceName: String)(code: ⇒ T): T = + withNewContext(traceName, None)(code) + + def withNewContext[T](traceName: String, tags: Map[String, String])(code: ⇒ T): T = + withNewContext(traceName, None, tags)(code) + + def withNewContext[T](traceName: String, traceToken: Option[String])(code: ⇒ T): T = + withNewContext(traceName, traceToken, Map.empty[String, String])(code) + + def withNewContext[T](traceName: String, traceToken: Option[String], tags: Map[String, String])(code: ⇒ T): T = + withNewContext(traceName, traceToken, tags, autoFinish = false)(code) + + def withNewContext[T](traceName: String, autoFinish: Boolean)(code: ⇒ T): T = + withNewContext(traceName, None, Map.empty[String, String], autoFinish)(code) + + def withNewContext[T](traceName: String, tags: Map[String, String], autoFinish: Boolean)(code: ⇒ T): T = + withNewContext(traceName, None, tags, autoFinish)(code) + + // Java variants. + def withNewContext[T](traceName: String, traceToken: Option[String], autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, traceToken, Map.empty[String, String], autoFinish)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], tags: util.Map[String, String], autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, traceToken, tags.asScala.toMap, autoFinish)(code.get) + + def withNewContext[T](traceName: String, code: Supplier[T]): T = + withNewContext(traceName, None, Map.empty[String, String], autoFinish = false)(code.get) + + def withNewContext[T](traceName: String, tags: util.Map[String, String], code: Supplier[T]): T = + withNewContext(traceName, None, tags.asScala.toMap, autoFinish = false)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], code: Supplier[T]): T = + withNewContext(traceName, traceToken, Map.empty[String, String], autoFinish = false)(code.get) + + def withNewContext[T](traceName: String, traceToken: Option[String], tags: util.Map[String, String], code: Supplier[T]): T = + withNewContext(traceName, traceToken, tags.asScala.toMap, autoFinish = false)(code.get) + + def withNewContext[T](traceName: String, autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, None, Map.empty[String, String], autoFinish)(code.get) + + def withNewContext[T](traceName: String, tags: util.Map[String, String], autoFinish: Boolean, code: Supplier[T]): T = + withNewContext(traceName, None, tags.asScala.toMap, autoFinish)(code.get) +} + +private[kamon] class TracerModuleImpl(metricsExtension: MetricsModule, config: Config) extends TracerModule { + @volatile private var _settings = TraceSettings(config) + + private val _subscriptions = new LazyActorRef + private val _incubator = new LazyActorRef + + private def newToken: String = _settings.tokenGenerator() + + def newContext(name: String): TraceContext = + createTraceContext(name, None) + + def newContext(name: String, token: Option[String]): TraceContext = + createTraceContext(name, token) + + def newContext(name: String, token: Option[String], tags: Map[String, String]): TraceContext = + createTraceContext(name, token, tags) + + def newContext(name: String, token: Option[String], tags: Map[String, String], timestamp: RelativeNanoTimestamp, status: Status, isLocal: Boolean): TraceContext = + createTraceContext(name, token, tags, timestamp, status, isLocal) + + private def createTraceContext(traceName: String, token: Option[String], tags: Map[String, String] = Map.empty, startTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now, + status: Status = Status.Open, isLocal: Boolean = true): TraceContext = { + + def newMetricsOnlyContext(token: String): TraceContext = + new MetricsOnlyContext(traceName, token, tags, status, _settings.levelOfDetail, startTimestamp, _logger) + + val traceToken = token.getOrElse(newToken) + + _settings.levelOfDetail match { + case LevelOfDetail.MetricsOnly ⇒ + newMetricsOnlyContext(traceToken) + case _ if !isLocal || !_settings.sampler.shouldTrace ⇒ + newMetricsOnlyContext(traceToken) + case _ ⇒ + new TracingContext(traceName, traceToken, tags, currentStatus = Status.Open, _settings.levelOfDetail, isLocal, startTimestamp, _logger, dispatchTracingContext) + } + } + + def subscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Subscribe(subscriber)) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(TraceSubscriptions.Unsubscribe(subscriber)) + + private[kamon] def dispatchTracingContext(trace: TracingContext): Unit = + if (_settings.sampler.shouldReport(trace.elapsedTime)) + if (trace.shouldIncubate) + _incubator.tell(trace) + else + _subscriptions.tell(trace.generateTraceInfo) + + /** + * Tracer Extension initialization. + */ + private var _system: ActorSystem = null + private var _logger: LoggingAdapter = null + private lazy val _start = { + val subscriptions = _system.actorOf(Props[TraceSubscriptions], "trace-subscriptions") + _subscriptions.point(subscriptions) + _incubator.point(_system.actorOf(Incubator.props(subscriptions))) + } + + def start(system: ActorSystem, newConfig: Config): Unit = synchronized { + _settings = TraceSettings(newConfig) + _system = system + _logger = Logging(_system, "TracerModule") + _start + _system = null + } +} + +private[kamon] object TracerModuleImpl { + + def apply(metricsExtension: MetricsModule, config: Config) = + new TracerModuleImpl(metricsExtension, config) +} + +case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], tags: Map[String, String], segments: List[SegmentInfo], status: Status) +case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], tags: Map[String, String], status: Status)
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/legacy-main/scala/kamon/trace/TracingContext.scala new file mode 100644 index 00000000..ff128f85 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/TracingContext.scala @@ -0,0 +1,113 @@ +/* + * ========================================================================================= + * Copyright © 2013-2016 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger + +import akka.event.LoggingAdapter +import kamon.util.{NanoInterval, NanoTimestamp, RelativeNanoTimestamp} + +import scala.collection.concurrent.TrieMap + +private[trace] class TracingContext( + traceName: String, + token: String, + traceTags: Map[String, String], + currentStatus: Status, + levelOfDetail: LevelOfDetail, + isLocal: Boolean, + startTimeztamp: RelativeNanoTimestamp, + log: LoggingAdapter, + traceInfoSink: TracingContext ⇒ Unit +) extends MetricsOnlyContext(traceName, token, traceTags, currentStatus, levelOfDetail, startTimeztamp, log) { + + private val _openSegments = new AtomicInteger(0) + private val _startTimestamp = NanoTimestamp.now + private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]() + private val _metadata = TrieMap.empty[String, String] + + override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value) + + override def startSegment(segmentName: String, category: String, library: String): Segment = { + startSegment(segmentName, category, library, Map.empty[String, String]) + } + + override def startSegment(segmentName: String, category: String, library: String, tags: Map[String, String]): Segment = { + _openSegments.incrementAndGet() + val newSegment = new TracingSegment(segmentName, category, library, tags) + _allSegments.add(newSegment) + newSegment + } + + override def finish(): Unit = { + super.finish() + traceInfoSink(this) + } + + override def finishWithError(cause: Throwable): Unit = { + super.finishWithError(cause) + traceInfoSink(this) + } + + override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval, tags: Map[String, String], isFinishedWithError: Boolean = false): Unit = { + _openSegments.decrementAndGet() + super.finishSegment(segmentName, category, library, duration, tags, isFinishedWithError) + } + + def shouldIncubate: Boolean = (Status.Open == status) || _openSegments.get() > 0 + + // Handle with care, should only be used after a trace is finished. + def generateTraceInfo: TraceInfo = { + require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.") + + val currentSegments = _allSegments.iterator() + var segmentsInfo = List.newBuilder[SegmentInfo] + + while (currentSegments.hasNext) { + val segment = currentSegments.next() + if (segment.isClosed) + segmentsInfo += segment.createSegmentInfo(_startTimestamp, startTimestamp) + else + log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name) + } + + TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, tags, segmentsInfo.result(), status) + } + + class TracingSegment( + segmentName: String, + category: String, + library: String, + segmentTags: Map[String, String] + ) extends MetricsOnlySegment(segmentName, category, library, segmentTags) { + + private val metadata = TrieMap.empty[String, String] + override def addMetadata(key: String, value: String): Unit = metadata.put(key, value) + + // Handle with care, should only be used after the segment has finished. + def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = { + require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.") + + // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both + // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it. + val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos) + + SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap, tags, status) + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala b/kamon-core/src/legacy-main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala new file mode 100644 index 00000000..8177ed14 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/logging/LogbackTraceTokenConverter.scala @@ -0,0 +1,26 @@ +/* =================================================== + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================================================== */ +package kamon.trace.logging + +import ch.qos.logback.classic.pattern.ClassicConverter +import ch.qos.logback.classic.spi.ILoggingEvent +import kamon.trace.Tracer + +class LogbackTraceTokenConverter extends ClassicConverter { + + def convert(event: ILoggingEvent): String = + Tracer.currentContext.collect(_.token).getOrElse("undefined") +} diff --git a/kamon-core/src/legacy-main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/legacy-main/scala/kamon/trace/logging/MdcKeysSupport.scala new file mode 100644 index 00000000..556366b0 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/trace/logging/MdcKeysSupport.scala @@ -0,0 +1,54 @@ +/* + * ========================================================================================= + * Copyright © 2013-2016 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace.logging + +import kamon.trace.TraceLocal.AvailableToMdc +import kamon.trace.{EmptyTraceContext, MetricsOnlyContext, TraceContext, Tracer} +import kamon.util.Supplier +import org.slf4j.MDC + +trait MdcKeysSupport { + + val traceTokenKey = "traceToken" + val traceNameKey = "traceName" + + private val defaultKeys = Seq(traceTokenKey, traceNameKey) + + def withMdc[A](thunk: ⇒ A): A = { + val keys = copyToMdc(Tracer.currentContext) + try thunk finally keys.foreach(key ⇒ MDC.remove(key)) + } + + // Java variant. + def withMdc[A](thunk: Supplier[A]): A = withMdc(thunk.get) + + private[kamon] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match { + case ctx: MetricsOnlyContext ⇒ + + // Add the default key value pairs for the trace token and trace name. + MDC.put(traceTokenKey, ctx.token) + MDC.put(traceNameKey, ctx.name) + + defaultKeys ++ ctx.traceLocalStorage.underlyingStorage.collect { + case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey → String.valueOf(value)) + }.flatMap { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } } + + case EmptyTraceContext ⇒ Iterable.empty[String] + } +} + +object MdcKeysSupport extends MdcKeysSupport
\ No newline at end of file |