aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-04-03 20:48:02 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-04-03 20:48:02 -0300
commit1f7c5967e0e93b129754d68f494665d35031d971 (patch)
treec562b2fb9ca2a06a1a9b9908d5ad8f0537fdf8ee
parentde6a2c9bd6ab6034f59ee51b0eb48beae3d70561 (diff)
downloadKamon-1f7c5967e0e93b129754d68f494665d35031d971.tar.gz
Kamon-1f7c5967e0e93b129754d68f494665d35031d971.tar.bz2
Kamon-1f7c5967e0e93b129754d68f494665d35031d971.zip
= statsd: honor the max-packet-size setting and include newline char on multi-metric packets
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala20
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala12
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala7
3 files changed, 16 insertions, 23 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
index b14e6022..cfc228d3 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
@@ -28,7 +28,6 @@ class StatsDMetricsSender extends Actor with ActorLogging {
val statsDExtension = Kamon(StatsD)
val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port)
- val maxPacketSize = 1024
IO(Udp) ! Udp.SimpleSender
@@ -38,24 +37,23 @@ class StatsDMetricsSender extends Actor with ActorLogging {
}
def ready(udpSender: ActorRef): Receive = {
- case StatsD.MetricBatch(metrics) ⇒ writeDown(metrics, ByteString.empty, udpSender)
+ case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender)
}
+ @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
+ def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
- def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote)
-
- @tailrec final def writeDown(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = {
- if(metrics.isEmpty)
+ if (metrics.isEmpty)
flushToRemote(buffer, udpSender)
else {
- val headData = metrics.head.toByteString
- if(buffer.size + headData.size > maxPacketSize) {
+ val headMetricData = metrics.head.toByteString(includeTrailingNewline = true)
+
+ if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) {
flushToRemote(buffer, udpSender)
- writeDown(metrics.tail, headData, udpSender)
+ sendMetricsToRemote(metrics.tail, headMetricData, udpSender)
} else {
- writeDown(metrics.tail, buffer ++ headData, udpSender)
+ sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender)
}
-
}
}
}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
index 0ded1394..167e993e 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala
@@ -29,7 +29,6 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = StatsD
override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system)
-
trait MetricKeyGenerator {
def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String
}
@@ -46,8 +45,8 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
* For timing, it provides something like {@code key:millis|ms}.
* If sampling rate is less than 1, it provides something like {@code key:value|type|@rate}
*/
- def toByteString: ByteString =
- if(samplingRate >= 1D)
+ def toByteString(includeTrailingNewline: Boolean = true): ByteString =
+ if (samplingRate >= 1D)
ByteString(s"$key:$value|$suffix")
else
ByteString(s"$key:$value|$suffix|@$samplingRate")
@@ -68,7 +67,6 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
case class MetricBatch(metrics: Iterable[Metric])
}
-
class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
private val statsDConfig = system.settings.config.getConfig("kamon.statsd")
@@ -81,16 +79,15 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval)
val includedActors = statsDConfig.getStringList("includes.actor").asScala
- for(actorPathPattern <- includedActors) {
+ for (actorPathPattern ← includedActors) {
Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true)
}
-
def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator")
- if(flushInterval == tickInterval) {
+ if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics translator.
metricsTranslator
} else {
@@ -99,7 +96,6 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
}
}
-
class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator {
val application = config.getString("kamon.statsd.simple-metric-key-generator.application")
val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
index 2cf672b8..2ef41c6d 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala
@@ -29,7 +29,7 @@ class StatsDMetricTranslator extends Actor {
def receive = {
case TickMetricSnapshot(from, to, metrics) ⇒
val translatedMetrics = metrics.collect {
- case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) => transformActorMetric(am, snapshot)
+ case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) ⇒ transformActorMetric(am, snapshot)
}
metricSender ! StatsD.MetricBatch(translatedMetrics.flatten)
@@ -42,9 +42,9 @@ class StatsDMetricTranslator extends Actor {
roll(timeInMailboxKey, snapshot.timeInMailbox, StatsD.Timing) ++ roll(processingTimeKey, snapshot.processingTime, StatsD.Timing)
}
- def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) => StatsD.Metric): Vector[StatsD.Metric] = {
+ def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) ⇒ StatsD.Metric): Vector[StatsD.Metric] = {
val builder = Vector.newBuilder[StatsD.Metric]
- for(measurement <- snapshot.measurements) {
+ for (measurement ← snapshot.measurements) {
val samplingRate = 1D / measurement.count
val scaledValue = Scale.convert(snapshot.scale, Scale.Milli, measurement.value)
builder += metricBuilder.apply(key, scaledValue, samplingRate)
@@ -53,7 +53,6 @@ class StatsDMetricTranslator extends Actor {
builder.result()
}
-
}
object StatsDMetricTranslator {