diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-04-03 09:21:15 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-04-03 09:21:15 -0300 |
commit | 0819320d7f20c78ad096c21c6aedb3536758792b (patch) | |
tree | caa0a0a9052663531583d343a7639ef92d335782 /kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala | |
parent | 4026aa0269f6aad0bc1acaf837fc51f4f0da504e (diff) | |
download | Kamon-0819320d7f20c78ad096c21c6aedb3536758792b.tar.gz Kamon-0819320d7f20c78ad096c21c6aedb3536758792b.tar.bz2 Kamon-0819320d7f20c78ad096c21c6aedb3536758792b.zip |
minor reorganization and includes filter
Diffstat (limited to 'kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala')
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala | 76 |
1 files changed, 56 insertions, 20 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala index 124cdab8..a7cba371 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -18,42 +18,78 @@ package kamon.statsd import akka.actor._ import kamon.Kamon -import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics._ import scala.concurrent.duration._ -import kamon.metrics.Subscriptions.TickMetricSnapshot +import scala.collection.JavaConverters._ +import akka.util.ByteString -object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider { - override def lookup(): ExtensionId[_ <: Extension] = Statsd - override def createExtension(system: ExtendedActorSystem): StatsdExtension = new StatsdExtension(system) +object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = StatsD + override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) + + + sealed trait Metric { + def key: String + def value: Long + def suffix: String + def samplingRate: Double + + /* + * Creates the stats string to send to StatsD. + * For counters, it provides something like {@code key:value|c}. + * For timing, it provides something like {@code key:millis|ms}. + * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} + */ + def toByteString: ByteString = + if(samplingRate >= 1D) + ByteString(s"$key:$value|$suffix") + else + ByteString(s"$key:$value|$suffix|@$samplingRate") + } + + case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "c" + } + + case class Timing(key: String, value: Long, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "ms" + } + + case class Gauge(key: String, value: Long, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "g" + } + + case class MetricBatch(metrics: Vector[Metric]) } -class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { +class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { private val config = system.settings.config.getConfig("kamon.statsd") val hostname = config.getString("hostname") val port = config.getInt("port") val prefix = config.getString("prefix") + val flushInterval = config.getMilliseconds("flush-interval") + val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") - val statsdMetricsListener = system.actorOf(Props(new StatsdMetricsListener(hostname, port, prefix)), "kamon-statsd-metrics-listener") + val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", statsdMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(CustomMetric, "*", statsdMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ActorMetrics, "*", statsdMetricsListener, permanently = true) -} - -class StatsdMetricsListener(host: String, port: Int, prefix: String) extends Actor with ActorLogging { - import java.net.{ InetAddress, InetSocketAddress } + val includedActors = config.getStringList("includes.actor").asScala + for(actorPathPattern <- includedActors) { + Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) + } - log.info("Starting the Kamon(Statsd) extension") - val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "statsd-metrics-sender") - val translator = context.actorOf(StatsdMetricTranslator.props(statsdActor), "statsd-metrics-translator") - val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "statsd-metrics-buffer") + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { + assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") - def receive = { - case tick: TickMetricSnapshot ⇒ buffer.forward(tick) + val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator") + if(flushInterval == tickInterval) { + // No need to buffer the metrics, let's go straight to the metrics translator. + metricsTranslator + } else { + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer") + } } } |