1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
/*
* =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.trace
import java.io.ObjectStreamException
import akka.actor.ActorSystem
import kamon.Kamon
import kamon.metric._
import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
import kamon.trace.TraceContext.SegmentIdentity
import kamon.metric.TraceMetrics.TraceMetricRecorder
trait TraceContext {
def name: String
def token: String
def system: ActorSystem
def rename(name: String): Unit
def levelOfDetail: TracingLevelOfDetail
def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
def finish(metadata: Map[String, String])
def origin: TraceContextOrigin
def startMilliTime: Long
def isOpen: Boolean
private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
}
object TraceContext {
type SegmentIdentity = MetricIdentity
}
trait SegmentCompletionHandle {
def finish(metadata: Map[String, String] = Map.empty)
}
case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])
sealed trait TracingLevelOfDetail
case object OnlyMetrics extends TracingLevelOfDetail
case object SimpleTrace extends TracingLevelOfDetail
case object FullTrace extends TracingLevelOfDetail
sealed trait TraceContextOrigin
object TraceContextOrigin {
case object Local extends TraceContextOrigin
case object Remote extends TraceContextOrigin
}
trait TraceContextAware extends Serializable {
def captureNanoTime: Long
def traceContext: Option[TraceContext]
}
object TraceContextAware {
def default: TraceContextAware = new DefaultTraceContextAware
class DefaultTraceContextAware extends TraceContextAware {
@transient val captureNanoTime = System.nanoTime()
@transient val traceContext = TraceRecorder.currentContext
//
// Beware of this hack, it might bite us in the future!
//
// When using remoting/cluster all messages carry the TraceContext in the envelope in which they
// are sent but that doesn't apply to System Messages. We are certain that the TraceContext is
// available (if any) when the system messages are read and this will make sure that it is correctly
// captured and propagated.
@throws[ObjectStreamException]
private def readResolve: AnyRef = {
new DefaultTraceContextAware
}
}
}
trait SegmentCompletionHandleAware extends TraceContextAware {
@volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None
}
object SegmentCompletionHandleAware {
def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware
class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {}
}
class SimpleMetricCollectionContext(traceName: String, val token: String, metadata: Map[String, String],
val origin: TraceContextOrigin, val system: ActorSystem, val startMilliTime: Long = System.currentTimeMillis,
izOpen: Boolean = true) extends TraceContext {
@volatile private var _name = traceName
@volatile private var _isOpen = izOpen
val levelOfDetail = OnlyMetrics
val startNanoTime = System.nanoTime()
val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
val metricsExtension = Kamon(Metrics)(system)
def name: String = _name
def rename(newName: String): Unit = _name = newName
def isOpen(): Boolean = _isOpen
def finish(metadata: Map[String, String]): Unit = {
_isOpen = false
val elapsedNanoTime =
if (origin == TraceContextOrigin.Local)
// Everything is local, nanoTime is still the best resolution we can use.
System.nanoTime() - startNanoTime
else
// For a remote TraceContext we can only rely on the startMilliTime and we need to scale it to nanoseconds
// to be consistent with unit used for all latency measurements.
(System.currentTimeMillis() - startMilliTime) * 1000000L
val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
metricRecorder.map { traceMetrics ⇒
traceMetrics.elapsedTime.record(elapsedNanoTime)
drainFinishedSegments(traceMetrics)
}
}
private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
while (!finishedSegments.isEmpty) {
val segmentData = finishedSegments.poll()
metricRecorder.segmentRecorder(segmentData.identity).record(segmentData.duration)
}
}
private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = {
finishedSegments.add(SegmentData(identity, duration, metadata))
if (!_isOpen) {
metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
drainFinishedSegments(traceMetrics)
}
}
}
def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle =
new SimpleMetricCollectionCompletionHandle(identity, metadata)
class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle {
val segmentStartNanoTime = System.nanoTime()
def finish(metadata: Map[String, String] = Map.empty): Unit = {
val segmentFinishNanoTime = System.nanoTime()
finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata)
}
}
}
|