aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/kamon/metric/Metrics.scala
blob: 3992ab43ad3cbefeae616e65ede47c6ab878976e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package kamon.metric

import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
import akka.actor.ActorRef
import com.codahale.metrics
import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry}


/**
 * Holder for the shared Codahale [[MetricRegistry]] plus small helpers to
 * add metrics to it and to remove them by name prefix.
 */
object Metrics {
  /** The registry every helper in this object operates on. */
  val registry: MetricRegistry = new MetricRegistry

  /**
   * Console reporter builder bound to [[registry]], configured to convert
   * durations to nanoseconds. It is not built or started here; the line that
   * would do so is commented out below.
   */
  val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
  //consoleReporter.build().start(45, TimeUnit.SECONDS)

  //val newrelicReporter = NewRelicReporter(registry)
  //newrelicReporter.start(5, TimeUnit.SECONDS)

  /**
   * Registers `metric` in [[registry]] under `name`.
   *
   * @param name   name to register the metric under
   * @param metric the metric instance to register
   * @return whatever `registry.register` returns for the given metric
   */
  def include(name: String, metric: Metric): Metric = registry.register(name, metric)

  /**
   * Removes every registered metric whose name starts with `name`.
   *
   * The previous implementation declared a filter parameter that was also
   * called `name`, which shadowed this method's argument: the predicate
   * compared each metric name with itself, was always true, and therefore
   * wiped the whole registry.
   *
   * @param name prefix of the metric names to remove
   */
  def exclude(name: String): Unit = removeByPrefix(name)

  /**
   * Removes every registered metric whose name starts with `fullName`.
   *
   * @param fullName prefix of the metric names to remove
   */
  def deregister(fullName: String): Unit = removeByPrefix(fullName)

  /** Removes all metrics from [[registry]] whose registered name starts with `prefix`. */
  private[this] def removeByPrefix(prefix: String): Unit =
    registry.removeMatching(new MetricFilter {
      // The parameter is deliberately NOT called `name`/`prefix` so it cannot shadow the outer argument.
      def matches(metricName: String, metric: Metric): Boolean = metricName.startsWith(prefix)
    })
}

/**
 * Groups two marker objects, `Actor` and `Dispatcher`.
 *
 * NOTE(review): presumably these tag the kind of entity being watched; nothing
 * in this file consumes them, so confirm against the callers.
 */
object Watched {
  case object Actor
  case object Dispatcher
}

/**
 * Builds the string names used for dispatcher, mailbox and actor metrics and
 * answers whether a given actor system or actor path should be instrumented.
 */
object MetricDirectory {

  /** Common "/ActorSystem/<name>" prefix shared by the dispatcher and mailbox names. */
  private[this] def systemRoot(actorSystem: String): String = s"/ActorSystem/$actorSystem"

  /**
   * Name for a dispatcher of the given actor system.
   * The result ends with a trailing slash.
   */
  def nameForDispatcher(actorSystem: String, dispatcher: String): String =
    s"${systemRoot(actorSystem)}/Dispatcher/$dispatcher/"

  /**
   * Name for the mailbox of an actor in the given actor system.
   * Unlike the dispatcher name, the result has no trailing slash.
   */
  def nameForMailbox(actorSystem: String, actor: String): String =
    s"${systemRoot(actorSystem)}/Actor/$actor/Mailbox"

  /** Joins the elements of the actor's path with "/" (no leading slash is added). */
  def nameForActor(actorRef: ActorRef): String = {
    val segments = actorRef.path.elements
    segments.mkString("/")
  }

  /** True unless the actor system name starts with "kamon". */
  def shouldInstrument(actorSystem: String): Boolean = {
    val isKamonSystem = actorSystem.startsWith("kamon")
    !isKamonSystem
  }

  /** True when the path is non-empty and does not start with "system". */
  def shouldInstrumentActor(actorPath: String): Boolean =
    !actorPath.isEmpty && !actorPath.startsWith("system")

}












/**
 * Bundles the three histograms kept per dispatcher.
 *
 * @param activeThreadCount histogram named for the active thread count
 * @param poolSize          histogram named for the pool size
 * @param queueSize         histogram named for the queue size
 */
case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)




