aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-04-03 09:21:15 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-04-03 09:21:15 -0300
commit0819320d7f20c78ad096c21c6aedb3536758792b (patch)
treecaa0a0a9052663531583d343a7639ef92d335782
parent4026aa0269f6aad0bc1acaf837fc51f4f0da504e (diff)
downloadKamon-0819320d7f20c78ad096c21c6aedb3536758792b.tar.gz
Kamon-0819320d7f20c78ad096c21c6aedb3536758792b.tar.bz2
Kamon-0819320d7f20c78ad096c21c6aedb3536758792b.zip
minor reorganization and includes filter
-rw-r--r--kamon-statsd/src/main/resources/reference.conf16
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala76
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala28
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala32
4 files changed, 93 insertions, 59 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf
index e49d878c..64f84a82 100644
--- a/kamon-statsd/src/main/resources/reference.conf
+++ b/kamon-statsd/src/main/resources/reference.conf
@@ -1,11 +1,19 @@
# ==================================== #
-# Kamon-Statsd Reference Configuration #
+# Kamon-StatsD Reference Configuration #
# ==================================== #
kamon {
statsd {
- prefix = kamon-app
- hostname = statsd.example.com
- port = 8125
+ prefix = kamon-app
+ hostname = statsd.example.com
+ port = 8125
+
+ flush-interval = 1 second
+ buffer-size = 512
+
+ includes {
+ actor = [ "*" ]
+ trace = [ "*" ]
+ }
}
} \ No newline at end of file
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
index 124cdab8..a7cba371 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
@@ -18,42 +18,78 @@ package kamon.statsd
import akka.actor._
import kamon.Kamon
-import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics._
import scala.concurrent.duration._
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import scala.collection.JavaConverters._
+import akka.util.ByteString
-object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider {
- override def lookup(): ExtensionId[_ <: Extension] = Statsd
- override def createExtension(system: ExtendedActorSystem): StatsdExtension = new StatsdExtension(system)
+object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
+ override def lookup(): ExtensionId[_ <: Extension] = StatsD
+ override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system)
+
+
+ sealed trait Metric {
+ def key: String
+ def value: Long
+ def suffix: String
+ def samplingRate: Double
+
+ /*
+ * Creates the stats string to send to StatsD.
+ * For counters, it provides something like {@code key:value|c}.
+ * For timing, it provides something like {@code key:millis|ms}.
+ * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate}
+ */
+ def toByteString: ByteString =
+ if(samplingRate >= 1D)
+ ByteString(s"$key:$value|$suffix")
+ else
+ ByteString(s"$key:$value|$suffix|@$samplingRate")
+ }
+
+ case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0) extends Metric {
+ val suffix: String = "c"
+ }
+
+ case class Timing(key: String, value: Long, samplingRate: Double = 1.0) extends Metric {
+ val suffix: String = "ms"
+ }
+
+ case class Gauge(key: String, value: Long, samplingRate: Double = 1.0) extends Metric {
+ val suffix: String = "g"
+ }
+
+ case class MetricBatch(metrics: Vector[Metric])
}
-class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
+class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
private val config = system.settings.config.getConfig("kamon.statsd")
val hostname = config.getString("hostname")
val port = config.getInt("port")
val prefix = config.getString("prefix")
+ val flushInterval = config.getMilliseconds("flush-interval")
+ val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval")
- val statsdMetricsListener = system.actorOf(Props(new StatsdMetricsListener(hostname, port, prefix)), "kamon-statsd-metrics-listener")
+ val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval)
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", statsdMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(CustomMetric, "*", statsdMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ActorMetrics, "*", statsdMetricsListener, permanently = true)
-}
-
-class StatsdMetricsListener(host: String, port: Int, prefix: String) extends Actor with ActorLogging {
- import java.net.{ InetAddress, InetSocketAddress }
+ val includedActors = config.getStringList("includes.actor").asScala
+ for(actorPathPattern <- includedActors) {
+ Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true)
+ }
- log.info("Starting the Kamon(Statsd) extension")
- val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "statsd-metrics-sender")
- val translator = context.actorOf(StatsdMetricTranslator.props(statsdActor), "statsd-metrics-translator")
- val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "statsd-metrics-buffer")
+ def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
+ assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
- def receive = {
- case tick: TickMetricSnapshot ⇒ buffer.forward(tick)
+ val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator")
+ if(flushInterval == tickInterval) {
+ // No need to buffer the metrics, let's go straight to the metrics translator.
+ metricsTranslator
+ } else {
+ system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer")
+ }
}
}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
index a08450c5..6a4c8d56 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
@@ -18,20 +18,36 @@ package kamon.statsd
import akka.actor.{ Props, Actor, ActorRef }
import kamon.metrics._
import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.ActorMetrics.ActorMetricSnapshot
+
+class StatsDMetricTranslator extends Actor {
+ //val metricsSender =
-class StatsdMetricTranslator(receiver: ActorRef) extends Actor {
def receive = {
case TickMetricSnapshot(from, to, metrics) ⇒
- collectAllMetrics(metrics)
- receiver ! ""
+
+
}
- def collectAllMetrics(metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) = {
+ def transformActorMetric(actorIdentity: ActorMetrics, snapshot: ActorMetricSnapshot): Vector[StatsD.Metric] = {
+ // TODO: Define metrics namespacing.
+ roll(actorIdentity.name, snapshot.timeInMailbox, StatsD.Timing)
+ }
+ def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Long, Double) => StatsD.Metric): Vector[StatsD.Metric] = {
+ val builder = Vector.newBuilder[StatsD.Metric]
+ for(measurement <- snapshot.measurements) {
+ val samplingRate = 1D / measurement.count
+ builder += metricBuilder.apply(key, measurement.value, samplingRate)
+ }
+
+ builder.result()
}
+
+
}
-object StatsdMetricTranslator {
- def props(receiver: ActorRef): Props = Props(new StatsdMetricTranslator(receiver))
+object StatsDMetricTranslator {
+ def props: Props = Props(new StatsDMetricTranslator)
}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala
index 96d83eb8..cff7a4a1 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala
@@ -22,7 +22,6 @@ import java.net.InetSocketAddress
import akka.util.ByteString
class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends Actor with ActorLogging {
- import StatsdMetricsSender._
import context.system
IO(Udp) ! Udp.SimpleSender
@@ -33,39 +32,14 @@ class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends
}
def ready(send: ActorRef): Receive = {
- case metric: StatsdMetric ⇒
- send ! Udp.Send(toByteString(statPrefix, metric), remote)
+ // TODO: batch writes
+ case metric: StatsD.Metric ⇒
+ send ! Udp.Send(metric.toByteString, remote)
case _ ⇒ log.error("Unknown Metric")
}
}
object StatsdMetricsSender {
-
- sealed trait StatsdMetric
-
- case class Counter(key: String, value: Long = 1, suffix: String = "c", samplingRate: Double = 1.0) extends StatsdMetric
- case class Timing(key: String, millis: Long, suffix: String = "ms", samplingRate: Double = 1.0) extends StatsdMetric
- case class Gauge(key: String, value: Long, suffix: String = "g", samplingRate: Double = 1.0) extends StatsdMetric
-
def props(statPrefix: String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote))
-
- def toByteString(statPrefix: String, metric: StatsdMetric): ByteString = metric match {
- case Counter(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate)
- case Timing(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate)
- case Gauge(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate)
- }
-
- /*
- * Creates the stat string to send to statsd.
- * For counters, it provides something like {@code key:value|c}.
- * For timing, it provides something like {@code key:millis|ms}.
- * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate}
- */
- private[this] def statFor(statPrefix: String, key: String, value: Long, suffix: String, samplingRate: Double): ByteString = {
- samplingRate match {
- case x if x >= 1.0 ⇒ ByteString(s"$statPrefix.$key:$value|$suffix")
- case _ ⇒ ByteString(s"$statPrefix.$key:$value|$suffix|@$samplingRate")
- }
- }
} \ No newline at end of file