1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.statsd
import akka.actor._
import kamon.Kamon
import kamon.metric.UserMetrics._
import kamon.metric._
import kamon.metrics._
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import com.typesafe.config.Config
import java.lang.management.ManagementFactory
import akka.event.Logging
import java.net.InetSocketAddress
/**
 * Akka extension identifier for the Kamon StatsD module. Looking it up on an actor system
 * yields that system's [[StatsDExtension]].
 */
object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {

  /**
   * Contract for components that turn a metric group/metric identity pair into the key string
   * under which the metric is reported.
   */
  trait MetricKeyGenerator {
    /** Name of the local host as seen by the generator. */
    def localhostName: String

    /** A normalized form of [[localhostName]]. */
    def normalizedLocalhostName: String

    /** Builds the key for the given metric within the given metric group. */
    def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String
  }

  /** Instantiates the extension for the supplied actor system. */
  override def createExtension(system: ExtendedActorSystem): StatsDExtension = {
    new StatsDExtension(system)
  }

  /** This object is its own extension id. */
  override def lookup(): ExtensionId[_ <: Extension] = this
}
/**
 * Kamon extension that forwards metric snapshots to a StatsD metrics sender actor.
 *
 * On construction it reads the `kamon.statsd` configuration section, creates the metrics
 * listener (see [[buildMetricsListener]]) and subscribes that listener to the user metrics,
 * to the actor/router/trace/dispatcher patterns listed under `includes.*`, and, when
 * `report-system-metrics` is enabled, to the system metric categories.
 *
 * @param system the actor system this extension instance belongs to
 */
class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
  val log = Logging(system, classOf[StatsDExtension])
  log.info("Starting the Kamon(StatsD) extension")

  private val statsDConfig = system.settings.config.getConfig("kamon.statsd")

  val statsDHost = new InetSocketAddress(statsDConfig.getString("hostname"), statsDConfig.getInt("port"))
  val flushInterval = statsDConfig.getMilliseconds("flush-interval")
  val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size")
  val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval")

  // Must be created after statsDHost and maxPacketSizeInBytes: buildMetricsListener reads both.
  val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval)

  // Subscribe to all user metrics
  Kamon(Metrics)(system).subscribe(UserHistograms, "*", statsDMetricsListener, permanently = true)
  Kamon(Metrics)(system).subscribe(UserCounters, "*", statsDMetricsListener, permanently = true)
  Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", statsDMetricsListener, permanently = true)
  Kamon(Metrics)(system).subscribe(UserGauges, "*", statsDMetricsListener, permanently = true)

  // Subscribe to Actors
  val includedActors = statsDConfig.getStringList("includes.actor").asScala
  for (actorPathPattern ← includedActors) {
    Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true)
  }

  // Subscribe to Routers
  val includedRouters = statsDConfig.getStringList("includes.router").asScala
  for (routerPathPattern ← includedRouters) {
    Kamon(Metrics)(system).subscribe(RouterMetrics, routerPathPattern, statsDMetricsListener, permanently = true)
  }

  // Subscribe to Traces
  val includedTraces = statsDConfig.getStringList("includes.trace").asScala
  for (tracePathPattern ← includedTraces) {
    Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true)
  }

  // Subscribe to Dispatchers
  val includedDispatchers = statsDConfig.getStringList("includes.dispatcher").asScala
  for (dispatcherPathPattern ← includedDispatchers) {
    Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true)
  }

  // Subscribe to SystemMetrics
  val includeSystemMetrics = statsDConfig.getBoolean("report-system-metrics")
  if (includeSystemMetrics) {
    List(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics) foreach { metric ⇒
      Kamon(Metrics)(system).subscribe(metric, "*", statsDMetricsListener, permanently = true)
    }
  }

  /**
   * Creates the actor that receives metric snapshots on behalf of this extension.
   *
   * A `statsd-metrics-sender` actor is always created. When both intervals are equal that
   * sender is returned directly; otherwise a `statsd-metrics-buffer` actor configured with the
   * flush interval is created in front of the sender and returned instead.
   *
   * @param tickInterval  Kamon metrics tick interval, in milliseconds
   * @param flushInterval StatsD flush interval, in milliseconds; must not be smaller than
   *                      `tickInterval`
   * @return the actor that should be subscribed to the metrics extension
   */
  def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
    // NOTE(review): Predef.assert is elidable at compile time, in which case this check is skipped.
    assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")

    val defaultMetricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config)
    val metricsSender = system.actorOf(StatsDMetricsSender.props(
      statsDHost,
      maxPacketSizeInBytes,
      defaultMetricKeyGenerator), "statsd-metrics-sender")

    if (flushInterval == tickInterval) {
      // No need to buffer the metrics, let's go straight to the metrics sender.
      metricsSender
    } else {
      // Build the duration from the Long value directly: narrowing to Int first would wrap
      // for intervals larger than Int.MaxValue milliseconds.
      system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.millis, metricsSender), "statsd-metrics-buffer")
    }
  }
}
/**
 * [[StatsD.MetricKeyGenerator]] that builds dot-separated keys of the form
 * `application[.hostname].category.group[.metric]`.
 *
 * Settings are read from `kamon.statsd.simple-metric-key-generator`: `application`,
 * `include-hostname` and `hostname-override` (the literal value `none` means "no override").
 *
 * @param config configuration from which the settings above are read
 */
