aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
blob: 64ee70bed332611f1005e98a40116917cc28db8a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
 * =========================================================================================
 * Copyright © 2013 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.trace

import java.io.ObjectStreamException

import akka.actor.ActorSystem
import kamon.Kamon
import kamon.metric._
import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
import kamon.trace.TraceContext.SegmentIdentity
import kamon.metric.TraceMetrics.TraceMetricRecorder

import scala.annotation.tailrec

/**
 * Contract for the context that accompanies a single trace: identification data (`name`, `token`),
 * the `ActorSystem` it belongs to, and the operations for starting segments and finishing the trace.
 */
trait TraceContext {
  /** Current name of the trace; it can be replaced through `rename`. */
  def name: String

  /** Token associated with this trace. */
  def token: String

  /** Actor system this context is bound to. */
  def system: ActorSystem

  /** Replaces the name of the trace. */
  def rename(name: String): Unit

  /** Level of detail reported by this context. */
  def levelOfDetail: TracingLevelOfDetail

  /**
   * Starts a segment within this trace.
   *
   * @param identity identity of the segment being started.
   * @param metadata key/value pairs supplied when the segment starts.
   * @return handle used to signal that the segment has finished.
   */
  def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle

  /**
   * Finishes the trace.
   *
   * The result type is spelled out explicitly; the procedure syntax (a `def` without `: Unit`)
   * is deprecated in current Scala versions.
   *
   * @param metadata key/value pairs supplied when the trace finishes.
   */
  def finish(metadata: Map[String, String]): Unit

  /** Whether this context was started locally or remotely. */
  def origin: TraceContextOrigin

  /** Start time of the trace, in milliseconds. */
  def startMilliTime: Long

  /** Whether the trace is still open. */
  def isOpen: Boolean

  // One storage instance is created per context; it is only visible inside the kamon package.
  private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
}

object TraceContext {
  // Segments are identified with the same type that identifies metrics.
  type SegmentIdentity = MetricIdentity
}

/**
 * Handle returned by `TraceContext.startSegment`; calling `finish` signals that the segment is done.
 */
trait SegmentCompletionHandle {
  /**
   * Completes the segment.
   *
   * The result type is spelled out explicitly; the procedure syntax (a `def` without `: Unit`)
   * is deprecated in current Scala versions.
   *
   * @param metadata key/value pairs supplied at completion time; defaults to an empty map.
   */
  def finish(metadata: Map[String, String] = Map.empty): Unit
}

/**
 * Record of a finished segment.
 *
 * @param identity identity of the segment.
 * @param duration measured duration of the segment; SimpleMetricCollectionContext fills it with a
 *                 difference of two `System.nanoTime()` readings.
 * @param metadata key/value pairs attached to the segment.
 */
case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])

/** Closed set of detail levels that a TraceContext reports through `levelOfDetail`. */
sealed trait TracingLevelOfDetail
// NOTE(review): the exact behaviour attached to each level is not visible in this file; the only
// level used here is OnlyMetrics (by SimpleMetricCollectionContext) - confirm the others elsewhere.
case object OnlyMetrics extends TracingLevelOfDetail
case object SimpleTrace extends TracingLevelOfDetail
case object FullTrace extends TracingLevelOfDetail

/**
 * Where a TraceContext was started. SimpleMetricCollectionContext uses this to pick the clock for
 * the elapsed time: `System.nanoTime()` for `Local`, the millisecond start time otherwise.
 */
sealed trait TraceContextOrigin
object TraceContextOrigin {
  case object Local extends TraceContextOrigin
  case object Remote extends TraceContextOrigin
}

/** Serializable carrier of a captured TraceContext (if any) and the nano time of the capture. */
trait TraceContextAware extends Serializable {
  /** Nano time reading taken when the context was captured. */
  def captureNanoTime: Long

  /** The captured TraceContext, or None when there was none to capture. */
  def traceContext: Option[TraceContext]
}

object TraceContextAware {
  /** Creates a DefaultTraceContextAware, which captures its data at construction time. */
  def default: TraceContextAware = new DefaultTraceContextAware

  /**
   * Default implementation: on construction it reads `System.nanoTime()` and
   * `TraceRecorder.currentContext`. Both fields are marked `@transient`, so neither is written
   * out when an instance is serialized.
   */
  class DefaultTraceContextAware extends TraceContextAware {
    @transient val captureNanoTime = System.nanoTime()
    @transient val traceContext = TraceRecorder.currentContext

    //
    // Beware of this hack, it might bite us in the future!
    //
    // When using remoting/cluster all messages carry the TraceContext in the envelope in which they
    // are sent but that doesn't apply to System Messages. We are certain that the TraceContext is
    // available (if any) when the system messages are read and this will make sure that it is correctly
    // captured and propagated.
    //
    // Java serialization substitutes the deserialized object with the value returned by readResolve,
    // so the replacement instance re-runs the constructor and captures the nano time and the current
    // context on the reading side.
    @throws[ObjectStreamException]
    private def readResolve: AnyRef = {
      new DefaultTraceContextAware
    }
  }
}