/**
 * Minimal histogram abstraction: values go in through `update` and are read
 * back through `snapshot`.
 */
trait Histogram {
  /** Records one observed value. */
  def update(value: Long): Unit
  /** Returns a summary of the recorded values; exact semantics are up to the implementation. */
  def snapshot: HistogramSnapshot
}

/** Read-only summary produced by a [[Histogram]]: median, maximum and minimum. */
trait HistogramSnapshot {
  /** Median value reported by the snapshot. */
  def median: Double
  /** Maximum value reported by the snapshot. */
  def max: Double
  /** Minimum value reported by the snapshot. */
  def min: Double
}


/**
 * Keeps the dispatcher metric collectors that belong to one actor system.
 *
 * @param actorSystemName name of the actor system these collectors belong to
 */
case class ActorSystemMetrics(actorSystemName: String) {
  import scala.collection.JavaConverters._

  /**
   * Collectors keyed by dispatcher name, exposed as a Scala view over a
   * `ConcurrentHashMap`. Uses ordinary method selection rather than the
   * postfix-operator form, which is a feature-gated language construct.
   */
  val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector]().asScala

  /** Builds a collector backed by three fresh [[CodahaleHistogram]] instances. */
  private[this] def createDispatcherCollector: DispatcherMetricCollector =
    DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())

  /**
   * Creates a new collector and stores it under `dispatcherName`.
   *
   * Any collector previously stored under the same name is replaced.
   *
   * @param dispatcherName key to store the collector under
   * @return the newly created collector; the current implementation always returns `Some`
   */
  def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
    val stats = createDispatcherCollector
    dispatchers.put(dispatcherName, stats)
    Some(stats)
  }

}


/**
 * [[Histogram]] implementation that delegates to a Codahale histogram created
 * with an `ExponentiallyDecayingReservoir`.
 */
case class CodahaleHistogram() extends Histogram {
  private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())

  /** Forwards the value to the underlying Codahale histogram. */
  def update(value: Long): Unit = {
    histogram.update(value)
  }

  /** Copies the median, max and min of the underlying histogram's current snapshot. */
  def snapshot: HistogramSnapshot = {
    val current = histogram.getSnapshot
    val median = current.getMedian
    val highest = current.getMax
    val lowest = current.getMin

    CodahaleHistogramSnapshot(median, highest, lowest)
  }
}

/**
 * Plain value holder implementing [[HistogramSnapshot]].
 *
 * @param median median value of the snapshot
 * @param max    maximum value of the snapshot
 * @param min    minimum value of the snapshot
 */
case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot







/**
 *  Dispatcher metrics with a histogram-like nature that we currently care about:
 *    - Work queue size
 *    - Total/active thread count
 */



import annotation.tailrec
import java.util.concurrent.atomic.AtomicReference

/** Factory and implicit conversion for [[Atomic]]. */
object Atomic {

  /** Wraps `obj` in a new `AtomicReference` and returns an [[Atomic]] around it. */
  def apply[T](obj: T): Atomic[T] = {
    val cell = new AtomicReference[T](obj)
    new Atomic[T](cell)
  }

  /** Implicitly views an existing `AtomicReference` as an [[Atomic]] (the reference is shared, not copied). */
  implicit def toAtomic[T](ref: AtomicReference[T]): Atomic[T] =
    new Atomic[T](ref)
}

/**
 * Thin wrapper around an `AtomicReference` that adds a functional,
 * compare-and-set based `update`.
 *
 * @param atomic the underlying reference that holds the value
 */
class Atomic[T](val atomic: AtomicReference[T]) {

  /**
   * Replaces the held value with `f(current)` and returns the new value.
   *
   * The function is re-applied to a freshly read value whenever
   * `compareAndSet` fails, so `f` may run more than once per call.
   *
   * @param f transformation from the current value to the next one
   * @return the value that was successfully stored
   */
  final def update(f: T => T): T = {
    @tailrec
    def attempt(): T = {
      val seen = atomic.get()
      val next = f(seen)
      if (atomic.compareAndSet(seen, next)) next
      else attempt()
    }

    attempt()
  }

  /** Returns the value currently held by the underlying reference. */
  def get(): T = atomic.get()
}