aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-04-08 02:25:47 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-04-08 02:25:47 -0300
commitd23360425cf41fd5d6a6ab6d6507d3e97bb536e1 (patch)
tree49059d70b59439a5e2a7d17cd9c7544523e8fc55
parenta22e467a9b4131d978861b048a5402c9bf08e20b (diff)
downloadKamon-d23360425cf41fd5d6a6ab6d6507d3e97bb536e1.tar.gz
Kamon-d23360425cf41fd5d6a6ab6d6507d3e97bb536e1.tar.bz2
Kamon-d23360425cf41fd5d6a6ab6d6507d3e97bb536e1.zip
= statsd: correctly send multiple packets for a single metric
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala53
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala21
2 files changed, 52 insertions, 22 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index 42eb57d0..60404d7e 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -16,14 +16,14 @@
package kamon.statsd
-import akka.actor.{ActorSystem, Props, ActorRef, Actor}
+import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.Kamon
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType}
+import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
class StatsDMetricsSender extends Actor with UdpExtensionProvider {
import context.system
@@ -40,18 +40,20 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider {
}
def ready(udpSender: ActorRef): Receive = {
- case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender)
+ case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender)
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote)
- for((groupIdentity, groupSnapshot) <- tick.metrics;
- (metricIdentity, metricSnapshot) <- groupSnapshot.metrics) {
+ for (
+ (groupIdentity, groupSnapshot) ← tick.metrics;
+ (metricIdentity, metricSnapshot) ← groupSnapshot.metrics
+ ) {
val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity))
- for(measurement <- metricSnapshot.measurements) {
+ for (measurement ← metricSnapshot.measurements) {
val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType)
dataBuilder.appendMeasurement(key, measurementData)
}
@@ -65,9 +67,9 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider {
ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else ""))
instrumentType match {
- case Histogram => statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
- case Gauge => statsDMetricFormat(measurement.value.toString, "g")
- case Counter => ByteString.empty // TODO: Need to decide how to report counters, when we have them!
+ case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
+ case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g")
+ case Counter ⇒ ByteString.empty // TODO: Need to decide how to report counters, when we have them!
}
}
}
@@ -84,26 +86,33 @@ class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: I
val metricSeparator = ByteString("\n")
val measurementSeparator = ByteString(":")
- var lastKey= ByteString.empty
+ var lastKey = ByteString.empty
var buffer = ByteString.empty
def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = {
- val appendData =
- if(key == lastKey)
- measurementSeparator ++ measurementData
+ if (key == lastKey) {
+ val dataWithoutKey = measurementSeparator ++ measurementData
+ if (fitsOnBuffer(dataWithoutKey))
+ buffer = buffer ++ dataWithoutKey
else {
- lastKey = key
- val keySeparator = if(buffer.length == 0) ByteString.empty else metricSeparator
- keySeparator ++ key ++ measurementSeparator ++ measurementData
+ flushToUDP(buffer)
+ buffer = key ++ dataWithoutKey
}
-
- if(buffer.length + appendData.length >= maxPacketSize) {
- flushToUDP(buffer)
- buffer = appendData
- } else
- buffer = buffer ++ appendData
+ } else {
+ lastKey = key
+ val dataWithoutSeparator = key ++ measurementSeparator ++ measurementData
+ if (fitsOnBuffer(metricSeparator ++ dataWithoutSeparator)) {
+ val mSeparator = if (buffer.length > 0) metricSeparator else ByteString.empty
+ buffer = buffer ++ mSeparator ++ dataWithoutSeparator
+ } else {
+ flushToUDP(buffer)
+ buffer = dataWithoutSeparator
+ }
+ }
}
+ def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize
+
private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote)
def flush(): Unit = {
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
index caeaee28..94af4645 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
@@ -25,6 +25,7 @@ import org.HdrHistogram.HdrRecorder
import kamon.metrics.Subscriptions.TickMetricSnapshot
import java.lang.management.ManagementFactory
import com.typesafe.config.ConfigFactory
+import kamon.Kamon
class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers {
@@ -76,6 +77,26 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers
data.utf8String should be(s"$testMetricKey:10|ms|@0.5")
}
+ "flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
+ val testMetricName = "test-metric"
+ val testMetricKey = buildMetricKey(testMetricName)
+ val testRecorder = HdrRecorder(1000L, 3, Scale.Unit)
+
+ var bytes = testMetricKey.length
+ var level = 0
+ while(bytes <= Kamon(StatsD).maxPacketSize) {
+ level += 1
+ testRecorder.record(level)
+ bytes += s":$level|ms".length
+ }
+
+ val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ udp.expectMsgType[Udp.Send] // let the first flush pass
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+
+ data.utf8String should be(s"$testMetricKey:$level|ms")
+ }
+
"render multiple keys in the same packet using newline as separator" in new UdpListenerFixture {
val firstTestMetricName = "first-test-metric"