1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
/*
* =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.trace
import akka.actor.ActorSystem
import kamon.Kamon
import kamon.metrics._
import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
import kamon.trace.TraceContext.SegmentIdentity
import kamon.metrics.TraceMetrics.TraceMetricRecorder
trait TraceContext {
def name: String
def token: String
def system: ActorSystem
def rename(name: String): Unit
def levelOfDetail: TracingLevelOfDetail
def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle
def finish(metadata: Map[String, String])
}
object TraceContext {
type SegmentIdentity = MetricIdentity
}
trait SegmentCompletionHandle {
def finish(metadata: Map[String, String])
}
case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])
sealed trait TracingLevelOfDetail
case object OnlyMetrics extends TracingLevelOfDetail
case object SimpleTrace extends TracingLevelOfDetail
case object FullTrace extends TracingLevelOfDetail
trait TraceContextAware {
def captureNanoTime: Long
def traceContext: Option[TraceContext]
}
object TraceContextAware {
def default: TraceContextAware = new DefaultTraceContextAware
class DefaultTraceContextAware extends TraceContextAware {
val captureNanoTime = System.nanoTime()
val traceContext = TraceRecorder.currentContext
}
}
trait SegmentCompletionHandleAware extends TraceContextAware {
@volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None
}
object SegmentCompletionHandleAware {
def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware
class DefaultSegmentCompletionHandleAware extends DefaultTraceContextAware with SegmentCompletionHandleAware {}
}
class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String],
val system: ActorSystem) extends TraceContext {
@volatile private var _isOpen = true
val levelOfDetail = OnlyMetrics
val startMark = System.nanoTime()
val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
val metricsExtension = Kamon(Metrics)(system)
def name: String = _name
def rename(newName: String): Unit = _name = newName
def isOpen(): Boolean = _isOpen
def finish(metadata: Map[String, String]): Unit = {
_isOpen = false
val finishMark = System.nanoTime()
val metricRecorder = metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory)
metricRecorder.map { traceMetrics ⇒
traceMetrics.elapsedTime.record(finishMark - startMark)
drainFinishedSegments(traceMetrics)
}
}
private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = {
while (!finishedSegments.isEmpty) {
val segmentData = finishedSegments.poll()
metricRecorder.segmentRecorder(segmentData.identity).record(segmentData.duration)
}
}
private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = {
finishedSegments.add(SegmentData(identity, duration, metadata))
if (!_isOpen) {
metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).map { traceMetrics ⇒
drainFinishedSegments(traceMetrics)
}
}
}
def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle =
new SimpleMetricCollectionCompletionHandle(identity, metadata)
class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle {
val segmentStartNanoTime = System.nanoTime()
def finish(metadata: Map[String, String] = Map.empty): Unit = {
val segmentFinishNanoTime = System.nanoTime()
finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata)
}
}
}
|