aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala')
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala91
1 files changed, 75 insertions, 16 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index 63b1a53a..60404d7e 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -16,20 +16,23 @@
package kamon.statsd
-import akka.actor.{ Props, ActorRef, Actor }
+import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.Kamon
-import scala.annotation.tailrec
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.MetricSnapshot.Measurement
+import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
-class StatsDMetricsSender extends Actor {
+class StatsDMetricsSender extends Actor with UdpExtensionProvider {
import context.system
val statsDExtension = Kamon(StatsD)
val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port)
+ val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config)
- IO(Udp) ! Udp.SimpleSender
+ udpExtension ! Udp.SimpleSender
def receive = {
case Udp.SimpleSenderReady ⇒
@@ -37,27 +40,83 @@ class StatsDMetricsSender extends Actor {
}
def ready(udpSender: ActorRef): Receive = {
- case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender)
+ case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender)
}
- @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
- def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
+ def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
+ val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote)
- if (metrics.isEmpty)
- flushToRemote(buffer, udpSender)
- else {
- val headMetricData = metrics.head.toByteString(includeTrailingNewline = true)
+ for (
+ (groupIdentity, groupSnapshot) ← tick.metrics;
+ (metricIdentity, metricSnapshot) ← groupSnapshot.metrics
+ ) {
- if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) {
- flushToRemote(buffer, udpSender)
- sendMetricsToRemote(metrics.tail, headMetricData, udpSender)
- } else {
- sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender)
+ val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity))
+
+ for (measurement ← metricSnapshot.measurements) {
+ val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType)
+ dataBuilder.appendMeasurement(key, measurementData)
}
}
+
+ dataBuilder.flush()
+ }
+
+ def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): ByteString = {
+ def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): ByteString =
+ ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else ""))
+
+ instrumentType match {
+ case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
+ case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g")
+ case Counter ⇒ ByteString.empty // TODO: Need to decide how to report counters, when we have them!
+ }
}
}
object StatsDMetricsSender {
def props: Props = Props[StatsDMetricsSender]
+}
+
+trait UdpExtensionProvider {
+ def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
+}
+
+class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) {
+ val metricSeparator = ByteString("\n")
+ val measurementSeparator = ByteString(":")
+
+ var lastKey = ByteString.empty
+ var buffer = ByteString.empty
+
+ def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = {
+ if (key == lastKey) {
+ val dataWithoutKey = measurementSeparator ++ measurementData
+ if (fitsOnBuffer(dataWithoutKey))
+ buffer = buffer ++ dataWithoutKey
+ else {
+ flushToUDP(buffer)
+ buffer = key ++ dataWithoutKey
+ }
+ } else {
+ lastKey = key
+ val dataWithoutSeparator = key ++ measurementSeparator ++ measurementData
+ if (fitsOnBuffer(metricSeparator ++ dataWithoutSeparator)) {
+ val mSeparator = if (buffer.length > 0) metricSeparator else ByteString.empty
+ buffer = buffer ++ mSeparator ++ dataWithoutSeparator
+ } else {
+ flushToUDP(buffer)
+ buffer = dataWithoutSeparator
+ }
+ }
+ }
+
+ def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize
+
+ private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote)
+
+ def flush(): Unit = {
+ flushToUDP(buffer)
+ buffer = ByteString.empty
+ }
} \ No newline at end of file