From 7e2cf36025c697116a232b966986cbf4f2f76b01 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 2 Apr 2014 18:52:27 -0300 Subject: WIP:statsd module --- .../src/main/scala/kamon/statsd/Statsd.scala | 14 ++++----- .../kamon/statsd/StatsdMetricTranslator.scala | 4 +-- .../scala/kamon/statsd/StatsdMetricsSender.scala | 34 +++++++++++----------- 3 files changed, 25 insertions(+), 27 deletions(-) (limited to 'kamon-statsd/src/main/scala') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala index a0af09bb..786e518b 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -19,8 +19,7 @@ package kamon.statsd import akka.actor._ import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.{TickMetricSnapshotBuffer, CustomMetric, TraceMetrics, Metrics} -import kamon.statsd.StatsdMetricsSender +import kamon.metrics.{ TickMetricSnapshotBuffer, CustomMetric, TraceMetrics, Metrics } object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = Statsd @@ -32,8 +31,8 @@ class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Ext private val config = system.settings.config.getConfig("kamon.statsd") val hostname = config.getString("hostname") - val port = config.getInt("port") - val prefix = config.getString("prefix") + val port = config.getInt("port") + val prefix = config.getString("prefix") val statsdMetricsListener = system.actorOf(Props(new StatsdMetricsListener(hostname, port, prefix)), "kamon-statsd-metrics-listener") @@ -41,12 +40,12 @@ class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Ext Kamon(Metrics)(system).subscribe(CustomMetric, "*", statsdMetricsListener, permanently = true) } -class StatsdMetricsListener(host:String, port:Int, prefix:String) extends Actor with ActorLogging { - import java.net.{InetAddress, InetSocketAddress} +class StatsdMetricsListener(host: String, port: Int, prefix: String) extends Actor with ActorLogging { + import java.net.{ InetAddress, InetSocketAddress } log.info("Starting the Kamon(Statsd) extension") - val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "statsd-metrics-sender") + val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "statsd-metrics-sender") val translator = context.actorOf(StatsdMetricTranslator.props(statsdActor), "statsd-metrics-translator") val buffer = context.actorOf(TickMetricSnapshotBuffer.props(1 minute, translator), "metrics-buffer") @@ -55,4 +54,3 @@ class StatsdMetricsListener(host:String, port:Int, prefix:String) extends Actor } } - diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala index ee56c005..61c39ed7 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala @@ -15,10 +15,10 @@ */ package kamon.statsd -import akka.actor.{Props, Actor, ActorRef} +import akka.actor.{ Props, Actor, ActorRef } import kamon.metrics.Subscriptions.TickMetricSnapshot -class StatsdMetricTranslator(receiver: ActorRef) extends Actor{ +class StatsdMetricTranslator(receiver: ActorRef) extends Actor { def receive = { case TickMetricSnapshot(from, to, metrics) ⇒ diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala index 1ccf5397..9898a6e8 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala @@ -16,43 +16,43 @@ package kamon.statsd -import akka.actor.{ActorLogging, Props, ActorRef, Actor} -import akka.io.{Udp, IO} +import akka.actor.{ ActorLogging, Props, ActorRef, Actor } +import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -class StatsdMetricsSender(statPrefix:String, remote: InetSocketAddress) extends Actor with ActorLogging { +class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends Actor with ActorLogging { import StatsdMetricsSender._ import context.system IO(Udp) ! Udp.SimpleSender def receive = { - case Udp.SimpleSenderReady => + case Udp.SimpleSenderReady ⇒ context.become(ready(sender)) } def ready(send: ActorRef): Receive = { - case metric: StatsdMetric => + case metric: StatsdMetric ⇒ send ! Udp.Send(toByteString(statPrefix, metric), remote) - case _ => log.error("Unknown Metric") + case _ ⇒ log.error("Unknown Metric") } } object StatsdMetricsSender { sealed trait StatsdMetric - case class Counter(key: String, value: Long = 1, suffix:String = "c", samplingRate: Double = 1.0) extends StatsdMetric - case class Timing(key: String, millis: Long, suffix:String = "ms", samplingRate: Double = 1.0) extends StatsdMetric - case class Gauge(key: String, value: Long, suffix:String = "g", samplingRate: Double = 1.0) extends StatsdMetric + case class Counter(key: String, value: Long = 1, suffix: String = "c", samplingRate: Double = 1.0) extends StatsdMetric + case class Timing(key: String, millis: Long, suffix: String = "ms", samplingRate: Double = 1.0) extends StatsdMetric + case class Gauge(key: String, value: Long, suffix: String = "g", samplingRate: Double = 1.0) extends StatsdMetric - def props(statPrefix:String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote)) + def props(statPrefix: String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote)) - def toByteString(statPrefix:String, metric:StatsdMetric) : ByteString = metric match { - case Counter(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate) - case Timing(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate) - case Gauge(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate) + def toByteString(statPrefix: String, metric: StatsdMetric): ByteString = metric match { + case Counter(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate) + case Timing(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate) + case Gauge(key, value, suffix, samplingRate) ⇒ statFor(statPrefix, key, value, suffix, samplingRate) } /* @@ -61,10 +61,10 @@ object StatsdMetricsSender { * For timing, it provides something like {@code key:millis|ms}. * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} */ - private[this] def statFor(statPrefix:String, key: String, value: Long, suffix: String, samplingRate: Double): ByteString = { + private[this] def statFor(statPrefix: String, key: String, value: Long, suffix: String, samplingRate: Double): ByteString = { samplingRate match { - case x if x >= 1.0 => ByteString(s"${statPrefix}.${key}:${value}|$suffix") - case _ => ByteString(s"${statPrefix}.${key}:${value}|${suffix}|@$samplingRate") + case x if x >= 1.0 ⇒ ByteString(s"${statPrefix}.${key}:${value}|$suffix") + case _ ⇒ ByteString(s"${statPrefix}.${key}:${value}|${suffix}|@$samplingRate") } } } \ No newline at end of file -- cgit v1.2.3