aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-11-03 20:36:28 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-11-03 20:36:28 +0100
commit7b3f8737564e2d13482bccd6e5d61d6ff0f64560 (patch)
tree92e94e64825b1d4767d4c1c4b5bcf312a7b7fdff
parent88f603e7c0e6965a14c9eb4b8a42f2151f7a8b8c (diff)
parent4bcddee80e277e08ed4afe52a9cf118fcaff1feb (diff)
downloadKamon-7b3f8737564e2d13482bccd6e5d61d6ff0f64560.tar.gz
Kamon-7b3f8737564e2d13482bccd6e5d61d6ff0f64560.tar.bz2
Kamon-7b3f8737564e2d13482bccd6e5d61d6ff0f64560.zip
Merge pull request #270 from jozic/statsd-senders
! statsd: allow custom statsd senders + add simple statsd sender which doesn't batch stats
-rw-r--r--kamon-statsd/src/main/resources/reference.conf17
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala (renamed from kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala)79
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala16
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala62
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala18
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala24
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala76
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala120
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala18
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala76
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala164
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala81
12 files changed, 519 insertions, 232 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf
index b8aca724..b296c0d9 100644
--- a/kamon-statsd/src/main/resources/reference.conf
+++ b/kamon-statsd/src/main/resources/reference.conf
@@ -14,9 +14,6 @@ kamon {
# kamon.metric.tick-interval setting.
flush-interval = 10 seconds
- # Max packet size for UDP metrics data sent to StatsD.
- max-packet-size = 1024 bytes
-
# Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
subscriptions {
@@ -63,6 +60,20 @@ kamon {
# version of StatsD or if you are running your own, customized version of StatsD that supports this.
metric-name-normalization-strategy = normalize
}
+
+ # FQCN of the implementation of `kamon.statsd.StatsDMetricsSenderFactory` to be instantiated and use for
+ # creating StatsD sender. Provided implementations are:
+ # - `kamon.statsd.BatchStatsDMetricsSender`. Sends a UDP packet every "kamon.statsd.flush-interval" or
+ # as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached. Default one.
+ # - `kamon.statsd.SimpleStatsDMetricsSender`. Sends a UDP packet for each piece of data it receives.
+ metric-sender-factory = kamon.statsd.BatchStatsDMetricsSender
+
+ # Settings for `kamon.statsd.BatchStatsDMetricsSender`.
+ # Used only if kamon.statsd.metric-sender-factory is set to `kamon.statsd.BatchStatsDMetricsSender`
+ batch-metric-sender {
+ # Max packet size for UDP metrics data sent to StatsD.
+ max-packet-size = 1024 bytes
+ }
}
modules {
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala
index 70ff1b45..be9a572a 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -16,41 +16,34 @@
package kamon.statsd
-import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
-import akka.io.{ Udp, IO }
-import java.net.InetSocketAddress
-import akka.util.ByteString
+import akka.actor.Props
+import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
-import java.text.{ DecimalFormatSymbols, DecimalFormat }
-import java.util.Locale
-
import kamon.metric.instrument.{ Counter, Histogram }
-class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBytes: Long, metricKeyGenerator: MetricKeyGenerator)
- extends Actor with UdpExtensionProvider {
- import context.system
-
- val symbols = DecimalFormatSymbols.getInstance(Locale.US)
- symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.
-
- // Absurdly high number of decimal digits, let the other end lose precision if it needs to.
- val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)
-
- udpExtension ! Udp.SimpleSender
-
- def newSocketAddress = new InetSocketAddress(statsDHost, statsDPort)
+/**
+ * Factory for [[BatchStatsDMetricsSender]].
+ * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender"
+ * to select [[BatchStatsDMetricsSender]] as your sender
+ */
+object BatchStatsDMetricsSender extends StatsDMetricsSenderFactory {
+ override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props =
+ Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator))
+}
- def receive = {
- case Udp.SimpleSenderReady ⇒
- context.become(ready(sender))
- }
+/**
+ * StatsD sender which sends a UDP packet every "kamon.statsd.flush-interval" or
+ * as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached.
+ * @param statsDConfig Config to read settings specific to this sender
+ * @param metricKeyGenerator Key generator for all metrics sent by this sender
+ */
+class BatchStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
+ extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) {
- def ready(udpSender: ActorRef): Receive = {
- case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender)
- }
+ val maxPacketSizeInBytes = statsDConfig.getBytes("batch-metric-sender.max-packet-size")
- def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, newSocketAddress)
+ def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = {
+ val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, flushToUDP)
for (
(entity, snapshot) ← tick.metrics;
@@ -72,25 +65,9 @@ class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBy
packetBuilder.flush()
}
-
- def encodeStatsDTimer(level: Long, count: Long): String = {
- val samplingRate: Double = 1D / count
- level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
- }
-
- def encodeStatsDCounter(count: Long): String = count.toString + "|c"
}
-object StatsDMetricsSender {
- def props(statsDHost: String, statsDPort: Int, maxPacketSize: Long, metricKeyGenerator: MetricKeyGenerator): Props =
- Props(new StatsDMetricsSender(statsDHost, statsDPort, maxPacketSize, metricKeyGenerator))
-}
-
-trait UdpExtensionProvider {
- def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
-}
-
-class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
+class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, flushToUDP: String ⇒ Unit) {
val metricSeparator = "\n"
val measurementSeparator = ":"
@@ -103,8 +80,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
if (fitsOnBuffer(dataWithoutKey))
buffer.append(dataWithoutKey)
else {
- flushToUDP(buffer.toString())
- buffer.clear()
+ flush()
buffer.append(key).append(dataWithoutKey)
}
} else {
@@ -114,8 +90,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
val mSeparator = if (buffer.length > 0) metricSeparator else ""
buffer.append(mSeparator).append(dataWithoutSeparator)
} else {
- flushToUDP(buffer.toString())
- buffer.clear()
+ flush()
buffer.append(dataWithoutSeparator)
}
}
@@ -123,8 +98,6 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes
- private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)
-
def flush(): Unit = {
flushToUDP(buffer.toString)
buffer.clear()
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
index 97a27ff3..c4d8682a 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
@@ -1,3 +1,19 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.statsd
import java.lang.management.ManagementFactory
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala
new file mode 100644
index 00000000..47ce66cd
--- /dev/null
+++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala
@@ -0,0 +1,62 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.actor.Props
+import com.typesafe.config.Config
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.instrument.{ Counter, Histogram }
+
+/**
+ * Factory for [[SimpleStatsDMetricsSender]].
+ * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender"
+ * to select [[SimpleStatsDMetricsSender]] as your sender
+ */
+object SimpleStatsDMetricsSender extends StatsDMetricsSenderFactory {
+ override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props =
+ Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator))
+}
+
+/**
+ * "Traditional" StatsD sender which sends a UDP packet for each piece of data it receives.
+ * @param statsDConfig Config to read settings specific to this sender
+ * @param metricKeyGenerator Key generator for all metrics sent by this sender
+ */
+class SimpleStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
+ extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) {
+
+ def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = {
+
+ for (
+ (entity, snapshot) ← tick.metrics;
+ (metricKey, metricSnapshot) ← snapshot.metrics
+ ) {
+
+ val keyPrefix = metricKeyGenerator.generateKey(entity, metricKey) + ":"
+
+ metricSnapshot match {
+ case hs: Histogram.Snapshot ⇒
+ hs.recordsIterator.foreach { record ⇒
+ flushToUDP(keyPrefix + encodeStatsDTimer(record.level, record.count))
+ }
+
+ case cs: Counter.Snapshot ⇒
+ flushToUDP(keyPrefix + encodeStatsDCounter(cs.count))
+ }
+ }
+ }
+}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
index 91d05510..a1f7dca3 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -23,8 +23,6 @@ import kamon.util.ConfigTools.Syntax
import scala.concurrent.duration._
import com.typesafe.config.Config
import akka.event.Logging
-import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.collection.JavaConverters._
object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
@@ -44,10 +42,10 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val tickInterval = metricsExtension.settings.tickInterval
val flushInterval = statsDConfig.getFiniteDuration("flush-interval")
- val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size")
val keyGeneratorFQCN = statsDConfig.getString("metric-key-generator")
+ val senderFactoryFQCN = statsDConfig.getString("metric-sender-factory")
- val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config)
+ val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, senderFactoryFQCN, config)
val subscriptions = statsDConfig.getConfig("subscriptions")
subscriptions.firstLevelKeys.map { subscriptionCategory ⇒
@@ -56,15 +54,13 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
}
}
- def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, keyGeneratorFQCN: String, config: Config): ActorRef = {
+ def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration,
+ keyGeneratorFQCN: String, senderFactoryFQCN: String, config: Config): ActorRef = {
assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
val keyGenerator = system.dynamicAccess.createInstanceFor[MetricKeyGenerator](keyGeneratorFQCN, (classOf[Config], config) :: Nil).get
+ val senderFactory = system.dynamicAccess.getObjectFor[StatsDMetricsSenderFactory](senderFactoryFQCN).get
- val metricsSender = system.actorOf(StatsDMetricsSender.props(
- statsDConfig.getString("hostname"),
- statsDConfig.getInt("port"),
- maxPacketSizeInBytes,
- keyGenerator), "statsd-metrics-sender")
+ val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender")
if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics sender.
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala
new file mode 100644
index 00000000..1f603aea
--- /dev/null
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala
@@ -0,0 +1,24 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.actor.Props
+import com.typesafe.config.Config
+
+trait StatsDMetricsSenderFactory {
+ def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props
+}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala
new file mode 100644
index 00000000..9e856eda
--- /dev/null
+++ b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala
@@ -0,0 +1,76 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import java.net.InetSocketAddress
+import java.text.{ DecimalFormat, DecimalFormatSymbols }
+import java.util.Locale
+import akka.actor.{ Actor, ActorRef, ActorSystem }
+import akka.io.{ IO, Udp }
+import akka.util.ByteString
+import com.typesafe.config.Config
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+
+/**
+ * Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server.
+ * @param statsDConfig Config to read settings specific to this sender
+ * @param metricKeyGenerator Key generator for all metrics sent by this sender
+ */
+abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
+ extends Actor with UdpExtensionProvider {
+
+ import context.system
+
+ val statsDHost = statsDConfig.getString("hostname")
+ val statsDPort = statsDConfig.getInt("port")
+
+ val symbols = DecimalFormatSymbols.getInstance(Locale.US)
+ symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.
+
+ // Absurdly high number of decimal digits, let the other end lose precision if it needs to.
+ val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)
+
+ udpExtension ! Udp.SimpleSender
+
+ lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort)
+
+ def receive = {
+ case Udp.SimpleSenderReady ⇒
+ context.become(ready(sender))
+ }
+
+ def ready(udpSender: ActorRef): Receive = {
+ case tick: TickMetricSnapshot ⇒
+ writeMetricsToRemote(tick,
+ (data: String) ⇒ udpSender ! Udp.Send(ByteString(data), socketAddress))
+ }
+
+ def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit
+
+ def encodeStatsDTimer(level: Long, count: Long): String = {
+ val samplingRate: Double = 1D / count
+ level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
+ }
+
+ def encodeStatsDCounter(count: Long): String = count.toString + "|c"
+
+}
+
+trait UdpExtensionProvider {
+ def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
+}
+
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala
new file mode 100644
index 00000000..8474a5ce
--- /dev/null
+++ b/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.testkit.TestProbe
+import akka.actor.{ ActorRef, Props, ActorSystem }
+import akka.io.Udp
+import com.typesafe.config.ConfigFactory
+
+class BatchStatsDMetricSenderSpec extends UDPBasedStatsDMetricSenderSpec("batch-statsd-metric-sender-spec") {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | statsd {
+ | hostname = "127.0.0.1"
+ | port = 0
+ | simple-metric-key-generator {
+ | application = kamon
+ | hostname-override = kamon-host
+ | include-hostname = true
+ | metric-name-normalization-strategy = normalize
+ | }
+ | batch-metric-sender.max-packet-size = 1024
+ | }
+ |}
+ |
+ """.stripMargin)
+
+ val testMaxPacketSize = statsDConfig.getBytes("batch-metric-sender.max-packet-size")
+
+ trait BatchSenderFixture extends UdpListenerFixture {
+ override def newSender(udpProbe: TestProbe) =
+ Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator) {
+ override def udpExtension(implicit system: ActorSystem): ActorRef = udpProbe.ref
+ })
+ }
+
+ "the BatchStatsDMetricSender" should {
+ "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new BatchSenderFixture {
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
+
+ expectUDPPacket(s"$testMetricKey:10|ms", udp)
+ }
+
+ "render several measurements of the same key under a single (key + multiple measurements) packet" in new BatchSenderFixture {
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(11L)
+ testRecorder.metricOne.record(12L)
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
+
+ expectUDPPacket(s"$testMetricKey:10|ms:11|ms:12|ms", udp)
+ }
+
+ "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new BatchSenderFixture {
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(10L)
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
+
+ expectUDPPacket(s"$testMetricKey:10|ms|@0.5", udp)
+ }
+
+ "flush the packet when the max-packet-size is reached" in new BatchSenderFixture {
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+
+ var bytes = testMetricKey.length
+ var level = 0
+ while (bytes <= testMaxPacketSize) {
+ level += 1
+ testRecorder.metricOne.record(level)
+ bytes += s":$level|ms".length
+ }
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
+ udp.expectMsgType[Udp.Send] // let the first flush pass
+ expectUDPPacket(s"$testMetricKey:$level|ms", udp)
+ }
+
+ "render multiple keys in the same packet using newline as separator" in new BatchSenderFixture {
+ val testMetricKey1 = buildMetricKey(testEntity, "metric-one")
+ val testMetricKey2 = buildMetricKey(testEntity, "metric-two")
+ val testRecorder = buildRecorder("user/kamon")
+
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(11L)
+
+ testRecorder.metricTwo.record(20L)
+ testRecorder.metricTwo.record(21L)
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
+ expectUDPPacket(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms", udp)
+ }
+ }
+}
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala
index 2e03a59d..84c0b6aa 100644
--- a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala
+++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala
@@ -1,3 +1,19 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.statsd
import com.typesafe.config.ConfigFactory
@@ -17,7 +33,7 @@ class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers {
|}
""".stripMargin)
- "the StatsDMetricSender" should {
+ "the SimpleMetricKeyGenerator" should {
"generate metric names that follow the application.host.entity.entity-name.metric-name pattern by default" in {
implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(defaultConfiguration) {
override def hostName: String = "localhost"
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala
new file mode 100644
index 00000000..2e909d8b
--- /dev/null
+++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala
@@ -0,0 +1,76 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.actor.{ ActorSystem, Props, ActorRef }
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+
+class SimpleStatsDMetricsSenderSpec extends UDPBasedStatsDMetricSenderSpec("simple-statsd-metric-sender-spec") {
+
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | statsd {
+ | hostname = "127.0.0.1"
+ | port = 0
+ | simple-metric-key-generator {
+ | application = kamon
+ | hostname-override = kamon-host
+ | include-hostname = true
+ | metric-name-normalization-strategy = normalize
+ | }
+ | }
+ |}
+ |
+ """.stripMargin)
+
+ trait SimpleSenderFixture extends UdpListenerFixture {
+ override def newSender(udpProbe: TestProbe) =
+ Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator) {
+ override def udpExtension(implicit system: ActorSystem): ActorRef = udpProbe.ref
+ })
+ }
+
+ "the SimpleStatsDMetricSender" should {
+ "flush the metrics data for each unique value it receives" in new SimpleSenderFixture {
+
+ val testMetricKey1 = buildMetricKey(testEntity, "metric-one")
+ val testMetricKey2 = buildMetricKey(testEntity, "metric-two")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(30L)
+ testRecorder.metricTwo.record(20L)
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
+ expectUDPPacket(s"$testMetricKey1:10|ms", udp)
+ expectUDPPacket(s"$testMetricKey1:30|ms", udp)
+ expectUDPPacket(s"$testMetricKey2:20|ms", udp)
+ }
+
+ "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new SimpleSenderFixture {
+ val testMetricKey = buildMetricKey(testEntity, "metric-one")
+ val testRecorder = buildRecorder("user/kamon")
+ testRecorder.metricOne.record(10L)
+ testRecorder.metricOne.record(10L)
+
+ val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
+ expectUDPPacket(s"$testMetricKey:10|ms|@0.5", udp)
+ }
+ }
+}
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
deleted file mode 100644
index 51e7cf19..00000000
--- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.statsd
-
-import akka.testkit.{ TestKitBase, TestProbe }
-import akka.actor.{ ActorRef, Props, ActorSystem }
-import kamon.Kamon
-import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement }
-import kamon.testkit.BaseKamonSpec
-import kamon.util.MilliTimestamp
-import org.scalatest.{ Matchers, WordSpecLike }
-import kamon.metric._
-import akka.io.Udp
-import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
-import java.net.InetSocketAddress
-import com.typesafe.config.ConfigFactory
-
-class StatsDMetricSenderSpec extends BaseKamonSpec("statsd-metric-sender-spec") {
- override lazy val config =
- ConfigFactory.parseString(
- """
- |kamon {
- | statsd.simple-metric-key-generator {
- | application = kamon
- | hostname-override = kamon-host
- | include-hostname = true
- | metric-name-normalization-strategy = normalize
- | }
- |}
- |
- """.stripMargin)
-
- implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) {
- override def hostName: String = "localhost_local"
- }
-
- "the StatsDMetricSender" should {
- "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture {
- val testMetricKey = buildMetricKey(testEntity, "metric-one")
- val testRecorder = buildRecorder("user/kamon")
- testRecorder.metricOne.record(10L)
-
- val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
- val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
-
- data.utf8String should be(s"$testMetricKey:10|ms")
- }
-
- "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture {
- val testMetricKey = buildMetricKey(testEntity, "metric-one")
- val testRecorder = buildRecorder("user/kamon")
- testRecorder.metricOne.record(10L)
- testRecorder.metricOne.record(11L)
- testRecorder.metricOne.record(12L)
-
- val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
- val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
-
- data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms")
- }
-
- "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture {
- val testMetricKey = buildMetricKey(testEntity, "metric-one")
- val testRecorder = buildRecorder("user/kamon")
- testRecorder.metricOne.record(10L)
- testRecorder.metricOne.record(10L)
-
- val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
- val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
-
- data.utf8String should be(s"$testMetricKey:10|ms|@0.5")
- }
-
- "flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
- val testMetricKey = buildMetricKey(testEntity, "metric-one")
- val testRecorder = buildRecorder("user/kamon")
-
- var bytes = testMetricKey.length
- var level = 0
- while (bytes <= testMaxPacketSize) {
- level += 1
- testRecorder.metricOne.record(level)
- bytes += s":$level|ms".length
- }
-
- val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
- udp.expectMsgType[Udp.Send] // let the first flush pass
- val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
-
- data.utf8String should be(s"$testMetricKey:$level|ms")
- }
-
- "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture {
- val testMetricKey1 = buildMetricKey(testEntity, "metric-one")
- val testMetricKey2 = buildMetricKey(testEntity, "metric-two")
- val testRecorder = buildRecorder("user/kamon")
-
- testRecorder.metricOne.record(10L)
- testRecorder.metricOne.record(10L)
- testRecorder.metricOne.record(11L)
-
- testRecorder.metricTwo.record(20L)
- testRecorder.metricTwo.record(21L)
-
- val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext)))
- val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
-
- data.utf8String should be(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms")
- }
- }
-
- trait UdpListenerFixture {
- val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size")
- val testEntity = Entity("user/kamon", "test")
-
- def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = {
- val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown)
- metricKeyGenerator.generateKey(entity, metricKey)
- }
-
- def buildRecorder(name: String): TestEntityRecorder = {
- Kamon.metrics.entity(TestEntityRecorder, name)
- }
-
- def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = {
- val udp = TestProbe()
- val metricsSender = system.actorOf(Props(new StatsDMetricsSender("127.0.0.1", 0, testMaxPacketSize, metricKeyGenerator) {
- override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
- }))
-
- // Setup the SimpleSender
- udp.expectMsgType[Udp.SimpleSender]
- udp.reply(Udp.SimpleSenderReady)
-
- val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics)
- metricsSender ! fakeSnapshot
- udp
- }
- }
-}
-
-class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
- val metricOne = histogram("metric-one")
- val metricTwo = histogram("metric-two")
-}
-
-object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] {
- def category: String = "test"
- def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory)
-}
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala
new file mode 100644
index 00000000..d004adaa
--- /dev/null
+++ b/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala
@@ -0,0 +1,81 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.actor.Props
+import akka.io.Udp
+import akka.testkit.TestProbe
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement }
+import kamon.metric._
+import kamon.testkit.BaseKamonSpec
+import kamon.util.MilliTimestamp
+
+abstract class UDPBasedStatsDMetricSenderSpec(actorSystemName: String) extends BaseKamonSpec(actorSystemName) {
+
+ implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) {
+ override def hostName: String = "localhost_local"
+ }
+
+ val statsDConfig = config.getConfig("kamon.statsd")
+
+ trait UdpListenerFixture {
+ val testEntity = Entity("user/kamon", "test")
+
+ def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = {
+ val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown)
+ metricKeyGenerator.generateKey(entity, metricKey)
+ }
+
+ def buildRecorder(name: String): TestEntityRecorder =
+ Kamon.metrics.entity(TestEntityRecorder, name)
+
+ def newSender(udpProbe: TestProbe): Props
+
+ def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = {
+ val udp = TestProbe()
+ val metricsSender = system.actorOf(newSender(udp))
+
+ // Setup the SimpleSender
+ udp.expectMsgType[Udp.SimpleSender]
+ udp.reply(Udp.SimpleSenderReady)
+
+ val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics)
+ metricsSender ! fakeSnapshot
+ udp
+ }
+
+ def expectUDPPacket(expected: String, udp: TestProbe): Unit = {
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+ data.utf8String should be(expected)
+ }
+ }
+
+ class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ val metricOne = histogram("metric-one")
+ val metricTwo = histogram("metric-two")
+ }
+
+ object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] {
+ def category: String = "test"
+
+ def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory)
+ }
+
+}
+