aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace/TraceContext.scala
blob: 307cf17a4703e5c7ba6d698afa5ebc4587291a28 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
 * =========================================================================================
 * Copyright © 2013 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.trace

import akka.actor.ActorSystem
import kamon.Kamon
import kamon.metrics._
import java.util.concurrent.ConcurrentLinkedQueue
import kamon.trace.TraceContextAware.DefaultTraceContextAware
import kamon.trace.TraceContext.SegmentIdentity
import kamon.metrics.TraceMetrics.TraceMetricRecorder

/**
 * Contract of a trace: it exposes a name and a token, is bound to an actor system, can start
 * segments and can be finished.
 */
trait TraceContext {
  /** Current name of the trace. */
  def name: String

  /** Token of the trace. */
  def token: String

  /** Actor system this context is bound to. */
  def system: ActorSystem

  /** Replaces the name of the trace with `name`. */
  def rename(name: String): Unit

  /** Level of detail of this context, one of the [[TracingLevelOfDetail]] values. */
  def levelOfDetail: TracingLevelOfDetail

  /**
   * Starts a segment within this trace.
   *
   * @param identity identity of the segment being started
   * @param metadata metadata attached to the segment at start time
   * @return a handle that must be used to finish the segment
   */
  def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle

  /**
   * Finishes the trace.
   *
   * @param metadata metadata supplied at finish time
   */
  def finish(metadata: Map[String, String]): Unit

  // One storage instance is created per context; it is only accessible from within the `kamon` package.
  private[kamon] val traceLocalStorage: TraceLocalStorage = new TraceLocalStorage
}

/** Companion of the `TraceContext` trait; holds the type alias used by its segment API. */
object TraceContext {
  /** Alias of `MetricIdentity`; this is the type `TraceContext.startSegment` takes to identify a segment. */
  type SegmentIdentity = MetricIdentity
}

/** Handle returned when a segment is started; it is used to signal that the segment has finished. */
trait SegmentCompletionHandle {
  /**
   * Marks the segment as finished.
   *
   * @param metadata metadata supplied at finish time
   */
  def finish(metadata: Map[String, String]): Unit
}

/**
 * Immutable record of a finished segment.
 *
 * @param identity identity of the segment
 * @param duration duration of the segment; within this file it is computed as the difference between
 *                 two `System.nanoTime()` readings
 * @param metadata metadata associated with the segment
 */
case class SegmentData(identity: MetricIdentity, duration: Long, metadata: Map[String, String])

/**
 * Level of detail reported by a trace context through `levelOfDetail`. The trait is sealed, so the
 * three case objects below are its only values.
 */
// NOTE(review): what exactly is captured at each level is not defined in this file — the names suggest
// increasing detail from OnlyMetrics to FullTrace; confirm against the code that consumes them.
sealed trait TracingLevelOfDetail
case object OnlyMetrics extends TracingLevelOfDetail
case object SimpleTrace extends TracingLevelOfDetail
case object FullTrace extends TracingLevelOfDetail

/** Something that carries a captured nano-time reading together with an optional trace context. */
trait TraceContextAware {
  /** Nano-time reading held by this instance (the default implementation takes it from `System.nanoTime()`). */
  def captureNanoTime: Long
  /** Trace context held by this instance, if there is one. */
  def traceContext: Option[TraceContext]
}

/** Factory and default implementation for the `TraceContextAware` trait. */
object TraceContextAware {

  /** Creates a new [[DefaultTraceContextAware]]; its values are captured while it is constructed. */
  def default: TraceContextAware = new DefaultTraceContextAware()

  /**
   * Implementation that takes a snapshot at instantiation time: first a `System.nanoTime()` reading,
   * then whatever `TraceRecorder.currentContext` returns at that moment.
   */
  class DefaultTraceContextAware extends TraceContextAware {
    val captureNanoTime: Long = System.nanoTime()
    val traceContext: Option[TraceContext] = TraceRecorder.currentContext
  }
}

/** A `TraceContextAware` that additionally holds a reassignable, optional segment completion handle. */
trait SegmentCompletionHandleAware extends TraceContextAware {
  // Starts out empty; declared as a volatile var so it can be reassigned after construction.
  @volatile var segmentCompletionHandle: Option[SegmentCompletionHandle] = None
}

/** Factory and default implementation for the `SegmentCompletionHandleAware` trait. */
object SegmentCompletionHandleAware {

