diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-04-08 02:25:47 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-04-08 02:25:47 -0300 |
commit | d23360425cf41fd5d6a6ab6d6507d3e97bb536e1 (patch) | |
tree | 49059d70b59439a5e2a7d17cd9c7544523e8fc55 /kamon-statsd/src | |
parent | a22e467a9b4131d978861b048a5402c9bf08e20b (diff) | |
download | Kamon-d23360425cf41fd5d6a6ab6d6507d3e97bb536e1.tar.gz Kamon-d23360425cf41fd5d6a6ab6d6507d3e97bb536e1.tar.bz2 Kamon-d23360425cf41fd5d6a6ab6d6507d3e97bb536e1.zip |
= statsd: correctly send multiple packets for a single metric
Diffstat (limited to 'kamon-statsd/src')
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala | 53 | ||||
-rw-r--r-- | kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala | 21 |
2 files changed, 52 insertions, 22 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 42eb57d0..60404d7e 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -16,14 +16,14 @@ package kamon.statsd -import akka.actor.{ActorSystem, Props, ActorRef, Actor} +import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType} +import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } class StatsDMetricsSender extends Actor with UdpExtensionProvider { import context.system @@ -40,18 +40,20 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider { } def ready(udpSender: ActorRef): Receive = { - case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender) + case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender) } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote) - for((groupIdentity, groupSnapshot) <- tick.metrics; - (metricIdentity, metricSnapshot) <- groupSnapshot.metrics) { + for ( + (groupIdentity, groupSnapshot) ← tick.metrics; + (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + ) { val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity)) - for(measurement <- metricSnapshot.measurements) { + for (measurement ← metricSnapshot.measurements) { val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) dataBuilder.appendMeasurement(key, measurementData) } @@ -65,9 +67,9 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider { ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else "")) instrumentType match { - case Histogram => statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge => statsDMetricFormat(measurement.value.toString, "g") - case Counter => ByteString.empty // TODO: Need to decide how to report counters, when we have them! + case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) + case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g") + case Counter ⇒ ByteString.empty // TODO: Need to decide how to report counters, when we have them! } } } @@ -84,26 +86,33 @@ class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: I val metricSeparator = ByteString("\n") val measurementSeparator = ByteString(":") - var lastKey= ByteString.empty + var lastKey = ByteString.empty var buffer = ByteString.empty def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = { - val appendData = - if(key == lastKey) - measurementSeparator ++ measurementData + if (key == lastKey) { + val dataWithoutKey = measurementSeparator ++ measurementData + if (fitsOnBuffer(dataWithoutKey)) + buffer = buffer ++ dataWithoutKey else { - lastKey = key - val keySeparator = if(buffer.length == 0) ByteString.empty else metricSeparator - keySeparator ++ key ++ measurementSeparator ++ measurementData + flushToUDP(buffer) + buffer = key ++ dataWithoutKey } - - if(buffer.length + appendData.length >= maxPacketSize) { - flushToUDP(buffer) - buffer = appendData - } else - buffer = buffer ++ appendData + } else { + lastKey = key + val dataWithoutSeparator = key ++ measurementSeparator ++ measurementData + if (fitsOnBuffer(metricSeparator ++ dataWithoutSeparator)) { + val mSeparator = if (buffer.length > 0) metricSeparator else ByteString.empty + buffer = buffer ++ mSeparator ++ dataWithoutSeparator + } else { + flushToUDP(buffer) + buffer = dataWithoutSeparator + } + } } + def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize + private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote) def flush(): Unit = { diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index caeaee28..94af4645 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -25,6 +25,7 @@ import org.HdrHistogram.HdrRecorder import kamon.metrics.Subscriptions.TickMetricSnapshot import java.lang.management.ManagementFactory import com.typesafe.config.ConfigFactory +import kamon.Kamon class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers { @@ -76,6 +77,26 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers data.utf8String should be(s"$testMetricKey:10|ms|@0.5") } + "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { + val testMetricName = "test-metric" + val testMetricKey = buildMetricKey(testMetricName) + val testRecorder = HdrRecorder(1000L, 3, Scale.Unit) + + var bytes = testMetricKey.length + var level = 0 + while(bytes <= Kamon(StatsD).maxPacketSize) { + level += 1 + testRecorder.record(level) + bytes += s":$level|ms".length + } + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + udp.expectMsgType[Udp.Send] // let the first flush pass + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"$testMetricKey:$level|ms") + } + "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { val firstTestMetricName = "first-test-metric" |