1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.trace
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.{ ExtensionId, ActorSystem }
import akka.event.LoggingAdapter
import kamon.Kamon.Extension
import kamon.metric.{ MetricsExtension, TraceMetrics }
import kamon.util.{ NanoInterval, RelativeNanoTimestamp }
import scala.annotation.tailrec
/**
 * [[TraceContext]] implementation that only records latency metrics: the elapsed time of the trace itself and the
 * elapsed time of every segment started through it. Metadata passed to `addMetadata` is discarded.
 *
 * Finished segments are buffered in a queue and flushed to the [[TraceMetrics]] recorder registered under the
 * trace's name, either when the trace finishes or, for segments finishing after the trace was closed, as soon as
 * they finish.
 *
 * @param traceName        initial name of the trace; can be changed through `rename` while the trace is open.
 * @param token            token of this trace.
 * @param izOpen           whether the trace starts in the open state.
 * @param levelOfDetail    level of detail of this trace.
 * @param startTimestamp   moment at which the trace started; origin used to compute the elapsed time.
 * @param log              logger used to warn about rename attempts on closed traces and segments.
 * @param metricsExtension extension used to obtain the recorder for this trace's metrics.
 * @param actorSystem      actor system used to resolve extensions in `lookupExtension`.
 */
private[kamon] class MetricsOnlyContext(traceName: String, val token: String, izOpen: Boolean, val levelOfDetail: LevelOfDetail,
  val startTimestamp: RelativeNanoTimestamp, log: LoggingAdapter, metricsExtension: MetricsExtension, val actorSystem: ActorSystem)
    extends TraceContext {

  @volatile private var _name = traceName
  @volatile private var _isOpen = izOpen
  @volatile protected var _elapsedTime = NanoInterval.default

  // Segments that have finished but whose latency has not been recorded yet.
  private val _finishedSegments = new ConcurrentLinkedQueue[SegmentLatencyData]()
  private val _traceLocalStorage = new TraceLocalStorage

  /** Changes the trace name while the trace is open; on a closed trace it only logs a warning. */
  def rename(newName: String): Unit =
    if (isOpen)
      _name = newName
    else if (log.isWarningEnabled)
      log.warning("Can't rename trace from [{}] to [{}] because the trace is already closed.", name, newName)

  def name: String = _name
  def isEmpty: Boolean = false
  def isOpen: Boolean = _isOpen

  /** No-op: this context keeps no metadata. */
  def addMetadata(key: String, value: String): Unit = {}

  def lookupExtension[T <: Extension](id: ExtensionId[T]): T = id(actorSystem)

  /**
   * Closes the trace, stores its elapsed time and, when a recorder is available for the trace's name, records the
   * elapsed time and flushes all segments finished so far.
   */
  def finish(): Unit = {
    // The trace is marked as closed before draining so that segments finishing from now on flush themselves.
    _isOpen = false
    val traceElapsedTime = NanoInterval.since(startTimestamp)
    _elapsedTime = traceElapsedTime

    // foreach rather than map: the block runs only for its side effects and produces no value.
    metricsExtension.register(TraceMetrics, name).foreach { registration ⇒
      registration.recorder.ElapsedTime.record(traceElapsedTime.nanos)
      drainFinishedSegments(registration.recorder)
    }
  }

  /** Starts a new segment whose start timestamp is taken at creation time. */
  def startSegment(segmentName: String, category: String, library: String): Segment =
    new MetricsOnlySegment(segmentName, category, library)

  /** Records the latency of every queued segment into `recorder` until the queue is empty. */
  @tailrec private def drainFinishedSegments(recorder: TraceMetrics): Unit = {
    // poll() returns null once the queue is empty, which ends the recursion.
    val segment = _finishedSegments.poll()
    if (segment != null) {
      recorder.segment(segment.name, segment.category, segment.library).record(segment.duration.nanos)
      drainFinishedSegments(recorder)
    }
  }

  /**
   * Queues the latency data of a finished segment. If the trace is already closed the queue is flushed right away,
   * since `finish()` will not run again to do it.
   */
  protected def finishSegment(segmentName: String, category: String, library: String, duration: NanoInterval): Unit = {
    // Enqueue first, check afterwards: a segment finishing concurrently with finish() is then picked up either by
    // the drain in finish() or by the one below.
    _finishedSegments.add(SegmentLatencyData(segmentName, category, library, duration))

    // NOTE(review): isClosed is not defined in this class; presumably inherited from TraceContext as !isOpen.
    if (isClosed) {
      metricsExtension.register(TraceMetrics, name).foreach { registration ⇒
        drainFinishedSegments(registration.recorder)
      }
    }
  }

  // Should only be used by the TraceLocal utilities.
  def traceLocalStorage: TraceLocalStorage = _traceLocalStorage

  // Handle with care and make sure that the trace is closed before calling this method, otherwise NanoInterval.default
  // will be returned.
  def elapsedTime: NanoInterval = _elapsedTime

  /**
   * Segment that only measures its own latency and reports it to the enclosing context when finished.
   *
   * @param segmentName initial name of the segment; can be changed through `rename` while the segment is open.
   * @param category    category reported together with the segment latency.
   * @param library     library reported together with the segment latency.
   */
  class MetricsOnlySegment(segmentName: String, val category: String, val library: String) extends Segment {
    private val _startTimestamp = RelativeNanoTimestamp.now
    @volatile private var _segmentName = segmentName
    @volatile private var _elapsedTime = NanoInterval.default
    @volatile private var _isOpen = true

    def name: String = _segmentName
    def isEmpty: Boolean = false

    /** No-op: segments keep no metadata. */
    def addMetadata(key: String, value: String): Unit = {}

    def isOpen: Boolean = _isOpen

    /** Changes the segment name while the segment is open; on a closed segment it only logs a warning. */
    def rename(newName: String): Unit =
      if (isOpen)
        _segmentName = newName
      else if (log.isWarningEnabled)
        log.warning("Can't rename segment from [{}] to [{}] because the segment is already closed.", name, newName)

    /** Closes the segment, stores its elapsed time and hands the latency data over to the enclosing context. */
    def finish: Unit = {
      _isOpen = false
      val segmentElapsedTime = NanoInterval.since(_startTimestamp)
      _elapsedTime = segmentElapsedTime

      finishSegment(name, category, library, segmentElapsedTime)
    }

    // Handle with care and make sure that the segment is closed before calling this method, otherwise
    // NanoInterval.default will be returned.
    def elapsedTime: NanoInterval = _elapsedTime
    def startTimestamp: RelativeNanoTimestamp = _startTimestamp
  }
}
/**
 * Latency measurement of a finished segment, kept until it is recorded into the trace's metrics.
 *
 * @param name     name of the segment.
 * @param category category of the segment.
 * @param library  library of the segment.
 * @param duration time elapsed between the start and the finish of the segment.
 */
case class SegmentLatencyData(
  name: String,
  category: String,
  library: String,
  duration: NanoInterval)
|