/*
* =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.trace
import java.io.ObjectStreamException
import akka.actor.ActorSystem
import kamon.Kamon
import kamon.metric._
import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
import kamon.metric.TraceMetrics.TraceMetricRecorder
import scala.annotation.tailrec
sealed trait TraceContext {
def name: String
def token: String
def rename(name: String): Unit
def finish(): Unit
def origin: TraceContextOrigin
def isOpen: Boolean
def isClosed: Boolean = !isOpen
def isEmpty: Boolean
def nonEmpty: Boolean = !isEmpty
def startSegment(segmentName: String, category: String, library: String): Segment
def nanoTimestamp: Long
}
sealed trait Segment {
def name: String
def rename(newName: String): Unit
def category: String
def library: String
def finish(): Unit
def isEmpty: Boolean
}
case object EmptyTraceContext extends TraceContext {
def name: String = "empty-trace"
def token: String = ""
def rename(name: String): Unit = {}
def finish(): Unit = {}
def origin: TraceContextOrigin = TraceContextOrigin.Local
def isOpen: Boolean = false
def isEmpty: Boolean = true
def startSegment(segmentName: String, category: String, library: String): Segment = EmptySegment
def nanoTimestamp: Long = 0L
case object EmptySegment extends Segment {
val name: String = "empty-segment"
val category: String = "empty-category"
val library: String = "empty-library"
def isEmpty: Boolean = true
def rename(newName: String): Unit = {}
def finish: Unit = {}
}
}
class DefaultTraceContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
val origin: TraceContextOrigin, nanoTimeztamp: Long, val system: ActorSystem) extends TraceContext {
val isEmpty: Boolean = false
@volatile private var _name = traceName
@volatile private var _isOpen = izOpen
private val _nanoTimestamp = nanoTimeztamp
private val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
private val metricsExtension = Kamon(Metrics)(system)
private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
def name: String = _name
def rename(newName: String): Unit =
if (isOpen) _name = newName // TODO: log a warning about renaming a closed trace.
def isOpen: Boolean = _isOpen
def nanoTimestamp: Long = _nanoTimestamp
def finish(): Unit = {
_isOpen = false
val elapsedNanoTime = System.nanoTime() - _nanoTimestamp
val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
metricRecorder.map { traceMetrics ⇒
traceMetrics.elapsedTime.record(elapsedNanoTime)
drainFinishedSegments(traceMetrics)
}
}
def startSegment(segmentName: String, category: String, library: String): Segment = new DefaultSegment(segmentName, category, library)
@tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
val segment = finishedSegments.poll()
if (segment != null) {
metricRecorder.segmentRecorder(segment.identity).record(segment.duration)
drainFinishedSegments(metricRecorder)
}
}
private def finishSegment(segmentName: String, category: String, library: String, duration: Long): Unit = {
finishedSegments.add(SegmentData(SegmentMetricIdentity(segmentName, category, library), duration))
if (isClosed) {
metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
drainFinishedSegments(traceMetrics)
}
}
}
class DefaultSegment(segmentName: String, val category: String, val library: String) extends Segment {
private val _segmentStartNanoTime = System.nanoTime()
@volatile private var _segmentName = segmentName
@volatile private var _isOpen = true
def name: String = _segmentName
def rename(newName: String): Unit = _segmentName = newName
def isEmpty: Boolean = false
def finish: Unit = {
val segmentFinishNanoTime = System.nanoTime()
finishSegment(name, category, library, (segmentFinishNanoTime - _segmentStartNanoTime))
}
}
}
case class SegmentMetricIdentity(name: String, category: String, library: String) extends MetricIdentity
case class SegmentData(identity: SegmentMetricIdentity, duration: Long)
object SegmentCategory {
val HttpClient = "http-client"
}
sealed trait LevelOfDetail
object LevelOfDetail {
case object OnlyMetrics extends LevelOfDetail
case object SimpleTrace extends LevelOfDetail
case object FullTrace extends LevelOfDetail
}
sealed trait TraceContextOrigin
object TraceContextOrigin {
case object Local extends TraceContextOrigin
case object Remote extends TraceContextOrigin
}
trait TraceContextAware extends Serializable {
def traceContext: TraceContext
}
object TraceContextAware {
def default: TraceContextAware = new DefaultTraceContextAware
class DefaultTraceContextAware extends TraceContextAware {
@transient val traceContext = TraceRecorder.currentContext
//
// Beware of this hack, it might bite us in the future!
//
// When using remoting/cluster all messages carry the TraceContext in the envelope in which they
// are sent but that doesn't apply to System Messages. We are certain that the TraceContext is
// available (if any) when the system messages are read and this will make sure that it is correctly
// captured and propagated.
@throws[ObjectStreamException]
private def readResolve: AnyRef = {
new DefaultTraceContextAware
}
}
}
trait TimestampedTraceContextAware extends TraceContextAware {
def captureNanoTime: Long
}
object TimestampedTraceContextAware {
def default: TimestampedTraceContextAware = new DefaultTraceContextAware with TimestampedTraceContextAware {
@transient val captureNanoTime = System.nanoTime()
}
}
trait SegmentAware {
@volatile var segment: Segment = EmptyTraceContext.EmptySegment
}
object SegmentAware {
def default: SegmentAware = new DefaultSegmentAware
class DefaultSegmentAware extends DefaultTraceContextAware with SegmentAware {}
}