diff options
author | Ivan Topolnak <itopolnak@despegar.com> | 2014-03-07 18:08:01 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-03-11 21:14:53 -0300 |
commit | b6af84ab6b60b4ca6c0389c8c3622db3d3c27915 (patch) | |
tree | 1f89896820e36d59daadef7699b434de1051be42 /kamon-core | |
parent | ccbcc55282251b5e6d4f41384730232a8f0e7d05 (diff) | |
download | Kamon-b6af84ab6b60b4ca6c0389c8c3622db3d3c27915.tar.gz Kamon-b6af84ab6b60b4ca6c0389c8c3622db3d3c27915.tar.bz2 Kamon-b6af84ab6b60b4ca6c0389c8c3622db3d3c27915.zip |
multiple fixes to the custom metrics collection facilities
Diffstat (limited to 'kamon-core')
14 files changed, 244 insertions, 125 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 6762fb10..3ed2e63e 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -55,7 +55,7 @@ class BehaviourInvokeTracing { actorMetrics.map { am ⇒ am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureMark) + am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) am.mailboxSize.record(cell.numberOfMessages) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala index b3685a93..86558375 100644 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -17,8 +17,8 @@ package kamon.metrics import com.typesafe.config.Config -import kamon.metrics.instruments.ContinuousHighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder +import kamon.metrics.instruments.ContinuousHdrRecorder +import org.HdrHistogram.HdrRecorder case class ActorMetrics(name: String) extends MetricGroupIdentity { val category = ActorMetrics @@ -32,23 +32,17 @@ object ActorMetrics extends MetricGroupCategory { case object TimeInMailbox extends MetricIdentity { val name, tag = "TimeInMailbox" } case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder) - extends MetricMultiGroupRecorder { - - def record(identity: MetricIdentity, value: Long): Unit = identity match { - case ProcessingTime ⇒ processingTime.record(value) - case MailboxSize ⇒ mailboxSize.record(value) - case TimeInMailbox ⇒ timeInMailbox.record(value) - } + extends MetricGroupRecorder { def collect: MetricGroupSnapshot = { ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect()) } } - case class ActorMetricSnapshot(processingTime: MetricSnapshot, mailboxSize: MetricSnapshot, timeInMailbox: MetricSnapshot) + case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike) extends MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] = Map( + val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( (ProcessingTime -> processingTime), (MailboxSize -> mailboxSize), (TimeInMailbox -> timeInMailbox)) @@ -58,17 +52,16 @@ object ActorMetrics extends MetricGroupCategory { type GroupRecorder = ActorMetricRecorder def create(config: Config): ActorMetricRecorder = { - import HighDynamicRangeRecorder.Configuration - val settings = config.getConfig("kamon.metrics.precision.actor") - val processingTimeHdrConfig = Configuration.fromConfig(settings.getConfig("processing-time")) - val mailboxSizeHdrConfig = Configuration.fromConfig(settings.getConfig("mailbox-size")) - val timeInMailboxHdrConfig = Configuration.fromConfig(settings.getConfig("time-in-mailbox")) + + val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time")) + val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size")) + val timeInMailboxConfig = extractPrecisionConfig(settings.getConfig("time-in-mailbox")) new ActorMetricRecorder( - HighDynamicRangeRecorder(processingTimeHdrConfig), - ContinuousHighDynamicRangeRecorder(mailboxSizeHdrConfig), - HighDynamicRangeRecorder(timeInMailboxHdrConfig)) + HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano), + ContinuousHdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit), + HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano)) } } } diff --git a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala index 11cb6c3d..82affe47 100644 --- a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala +++ b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala @@ -16,9 +16,8 @@ package kamon.metrics -import kamon.metrics.instruments.ContinuousHighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration -import org.HdrHistogram.HighDynamicRangeRecorder +import kamon.metrics.instruments.ContinuousHdrRecorder +import org.HdrHistogram.HdrRecorder import com.typesafe.config.Config case class CustomMetric(name: String) extends MetricGroupIdentity { @@ -27,25 +26,27 @@ case class CustomMetric(name: String) extends MetricGroupIdentity { object CustomMetric extends MetricGroupCategory { val name = "custom-metric" - private val identity = new MetricIdentity { val name, tag = "RecordedValues" } + val RecordedValues = new MetricIdentity { val name, tag = "RecordedValues" } - def withConfig(highestTrackableValue: Long, significantValueDigits: Int, continuous: Boolean = false) = new MetricGroupFactory { - type GroupRecorder = CustomMetricRecorder + def histogram(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale, continuous: Boolean = false) = + new MetricGroupFactory { - def create(config: Config): CustomMetricRecorder = - if (continuous) - new CustomMetricRecorder(identity, ContinuousHighDynamicRangeRecorder(Configuration(highestTrackableValue, significantValueDigits))) - else - new CustomMetricRecorder(identity, HighDynamicRangeRecorder(Configuration(highestTrackableValue, significantValueDigits))) - } + type GroupRecorder = CustomMetricRecorder - class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HighDynamicRangeRecorder) - extends MetricSingleGroupRecorder { + def create(config: Config): CustomMetricRecorder = { + val recorder = + if (continuous) ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) + else HdrRecorder(highestTrackableValue, significantValueDigits, scale) - def collect: MetricGroupSnapshot = new MetricGroupSnapshot { - val metrics: Map[MetricIdentity, MetricSnapshot] = Map((identity, underlyingRecorder.collect())) + new CustomMetricRecorder(RecordedValues, recorder) + } } + class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HdrRecorder) + extends MetricGroupRecorder { + def record(value: Long): Unit = underlyingRecorder.record(value) + + def collect: MetricGroupSnapshot = DefaultMetricGroupSnapshot(Map((identity, underlyingRecorder.collect()))) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala index f7ee833b..b82f429f 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala @@ -34,35 +34,30 @@ trait MetricIdentity { def tag: String } -sealed trait MetricGroupRecorder { +trait MetricGroupRecorder { def collect: MetricGroupSnapshot } -trait MetricMultiGroupRecorder extends MetricGroupRecorder { - def record(identity: MetricIdentity, value: Long) -} - -trait MetricSingleGroupRecorder extends MetricGroupRecorder { - def record(value: Long) -} - trait MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] + def metrics: Map[MetricIdentity, MetricSnapshotLike] } +case class DefaultMetricGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot + trait MetricRecorder { def record(value: Long) - def collect(): MetricSnapshot + def collect(): MetricSnapshotLike } -trait MetricSnapshot { +trait MetricSnapshotLike { def numberOfMeasurements: Long - def measurementLevels: Vector[Measurement] + def scale: Scale + def measurements: Vector[Measurement] - def max: Long = measurementLevels.lastOption.map(_.value).getOrElse(0) - def min: Long = measurementLevels.headOption.map(_.value).getOrElse(0) + def max: Long = measurements.lastOption.map(_.value).getOrElse(0) + def min: Long = measurements.headOption.map(_.value).getOrElse(0) - def merge(that: MetricSnapshot): MetricSnapshot = { + def merge(that: MetricSnapshotLike): MetricSnapshotLike = { val mergedMeasurements = Vector.newBuilder[Measurement] @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = { @@ -96,23 +91,19 @@ trait MetricSnapshot { } } - val totalNrOfMeasurements = go(measurementLevels, that.measurementLevels, 0) - DefaultMetricSnapshot(totalNrOfMeasurements, mergedMeasurements.result()) + val totalNrOfMeasurements = go(measurements, that.measurements, 0) + MetricSnapshot(totalNrOfMeasurements, scale, mergedMeasurements.result()) } } +case class MetricSnapshot(numberOfMeasurements: Long, scale: Scale, measurements: Vector[MetricSnapshot.Measurement]) extends MetricSnapshotLike + object MetricSnapshot { case class Measurement(value: Long, count: Long) { def merge(that: Measurement) = Measurement(value, count + that.count) } } -case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot - -object DefaultMetricSnapshot { - val empty = DefaultMetricSnapshot(0, Vector.empty) -} - trait MetricGroupFactory { type GroupRecorder <: MetricGroupRecorder def create(config: Config): GroupRecorder diff --git a/kamon-core/src/main/scala/kamon/metrics/Scale.scala b/kamon-core/src/main/scala/kamon/metrics/Scale.scala new file mode 100644 index 00000000..6899490a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Scale.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +class Scale(val numericValue: Double) extends AnyVal + +object Scale { + val Nano = new Scale(1E-9) + val Micro = new Scale(1E-6) + val Milli = new Scale(1E-3) + val Unit = new Scale(1) + val Kilo = new Scale(1E3) + val Mega = new Scale(1E6) + val Giga = new Scale(1E9) + + def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue +} diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala index 4b022377..e398ebe0 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -22,7 +22,7 @@ import kamon.util.GlobPathFilter import scala.concurrent.duration.{ FiniteDuration, Duration } import java.util.concurrent.TimeUnit import kamon.Kamon -import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer +import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer } class Subscriptions extends Actor { import context.system @@ -116,14 +116,14 @@ class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef super.postStop() } - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { - val metrics = combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r)) - } + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r))) } object TickMetricSnapshotBuffer { case object FlushBuffer + case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) } diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala index abea3d82..9d787428 100644 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala @@ -16,7 +16,7 @@ package kamon.metrics -import org.HdrHistogram.HighDynamicRangeRecorder +import org.HdrHistogram.HdrRecorder import scala.collection.concurrent.TrieMap import com.typesafe.config.Config @@ -30,37 +30,36 @@ object TraceMetrics extends MetricGroupCategory { case object ElapsedTime extends MetricIdentity { val name, tag = "ElapsedTime" } case class HttpClientRequest(name: String, tag: String) extends MetricIdentity - class TraceMetricRecorder(val elapsedTime: HighDynamicRangeRecorder, private val segmentRecorderFactory: () ⇒ HighDynamicRangeRecorder) - extends MetricMultiGroupRecorder { + class TraceMetricRecorder(val elapsedTime: HdrRecorder, private val segmentRecorderFactory: () ⇒ HdrRecorder) + extends MetricGroupRecorder { - private val segments = TrieMap[MetricIdentity, HighDynamicRangeRecorder]() + private val segments = TrieMap[MetricIdentity, HdrRecorder]() - def record(identity: MetricIdentity, value: Long): Unit = identity match { - case ElapsedTime ⇒ elapsedTime.record(value) - case id: MetricIdentity ⇒ segments.getOrElseUpdate(id, segmentRecorderFactory.apply()).record(value) - } + def segmentRecorder(segmentIdentity: MetricIdentity): HdrRecorder = + segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(), segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap) } - case class TraceMetricSnapshot(elapsedTime: MetricSnapshot, segments: Map[MetricIdentity, MetricSnapshot]) + case class TraceMetricSnapshot(elapsedTime: MetricSnapshotLike, segments: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + def metrics: Map[MetricIdentity, MetricSnapshotLike] = segments + (ElapsedTime -> elapsedTime) } val Factory = new MetricGroupFactory { type GroupRecorder = TraceMetricRecorder def create(config: Config): TraceMetricRecorder = { - import HighDynamicRangeRecorder.Configuration val settings = config.getConfig("kamon.metrics.precision.trace") - val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time")) - val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment")) + val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time")) + val segmentConfig = extractPrecisionConfig(settings.getConfig("segment")) - new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) + new TraceMetricRecorder( + HdrRecorder(elapsedTimeConfig.highestTrackableValue, elapsedTimeConfig.significantValueDigits, Scale.Nano), + () ⇒ HdrRecorder(segmentConfig.highestTrackableValue, segmentConfig.significantValueDigits, Scale.Nano)) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala index 533e06b1..3a39ec69 100644 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala @@ -16,9 +16,8 @@ package kamon.metrics.instruments -import org.HdrHistogram.HighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration -import kamon.metrics.MetricSnapshot +import org.HdrHistogram.HdrRecorder +import kamon.metrics.{ Scale, MetricSnapshotLike } /** * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is @@ -29,7 +28,9 @@ import kamon.metrics.MetricSnapshot * after a new event is processed. * */ -class ContinuousHighDynamicRangeRecorder(configuration: Configuration) extends HighDynamicRangeRecorder(configuration) { +class ContinuousHdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) + extends HdrRecorder(highestTrackableValue, significantValueDigits, scale) { + @volatile private var lastRecordedValue: Long = 0 override def record(value: Long): Unit = { @@ -37,7 +38,7 @@ class ContinuousHighDynamicRangeRecorder(configuration: Configuration) extends H super.record(value) } - override def collect(): MetricSnapshot = { + override def collect(): MetricSnapshotLike = { val snapshot = super.collect() super.record(lastRecordedValue) @@ -45,6 +46,7 @@ class ContinuousHighDynamicRangeRecorder(configuration: Configuration) extends H } } -object ContinuousHighDynamicRangeRecorder { - def apply(configuration: Configuration) = new ContinuousHighDynamicRangeRecorder(configuration) +object ContinuousHdrRecorder { + def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) = + new ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala index e31d0e11..28520030 100644 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala @@ -18,23 +18,21 @@ package org.HdrHistogram import java.util.concurrent.atomic.AtomicLongFieldUpdater import scala.annotation.tailrec -import kamon.metrics.{ DefaultMetricSnapshot, MetricSnapshot, MetricRecorder } -import com.typesafe.config.Config -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration +import kamon.metrics.{ Scale, MetricSnapshot, MetricSnapshotLike, MetricRecorder } /** * This implementation aims to be used for real time data collection where data snapshots are taken often over time. * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ -class HighDynamicRangeRecorder(configuration: Configuration) - extends AtomicHistogram(1L, configuration.highestTrackableValue, configuration.significantValueDigits) with MetricRecorder { +class HdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) + extends AtomicHistogram(1L, highestTrackableValue, significantValueDigits) with MetricRecorder { - import HighDynamicRangeRecorder.totalCountUpdater + import HdrRecorder.totalCountUpdater def record(value: Long): Unit = recordValue(value) - def collect(): MetricSnapshot = { + def collect(): MetricSnapshotLike = { val entries = Vector.newBuilder[MetricSnapshot.Measurement] val countsLength = counts.length() @@ -66,21 +64,15 @@ class HighDynamicRangeRecorder(configuration: Configuration) while (!tryUpdateTotalCount) {} - DefaultMetricSnapshot(nrOfRecordings, entries.result()) + MetricSnapshot(nrOfRecordings, scale, entries.result()) } } -object HighDynamicRangeRecorder { +object HdrRecorder { val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") - def apply(configuration: Configuration): HighDynamicRangeRecorder = new HighDynamicRangeRecorder(configuration) + def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale): HdrRecorder = + new HdrRecorder(highestTrackableValue, significantValueDigits, scale) - case class Configuration(highestTrackableValue: Long, significantValueDigits: Int) - - case object Configuration { - def fromConfig(config: Config): Configuration = { - Configuration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) - } - } } diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala index f89d84a3..640157a9 100644 --- a/kamon-core/src/main/scala/kamon/metrics/package.scala +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -16,11 +16,24 @@ package kamon +import scala.annotation.tailrec +import com.typesafe.config.Config + package object metrics { - def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - (left ++ right) map { - case (key, rightValue) ⇒ key -> left.get(key).map(leftValue ⇒ valueMerger(leftValue, rightValue)).getOrElse(rightValue) + case class HdrPrecisionConfig(highestTrackableValue: Long, significantValueDigits: Int) + + def extractPrecisionConfig(config: Config): HdrPrecisionConfig = + HdrPrecisionConfig(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) + + @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { + if (right.isEmpty) + left + else { + val (key, rightValue) = right.head + val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + + combineMaps(left.updated(key, value), right.tail)(valueMerger) } } } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index ee1f949e..9980a022 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -22,7 +22,7 @@ import kamon.metrics._ import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware import kamon.trace.TraceContext.SegmentIdentity -import kamon.trace.SegmentData +import kamon.metrics.TraceMetrics.TraceMetricRecorder trait TraceContext { def name: String @@ -50,7 +50,7 @@ case object SimpleTrace extends TracingLevelOfDetail case object FullTrace extends TracingLevelOfDetail trait TraceContextAware { - def captureMark: Long + def captureNanoTime: Long def traceContext: Option[TraceContext] } @@ -58,7 +58,7 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - val captureMark = System.nanoTime() + val captureNanoTime = System.nanoTime() val traceContext = TraceRecorder.currentContext } } @@ -98,10 +98,10 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok } } - private def drainFinishedSegments(metricRecorder: MetricMultiGroupRecorder): Unit = { + private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { while (!finishedSegments.isEmpty) { val segmentData = finishedSegments.poll() - metricRecorder.record(segmentData.identity, segmentData.duration) + metricRecorder.segmentRecorder(segmentData.identity).record(segmentData.duration) } } @@ -119,11 +119,11 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok new SimpleMetricCollectionCompletionHandle(identity, metadata) class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle { - val segmentStartMark = System.nanoTime() + val segmentStartNanoTime = System.nanoTime() def finish(metadata: Map[String, String] = Map.empty): Unit = { - val segmentFinishMark = System.nanoTime() - finishSegment(identity, (segmentFinishMark - segmentStartMark), startMetadata ++ metadata) + val segmentFinishNanoTime = System.nanoTime() + finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata) } } } diff --git a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala b/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala index f5caf6e9..1e072f71 100644 --- a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala +++ b/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala @@ -23,6 +23,7 @@ import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement class CustomMetricSpec extends TestKitBase with WordSpecLike with Matchers { implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( @@ -41,20 +42,36 @@ class CustomMetricSpec extends TestKitBase with WordSpecLike with Matchers { "the Kamon custom metrics support" should { "allow registering a custom metric with the Metrics extension" in { - val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.withConfig(100, 2)) + val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit)) recorder should be('defined) } "allow subscriptions to custom metrics using the default subscription protocol" in { - val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.withConfig(100, 2)) - recorder.map(_.record(100)) + val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit)) + + recorder.map { r ⇒ + r.record(100) + r.record(15) + r.record(0) + r.record(50) + } Kamon(Metrics).subscribe(CustomMetric, "test/sample-counter", testActor) - println(within(5 seconds) { - expectMsgType[TickMetricSnapshot] - }.metrics(CustomMetric("test/sample-counter"))) + val recordedValues = within(5 seconds) { + val snapshot = expectMsgType[TickMetricSnapshot] + snapshot.metrics(CustomMetric("test/sample-counter")).metrics(CustomMetric.RecordedValues) + } + + recordedValues.min should equal(0) + recordedValues.max should equal(100) + recordedValues.numberOfMeasurements should equal(4) + recordedValues.measurements should contain allOf ( + Measurement(0, 1), + Measurement(15, 1), + Measurement(50, 1), + Measurement(100, 1)) } } diff --git a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala index 1c5a4b21..c273aff1 100644 --- a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala +++ b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala @@ -38,31 +38,30 @@ class MetricSnapshotSpec extends WordSpec with Matchers { merged.min should be(1) merged.max should be(17) merged.numberOfMeasurements should be(200) - merged.measurementLevels.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17) + merged.measurements.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17) } "be able to merge with empty snapshots" in new SnapshotFixtures { snapshotA.merge(emptySnapshot) should be(snapshotA) + emptySnapshot.merge(snapshotA).merge(emptySnapshot) should be(snapshotA) } } trait SnapshotFixtures { - val emptySnapshot = DefaultMetricSnapshot(0, Vector.empty) + val emptySnapshot = MetricSnapshot(0, Scale.Unit, Vector.empty) - val snapshotA = DefaultMetricSnapshot(100, Vector( + val snapshotA = MetricSnapshot(100, Scale.Unit, Vector( Measurement(1, 3), Measurement(2, 15), Measurement(5, 68), Measurement(7, 13), Measurement(17, 1))) - val snapshotB = DefaultMetricSnapshot(100, Vector( + val snapshotB = MetricSnapshot(100, Scale.Unit, Vector( Measurement(2, 6), Measurement(4, 48), Measurement(5, 39), Measurement(10, 7))) - } - } diff --git a/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala new file mode 100644 index 00000000..33200e2d --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala @@ -0,0 +1,81 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.scalatest.{ Matchers, WordSpecLike } +import akka.testkit.TestKit +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement + +class TickMetricSnapshotBufferSpec extends TestKit(ActorSystem("tick-metric-snapshot-buffer")) with WordSpecLike with Matchers { + + "the TickMetricSnapshotBuffer" should { + "merge TickMetricSnapshots received until the flush timeout is reached and fix the from/to fields" in new SnapshotFixtures { + val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) + + buffer ! firstEmpty + buffer ! secondEmpty + buffer ! thirdEmpty + + within(2 seconds)(expectNoMsg()) + val mergedSnapshot = expectMsgType[TickMetricSnapshot] + + mergedSnapshot.from should equal(1000) + mergedSnapshot.to should equal(4000) + mergedSnapshot.metrics should be('empty) + } + + "merge empty and non-empty snapshots" in new SnapshotFixtures { + val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) + + buffer ! firstNonEmpty + buffer ! secondNonEmpty + buffer ! thirdEmpty + + within(2 seconds)(expectNoMsg()) + val mergedSnapshot = expectMsgType[TickMetricSnapshot] + + mergedSnapshot.from should equal(1000) + mergedSnapshot.to should equal(4000) + mergedSnapshot.metrics should not be ('empty) + + val testMetricSnapshot = mergedSnapshot.metrics(CustomMetric("test-metric")).metrics(CustomMetric.RecordedValues) + testMetricSnapshot.min should equal(1) + testMetricSnapshot.max should equal(10) + testMetricSnapshot.numberOfMeasurements should equal(35) + testMetricSnapshot.measurements should contain allOf (Measurement(1, 10), Measurement(4, 9), Measurement(10, 16)) + + } + } + + trait SnapshotFixtures { + val firstEmpty = TickMetricSnapshot(1000, 2000, Map.empty) + val secondEmpty = TickMetricSnapshot(2000, 3000, Map.empty) + val thirdEmpty = TickMetricSnapshot(3000, 4000, Map.empty) + + val firstNonEmpty = TickMetricSnapshot(1000, 2000, + Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(20, Scale.Unit, Vector(Measurement(1, 10), Measurement(10, 10)))))))) + + val secondNonEmpty = TickMetricSnapshot(1000, 2000, + Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(15, Scale.Unit, Vector(Measurement(4, 9), Measurement(10, 6)))))))) + + } + + case class SimpleGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot +} |