diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-06-19 00:47:07 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-06-19 00:47:07 -0300 |
commit | 1150d528eb5231993e542c086e2df90cf760d8a7 (patch) | |
tree | e1c86f9a115872073c46890b5ef32dbe1bd0bd3d /kamon-datadog/src | |
parent | 35b8a715d78ddd194d410ba0cc2119b5a1caa924 (diff) | |
parent | 4abab8df49d1bc5d9a051a8b54852e0712be7b74 (diff) | |
download | Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.gz Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.tar.bz2 Kamon-1150d528eb5231993e542c086e2df90cf760d8a7.zip |
Merge branch 'master' into release-0.2
Conflicts:
kamon-play/src/test/scala/kamon/play/WSInstrumentationSpec.scala
kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
project/Dependencies.scala
project/Projects.scala
version.sbt
Diffstat (limited to 'kamon-datadog/src')
4 files changed, 391 insertions, 0 deletions
diff --git a/kamon-datadog/src/main/resources/reference.conf b/kamon-datadog/src/main/resources/reference.conf new file mode 100644 index 00000000..231eaf7d --- /dev/null +++ b/kamon-datadog/src/main/resources/reference.conf @@ -0,0 +1,32 @@ +# ==================================== # +# Kamon-Datadog Reference Configuration # +# ==================================== # + +kamon { + datadog { + # Hostname and port in which your Datadog is running. Remember that Datadog packets are sent using UDP and + # setting unreachable hosts and/or not open ports wont be warned by the Kamon, your data wont go anywhere. + hostname = "127.0.0.1" + port = 8125 + + # Interval between metrics data flushes to Datadog. It's value must be equal or greater than the + # kamon.metrics.tick-interval setting. + flush-interval = 1 second + + # Max packet size for UDP metrics data sent to Datadog. + max-packet-size = 1024 bytes + + # Subscription patterns used to select which metrics will be pushed to Datadog. Note that first, metrics + # collection for your desired entities must be activated under the kamon.metrics.filters settings. + includes { + actor = [ "*" ] + trace = [ "*" ] + dispatcher = [ "*" ] + } + + # Application prefix for all metrics pushed to Datadog. The default namespacing scheme for metrics follows + # this pattern: + # application.entity-name.metric-name + application-name = "kamon" + } +} diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala new file mode 100644 index 00000000..15d5d3fe --- /dev/null +++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala @@ -0,0 +1,82 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.datadog + +import akka.actor._ +import kamon.Kamon +import kamon.metrics._ +import scala.concurrent.duration._ +import scala.collection.JavaConverters._ +import com.typesafe.config.Config +import java.lang.management.ManagementFactory +import akka.event.Logging +import java.net.InetSocketAddress +import java.util.concurrent.TimeUnit.MILLISECONDS + +object Datadog extends ExtensionId[DatadogExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = Datadog + override def createExtension(system: ExtendedActorSystem): DatadogExtension = new DatadogExtension(system) + + trait MetricKeyGenerator { + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String + } +} + +class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val log = Logging(system, classOf[DatadogExtension]) + log.info("Starting the Kamon(Datadog) extension") + + private val datadogConfig = system.settings.config.getConfig("kamon.datadog") + + val datadogHost = new InetSocketAddress(datadogConfig.getString("hostname"), datadogConfig.getInt("port")) + val flushInterval = datadogConfig.getDuration("flush-interval", MILLISECONDS) + val maxPacketSizeInBytes = datadogConfig.getBytes("max-packet-size") + val tickInterval = system.settings.config.getDuration("kamon.metrics.tick-interval", MILLISECONDS) + + val datadogMetricsListener = buildMetricsListener(tickInterval, flushInterval) + + // Subscribe to Actors + val includedActors = datadogConfig.getStringList("includes.actor").asScala + for (actorPathPattern ← includedActors) { + Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, datadogMetricsListener, permanently = true) + } + + // Subscribe to Traces + val includedTraces = datadogConfig.getStringList("includes.trace").asScala + for (tracePathPattern ← includedTraces) { + Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, datadogMetricsListener, permanently = true) + } + + // Subscribe to Dispatchers + val includedDispatchers = datadogConfig.getStringList("includes.dispatcher").asScala + for (dispatcherPathPattern ← includedDispatchers) { + Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, datadogMetricsListener, permanently = true) + } + + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { + assert(flushInterval >= tickInterval, "Datadog flush-interval needs to be equal or greater to the tick-interval") + + val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost, maxPacketSizeInBytes), "datadog-metrics-sender") + if (flushInterval == tickInterval) { + // No need to buffer the metrics, let's go straight to the metrics sender. + metricsTranslator + } else { + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "datadog-metrics-buffer") + } + } +} + diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala new file mode 100644 index 00000000..028e9608 --- /dev/null +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -0,0 +1,131 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.datadog + +import akka.actor.{ ActorSystem, Props, ActorRef, Actor } +import akka.io.{ Udp, IO } +import java.net.InetSocketAddress +import akka.util.ByteString +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement +import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } +import java.text.{ DecimalFormatSymbols, DecimalFormat } +import kamon.metrics.{ MetricIdentity, MetricGroupIdentity } +import java.util.Locale + +class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { + + import context.system + + val appName = context.system.settings.config.getString("kamon.datadog.application-name") + val symbols = DecimalFormatSymbols.getInstance(Locale.US) + symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. + + // Absurdly high number of decimal digits, let the other end lose precision if it needs to. + val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) + + udpExtension ! Udp.SimpleSender + + def receive = { + case Udp.SimpleSenderReady ⇒ + context.become(ready(sender)) + } + + def ready(udpSender: ActorRef): Receive = { + case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender) + } + + def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { + val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) + + for { + (groupIdentity, groupSnapshot) ← tick.metrics + (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + } { + + val key = buildMetricName(groupIdentity, metricIdentity) + + for (measurement ← metricSnapshot.measurements) { + val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType) + dataBuilder.appendMeasurement(key, measurementData) + } + } + dataBuilder.flush() + } + + def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement, + instrumentType: InstrumentType): String = { + + StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType)) + .append(buildIdentificationTag(groupIdentity, metricIdentity)) + .result() + } + + def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = { + def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = + s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}" + + instrumentType match { + case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) + case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g") + case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") + } + } + + def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = + s"$appName.${groupIdentity.category.name}.${metricIdentity.name}" + + def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = + s"|#${groupIdentity.category.name}:${groupIdentity.name}" +} + +object DatadogMetricsSender { + def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new DatadogMetricsSender(remote, maxPacketSize)) +} + +trait UdpExtensionProvider { + def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) +} + +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { + val metricSeparator = "\n" + val measurementSeparator = ":" + var lastKey = "" + var buffer = new StringBuilder() + + def appendMeasurement(key: String, measurementData: String): Unit = { + val data = key + measurementSeparator + measurementData + + if (fitsOnBuffer(metricSeparator + data)) { + val mSeparator = if (buffer.length > 0) metricSeparator else "" + buffer.append(mSeparator).append(data) + } else { + flushToUDP(buffer.toString()) + buffer.clear() + buffer.append(data) + } + } + + def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes + + private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) + + def flush(): Unit = { + flushToUDP(buffer.toString) + buffer.clear() + } +} diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala new file mode 100644 index 00000000..6a7191a1 --- /dev/null +++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala @@ -0,0 +1,146 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.datadog + +import akka.testkit.{TestKitBase, TestProbe} +import akka.actor.{Props, ActorRef, ActorSystem} +import kamon.metrics.instruments.CounterRecorder +import org.scalatest.{Matchers, WordSpecLike} +import kamon.metrics._ +import akka.io.Udp +import org.HdrHistogram.HdrRecorder +import kamon.metrics.Subscriptions.TickMetricSnapshot +import java.lang.management.ManagementFactory +import java.net.InetSocketAddress +import com.typesafe.config.ConfigFactory + +class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system = ActorSystem("datadog-metric-sender-spec", + ConfigFactory.parseString("kamon.datadog.max-packet-size = 256 bytes")) + + "the DataDogMetricSender" should { + "send latency measurements" in new UdpListenerFixture { + val testMetricName = "processing-time" + val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + testRecorder.record(10L) + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon") + } + + "include the sampling rate in case of multiple measurements of the same value" in new UdpListenerFixture { + val testMetricName = "processing-time" + val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + testRecorder.record(10L) + testRecorder.record(10L) + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon") + } + + "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { + val testMetricName = "processing-time" + val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit) + + var bytes = 0 + var level = 0 + + while (bytes <= testMaxPacketSize) { + level += 1 + testRecorder.record(level) + bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length + } + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + udp.expectMsgType[Udp.Send]// let the first flush pass + + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon") + } + + "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { + val firstTestMetricName = "processing-time-1" + val secondTestMetricName = "processing-time-2" + val thirdTestMetricName = "counter" + + val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val thirdTestRecorder = CounterRecorder() + + firstTestRecorder.record(10L) + firstTestRecorder.record(10L) + + secondTestRecorder.record(21L) + + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + + val udp = setup(Map( + firstTestMetricName -> firstTestRecorder.collect(), + secondTestMetricName -> secondTestRecorder.collect(), + thirdTestMetricName -> thirdTestRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon") + } + } + + trait UdpListenerFixture { + val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) + val testMaxPacketSize = system.settings.config.getBytes("kamon.datadog.max-packet-size") + + def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = { + val udp = TestProbe() + val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) { + override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref + })) + + // Setup the SimpleSender + udp.expectMsgType[Udp.SimpleSender] + udp.reply(Udp.SimpleSenderReady) + + // These names are not intented to match the real actor metrics, it's just about seeing more familiar data in tests. + val testGroupIdentity = new MetricGroupIdentity { + val name: String = "user/kamon" + val category: MetricGroupCategory = new MetricGroupCategory { + val name: String = "actor" + } + } + + val testMetrics = for ((metricName, snapshot) ← metrics) yield { + val testMetricIdentity = new MetricIdentity { + val name: String = metricName + val tag: String = "" + } + + (testMetricIdentity, snapshot) + } + + metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot { + val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap + })) + udp + } + } + +} |