aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala')
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala76
1 files changed, 56 insertions, 20 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
index 124cdab8..a7cba371 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
@@ -18,42 +18,78 @@ package kamon.statsd
import akka.actor._
import kamon.Kamon
-import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics._
import scala.concurrent.duration._
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import scala.collection.JavaConverters._
+import akka.util.ByteString
-object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider {
- override def lookup(): ExtensionId[_ <: Extension] = Statsd
- override def createExtension(system: ExtendedActorSystem): StatsdExtension = new StatsdExtension(system)
+object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
+ override def lookup(): ExtensionId[_ <: Extension] = StatsD
+ override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system)
+
+
+ sealed trait Metric {
+ def key: String
+ def value: Long
+ def suffix: String
+ def samplingRate: Double
+
+ /*
+ * Creates the stats string to send to StatsD.
+ * For counters, it provides something like {@code key:value|c}.
+ * For timing, it provides something like {@code key:millis|ms}.
+ * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate}
+ */
+ def toByteString: ByteString =
+ if(samplingRate >= 1D)
+ ByteString(s"$key:$value|$suffix")
+ else
+ ByteString(s"$key:$value|$suffix|@$samplingRate")
+ }
+
+ case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0) extends Metric {
+ val suffix: String = "c"
+ }
+
+ case class Timing(key: String, value: Long, samplingRate: Double = 1.0) extends Metric {
+ val suffix: String = "ms"
+ }
+
+ case class Gauge(key: String, value: Long, samplingRate: Double = 1.0) extends Metric {
+ val suffix: String = "g"
+ }
+
+ case class MetricBatch(metrics: Vector[Metric])
}
-class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
+class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
private val config = system.settings.config.getConfig("kamon.statsd")
val hostname = config.getString("hostname")
val port = config.getInt("port")
val prefix = config.getString("prefix")
+ val flushInterval = config.getMilliseconds("flush-interval")
+ val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval")
- val statsdMetricsListener = system.actorOf(Props(new StatsdMetricsListener(hostname, port, prefix)), "kamon-statsd-metrics-listener")
+ val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval)
- Kamon(Metrics)(system).subscribe(TraceMetrics, "*", statsdMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(CustomMetric, "*", statsdMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ActorMetrics, "*", statsdMetricsListener, permanently = true)
-}
-
-class StatsdMetricsListener(host: String, port: Int, prefix: String) extends Actor with ActorLogging {
- import java.net.{ InetAddress, InetSocketAddress }
+ val includedActors = config.getStringList("includes.actor").asScala
+ for(actorPathPattern <- includedActors) {
+ Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true)
+ }
- log.info("Starting the Kamon(Statsd) extension")
- val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "statsd-metrics-sender")
- val translator = context.actorOf(StatsdMetricTranslator.props(statsdActor), "statsd-metrics-translator")
- val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "statsd-metrics-buffer")
+ def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
+ assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
- def receive = {
- case tick: TickMetricSnapshot ⇒ buffer.forward(tick)
+ val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator")
+ if(flushInterval == tickInterval) {
+ // No need to buffer the metrics, let's go straight to the metrics translator.
+ metricsTranslator
+ } else {
+ system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer")
+ }
}
}