aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-12-04 03:22:20 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-12-04 03:22:20 +0100
commitfcbde8b48a539b649cb5c1237824cdaf6badb370 (patch)
treeb5209a4305709d3b668d73cd64f40cbe358f1402 /kamon-core/src
parent84ceb0a7b293e1a4279337eb0d4eb2b5b8b131bb (diff)
parent432fb45952c587bcebf81d718188e7067572cf49 (diff)
downloadKamon-fcbde8b48a539b649cb5c1237824cdaf6badb370.tar.gz
Kamon-fcbde8b48a539b649cb5c1237824cdaf6badb370.tar.bz2
Kamon-fcbde8b48a539b649cb5c1237824cdaf6badb370.zip
Merge branch 'wip/simple-tracing-implementation'
Diffstat (limited to 'kamon-core/src')
-rw-r--r--kamon-core/src/main/resources/reference.conf44
-rw-r--r--kamon-core/src/main/scala/kamon/TimeUnits.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Incubator.scala80
-rw-r--r--kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala105
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala46
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala119
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceExtension.scala56
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceLocal.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala29
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracingContext.scala76
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala4
-rw-r--r--kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala103
13 files changed, 640 insertions, 118 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 639d4aba..7c4b4ecb 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -127,6 +127,50 @@ kamon {
trace {
+ # Level of detail used when recording trace information. The posible values are:
+ # - metrics-only: metrics for all included traces and all segments are recorded, but no Trace messages will be sent
+ # to the subscriptors of trace data.
+ # - simple-trace: metrics for all included traces and all segments are recorded and additionally a Trace message
+ # containing the trace and segments details and metadata.
+ level = metrics-only
+
+ # Sampling strategy to apply when the tracing level is set to `simple-trace`. The options are: all, random, ordered
+ # and threshold. The details of each sampler are bellow.
+ sampling = random
+
+ # Use a ThreadLocalRandom to generate numbers between 1 and 100, if the random number is less or equal to .chance
+ # then tracing information will be gathered and reported for the current trace.
+ random-sampler {
+ chance = 10
+ }
+
+ # Use a AtomicLong to ensure that every .sample-interval number of requests tracing information will be gathered and
+ # reported.
+ ordered-sampler {
+ sample-interval = 10
+ }
+
+ # Gather tracing information for all traces but only report those whose elapsed-time is equal or greated to the
+ # .minimum-elapsed-time setting.
+ threshold-sampler {
+ threshold = 1 second
+ }
+
+ incubator {
+ # Minimum time to stay in the trace incubator before checking if the trace should not be incubated anymore. No
+ # checks are made at least until this period has passed.
+ min-incubation-time = 5 seconds
+
+ # Time to wait between incubation checks. After min-incubation-time, a trace is checked using this interval and if
+ # if shouldn't be incubated anymore, the TraceInfo is collected and reported for it.
+ check-interval = 1 second
+
+ # Max amount of time that a trace can be in the incubator. If this time is reached for a given trace then it will
+ # be reported with whatever information is available at the moment, logging a warning for each segment that remains
+ # open after this point.
+ max-incubation-time = 20 seconds
+ }
+
# If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask`
# pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment
# the future was created.
diff --git a/kamon-core/src/main/scala/kamon/TimeUnits.scala b/kamon-core/src/main/scala/kamon/TimeUnits.scala
new file mode 100644
index 00000000..44f5b4c3
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TimeUnits.scala
@@ -0,0 +1,57 @@
+package kamon
+
+/**
+ * Epoch time stamp in milliseconds.
+ */
+class MilliTimestamp(val millis: Long) extends AnyVal {
+ override def toString: String = String.valueOf(millis) + ".millis"
+}
+
+object MilliTimestamp {
+ def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis())
+}
+
+/**
+ * Epoch time stamp in nanoseconds.
+ *
+ * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch
+ * timestamp in nanoseconds.
+ */
+class NanoTimestamp(val nanos: Long) extends AnyVal {
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object NanoTimestamp {
+ def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000)
+}
+
+/**
+ * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime()
+ */
+class RelativeNanoTimestamp(val nanos: Long) extends AnyVal {
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object RelativeNanoTimestamp {
+ def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime())
+ def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp =
+ new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000)
+}
+
+/**
+ * Number of nanoseconds that passed between two points in time.
+ */
+class NanoInterval(val nanos: Long) extends AnyVal {
+ def <(that: NanoInterval): Boolean = this.nanos < that.nanos
+ def >(that: NanoInterval): Boolean = this.nanos > that.nanos
+ def ==(that: NanoInterval): Boolean = this.nanos == that.nanos
+ def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos
+ def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos
+
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object NanoInterval {
+ def default: NanoInterval = new NanoInterval(0L)
+ def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
new file mode 100644
index 00000000..df51f411
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
@@ -0,0 +1,80 @@
+package kamon.trace
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ ActorLogging, Props, Actor, ActorRef }
+import kamon.{ NanoInterval, RelativeNanoTimestamp }
+import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
+import scala.annotation.tailrec
+import scala.collection.immutable.Queue
+import scala.concurrent.duration._
+
+class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging {
+ import context.dispatcher
+ val config = context.system.settings.config.getConfig("kamon.trace.incubator")
+
+ val minIncubationTime = new NanoInterval(config.getDuration("min-incubation-time", TimeUnit.NANOSECONDS))
+ val maxIncubationTime = new NanoInterval(config.getDuration("max-incubation-time", TimeUnit.NANOSECONDS))
+ val checkInterval = config.getDuration("check-interval", TimeUnit.MILLISECONDS)
+
+ val checkSchedule = context.system.scheduler.schedule(checkInterval.millis, checkInterval.millis, self, CheckForCompletedTraces)
+ var waitingForMinimumIncubation = Queue.empty[IncubatingTrace]
+ var waitingForIncubationFinish = List.empty[IncubatingTrace]
+
+ def receive = {
+ case tc: TracingContext ⇒ incubate(tc)
+ case CheckForCompletedTraces ⇒
+ checkWaitingForMinimumIncubation()
+ checkWaitingForIncubationFinish()
+ }
+
+ def incubate(tc: TracingContext): Unit =
+ waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now))
+
+ @tailrec private def checkWaitingForMinimumIncubation(): Unit = {
+ if (waitingForMinimumIncubation.nonEmpty) {
+ val it = waitingForMinimumIncubation.head
+ if (NanoInterval.since(it.incubationStart) >= minIncubationTime) {
+ waitingForMinimumIncubation = waitingForMinimumIncubation.tail
+
+ if (it.tc.shouldIncubate)
+ waitingForIncubationFinish = it :: waitingForIncubationFinish
+ else
+ dispatchTraceInfo(it.tc)
+
+ checkWaitingForMinimumIncubation()
+ }
+ }
+ }
+
+ private def checkWaitingForIncubationFinish(): Unit = {
+ waitingForIncubationFinish = waitingForIncubationFinish.filter {
+ case IncubatingTrace(context, incubationStart) ⇒
+ if (!context.shouldIncubate) {
+ dispatchTraceInfo(context)
+ false
+ } else {
+ if (NanoInterval.since(incubationStart) >= maxIncubationTime) {
+ log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token)
+ dispatchTraceInfo(context);
+ false
+ } else true
+ }
+ }
+ }
+
+ def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo
+
+ override def postStop(): Unit = {
+ super.postStop()
+ checkSchedule.cancel()
+ }
+}
+
+object Incubator {
+
+ def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions))
+
+ case object CheckForCompletedTraces
+ case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
new file mode 100644
index 00000000..f478d971
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
@@ -0,0 +1,105 @@
+package kamon.trace
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import akka.actor.ActorSystem
+import akka.event.LoggingAdapter
+import kamon.{ RelativeNanoTimestamp, NanoInterval }
+import kamon.metric.TraceMetrics.TraceMetricRecorder
+import kamon.metric.{ MetricsExtension, TraceMetrics }
+
+import scala.annotation.tailrec
+
+private[trace] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, val origin: TraceContextOrigin,
+ val startRelativeTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val system: ActorSystem)
+ extends TraceContext {
+
+ @volatile private var _name = traceName
+ @volatile private var _isOpen = izOpen
+ @volatile protected var _elapsedTime = NanoInterval.default
+
+ private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]()
+ private val _traceLocalStorage = new TraceLocalStorage
+
+ def rename(newName: String): Unit =
+ if (isOpen)
+ _name = newName
+ else if (log.isWarningEnabled)
+ log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName)
+
+ def name: String = _name
+ def isEmpty: Boolean = false
+ def isOpen: Boolean = _isOpen
+ def addMetadata(key: String, value: String): Unit = {}
+
+ def finish(): Unit = {
+ _isOpen = false
+ val traceElapsedTime = NanoInterval.since(startRelativeTimestamp)
+ _elapsedTime = traceElapsedTime
+
+ val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
+ metricRecorder.map { traceMetrics ⇒
+ traceMetrics.elapsedTime.record(traceElapsedTime.nanos)
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+
+ def startSegment(segmentName: String, category: String, library: String): Segment =
+ new MetricsOnlySegment(segmentName, category, library)
+
+ @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
+ val segment = _finishedSegments.poll()
+ if (segment != null) {
+ metricRecorder.segmentRecorder(segment.identity).record(segment.duration.nanos)
+ drainFinishedSegments(metricRecorder)
+ }
+ }
+
+ protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
+ _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration))
+
+ if (isClosed) {
+ metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+ }
+
+ // Should only be used by the TraceLocal utilities.
+ def traceLocalStorage: TraceLocalStorage = _traceLocalStorage
+
+ // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default
+ // will be returned.
+ def elapsedTime: NanoInterval = _elapsedTime
+
+ class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment {
+ private val _startTimestamp = RelativeNanoTimestamp.now
+ @volatile private var _segmentName = segmentName
+ @volatile private var _elapsedTime = NanoInterval.default
+ @volatile private var _isOpen = true
+
+ def name: String = _segmentName
+ def isEmpty: Boolean = false
+ def addMetadata(key: String, value: String): Unit = {}
+ def isOpen: Boolean = _isOpen
+
+ def rename(newName: String): Unit =
+ if (isOpen)
+ _segmentName = newName
+ else if (log.isWarningEnabled)
+ log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName)
+
+ def finish: Unit = {
+ _isOpen = false
+ val segmentElapsedTime = NanoInterval.since(_startTimestamp)
+ _elapsedTime = segmentElapsedTime
+
+ finishSegment(name, category, library, segmentElapsedTime)
+ }
+
+ // Handle with care and make sure that the segment is closed before calling this method, otherwise
+ // NanoInterval.default will be returned.
+ def elapsedTime: NanoInterval = _elapsedTime
+ def startTimestamp: RelativeNanoTimestamp = _startTimestamp
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
new file mode 100644
index 00000000..650592a5
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -0,0 +1,46 @@
+package kamon.trace
+
+import java.util.concurrent.atomic.AtomicLong
+
+import kamon.NanoInterval
+import scala.concurrent.forkjoin.ThreadLocalRandom
+
+trait Sampler {
+ def shouldTrace: Boolean
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean
+}
+
+object NoSampling extends Sampler {
+ def shouldTrace: Boolean = false
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = false
+}
+
+object SampleAll extends Sampler {
+ def shouldTrace: Boolean = true
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
+}
+
+class RandomSampler(chance: Int) extends Sampler {
+ require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0")
+ require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100")
+
+ def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
+}
+
+class OrderedSampler(interval: Int) extends Sampler {
+ require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0")
+
+ private val counter = new AtomicLong(0L)
+ def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0
+ // TODO: find a more efficient way to do this, protect from long overflow.
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
+}
+
+class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler {
+ require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0")
+
+ def shouldTrace: Boolean = true
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= thresholdInNanoseconds
+}
+
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 5b74e6b2..466ceafd 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -17,128 +17,71 @@
package kamon.trace
import java.io.ObjectStreamException
-
import akka.actor.ActorSystem
-import kamon.Kamon
+import kamon._
import kamon.metric._
-import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
-import kamon.metric.TraceMetrics.TraceMetricRecorder
-
-import scala.annotation.tailrec
-sealed trait TraceContext {
+trait TraceContext {
def name: String
def token: String
- def rename(name: String): Unit
- def finish(): Unit
def origin: TraceContextOrigin
- def isOpen: Boolean
- def isClosed: Boolean = !isOpen
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
+ def isOpen: Boolean
+ def isClosed: Boolean = !isOpen
+ def system: ActorSystem
+
+ def finish(): Unit
+ def rename(newName: String): Unit
def startSegment(segmentName: String, category: String, library: String): Segment
- def nanoTimestamp: Long
+ def addMetadata(key: String, value: String)
+ def startRelativeTimestamp: RelativeNanoTimestamp
}
-sealed trait Segment {
+trait Segment {
def name: String
- def rename(newName: String): Unit
def category: String
def library: String
- def finish(): Unit
def isEmpty: Boolean
+ def nonEmpty: Boolean = !isEmpty
+ def isOpen: Boolean
+ def isClosed: Boolean = !isOpen
+
+ def finish(): Unit
+ def rename(newName: String): Unit
+ def addMetadata(key: String, value: String)
}
case object EmptyTraceContext extends TraceContext {
def name: String = "empty-trace"
def token: String = ""
- def rename(name: String): Unit = {}
- def finish(): Unit = {}
def origin: TraceContextOrigin = TraceContextOrigin.Local
- def isOpen: Boolean = false
def isEmpty: Boolean = true
+ def isOpen: Boolean = false
+ def system: ActorSystem = sys.error("Can't obtain a ActorSystem from a EmptyTraceContext.")
+
+ def finish(): Unit = {}
+ def rename(name: String): Unit = {}
def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment
- def nanoTimestamp: Long = 0L
+ def addMetadata(key: String, value: String): Unit = {}
+ def startRelativeTimestamp = new RelativeNanoTimestamp(0L)
case object EmptySegment extends Segment {
val name: String = "empty-segment"
val category: String = "empty-category"
val library: String = "empty-library"
def isEmpty: Boolean = true
- def rename(newName: String): Unit = {}
- def finish: Unit = {}
- }
-}
-
-class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
- val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext {
-
- val isEmpty: Boolean = false
- @volatile private var _name = traceName
- @volatile private var _isOpen = izOpen
-
- private val _nanoTimestamp = nanoTimeztamp
- private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
- private val metricsExtension = Kamon(Metrics)(system)
- private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
-
- def name: String = _name
- def rename(newName: String): Unit =
- if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace.
-
- def isOpen: Boolean = _isOpen
- def nanoTimestamp: Long = _nanoTimestamp
-
- def finish(): Unit = {
- _isOpen = false
- val elapsedNanoTime = System.nanoTime() - _nanoTimestamp
- val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
-
- metricRecorder.map { traceMetrics ⇒
- traceMetrics.elapsedTime.record(elapsedNanoTime)
- drainFinishedSegments(traceMetrics)
- }
- }
-
- def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library)
+ def isOpen: Boolean = false
- @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
- val segment = finishedSegments.poll()
- if (segment != null) {
- metricRecorder.segmentRecorder(segment.identity).record(segment.duration)
- drainFinishedSegments(metricRecorder)
- }
- }
-
- private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
- finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration))
-
- if (isClosed) {
- metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
- drainFinishedSegments(traceMetrics)
- }
- }
- }
-
- class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment {
- private val _segmentStartNanoTime = System.nanoTime()
- @volatile private var _segmentName = segmentName
- @volatile private var _isOpen = true
-
- def name: String = _segmentName
- def rename(newName: String): Unit = _segmentName = newName
- def isEmpty: Boolean = false
-
- def finish: Unit = {
- val segmentFinishNanoTime = System.nanoTime()
- finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime))
- }
+ def finish: Unit = {}
+ def rename(newName: String): Unit = {}
+ def addMetadata(key: String, value: String): Unit = {}
}
}
case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity
-case class SegmentData(identity: SegmentMetricIdentity, duration: Long)
+case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: NanoInterval)
object SegmentCategory {
val HttpClient = "http-client"
@@ -146,7 +89,7 @@ object SegmentCategory {
sealed trait LevelOfDetail
object LevelOfDetail {
- case object OnlyMetrics extends LevelOfDetail
+ case object MetricsOnly extends LevelOfDetail
case object SimpleTrace extends LevelOfDetail
case object FullTrace extends LevelOfDetail
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
index a59abc18..8bd9384a 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
@@ -16,14 +16,63 @@
package kamon.trace
-import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import akka.actor._
import akka.actor
+import akka.event.Logging
+import kamon._
+import kamon.metric.Metrics
import kamon.util.GlobPathFilter
-import kamon.Kamon
class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val config = system.settings.config.getConfig("kamon.trace")
val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing")
+
+ val detailLevel: LevelOfDetail = config.getString("level") match {
+ case "metrics-only" ⇒ LevelOfDetail.MetricsOnly
+ case "simple-trace" ⇒ LevelOfDetail.SimpleTrace
+ case other ⇒ sys.error(s"Unknown tracing level $other present in the configuration file.")
+ }
+
+ val sampler: Sampler =
+ if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling
+ else config.getString("sampling") match {
+ case "all" ⇒ SampleAll
+ case "random" ⇒ new RandomSampler(config.getInt("random-sampler.chance"))
+ case "ordered" ⇒ new OrderedSampler(config.getInt("ordered-sampler.interval"))
+ case "threshold" ⇒ new RandomSampler(config.getInt("threshold-sampler.threshold"))
+ }
+
+ val log = Logging(system, "TraceExtension")
+ val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
+ val incubator = system.actorOf(Incubator.props(subscriptions))
+ val metricsExtension = Kamon(Metrics)(system)
+
+ def newTraceContext(traceName: String, token: String, origin: TraceContextOrigin, system: ActorSystem): TraceContext =
+ newTraceContext(traceName, token, true, origin, RelativeNanoTimestamp.now, system)
+
+ def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin,
+ startTimestamp: RelativeNanoTimestamp, system: ActorSystem): TraceContext = {
+ def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, detailLevel, origin, startTimestamp, log, metricsExtension, system)
+
+ if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote)
+ newMetricsOnlyContext
+ else {
+ if (!sampler.shouldTrace)
+ newMetricsOnlyContext
+ else
+ new TracingContext(traceName, token, true, detailLevel, origin, startTimestamp, log, metricsExtension, this, system)
+ }
+ }
+
+ def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber)
+ def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber)
+
+ private[kamon] def dispatchTracingContext(trace: TracingContext): Unit =
+ if (sampler.shouldReport(trace.elapsedTime))
+ if (trace.shouldIncubate)
+ incubator ! trace
+ else
+ subscriptions ! trace.generateTraceInfo
}
object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
@@ -34,3 +83,6 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
}
}
+
+case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo])
+case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
index c5fb100c..84e234f3 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
@@ -43,13 +43,13 @@ object TraceLocal {
object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext }
def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value)
- case EmptyTraceContext ⇒ // Can't store in the empty context.
+ case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value)
+ case EmptyTraceContext ⇒ // Can't store in the empty context.
}
def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key)
- case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
+ case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key)
+ case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
}
def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value)
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
index 8da187cb..703896c3 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -16,6 +16,8 @@
package kamon.trace
+import kamon.{ MilliTimestamp, RelativeNanoTimestamp, Kamon }
+
import scala.language.experimental.macros
import java.util.concurrent.atomic.AtomicLong
import kamon.macros.InlineTraceContextMacro
@@ -34,27 +36,12 @@ object TraceRecorder {
def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet())
- private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = {
- new DefaultTraceContext(
- name,
- token.getOrElse(newToken),
- izOpen = true,
- LevelOfDetail.OnlyMetrics,
- TraceContextOrigin.Local,
- nanoTimeztamp = System.nanoTime,
- system)
- }
+ private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext =
+ Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), TraceContextOrigin.Local, system)
- def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = {
- val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000)
- new DefaultTraceContext(
- traceName,
- traceToken,
- isOpen,
- LevelOfDetail.OnlyMetrics,
- TraceContextOrigin.Remote,
- equivalentNanotime,
- system)
+ def joinRemoteTraceContext(traceName: String, traceToken: String, startTimestamp: MilliTimestamp, isOpen: Boolean, system: ActorSystem): TraceContext = {
+ val equivalentStartTimestamp = RelativeNanoTimestamp.relativeTo(startTimestamp)
+ Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentStartTimestamp, system)
}
def setContext(context: TraceContext): Unit = traceContextStorage.set(context)
@@ -81,8 +68,8 @@ object TraceRecorder {
}
def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match {
- case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system))
- case EmptyTraceContext ⇒ None
+ case ctx: MetricsOnlyContext ⇒ Some(thunk(ctx, ctx.system))
+ case EmptyTraceContext ⇒ None
}
def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext]
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala
new file mode 100644
index 00000000..d533a344
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala
@@ -0,0 +1,29 @@
+package kamon.trace
+
+import akka.actor.{ Terminated, ActorRef, Actor }
+
+class TraceSubscriptions extends Actor {
+ import TraceSubscriptions._
+
+ var subscribers: List[ActorRef] = Nil
+
+ def receive = {
+ case Subscribe(newSubscriber) ⇒
+ if (!subscribers.contains(newSubscriber))
+ subscribers = context.watch(newSubscriber) :: subscribers
+
+ case Unsubscribe(leavingSubscriber) ⇒
+ subscribers = subscribers.filter(_ == leavingSubscriber)
+
+ case Terminated(terminatedSubscriber) ⇒
+ subscribers = subscribers.filter(_ == terminatedSubscriber)
+
+ case trace: TraceInfo ⇒
+ subscribers.foreach(_ ! trace)
+ }
+}
+
+object TraceSubscriptions {
+ case class Subscribe(subscriber: ActorRef)
+ case class Unsubscribe(subscriber: ActorRef)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
new file mode 100644
index 00000000..6a8cb1c6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
@@ -0,0 +1,76 @@
+package kamon.trace
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import akka.actor.ActorSystem
+import akka.event.LoggingAdapter
+import kamon.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp }
+import kamon.metric.MetricsExtension
+
+import scala.collection.concurrent.TrieMap
+
+private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin,
+ startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, traceExtension: TraceExtension, system: ActorSystem)
+ extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, startTimeztamp, log, metricsExtension, system) {
+
+ private val _openSegments = new AtomicInteger(0)
+ private val _startTimestamp = NanoTimestamp.now
+ private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]()
+ private val _metadata = TrieMap.empty[String, String]
+
+ override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value)
+
+ override def startSegment(segmentName: String, category: String, library: String): Segment = {
+ _openSegments.incrementAndGet()
+ val newSegment = new TracingSegment(segmentName, category, library)
+ _allSegments.add(newSegment)
+ newSegment
+ }
+
+ override def finish(): Unit = {
+ super.finish()
+ traceExtension.dispatchTracingContext(this)
+ }
+
+ override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
+ _openSegments.decrementAndGet()
+ super.finishSegment(segmentName, category, library, duration)
+ }
+
+ def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0
+
+ // Handle with care, should only be used after a trace is finished.
+ def generateTraceInfo: TraceInfo = {
+ require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.")
+
+ val currentSegments = _allSegments.iterator()
+ var segmentsInfo = List.newBuilder[SegmentInfo]
+
+ while (currentSegments.hasNext()) {
+ val segment = currentSegments.next()
+ if (segment.isClosed)
+ segmentsInfo += segment.createSegmentInfo(_startTimestamp, startRelativeTimestamp)
+ else
+ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name)
+ }
+
+ TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result())
+ }
+
+ class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) {
+ private val metadata = TrieMap.empty[String, String]
+ override def addMetadata(key: String, value: String): Unit = metadata.put(key, value)
+
+ // Handle with care, should only be used after the segment has finished.
+ def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = {
+ require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.")
+
+ // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both
+ // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it.
+ val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos)
+
+ SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
index d79a3ab6..4f4efa4d 100644
--- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
@@ -17,7 +17,7 @@
package kamon.trace.logging
import kamon.trace.TraceLocal.AvailableToMdc
-import kamon.trace.{ EmptyTraceContext, DefaultTraceContext, TraceContext, TraceRecorder }
+import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext, TraceRecorder }
import org.slf4j.MDC
@@ -29,7 +29,7 @@ trait MdcKeysSupport {
}
private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match {
- case ctx: DefaultTraceContext ⇒
+ case ctx: MetricsOnlyContext ⇒
ctx.traceLocalStorage.underlyingStorage.collect {
case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value))
}.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten
diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
new file mode 100644
index 00000000..cda9cad7
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
@@ -0,0 +1,103 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import akka.actor.ActorSystem
+import akka.testkit.{ ImplicitSender, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import org.scalatest.{ Matchers, WordSpecLike }
+import scala.concurrent.duration._
+
+class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("simple-trace-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | tick-interval = 1 hour
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace"]
+ | }
+ | }
+ | ]
+ | precision {
+ | default-histogram-precision {
+ | highest-trackable-value = 3600000000000
+ | significant-value-digits = 2
+ | }
+ |
+ | default-min-max-counter-precision {
+ | refresh-interval = 1 second
+ | highest-trackable-value = 999999999
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ |
+ |kamon.trace {
+ | level = simple-trace
+ | sampling = all
+ |}
+ """.stripMargin))
+
+ "the simple tracing" should {
+ "send a TraceInfo when the trace has finished and all segments are finished" in {
+ Kamon(Trace)(system).subscribe(testActor)
+
+ TraceRecorder.withNewTraceContext("simple-trace-without-segments") {
+ TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish()
+ TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test").finish()
+ TraceRecorder.finish()
+ }
+
+ val traceInfo = expectMsgType[TraceInfo]
+ Kamon(Trace)(system).unsubscribe(testActor)
+
+ traceInfo.name should be("simple-trace-without-segments")
+ traceInfo.segments.size should be(2)
+ traceInfo.segments.find(_.name == "segment-one") should be('defined)
+ traceInfo.segments.find(_.name == "segment-two") should be('defined)
+ }
+
+ "incubate the tracing context if there are open segments after finishing" in {
+ Kamon(Trace)(system).subscribe(testActor)
+
+ val secondSegment = TraceRecorder.withNewTraceContext("simple-trace-without-segments") {
+ TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish()
+ val segment = TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test")
+ TraceRecorder.finish()
+ segment
+ }
+
+ expectNoMsg(2 seconds)
+ secondSegment.finish()
+
+ within(10 seconds) {
+ val traceInfo = expectMsgType[TraceInfo]
+ Kamon(Trace)(system).unsubscribe(testActor)
+
+ traceInfo.name should be("simple-trace-without-segments")
+ traceInfo.segments.size should be(2)
+ traceInfo.segments.find(_.name == "segment-one") should be('defined)
+ traceInfo.segments.find(_.name == "segment-two") should be('defined)
+ }
+ }
+
+ }
+}