From 1d267614d6718e61d6791f293a6451e378181935 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Thu, 3 Apr 2014 18:41:39 -0300 Subject: + statsd: first working implementation with processing-time and time-in-mailbox metrics for actors --- .../scala/kamon/statsd/StatsDMetricsSender.scala | 65 ++++++++++++++++++++++ .../src/main/scala/kamon/statsd/Statsd.scala | 39 +++++++++---- .../kamon/statsd/StatsdMetricTranslator.scala | 22 +++++--- .../scala/kamon/statsd/StatsdMetricsSender.scala | 45 --------------- 4 files changed, 107 insertions(+), 64 deletions(-) create mode 100644 kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala delete mode 100644 kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala (limited to 'kamon-statsd/src/main/scala') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala new file mode 100644 index 00000000..b14e6022 --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -0,0 +1,65 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.{ ActorLogging, Props, ActorRef, Actor } +import akka.io.{ Udp, IO } +import java.net.InetSocketAddress +import akka.util.ByteString +import kamon.Kamon +import scala.annotation.tailrec + +class StatsDMetricsSender extends Actor with ActorLogging { + import context.system + + val statsDExtension = Kamon(StatsD) + val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port) + val maxPacketSize = 1024 + + IO(Udp) ! Udp.SimpleSender + + def receive = { + case Udp.SimpleSenderReady ⇒ + context.become(ready(sender)) + } + + def ready(udpSender: ActorRef): Receive = { + case StatsD.MetricBatch(metrics) ⇒ writeDown(metrics, ByteString.empty, udpSender) + } + + + def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote) + + @tailrec final def writeDown(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = { + if(metrics.isEmpty) + flushToRemote(buffer, udpSender) + else { + val headData = metrics.head.toByteString + if(buffer.size + headData.size > maxPacketSize) { + flushToRemote(buffer, udpSender) + writeDown(metrics.tail, headData, udpSender) + } else { + writeDown(metrics.tail, buffer ++ headData, udpSender) + } + + } + } +} + +object StatsDMetricsSender { + def props: Props = Props[StatsDMetricsSender] +} \ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala index a7cba371..0ded1394 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -22,15 +22,21 @@ import kamon.metrics._ import scala.concurrent.duration._ import scala.collection.JavaConverters._ import akka.util.ByteString +import com.typesafe.config.Config +import java.lang.management.ManagementFactory object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = StatsD override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) + trait MetricKeyGenerator { + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String + } + sealed trait Metric { def key: String - def value: Long + def value: Double def suffix: String def samplingRate: Double @@ -47,34 +53,34 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { ByteString(s"$key:$value|$suffix|@$samplingRate") } - case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0) extends Metric { + case class Counter(key: String, value: Double = 1D, samplingRate: Double = 1.0) extends Metric { val suffix: String = "c" } - case class Timing(key: String, value: Long, samplingRate: Double = 1.0) extends Metric { + case class Timing(key: String, value: Double, samplingRate: Double = 1.0) extends Metric { val suffix: String = "ms" } - case class Gauge(key: String, value: Long, samplingRate: Double = 1.0) extends Metric { + case class Gauge(key: String, value: Double, samplingRate: Double = 1.0) extends Metric { val suffix: String = "g" } - case class MetricBatch(metrics: Vector[Metric]) + case class MetricBatch(metrics: Iterable[Metric]) } -class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { - private val config = system.settings.config.getConfig("kamon.statsd") +class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { + private val statsDConfig = system.settings.config.getConfig("kamon.statsd") - val hostname = config.getString("hostname") - val port = config.getInt("port") - val prefix = config.getString("prefix") - val flushInterval = config.getMilliseconds("flush-interval") + val hostname = statsDConfig.getString("hostname") + val port = statsDConfig.getInt("port") + val flushInterval = statsDConfig.getMilliseconds("flush-interval") + val maxPacketSize = statsDConfig.getInt("max-packet-size") val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) - val includedActors = config.getStringList("includes.actor").asScala + val includedActors = statsDConfig.getStringList("includes.actor").asScala for(actorPathPattern <- includedActors) { Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) } @@ -93,3 +99,12 @@ class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Ext } } + +class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator { + val application = config.getString("kamon.statsd.simple-metric-key-generator.application") + val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) + + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = + application + "." + localhostName + "." + groupIdentity.category.name + "." + groupIdentity.name + "." + metricIdentity.name +} + diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala index 6a4c8d56..2cf672b8 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala @@ -21,25 +21,33 @@ import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.ActorMetrics.ActorMetricSnapshot class StatsDMetricTranslator extends Actor { - //val metricsSender = + val config = context.system.settings.config + val metricKeyGenerator = new SimpleMetricKeyGenerator(config) + val metricSender = context.actorOf(StatsDMetricsSender.props, "metrics-sender") def receive = { case TickMetricSnapshot(from, to, metrics) ⇒ + val translatedMetrics = metrics.collect { + case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) => transformActorMetric(am, snapshot) + } - + metricSender ! StatsD.MetricBatch(translatedMetrics.flatten) } def transformActorMetric(actorIdentity: ActorMetrics, snapshot: ActorMetricSnapshot): Vector[StatsD.Metric] = { - // TODO: Define metrics namespacing. - roll(actorIdentity.name, snapshot.timeInMailbox, StatsD.Timing) + val timeInMailboxKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.TimeInMailbox) + val processingTimeKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.ProcessingTime) + + roll(timeInMailboxKey, snapshot.timeInMailbox, StatsD.Timing) ++ roll(processingTimeKey, snapshot.processingTime, StatsD.Timing) } - def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Long, Double) => StatsD.Metric): Vector[StatsD.Metric] = { + def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) => StatsD.Metric): Vector[StatsD.Metric] = { val builder = Vector.newBuilder[StatsD.Metric] for(measurement <- snapshot.measurements) { val samplingRate = 1D / measurement.count - builder += metricBuilder.apply(key, measurement.value, samplingRate) + val scaledValue = Scale.convert(snapshot.scale, Scale.Milli, measurement.value) + builder += metricBuilder.apply(key, scaledValue, samplingRate) } builder.result() @@ -49,5 +57,5 @@ class StatsDMetricTranslator extends Actor { } object StatsDMetricTranslator { - def props: Props = Props(new StatsDMetricTranslator) + def props: Props = Props[StatsDMetricTranslator] } diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala deleted file mode 100644 index cff7a4a1..00000000 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.statsd - -import akka.actor.{ ActorLogging, Props, ActorRef, Actor } -import akka.io.{ Udp, IO } -import java.net.InetSocketAddress -import akka.util.ByteString - -class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends Actor with ActorLogging { - import context.system - - IO(Udp) ! Udp.SimpleSender - - def receive = { - case Udp.SimpleSenderReady ⇒ - context.become(ready(sender)) - } - - def ready(send: ActorRef): Receive = { - // TODO: batch writes - case metric: StatsD.Metric ⇒ - send ! Udp.Send(metric.toByteString, remote) - - case _ ⇒ log.error("Unknown Metric") - } -} - -object StatsdMetricsSender { - def props(statPrefix: String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote)) -} \ No newline at end of file -- cgit v1.2.3