diff options
author | Eugene Platonov <jozic@live.com> | 2015-09-10 09:10:49 -0400 |
---|---|---|
committer | Eugene Platonov <jozic@live.com> | 2015-10-28 13:52:44 -0400 |
commit | 4bcddee80e277e08ed4afe52a9cf118fcaff1feb (patch) | |
tree | 8179fb1cac0a1269848e98a78cb6da112a205957 /kamon-statsd/src/main/scala | |
parent | 546f460d9a682e27d1ad97de1dae1ce3a681c0f6 (diff) | |
download | Kamon-4bcddee80e277e08ed4afe52a9cf118fcaff1feb.tar.gz Kamon-4bcddee80e277e08ed4afe52a9cf118fcaff1feb.tar.bz2 Kamon-4bcddee80e277e08ed4afe52a9cf118fcaff1feb.zip |
! statsd: allow custom statsd senders + add simple statsd sender which doesn't batch stats
Diffstat (limited to 'kamon-statsd/src/main/scala')
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala (renamed from kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala) | 79 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala | 16 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala | 62 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala | 18 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala | 24 | ||||
-rw-r--r-- | kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala | 76 |
6 files changed, 211 insertions, 64 deletions
diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala index 70ff1b45..be9a572a 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/BatchStatsDMetricsSender.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,41 +16,34 @@ package kamon.statsd -import akka.actor.{ ActorSystem, Props, ActorRef, Actor } -import akka.io.{ Udp, IO } -import java.net.InetSocketAddress -import akka.util.ByteString +import akka.actor.Props +import com.typesafe.config.Config import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot -import java.text.{ DecimalFormatSymbols, DecimalFormat } -import java.util.Locale - import kamon.metric.instrument.{ Counter, Histogram } -class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBytes: Long, metricKeyGenerator: MetricKeyGenerator) - extends Actor with UdpExtensionProvider { - import context.system - - val symbols = DecimalFormatSymbols.getInstance(Locale.US) - symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. - - // Absurdly high number of decimal digits, let the other end lose precision if it needs to. - val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) - - udpExtension ! Udp.SimpleSender - - def newSocketAddress = new InetSocketAddress(statsDHost, statsDPort) +/** + * Factory for [[BatchStatsDMetricsSender]]. + * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender" + * to select [[BatchStatsDMetricsSender]] as your sender + */ +object BatchStatsDMetricsSender extends StatsDMetricsSenderFactory { + override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props = + Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator)) +} - def receive = { - case Udp.SimpleSenderReady ⇒ - context.become(ready(sender)) - } +/** + * StatsD sender which sends a UDP packet every "kamon.statsd.flush-interval" or + * as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +class BatchStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) { - def ready(udpSender: ActorRef): Receive = { - case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender) - } + val maxPacketSizeInBytes = statsDConfig.getBytes("batch-metric-sender.max-packet-size") - def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, newSocketAddress) + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = { + val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, flushToUDP) for ( (entity, snapshot) ← tick.metrics; @@ -72,25 +65,9 @@ class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBy packetBuilder.flush() } - - def encodeStatsDTimer(level: Long, count: Long): String = { - val samplingRate: Double = 1D / count - level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") - } - - def encodeStatsDCounter(count: Long): String = count.toString + "|c" } -object StatsDMetricsSender { - def props(statsDHost: String, statsDPort: Int, maxPacketSize: Long, metricKeyGenerator: MetricKeyGenerator): Props = - Props(new StatsDMetricsSender(statsDHost, statsDPort, maxPacketSize, metricKeyGenerator)) -} - -trait UdpExtensionProvider { - def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) -} - -class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, flushToUDP: String ⇒ Unit) { val metricSeparator = "\n" val measurementSeparator = ":" @@ -103,8 +80,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r if (fitsOnBuffer(dataWithoutKey)) buffer.append(dataWithoutKey) else { - flushToUDP(buffer.toString()) - buffer.clear() + flush() buffer.append(key).append(dataWithoutKey) } } else { @@ -114,8 +90,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r val mSeparator = if (buffer.length > 0) metricSeparator else "" buffer.append(mSeparator).append(dataWithoutSeparator) } else { - flushToUDP(buffer.toString()) - buffer.clear() + flush() buffer.append(dataWithoutSeparator) } } @@ -123,8 +98,6 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes - private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) - def flush(): Unit = { flushToUDP(buffer.toString) buffer.clear() diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala index 97a27ff3..c4d8682a 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala @@ -1,3 +1,19 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon.statsd import java.lang.management.ManagementFactory diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala new file mode 100644 index 00000000..47ce66cd --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleStatsDMetricsSender.scala @@ -0,0 +1,62 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument.{ Counter, Histogram } + +/** + * Factory for [[SimpleStatsDMetricsSender]]. + * Use FQCN of the object in "kamon.statsd.statsd-metrics-sender" + * to select [[SimpleStatsDMetricsSender]] as your sender + */ +object SimpleStatsDMetricsSender extends StatsDMetricsSenderFactory { + override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props = + Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator)) +} + +/** + * "Traditional" StatsD sender which sends a UDP packet for each piece of data it receives. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +class SimpleStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) { + + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit = { + + for ( + (entity, snapshot) ← tick.metrics; + (metricKey, metricSnapshot) ← snapshot.metrics + ) { + + val keyPrefix = metricKeyGenerator.generateKey(entity, metricKey) + ":" + + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + hs.recordsIterator.foreach { record ⇒ + flushToUDP(keyPrefix + encodeStatsDTimer(record.level, record.count)) + } + + case cs: Counter.Snapshot ⇒ + flushToUDP(keyPrefix + encodeStatsDCounter(cs.count)) + } + } + } +} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 91d05510..a1f7dca3 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -23,8 +23,6 @@ import kamon.util.ConfigTools.Syntax import scala.concurrent.duration._ import com.typesafe.config.Config import akka.event.Logging -import java.net.InetSocketAddress -import java.util.concurrent.TimeUnit.MILLISECONDS import scala.collection.JavaConverters._ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { @@ -44,10 +42,10 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val tickInterval = metricsExtension.settings.tickInterval val flushInterval = statsDConfig.getFiniteDuration("flush-interval") - val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size") val keyGeneratorFQCN = statsDConfig.getString("metric-key-generator") + val senderFactoryFQCN = statsDConfig.getString("metric-sender-factory") - val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config) + val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, senderFactoryFQCN, config) val subscriptions = statsDConfig.getConfig("subscriptions") subscriptions.firstLevelKeys.map { subscriptionCategory ⇒ @@ -56,15 +54,13 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { } } - def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, keyGeneratorFQCN: String, config: Config): ActorRef = { + def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, + keyGeneratorFQCN: String, senderFactoryFQCN: String, config: Config): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") val keyGenerator = system.dynamicAccess.createInstanceFor[MetricKeyGenerator](keyGeneratorFQCN, (classOf[Config], config) :: Nil).get + val senderFactory = system.dynamicAccess.getObjectFor[StatsDMetricsSenderFactory](senderFactoryFQCN).get - val metricsSender = system.actorOf(StatsDMetricsSender.props( - statsDConfig.getString("hostname"), - statsDConfig.getInt("port"), - maxPacketSizeInBytes, - keyGenerator), "statsd-metrics-sender") + val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala new file mode 100644 index 00000000..1f603aea --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSenderFactory.scala @@ -0,0 +1,24 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor.Props +import com.typesafe.config.Config + +trait StatsDMetricsSenderFactory { + def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props +} diff --git a/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala new file mode 100644 index 00000000..9e856eda --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/UDPBasedStatsDMetricsSender.scala @@ -0,0 +1,76 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import java.net.InetSocketAddress +import java.text.{ DecimalFormat, DecimalFormatSymbols } +import java.util.Locale +import akka.actor.{ Actor, ActorRef, ActorSystem } +import akka.io.{ IO, Udp } +import akka.util.ByteString +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot + +/** + * Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server. + * @param statsDConfig Config to read settings specific to this sender + * @param metricKeyGenerator Key generator for all metrics sent by this sender + */ +abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator) + extends Actor with UdpExtensionProvider { + + import context.system + + val statsDHost = statsDConfig.getString("hostname") + val statsDPort = statsDConfig.getInt("port") + + val symbols = DecimalFormatSymbols.getInstance(Locale.US) + symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. + + // Absurdly high number of decimal digits, let the other end lose precision if it needs to. + val samplingRateFormat = new DecimalFormat("#.################################################################", symbols) + + udpExtension ! Udp.SimpleSender + + lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort) + + def receive = { + case Udp.SimpleSenderReady ⇒ + context.become(ready(sender)) + } + + def ready(udpSender: ActorRef): Receive = { + case tick: TickMetricSnapshot ⇒ + writeMetricsToRemote(tick, + (data: String) ⇒ udpSender ! Udp.Send(ByteString(data), socketAddress)) + } + + def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String ⇒ Unit): Unit + + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") + } + + def encodeStatsDCounter(count: Long): String = count.toString + "|c" + +} + +trait UdpExtensionProvider { + def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) +} + |