aboutsummaryrefslogtreecommitdiff
path: root/kamon-statsd/src/main/scala
diff options
context:
space:
mode:
authorEugene Platonov <jozic@live.com>2015-09-10 09:10:49 -0400
committerEugene Platonov <jozic@live.com>2015-10-28 13:52:44 -0400
commit4bcddee80e277e08ed4afe52a9cf118fcaff1feb (patch)
tree8179fb1cac0a1269848e98a78cb6da112a205957 /kamon-statsd/src/main/scala
parent546f460d9a682e27d1ad97de1dae1ce3a681c0f6 (diff)
downloadKamon-4bcddee80e277e08ed4afe52a9cf118fcaff1feb.tar.gz
Kamon-4bcddee80e277e08ed4afe52a9cf118fcaff1feb.tar.bz2
Kamon-4bcddee80e277e08ed4afe52a9cf118fcaff1feb.zip
! statsd: allow custom statsd senders + add simple statsd sender which doesn't batch stats
Diffstat (limited to 'kamon-statsd/src/main/scala')
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala (renamed from kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala)79
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala16
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala62
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala18
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala24
-rw-r--r--kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala76
6 files changed, 211 insertions, 64 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala
index 70ff1b45..be9a572a 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -16,41 +16,34 @@
package kamon.statsd
-import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
-import akka.io.{ Udp, IO }
-import java.net.InetSocketAddress
-import akka.util.ByteString
+import akka.actor.Props
+import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
-import java.text.{ DecimalFormatSymbols, DecimalFormat }
-import java.util.Locale
-
import kamon.metric.instrument.{ Counter, Histogram }
-class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBytes: Long, metricKeyGenerator: MetricKeyGenerator)
- extends Actor with UdpExtensionProvider {
- import context.system
-
- val symbols = DecimalFormatSymbols.getInstance(Locale.US)
- symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.
-
- // Absurdly high number of decimal digits, let the other end lose precision if it needs to.
- val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)
-
- udpExtension ! Udp.SimpleSender
-
- def newSocketAddress = new InetSocketAddress(statsDHost, statsDPort)
+/**
+ * Factory for [[BatchStatsDMetricsSender]].
+ * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender"
+ * to select [[BatchStatsDMetricsSender]] as your sender
+ */
+object BatchStatsDMetricsSender extends StatsDMetricsSenderFactory {
+ override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props =
+ Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator))
+}
- def receive = {
- case Udp.SimpleSenderReady ⇒
- context.become(ready(sender))
- }
+/**
+ * StatsD sender which sends a UDP packet every "kamon.statsd.flush-interval" or
+ * as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached.
+ * @param statsDConfig Config to read settings specific to this sender
+ * @param metricKeyGenerator Key generator for all metrics sent by this sender
+ */
+class BatchStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
+ extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) {
- def ready(udpSender: ActorRef): Receive = {
- case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender)
- }
+ val maxPacketSizeInBytes = statsDConfig.getBytes("batch-metric-sender.max-packet-size")
- def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, newSocketAddress)
+ def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = {
+ val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, flushToUDP)
for (
(entity, snapshot) ← tick.metrics;
@@ -72,25 +65,9 @@ class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBy
packetBuilder.flush()
}
-
- def encodeStatsDTimer(level: Long, count: Long): String = {
- val samplingRate: Double = 1D / count
- level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
- }
-
- def encodeStatsDCounter(count: Long): String = count.toString + "|c"
}
-object StatsDMetricsSender {
- def props(statsDHost: String, statsDPort: Int, maxPacketSize: Long, metricKeyGenerator: MetricKeyGenerator): Props =
- Props(new StatsDMetricsSender(statsDHost, statsDPort, maxPacketSize, metricKeyGenerator))
-}
-
-trait UdpExtensionProvider {
- def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
-}
-
-class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
+class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, flushToUDP: String ⇒ Unit) {
val metricSeparator = "\n"
val measurementSeparator = ":"
@@ -103,8 +80,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
if (fitsOnBuffer(dataWithoutKey))
buffer.append(dataWithoutKey)
else {
- flushToUDP(buffer.toString())
- buffer.clear()
+ flush()
buffer.append(key).append(dataWithoutKey)
}
} else {
@@ -114,8 +90,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
val mSeparator = if (buffer.length > 0) metricSeparator else ""
buffer.append(mSeparator).append(dataWithoutSeparator)
} else {
- flushToUDP(buffer.toString())
- buffer.clear()
+ flush()
buffer.append(dataWithoutSeparator)
}
}
@@ -123,8 +98,6 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes
- private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)
-
def flush(): Unit = {
flushToUDP(buffer.toString)
buffer.clear()
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
index 97a27ff3..c4d8682a 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala
@@ -1,3 +1,19 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon.statsd
import java.lang.management.ManagementFactory
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala
new file mode 100644
index 00000000..47ce66cd
--- /dev/null
+++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala
@@ -0,0 +1,62 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.actor.Props
+import com.typesafe.config.Config
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.instrument.{ Counter, Histogram }
+
+/**
+ * Factory for [[SimpleStatsDMetricsSender]].
+ * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender"
+ * to select [[SimpleStatsDMetricsSender]] as your sender
+ */
+object SimpleStatsDMetricsSender extends StatsDMetricsSenderFactory {
+ override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props =
+ Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator))
+}
+
+/**
+ * "Traditional" StatsD sender which sends a UDP packet for each piece of data it receives.
+ * @param statsDConfig Config to read settings specific to this sender
+ * @param metricKeyGenerator Key generator for all metrics sent by this sender
+ */
+class SimpleStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
+ extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) {
+
+ def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = {
+
+ for (
+ (entity, snapshot) ← tick.metrics;
+ (metricKey, metricSnapshot) ← snapshot.metrics
+ ) {
+
+ val keyPrefix = metricKeyGenerator.generateKey(entity, metricKey) + ":"
+
+ metricSnapshot match {
+ case hs: Histogram.Snapshot ⇒
+ hs.recordsIterator.foreach { record ⇒
+ flushToUDP(keyPrefix + encodeStatsDTimer(record.level, record.count))
+ }
+
+ case cs: Counter.Snapshot ⇒
+ flushToUDP(keyPrefix + encodeStatsDCounter(cs.count))
+ }
+ }
+ }
+}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
index 91d05510..a1f7dca3 100644
--- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -23,8 +23,6 @@ import kamon.util.ConfigTools.Syntax
import scala.concurrent.duration._
import com.typesafe.config.Config
import akka.event.Logging
-import java.net.InetSocketAddress
-import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.collection.JavaConverters._
object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
@@ -44,10 +42,10 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val tickInterval = metricsExtension.settings.tickInterval
val flushInterval = statsDConfig.getFiniteDuration("flush-interval")
- val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size")
val keyGeneratorFQCN = statsDConfig.getString("metric-key-generator")
+ val senderFactoryFQCN = statsDConfig.getString("metric-sender-factory")
- val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config)
+ val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, senderFactoryFQCN, config)
val subscriptions = statsDConfig.getConfig("subscriptions")
subscriptions.firstLevelKeys.map { subscriptionCategory ⇒
@@ -56,15 +54,13 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
}
}
- def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, keyGeneratorFQCN: String, config: Config): ActorRef = {
+ def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration,
+ keyGeneratorFQCN: String, senderFactoryFQCN: String, config: Config): ActorRef = {
assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
val keyGenerator = system.dynamicAccess.createInstanceFor[MetricKeyGenerator](keyGeneratorFQCN, (classOf[Config], config) :: Nil).get
+ val senderFactory = system.dynamicAccess.getObjectFor[StatsDMetricsSenderFactory](senderFactoryFQCN).get
- val metricsSender = system.actorOf(StatsDMetricsSender.props(
- statsDConfig.getString("hostname"),
- statsDConfig.getInt("port"),
- maxPacketSizeInBytes,
- keyGenerator), "statsd-metrics-sender")
+ val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender")
if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics sender.
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala
new file mode 100644
index 00000000..1f603aea
--- /dev/null
+++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala
@@ -0,0 +1,24 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import akka.actor.Props
+import com.typesafe.config.Config
+
+trait StatsDMetricsSenderFactory {
+ def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props
+}
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala
new file mode 100644
index 00000000..9e856eda
--- /dev/null
+++ b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala
@@ -0,0 +1,76 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.statsd
+
+import java.net.InetSocketAddress
+import java.text.{ DecimalFormat, DecimalFormatSymbols }
+import java.util.Locale
+import akka.actor.{ Actor, ActorRef, ActorSystem }
+import akka.io.{ IO, Udp }
+import akka.util.ByteString
+import com.typesafe.config.Config
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+
+/**
+ * Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server.
+ * @param statsDConfig Config to read settings specific to this sender
+ * @param metricKeyGenerator Key generator for all metrics sent by this sender
+ */
+abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
+ extends Actor with UdpExtensionProvider {
+
+ import context.system
+
+ val statsDHost = statsDConfig.getString("hostname")
+ val statsDPort = statsDConfig.getInt("port")
+
+ val symbols = DecimalFormatSymbols.getInstance(Locale.US)
+ symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.
+
+ // Absurdly high number of decimal digits, let the other end lose precision if it needs to.
+ val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)
+
+ udpExtension ! Udp.SimpleSender
+
+ lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort)
+
+ def receive = {
+ case Udp.SimpleSenderReady ⇒
+ context.become(ready(sender))
+ }
+
+ def ready(udpSender: ActorRef): Receive = {
+ case tick: TickMetricSnapshot ⇒
+ writeMetricsToRemote(tick,
+ (data: String) ⇒ udpSender ! Udp.Send(ByteString(data), socketAddress))
+ }
+
+ def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit
+
+ def encodeStatsDTimer(level: Long, count: Long): String = {
+ val samplingRate: Double = 1D / count
+ level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
+ }
+
+ def encodeStatsDCounter(count: Long): String = count.toString + "|c"
+
+}
+
+trait UdpExtensionProvider {
+ def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
+}
+