From 0819320d7f20c78ad096c21c6aedb3536758792b Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 3 Apr 2014 09:21:15 -0300 Subject: minor reorganization and includes filter --- .../src/main/scala/kamon/statsd/Statsd.scala | 76 ++++++++++++++++------ .../kamon/statsd/StatsdMetricTranslator.scala | 28 ++++++-- .../scala/kamon/statsd/StatsdMetricsSender.scala | 32 +-------- 3 files changed, 81 insertions(+), 55 deletions(-) (limited to 'kamon-statsd/src/main/scala') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala index 124cdab8..a7cba371 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -18,42 +18,78 @@ package kamon.statsd import akka.actor._ import kamon.Kamon -import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics._ import scala.concurrent.duration._ -import kamon.metrics.Subscriptions.TickMetricSnapshot +import scala.collection.JavaConverters._ +import akka.util.ByteString -object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider { - override def lookup(): ExtensionId[_ <: Extension] = Statsd - override def createExtension(system: ExtendedActorSystem): StatsdExtension = new StatsdExtension(system) +object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = StatsD + override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) + + + sealed trait Metric { + def key: String + def value: Long + def suffix: String + def samplingRate: Double + + /* + * Creates the stats string to send to StatsD. + * For counters, it provides something like {@code key:value|c}. + * For timing, it provides something like {@code key:millis|ms}. + * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} + */ + def toByteString: ByteString = + if(samplingRate >= 1D) + ByteString(s"$key:$value|$suffix") + else + ByteString(s"$key:$value|$suffix|@$samplingRate") + } + + case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "c" + } + + case class Timing(key: String, value: Long, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "ms" + } + + case class Gauge(key: String, value: Long, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "g" + } + + case class MetricBatch(metrics: Vector[Metric]) } -class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { +class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { private val config = system.settings.config.getConfig("kamon.statsd") val hostname = config.getString("hostname") val port = config.getInt("port") val prefix = config.getString("prefix") + val flushInterval = config.getMilliseconds("flush-interval") + val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") - val statsdMetricsListener = system.actorOf(Props(new StatsdMetricsListener(hostname, port, prefix)), "kamon-statsd-metrics-listener") + val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", statsdMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(CustomMetric, "*", statsdMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ActorMetrics, "*", statsdMetricsListener, permanently = true) -} - -class StatsdMetricsListener(host: String, port: Int, prefix: String) extends Actor with ActorLogging { - import java.net.{ InetAddress, InetSocketAddress } + val includedActors = config.getStringList("includes.actor").asScala + for(actorPathPattern <- includedActors) { + Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) + } - log.info("Starting the Kamon(Statsd) extension") - val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "statsd-metrics-sender") - val translator = context.actorOf(StatsdMetricTranslator.props(statsdActor), "statsd-metrics-translator") - val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "statsd-metrics-buffer") + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { + assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") - def receive = { - case tick: TickMetricSnapshot ⇒ buffer.forward(tick) + val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator") + if(flushInterval == tickInterval) { + // No need to buffer the metrics, let's go straight to the metrics translator. + metricsTranslator + } else { + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer") + } } } diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala index a08450c5..6a4c8d56 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala @@ -18,20 +18,36 @@ package kamon.statsd import akka.actor.{ Props, Actor, ActorRef } import kamon.metrics._ import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.ActorMetrics.ActorMetricSnapshot + +class StatsDMetricTranslator extends Actor { + //val metricsSender = -class StatsdMetricTranslator(receiver: ActorRef) extends Actor { def receive = { case TickMetricSnapshot(from, to, metrics) ⇒ - collectAllMetrics(metrics) - receiver ! "" + + } - def collectAllMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) = { + def transformActorMetric(actorIdentity: ActorMetrics, snapshot: ActorMetricSnapshot): Vector[StatsD.Metric] = { + // TODO: Define metrics namespacing. + roll(actorIdentity.name, snapshot.timeInMailbox, StatsD.Timing) + } + def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Long, Double) => StatsD.Metric): Vector[StatsD.Metric] = { + val builder = Vector.newBuilder[StatsD.Metric] + for(measurement <- snapshot.measurements) { + val samplingRate = 1D / measurement.count + builder += metricBuilder.apply(key, measurement.value, samplingRate) + } + + builder.result() } + + } -object StatsdMetricTranslator { - def props(receiver: ActorRef): Props = Props(new StatsdMetricTranslator(receiver)) +object StatsDMetricTranslator { + def props: Props = Props(new StatsDMetricTranslator) } diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala index 96d83eb8..cff7a4a1 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala @@ -22,7 +22,6 @@ import java.net.InetSocketAddress import akka.util.ByteString class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends Actor with ActorLogging { - import StatsdMetricsSender._ import context.system IO(Udp) ! Udp.SimpleSender @@ -33,39 +32,14 @@ class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends } def ready(send: ActorRef): Receive = { - case metric: StatsdMetric ⇒ - send ! Udp.Send(toByteString(statPrefix, metric), remote) + // TODO: batch writes + case metric: StatsD.Metric ⇒ + send ! Udp.Send(metric.toByteString, remote) case _ ⇒ log.error("Unknown Metric") } } object StatsdMetricsSender { - - sealed trait StatsdMetric - - case class Counter(key: String, value: Long = 1, suffix: String = "c", samplingRate: Double = 1.0) extends StatsdMetric - case class Timing(key: String, millis: Long, suffix: String = "ms", samplingRate: Double = 1.0) extends StatsdMetric - case class Gauge(key: String, value: Long, suffix: String = "g", samplingRate: Double = 1.0) extends StatsdMetric - def props(statPrefix: String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote)) - - def toByteString(statPrefix: String, metric: StatsdMetric): ByteString = metric match { - case Counter(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate) - case Timing(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate) - case Gauge(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate) - } - - /* - * Creates the stat string to send to statsd. - * For counters, it provides something like {@code key:value|c}. - * For timing, it provides something like {@code key:millis|ms}. - * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} - */ - private[this] def statFor(statPrefix: String, key: String, value: Long, suffix: String, samplingRate: Double): ByteString = { - samplingRate match { - case x if x >= 1.0 ⇒ ByteString(s"$statPrefix.$key:$value|$suffix") - case _ ⇒ ByteString(s"$statPrefix.$key:$value|$suffix|@$samplingRate") - } - } } \ No newline at end of file -- cgit v1.2.3