diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-06-19 00:47:07 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-06-19 00:47:07 -0300 |
commit | 1150d528eb5231993e542c086e2df90cf760d8a7 (patch) | |
tree | e1c86f9a115872073c46890b5ef32dbe1bd0bd3d /kamon-statsd/src/main/scala/kamon/statsd | |
parent | 35b8a715d78ddd194d410ba0cc2119b5a1caa924 (diff) | |
parent | 4abab8df49d1bc5d9a051a8b54852e0712be7b74 (diff) | |
download | Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.gz Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.bz2 Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.zip |
Merge branch 'master' into release-0.2
Conflicts:
kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
project/Dependencies.scala
project/Projects.scala
version.sbt
Diffstat (limited to 'kamon-statsd/src/main/scala/kamon/statsd')
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala | 6 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala | 69 |
2 files changed, 43 insertions, 32 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 7676f97f..f10406ed 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -60,6 +60,12 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true) } + // Subscribe to Dispatchers + val includedDispatchers = statsDConfig.getStringList("includes.dispatcher").asScala + for (dispatcherPathPattern ← includedDispatchers) { + Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true) + } + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index cff970b4..adda18cc 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,18 +20,21 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.MetricSnapshot.Measurement import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } -import java.text.DecimalFormat +import java.text.{ DecimalFormatSymbols, DecimalFormat } +import java.util.Locale -class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends Actor with UdpExtensionProvider { +class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { import context.system val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config) - val samplingRateFormat = new DecimalFormat() - samplingRateFormat.setMaximumFractionDigits(128) // Absurdly high, let the other end loss precision if it needs to. + val symbols = DecimalFormatSymbols.getInstance(Locale.US) + symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. + + // Absurdly high number of decimal digits, let the other end lose precision if it needs to. + val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) udpExtension ! Udp.SimpleSender @@ -45,14 +48,14 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(maxPacketSize, udpSender, remote) + val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( (groupIdentity, groupSnapshot) ← tick.metrics; (metricIdentity, metricSnapshot) ← groupSnapshot.metrics ) { - val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity)) + val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) for (measurement ← metricSnapshot.measurements) { val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) @@ -63,61 +66,63 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends dataBuilder.flush() } - def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): ByteString = { - def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): ByteString = - ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")) + def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = { + def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = + value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") instrumentType match { case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ ByteString.empty // TODO: Need to decide how to report counters, when we have them! + case Counter ⇒ statsDMetricFormat(measurement.count.toString, "c") } } } object StatsDMetricsSender { - def props(remote: InetSocketAddress, maxPacketSize: Int): Props = Props(new StatsDMetricsSender(remote, maxPacketSize)) + def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new StatsDMetricsSender(remote, maxPacketSize)) } trait UdpExtensionProvider { def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) } -class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) { - val metricSeparator = ByteString("\n") - val measurementSeparator = ByteString(":") +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { + val metricSeparator = "\n" + val measurementSeparator = ":" - var lastKey = ByteString.empty - var buffer = ByteString.empty + var lastKey = "" + var buffer = new StringBuilder() - def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = { + def appendMeasurement(key: String, measurementData: String): Unit = { if (key == lastKey) { - val dataWithoutKey = measurementSeparator ++ measurementData + val dataWithoutKey = measurementSeparator + measurementData if (fitsOnBuffer(dataWithoutKey)) - buffer = buffer ++ dataWithoutKey + buffer.append(dataWithoutKey) else { - flushToUDP(buffer) - buffer = key ++ dataWithoutKey + flushToUDP(buffer.toString()) + buffer.clear() + buffer.append(key).append(dataWithoutKey) } } else { lastKey = key - val dataWithoutSeparator = key ++ measurementSeparator ++ measurementData - if (fitsOnBuffer(metricSeparator ++ dataWithoutSeparator)) { - val mSeparator = if (buffer.length > 0) metricSeparator else ByteString.empty - buffer = buffer ++ mSeparator ++ dataWithoutSeparator + val dataWithoutSeparator = key + measurementSeparator + measurementData + if (fitsOnBuffer(metricSeparator + dataWithoutSeparator)) { + val mSeparator = if (buffer.length > 0) metricSeparator else "" + buffer.append(mSeparator).append(dataWithoutSeparator) } else { - flushToUDP(buffer) - buffer = dataWithoutSeparator + flushToUDP(buffer.toString()) + buffer.clear() + buffer.append(dataWithoutSeparator) } } } - def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize + def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes - private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote) + private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) def flush(): Unit = { - flushToUDP(buffer) - buffer = ByteString.empty + flushToUDP(buffer.toString) + buffer.clear() } }
\ No newline at end of file |