aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-04-03 18:41:39 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-04-03 18:41:39 -0300
commit1d267614d6718e61d6791f293a6451e378181935 (patch)
tree54db0e3aaea8c7766c1a1b18e7b855500d3271eb
parent0819320d7f20c78ad096c21c6aedb3536758792b (diff)
downloadKamon-1d267614d6718e61d6791f293a6451e378181935.tar.gz
Kamon-1d267614d6718e61d6791f293a6451e378181935.tar.bz2
Kamon-1d267614d6718e61d6791f293a6451e378181935.zip
+ statsd: first working implementation with processing-time and time-in-mailbox metrics for actors
-rw-r--r--kamon-playground/src/main/resources/application.conf21
-rw-r--r--kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala1
-rw-r--r--kamon-statsd/src/main/resources/reference.conf11
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala (renamed from kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala)36
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala39
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala22
-rw-r--r--project/Projects.scala2
7 files changed, 96 insertions, 36 deletions
diff --git a/kamon-playground/src/main/resources/application.conf b/kamon-playground/src/main/resources/application.conf
index 97785275..aaafd836 100644
--- a/kamon-playground/src/main/resources/application.conf
+++ b/kamon-playground/src/main/resources/application.conf
@@ -1,8 +1,8 @@
akka {
loggers = [ "akka.event.slf4j.Slf4jLogger" ]
- loglevel = DEBUG
+ loglevel = INFO
- extensions = ["kamon.newrelic.NewRelic"]
+ extensions = ["kamon.newrelic.NewRelic", "kamon.statsd.StatsD"]
actor {
debug {
@@ -54,6 +54,23 @@ kamon {
}
}
]
+
+ precision {
+ actor {
+ processing-time {
+ highest-trackable-value = 3600000000000
+ significant-value-digits = 1
+ }
+ time-in-mailbox {
+ highest-trackable-value = 3600000000000
+ significant-value-digits = 1
+ }
+ mailbox-size {
+ highest-trackable-value = 999999999
+ significant-value-digits = 1
+ }
+ }
+ }
}
}
diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
index 9ff22c12..05859ee5 100644
--- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
+++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala
@@ -88,7 +88,6 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil
path("ok") {
traceName("OK") {
complete {
- println("Defined: " + requestCountRecorder)
requestCountRecorder.map(_.record(1))
"ok"
}
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf
index 64f84a82..da103338 100644
--- a/kamon-statsd/src/main/resources/reference.conf
+++ b/kamon-statsd/src/main/resources/reference.conf
@@ -4,16 +4,17 @@
kamon {
statsd {
- prefix = kamon-app
- hostname = statsd.example.com
+ hostname = "10.254.169.44"
port = 8125
-
flush-interval = 1 second
- buffer-size = 512
+ max-packet-size = 1024
includes {
actor = [ "*" ]
- trace = [ "*" ]
+ }
+
+ simple-metric-key-generator {
+ application = "Kamon"
}
}
} \ No newline at end of file
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index cff7a4a1..b14e6022 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -20,10 +20,16 @@ import akka.actor.{ ActorLogging, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
+import kamon.Kamon
+import scala.annotation.tailrec
-class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends Actor with ActorLogging {
+class StatsDMetricsSender extends Actor with ActorLogging {
import context.system
+ val statsDExtension = Kamon(StatsD)
+ val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port)
+ val maxPacketSize = 1024
+
IO(Udp) ! Udp.SimpleSender
def receive = {
@@ -31,15 +37,29 @@ class StatsdMetricsSender(statPrefix: String, remote: InetSocketAddress) extends
context.become(ready(sender))
}
- def ready(send: ActorRef): Receive = {
- // TODO: batch writes
- case metric: StatsD.Metric ⇒
- send ! Udp.Send(metric.toByteString, remote)
+ def ready(udpSender: ActorRef): Receive = {
+ case StatsD.MetricBatch(metrics) ⇒ writeDown(metrics, ByteString.empty, udpSender)
+ }
+
+
+ def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
+
+ @tailrec final def writeDown(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
+ if(metrics.isEmpty)
+ flushToRemote(buffer, udpSender)
+ else {
+ val headData = metrics.head.toByteString
+ if(buffer.size + headData.size > maxPacketSize) {
+ flushToRemote(buffer, udpSender)
+ writeDown(metrics.tail, headData, udpSender)
+ } else {
+ writeDown(metrics.tail, buffer ++ headData, udpSender)
+ }
- case _ ⇒ log.error("Unknown Metric")
+ }
}
}
-object StatsdMetricsSender {
- def props(statPrefix: String, remote: InetSocketAddress): Props = Props(new StatsdMetricsSender(statPrefix, remote))
+object StatsDMetricsSender {
+ def props: Props = Props[StatsDMetricsSender]
} \ No newline at end of file
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
index a7cba371..0ded1394 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
@@ -22,15 +22,21 @@ import kamon.metrics._
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import akka.util.ByteString
+import com.typesafe.config.Config
+import java.lang.management.ManagementFactory
object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = StatsD
override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system)
+ trait MetricKeyGenerator {
+ def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String
+ }
+
sealed trait Metric {
def key: String
- def value: Long
+ def value: Double
def suffix: String
def samplingRate: Double
@@ -47,34 +53,34 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
ByteString(s"$key:$value|$suffix|@$samplingRate")
}
- case class Counter(key: String, value: Long = 1, samplingRate: Double = 1.0) extends Metric {
+ case class Counter(key: String, value: Double = 1D, samplingRate: Double = 1.0) extends Metric {
val suffix: String = "c"
}
- case class Timing(key: String, value: Long, samplingRate: Double = 1.0) extends Metric {
+ case class Timing(key: String, value: Double, samplingRate: Double = 1.0) extends Metric {
val suffix: String = "ms"
}
- case class Gauge(key: String, value: Long, samplingRate: Double = 1.0) extends Metric {
+ case class Gauge(key: String, value: Double, samplingRate: Double = 1.0) extends Metric {
val suffix: String = "g"
}
- case class MetricBatch(metrics: Vector[Metric])
+ case class MetricBatch(metrics: Iterable[Metric])
}
-class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Extension {
- private val config = system.settings.config.getConfig("kamon.statsd")
+class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ private val statsDConfig = system.settings.config.getConfig("kamon.statsd")
- val hostname = config.getString("hostname")
- val port = config.getInt("port")
- val prefix = config.getString("prefix")
- val flushInterval = config.getMilliseconds("flush-interval")
+ val hostname = statsDConfig.getString("hostname")
+ val port = statsDConfig.getInt("port")
+ val flushInterval = statsDConfig.getMilliseconds("flush-interval")
+ val maxPacketSize = statsDConfig.getInt("max-packet-size")
val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval")
val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval)
- val includedActors = config.getStringList("includes.actor").asScala
+ val includedActors = statsDConfig.getStringList("includes.actor").asScala
for(actorPathPattern <- includedActors) {
Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true)
}
@@ -93,3 +99,12 @@ class StatsDExtension(private val system: ExtendedActorSystem) extends Kamon.Ext
}
}
+
+class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator {
+ val application = config.getString("kamon.statsd.simple-metric-key-generator.application")
+ val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
+
+ def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
+ application + "." + localhostName + "." + groupIdentity.category.name + "." + groupIdentity.name + "." + metricIdentity.name
+}
+
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
index 6a4c8d56..2cf672b8 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
@@ -21,25 +21,33 @@ import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.ActorMetrics.ActorMetricSnapshot
class StatsDMetricTranslator extends Actor {
- //val metricsSender =
+ val config = context.system.settings.config
+ val metricKeyGenerator = new SimpleMetricKeyGenerator(config)
+ val metricSender = context.actorOf(StatsDMetricsSender.props, "metrics-sender")
def receive = {
case TickMetricSnapshot(from, to, metrics) ⇒
+ val translatedMetrics = metrics.collect {
+ case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) => transformActorMetric(am, snapshot)
+ }
-
+ metricSender ! StatsD.MetricBatch(translatedMetrics.flatten)
}
def transformActorMetric(actorIdentity: ActorMetrics, snapshot: ActorMetricSnapshot): Vector[StatsD.Metric] = {
- // TODO: Define metrics namespacing.
- roll(actorIdentity.name, snapshot.timeInMailbox, StatsD.Timing)
+ val timeInMailboxKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.TimeInMailbox)
+ val processingTimeKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.ProcessingTime)
+
+ roll(timeInMailboxKey, snapshot.timeInMailbox, StatsD.Timing) ++ roll(processingTimeKey, snapshot.processingTime, StatsD.Timing)
}
- def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Long, Double) => StatsD.Metric): Vector[StatsD.Metric] = {
+ def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) => StatsD.Metric): Vector[StatsD.Metric] = {
val builder = Vector.newBuilder[StatsD.Metric]
for(measurement <- snapshot.measurements) {
val samplingRate = 1D / measurement.count
- builder += metricBuilder.apply(key, measurement.value, samplingRate)
+ val scaledValue = Scale.convert(snapshot.scale, Scale.Milli, measurement.value)
+ builder += metricBuilder.apply(key, scaledValue, samplingRate)
}
builder.result()
@@ -49,5 +57,5 @@ class StatsDMetricTranslator extends Actor {
}
object StatsDMetricTranslator {
- def props: Props = Props(new StatsDMetricTranslator)
+ def props: Props = Props[StatsDMetricTranslator]
}
diff --git a/project/Projects.scala b/project/Projects.scala
index 18dfe03c..f3f7f17d 100644
--- a/project/Projects.scala
+++ b/project/Projects.scala
@@ -54,7 +54,7 @@ object Projects extends Build {
.settings(
libraryDependencies ++=
compile(akkaActor, akkaSlf4j, sprayCan, sprayClient, sprayRouting, logback))
- .dependsOn(kamonSpray, kamonNewrelic)
+ .dependsOn(kamonSpray, kamonNewrelic, kamonStatsd)
lazy val kamonDashboard = Project("kamon-dashboard", file("kamon-dashboard"))