diff options
Diffstat (limited to 'kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala')
-rw-r--r-- | kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala | 68 |
1 files changed, 49 insertions, 19 deletions
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala index ec603b39..de4dc140 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -16,17 +16,18 @@ package kamon.datadog -import akka.actor.{ ActorSystem, Props, ActorRef, Actor } -import akka.io.{ Udp, IO } +import akka.actor.{ActorSystem, Props, ActorRef, Actor} +import akka.io.{Udp, IO} import java.net.InetSocketAddress import akka.util.ByteString import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } +import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType} import java.text.DecimalFormat -import kamon.metrics.{ MetricIdentity, MetricGroupIdentity } +import kamon.metrics.{MetricIdentity, MetricGroupIdentity} + +class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { -class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExtensionProvider { import context.system val appName = context.system.settings.config.getString("kamon.datadog.application-name") @@ -45,53 +46,82 @@ class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExte } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) + val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) - for ( + for { (groupIdentity, groupSnapshot) ← tick.metrics; (metricIdentity, metricSnapshot) ← groupSnapshot.metrics - ) { + } { + + val key = buildMetricName(groupIdentity, metricIdentity) for (measurement ← metricSnapshot.measurements) { val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType) - flushToUDP(measurementData) + dataBuilder.appendMeasurement(key, measurementData) } } + dataBuilder.flush() } def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement, instrumentType: InstrumentType): String = { - StringBuilder.newBuilder - .append(buildMetricName(groupIdentity, metricIdentity)) - .append(":") - .append(buildMeasurementData(measurement, instrumentType)) + StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType)) .append(buildIdentificationTag(groupIdentity, metricIdentity)) .result() } def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = { def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = - value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") + s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}" instrumentType match { case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") + case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g") + case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") } } def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - appName + "." + groupIdentity.category.name + "." + metricIdentity.name + s"$appName.${groupIdentity.category.name}.${metricIdentity.name}" def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - "|#" + groupIdentity.category.name + ":" + groupIdentity.name + s"|#${groupIdentity.category.name}:${groupIdentity.name}" } object DatadogMetricsSender { - def props(remote: InetSocketAddress): Props = Props(new DatadogMetricsSender(remote)) + def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new DatadogMetricsSender(remote, maxPacketSize)) } trait UdpExtensionProvider { def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) } + +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { + val metricSeparator = "\n" + val measurementSeparator = ":" + var lastKey = "" + var buffer = new StringBuilder() + + def appendMeasurement(key: String, measurementData: String): Unit = { + val data = key + measurementSeparator + measurementData + + if (fitsOnBuffer(metricSeparator + data)) { + val mSeparator = if (buffer.length > 0) metricSeparator else "" + buffer.append(mSeparator).append(data) + } else { + flushToUDP(buffer.toString()) + buffer.clear() + buffer.append(data) + } + } + + def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes + + private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) + + def flush(): Unit = { + flushToUDP(buffer.toString) + buffer.clear() + } +} |