From 1f7c5967e0e93b129754d68f494665d35031d971 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 3 Apr 2014 20:48:02 -0300 Subject: = statsd: honor the max-packet-size setting and include newline char on multi-metric packets --- .../scala/kamon/statsd/StatsDMetricsSender.scala | 20 +++++++++----------- .../src/main/scala/kamon/statsd/Statsd.scala | 12 ++++-------- .../scala/kamon/statsd/StatsdMetricTranslator.scala | 7 +++---- 3 files changed, 16 insertions(+), 23 deletions(-) (limited to 'kamon-statsd/src/main/scala/kamon/statsd') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index b14e6022..cfc228d3 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -28,7 +28,6 @@ class StatsDMetricsSender extends Actor with ActorLogging { val statsDExtension = Kamon(StatsD) val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port) - val maxPacketSize = 1024 IO(Udp) ! Udp.SimpleSender @@ -38,24 +37,23 @@ class StatsDMetricsSender extends Actor with ActorLogging { } def ready(udpSender: ActorRef): Receive = { - case StatsD.MetricBatch(metrics) ⇒ writeDown(metrics, ByteString.empty, udpSender) + case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender) } + @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = { + def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote) - def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote) - - @tailrec final def writeDown(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = { - if(metrics.isEmpty) + if (metrics.isEmpty) flushToRemote(buffer, udpSender) else { - val headData = metrics.head.toByteString - if(buffer.size + headData.size > maxPacketSize) { + val headMetricData = metrics.head.toByteString(includeTrailingNewline = true) + + if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) { flushToRemote(buffer, udpSender) - writeDown(metrics.tail, headData, udpSender) + sendMetricsToRemote(metrics.tail, headMetricData, udpSender) } else { - writeDown(metrics.tail, buffer ++ headData, udpSender) + sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender) } - } } } diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala index 0ded1394..167e993e 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -29,7 +29,6 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = StatsD override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) - trait MetricKeyGenerator { def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String } @@ -46,8 +45,8 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { * For timing, it provides something like {@code key:millis|ms}. * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} */ - def toByteString: ByteString = - if(samplingRate >= 1D) + def toByteString(includeTrailingNewline: Boolean = true): ByteString = + if (samplingRate >= 1D) ByteString(s"$key:$value|$suffix") else ByteString(s"$key:$value|$suffix|@$samplingRate") @@ -68,7 +67,6 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { case class MetricBatch(metrics: Iterable[Metric]) } - class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { private val statsDConfig = system.settings.config.getConfig("kamon.statsd") @@ -81,16 +79,15 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) val includedActors = statsDConfig.getStringList("includes.actor").asScala - for(actorPathPattern <- includedActors) { + for (actorPathPattern ← includedActors) { Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) } - def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator") - if(flushInterval == tickInterval) { + if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics translator. metricsTranslator } else { @@ -99,7 +96,6 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { } } - class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator { val application = config.getString("kamon.statsd.simple-metric-key-generator.application") val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala index 2cf672b8..2ef41c6d 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala @@ -29,7 +29,7 @@ class StatsDMetricTranslator extends Actor { def receive = { case TickMetricSnapshot(from, to, metrics) ⇒ val translatedMetrics = metrics.collect { - case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) => transformActorMetric(am, snapshot) + case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) ⇒ transformActorMetric(am, snapshot) } metricSender ! StatsD.MetricBatch(translatedMetrics.flatten) @@ -42,9 +42,9 @@ class StatsDMetricTranslator extends Actor { roll(timeInMailboxKey, snapshot.timeInMailbox, StatsD.Timing) ++ roll(processingTimeKey, snapshot.processingTime, StatsD.Timing) } - def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) => StatsD.Metric): Vector[StatsD.Metric] = { + def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) ⇒ StatsD.Metric): Vector[StatsD.Metric] = { val builder = Vector.newBuilder[StatsD.Metric] - for(measurement <- snapshot.measurements) { + for (measurement ← snapshot.measurements) { val samplingRate = 1D / measurement.count val scaledValue = Scale.convert(snapshot.scale, Scale.Milli, measurement.value) builder += metricBuilder.apply(key, scaledValue, samplingRate) @@ -53,7 +53,6 @@ class StatsDMetricTranslator extends Actor { builder.result() } - } object StatsDMetricTranslator { -- cgit v1.2.3