From a92a45a476dc12ef9c527a3e82b3c5d333e3ec42 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Fri, 25 Apr 2014 13:17:12 -0300 Subject: ! statsd: the max-packet-size setting is now expressed in bytes rather than a plain Int, fixes #27 --- kamon-statsd/src/main/resources/reference.conf | 4 ++-- kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala | 4 ++-- .../src/main/scala/kamon/statsd/StatsDMetricsSender.scala | 11 +++++------ .../src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala | 12 +++++++----- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf index f648b7af..06083623 100644 --- a/kamon-statsd/src/main/resources/reference.conf +++ b/kamon-statsd/src/main/resources/reference.conf @@ -13,8 +13,8 @@ kamon { # kamon.metrics.tick-interval setting. flush-interval = 1 second - # Max packet size in bytes for UDP metrics data sent to StatsD. - max-packet-size = 1024 + # Max packet size for UDP metrics data sent to StatsD. + max-packet-size = 1024 bytes # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 3dd25b74..2cc9c0c8 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -44,7 +44,7 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val statsDHost = new InetSocketAddress(statsDConfig.getString("hostname"), statsDConfig.getInt("port")) val flushInterval = statsDConfig.getDuration("flush-interval", MILLISECONDS) - val maxPacketSize = statsDConfig.getInt("max-packet-size") + val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size") val tickInterval = system.settings.config.getDuration("kamon.metrics.tick-interval", MILLISECONDS) val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) @@ -64,7 +64,7 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") - val metricsTranslator = system.actorOf(StatsDMetricsSender.props(statsDHost, maxPacketSize), "statsd-metrics-sender") + val metricsTranslator = system.actorOf(StatsDMetricsSender.props(statsDHost, maxPacketSizeInBytes), "statsd-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. metricsTranslator diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index cff970b4..e0526f8e 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,13 +20,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.MetricSnapshot.Measurement import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } import java.text.DecimalFormat -class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends Actor with UdpExtensionProvider { +class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { import context.system val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config) @@ -45,7 +44,7 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(maxPacketSize, udpSender, remote) + val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( (groupIdentity, groupSnapshot) ← tick.metrics; @@ -76,14 +75,14 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends } object StatsDMetricsSender { - def props(remote: InetSocketAddress, maxPacketSize: Int): Props = Props(new StatsDMetricsSender(remote, maxPacketSize)) + def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new StatsDMetricsSender(remote, maxPacketSize)) } trait UdpExtensionProvider { def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) } -class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) { +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { val metricSeparator = ByteString("\n") val measurementSeparator = ByteString(":") @@ -112,7 +111,7 @@ class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: I } } - def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize + def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSizeInBytes private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote) diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index 6fdb48f1..8a61d70e 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -16,7 +16,7 @@ package kamon.statsd -import akka.testkit.{ TestKit, TestProbe } +import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ ActorRef, Props, ActorSystem } import org.scalatest.{ Matchers, WordSpecLike } import kamon.metrics._ @@ -24,10 +24,12 @@ import akka.io.Udp import org.HdrHistogram.HdrRecorder import kamon.metrics.Subscriptions.TickMetricSnapshot import java.lang.management.ManagementFactory -import kamon.Kamon import java.net.InetSocketAddress +import com.typesafe.config.ConfigFactory -class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-spec")) with WordSpecLike with Matchers { +class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system = ActorSystem("statsd-metric-sender-spec", + ConfigFactory.parseString("kamon.statsd.max-packet-size = 256 bytes")) "the StatsDMetricSender" should { "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { @@ -72,7 +74,7 @@ class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-s "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { val testMetricName = "test-metric" val testMetricKey = buildMetricKey(testMetricName) - val testRecorder = HdrRecorder(1000L, 3, Scale.Unit) + val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit) var bytes = testMetricKey.length var level = 0 @@ -115,7 +117,7 @@ class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-s trait UdpListenerFixture { val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) - val testMaxPacketSize = 256 + val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size") def buildMetricKey(metricName: String): String = s"kamon.$localhostName.test-metric-category.test-group.$metricName" -- cgit v1.2.3