  /** Creates a new [[DefaultSegmentCompletionHandleAware]]. */
  def default: SegmentCompletionHandleAware = new DefaultSegmentCompletionHandleAware()

  /** Combines `DefaultTraceContextAware` with the completion-handle slot; it adds no members of its own. */
  class DefaultSegmentCompletionHandleAware
    extends DefaultTraceContextAware
    with SegmentCompletionHandleAware
}

/**
 * [[TraceContext]] implementation whose level of detail is [[OnlyMetrics]].
 *
 * When the trace finishes, its elapsed time and the duration of every finished segment are recorded
 * through the recorder that the Kamon `Metrics` extension registers for `TraceMetrics(name)`.
 *
 * @param _name    current name of the trace; replaced by `rename`
 * @param token    token of the trace
 * @param metadata metadata supplied at creation time; it is not read anywhere in this class
 * @param system   actor system used to look up the `Metrics` extension
 */
class SimpleMetricCollectionContext(@volatile private var _name: String, val token: String, metadata: Map[String, String],
                                    val system: ActorSystem) extends TraceContext {
  // Set to false by `finish`; volatile so that `finishSegment` observes that write.
  @volatile private var _isOpen = true
  val levelOfDetail = OnlyMetrics
  // Nano-time reading taken at construction; `finish` measures the elapsed time against it.
  val startMark = System.nanoTime()
  // Segments that have finished but have not been handed to the metric recorder yet.
  val finishedSegments = new ConcurrentLinkedQueue[SegmentData]()
  val metricsExtension = Kamon(Metrics)(system)

  def name: String = _name

  def rename(newName: String): Unit = _name = newName

  /** Whether `finish` has not been called yet. */
  def isOpen(): Boolean = _isOpen

  /**
   * Closes the context, records the elapsed time since construction and flushes the segments that
   * have finished so far.
   *
   * @param metadata metadata supplied at finish time; it is not used by this implementation
   */
  def finish(metadata: Map[String, String]): Unit = {
    // Close before draining: a segment that is queued after the drain below will see the closed
    // flag in `finishSegment` and flush itself.
    _isOpen = false
    val finishMark = System.nanoTime()

    metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).foreach { traceMetrics =>
      traceMetrics.elapsedTime.record(finishMark - startMark)
      drainFinishedSegments(traceMetrics)
    }
  }

  /**
   * Records and removes every queued segment.
   *
   * The loop is driven by the result of `poll()` rather than by `isEmpty`, because both `finish` and
   * `finishSegment` may drain the queue: another caller can empty it between an `isEmpty` check and
   * the following `poll()`, in which case `poll()` returns null.
   */
  private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit =
    Iterator
      .continually(finishedSegments.poll())
      .takeWhile(_ != null)
      .foreach { segmentData =>
        metricRecorder.segmentRecorder(segmentData.identity).record(segmentData.duration)
      }

  /**
   * Queues a finished segment and, when the trace has already been finished, flushes the queue
   * right away since `finish` will not run again to do it.
   */
  private def finishSegment(identity: MetricIdentity, duration: Long, metadata: Map[String, String]): Unit = {
    // Enqueue first, then check the flag; together with the ordering in `finish` this ensures the
    // segment is drained by one side or the other.
    finishedSegments.add(SegmentData(identity, duration, metadata))

    if (!_isOpen) {
      metricsExtension.register(TraceMetrics(name), TraceMetrics.Factory).foreach { traceMetrics =>
        drainFinishedSegments(traceMetrics)
      }
    }
  }

  def startSegment(identity: SegmentIdentity, metadata: Map[String, String]): SegmentCompletionHandle =
    new SimpleMetricCollectionCompletionHandle(identity, metadata)

  /**
   * Completion handle that measures the segment from its own construction until `finish` is called.
   *
   * @param identity      identity of the segment
   * @param startMetadata metadata supplied when the segment was started
   */
  class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle {
    val segmentStartNanoTime = System.nanoTime()

    /**
     * Reports the segment to the enclosing context with the elapsed nano-time as its duration.
     *
     * @param metadata metadata supplied at finish time; on key clashes it overrides `startMetadata`
     */
    def finish(metadata: Map[String, String] = Map.empty): Unit = {
      val segmentFinishNanoTime = System.nanoTime()
      finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata)
    }
  }
}