diff options
-rw-r--r-- | kamon-playground/src/main/resources/application.conf | 21 | ||||
-rw-r--r-- | kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala | 1 | ||||
-rw-r--r-- | kamon-statsd/src/main/resources/reference.conf | 20 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala | 65 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala | 110 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala | 61 | ||||
-rw-r--r-- | project/Projects.scala | 11 | ||||
-rw-r--r-- | site/src/main/jekyll/statsd/index.md | 24 |
8 files changed, 307 insertions, 6 deletions
diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf index 97785275..aaafd836 100644 --- a/kamon-playground/src/main/resources/application.conf +++ b/kamon-playground/src/main/resources/application.conf @@ -1,8 +1,8 @@ akka { loggers = [ "akka.event.slf4j.Slf4jLogger" ] - loglevel = DEBUG + loglevel = INFO - extensions = ["kamon.newrelic.NewRelic"] + extensions = ["kamon.newrelic.NewRelic", "kamon.statsd.StatsD"] actor { debug { @@ -54,6 +54,23 @@ kamon { } } ] + + precision { + actor { + processing-time { + highest-trackable-value = 3600000000000 + significant-value-digits = 1 + } + time-in-mailbox { + highest-trackable-value = 3600000000000 + significant-value-digits = 1 + } + mailbox-size { + highest-trackable-value = 999999999 + significant-value-digits = 1 + } + } + } } } diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 9ff22c12..05859ee5 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -88,7 +88,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil path("ok") { traceName("OK") { complete { - println("Defined: " + requestCountRecorder) requestCountRecorder.map(_.record(1)) "ok" } diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf new file mode 100644 index 00000000..da103338 --- /dev/null +++ b/kamon-statsd/src/main/resources/reference.conf @@ -0,0 +1,20 @@ +# ==================================== # +# Kamon-StatsD Reference Configuration # +# ==================================== # + +kamon { + statsd { + hostname = "10.254.169.44" + port = 8125 + flush-interval = 1 second + max-packet-size = 1024 + + includes { + actor = [ "*" ] + } + + simple-metric-key-generator { + application = "Kamon" + } + } +}
\ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala new file mode 100644 index 00000000..b14e6022 --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -0,0 +1,65 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.{ ActorLogging, Props, ActorRef, Actor } +import akka.io.{ Udp, IO } +import java.net.InetSocketAddress +import akka.util.ByteString +import kamon.Kamon +import scala.annotation.tailrec + +class StatsDMetricsSender extends Actor with ActorLogging { + import context.system + + val statsDExtension = Kamon(StatsD) + val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port) + val maxPacketSize = 1024 + + IO(Udp) ! Udp.SimpleSender + + def receive = { + case Udp.SimpleSenderReady ⇒ + context.become(ready(sender)) + } + + def ready(udpSender: ActorRef): Receive = { + case StatsD.MetricBatch(metrics) ⇒ writeDown(metrics, ByteString.empty, udpSender) + } + + + def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote) + + @tailrec final def writeDown(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = { + if(metrics.isEmpty) + flushToRemote(buffer, udpSender) + else { + val headData = metrics.head.toByteString + if(buffer.size + headData.size > maxPacketSize) { + flushToRemote(buffer, udpSender) + writeDown(metrics.tail, headData, udpSender) + } else { + writeDown(metrics.tail, buffer ++ headData, udpSender) + } + + } + } +} + +object StatsDMetricsSender { + def props: Props = Props[StatsDMetricsSender] +}
\ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala new file mode 100644 index 00000000..0ded1394 --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -0,0 +1,110 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor._ +import kamon.Kamon +import kamon.metrics._ +import scala.concurrent.duration._ +import scala.collection.JavaConverters._ +import akka.util.ByteString +import com.typesafe.config.Config +import java.lang.management.ManagementFactory + +object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = StatsD + override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) + + + trait MetricKeyGenerator { + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String + } + + sealed trait Metric { + def key: String + def value: Double + def suffix: String + def samplingRate: Double + + /* + * Creates the stats string to send to StatsD. + * For counters, it provides something like {@code key:value|c}. + * For timing, it provides something like {@code key:millis|ms}. + * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} + */ + def toByteString: ByteString = + if(samplingRate >= 1D) + ByteString(s"$key:$value|$suffix") + else + ByteString(s"$key:$value|$suffix|@$samplingRate") + } + + case class Counter(key: String, value: Double = 1D, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "c" + } + + case class Timing(key: String, value: Double, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "ms" + } + + case class Gauge(key: String, value: Double, samplingRate: Double = 1.0) extends Metric { + val suffix: String = "g" + } + + case class MetricBatch(metrics: Iterable[Metric]) +} + + +class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { + private val statsDConfig = system.settings.config.getConfig("kamon.statsd") + + val hostname = statsDConfig.getString("hostname") + val port = statsDConfig.getInt("port") + val flushInterval = statsDConfig.getMilliseconds("flush-interval") + val maxPacketSize = statsDConfig.getInt("max-packet-size") + val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") + + val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) + + val includedActors = statsDConfig.getStringList("includes.actor").asScala + for(actorPathPattern <- includedActors) { + Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) + } + + + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { + assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") + + val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator") + if(flushInterval == tickInterval) { + // No need to buffer the metrics, let's go straight to the metrics translator. + metricsTranslator + } else { + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer") + } + } +} + + +class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator { + val application = config.getString("kamon.statsd.simple-metric-key-generator.application") + val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) + + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = + application + "." + localhostName + "." + groupIdentity.category.name + "." + groupIdentity.name + "." + metricIdentity.name +} + diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala new file mode 100644 index 00000000..2cf672b8 --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala @@ -0,0 +1,61 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.statsd + +import akka.actor.{ Props, Actor, ActorRef } +import kamon.metrics._ +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.ActorMetrics.ActorMetricSnapshot + +class StatsDMetricTranslator extends Actor { + val config = context.system.settings.config + + val metricKeyGenerator = new SimpleMetricKeyGenerator(config) + val metricSender = context.actorOf(StatsDMetricsSender.props, "metrics-sender") + + def receive = { + case TickMetricSnapshot(from, to, metrics) ⇒ + val translatedMetrics = metrics.collect { + case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) => transformActorMetric(am, snapshot) + } + + metricSender ! StatsD.MetricBatch(translatedMetrics.flatten) + } + + def transformActorMetric(actorIdentity: ActorMetrics, snapshot: ActorMetricSnapshot): Vector[StatsD.Metric] = { + val timeInMailboxKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.TimeInMailbox) + val processingTimeKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.ProcessingTime) + + roll(timeInMailboxKey, snapshot.timeInMailbox, StatsD.Timing) ++ roll(processingTimeKey, snapshot.processingTime, StatsD.Timing) + } + + def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) => StatsD.Metric): Vector[StatsD.Metric] = { + val builder = Vector.newBuilder[StatsD.Metric] + for(measurement <- snapshot.measurements) { + val samplingRate = 1D / measurement.count + val scaledValue = Scale.convert(snapshot.scale, Scale.Milli, measurement.value) + builder += metricBuilder.apply(key, scaledValue, samplingRate) + } + + builder.result() + } + + +} + +object StatsDMetricTranslator { + def props: Props = Props[StatsDMetricTranslator] +} diff --git a/project/Projects.scala b/project/Projects.scala index ad04e60a..f3f7f17d 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -8,7 +8,7 @@ object Projects extends Build { import Dependencies._ lazy val root = Project("root", file(".")) - .aggregate(kamonCore, kamonSpray, kamonNewrelic, kamonPlayground, kamonDashboard, kamonTestkit, kamonPlay, site) + .aggregate(kamonCore, kamonSpray, kamonNewrelic, kamonPlayground, kamonDashboard, kamonTestkit, kamonPlay, kamonStatsd, site) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(noPublishing: _*) @@ -54,7 +54,7 @@ object Projects extends Build { .settings( libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayCan, sprayClient, sprayRouting, logback)) - .dependsOn(kamonSpray, kamonNewrelic) + .dependsOn(kamonSpray, kamonNewrelic, kamonStatsd) lazy val kamonDashboard = Project("kamon-dashboard", file("kamon-dashboard")) @@ -63,13 +63,13 @@ object Projects extends Build { .settings(libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayRouting, sprayCan, sprayJson)) .dependsOn(kamonCore) + lazy val kamonTestkit = Project("kamon-testkit", file("kamon-testkit")) .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(libraryDependencies ++= compile(akkaActor, akkaTestKit) ++ test(slf4Api, slf4nop)) .dependsOn(kamonCore) - lazy val kamonPlay = Project("kamon-play", file("kamon-play")) .settings(basicSettings: _*) .settings(formatSettings: _*) @@ -77,6 +77,11 @@ object Projects extends Build { .settings(libraryDependencies ++= compile(playTest, aspectJ) ++ test(playTest, slf4Api)) .dependsOn(kamonCore) + lazy val kamonStatsd = Project("kamon-statsd", file("kamon-statsd")) + .settings(basicSettings: _*) + .settings(formatSettings: _*) + .settings(libraryDependencies ++= compile(akkaActor) ++ test(scalatest, akkaTestKit, slf4Api, slf4nop)) + .dependsOn(kamonCore) lazy val site = Project("site", file("site")) .settings(basicSettings: _*) diff --git a/site/src/main/jekyll/statsd/index.md b/site/src/main/jekyll/statsd/index.md new file mode 100644 index 00000000..5a9cf0b2 --- /dev/null +++ b/site/src/main/jekyll/statsd/index.md @@ -0,0 +1,24 @@ +--- +title: Kamon | StatsD | Documentation +layout: default +--- + +What is StatsD? +======= + +StatsD is a simple network daemon that continuously receives metrics pushed over UDP and periodically sends aggregate metrics to upstream services +like Graphite. Because it uses UDP, clients (for example, web applications) can ship metrics to it very fast with little to no overhead. +This means that a user can capture multiple metrics for every request to a web application, even at a rate of thousands of requests per second. +Request-level metrics are aggregated over a flush interval (default 10 seconds) and pushed to an upstream metrics service. + +Getting Started with StatsD +---------- +If you are not familiar with StatsD, we recommend reading bla bla bla. +To get started running StatsD in your environment, follow the installation instructions in bla bla bla. + +Installing Graphite +---------- + +Graphite dashboards(grafana) +---------- + |