aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-04-07 23:57:57 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-04-07 23:57:57 -0300
commitded044c87eeb5313ec4067dc660ea92cccb4b098 (patch)
tree8bcb2fff795ec51401b7fbee02b4dd03d7ef9f6b
parent1affa4763f101a6fd16b2f24ac45fcc20c52af60 (diff)
downloadKamon-ded044c87eeb5313ec4067dc660ea92cccb4b098.tar.gz
Kamon-ded044c87eeb5313ec4067dc660ea92cccb4b098.tar.bz2
Kamon-ded044c87eeb5313ec4067dc660ea92cccb4b098.zip
! statsd: take advantange of the multi-measurement format
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala82
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala38
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala60
-rw-r--r--kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala144
4 files changed, 212 insertions, 112 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index 63b1a53a..42eb57d0 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -16,20 +16,23 @@
package kamon.statsd
-import akka.actor.{ Props, ActorRef, Actor }
+import akka.actor.{ActorSystem, Props, ActorRef, Actor}
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.Kamon
-import scala.annotation.tailrec
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.MetricSnapshot.Measurement
+import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType}
-class StatsDMetricsSender extends Actor {
+class StatsDMetricsSender extends Actor with UdpExtensionProvider {
import context.system
val statsDExtension = Kamon(StatsD)
val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port)
+ val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config)
- IO(Udp) ! Udp.SimpleSender
+ udpExtension ! Udp.SimpleSender
def receive = {
case Udp.SimpleSenderReady ⇒
@@ -37,27 +40,74 @@ class StatsDMetricsSender extends Actor {
}
def ready(udpSender: ActorRef): Receive = {
- case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender)
+ case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender)
}
- @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
- def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
+ def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
+ val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote)
- if (metrics.isEmpty)
- flushToRemote(buffer, udpSender)
- else {
- val headMetricData = metrics.head.toByteString(includeTrailingNewline = true)
+ for((groupIdentity, groupSnapshot) <- tick.metrics;
+ (metricIdentity, metricSnapshot) <- groupSnapshot.metrics) {
- if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) {
- flushToRemote(buffer, udpSender)
- sendMetricsToRemote(metrics.tail, headMetricData, udpSender)
- } else {
- sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender)
+ val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity))
+
+ for(measurement <- metricSnapshot.measurements) {
+ val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType)
+ dataBuilder.appendMeasurement(key, measurementData)
}
}
+
+ dataBuilder.flush()
+ }
+
+ def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): ByteString = {
+ def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): ByteString =
+ ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else ""))
+
+ instrumentType match {
+ case Histogram => statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
+ case Gauge => statsDMetricFormat(measurement.value.toString, "g")
+ case Counter => ByteString.empty // TODO: Need to decide how to report counters, when we have them!
+ }
}
}
object StatsDMetricsSender {
def props: Props = Props[StatsDMetricsSender]
+}
+
+trait UdpExtensionProvider {
+ def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
+}
+
+class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) {
+ val metricSeparator = ByteString("\n")
+ val measurementSeparator = ByteString(":")
+
+ var lastKey= ByteString.empty
+ var buffer = ByteString.empty
+
+ def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = {
+ val appendData =
+ if(key == lastKey)
+ measurementSeparator ++ measurementData
+ else {
+ lastKey = key
+ val keySeparator = if(buffer.length == 0) ByteString.empty else metricSeparator
+ keySeparator ++ key ++ measurementSeparator ++ measurementData
+ }
+
+ if(buffer.length + appendData.length >= maxPacketSize) {
+ flushToUDP(buffer)
+ buffer = appendData
+ } else
+ buffer = buffer ++ appendData
+ }
+
+ private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote)
+
+ def flush(): Unit = {
+ flushToUDP(buffer)
+ buffer = ByteString.empty
+ }
} \ No newline at end of file
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
index 167e993e..472824e9 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
@@ -21,7 +21,6 @@ import kamon.Kamon
import kamon.metrics._
import scala.concurrent.duration._
import scala.collection.JavaConverters._
-import akka.util.ByteString
import com.typesafe.config.Config
import java.lang.management.ManagementFactory
@@ -32,39 +31,6 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
trait MetricKeyGenerator {
def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String
}
-
- sealed trait Metric {
- def key: String
- def value: Double
- def suffix: String
- def samplingRate: Double
-
- /*
- * Creates the stats string to send to StatsD.
- * For counters, it provides something like {@code key:value|c}.
- * For timing, it provides something like {@code key:millis|ms}.
- * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate}
- */
- def toByteString(includeTrailingNewline: Boolean = true): ByteString =
- if (samplingRate >= 1D)
- ByteString(s"$key:$value|$suffix")
- else
- ByteString(s"$key:$value|$suffix|@$samplingRate")
- }
-
- case class Counter(key: String, value: Double = 1D, samplingRate: Double = 1.0) extends Metric {
- val suffix: String = "c"
- }
-
- case class Timing(key: String, value: Double, samplingRate: Double = 1.0) extends Metric {
- val suffix: String = "ms"
- }
-
- case class Gauge(key: String, value: Double, samplingRate: Double = 1.0) extends Metric {
- val suffix: String = "g"
- }
-
- case class MetricBatch(metrics: Iterable[Metric])
}
class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
@@ -86,9 +52,9 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
- val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator")
+ val metricsTranslator = system.actorOf(StatsDMetricsSender.props, "statsd-metrics-sender")
if (flushInterval == tickInterval) {
- // No need to buffer the metrics, let's go straight to the metrics translator.
+ // No need to buffer the metrics, let's go straight to the metrics sender.
metricsTranslator
} else {
system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer")
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
deleted file mode 100644
index 2ef41c6d..00000000
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-package kamon.statsd
-
-import akka.actor.{ Props, Actor, ActorRef }
-import kamon.metrics._
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.ActorMetrics.ActorMetricSnapshot
-
-class StatsDMetricTranslator extends Actor {
- val config = context.system.settings.config
-
- val metricKeyGenerator = new SimpleMetricKeyGenerator(config)
- val metricSender = context.actorOf(StatsDMetricsSender.props, "metrics-sender")
-
- def receive = {
- case TickMetricSnapshot(from, to, metrics) ⇒
- val translatedMetrics = metrics.collect {
- case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) ⇒ transformActorMetric(am, snapshot)
- }
-
- metricSender ! StatsD.MetricBatch(translatedMetrics.flatten)
- }
-
- def transformActorMetric(actorIdentity: ActorMetrics, snapshot: ActorMetricSnapshot): Vector[StatsD.Metric] = {
- val timeInMailboxKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.TimeInMailbox)
- val processingTimeKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.ProcessingTime)
-
- roll(timeInMailboxKey, snapshot.timeInMailbox, StatsD.Timing) ++ roll(processingTimeKey, snapshot.processingTime, StatsD.Timing)
- }
-
- def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) ⇒ StatsD.Metric): Vector[StatsD.Metric] = {
- val builder = Vector.newBuilder[StatsD.Metric]
- for (measurement ← snapshot.measurements) {
- val samplingRate = 1D / measurement.count
- val scaledValue = Scale.convert(snapshot.scale, Scale.Milli, measurement.value)
- builder += metricBuilder.apply(key, scaledValue, samplingRate)
- }
-
- builder.result()
- }
-
-}
-
-object StatsDMetricTranslator {
- def props: Props = Props[StatsDMetricTranslator]
-}
diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
new file mode 100644
index 00000000..caeaee28
--- /dev/null
+++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala
@@ -0,0 +1,144 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.testkit.{TestKitBase, TestProbe}
+import akka.actor.{ActorRef, Props, ActorSystem}
+import org.scalatest.{Matchers, WordSpecLike}
+import kamon.metrics._
+import akka.io.Udp
+import org.HdrHistogram.HdrRecorder
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import java.lang.management.ManagementFactory
+import com.typesafe.config.ConfigFactory
+
+class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers {
+
+ implicit lazy val system: ActorSystem = ActorSystem("statsd-metric-sender-spec", ConfigFactory.parseString(
+ """
+ |kamon.statsd {
+ | max-packet-size = 256
+ |}
+ """.stripMargin
+ ))
+
+ "the StatsDMetricSender" should {
+ "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture {
+ val testMetricName = "test-metric"
+ val testMetricKey = buildMetricKey(testMetricName)
+ val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ testRecorder.record(10L)
+
+ val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+
+ data.utf8String should be(s"$testMetricKey:10|ms")
+ }
+
+ "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture {
+ val testMetricName = "test-metric"
+ val testMetricKey = buildMetricKey(testMetricName)
+ val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ testRecorder.record(10L)
+ testRecorder.record(11L)
+ testRecorder.record(12L)
+
+ val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+
+ data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms")
+ }
+
+ "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture {
+ val testMetricName = "test-metric"
+ val testMetricKey = buildMetricKey(testMetricName)
+ val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ testRecorder.record(10L)
+ testRecorder.record(10L)
+
+ val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+
+ data.utf8String should be(s"$testMetricKey:10|ms|@0.5")
+ }
+
+
+ "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture {
+ val firstTestMetricName = "first-test-metric"
+ val firstTestMetricKey = buildMetricKey(firstTestMetricName)
+ val secondTestMetricName = "second-test-metric"
+ val secondTestMetricKey = buildMetricKey(secondTestMetricName)
+ val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+
+ firstTestRecorder.record(10L)
+ firstTestRecorder.record(10L)
+ firstTestRecorder.record(11L)
+
+ secondTestRecorder.record(20L)
+ secondTestRecorder.record(21L)
+
+ val udp = setup(Map(
+ firstTestMetricName -> firstTestRecorder.collect(),
+ secondTestMetricName -> secondTestRecorder.collect()))
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+
+ data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms")
+ }
+ }
+
+
+ trait UdpListenerFixture {
+ val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
+
+ def buildMetricKey(metricName: String): String = s"Kamon.$localhostName.test-metric-category.test-group.$metricName"
+
+ def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = {
+ val udp = TestProbe()
+ val metricsSender = system.actorOf(Props(new StatsDMetricsSender {
+ override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
+ }))
+
+ // Setup the SimpleSender
+ udp.expectMsgType[Udp.SimpleSender]
+ udp.reply(Udp.SimpleSenderReady)
+
+
+ val testGroupIdentity = new MetricGroupIdentity {
+ val name: String = "test-group"
+ val category: MetricGroupCategory = new MetricGroupCategory {
+ val name: String = "test-metric-category"
+ }
+ }
+
+ val testMetrics = for((metricName, snapshot) <- metrics) yield {
+ val testMetricIdentity = new MetricIdentity {
+ val name: String = metricName
+ val tag: String = ""
+ }
+
+ (testMetricIdentity, snapshot)
+ }
+
+ metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot {
+ val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap
+ }))
+
+ udp
+ }
+ }
+}