aboutsummaryrefslogtreecommitdiff
path: root/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala')
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala68
1 files changed, 49 insertions, 19 deletions
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
index ec603b39..de4dc140 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
@@ -16,17 +16,18 @@
package kamon.datadog
-import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
-import akka.io.{ Udp, IO }
+import akka.actor.{ActorSystem, Props, ActorRef, Actor}
+import akka.io.{Udp, IO}
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
+import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType}
import java.text.DecimalFormat
-import kamon.metrics.{ MetricIdentity, MetricGroupIdentity }
+import kamon.metrics.{MetricIdentity, MetricGroupIdentity}
+
+class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
-class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExtensionProvider {
import context.system
val appName = context.system.settings.config.getString("kamon.datadog.application-name")
@@ -45,53 +46,82 @@ class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExte
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)
+ val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
- for (
+ for {
(groupIdentity, groupSnapshot) ← tick.metrics;
(metricIdentity, metricSnapshot) ← groupSnapshot.metrics
- ) {
+ } {
+
+ val key = buildMetricName(groupIdentity, metricIdentity)
for (measurement ← metricSnapshot.measurements) {
val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType)
- flushToUDP(measurementData)
+ dataBuilder.appendMeasurement(key, measurementData)
}
}
+ dataBuilder.flush()
}
def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement,
instrumentType: InstrumentType): String = {
- StringBuilder.newBuilder
- .append(buildMetricName(groupIdentity, metricIdentity))
- .append(":")
- .append(buildMeasurementData(measurement, instrumentType))
+ StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType))
.append(buildIdentificationTag(groupIdentity, metricIdentity))
.result()
}
def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = {
def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String =
- value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
+ s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}"
instrumentType match {
case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
- case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
- case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
+ case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
+ case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
}
}
def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
- appName + "." + groupIdentity.category.name + "." + metricIdentity.name
+ s"$appName.${groupIdentity.category.name}.${metricIdentity.name}"
def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
- "|#" + groupIdentity.category.name + ":" + groupIdentity.name
+ s"|#${groupIdentity.category.name}:${groupIdentity.name}"
}
object DatadogMetricsSender {
- def props(remote: InetSocketAddress): Props = Props(new DatadogMetricsSender(remote))
+ def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new DatadogMetricsSender(remote, maxPacketSize))
}
trait UdpExtensionProvider {
def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
}
+
+class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
+ val metricSeparator = "\n"
+ val measurementSeparator = ":"
+ var lastKey = ""
+ var buffer = new StringBuilder()
+
+ def appendMeasurement(key: String, measurementData: String): Unit = {
+ val data = key + measurementSeparator + measurementData
+
+ if (fitsOnBuffer(metricSeparator + data)) {
+ val mSeparator = if (buffer.length > 0) metricSeparator else ""
+ buffer.append(mSeparator).append(data)
+ } else {
+ flushToUDP(buffer.toString())
+ buffer.clear()
+ buffer.append(data)
+ }
+ }
+
+ def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes
+
+ private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)
+
+ def flush(): Unit = {
+ flushToUDP(buffer.toString)
+ buffer.clear()
+ }
+}