From 4ef79b01d0b9d6cb4628886a8a2a29048d997b13 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 21 May 2014 09:09:34 -0300 Subject: + datadog: include identification tags and clean up all irrelevant code from StatsD --- kamon-datadog/src/main/resources/reference.conf | 13 ++-- .../src/main/scala/kamon/datadog/Datadog.scala | 11 +--- .../scala/kamon/datadog/DatadogMetricsSender.scala | 77 +++++++--------------- 3 files changed, 30 insertions(+), 71 deletions(-) (limited to 'kamon-datadog/src/main') diff --git a/kamon-datadog/src/main/resources/reference.conf b/kamon-datadog/src/main/resources/reference.conf index 5aaa87fc..657d3385 100644 --- a/kamon-datadog/src/main/resources/reference.conf +++ b/kamon-datadog/src/main/resources/reference.conf @@ -13,9 +13,6 @@ kamon { # kamon.metrics.tick-interval setting. flush-interval = 1 second - # Max packet size for UDP metrics data sent to Datadog. - max-packet-size = 1024 bytes - # Subscription patterns used to select which metrics will be pushed to Datadog. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. includes { @@ -23,11 +20,9 @@ kamon { trace = [ "*" ] } - simple-metric-key-generator { - # Application prefix for all metrics pushed to Datadog. The default namespacing scheme for metrics follows - # this pattern: - # application.host.entity.entity-name.metric-name - application = "kamon" - } + # Application prefix for all metrics pushed to Datadog. The default namespacing scheme for metrics follows + # this pattern: + # application.entity-name.metric-name + application-name = "kamon" } } diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala index b492b9fd..c6981fd7 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala @@ -44,7 +44,6 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { val datadogHost = new InetSocketAddress(datadogConfig.getString("hostname"), datadogConfig.getInt("port")) val flushInterval = datadogConfig.getDuration("flush-interval", MILLISECONDS) - val maxPacketSizeInBytes = datadogConfig.getBytes("max-packet-size") val tickInterval = system.settings.config.getDuration("kamon.metrics.tick-interval", MILLISECONDS) val datadogMetricsListener = buildMetricsListener(tickInterval, flushInterval) @@ -64,7 +63,7 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "Datadog flush-interval needs to be equal or greater to the tick-interval") - val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost, maxPacketSizeInBytes), "datadog-metrics-sender") + val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost), "datadog-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. metricsTranslator @@ -74,11 +73,3 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { } } -class SimpleMetricKeyGenerator(config: Config) extends Datadog.MetricKeyGenerator { - val application = config.getString("kamon.datadog.simple-metric-key-generator.application") - val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) - - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - s"${application}.${localhostName}.${groupIdentity.category.name}.${groupIdentity.name}.${metricIdentity.name}" -} - diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala index df072552..ec603b39 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -24,11 +24,12 @@ import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.MetricSnapshot.Measurement import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } import java.text.DecimalFormat +import kamon.metrics.{ MetricIdentity, MetricGroupIdentity } -class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { +class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExtensionProvider { import context.system - val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config) + val appName = context.system.settings.config.getString("kamon.datadog.application-name") val samplingRateFormat = new DecimalFormat() samplingRateFormat.setMaximumFractionDigits(128) // Absurdly high, let the other end loss precision if it needs to. @@ -44,38 +45,34 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) + def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) for ( (groupIdentity, groupSnapshot) ← tick.metrics; (metricIdentity, metricSnapshot) ← groupSnapshot.metrics ) { - val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) - for (measurement ← metricSnapshot.measurements) { - val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) - dataBuilder.appendMeasurement(key, measurementData) + val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType) + flushToUDP(measurementData) } } - - dataBuilder.flush() } - def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = { + def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement, + instrumentType: InstrumentType): String = { - def processTags(tags: Seq[String]): String = { - if (tags.isEmpty) "" else { - tags.foldLeft(new StringBuilder("|#")) { - (sb, s) ⇒ - if (sb.length > 2) sb ++= "," - sb ++= s - }.toString() - } - } + StringBuilder.newBuilder + .append(buildMetricName(groupIdentity, metricIdentity)) + .append(":") + .append(buildMeasurementData(measurement, instrumentType)) + .append(buildIdentificationTag(groupIdentity, metricIdentity)) + .result() + } - def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D, tags: Seq[String] = Nil): String = - value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "" + processTags(tags)) + def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = { + def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = + value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") instrumentType match { case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) @@ -83,42 +80,18 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") } } + + def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = + appName + "." + groupIdentity.category.name + "." + metricIdentity.name + + def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = + "|#" + groupIdentity.category.name + ":" + groupIdentity.name } object DatadogMetricsSender { - def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new DatadogMetricsSender(remote, maxPacketSize)) + def props(remote: InetSocketAddress): Props = Props(new DatadogMetricsSender(remote)) } trait UdpExtensionProvider { def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) } - -class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { - val metricSeparator = "\n" - val measurementSeparator = ":" - - var lastKey = "" - var buffer = new StringBuilder() - - def appendMeasurement(key: String, measurementData: String): Unit = { - val data = key + measurementSeparator + measurementData - - if (fitsOnBuffer(metricSeparator + data)) { - val mSeparator = if (buffer.length > 0) metricSeparator else "" - buffer.append(mSeparator).append(data) - } else { - flushToUDP(buffer.toString()) - buffer.clear() - buffer.append(data) - } - } - - def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes - - private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) - - def flush(): Unit = { - flushToUDP(buffer.toString) - buffer.clear() - } -} -- cgit v1.2.3