aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-04-25 13:17:12 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-04-25 13:18:22 -0300
commita92a45a476dc12ef9c527a3e82b3c5d333e3ec42 (patch)
tree01a4515e28187ca60ec3f52d83c84601e97c7a58 /kamon-statsd
parent2fae6957137b8c2ee62dd50421a9fdc9abeb8907 (diff)
downloadKamon-a92a45a476dc12ef9c527a3e82b3c5d333e3ec42.tar.gz
Kamon-a92a45a476dc12ef9c527a3e82b3c5d333e3ec42.tar.bz2
Kamon-a92a45a476dc12ef9c527a3e82b3c5d333e3ec42.zip
! statsd: the max-packet-size setting is now expressed in bytes rather than a plain Int, fixes #27
Diffstat (limited to 'kamon-statsd')
-rw-r--r--kamon-statsd/src/main/resources/reference.conf4
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala4
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala11
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala12
4 files changed, 16 insertions, 15 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf
index f648b7af..06083623 100644
--- a/kamon-statsd/src/main/resources/reference.conf
+++ b/kamon-statsd/src/main/resources/reference.conf
@@ -13,8 +13,8 @@ kamon {
# kamon.metrics.tick-interval setting.
flush-interval = 1 second
- # Max packet size in bytes for UDP metrics data sent to StatsD.
- max-packet-size = 1024
+ # Max packet size for UDP metrics data sent to StatsD.
+ max-packet-size = 1024 bytes
# Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
index 3dd25b74..2cc9c0c8 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
@@ -44,7 +44,7 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val statsDHost = new InetSocketAddress(statsDConfig.getString("hostname"), statsDConfig.getInt("port"))
val flushInterval = statsDConfig.getDuration("flush-interval", MILLISECONDS)
- val maxPacketSize = statsDConfig.getInt("max-packet-size")
+ val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size")
val tickInterval = system.settings.config.getDuration("kamon.metrics.tick-interval", MILLISECONDS)
val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval)
@@ -64,7 +64,7 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
- val metricsTranslator = system.actorOf(StatsDMetricsSender.props(statsDHost, maxPacketSize), "statsd-metrics-sender")
+ val metricsTranslator = system.actorOf(StatsDMetricsSender.props(statsDHost, maxPacketSizeInBytes), "statsd-metrics-sender")
if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics sender.
metricsTranslator
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index cff970b4..e0526f8e 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -20,13 +20,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
-import kamon.Kamon
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement
import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
import java.text.DecimalFormat
-class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends Actor with UdpExtensionProvider {
+class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
import context.system
val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config)
@@ -45,7 +44,7 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- val dataBuilder = new MetricDataPacketBuilder(maxPacketSize, udpSender, remote)
+ val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
for (
(groupIdentity, groupSnapshot) ← tick.metrics;
@@ -76,14 +75,14 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends
}
object StatsDMetricsSender {
- def props(remote: InetSocketAddress, maxPacketSize: Int): Props = Props(new StatsDMetricsSender(remote, maxPacketSize))
+ def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new StatsDMetricsSender(remote, maxPacketSize))
}
trait UdpExtensionProvider {
def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
}
-class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) {
+class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
val metricSeparator = ByteString("\n")
val measurementSeparator = ByteString(":")
@@ -112,7 +111,7 @@ class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: I
}
}
- def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize
+ def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSizeInBytes
private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote)
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
index 6fdb48f1..8a61d70e 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
@@ -16,7 +16,7 @@
package kamon.statsd
-import akka.testkit.{ TestKit, TestProbe }
+import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.{ ActorRef, Props, ActorSystem }
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.metrics._
@@ -24,10 +24,12 @@ import akka.io.Udp
import org.HdrHistogram.HdrRecorder
import kamon.metrics.Subscriptions.TickMetricSnapshot
import java.lang.management.ManagementFactory
-import kamon.Kamon
import java.net.InetSocketAddress
+import com.typesafe.config.ConfigFactory
-class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-spec")) with WordSpecLike with Matchers {
+class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers {
+ implicit lazy val system = ActorSystem("statsd-metric-sender-spec",
+ ConfigFactory.parseString("kamon.statsd.max-packet-size = 256 bytes"))
"the StatsDMetricSender" should {
"flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture {
@@ -72,7 +74,7 @@ class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-s
"flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
val testMetricName = "test-metric"
val testMetricKey = buildMetricKey(testMetricName)
- val testRecorder = HdrRecorder(1000L, 3, Scale.Unit)
+ val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit)
var bytes = testMetricKey.length
var level = 0
@@ -115,7 +117,7 @@ class StatsDMetricSenderSpec extends TestKit(ActorSystem("statsd-metric-sender-s
trait UdpListenerFixture {
val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
- val testMaxPacketSize = 256
+ val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size")
def buildMetricKey(metricName: String): String = s"kamon.$localhostName.test-metric-category.test-group.$metricName"