class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator {
  val application = config.getString("kamon.statsd.simple-metric-key-generator.application")
  val includeHostnameInMetrics =
    config.getBoolean("kamon.statsd.simple-metric-key-generator.include-hostname")
  val hostnameOverride =
    config.getString("kamon.statsd.simple-metric-key-generator.hostname-override")

  // The runtime MXBean name is commonly "pid@hostname", but that format is not guaranteed.
  // Take the segment after the first '@' when present; otherwise fall back to the host name
  // reported by InetAddress, and to "localhost" if even that lookup fails, instead of failing
  // construction with an ArrayIndexOutOfBoundsException.
  val _localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@') match {
    case Array(_, host, _*) ⇒ host
    case _ ⇒
      scala.util.Try(java.net.InetAddress.getLocalHost.getHostName).getOrElse("localhost")
  }
  val _normalizedLocalhostName = _localhostName.replace('.', '_')

  /** Host name derived from the JVM runtime name (see the fallback rules above). */
  def localhostName: String = _localhostName

  /** [[localhostName]] with every '.' replaced by '_'. */
  def normalizedLocalhostName: String = _normalizedLocalhostName

  /** Host name used in keys: the configured override, or the normalized local host name. */
  val hostname: String =
    if (hostnameOverride == "none") normalizedLocalhostName
    else hostnameOverride

  /** Common key prefix: the application name, optionally followed by the host name. */
  val baseName: String =
    if (includeHostnameInMetrics) s"${application}.${hostname}"
    else application

  /**
   * Builds the key for a metric.
   *
   * In the group name, ": " becomes "-" and both " " and "/" become "_". For user metric
   * groups the key ends at the group name; for every other group the metric name is appended.
   *
   * @param groupIdentity  identity of the metric group the metric belongs to
   * @param metricIdentity identity of the metric itself
   * @return the generated key
   */
  def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = {
    val normalizedGroupName = groupIdentity.name.replace(": ", "-").replace(" ", "_").replace("/", "_")
    val key = s"${baseName}.${groupIdentity.category.name}.${normalizedGroupName}"

    if (isUserMetric(groupIdentity)) key
    else s"${key}.${metricIdentity.name}"
  }

  /** Returns true when the group identity is a `UserMetricGroup`. */
  def isUserMetric(groupIdentity: MetricGroupIdentity): Boolean = groupIdentity match {
    case _: UserMetricGroup ⇒ true
    case _                  ⇒ false
  }
}
|