aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd/src
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-statsd/src')
-rw-r--r--kamon-statsd/src/main/resources/reference.conf11
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala16
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala65
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala8
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala14
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala133
6 files changed, 83 insertions, 164 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf
index a10ac735..32b99353 100644
--- a/kamon-statsd/src/main/resources/reference.conf
+++ b/kamon-statsd/src/main/resources/reference.conf
@@ -19,11 +19,12 @@ kamon {
# Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
- includes {
- actor = [ "*" ]
- trace = [ "*" ]
- dispatcher = [ "*" ]
- router = [ "*" ]
+ subscriptions {
+ trace = [ "**" ]
+ actor = [ "**" ]
+ dispatcher = [ "**" ]
+ user-metric = [ "**" ]
+ system-metric = [ "**" ]
}
# Enable system metrics
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
index 28354423..0fce855c 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
@@ -3,11 +3,10 @@ package kamon.statsd
import java.lang.management.ManagementFactory
import com.typesafe.config.Config
-import kamon.metric.UserMetrics.UserMetricGroup
-import kamon.metric.{ MetricIdentity, MetricGroupIdentity }
+import kamon.metric.{ MetricKey, Entity }
trait MetricKeyGenerator {
- def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String
+ def generateKey(entity: Entity, metricKey: MetricKey): String
}
class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator {
@@ -27,16 +26,11 @@ class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator {
if (includeHostname) s"$application.$normalizedHostname"
else application
- def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = {
- val normalizedGroupName = normalizer(groupIdentity.name)
- val key = s"${baseName}.${groupIdentity.category.name}.${normalizedGroupName}"
-
- if (isUserMetric(groupIdentity)) key
- else s"${key}.${metricIdentity.name}"
+ def generateKey(entity: Entity, metricKey: MetricKey): String = {
+ val normalizedGroupName = normalizer(entity.name)
+ s"${baseName}.${entity.category}.${normalizedGroupName}.${metricKey.name}"
}
- def isUserMetric(groupIdentity: MetricGroupIdentity): Boolean = groupIdentity.isInstanceOf[UserMetricGroup]
-
def hostName: String = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
def createNormalizer(strategy: String): Normalizer = strategy match {
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
index 2505f06a..e5a15a9d 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
@@ -18,17 +18,14 @@ package kamon.statsd
import akka.actor._
import kamon.Kamon
-import kamon.akka.{RouterMetrics, DispatcherMetrics, ActorMetrics}
-import kamon.http.HttpServerMetrics
-import kamon.metric.UserMetrics._
import kamon.metric._
-import kamon.metrics._
+import kamon.util.ConfigTools.Syntax
import scala.concurrent.duration._
-import scala.collection.JavaConverters._
import com.typesafe.config.Config
import akka.event.Logging
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit.MILLISECONDS
+import scala.collection.JavaConverters._
object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = StatsD
@@ -36,6 +33,8 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
}
class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ implicit val as = system
+
val log = Logging(system, classOf[StatsDExtension])
log.info("Starting the Kamon(StatsD) extension")
@@ -50,57 +49,11 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config)
- // Subscribe to all user metrics
- Kamon(Metrics)(system).subscribe(UserHistograms, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserCounters, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserGauges, "*", statsDMetricsListener, permanently = true)
-
- // Subscribe to server metrics
- Kamon(Metrics)(system).subscribe(HttpServerMetrics.category, "*", statsDMetricsListener, permanently = true)
-
- // Subscribe to Actors
- val includedActors = statsDConfig.getStringList("includes.actor").asScala
- for (actorPathPattern ← includedActors) {
- Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true)
- }
-
- // Subscribe to Routers
- val includedRouters = statsDConfig.getStringList("includes.router").asScala
- for (routerPathPattern ← includedRouters) {
- Kamon(Metrics)(system).subscribe(RouterMetrics, routerPathPattern, statsDMetricsListener, permanently = true)
- }
-
- // Subscribe to Traces
- val includedTraces = statsDConfig.getStringList("includes.trace").asScala
- for (tracePathPattern ← includedTraces) {
- Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true)
- }
-
- // Subscribe to Dispatchers
- val includedDispatchers = statsDConfig.getStringList("includes.dispatcher").asScala
- for (dispatcherPathPattern ← includedDispatchers) {
- Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true)
- }
-
- // Subscribe to SystemMetrics
- val includeSystemMetrics = statsDConfig.getBoolean("report-system-metrics")
- if (includeSystemMetrics) {
- //OS
- Kamon(Metrics)(system).subscribe(CPUMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ProcessCPUMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(MemoryMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(NetworkMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(DiskMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ContextSwitchesMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(LoadAverageMetrics, "*", statsDMetricsListener, permanently = true)
-
- //JVM
- Kamon(Metrics)(system).subscribe(HeapMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(NonHeapMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ThreadMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ClassLoadingMetrics, "*", statsDMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(GCMetrics, "*", statsDMetricsListener, permanently = true)
+ val subscriptions = statsDConfig.getConfig("subscriptions")
+ subscriptions.firstLevelKeys.map { subscriptionCategory ⇒
+ subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒
+ Kamon(Metrics).subscribe(subscriptionCategory, pattern, statsDMetricsListener, permanently = true)
+ }
}
def buildMetricsListener(tickInterval: Long, flushInterval: Long, keyGeneratorFQCN: String, config: Config): ActorRef = {
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index 2aac3a52..3241e1f3 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -20,7 +20,7 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
-import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import java.text.{ DecimalFormatSymbols, DecimalFormat }
import java.util.Locale
@@ -51,11 +51,11 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long,
val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
for (
- (groupIdentity, groupSnapshot) ← tick.metrics;
- (metricIdentity, metricSnapshot) ← groupSnapshot.metrics
+ (entity, snapshot) ← tick.metrics;
+ (metricKey, metricSnapshot) ← snapshot.metrics
) {
- val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity)
+ val key = metricKeyGenerator.generateKey(entity, metricKey)
metricSnapshot match {
case hs: Histogram.Snapshot ⇒
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala
index ed3fae5b..0edeb3df 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala
@@ -1,7 +1,8 @@
package kamon.statsd
import com.typesafe.config.ConfigFactory
-import kamon.metric.{ MetricGroupCategory, MetricGroupIdentity, MetricIdentity }
+import kamon.metric.instrument.UnitOfMeasurement
+import kamon.metric._
import org.scalatest.{ Matchers, WordSpec }
class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers {
@@ -68,13 +69,8 @@ class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers {
}
def buildMetricKey(categoryName: String, entityName: String, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = {
- val metricIdentity = new MetricIdentity { val name: String = metricName }
- val groupIdentity = new MetricGroupIdentity {
- val name: String = entityName
- val category: MetricGroupCategory = new MetricGroupCategory {
- val name: String = categoryName
- }
- }
- metricKeyGenerator.generateKey(groupIdentity, metricIdentity)
+ val metric = HistogramKey(metricName, UnitOfMeasurement.Unknown, Map.empty)
+ val entity = Entity(entityName, categoryName)
+ metricKeyGenerator.generateKey(entity, metric)
}
}
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
index 6c77f321..a0d787d9 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
@@ -18,14 +18,13 @@ package kamon.statsd
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.{ ActorRef, Props, ActorSystem }
-import kamon.{ MilliTimestamp, Kamon }
-import kamon.metric.instrument.Histogram.Precision
-import kamon.metric.instrument.Histogram
+import kamon.Kamon
+import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement }
+import kamon.util.MilliTimestamp
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.metric._
import akka.io.Udp
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import java.lang.management.ManagementFactory
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import java.net.InetSocketAddress
import com.typesafe.config.ConfigFactory
@@ -33,10 +32,6 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
implicit lazy val system: ActorSystem = ActorSystem("statsd-metric-sender-spec", ConfigFactory.parseString(
"""
|kamon {
- | metrics {
- | disable-aspectj-weaver-missing-error = true
- | }
- |
| statsd.simple-metric-key-generator {
| application = kamon
| hostname-override = kamon-host
@@ -56,58 +51,54 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
"the StatsDMetricSender" should {
"flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture {
- val testMetricName = "processing-time"
- val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName)
- val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- testRecorder.record(10L)
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms")
}
"render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture {
- val testMetricName = "processing-time"
- val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName)
- val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- testRecorder.record(10L)
- testRecorder.record(11L)
- testRecorder.record(12L)
-
- val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(11L)
+ testRecorder.metricOne.record(12L)
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms")
}
"include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture {
- val testMetricName = "processing-time"
- val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName)
- val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- testRecorder.record(10L)
- testRecorder.record(10L)
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"$testMetricKey:10|ms|@0.5")
}
"flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
- val testMetricName = "processing-time"
- val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName)
- val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit)
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
var bytes = testMetricKey.length
var level = 0
while (bytes <= testMaxPacketSize) {
level += 1
- testRecorder.record(level)
+ testRecorder.metricOne.record(level)
bytes += s":$level|ms".length
}
- val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
udp.expectMsgType[Udp.Send] // let the first flush pass
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
@@ -115,51 +106,38 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
}
"render multiple keys in the same packet using newline as separator" in new UdpListenerFixture {
- val firstTestMetricName = "first-test-metric"
- val firstTestMetricKey = buildMetricKey("actor", "/user/kamon", firstTestMetricName)
- val secondTestMetricName = "second-test-metric"
- val secondTestMetricKey = buildMetricKey("actor", "/user/kamon", secondTestMetricName)
+ val testMetricKey1 = buildMetricKey(testEntity, "metric-one")
+ val testMetricKey2 = buildMetricKey(testEntity, "metric-two")
+ val testRecorder = buildRecorder("user/kamon")
- val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(11L)
- firstTestRecorder.record(10L)
- firstTestRecorder.record(10L)
- firstTestRecorder.record(11L)
+ testRecorder.metricTwo.record(20L)
+ testRecorder.metricTwo.record(21L)
- secondTestRecorder.record(20L)
- secondTestRecorder.record(21L)
-
- val udp = setup(Map(
- firstTestMetricName -> firstTestRecorder.collect(collectionContext),
- secondTestMetricName -> secondTestRecorder.collect(collectionContext)))
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
- data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms")
+ data.utf8String should be(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms")
}
}
trait UdpListenerFixture {
val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size")
- val testGroupIdentity = new MetricGroupIdentity {
- val name: String = "/user/kamon"
- val category: MetricGroupCategory = new MetricGroupCategory {
- val name: String = "actor"
- }
+ val testEntity = Entity("user/kamon", "test")
+
+ def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = {
+ val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown, Map.empty)
+ metricKeyGenerator.generateKey(entity, metricKey)
}
- def buildMetricKey(categoryName: String, entityName: String, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = {
- val metricIdentity = new MetricIdentity { val name: String = metricName }
- val groupIdentity = new MetricGroupIdentity {
- val name: String = entityName
- val category: MetricGroupCategory = new MetricGroupCategory {
- val name: String = categoryName
- }
- }
- metricKeyGenerator.generateKey(groupIdentity, metricIdentity)
+ def buildRecorder(name: String): TestEntityRecorder = {
+ Kamon(Metrics).register(TestEntityRecorder, name).get.recorder
}
- def setup(metrics: Map[String, MetricSnapshot]): TestProbe = {
+ def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = {
val udp = TestProbe()
val metricsSender = system.actorOf(Props(new StatsDMetricsSender(new InetSocketAddress("127.0.0.1", 0), testMaxPacketSize, metricKeyGenerator) {
override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
@@ -169,22 +147,19 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
udp.expectMsgType[Udp.SimpleSender]
udp.reply(Udp.SimpleSenderReady)
- val testMetrics = for ((metricName, snapshot) ← metrics) yield {
- val testMetricIdentity = new MetricIdentity {
- val name: String = metricName
- }
-
- (testMetricIdentity, snapshot)
- }
-
- metricsSender ! TickMetricSnapshot(new MilliTimestamp(0), new MilliTimestamp(0), Map(testGroupIdentity -> new MetricGroupSnapshot {
- type GroupSnapshotType = Histogram.Snapshot
- def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ???
-
- val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap
- }))
-
+ val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics)
+ metricsSender ! fakeSnapshot
udp
}
}
}
+
+class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ val metricOne = histogram("metric-one")
+ val metricTwo = histogram("metric-two")
+}
+
+object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] {
+ def category: String = "test"
+ def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory)
+}