diff options
Diffstat (limited to 'kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala')
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala | 82 |
1 files changed, 66 insertions, 16 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 63b1a53a..42eb57d0 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -16,20 +16,23 @@ package kamon.statsd -import akka.actor.{ Props, ActorRef, Actor } +import akka.actor.{ActorSystem, Props, ActorRef, Actor} import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString import kamon.Kamon -import scala.annotation.tailrec +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement +import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType} -class StatsDMetricsSender extends Actor { +class StatsDMetricsSender extends Actor with UdpExtensionProvider { import context.system val statsDExtension = Kamon(StatsD) val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port) + val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config) - IO(Udp) ! Udp.SimpleSender + udpExtension ! Udp.SimpleSender def receive = { case Udp.SimpleSenderReady ⇒ @@ -37,27 +40,74 @@ class StatsDMetricsSender extends Actor { } def ready(udpSender: ActorRef): Receive = { - case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender) + case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender) } - @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = { - def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote) + def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { + val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote) - if (metrics.isEmpty) - flushToRemote(buffer, udpSender) - else { - val headMetricData = metrics.head.toByteString(includeTrailingNewline = true) + for((groupIdentity, groupSnapshot) <- tick.metrics; + (metricIdentity, metricSnapshot) <- groupSnapshot.metrics) { - if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) { - flushToRemote(buffer, udpSender) - sendMetricsToRemote(metrics.tail, headMetricData, udpSender) - } else { - sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender) + val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity)) + + for(measurement <- metricSnapshot.measurements) { + val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) + dataBuilder.appendMeasurement(key, measurementData) } } + + dataBuilder.flush() + } + + def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): ByteString = { + def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): ByteString = + ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else "")) + + instrumentType match { + case Histogram => statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) + case Gauge => statsDMetricFormat(measurement.value.toString, "g") + case Counter => ByteString.empty // TODO: Need to decide how to report counters, when we have them! + } } } object StatsDMetricsSender { def props: Props = Props[StatsDMetricsSender] +} + +trait UdpExtensionProvider { + def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) +} + +class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) { + val metricSeparator = ByteString("\n") + val measurementSeparator = ByteString(":") + + var lastKey= ByteString.empty + var buffer = ByteString.empty + + def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = { + val appendData = + if(key == lastKey) + measurementSeparator ++ measurementData + else { + lastKey = key + val keySeparator = if(buffer.length == 0) ByteString.empty else metricSeparator + keySeparator ++ key ++ measurementSeparator ++ measurementData + } + + if(buffer.length + appendData.length >= maxPacketSize) { + flushToUDP(buffer) + buffer = appendData + } else + buffer = buffer ++ appendData + } + + private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote) + + def flush(): Unit = { + flushToUDP(buffer) + buffer = ByteString.empty + } }
\ No newline at end of file |