/** TraceContextAware that can additionally hold a SegmentCompletionHandle. */
trait SegmentCompletionHandleAware extends TraceContextAware {
  // Starts out empty; declared volatile and mutable so it can be assigned after construction.
  @volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None
}

object SegmentCompletionHandleAware {
  /** Creates a DefaultSegmentCompletionHandleAware with no completion handle set. */
  def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware

  // Combines the capture behaviour of DefaultTraceContextAware with the completion handle slot.
  class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {}
}

/**
 * TraceContext implementation whose level of detail is `OnlyMetrics`. Finished segments are queued
 * and their durations are recorded, together with the elapsed time of the trace, in the
 * `TraceMetrics` recorder registered under the current name of the trace.
 *
 * @param traceName      initial name of the trace; it can be replaced later through `rename`.
 * @param token          token associated with this trace.
 * @param metadata       not read anywhere in this class.
 * @param origin         whether the context was started locally or remotely; selects the clock used by `finish`.
 * @param system         actor system from which the metrics extension is looked up.
 * @param startMilliTime start time in milliseconds; defaults to `System.currentTimeMillis` at construction.
 * @param izOpen         initial value of the open flag; defaults to `true`.
 */
class SimpleMetricCollectionContext(traceName: String, val token: String, metadata: Map[String, String],
    val origin: TraceContextOrigin, val system: ActorSystem, val startMilliTime: Long = System.currentTimeMillis,
    izOpen: Boolean = true) extends TraceContext {

  @volatile private var _name = traceName
  @volatile private var _isOpen = izOpen

  val levelOfDetail = OnlyMetrics
  val startNanoTime = System.nanoTime()
  val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
  val metricsExtension = Kamon(Metrics)(system)

  /** Current name of the trace. */
  def name: String = _name

  /** Replaces the name of the trace. */
  def rename(newName: String): Unit = _name = newName

  /** Whether `finish` has not been called yet (or the context was created closed). */
  def isOpen(): Boolean = _isOpen

  /**
   * Closes the context, records the elapsed time of the trace and drains every queued segment
   * into the trace's metric recorder.
   *
   * @param metadata not read by this implementation.
   */
  def finish(metadata: Map[String, String]): Unit = {
    // Close first: segments that finish from now on see the closed flag and flush themselves.
    _isOpen = false

    val elapsedNanoTime =
      if (origin == TraceContextOrigin.Local)
        // Everything is local, nanoTime is still the best resolution we can use.
        System.nanoTime() - startNanoTime
      else
        // For a remote TraceContext we can only rely on the startMilliTime and we need to scale it to nanoseconds
        // to be consistent with unit used for all latency measurements.
        (System.currentTimeMillis() - startMilliTime) * 1000000L

    val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)

    // The result of `register` is only traversed for its side effects, hence `foreach`.
    metricRecorder.foreach { traceMetrics =>
      traceMetrics.elapsedTime.record(elapsedNanoTime)
      drainFinishedSegments(traceMetrics)
    }
  }

  /** Polls the queue until it is empty, recording each segment's duration under its identity. */
  @tailrec private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
    val segment = finishedSegments.poll()
    // `poll` returns null once the queue is empty.
    if (segment != null) {
      metricRecorder.segmentRecorder(segment.identity).record(segment.duration)
      drainFinishedSegments(metricRecorder)
    }
  }

  /** Queues a finished segment and, if the trace is already closed, flushes the queue right away. */
  private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = {
    finishedSegments.add(SegmentData(identity, duration, metadata))

    // The drain performed by `finish` has already happened for a closed trace, so a segment
    // finishing afterwards must be recorded here.
    if (!_isOpen) {
      metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).foreach { traceMetrics =>
        drainFinishedSegments(traceMetrics)
      }
    }
  }

  /**
   * Starts a segment; its duration is measured from the creation of the returned handle.
   *
   * @param identity identity under which the segment duration will be recorded.
   * @param metadata key/value pairs merged with those supplied when the handle is finished.
   */
  def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle =
    new SimpleMetricCollectionCompletionHandle(identity, metadata)

  /** Completion handle that measures the segment with `System.nanoTime()` readings. */
  class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle {
    val segmentStartNanoTime = System.nanoTime()

    /** Reports the segment as finished; entries in `metadata` override equal keys from the start metadata. */
    def finish(metadata: Map[String, String] = Map.empty): Unit = {
      val segmentFinishNanoTime = System.nanoTime()
      finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata)
    }
  }
}