aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-12-03 02:10:46 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2014-12-03 02:10:46 +0100
commit594c7a1729789eae7037918cde7287bdc4111b70 (patch)
tree1c54caaee4d218640c93d23dcdf6a8167cb2b4e0 /kamon-core/src
parentf511e1c25aa683e0d436ef9b65d3e841b4a83732 (diff)
downloadKamon-594c7a1729789eae7037918cde7287bdc4111b70.tar.gz
Kamon-594c7a1729789eae7037918cde7287bdc4111b70.tar.bz2
Kamon-594c7a1729789eae7037918cde7287bdc4111b70.zip
= core: first simple approach to providing traces and a subscription mechanism.
Diffstat (limited to 'kamon-core/src')
-rw-r--r--kamon-core/src/main/resources/reference.conf29
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Incubator.scala45
-rw-r--r--kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala99
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala45
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala87
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceExtension.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceLocal.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala25
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala29
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracingContext.scala72
-rw-r--r--kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala4
-rw-r--r--kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala101
12 files changed, 503 insertions, 92 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 639d4aba..78d0900e 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -127,6 +127,35 @@ kamon {
trace {
+ # Level of detail used when recording trace information. The posible values are:
+ # - metrics-only: metrics for all included traces and all segments are recorded, but no Trace messages will be sent
+ # to the subscriptors of trace data.
+ # - simple-trace: metrics for all included traces and all segments are recorded and additionally a Trace message
+ # containing the trace and segments details and metadata.
+ level = metrics-only
+
+ # Sampling strategy to apply when the tracing level is set to `simple-trace`. The options are: all, random, ordered
+ # and threshold. The details of each sampler are bellow.
+ sampling = random
+
+ # Use a ThreadLocalRandom to generate numbers between 1 and 100, if the random number is less or equal to .chance
+ # then tracing information will be gathered and reported for the current trace.
+ random-sampler {
+ chance = 10
+ }
+
+ # Use a AtomicLong to ensure that every .sample-interval number of requests tracing information will be gathered and
+ # reported.
+ ordered-sampler {
+ sample-interval = 10
+ }
+
+ # Gather tracing information for all traces but only report those whose elapsed-time is equal or greated to the
+ # .minimum-elapsed-time setting.
+ threshold-sampler {
+ threshold = 1 second
+ }
+
# If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask`
# pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment
# the future was created.
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
new file mode 100644
index 00000000..d363d771
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
@@ -0,0 +1,45 @@
+package kamon.trace
+
+import akka.actor.{ Props, Actor, ActorRef }
+import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
+import scala.annotation.tailrec
+import scala.collection.immutable.Queue
+import scala.concurrent.duration._
+
+class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor {
+ import context.dispatcher
+ val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces)
+ var incubating = Queue.empty[IncubatingTrace]
+
+ def receive = {
+ case CheckForCompletedTraces ⇒ dispatchCompleted()
+ case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc))
+ }
+
+ @tailrec private def dispatchCompleted(): Unit = {
+ if (incubating.nonEmpty) {
+ val it = incubating.head
+ if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) {
+ it.tc.generateTraceInfo.map(subscriptions ! _)
+ incubating = incubating.tail
+ dispatchCompleted()
+ }
+ }
+ }
+
+ override def postStop(): Unit = {
+ super.postStop()
+ checkSchedule.cancel()
+ }
+}
+
+object Incubator {
+
+ def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime))
+
+ case object CheckForCompletedTraces
+ case class IncubatingTrace(tc: TracingContext) {
+ private val incubationStartNanoTime = System.nanoTime()
+ def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
new file mode 100644
index 00000000..04e61407
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
@@ -0,0 +1,99 @@
+package kamon.trace
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import akka.actor.ActorSystem
+import akka.event.LoggingAdapter
+import kamon.Kamon
+import kamon.metric.TraceMetrics.TraceMetricRecorder
+import kamon.metric.{ MetricsExtension, TraceMetrics, Metrics }
+
+import scala.annotation.tailrec
+
+class MetricsOnlyContext(
+ traceName: String,
+ val token: String,
+ izOpen: Boolean,
+ val levelOfDetail: LevelOfDetail,
+ val origin: TraceContextOrigin,
+ nanoTimeztamp: Long,
+ log: LoggingAdapter,
+ metricsExtension: MetricsExtension,
+ val system: ActorSystem)
+ extends TraceContext {
+
+ @volatile private var _name = traceName
+ @volatile private var _isOpen = izOpen
+ @volatile protected var _elapsedNanoTime = 0L
+
+ private val _nanoTimestamp = nanoTimeztamp
+ private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]()
+ private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
+
+ def rename(newName: String): Unit =
+ if (isOpen) _name = newName
+ else if (log.isWarningEnabled) log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName)
+
+ def name: String = _name
+ def isEmpty: Boolean = false
+ def isOpen: Boolean = _isOpen
+ def nanoTimestamp: Long = _nanoTimestamp
+ def elapsedNanoTime: Long = _elapsedNanoTime
+ def addMetadata(key: String, value: String): Unit = {}
+
+ def finish(): Unit = {
+ _isOpen = false
+ _elapsedNanoTime = System.nanoTime() - _nanoTimestamp
+ val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
+
+ metricRecorder.map { traceMetrics ⇒
+ traceMetrics.elapsedTime.record(elapsedNanoTime)
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+
+ def startSegment(segmentName: String, category: String, library: String): Segment =
+ new MetricsOnlySegment(segmentName, category, library)
+
+ @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
+ val segment = _finishedSegments.poll()
+ if (segment != null) {
+ metricRecorder.segmentRecorder(segment.identity).record(segment.duration)
+ drainFinishedSegments(metricRecorder)
+ }
+ }
+
+ protected def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
+ _finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration))
+
+ if (isClosed) {
+ metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
+ drainFinishedSegments(traceMetrics)
+ }
+ }
+ }
+
+ class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment {
+ protected val segmentStartNanoTime = System.nanoTime()
+ @volatile private var _segmentName = segmentName
+ @volatile private var _elapsedNanoTime = 0L
+ @volatile protected var _isOpen = true
+
+ def name: String = _segmentName
+ def isEmpty: Boolean = false
+ def addMetadata(key: String, value: String): Unit = {}
+ def isOpen: Boolean = _isOpen
+ def elapsedNanoTime: Long = _elapsedNanoTime
+
+ def rename(newName: String): Unit =
+ if (isOpen) _segmentName = newName
+ else if (log.isWarningEnabled) log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName)
+
+ def finish: Unit = {
+ _isOpen = false
+ _elapsedNanoTime = System.nanoTime() - segmentStartNanoTime
+
+ finishSegment(name, category, library, elapsedNanoTime)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
new file mode 100644
index 00000000..60a400f8
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -0,0 +1,45 @@
+package kamon.trace
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.concurrent.forkjoin.ThreadLocalRandom
+
+trait Sampler {
+ def shouldTrace: Boolean
+ def shouldReport(traceElapsedNanoTime: Long): Boolean
+}
+
+object NoSampling extends Sampler {
+ def shouldTrace: Boolean = false
+ def shouldReport(traceElapsedNanoTime: Long): Boolean = false
+}
+
+object SampleAll extends Sampler {
+ def shouldTrace: Boolean = true
+ def shouldReport(traceElapsedNanoTime: Long): Boolean = true
+}
+
+class RandomSampler(chance: Int) extends Sampler {
+ require(chance > 0, "kamon.trace.random-sampler.chance cannot be <= 0")
+ require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100")
+
+ def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance
+ def shouldReport(traceElapsedNanoTime: Long): Boolean = true
+}
+
+class OrderedSampler(interval: Int) extends Sampler {
+ require(interval > 0, "kamon.trace.ordered-sampler.interval cannot be <= 0")
+
+ private val counter = new AtomicLong(0L)
+ def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0
+ // TODO: find a more efficient way to do this, protect from long overflow.
+ def shouldReport(traceElapsedNanoTime: Long): Boolean = true
+}
+
+class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler {
+ require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0")
+
+ def shouldTrace: Boolean = true
+ def shouldReport(traceElapsedNanoTime: Long): Boolean = traceElapsedNanoTime >= thresholdInNanoseconds
+}
+
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 5b74e6b2..2c0b38bf 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -27,7 +27,7 @@ import kamon.metric.TraceMetrics.TraceMetricRecorder
import scala.annotation.tailrec
-sealed trait TraceContext {
+trait TraceContext {
def name: String
def token: String
def rename(name: String): Unit
@@ -39,15 +39,23 @@ sealed trait TraceContext {
def nonEmpty: Boolean = !isEmpty
def startSegment(segmentName: String, category: String, library: String): Segment
def nanoTimestamp: Long
+ def addMetadata(key: String, value: String)
+
+ protected def elapsedNanoTime: Long
}
-sealed trait Segment {
+trait Segment {
def name: String
def rename(newName: String): Unit
def category: String
def library: String
def finish(): Unit
+ def isOpen: Boolean
+ def isClosed: Boolean = !isOpen
def isEmpty: Boolean
+ def addMetadata(key: String, value: String)
+
+ protected def elapsedNanoTime: Long
}
case object EmptyTraceContext extends TraceContext {
@@ -60,85 +68,24 @@ case object EmptyTraceContext extends TraceContext {
def isEmpty: Boolean = true
def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment
def nanoTimestamp: Long = 0L
+ def addMetadata(key: String, value: String): Unit = {}
+ def elapsedNanoTime: Long = 0L
case object EmptySegment extends Segment {
val name: String = "empty-segment"
val category: String = "empty-category"
val library: String = "empty-library"
def isEmpty: Boolean = true
+ def isOpen: Boolean = false
def rename(newName: String): Unit = {}
def finish: Unit = {}
- }
-}
-
-class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
- val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext {
-
- val isEmpty: Boolean = false
- @volatile private var _name = traceName
- @volatile private var _isOpen = izOpen
-
- private val _nanoTimestamp = nanoTimeztamp
- private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
- private val metricsExtension = Kamon(Metrics)(system)
- private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
-
- def name: String = _name
- def rename(newName: String): Unit =
- if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace.
-
- def isOpen: Boolean = _isOpen
- def nanoTimestamp: Long = _nanoTimestamp
-
- def finish(): Unit = {
- _isOpen = false
- val elapsedNanoTime = System.nanoTime() - _nanoTimestamp
- val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
-
- metricRecorder.map { traceMetrics ⇒
- traceMetrics.elapsedTime.record(elapsedNanoTime)
- drainFinishedSegments(traceMetrics)
- }
- }
-
- def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library)
-
- @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
- val segment = finishedSegments.poll()
- if (segment != null) {
- metricRecorder.segmentRecorder(segment.identity).record(segment.duration)
- drainFinishedSegments(metricRecorder)
- }
- }
-
- private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
- finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration))
-
- if (isClosed) {
- metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
- drainFinishedSegments(traceMetrics)
- }
- }
- }
-
- class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment {
- private val _segmentStartNanoTime = System.nanoTime()
- @volatile private var _segmentName = segmentName
- @volatile private var _isOpen = true
-
- def name: String = _segmentName
- def rename(newName: String): Unit = _segmentName = newName
- def isEmpty: Boolean = false
-
- def finish: Unit = {
- val segmentFinishNanoTime = System.nanoTime()
- finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime))
- }
+ def addMetadata(key: String, value: String): Unit = {}
+ def elapsedNanoTime: Long = 0L
}
}
case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity
-case class SegmentData(identity: SegmentMetricIdentity, duration: Long)
+case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: Long)
object SegmentCategory {
val HttpClient = "http-client"
@@ -146,7 +93,7 @@ object SegmentCategory {
sealed trait LevelOfDetail
object LevelOfDetail {
- case object OnlyMetrics extends LevelOfDetail
+ case object MetricsOnly extends LevelOfDetail
case object SimpleTrace extends LevelOfDetail
case object FullTrace extends LevelOfDetail
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
index a59abc18..a80a4321 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
@@ -16,14 +16,60 @@
package kamon.trace
-import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import akka.actor._
import akka.actor
+import akka.event.Logging
+import kamon.metric.Metrics
import kamon.util.GlobPathFilter
import kamon.Kamon
class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val config = system.settings.config.getConfig("kamon.trace")
val enableAskPatternTracing = config.getBoolean("ask-pattern-tracing")
+
+ val detailLevel: LevelOfDetail = config.getString("level") match {
+ case "metrics-only" ⇒ LevelOfDetail.MetricsOnly
+ case "simple-trace" ⇒ LevelOfDetail.SimpleTrace
+ case other ⇒ sys.error(s"Unknown tracing level $other present in the configuration file.")
+ }
+
+ val sampler: Sampler =
+ if (detailLevel == LevelOfDetail.MetricsOnly) NoSampling
+ else config.getString("sampling") match {
+ case "all" ⇒ SampleAll
+ case "random" ⇒ new RandomSampler(config.getInt("random-sampler.chance"))
+ case "ordered" ⇒ new OrderedSampler(config.getInt("ordered-sampler.interval"))
+ case "threshold" ⇒ new RandomSampler(config.getInt("threshold-sampler.threshold"))
+ }
+
+ val log = Logging(system, "TraceExtension")
+ val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
+ val incubator = system.actorOf(Incubator.props(subscriptions, 10000000000L))
+ val metricsExtension = Kamon(Metrics)(system)
+
+ def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, nanoTimestamp: Long, system: ActorSystem): TraceContext = {
+ def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, metricsExtension, system)
+
+ if (detailLevel == LevelOfDetail.MetricsOnly)
+ newMetricsOnlyContext
+ else {
+ if (!sampler.shouldTrace)
+ newMetricsOnlyContext
+ else
+ new TracingContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, this, metricsExtension, system)
+ }
+ }
+
+ def report(trace: TracingContext): Unit = if (sampler.shouldReport(trace.elapsedNanoTime)) {
+ if (trace.shouldIncubate)
+ incubator ! trace
+ else
+ trace.generateTraceInfo.map(subscriptions ! _)
+ }
+
+ def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber)
+ def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber)
+
}
object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
@@ -34,3 +80,6 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
}
}
+
+case class TraceInfo(name: String, token: String, startMilliTime: Long, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String], segments: List[SegmentInfo])
+case class SegmentInfo(name: String, category: String, library: String, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String]) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
index c5fb100c..84e234f3 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceLocal.scala
@@ -43,13 +43,13 @@ object TraceLocal {
object HttpContextKey extends TraceLocal.TraceLocalKey { type ValueType = HttpContext }
def store(key: TraceLocalKey)(value: key.ValueType): Unit = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.store(key)(value)
- case EmptyTraceContext ⇒ // Can't store in the empty context.
+ case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.store(key)(value)
+ case EmptyTraceContext ⇒ // Can't store in the empty context.
}
def retrieve(key: TraceLocalKey): Option[key.ValueType] = TraceRecorder.currentContext match {
- case ctx: DefaultTraceContext ⇒ ctx.traceLocalStorage.retrieve(key)
- case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
+ case ctx: MetricsOnlyContext ⇒ ctx.traceLocalStorage.retrieve(key)
+ case EmptyTraceContext ⇒ None // Can't retrieve anything from the empty context.
}
def storeForMdc(key: String, value: String): Unit = store(AvailableToMdc.fromKey(key))(value)
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
index 8da187cb..572d94e5 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -16,6 +16,8 @@
package kamon.trace
+import kamon.Kamon
+
import scala.language.experimental.macros
import java.util.concurrent.atomic.AtomicLong
import kamon.macros.InlineTraceContextMacro
@@ -34,27 +36,20 @@ object TraceRecorder {
def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet())
- private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext = {
- new DefaultTraceContext(
- name,
- token.getOrElse(newToken),
- izOpen = true,
- LevelOfDetail.OnlyMetrics,
- TraceContextOrigin.Local,
- nanoTimeztamp = System.nanoTime,
- system)
- }
+ private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext =
+ Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), true, TraceContextOrigin.Local, System.nanoTime(), system)
def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = {
val equivalentNanotime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000)
- new DefaultTraceContext(
+ /*new MetricsOnlyContext(
traceName,
traceToken,
isOpen,
- LevelOfDetail.OnlyMetrics,
+ LevelOfDetail.MetricsOnly,
TraceContextOrigin.Remote,
equivalentNanotime,
- system)
+ system)*/
+ ???
}
def setContext(context: TraceContext): Unit = traceContextStorage.set(context)
@@ -81,8 +76,8 @@ object TraceRecorder {
}
def withTraceContextAndSystem[T](thunk: (TraceContext, ActorSystem) ⇒ T): Option[T] = currentContext match {
- case ctx: DefaultTraceContext ⇒ Some(thunk(ctx, ctx.system))
- case EmptyTraceContext ⇒ None
+ case ctx: MetricsOnlyContext ⇒ Some(thunk(ctx, ctx.system))
+ case EmptyTraceContext ⇒ None
}
def withInlineTraceContextReplacement[T](traceCtx: TraceContext)(thunk: ⇒ T): T = macro InlineTraceContextMacro.withInlineTraceContextImpl[T, TraceContext]
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala
new file mode 100644
index 00000000..d533a344
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TraceSubscriptions.scala
@@ -0,0 +1,29 @@
+package kamon.trace
+
+import akka.actor.{ Terminated, ActorRef, Actor }
+
+class TraceSubscriptions extends Actor {
+ import TraceSubscriptions._
+
+ var subscribers: List[ActorRef] = Nil
+
+ def receive = {
+ case Subscribe(newSubscriber) ⇒
+ if (!subscribers.contains(newSubscriber))
+ subscribers = context.watch(newSubscriber) :: subscribers
+
+ case Unsubscribe(leavingSubscriber) ⇒
+ subscribers = subscribers.filter(_ == leavingSubscriber)
+
+ case Terminated(terminatedSubscriber) ⇒
+ subscribers = subscribers.filter(_ == terminatedSubscriber)
+
+ case trace: TraceInfo ⇒
+ subscribers.foreach(_ ! trace)
+ }
+}
+
+object TraceSubscriptions {
+ case class Subscribe(subscriber: ActorRef)
+ case class Unsubscribe(subscriber: ActorRef)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
new file mode 100644
index 00000000..c9cbc754
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
@@ -0,0 +1,72 @@
+package kamon.trace
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.{ AtomicInteger, AtomicLongFieldUpdater }
+
+import akka.actor.ActorSystem
+import akka.event.LoggingAdapter
+import kamon.Kamon
+import kamon.metric.TraceMetrics.TraceMetricRecorder
+import kamon.metric.{ MetricsExtension, Metrics, TraceMetrics }
+
+import scala.annotation.tailrec
+import scala.collection.concurrent.TrieMap
+
+class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin,
+ nanoTimeztamp: Long, log: LoggingAdapter, traceExtension: TraceExtension, metricsExtension: MetricsExtension, system: ActorSystem)
+ extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, nanoTimeztamp, log, metricsExtension, system) {
+
+ val openSegments = new AtomicInteger(0)
+ private val startMilliTime = System.currentTimeMillis()
+ private val allSegments = new ConcurrentLinkedQueue[TracingSegment]()
+ private val metadata = TrieMap.empty[String, String]
+
+ override def addMetadata(key: String, value: String): Unit = metadata.put(key, value)
+
+ override def startSegment(segmentName: String, category: String, library: String): Segment = {
+ openSegments.incrementAndGet()
+ val newSegment = new TracingSegment(segmentName, category, library)
+ allSegments.add(newSegment)
+ newSegment
+ }
+
+ override def finish(): Unit = {
+ super.finish()
+ traceExtension.report(this)
+ }
+
+ override def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
+ openSegments.decrementAndGet()
+ super.finishSegment(segmentName, category, library, duration)
+ }
+
+ def shouldIncubate: Boolean = isOpen || openSegments.get() > 0
+
+ def generateTraceInfo: Option[TraceInfo] = if (isOpen) None else {
+ val currentSegments = allSegments.iterator()
+ var segmentsInfo: List[SegmentInfo] = Nil
+
+ while (currentSegments.hasNext()) {
+ val segment = currentSegments.next()
+ segment.createSegmentInfo match {
+ case Some(si) ⇒ segmentsInfo = si :: segmentsInfo
+ case None ⇒ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name)
+ }
+ }
+
+ Some(TraceInfo(traceName, token, startMilliTime, nanoTimeztamp, elapsedNanoTime, metadata.toMap, segmentsInfo))
+ }
+
+ class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) {
+ private val metadata = TrieMap.empty[String, String]
+ override def addMetadata(key: String, value: String): Unit = metadata.put(key, value)
+
+ override def finish: Unit = {
+ super.finish()
+ }
+
+ def createSegmentInfo: Option[SegmentInfo] =
+ if (isOpen) None
+ else Some(SegmentInfo(this.name, category, library, segmentStartNanoTime, elapsedNanoTime, metadata.toMap))
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
index d79a3ab6..4f4efa4d 100644
--- a/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
+++ b/kamon-core/src/main/scala/kamon/trace/logging/MdcKeysSupport.scala
@@ -17,7 +17,7 @@
package kamon.trace.logging
import kamon.trace.TraceLocal.AvailableToMdc
-import kamon.trace.{ EmptyTraceContext, DefaultTraceContext, TraceContext, TraceRecorder }
+import kamon.trace.{ EmptyTraceContext, MetricsOnlyContext, TraceContext, TraceRecorder }
import org.slf4j.MDC
@@ -29,7 +29,7 @@ trait MdcKeysSupport {
}
private[this] def copyToMdc(traceContext: TraceContext): Iterable[String] = traceContext match {
- case ctx: DefaultTraceContext ⇒
+ case ctx: MetricsOnlyContext ⇒
ctx.traceLocalStorage.underlyingStorage.collect {
case (available: AvailableToMdc, value) ⇒ Map(available.mdcKey -> String.valueOf(value))
}.map { value ⇒ value.map { case (k, v) ⇒ MDC.put(k, v); k } }.flatten
diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
new file mode 100644
index 00000000..a263ff7f
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
@@ -0,0 +1,101 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import akka.actor.ActorSystem
+import akka.testkit.{ ImplicitSender, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import org.scalatest.{ Matchers, WordSpecLike }
+import scala.concurrent.duration._
+
+class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("simple-trace-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | tick-interval = 1 hour
+ | filters = [
+ | {
+ | trace {
+ | includes = [ "*" ]
+ | excludes = [ "non-tracked-trace"]
+ | }
+ | }
+ | ]
+ | precision {
+ | default-histogram-precision {
+ | highest-trackable-value = 3600000000000
+ | significant-value-digits = 2
+ | }
+ |
+ | default-min-max-counter-precision {
+ | refresh-interval = 1 second
+ | highest-trackable-value = 999999999
+ | significant-value-digits = 2
+ | }
+ | }
+ |}
+ |
+ |kamon.trace {
+ | level = simple-trace
+ | sampling = all
+ |}
+ """.stripMargin))
+
+ "the simple tracing" should {
+ "send a TraceInfo when the trace has finished and all segments are finished" in {
+ Kamon(Trace)(system).subscribe(testActor)
+
+ TraceRecorder.withNewTraceContext("simple-trace-without-segments") {
+ TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish()
+ TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test").finish()
+ TraceRecorder.finish()
+ }
+
+ val traceInfo = expectMsgType[TraceInfo]
+ Kamon(Trace)(system).unsubscribe(testActor)
+
+ traceInfo.name should be("simple-trace-without-segments")
+ traceInfo.segments.size should be(2)
+ traceInfo.segments.find(_.name == "segment-one") should be('defined)
+ traceInfo.segments.find(_.name == "segment-two") should be('defined)
+ }
+
+ "incubate the tracing context if there are open segments after finishing" in {
+ Kamon(Trace)(system).subscribe(testActor)
+
+ val secondSegment = TraceRecorder.withNewTraceContext("simple-trace-without-segments") {
+ TraceRecorder.currentContext.startSegment("segment-one", "test-segment", "test").finish()
+ val segment = TraceRecorder.currentContext.startSegment("segment-two", "test-segment", "test")
+ TraceRecorder.finish()
+ segment
+ }
+
+ expectNoMsg(2 seconds)
+ secondSegment.finish()
+
+ val traceInfo = expectMsgType[TraceInfo]
+ Kamon(Trace)(system).unsubscribe(testActor)
+
+ traceInfo.name should be("simple-trace-without-segments")
+ traceInfo.segments.size should be(2)
+ traceInfo.segments.find(_.name == "segment-one") should be('defined)
+ traceInfo.segments.find(_.name == "segment-two") should be('defined)
+ }
+
+ }
+}