1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
/*
* =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.metrics
import scala.collection.concurrent.TrieMap
import akka.actor._
import com.typesafe.config.Config
import kamon.util.GlobPathFilter
import kamon.Kamon
import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.MetricGroupIdentity.Category
import kamon.metrics.Metrics.MetricGroupFilter
import scala.Some
import kamon.metrics.Subscriptions.Subscribe
/**
 * Identifies one metric group by the name of the tracked entity and the category it belongs to.
 *
 * Instances are used as the keys of the recorder registry in `MetricsExtension.storage`.
 *
 * @param name     name of the entity the metric group belongs to
 * @param category category of the metric group
 */
case class MetricGroupIdentity(
  name: String,
  category: MetricGroupIdentity.Category)
/**
 * Identity of a single metric.
 *
 * Within this file it is the key type of `MetricGroupSnapshot.metrics` and the first argument
 * of `MetricGroupRecorder.record`.
 */
trait MetricIdentity {

  /** Name of the metric. */
  def name: String
}
/**
 * Records values for the metrics of one metric group and exposes them as a snapshot.
 */
trait MetricGroupRecorder {

  /**
   * Records a value for one metric of this group.
   *
   * The result type is spelled out as `Unit`; the previous declaration relied on procedure
   * syntax, which is deprecated in recent Scala versions.
   *
   * @param identity the metric the value belongs to
   * @param value    the measured value
   */
  def record(identity: MetricIdentity, value: Long): Unit

  /**
   * Returns a snapshot of the metrics in this group.
   *
   * NOTE(review): whether collecting also resets the recorded state is up to the
   * implementation; nothing in this trait fixes it.
   */
  def collect: MetricGroupSnapshot
}
/**
 * Snapshot of a whole metric group, as returned by `MetricGroupRecorder.collect`.
 */
trait MetricGroupSnapshot {

  /** The snapshot of every metric in the group, keyed by the metric's identity. */
  def metrics: Map[MetricIdentity, MetricSnapshot]
}
/**
 * Records values for a single metric and exposes them as a snapshot.
 */
trait MetricRecorder {

  /**
   * Records one measured value.
   *
   * The result type is spelled out as `Unit`; the previous declaration relied on procedure
   * syntax, which is deprecated in recent Scala versions.
   *
   * @param value the measured value
   */
  def record(value: Long): Unit

  /**
   * Returns a snapshot of the recorded values.
   *
   * NOTE(review): whether collecting also resets the recorded state is up to the
   * implementation; nothing in this trait fixes it.
   */
  def collect(): MetricSnapshot
}
/**
 * Snapshot of the values recorded for a single metric.
 */
trait MetricSnapshot {

  /** Total number of measurements covered by this snapshot. */
  def numberOfMeasurements: Long

  /**
   * The recorded measurements as (value, count) pairs.
   *
   * NOTE(review): presumably each entry says how many times a given value was recorded;
   * confirm against the concrete recorder implementations.
   */
  def measurementLevels: Vector[MetricSnapshot.Measurement]
}
/**
 * Companion of the `MetricSnapshot` trait; holds the element type of `measurementLevels`.
 */
object MetricSnapshot {

  /**
   * A value paired with a count.
   *
   * @param value the measured value
   * @param count the count associated with that value
   */
  case class Measurement(
    value: Long,
    count: Long)
}
/**
 * Plain immutable implementation of `MetricSnapshot` that simply stores the given values.
 *
 * @param numberOfMeasurements total number of measurements covered by the snapshot
 * @param measurementLevels    the recorded measurements as (value, count) pairs
 */
case class DefaultMetricSnapshot(
  numberOfMeasurements: Long,
  measurementLevels: Vector[MetricSnapshot.Measurement])
    extends MetricSnapshot
/**
 * Companion of the `MetricGroupIdentity` case class; defines the category abstraction.
 */
object MetricGroupIdentity {

  /** A named kind of metric group. The name is also the key used to look up its filter. */
  trait Category {
    def name: String
  }

  /**
   * Wildcard category named "match-all".
   *
   * Its `equals` answers `true` for any `Category` instance. Note that this is only
   * guaranteed in one direction (`AnyCategory == other`); the reverse comparison depends on
   * the other category's own `equals`. `hashCode` is not overridden, so this value should
   * not be relied upon for hash-based lookups against other categories.
   */
  val AnyCategory = new Category {
    def name: String = "match-all"

    override def equals(that: Any): Boolean = that match {
      case _: Category => true
      case _           => false
    }
  }
}
/**
 * Factory for the recorder of one kind of metric group.
 *
 * `MetricsExtension.register` requires its category argument to also be a factory, and uses
 * the `Group` type member as the static type of the recorder it hands back.
 */
trait MetricGroupFactory {

  /** Concrete recorder type produced by this factory. */
  type Group <: MetricGroupRecorder

