aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-06-19 00:47:07 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-06-19 00:47:07 -0300
commit1150d528eb5231993e542c086e2df90cf760d8a7 (patch)
treee1c86f9a115872073c46890b5ef32dbe1bd0bd3d /kamon-statsd
parent35b8a715d78ddd194d410ba0cc2119b5a1caa924 (diff)
parent4abab8df49d1bc5d9a051a8b54852e0712be7b74 (diff)
downloadKamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.gz
Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.bz2
Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.zip
Merge branch 'master' into release-0.2
Conflicts: kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala project/Dependencies.scala project/Projects.scala version.sbt
Diffstat (limited to 'kamon-statsd')
-rw-r--r--kamon-statsd/src/main/resources/reference.conf9
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala6
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala69
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala15
4 files changed, 57 insertions, 42 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf
index f648b7af..fd6293d9 100644
--- a/kamon-statsd/src/main/resources/reference.conf
+++ b/kamon-statsd/src/main/resources/reference.conf
@@ -13,14 +13,15 @@ kamon {
# kamon.metrics.tick-interval setting.
flush-interval = 1 second
- # Max packet size in bytes for UDP metrics data sent to StatsD.
- max-packet-size = 1024
+ # Max packet size for UDP metrics data sent to StatsD.
+ max-packet-size = 1024 bytes
# Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
includes {
- actor = [ "*" ]
- trace = [ "*" ]
+ actor = [ "*" ]
+ trace = [ "*" ]
+ dispatcher = [ "*" ]
}
simple-metric-key-generator {
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
index 7676f97f..f10406ed 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
@@ -60,6 +60,12 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true)
}
+ // Subscribe to Dispatchers
+ val includedDispatchers = statsDConfig.getStringList("includes.dispatcher").asScala
+ for (dispatcherPathPattern ← includedDispatchers) {
+ Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true)
+ }
+
def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index cff970b4..adda18cc 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -20,18 +20,21 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
-import kamon.Kamon
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement
import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
-import java.text.DecimalFormat
+import java.text.{ DecimalFormatSymbols, DecimalFormat }
+import java.util.Locale
-class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends Actor with UdpExtensionProvider {
+class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
import context.system
val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config)
- val samplingRateFormat = new DecimalFormat()
- samplingRateFormat.setMaximumFractionDigits(128) // Absurdly high, let the other end loss precision if it needs to.
+ val symbols = DecimalFormatSymbols.getInstance(Locale.US)
+ symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.
+
+ // Absurdly high number of decimal digits, let the other end lose precision if it needs to.
+ val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)
udpExtension ! Udp.SimpleSender
@@ -45,14 +48,14 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- val dataBuilder = new MetricDataPacketBuilder(maxPacketSize, udpSender, remote)
+ val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
for (
(groupIdentity, groupSnapshot) ← tick.metrics;
(metricIdentity, metricSnapshot) ← groupSnapshot.metrics
) {
- val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity))
+ val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity)
for (measurement ← metricSnapshot.measurements) {
val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType)
@@ -63,61 +66,63 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends
dataBuilder.flush()
}
- def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): ByteString = {
- def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): ByteString =
- ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else ""))
+ def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = {
+ def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String =
+ value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
instrumentType match {
case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g")
- case Counter ⇒ ByteString.empty // TODO: Need to decide how to report counters, when we have them!
+ case Counter ⇒ statsDMetricFormat(measurement.count.toString, "c")
}
}
}
object StatsDMetricsSender {
- def props(remote: InetSocketAddress, maxPacketSize: Int): Props = Props(new StatsDMetricsSender(remote, maxPacketSize))
+ def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new StatsDMetricsSender(remote, maxPacketSize))
}
trait UdpExtensionProvider {
def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
}
-class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) {
- val metricSeparator = ByteString("\n")
- val measurementSeparator = ByteString(":")
+class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
+ val metricSeparator = "\n"
+ val measurementSeparator = ":"
- var lastKey = ByteString.empty
- var buffer = ByteString.empty
+ var lastKey = ""
+ var buffer = new StringBuilder()
- def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = {
+ def appendMeasurement(key: String, measurementData: String): Unit = {
if (key == lastKey) {
- val dataWithoutKey = measurementSeparator ++ measurementData
+ val dataWithoutKey = measurementSeparator + measurementData
if (fitsOnBuffer(dataWithoutKey))
- buffer = buffer ++ dataWithoutKey
+ buffer.append(dataWithoutKey)
else {
- flushToUDP(buffer)
- buffer = key ++ dataWithoutKey
+ flushToUDP(buffer.toString())
+ buffer.clear()
+ buffer.append(key).append(dataWithoutKey)
}
} else {
lastKey = key
- val dataWithoutSeparator = key ++ measurementSeparator ++ measurementData
- if (fitsOnBuffer(metricSeparator ++ dataWithoutSeparator)) {
- val mSeparator = if (buffer.length > 0) metricSeparator else ByteString.empty
- buffer = buffer ++ mSeparator ++ dataWithoutSeparator
+ val dataWithoutSeparator = key + measurementSeparator + measurementData
+ if (fitsOnBuffer(metricSeparator + dataWithoutSeparator)) {
+ val mSeparator = if (buffer.length > 0) metricSeparator else ""
+ buffer.append(mSeparator).append(dataWithoutSeparator)
} else {
- flushToUDP(buffer)
- buffer = dataWithoutSeparator
+ flushToUDP(buffer.toString())
+ buffer.clear()
+ buffer.append(dataWithoutSeparator)
}
}
}
- def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize
+ def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes
- private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote)
+ private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)
def flush(): Unit = {
- flushToUDP(buffer)
- buffer = ByteString.empty
+ flushToUDP(buffer.toString)
+ buffer.clear()
}
} \ No newline at end of file
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
index 6fdb48f1..9dfd05f7 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -16,7 +16,7 @@
package kamon.statsd
-import akka.testkit.{ TestKit, TestProbe }
+import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.{ ActorRef, Props, ActorSystem }
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.metrics._
@@ -24,10 +24,12 @@ import akka.io.Udp
import org.HdrHistogram.HdrRecorder
import kamon.metrics.Subscriptions.TickMetricSnapshot
import java.lang.management.ManagementFactory
-import kamon.Kamon
import java.net.InetSocketAddress
+import com.typesafe.config.ConfigFactory
-class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-spec")) with WordSpecLike with Matchers {
+class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers {
+ implicit lazy val system = ActorSystem("statsd-metric-sender-spec",
+ ConfigFactory.parseString("kamon.statsd.max-packet-size = 256 bytes"))
"the StatsDMetricSender" should {
"flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture {
@@ -72,7 +74,7 @@ class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-s
"flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
val testMetricName = "test-metric"
val testMetricKey = buildMetricKey(testMetricName)
- val testRecorder = HdrRecorder(1000L, 3, Scale.Unit)
+ val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit)
var bytes = testMetricKey.length
var level = 0
@@ -94,6 +96,7 @@ class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-s
val firstTestMetricKey = buildMetricKey(firstTestMetricName)
val secondTestMetricName = "second-test-metric"
val secondTestMetricKey = buildMetricKey(secondTestMetricName)
+
val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
@@ -115,7 +118,7 @@ class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-s
trait UdpListenerFixture {
val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
- val testMaxPacketSize = 256
+ val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size")
def buildMetricKey(metricName: String): String = s"kamon.$localhostName.test-metric-category.test-group.$metricName"