From 29068fc70a3e5a17a630c2c7fff951572bb5fa21 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 3 Jul 2014 14:36:42 -0300 Subject: ! all: refactor the core metric recording instruments and accomodate UserMetrics This PR is including several changes to the kamon-core, most notably: - Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure that the interfaces are as clean as possible. - Move away from the all Vector[Measurement] based Histogram snapshot to a new approach in which we use a single long to store both the index in the counts array and the frequency on that bucket. The leftmost 2 bytes of each long are used for storing the counts array index and the remaining 6 bytes are used for the actual count, and everything is put into a simple long array. This way only the buckets that actually have values will be included in the snapshot with the smallest possible memory footprint. - Introduce Gauges. - Reorganize the instrumentation for Akka and Scala and rewrite most of the tests of this components to avoid going through the subscription protocol to test. - Introduce trace tests and fixes on various tests. - Necessary changes on new relic, datadog and statsd modules to compile with the new codebase. Pending: - Finish the upgrade of the new relic to the current model. - Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes limits. - More testing, more testing, more testing. - Create the KamonStandalone module. --- .../src/main/scala/kamon/statsd/StatsD.scala | 2 +- .../scala/kamon/statsd/StatsDMetricsSender.scala | 35 +++++++++++----------- 2 files changed, 19 insertions(+), 18 deletions(-) (limited to 'kamon-statsd/src/main/scala/kamon') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 1b3daa97..dcd78f78 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -18,7 +18,7 @@ package kamon.statsd import akka.actor._ import kamon.Kamon -import kamon.metrics._ +import kamon.metric._ import scala.concurrent.duration._ import scala.collection.JavaConverters._ import com.typesafe.config.Config diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index adda18cc..94bab27c 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,12 +20,12 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } +import kamon.metric.Subscriptions.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } import java.util.Locale +import kamon.metric.instrument.{ Counter, Histogram } + class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { import context.system @@ -48,7 +48,7 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) + val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( (groupIdentity, groupSnapshot) ← tick.metrics; @@ -57,25 +57,26 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) - for (measurement ← metricSnapshot.measurements) { - val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) - dataBuilder.appendMeasurement(key, measurementData) + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + hs.recordsIterator.foreach { record ⇒ + packetBuilder.appendMeasurement(key, encodeStatsDTimer(record.level, record.count)) + } + + case cs: Counter.Snapshot ⇒ + packetBuilder.appendMeasurement(key, encodeStatsDCounter(cs.count)) } } - dataBuilder.flush() + packetBuilder.flush() } - def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = { - def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = - value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") - - instrumentType match { - case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ statsDMetricFormat(measurement.count.toString, "c") - } + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") } + + def encodeStatsDCounter(count: Long): String = count.toString + "|c" } object StatsDMetricsSender { -- cgit v1.2.3