  /**
   * Creates a new recorder.
   *
   * @param config configuration passed in by the caller; `MetricsExtension` passes the actor
   *               system's configuration
   */
  def create(config: Config): Group
}
/**
 * Akka extension holding the registry of metric group recorders of an actor system.
 *
 * Recorders live in a concurrent `TrieMap` keyed by `MetricGroupIdentity`. Whether an entity
 * gets a recorder at all is decided by the per-category include/exclude filters read from the
 * `kamon.metrics.filters` configuration list.
 *
 * @param system the actor system this extension instance belongs to
 */
class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension {
  val config: Config = system.settings.config
  val storage: TrieMap[MetricGroupIdentity, MetricGroupRecorder] = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
  val filters: Map[String, MetricGroupFilter] = loadFilters(config)

  // Created lazily, on first access, as a top-level actor of the extension's system.
  lazy val subscriptions: ActorRef = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")

  /**
   * Registers the entity `name` under `category` and returns its recorder.
   *
   * @param name     name of the entity to track
   * @param category category of the entity, which is also the factory for its recorder
   * @return `Some(recorder)` when the category's filter accepts `name` (an already stored
   *         recorder is reused, otherwise one is created with `category.create`), or `None`
   *         when the name is rejected or the category has no filter configured
   */
  def register(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Option[category.Group] = {
    if (shouldTrack(name, category)) {
      val recorder = storage.getOrElseUpdate(MetricGroupIdentity(name, category), category.create(config))
      // The registry stores plain MetricGroupRecorder values. The entry is keyed by this very
      // category, so it is expected to have been produced by this category's factory; the cast
      // itself is unchecked.
      Some(recorder.asInstanceOf[category.Group])
    } else
      None
  }

  /**
   * Removes the recorder stored for `name` under `category`, if there is one.
   */
  def unregister(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Unit = {
    storage.remove(MetricGroupIdentity(name, category))
  }

  /**
   * Sends a `Subscribe` message to the subscriptions actor, with `receiver` set as the sender
   * of that message.
   *
   * @param category    category the subscription refers to
   * @param selection   selection string forwarded as-is in the `Subscribe` message
   * @param receiver    actor used as the sender of the `Subscribe` message
   * @param permanently forwarded as-is in the `Subscribe` message; defaults to `false`
   */
  def subscribe(category: Category, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = {
    subscriptions.tell(Subscribe(category, selection, permanently), receiver)
  }

  /**
   * Calls `collect` on every registered recorder and returns the snapshots keyed by identity.
   */
  def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = {
    (for ((identity, recorder) <- storage) yield (identity, recorder.collect)).toMap
  }

  // A name is tracked only when a filter exists for the category's name AND that filter accepts
  // it; categories without a configured filter are never tracked.
  private def shouldTrack(name: String, category: MetricGroupIdentity.Category): Boolean = {
    filters.get(category.name).exists(_.accept(name))
  }

  /**
   * Reads the per-category filters from the `kamon.metrics.filters` list.
   *
   * Every object in the list contributes one filter per top-level key; each key's value must
   * provide the string lists `includes` and `excludes`, which are turned into glob filters.
   * When the same key appears more than once, the last occurrence wins.
   *
   * @param config configuration to read the filters from
   * @return the configured filters, keyed by category name
   */
  def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
    import scala.collection.JavaConverters._
    import com.typesafe.config.ConfigUtil

    def globs(source: Config, path: String): List[GlobPathFilter] =
      source.getStringList(path).asScala.map(pattern => new GlobPathFilter(pattern)).toList

    val allFilters =
      for {
        filterObject <- config.getObjectList("kamon.metrics.filters").asScala
        entry <- filterObject.entrySet().asScala
      } yield {
        val key = entry.getKey
        val keyBasedConfig = entry.getValue.atKey(key)
        // `atKey` takes a literal key while `getStringList` takes a path expression, so the key
        // is quoted via `joinPath`; otherwise a key containing '.' (or other path-special
        // characters) would be split into several path elements and the lookup would fail.
        val includes = globs(keyBasedConfig, ConfigUtil.joinPath(key, "includes"))
        val excludes = globs(keyBasedConfig, ConfigUtil.joinPath(key, "excludes"))
        (key, MetricGroupFilter(includes, excludes))
      }

    allFilters.toMap
  }
}
/**
 * Extension id of the metrics extension: creates the `MetricsExtension` for an actor system
 * and serves as its own lookup key.
 */
object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {

  /**
   * Name filter made of include and exclude glob patterns.
   *
   * @param includes patterns of which at least one must accept a name
   * @param excludes patterns of which none may accept a name
   */
  case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {

    /**
     * @return `true` when at least one include pattern accepts `name` and no exclude pattern
     *         does; the excludes are only consulted once an include has matched
     */
    def accept(name: String): Boolean = {
      val included = includes.exists(pattern => pattern.accept(name))
      included && excludes.forall(pattern => !pattern.accept(name))
    }
  }

  /** Creates the extension instance for the given actor system. */
  def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system)

  /** This object is the canonical id under which the extension is looked up. */
  def lookup(): ExtensionId[_ <: actor.Extension] = this
}
|