aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-04-02 17:55:49 -0300
committerDiego <diegolparra@gmail.com>2014-04-02 17:55:49 -0300
commitc6194ee87065c850ed11860f1e3e3ee9ded54a7e (patch)
tree68df522399ea11b41ce4764d828196827e344e01
parent7d50762b3fd19634fbf26fe8c0685155c19edd27 (diff)
downloadKamon-c6194ee87065c850ed11860f1e3e3ee9ded54a7e.tar.gz
Kamon-c6194ee87065c850ed11860f1e3e3ee9ded54a7e.tar.bz2
Kamon-c6194ee87065c850ed11860f1e3e3ee9ded54a7e.zip
WIP:First implementation of statsd client
-rw-r--r--kamon-statsd/src/main/resources/reference.conf1
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala31
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/client/StatsdClient.scala20
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/client/StatsdMetricsSender.scala77
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/client/StatsdSender.scala41
5 files changed, 100 insertions, 70 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf
index 9d09a709..e49d878c 100644
--- a/kamon-statsd/src/main/resources/reference.conf
+++ b/kamon-statsd/src/main/resources/reference.conf
@@ -8,5 +8,4 @@ kamon {
hostname = statsd.example.com
port = 8125
}
- }
} \ No newline at end of file
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
index 33344ca9..4e0f0dfe 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
@@ -16,8 +16,11 @@
package kamon.statsd
-import akka.actor.{ ExtendedActorSystem, Extension, ExtensionIdProvider, ExtensionId }
+import akka.actor._
import kamon.Kamon
+import kamon.statsd.client.StatsdMetricsSender
+import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metrics.{CustomMetric, TraceMetrics, Metrics}
object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = Statsd
@@ -25,17 +28,29 @@ object Statsd extends ExtensionId[StatsdExtension] with ExtensionIdProvider {
}
class StatsdExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
- publishInfoMessage(system, "Statsd Extension Loaded!!")
private val config = system.settings.config.getConfig("kamon.statsd")
- val prefix = config.getString("prefix")
val hostname = config.getString("hostname")
val port = config.getInt("port")
+ val prefix = config.getString("prefix")
+
+ val statsdMetricsListener = system.actorOf(Props(new StatsdMetricsListener(hostname, port, prefix)), "kamon-statsd-metrics-listener")
+
+ Kamon(Metrics)(system).subscribe(TraceMetrics, "*", statsdMetricsListener, permanently = true)
+ Kamon(Metrics)(system).subscribe(CustomMetric, "*", statsdMetricsListener, permanently = true)
+}
+
+class StatsdMetricsListener(host:String, port:Int, prefix:String) extends Actor with ActorLogging {
+ import java.net.{InetAddress, InetSocketAddress}
+
+ log.info("Starting the Kamon(Statsd) extension")
+
+ val statsdActor = context.actorOf(StatsdMetricsSender.props(prefix, new InetSocketAddress(InetAddress.getByName(host), port)), "StatsdSender")
+
+ def receive = {
+ case tick: TickMetricSnapshot ⇒ statsdActor.forward(tick)
+ }
}
-object MetricsTypes{
- case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0)
- case class Timing(key: String, millis: Long, samplingRate: Double = 1.0)
- case class Gauge(key: String, value: Long)
-} \ No newline at end of file
+
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdClient.scala b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdClient.scala
deleted file mode 100644
index ea50ef60..00000000
--- a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdClient.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-package kamon.statsd.client
-
-class StatsdClient {
-
-}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdMetricsSender.scala
new file mode 100644
index 00000000..8ee08420
--- /dev/null
+++ b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdMetricsSender.scala
@@ -0,0 +1,77 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd.client
+
+import akka.actor.{ActorLogging, Props, ActorRef, Actor}
+import akka.io.{Udp, IO}
+import java.net.InetSocketAddress
+import akka.util.ByteString
+
+class StatsdMetricsSender(statPrefix:String, remote: InetSocketAddress) extends Actor with ActorLogging {
+ import context.system
+
+ import StatsdMetricsSender._
+ import StatsDProtocol._
+
+ IO(Udp) ! Udp.SimpleSender
+
+ def receive = {
+ case Udp.SimpleSenderReady =>
+ context.become(ready(sender))
+ }
+
+ def ready(send: ActorRef): Receive = {
+ case metric: StatsdMetric =>
+ send ! Udp.Send(toByteString(statPrefix, metric), remote)
+
+ case _ => log.error("Unknown Metric")
+ }
+}
+
+object StatsdMetricsSender {
+ import StatsDProtocol._
+
+ def props(statPrefix:String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote))
+
+ def toByteString(statPrefix:String, metric:StatsdMetric) : ByteString = metric match {
+ case Counter(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate)
+ case Timing(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate)
+ case Gauge(key, value, suffix, samplingRate) => statFor(statPrefix, key, value, suffix, samplingRate)
+ }
+
+ /*
+ * Creates the stat string to send to statsd.
+ * For counters, it provides something like {@code key:value|c}.
+ * For timing, it provides something like {@code key:millis|ms}.
+ * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate}
+ */
+ private[this] def statFor(statPrefix:String, key: String, value: Long, suffix: String, samplingRate: Double): ByteString = {
+ samplingRate match {
+ case x if x >= 1.0 => ByteString(s"${statPrefix}.${key}:${value}|$suffix")
+ case _ => ByteString(s"${statPrefix}.${key}:${value}|${suffix}|@$samplingRate")
+ }
+ }
+}
+
+
+object StatsDProtocol {
+
+ sealed trait StatsdMetric
+ case class Counter(key: String, value: Long = 1, suffix:String = "c", samplingRate: Double = 1.0) extends StatsdMetric
+ case class Timing(key: String, millis: Long, suffix:String = "ms", samplingRate: Double = 1.0) extends StatsdMetric
+ case class Gauge(key: String, value: Long, suffix:String = "g", samplingRate: Double = 1.0) extends StatsdMetric
+}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdSender.scala
deleted file mode 100644
index 676cf003..00000000
--- a/kamon-statsd/src/main/scala/kamon/statsd/client/StatsdSender.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.statsd.client
-
-import akka.actor.{Props, ActorRef, Actor}
-import akka.io.{Udp, IO}
-import java.net.InetSocketAddress
-import akka.util.ByteString
-
-class StatsdSender(remote: InetSocketAddress) extends Actor {
- import context.system
-
- IO(Udp) ! Udp.SimpleSender
-
- def receive = {
- case Udp.SimpleSenderReady =>
- context.become(ready(sender))
- }
-
- def ready(send: ActorRef): Receive = {
- case msg: String =>
- send ! Udp.Send(ByteString(msg), remote)
- }
-}
-object StatsdSender {
- def props(remote: InetSocketAddress): Props = Props(new StatsdSender(remote))
-} \ No newline at end of file