aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-04-03 20:48:02 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-04-03 20:48:02 -0300
commit1f7c5967e0e93b129754d68f494665d35031d971 (patch)
treec562b2fb9ca2a06a1a9b9908d5ad8f0537fdf8ee /kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
parentde6a2c9bd6ab6034f59ee51b0eb48beae3d70561 (diff)
downloadKamon-1f7c5967e0e93b129754d68f494665d35031d971.tar.gz
Kamon-1f7c5967e0e93b129754d68f494665d35031d971.tar.bz2
Kamon-1f7c5967e0e93b129754d68f494665d35031d971.zip
= statsd: honor the max-packet-size setting and include newline char on multi-metric packets
Diffstat (limited to 'kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala')
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala20
1 files changed, 9 insertions, 11 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index b14e6022..cfc228d3 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -28,7 +28,6 @@ class StatsDMetricsSender extends Actor with ActorLogging {
val statsDExtension = Kamon(StatsD)
val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port)
- val maxPacketSize = 1024
IO(Udp) ! Udp.SimpleSender
@@ -38,24 +37,23 @@ class StatsDMetricsSender extends Actor with ActorLogging {
}
def ready(udpSender: ActorRef): Receive = {
- case StatsD.MetricBatch(metrics) ⇒ writeDown(metrics, ByteString.empty, udpSender)
+ case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender)
}
+ @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
+ def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
- def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
-
- @tailrec final def writeDown(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
- if(metrics.isEmpty)
+ if (metrics.isEmpty)
flushToRemote(buffer, udpSender)
else {
- val headData = metrics.head.toByteString
- if(buffer.size + headData.size > maxPacketSize) {
+ val headMetricData = metrics.head.toByteString(includeTrailingNewline = true)
+
+ if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) {
flushToRemote(buffer, udpSender)
- writeDown(metrics.tail, headData, udpSender)
+ sendMetricsToRemote(metrics.tail, headMetricData, udpSender)
} else {
- writeDown(metrics.tail, buffer ++ headData, udpSender)
+ sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender)
}
-
}
}
}