aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala')
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala82
1 files changed, 66 insertions, 16 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index 63b1a53a..42eb57d0 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -16,20 +16,23 @@
package kamon.statsd
-import akka.actor.{ Props, ActorRef, Actor }
+import akka.actor.{ActorSystem, Props, ActorRef, Actor}
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.Kamon
-import scala.annotation.tailrec
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.MetricSnapshot.Measurement
+import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType}
-class StatsDMetricsSender extends Actor {
+class StatsDMetricsSender extends Actor with UdpExtensionProvider {
import context.system
val statsDExtension = Kamon(StatsD)
val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port)
+ val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config)
- IO(Udp) ! Udp.SimpleSender
+ udpExtension ! Udp.SimpleSender
def receive = {
case Udp.SimpleSenderReady ⇒
@@ -37,27 +40,74 @@ class StatsDMetricsSender extends Actor {
}
def ready(udpSender: ActorRef): Receive = {
- case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender)
+ case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender)
}
- @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
- def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
+ def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
+ val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote)
- if (metrics.isEmpty)
- flushToRemote(buffer, udpSender)
- else {
- val headMetricData = metrics.head.toByteString(includeTrailingNewline = true)
+ for((groupIdentity, groupSnapshot) <- tick.metrics;
+ (metricIdentity, metricSnapshot) <- groupSnapshot.metrics) {
- if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) {
- flushToRemote(buffer, udpSender)
- sendMetricsToRemote(metrics.tail, headMetricData, udpSender)
- } else {
- sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender)
+ val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity))
+
+ for(measurement <- metricSnapshot.measurements) {
+ val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType)
+ dataBuilder.appendMeasurement(key, measurementData)
}
}
+
+ dataBuilder.flush()
+ }
+
+ def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): ByteString = {
+ def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): ByteString =
+ ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else ""))
+
+ instrumentType match {
+ case Histogram => statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
+ case Gauge => statsDMetricFormat(measurement.value.toString, "g")
+ case Counter => ByteString.empty // TODO: Need to decide how to report counters, when we have them!
+ }
}
}
object StatsDMetricsSender {
def props: Props = Props[StatsDMetricsSender]
+}
+
+trait UdpExtensionProvider {
+ def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
+}
+
+class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) {
+ val metricSeparator = ByteString("\n")
+ val measurementSeparator = ByteString(":")
+
+ var lastKey= ByteString.empty
+ var buffer = ByteString.empty
+
+ def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = {
+ val appendData =
+ if(key == lastKey)
+ measurementSeparator ++ measurementData
+ else {
+ lastKey = key
+ val keySeparator = if(buffer.length == 0) ByteString.empty else metricSeparator
+ keySeparator ++ key ++ measurementSeparator ++ measurementData
+ }
+
+ if(buffer.length + appendData.length >= maxPacketSize) {
+ flushToUDP(buffer)
+ buffer = appendData
+ } else
+ buffer = buffer ++ appendData
+ }
+
+ private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote)
+
+ def flush(): Unit = {
+ flushToUDP(buffer)
+ buffer = ByteString.empty
+ }
} \ No newline at end of file