aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core/src/main/resources/reference.conf15
-rw-r--r--kamon-core/src/main/scala/kamon/TimeUnits.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Incubator.scala71
-rw-r--r--kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala70
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceContext.scala48
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceExtension.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/trace/TracingContext.scala70
-rw-r--r--kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala14
10 files changed, 259 insertions, 140 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 78d0900e..7c4b4ecb 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -156,6 +156,21 @@ kamon {
threshold = 1 second
}
+ incubator {
+ # Minimum time to stay in the trace incubator before checking if the trace should not be incubated anymore. No
+ # checks are made at least until this period has passed.
+ min-incubation-time = 5 seconds
+
+ # Time to wait between incubation checks. After min-incubation-time, a trace is checked using this interval and if
+ # if shouldn't be incubated anymore, the TraceInfo is collected and reported for it.
+ check-interval = 1 second
+
+ # Max amount of time that a trace can be in the incubator. If this time is reached for a given trace then it will
+ # be reported with whatever information is available at the moment, logging a warning for each segment that remains
+ # open after this point.
+ max-incubation-time = 20 seconds
+ }
+
# If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask`
# pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment
# the future was created.
diff --git a/kamon-core/src/main/scala/kamon/TimeUnits.scala b/kamon-core/src/main/scala/kamon/TimeUnits.scala
new file mode 100644
index 00000000..44f5b4c3
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/TimeUnits.scala
@@ -0,0 +1,57 @@
+package kamon
+
+/**
+ * Epoch time stamp in milliseconds.
+ */
+class MilliTimestamp(val millis: Long) extends AnyVal {
+ override def toString: String = String.valueOf(millis) + ".millis"
+}
+
+object MilliTimestamp {
+ def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis())
+}
+
+/**
+ * Epoch time stamp in nanoseconds.
+ *
+ * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch
+ * timestamp in nanoseconds.
+ */
+class NanoTimestamp(val nanos: Long) extends AnyVal {
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object NanoTimestamp {
+ def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000)
+}
+
+/**
+ * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime()
+ */
+class RelativeNanoTimestamp(val nanos: Long) extends AnyVal {
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object RelativeNanoTimestamp {
+ def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime())
+ def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp =
+ new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000)
+}
+
+/**
+ * Number of nanoseconds that passed between two points in time.
+ */
+class NanoInterval(val nanos: Long) extends AnyVal {
+ def <(that: NanoInterval): Boolean = this.nanos < that.nanos
+ def >(that: NanoInterval): Boolean = this.nanos > that.nanos
+ def ==(that: NanoInterval): Boolean = this.nanos == that.nanos
+ def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos
+ def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos
+
+ override def toString: String = String.valueOf(nanos) + ".nanos"
+}
+
+object NanoInterval {
+ def default: NanoInterval = new NanoInterval(0L)
+ def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos)
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Incubator.scala b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
index d363d771..df51f411 100644
--- a/kamon-core/src/main/scala/kamon/trace/Incubator.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Incubator.scala
@@ -1,32 +1,70 @@
package kamon.trace
-import akka.actor.{ Props, Actor, ActorRef }
+import java.util.concurrent.TimeUnit
+
+import akka.actor.{ ActorLogging, Props, Actor, ActorRef }
+import kamon.{ NanoInterval, RelativeNanoTimestamp }
import kamon.trace.Incubator.{ CheckForCompletedTraces, IncubatingTrace }
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.duration._
-class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Actor {
+class Incubator(subscriptions: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
- val checkSchedule = context.system.scheduler.schedule(100 millis, 100 millis, self, CheckForCompletedTraces)
- var incubating = Queue.empty[IncubatingTrace]
+ val config = context.system.settings.config.getConfig("kamon.trace.incubator")
+
+ val minIncubationTime = new NanoInterval(config.getDuration("min-incubation-time", TimeUnit.NANOSECONDS))
+ val maxIncubationTime = new NanoInterval(config.getDuration("max-incubation-time", TimeUnit.NANOSECONDS))
+ val checkInterval = config.getDuration("check-interval", TimeUnit.MILLISECONDS)
+
+ val checkSchedule = context.system.scheduler.schedule(checkInterval.millis, checkInterval.millis, self, CheckForCompletedTraces)
+ var waitingForMinimumIncubation = Queue.empty[IncubatingTrace]
+ var waitingForIncubationFinish = List.empty[IncubatingTrace]
def receive = {
- case CheckForCompletedTraces ⇒ dispatchCompleted()
- case tc: TracingContext ⇒ incubating = incubating.enqueue(IncubatingTrace(tc))
+ case tc: TracingContext ⇒ incubate(tc)
+ case CheckForCompletedTraces ⇒
+ checkWaitingForMinimumIncubation()
+ checkWaitingForIncubationFinish()
}
- @tailrec private def dispatchCompleted(): Unit = {
- if (incubating.nonEmpty) {
- val it = incubating.head
- if (!it.tc.shouldIncubate || it.incubationNanoTime >= maxIncubationNanoTime) {
- it.tc.generateTraceInfo.map(subscriptions ! _)
- incubating = incubating.tail
- dispatchCompleted()
+ def incubate(tc: TracingContext): Unit =
+ waitingForMinimumIncubation = waitingForMinimumIncubation.enqueue(IncubatingTrace(tc, RelativeNanoTimestamp.now))
+
+ @tailrec private def checkWaitingForMinimumIncubation(): Unit = {
+ if (waitingForMinimumIncubation.nonEmpty) {
+ val it = waitingForMinimumIncubation.head
+ if (NanoInterval.since(it.incubationStart) >= minIncubationTime) {
+ waitingForMinimumIncubation = waitingForMinimumIncubation.tail
+
+ if (it.tc.shouldIncubate)
+ waitingForIncubationFinish = it :: waitingForIncubationFinish
+ else
+ dispatchTraceInfo(it.tc)
+
+ checkWaitingForMinimumIncubation()
}
}
}
+ private def checkWaitingForIncubationFinish(): Unit = {
+ waitingForIncubationFinish = waitingForIncubationFinish.filter {
+ case IncubatingTrace(context, incubationStart) ⇒
+ if (!context.shouldIncubate) {
+ dispatchTraceInfo(context)
+ false
+ } else {
+ if (NanoInterval.since(incubationStart) >= maxIncubationTime) {
+ log.warning("Trace [{}] with token [{}] has reached the maximum incubation time, will be reported as is.", context.name, context.token)
+ dispatchTraceInfo(context);
+ false
+ } else true
+ }
+ }
+ }
+
+ def dispatchTraceInfo(tc: TracingContext): Unit = subscriptions ! tc.generateTraceInfo
+
override def postStop(): Unit = {
super.postStop()
checkSchedule.cancel()
@@ -35,11 +73,8 @@ class Incubator(subscriptions: ActorRef, maxIncubationNanoTime: Long) extends Ac
object Incubator {
- def props(subscriptions: ActorRef, maxIncubationNanoTime: Long): Props = Props(new Incubator(subscriptions, maxIncubationNanoTime))
+ def props(subscriptions: ActorRef): Props = Props(new Incubator(subscriptions))
case object CheckForCompletedTraces
- case class IncubatingTrace(tc: TracingContext) {
- private val incubationStartNanoTime = System.nanoTime()
- def incubationNanoTime: Long = System.nanoTime() - incubationStartNanoTime
- }
+ case class IncubatingTrace(tc: TracingContext, incubationStart: RelativeNanoTimestamp)
}
diff --git a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
index 04e61407..f478d971 100644
--- a/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/MetricsOnlyContext.scala
@@ -4,50 +4,42 @@ import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
-import kamon.Kamon
+import kamon.{ RelativeNanoTimestamp, NanoInterval }
import kamon.metric.TraceMetrics.TraceMetricRecorder
-import kamon.metric.{ MetricsExtension, TraceMetrics, Metrics }
+import kamon.metric.{ MetricsExtension, TraceMetrics }
import scala.annotation.tailrec
-class MetricsOnlyContext(
- traceName: String,
- val token: String,
- izOpen: Boolean,
- val levelOfDetail: LevelOfDetail,
- val origin: TraceContextOrigin,
- nanoTimeztamp: Long,
- log: LoggingAdapter,
- metricsExtension: MetricsExtension,
- val system: ActorSystem)
+private[trace] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail, val origin: TraceContextOrigin,
+ val startRelativeTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val system: ActorSystem)
extends TraceContext {
@volatile private var _name = traceName
@volatile private var _isOpen = izOpen
- @volatile protected var _elapsedNanoTime = 0L
+ @volatile protected var _elapsedTime = NanoInterval.default
- private val _nanoTimestamp = nanoTimeztamp
private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]()
- private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
+ private val _traceLocalStorage = new TraceLocalStorage
def rename(newName: String): Unit =
- if (isOpen) _name = newName
- else if (log.isWarningEnabled) log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName)
+ if (isOpen)
+ _name = newName
+ else if (log.isWarningEnabled)
+ log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName)
def name: String = _name
def isEmpty: Boolean = false
def isOpen: Boolean = _isOpen
- def nanoTimestamp: Long = _nanoTimestamp
- def elapsedNanoTime: Long = _elapsedNanoTime
def addMetadata(key: String, value: String): Unit = {}
def finish(): Unit = {
_isOpen = false
- _elapsedNanoTime = System.nanoTime() - _nanoTimestamp
- val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
+ val traceElapsedTime = NanoInterval.since(startRelativeTimestamp)
+ _elapsedTime = traceElapsedTime
+ val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
metricRecorder.map { traceMetrics ⇒
- traceMetrics.elapsedTime.record(elapsedNanoTime)
+ traceMetrics.elapsedTime.record(traceElapsedTime.nanos)
drainFinishedSegments(traceMetrics)
}
}
@@ -58,12 +50,12 @@ class MetricsOnlyContext(
@tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
val segment = _finishedSegments.poll()
if (segment != null) {
- metricRecorder.segmentRecorder(segment.identity).record(segment.duration)
+ metricRecorder.segmentRecorder(segment.identity).record(segment.duration.nanos)
drainFinishedSegments(metricRecorder)
}
}
- protected def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
+ protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
_finishedSegments.add(SegmentLatencyData(SegmentMetricIdentity(segmentName, category, library), duration))
if (isClosed) {
@@ -73,27 +65,41 @@ class MetricsOnlyContext(
}
}
+ // Should only be used by the TraceLocal utilities.
+ def traceLocalStorage: TraceLocalStorage = _traceLocalStorage
+
+ // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default
+ // will be returned.
+ def elapsedTime: NanoInterval = _elapsedTime
+
class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment {
- protected val segmentStartNanoTime = System.nanoTime()
+ private val _startTimestamp = RelativeNanoTimestamp.now
@volatile private var _segmentName = segmentName
- @volatile private var _elapsedNanoTime = 0L
- @volatile protected var _isOpen = true
+ @volatile private var _elapsedTime = NanoInterval.default
+ @volatile private var _isOpen = true
def name: String = _segmentName
def isEmpty: Boolean = false
def addMetadata(key: String, value: String): Unit = {}
def isOpen: Boolean = _isOpen
- def elapsedNanoTime: Long = _elapsedNanoTime
def rename(newName: String): Unit =
- if (isOpen) _segmentName = newName
- else if (log.isWarningEnabled) log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName)
+ if (isOpen)
+ _segmentName = newName
+ else if (log.isWarningEnabled)
+ log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName)
def finish: Unit = {
_isOpen = false
- _elapsedNanoTime = System.nanoTime() - segmentStartNanoTime
+ val segmentElapsedTime = NanoInterval.since(_startTimestamp)
+ _elapsedTime = segmentElapsedTime
- finishSegment(name, category, library, elapsedNanoTime)
+ finishSegment(name, category, library, segmentElapsedTime)
}
+
+ // Handle with care and make sure that the segment is closed before calling this method, otherwise
+ // NanoInterval.default will be returned.
+ def elapsedTime: NanoInterval = _elapsedTime
+ def startTimestamp: RelativeNanoTimestamp = _startTimestamp
}
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
index 60a400f8..650592a5 100644
--- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -2,21 +2,22 @@ package kamon.trace
import java.util.concurrent.atomic.AtomicLong
+import kamon.NanoInterval
import scala.concurrent.forkjoin.ThreadLocalRandom
trait Sampler {
def shouldTrace: Boolean
- def shouldReport(traceElapsedNanoTime: Long): Boolean
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean
}
object NoSampling extends Sampler {
def shouldTrace: Boolean = false
- def shouldReport(traceElapsedNanoTime: Long): Boolean = false
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = false
}
object SampleAll extends Sampler {
def shouldTrace: Boolean = true
- def shouldReport(traceElapsedNanoTime: Long): Boolean = true
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
}
class RandomSampler(chance: Int) extends Sampler {
@@ -24,7 +25,7 @@ class RandomSampler(chance: Int) extends Sampler {
require(chance <= 100, "kamon.trace.random-sampler.chance cannot be > 100")
def shouldTrace: Boolean = ThreadLocalRandom.current().nextInt(100) <= chance
- def shouldReport(traceElapsedNanoTime: Long): Boolean = true
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
}
class OrderedSampler(interval: Int) extends Sampler {
@@ -33,13 +34,13 @@ class OrderedSampler(interval: Int) extends Sampler {
private val counter = new AtomicLong(0L)
def shouldTrace: Boolean = counter.incrementAndGet() % interval == 0
// TODO: find a more efficient way to do this, protect from long overflow.
- def shouldReport(traceElapsedNanoTime: Long): Boolean = true
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = true
}
class ThresholdSampler(thresholdInNanoseconds: Long) extends Sampler {
require(thresholdInNanoseconds > 0, "kamon.trace.threshold-sampler.minimum-elapsed-time cannot be <= 0")
def shouldTrace: Boolean = true
- def shouldReport(traceElapsedNanoTime: Long): Boolean = traceElapsedNanoTime >= thresholdInNanoseconds
+ def shouldReport(traceElapsedTime: NanoInterval): Boolean = traceElapsedTime.nanos >= thresholdInNanoseconds
}
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
index 2c0b38bf..466ceafd 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
@@ -17,59 +17,55 @@
package kamon.trace
import java.io.ObjectStreamException
-
import akka.actor.ActorSystem
-import kamon.Kamon
+import kamon._
import kamon.metric._
-import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
-import kamon.metric.TraceMetrics.TraceMetricRecorder
-
-import scala.annotation.tailrec
trait TraceContext {
def name: String
def token: String
- def rename(name: String): Unit
- def finish(): Unit
def origin: TraceContextOrigin
- def isOpen: Boolean
- def isClosed: Boolean = !isOpen
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
+ def isOpen: Boolean
+ def isClosed: Boolean = !isOpen
+ def system: ActorSystem
+
+ def finish(): Unit
+ def rename(newName: String): Unit
def startSegment(segmentName: String, category: String, library: String): Segment
- def nanoTimestamp: Long
def addMetadata(key: String, value: String)
-
- protected def elapsedNanoTime: Long
+ def startRelativeTimestamp: RelativeNanoTimestamp
}
trait Segment {
def name: String
- def rename(newName: String): Unit
def category: String
def library: String
- def finish(): Unit
+ def isEmpty: Boolean
+ def nonEmpty: Boolean = !isEmpty
def isOpen: Boolean
def isClosed: Boolean = !isOpen
- def isEmpty: Boolean
- def addMetadata(key: String, value: String)
- protected def elapsedNanoTime: Long
+ def finish(): Unit
+ def rename(newName: String): Unit
+ def addMetadata(key: String, value: String)
}
case object EmptyTraceContext extends TraceContext {
def name: String = "empty-trace"
def token: String = ""
- def rename(name: String): Unit = {}
- def finish(): Unit = {}
def origin: TraceContextOrigin = TraceContextOrigin.Local
- def isOpen: Boolean = false
def isEmpty: Boolean = true
+ def isOpen: Boolean = false
+ def system: ActorSystem = sys.error("Can't obtain a ActorSystem from a EmptyTraceContext.")
+
+ def finish(): Unit = {}
+ def rename(name: String): Unit = {}
def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment
- def nanoTimestamp: Long = 0L
def addMetadata(key: String, value: String): Unit = {}
- def elapsedNanoTime: Long = 0L
+ def startRelativeTimestamp = new RelativeNanoTimestamp(0L)
case object EmptySegment extends Segment {
val name: String = "empty-segment"
@@ -77,15 +73,15 @@ case object EmptyTraceContext extends TraceContext {
val library: String = "empty-library"
def isEmpty: Boolean = true
def isOpen: Boolean = false
- def rename(newName: String): Unit = {}
+
def finish: Unit = {}
+ def rename(newName: String): Unit = {}
def addMetadata(key: String, value: String): Unit = {}
- def elapsedNanoTime: Long = 0L
}
}
case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity
-case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: Long)
+case class SegmentLatencyData(identity: SegmentMetricIdentity, duration: NanoInterval)
object SegmentCategory {
val HttpClient = "http-client"
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
index e5fbb15e..8bd9384a 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceExtension.scala
@@ -19,9 +19,9 @@ package kamon.trace
import akka.actor._
import akka.actor
import akka.event.Logging
+import kamon._
import kamon.metric.Metrics
import kamon.util.GlobPathFilter
-import kamon.Kamon
class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val config = system.settings.config.getConfig("kamon.trace")
@@ -44,11 +44,15 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val log = Logging(system, "TraceExtension")
val subscriptions = system.actorOf(Props[TraceSubscriptions], "trace-subscriptions")
- val incubator = system.actorOf(Incubator.props(subscriptions, 10000000000L))
+ val incubator = system.actorOf(Incubator.props(subscriptions))
val metricsExtension = Kamon(Metrics)(system)
- def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin, nanoTimestamp: Long, system: ActorSystem): TraceContext = {
- def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, metricsExtension, system)
+ def newTraceContext(traceName: String, token: String, origin: TraceContextOrigin, system: ActorSystem): TraceContext =
+ newTraceContext(traceName, token, true, origin, RelativeNanoTimestamp.now, system)
+
+ def newTraceContext(traceName: String, token: String, isOpen: Boolean, origin: TraceContextOrigin,
+ startTimestamp: RelativeNanoTimestamp, system: ActorSystem): TraceContext = {
+ def newMetricsOnlyContext = new MetricsOnlyContext(traceName, token, isOpen, detailLevel, origin, startTimestamp, log, metricsExtension, system)
if (detailLevel == LevelOfDetail.MetricsOnly || origin == TraceContextOrigin.Remote)
newMetricsOnlyContext
@@ -56,20 +60,19 @@ class TraceExtension(system: ExtendedActorSystem) extends Kamon.Extension {
if (!sampler.shouldTrace)
newMetricsOnlyContext
else
- new TracingContext(traceName, token, true, detailLevel, origin, nanoTimestamp, log, this, metricsExtension, system)
+ new TracingContext(traceName, token, true, detailLevel, origin, startTimestamp, log, metricsExtension, this, system)
}
}
- def report(trace: TracingContext): Unit = if (sampler.shouldReport(trace.elapsedNanoTime)) {
- if (trace.shouldIncubate)
- incubator ! trace
- else
- trace.generateTraceInfo.map(subscriptions ! _)
- }
-
def subscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Subscribe(subscriber)
def unsubscribe(subscriber: ActorRef): Unit = subscriptions ! TraceSubscriptions.Unsubscribe(subscriber)
+ private[kamon] def dispatchTracingContext(trace: TracingContext): Unit =
+ if (sampler.shouldReport(trace.elapsedTime))
+ if (trace.shouldIncubate)
+ incubator ! trace
+ else
+ subscriptions ! trace.generateTraceInfo
}
object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
@@ -81,5 +84,5 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider {
}
}
-case class TraceInfo(name: String, token: String, startMilliTime: Long, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String], segments: List[SegmentInfo])
-case class SegmentInfo(name: String, category: String, library: String, startNanoTime: Long, elapsedNanoTime: Long, metadata: Map[String, String]) \ No newline at end of file
+case class TraceInfo(name: String, token: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String], segments: List[SegmentInfo])
+case class SegmentInfo(name: String, category: String, library: String, timestamp: NanoTimestamp, elapsedTime: NanoInterval, metadata: Map[String, String]) \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
index af47bf3c..703896c3 100644
--- a/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TraceRecorder.scala
@@ -16,7 +16,7 @@
package kamon.trace
-import kamon.Kamon
+import kamon.{ MilliTimestamp, RelativeNanoTimestamp, Kamon }
import scala.language.experimental.macros
import java.util.concurrent.atomic.AtomicLong
@@ -37,11 +37,11 @@ object TraceRecorder {
def newToken: String = hostnamePrefix + "-" + String.valueOf(tokenCounter.incrementAndGet())
private def newTraceContext(name: String, token: Option[String], system: ActorSystem): TraceContext =
- Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), true, TraceContextOrigin.Local, System.nanoTime(), system)
+ Kamon(Trace)(system).newTraceContext(name, token.getOrElse(newToken), TraceContextOrigin.Local, system)
- def joinRemoteTraceContext(traceName: String, traceToken: String, startMilliTime: Long, isOpen: Boolean, system: ActorSystem): TraceContext = {
- val equivalentNanoTime = System.nanoTime() - ((System.currentTimeMillis() - startMilliTime) * 1000000)
- Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentNanoTime, system)
+ def joinRemoteTraceContext(traceName: String, traceToken: String, startTimestamp: MilliTimestamp, isOpen: Boolean, system: ActorSystem): TraceContext = {
+ val equivalentStartTimestamp = RelativeNanoTimestamp.relativeTo(startTimestamp)
+ Kamon(Trace)(system).newTraceContext(traceName, traceToken, isOpen, TraceContextOrigin.Remote, equivalentStartTimestamp, system)
}
def setContext(context: TraceContext): Unit = traceContextStorage.set(context)
diff --git a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
index c9cbc754..6a8cb1c6 100644
--- a/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/TracingContext.scala
@@ -1,72 +1,76 @@
package kamon.trace
import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.{ AtomicInteger, AtomicLongFieldUpdater }
+import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
-import kamon.Kamon
-import kamon.metric.TraceMetrics.TraceMetricRecorder
-import kamon.metric.{ MetricsExtension, Metrics, TraceMetrics }
+import kamon.{ NanoInterval, NanoTimestamp, RelativeNanoTimestamp }
+import kamon.metric.MetricsExtension
-import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap
-class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin,
- nanoTimeztamp: Long, log: LoggingAdapter, traceExtension: TraceExtension, metricsExtension: MetricsExtension, system: ActorSystem)
- extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, nanoTimeztamp, log, metricsExtension, system) {
+private[trace] class TracingContext(traceName: String, token: String, izOpen: Boolean, levelOfDetail: LevelOfDetail, origin: TraceContextOrigin,
+ startTimeztamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, traceExtension: TraceExtension, system: ActorSystem)
+ extends MetricsOnlyContext(traceName, token, izOpen, levelOfDetail, origin, startTimeztamp, log, metricsExtension, system) {
- val openSegments = new AtomicInteger(0)
- private val startMilliTime = System.currentTimeMillis()
- private val allSegments = new ConcurrentLinkedQueue[TracingSegment]()
- private val metadata = TrieMap.empty[String, String]
+ private val _openSegments = new AtomicInteger(0)
+ private val _startTimestamp = NanoTimestamp.now
+ private val _allSegments = new ConcurrentLinkedQueue[TracingSegment]()
+ private val _metadata = TrieMap.empty[String, String]
- override def addMetadata(key: String, value: String): Unit = metadata.put(key, value)
+ override def addMetadata(key: String, value: String): Unit = _metadata.put(key, value)
override def startSegment(segmentName: String, category: String, library: String): Segment = {
- openSegments.incrementAndGet()
+ _openSegments.incrementAndGet()
val newSegment = new TracingSegment(segmentName, category, library)
- allSegments.add(newSegment)
+ _allSegments.add(newSegment)
newSegment
}
override def finish(): Unit = {
super.finish()
- traceExtension.report(this)
+ traceExtension.dispatchTracingContext(this)
}
- override def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
- openSegments.decrementAndGet()
+ override def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
+ _openSegments.decrementAndGet()
super.finishSegment(segmentName, category, library, duration)
}
- def shouldIncubate: Boolean = isOpen || openSegments.get() > 0
+ def shouldIncubate: Boolean = isOpen || _openSegments.get() > 0
- def generateTraceInfo: Option[TraceInfo] = if (isOpen) None else {
- val currentSegments = allSegments.iterator()
- var segmentsInfo: List[SegmentInfo] = Nil
+ // Handle with care, should only be used after a trace is finished.
+ def generateTraceInfo: TraceInfo = {
+ require(isClosed, "Can't generated a TraceInfo if the Trace has not closed yet.")
+
+ val currentSegments = _allSegments.iterator()
+ var segmentsInfo = List.newBuilder[SegmentInfo]
while (currentSegments.hasNext()) {
val segment = currentSegments.next()
- segment.createSegmentInfo match {
- case Some(si) ⇒ segmentsInfo = si :: segmentsInfo
- case None ⇒ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name)
- }
+ if (segment.isClosed)
+ segmentsInfo += segment.createSegmentInfo(_startTimestamp, startRelativeTimestamp)
+ else
+ log.warning("Segment [{}] will be left out of TraceInfo because it was still open.", segment.name)
}
- Some(TraceInfo(traceName, token, startMilliTime, nanoTimeztamp, elapsedNanoTime, metadata.toMap, segmentsInfo))
+ TraceInfo(name, token, _startTimestamp, elapsedTime, _metadata.toMap, segmentsInfo.result())
}
class TracingSegment(segmentName: String, category: String, library: String) extends MetricsOnlySegment(segmentName, category, library) {
private val metadata = TrieMap.empty[String, String]
override def addMetadata(key: String, value: String): Unit = metadata.put(key, value)
- override def finish: Unit = {
- super.finish()
- }
+ // Handle with care, should only be used after the segment has finished.
+ def createSegmentInfo(traceStartTimestamp: NanoTimestamp, traceRelativeTimestamp: RelativeNanoTimestamp): SegmentInfo = {
+ require(isClosed, "Can't generated a SegmentInfo if the Segment has not closed yet.")
- def createSegmentInfo: Option[SegmentInfo] =
- if (isOpen) None
- else Some(SegmentInfo(this.name, category, library, segmentStartNanoTime, elapsedNanoTime, metadata.toMap))
+ // We don't have a epoch-based timestamp for the segments because calling System.currentTimeMillis() is both
+ // expensive and inaccurate, but we can do that once for the trace and calculate all the segments relative to it.
+ val segmentStartTimestamp = new NanoTimestamp((this.startTimestamp.nanos - traceRelativeTimestamp.nanos) + traceStartTimestamp.nanos)
+
+ SegmentInfo(this.name, category, library, segmentStartTimestamp, this.elapsedTime, metadata.toMap)
+ }
}
} \ No newline at end of file
diff --git a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
index a263ff7f..cda9cad7 100644
--- a/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
+++ b/kamon-core/src/test/scala/kamon/trace/SimpleTraceSpec.scala
@@ -88,13 +88,15 @@ class SimpleTraceSpec extends TestKitBase with WordSpecLike with Matchers with I
expectNoMsg(2 seconds)
secondSegment.finish()
- val traceInfo = expectMsgType[TraceInfo]
- Kamon(Trace)(system).unsubscribe(testActor)
+ within(10 seconds) {
+ val traceInfo = expectMsgType[TraceInfo]
+ Kamon(Trace)(system).unsubscribe(testActor)
- traceInfo.name should be("simple-trace-without-segments")
- traceInfo.segments.size should be(2)
- traceInfo.segments.find(_.name == "segment-one") should be('defined)
- traceInfo.segments.find(_.name == "segment-two") should be('defined)
+ traceInfo.name should be("simple-trace-without-segments")
+ traceInfo.segments.size should be(2)
+ traceInfo.segments.find(_.name == "segment-one") should be('defined)
+ traceInfo.segments.find(_.name == "segment-two") should be('defined)
+ }
}
}