From 09f0932a4310f9fa529110c18cd13cbb4eb69e72 Mon Sep 17 00:00:00 2001 From: Eugene Platonov Date: Tue, 1 Dec 2015 13:27:35 -0500 Subject: + statsd: allow time and memory metrics be scaled before sending to statsd --- .../src/main/scala/kamon/statsd/StatsD.scala | 20 +++++++++---- .../kamon/statsd/UDPBasedStatsDMetricsSender.scala | 33 +++++++++++++--------- 2 files changed, 33 insertions(+), 20 deletions(-) (limited to 'kamon-statsd/src/main/scala') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index a1f7dca3..4eae8b79 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -16,14 +16,16 @@ package kamon.statsd +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ + import akka.actor._ +import akka.event.Logging +import com.typesafe.config.Config import kamon.Kamon import kamon.metric._ import kamon.util.ConfigTools.Syntax -import scala.concurrent.duration._ -import com.typesafe.config.Config -import akka.event.Logging -import scala.collection.JavaConverters._ +import kamon.util.NeedToScale object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = StatsD @@ -62,11 +64,17 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender") + val decoratedSender = statsDConfig match { + case NeedToScale(scaleTimeTo, scaleMemoryTo) => + system.actorOf(MetricScaleDecorator.props(scaleTimeTo, scaleMemoryTo, metricsSender), "statsd-metric-scale-decorator") + case _ => metricsSender + } + if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. - metricsSender + decoratedSender } else { - system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, metricsSender), "statsd-metrics-buffer") + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, decoratedSender), "statsd-metrics-buffer") } } } \ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala index 9e856eda..88609686 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala @@ -19,31 +19,43 @@ package kamon.statsd import java.net.InetSocketAddress import java.text.{ DecimalFormat, DecimalFormatSymbols } import java.util.Locale + import akka.actor.{ Actor, ActorRef, ActorSystem } import akka.io.{ IO, Udp } import akka.util.ByteString import com.typesafe.config.Config import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +trait StatsDValueFormatters { + + val symbols = DecimalFormatSymbols.getInstance(Locale.US) + symbols.setDecimalSeparator('.') + // Just in case there is some weird locale config we are not aware of. + + // Absurdly high number of decimal digits, let the other end lose precision if it needs to. + val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) + + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") + } + + def encodeStatsDCounter(count: Long): String = count.toString + "|c" +} + /** * Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server. * @param statsDConfig Config to read settings specific to this sender * @param metricKeyGenerator Key generator for all metrics sent by this sender */ abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) - extends Actor with UdpExtensionProvider { + extends Actor with UdpExtensionProvider with StatsDValueFormatters { import context.system val statsDHost = statsDConfig.getString("hostname") val statsDPort = statsDConfig.getInt("port") - val symbols = DecimalFormatSymbols.getInstance(Locale.US) - symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. - - // Absurdly high number of decimal digits, let the other end lose precision if it needs to. - val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) - udpExtension ! Udp.SimpleSender lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort) @@ -61,13 +73,6 @@ abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenera def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit - def encodeStatsDTimer(level: Long, count: Long): String = { - val samplingRate: Double = 1D / count - level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") - } - - def encodeStatsDCounter(count: Long): String = count.toString + "|c" - } trait UdpExtensionProvider { -- cgit v1.2.3