diff options
12 files changed, 519 insertions, 232 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf index b8aca724..b296c0d9 100644 --- a/kamon-statsd/src/main/resources/reference.conf +++ b/kamon-statsd/src/main/resources/reference.conf @@ -14,9 +14,6 @@ kamon { # kamon.metric.tick-interval setting. flush-interval = 10 seconds - # Max packet size for UDP metrics data sent to StatsD. - max-packet-size = 1024 bytes - # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. subscriptions { @@ -63,6 +60,20 @@ kamon { # version of StatsD or if you are running your own, customized version of StatsD that supports this. metric-name-normalization-strategy = normalize } + + # FQCN of the implementation of `kamon.statsd.StatsDMetricsSenderFactory` to be instantiated and use for + # creating StatsD sender. Provided implementations are: + # - `kamon.statsd.BatchStatsDMetricsSender`. Sends a UDP packet every "kamon.statsd.flush-interval" or + # as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached. Default one. + # - `kamon.statsd.SimpleStatsDMetricsSender`. Sends a UDP packet for each piece of data it receives. + metric-sender-factory = kamon.statsd.BatchStatsDMetricsSender + + # Settings for `kamon.statsd.BatchStatsDMetricsSender`. + # Used only if kamon.statsd.metric-sender-factory is set to `kamon.statsd.BatchStatsDMetricsSender` + batch-metric-sender { + # Max packet size for UDP metrics data sent to StatsD. + max-packet-size = 1024 bytes + } } modules { diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala index 70ff1b45..be9a572a 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,41 +16,34 @@ package kamon.statsd -import akka.actor.{ ActorSystem, Props, ActorRef, Actor } -import akka.io.{ Udp, IO } -import java.net.InetSocketAddress -import akka.util.ByteString +import akka.actor.Props +import com.typesafe.config.Config import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot -import java.text.{ DecimalFormatSymbols, DecimalFormat } -import java.util.Locale - import kamon.metric.instrument.{ Counter, Histogram } -class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBytes: Long, metricKeyGenerator: MetricKeyGenerator) - extends Actor with UdpExtensionProvider { - import context.system - - val symbols = DecimalFormatSymbols.getInstance(Locale.US) - symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. - - // Absurdly high number of decimal digits, let the other end lose precision if it needs to. - val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) - - udpExtension ! Udp.SimpleSender - - def newSocketAddress = new InetSocketAddress(statsDHost, statsDPort) +/** + * Factory for [[BatchStatsDMetricsSender]]. + * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender" + * to select [[BatchStatsDMetricsSender]] as your sender + */ +object BatchStatsDMetricsSender extends StatsDMetricsSenderFactory { + override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props = + Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator)) +} - def receive = { - case Udp.SimpleSenderReady ⇒ - context.become(ready(sender)) - } +/** + * StatsD sender which sends a UDP packet every "kamon.statsd.flush-interval" or + * as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +class BatchStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) { - def ready(udpSender: ActorRef): Receive = { - case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender) - } + val maxPacketSizeInBytes = statsDConfig.getBytes("batch-metric-sender.max-packet-size") - def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, newSocketAddress) + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = { + val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, flushToUDP) for ( (entity, snapshot) ← tick.metrics; @@ -72,25 +65,9 @@ class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBy packetBuilder.flush() } - - def encodeStatsDTimer(level: Long, count: Long): String = { - val samplingRate: Double = 1D / count - level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") - } - - def encodeStatsDCounter(count: Long): String = count.toString + "|c" } -object StatsDMetricsSender { - def props(statsDHost: String, statsDPort: Int, maxPacketSize: Long, metricKeyGenerator: MetricKeyGenerator): Props = - Props(new StatsDMetricsSender(statsDHost, statsDPort, maxPacketSize, metricKeyGenerator)) -} - -trait UdpExtensionProvider { - def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) -} - -class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, flushToUDP: String ⇒ Unit) { val metricSeparator = "\n" val measurementSeparator = ":" @@ -103,8 +80,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r if (fitsOnBuffer(dataWithoutKey)) buffer.append(dataWithoutKey) else { - flushToUDP(buffer.toString()) - buffer.clear() + flush() buffer.append(key).append(dataWithoutKey) } } else { @@ -114,8 +90,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r val mSeparator = if (buffer.length > 0) metricSeparator else "" buffer.append(mSeparator).append(dataWithoutSeparator) } else { - flushToUDP(buffer.toString()) - buffer.clear() + flush() buffer.append(dataWithoutSeparator) } } @@ -123,8 +98,6 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes - private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) - def flush(): Unit = { flushToUDP(buffer.toString) buffer.clear() diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala index 97a27ff3..c4d8682a 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.statsd import java.lang.management.ManagementFactory diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala new file mode 100644 index 00000000..47ce66cd --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala @@ -0,0 +1,62 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{ Counter, Histogram } + +/** + * Factory for [[SimpleStatsDMetricsSender]]. + * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender" + * to select [[SimpleStatsDMetricsSender]] as your sender + */ +object SimpleStatsDMetricsSender extends StatsDMetricsSenderFactory { + override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props = + Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator)) +} + +/** + * "Traditional" StatsD sender which sends a UDP packet for each piece of data it receives. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +class SimpleStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) { + + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = { + + for ( + (entity, snapshot) ← tick.metrics; + (metricKey, metricSnapshot) ← snapshot.metrics + ) { + + val keyPrefix = metricKeyGenerator.generateKey(entity, metricKey) + ":" + + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + hs.recordsIterator.foreach { record ⇒ + flushToUDP(keyPrefix + encodeStatsDTimer(record.level, record.count)) + } + + case cs: Counter.Snapshot ⇒ + flushToUDP(keyPrefix + encodeStatsDCounter(cs.count)) + } + } + } +} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 91d05510..a1f7dca3 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -23,8 +23,6 @@ import kamon.util.ConfigTools.Syntax import scala.concurrent.duration._ import com.typesafe.config.Config import akka.event.Logging -import java.net.InetSocketAddress -import java.util.concurrent.TimeUnit.MILLISECONDS import scala.collection.JavaConverters._ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { @@ -44,10 +42,10 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val tickInterval = metricsExtension.settings.tickInterval val flushInterval = statsDConfig.getFiniteDuration("flush-interval") - val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size") val keyGeneratorFQCN = statsDConfig.getString("metric-key-generator") + val senderFactoryFQCN = statsDConfig.getString("metric-sender-factory") - val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config) + val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, senderFactoryFQCN, config) val subscriptions = statsDConfig.getConfig("subscriptions") subscriptions.firstLevelKeys.map { subscriptionCategory ⇒ @@ -56,15 +54,13 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { } } - def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, keyGeneratorFQCN: String, config: Config): ActorRef = { + def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, + keyGeneratorFQCN: String, senderFactoryFQCN: String, config: Config): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") val keyGenerator = system.dynamicAccess.createInstanceFor[MetricKeyGenerator](keyGeneratorFQCN, (classOf[Config], config) :: Nil).get + val senderFactory = system.dynamicAccess.getObjectFor[StatsDMetricsSenderFactory](senderFactoryFQCN).get - val metricsSender = system.actorOf(StatsDMetricsSender.props( - statsDConfig.getString("hostname"), - statsDConfig.getInt("port"), - maxPacketSizeInBytes, - keyGenerator), "statsd-metrics-sender") + val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala new file mode 100644 index 00000000..1f603aea --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala @@ -0,0 +1,24 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import com.typesafe.config.Config + +trait StatsDMetricsSenderFactory { + def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props +} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala new file mode 100644 index 00000000..9e856eda --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala @@ -0,0 +1,76 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import java.net.InetSocketAddress +import java.text.{ DecimalFormat, DecimalFormatSymbols } +import java.util.Locale +import akka.actor.{ Actor, ActorRef, ActorSystem } +import akka.io.{ IO, Udp } +import akka.util.ByteString +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot + +/** + * Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends Actor with UdpExtensionProvider { + + import context.system + + val statsDHost = statsDConfig.getString("hostname") + val statsDPort = statsDConfig.getInt("port") + + val symbols = DecimalFormatSymbols.getInstance(Locale.US) + symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. + + // Absurdly high number of decimal digits, let the other end lose precision if it needs to. + val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) + + udpExtension ! Udp.SimpleSender + + lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort) + + def receive = { + case Udp.SimpleSenderReady ⇒ + context.become(ready(sender)) + } + + def ready(udpSender: ActorRef): Receive = { + case tick: TickMetricSnapshot ⇒ + writeMetricsToRemote(tick, + (data: String) ⇒ udpSender ! Udp.Send(ByteString(data), socketAddress)) + } + + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit + + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") + } + + def encodeStatsDCounter(count: Long): String = count.toString + "|c" + +} + +trait UdpExtensionProvider { + def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) +} + diff --git a/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala new file mode 100644 index 00000000..8474a5ce --- /dev/null +++ b/kamon-statsd/src/test/scala/kamon/statsd/BatchStatsDMetricSenderSpec.scala @@ -0,0 +1,120 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.testkit.TestProbe +import akka.actor.{ ActorRef, Props, ActorSystem } +import akka.io.Udp +import com.typesafe.config.ConfigFactory + +class BatchStatsDMetricSenderSpec extends UDPBasedStatsDMetricSenderSpec("batch-statsd-metric-sender-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | statsd { + | hostname = "127.0.0.1" + | port = 0 + | simple-metric-key-generator { + | application = kamon + | hostname-override = kamon-host + | include-hostname = true + | metric-name-normalization-strategy = normalize + | } + | batch-metric-sender.max-packet-size = 1024 + | } + |} + | + """.stripMargin) + + val testMaxPacketSize = statsDConfig.getBytes("batch-metric-sender.max-packet-size") + + trait BatchSenderFixture extends UdpListenerFixture { + override def newSender(udpProbe: TestProbe) = + Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator) { + override def udpExtension(implicit system: ActorSystem): ActorRef = udpProbe.ref + }) + } + + "the BatchStatsDMetricSender" should { + "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new BatchSenderFixture { + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + + expectUDPPacket(s"$testMetricKey:10|ms", udp) + } + + "render several measurements of the same key under a single (key + multiple measurements) packet" in new BatchSenderFixture { + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(11L) + testRecorder.metricOne.record(12L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + + expectUDPPacket(s"$testMetricKey:10|ms:11|ms:12|ms", udp) + } + + "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new BatchSenderFixture { + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + + expectUDPPacket(s"$testMetricKey:10|ms|@0.5", udp) + } + + "flush the packet when the max-packet-size is reached" in new BatchSenderFixture { + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + + var bytes = testMetricKey.length + var level = 0 + while (bytes <= testMaxPacketSize) { + level += 1 + testRecorder.metricOne.record(level) + bytes += s":$level|ms".length + } + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + udp.expectMsgType[Udp.Send] // let the first flush pass + expectUDPPacket(s"$testMetricKey:$level|ms", udp) + } + + "render multiple keys in the same packet using newline as separator" in new BatchSenderFixture { + val testMetricKey1 = buildMetricKey(testEntity, "metric-one") + val testMetricKey2 = buildMetricKey(testEntity, "metric-two") + val testRecorder = buildRecorder("user/kamon") + + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(11L) + + testRecorder.metricTwo.record(20L) + testRecorder.metricTwo.record(21L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + expectUDPPacket(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms", udp) + } + } +} diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala index 2e03a59d..84c0b6aa 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.statsd import com.typesafe.config.ConfigFactory @@ -17,7 +33,7 @@ class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers { |} """.stripMargin) - "the StatsDMetricSender" should { + "the SimpleMetricKeyGenerator" should { "generate metric names that follow the application.host.entity.entity-name.metric-name pattern by default" in { implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(defaultConfiguration) { override def hostName: String = "localhost" diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala new file mode 100644 index 00000000..2e909d8b --- /dev/null +++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleStatsDMetricsSenderSpec.scala @@ -0,0 +1,76 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.{ ActorSystem, Props, ActorRef } +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory + +class SimpleStatsDMetricsSenderSpec extends UDPBasedStatsDMetricSenderSpec("simple-statsd-metric-sender-spec") { + + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | statsd { + | hostname = "127.0.0.1" + | port = 0 + | simple-metric-key-generator { + | application = kamon + | hostname-override = kamon-host + | include-hostname = true + | metric-name-normalization-strategy = normalize + | } + | } + |} + | + """.stripMargin) + + trait SimpleSenderFixture extends UdpListenerFixture { + override def newSender(udpProbe: TestProbe) = + Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator) { + override def udpExtension(implicit system: ActorSystem): ActorRef = udpProbe.ref + }) + } + + "the SimpleStatsDMetricSender" should { + "flush the metrics data for each unique value it receives" in new SimpleSenderFixture { + + val testMetricKey1 = buildMetricKey(testEntity, "metric-one") + val testMetricKey2 = buildMetricKey(testEntity, "metric-two") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(30L) + testRecorder.metricTwo.record(20L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + expectUDPPacket(s"$testMetricKey1:10|ms", udp) + expectUDPPacket(s"$testMetricKey1:30|ms", udp) + expectUDPPacket(s"$testMetricKey2:20|ms", udp) + } + + "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new SimpleSenderFixture { + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) + expectUDPPacket(s"$testMetricKey:10|ms|@0.5", udp) + } + } +} diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala deleted file mode 100644 index 51e7cf19..00000000 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ /dev/null @@ -1,164 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.statsd - -import akka.testkit.{ TestKitBase, TestProbe } -import akka.actor.{ ActorRef, Props, ActorSystem } -import kamon.Kamon -import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } -import kamon.testkit.BaseKamonSpec -import kamon.util.MilliTimestamp -import org.scalatest.{ Matchers, WordSpecLike } -import kamon.metric._ -import akka.io.Udp -import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot -import java.net.InetSocketAddress -import com.typesafe.config.ConfigFactory - -class StatsDMetricSenderSpec extends BaseKamonSpec("statsd-metric-sender-spec") { - override lazy val config = - ConfigFactory.parseString( - """ - |kamon { - | statsd.simple-metric-key-generator { - | application = kamon - | hostname-override = kamon-host - | include-hostname = true - | metric-name-normalization-strategy = normalize - | } - |} - | - """.stripMargin) - - implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) { - override def hostName: String = "localhost_local" - } - - "the StatsDMetricSender" should { - "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { - val testMetricKey = buildMetricKey(testEntity, "metric-one") - val testRecorder = buildRecorder("user/kamon") - testRecorder.metricOne.record(10L) - - val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - - data.utf8String should be(s"$testMetricKey:10|ms") - } - - "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture { - val testMetricKey = buildMetricKey(testEntity, "metric-one") - val testRecorder = buildRecorder("user/kamon") - testRecorder.metricOne.record(10L) - testRecorder.metricOne.record(11L) - testRecorder.metricOne.record(12L) - - val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - - data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms") - } - - "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture { - val testMetricKey = buildMetricKey(testEntity, "metric-one") - val testRecorder = buildRecorder("user/kamon") - testRecorder.metricOne.record(10L) - testRecorder.metricOne.record(10L) - - val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - - data.utf8String should be(s"$testMetricKey:10|ms|@0.5") - } - - "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { - val testMetricKey = buildMetricKey(testEntity, "metric-one") - val testRecorder = buildRecorder("user/kamon") - - var bytes = testMetricKey.length - var level = 0 - while (bytes <= testMaxPacketSize) { - level += 1 - testRecorder.metricOne.record(level) - bytes += s":$level|ms".length - } - - val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - udp.expectMsgType[Udp.Send] // let the first flush pass - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - - data.utf8String should be(s"$testMetricKey:$level|ms") - } - - "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { - val testMetricKey1 = buildMetricKey(testEntity, "metric-one") - val testMetricKey2 = buildMetricKey(testEntity, "metric-two") - val testRecorder = buildRecorder("user/kamon") - - testRecorder.metricOne.record(10L) - testRecorder.metricOne.record(10L) - testRecorder.metricOne.record(11L) - - testRecorder.metricTwo.record(20L) - testRecorder.metricTwo.record(21L) - - val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) - val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - - data.utf8String should be(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms") - } - } - - trait UdpListenerFixture { - val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size") - val testEntity = Entity("user/kamon", "test") - - def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { - val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown) - metricKeyGenerator.generateKey(entity, metricKey) - } - - def buildRecorder(name: String): TestEntityRecorder = { - Kamon.metrics.entity(TestEntityRecorder, name) - } - - def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = { - val udp = TestProbe() - val metricsSender = system.actorOf(Props(new StatsDMetricsSender("127.0.0.1", 0, testMaxPacketSize, metricKeyGenerator) { - override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref - })) - - // Setup the SimpleSender - udp.expectMsgType[Udp.SimpleSender] - udp.reply(Udp.SimpleSenderReady) - - val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics) - metricsSender ! fakeSnapshot - udp - } - } -} - -class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { - val metricOne = histogram("metric-one") - val metricTwo = histogram("metric-two") -} - -object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { - def category: String = "test" - def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) -} diff --git a/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala new file mode 100644 index 00000000..d004adaa --- /dev/null +++ b/kamon-statsd/src/test/scala/kamon/statsd/UDPBasedStatsDMetricSenderSpec.scala @@ -0,0 +1,81 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import akka.io.Udp +import akka.testkit.TestProbe +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } +import kamon.metric._ +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp + +abstract class UDPBasedStatsDMetricSenderSpec(actorSystemName: String) extends BaseKamonSpec(actorSystemName) { + + implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) { + override def hostName: String = "localhost_local" + } + + val statsDConfig = config.getConfig("kamon.statsd") + + trait UdpListenerFixture { + val testEntity = Entity("user/kamon", "test") + + def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { + val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown) + metricKeyGenerator.generateKey(entity, metricKey) + } + + def buildRecorder(name: String): TestEntityRecorder = + Kamon.metrics.entity(TestEntityRecorder, name) + + def newSender(udpProbe: TestProbe): Props + + def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = { + val udp = TestProbe() + val metricsSender = system.actorOf(newSender(udp)) + + // Setup the SimpleSender + udp.expectMsgType[Udp.SimpleSender] + udp.reply(Udp.SimpleSenderReady) + + val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics) + metricsSender ! fakeSnapshot + udp + } + + def expectUDPPacket(expected: String, udp: TestProbe): Unit = { + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + data.utf8String should be(expected) + } + } + + class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val metricOne = histogram("metric-one") + val metricTwo = histogram("metric-two") + } + + object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { + def category: String = "test" + + def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) + } + +} + |