diff options
author | Diego <diegolparra@gmail.com> | 2014-04-02 17:55:49 -0300 |
---|---|---|
committer | Diego <diegolparra@gmail.com> | 2014-04-02 20:12:39 -0300 |
commit | 5da2099182b4b2b8678ed97fcaa6d74f02bb5544 (patch) | |
tree | 83d19f83ca6c2259f5e497d40e7e008cc1b3baa3 | |
parent | 02370563c32fb4dbee848097a5e7723c902a44d4 (diff) | |
download | Kamon-5da2099182b4b2b8678ed97fcaa6d74f02bb5544.tar.gz Kamon-5da2099182b4b2b8678ed97fcaa6d74f02bb5544.tar.bz2 Kamon-5da2099182b4b2b8678ed97fcaa6d74f02bb5544.zip |
WIP:First implementation of statsd client
5 files changed, 100 insertions, 70 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf index 9d09a709..e49d878c 100644 --- a/kamon-statsd/src/main/resources/reference.conf +++ b/kamon-statsd/src/main/resources/reference.conf @@ -8,5 +8,4 @@ kamon { hostname = statsd.example.com port = 8125 } - } }
\ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala index 33344ca9..4e0f0dfe 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -16,8 +16,11 @@ package kamon.statsd -import akka.actor.{ ExtendedActorSystem, Extension, ExtensionIdProvider, ExtensionId } +import akka.actor._ import kamon.Kamon +import kamon.statsd.client.StatsdMetricsSender +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.{CustomMetric, TraceMetrics, Metrics} object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = Statsd @@ -25,17 +28,29 @@ object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider { } class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { - publishInfoMessage(system, "Statsd Extension Loaded!!") private val config = system.settings.config.getConfig("kamon.statsd") - val prefix = config.getString("prefix") val hostname = config.getString("hostname") val port = config.getInt("port") + val prefix = config.getString("prefix") + + val statsdMetricsListener = system.actorOf(Props(new StatsdMetricsListener(hostname, port, prefix)), "kamon-statsd-metrics-listener") + + Kamon(Metrics)(system).subscribe(TraceMetrics, "*", statsdMetricsListener, permanently = true) + Kamon(Metrics)(system).subscribe(CustomMetric, "*", statsdMetricsListener, permanently = true) +} + +class StatsdMetricsListener(host:String, port:Int, prefix:String) extends Actor with ActorLogging { + import java.net.{InetAddress, InetSocketAddress} + + log.info("Starting the Kamon(Statsd) extension") + + val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "StatsdSender") + + def receive = { + case tick: TickMetricSnapshot ⇒ statsdActor.forward(tick) + } } -object MetricsTypes{ - case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0) - case class Timing(key: String, millis: Long, samplingRate: Double = 1.0) - case class Gauge(key: String, value: Long) -}
\ No newline at end of file + diff --git a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdClient.scala b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdClient.scala deleted file mode 100644 index ea50ef60..00000000 --- a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdClient.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.statsd.client - -class StatsdClient { - -} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdMetricsSender.scala new file mode 100644 index 00000000..8ee08420 --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdMetricsSender.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd.client + +import akka.actor.{ActorLogging, Props, ActorRef, Actor} +import akka.io.{Udp, IO} +import java.net.InetSocketAddress +import akka.util.ByteString + +class StatsdMetricsSender(statPrefix:String, remote: InetSocketAddress) extends Actor with ActorLogging { + import context.system + + import StatsdMetricsSender._ + import StatsDProtocol._ + + IO(Udp) ! Udp.SimpleSender + + def receive = { + case Udp.SimpleSenderReady => + context.become(ready(sender)) + } + + def ready(send: ActorRef): Receive = { + case metric: StatsdMetric => + send ! Udp.Send(toByteString(statPrefix, metric), remote) + + case _ => log.error("Unknown Metric") + } +} + +object StatsdMetricsSender { + import StatsDProtocol._ + + def props(statPrefix:String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote)) + + def toByteString(statPrefix:String, metric:StatsdMetric) : ByteString = metric match { + case Counter(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate) + case Timing(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate) + case Gauge(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate) + } + + /* + * Creates the stat string to send to statsd. + * For counters, it provides something like {@code key:value|c}. + * For timing, it provides something like {@code key:millis|ms}. + * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} + */ + private[this] def statFor(statPrefix:String, key: String, value: Long, suffix: String, samplingRate: Double): ByteString = { + samplingRate match { + case x if x >= 1.0 => ByteString(s"${statPrefix}.${key}:${value}|$suffix") + case _ => ByteString(s"${statPrefix}.${key}:${value}|${suffix}|@$samplingRate") + } + } +} + + +object StatsDProtocol { + + sealed trait StatsdMetric + case class Counter(key: String, value: Long = 1, suffix:String = "c", samplingRate: Double = 1.0) extends StatsdMetric + case class Timing(key: String, millis: Long, suffix:String = "ms", samplingRate: Double = 1.0) extends StatsdMetric + case class Gauge(key: String, value: Long, suffix:String = "g", samplingRate: Double = 1.0) extends StatsdMetric +} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdSender.scala deleted file mode 100644 index 676cf003..00000000 --- a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdSender.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.statsd.client - -import akka.actor.{Props, ActorRef, Actor} -import akka.io.{Udp, IO} -import java.net.InetSocketAddress -import akka.util.ByteString - -class StatsdSender(remote: InetSocketAddress) extends Actor { - import context.system - - IO(Udp) ! Udp.SimpleSender - - def receive = { - case Udp.SimpleSenderReady => - context.become(ready(sender)) - } - - def ready(send: ActorRef): Receive = { - case msg: String => - send ! Udp.Send(ByteString(msg), remote) - } -} -object StatsdSender { - def props(remote: InetSocketAddress): Props = Props(new StatsdSender(remote)) -}
\ No newline at end of file |