From b6af84ab6b60b4ca6c0389c8c3622db3d3c27915 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Fri, 7 Mar 2014 18:08:01 -0300 Subject: multiple fixes to the custom metrics collection facilities --- .../ActorMessagePassingTracing.scala | 2 +- .../main/scala/kamon/metrics/ActorMetrics.scala | 31 +++----- .../main/scala/kamon/metrics/CustomMetric.scala | 33 +++++---- .../src/main/scala/kamon/metrics/Metrics.scala | 39 ++++------ .../src/main/scala/kamon/metrics/Scale.scala | 31 ++++++++ .../main/scala/kamon/metrics/Subscriptions.scala | 8 +- .../main/scala/kamon/metrics/TraceMetrics.scala | 27 ++++--- .../instruments/ContinuousHdrRecorder.scala | 52 +++++++++++++ .../ContinuousHighDynamicRangeRecorder.scala | 50 ------------- .../kamon/metrics/instruments/HdrRecorder.scala | 78 ++++++++++++++++++++ .../instruments/HighDynamicRangeRecorder.scala | 86 ---------------------- .../src/main/scala/kamon/metrics/package.scala | 19 ++++- .../src/main/scala/kamon/trace/TraceContext.scala | 16 ++-- .../scala/kamon/metrics/CustomMetricSpec.scala | 29 ++++++-- .../scala/kamon/metrics/MetricSnapshotSpec.scala | 11 ++- .../metrics/TickMetricSnapshotBufferSpec.scala | 81 ++++++++++++++++++++ 16 files changed, 356 insertions(+), 237 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metrics/Scale.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala create mode 100644 kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 6762fb10..3ed2e63e 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -55,7 +55,7 @@ class BehaviourInvokeTracing { actorMetrics.map { am ⇒ am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureMark) + am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) am.mailboxSize.record(cell.numberOfMessages) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala index b3685a93..86558375 100644 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -17,8 +17,8 @@ package kamon.metrics import com.typesafe.config.Config -import kamon.metrics.instruments.ContinuousHighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder +import kamon.metrics.instruments.ContinuousHdrRecorder +import org.HdrHistogram.HdrRecorder case class ActorMetrics(name: String) extends MetricGroupIdentity { val category = ActorMetrics @@ -32,23 +32,17 @@ object ActorMetrics extends MetricGroupCategory { case object TimeInMailbox extends MetricIdentity { val name, tag = "TimeInMailbox" } case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder) - extends MetricMultiGroupRecorder { - - def record(identity: MetricIdentity, value: Long): Unit = identity match { - case ProcessingTime ⇒ processingTime.record(value) - case MailboxSize ⇒ mailboxSize.record(value) - case TimeInMailbox ⇒ timeInMailbox.record(value) - } + extends MetricGroupRecorder { def collect: MetricGroupSnapshot = { ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect()) } } - case class ActorMetricSnapshot(processingTime: MetricSnapshot, mailboxSize: MetricSnapshot, timeInMailbox: MetricSnapshot) + case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike) extends MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] = Map( + val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( (ProcessingTime -> processingTime), (MailboxSize -> mailboxSize), (TimeInMailbox -> timeInMailbox)) @@ -58,17 +52,16 @@ object ActorMetrics extends MetricGroupCategory { type GroupRecorder = ActorMetricRecorder def create(config: Config): ActorMetricRecorder = { - import HighDynamicRangeRecorder.Configuration - val settings = config.getConfig("kamon.metrics.precision.actor") - val processingTimeHdrConfig = Configuration.fromConfig(settings.getConfig("processing-time")) - val mailboxSizeHdrConfig = Configuration.fromConfig(settings.getConfig("mailbox-size")) - val timeInMailboxHdrConfig = Configuration.fromConfig(settings.getConfig("time-in-mailbox")) + + val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time")) + val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size")) + val timeInMailboxConfig = extractPrecisionConfig(settings.getConfig("time-in-mailbox")) new ActorMetricRecorder( - HighDynamicRangeRecorder(processingTimeHdrConfig), - ContinuousHighDynamicRangeRecorder(mailboxSizeHdrConfig), - HighDynamicRangeRecorder(timeInMailboxHdrConfig)) + HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano), + ContinuousHdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit), + HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano)) } } } diff --git a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala index 11cb6c3d..82affe47 100644 --- a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala +++ b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala @@ -16,9 +16,8 @@ package kamon.metrics -import kamon.metrics.instruments.ContinuousHighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration -import org.HdrHistogram.HighDynamicRangeRecorder +import kamon.metrics.instruments.ContinuousHdrRecorder +import org.HdrHistogram.HdrRecorder import com.typesafe.config.Config case class CustomMetric(name: String) extends MetricGroupIdentity { @@ -27,25 +26,27 @@ case class CustomMetric(name: String) extends MetricGroupIdentity { object CustomMetric extends MetricGroupCategory { val name = "custom-metric" - private val identity = new MetricIdentity { val name, tag = "RecordedValues" } + val RecordedValues = new MetricIdentity { val name, tag = "RecordedValues" } - def withConfig(highestTrackableValue: Long, significantValueDigits: Int, continuous: Boolean = false) = new MetricGroupFactory { - type GroupRecorder = CustomMetricRecorder + def histogram(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale, continuous: Boolean = false) = + new MetricGroupFactory { - def create(config: Config): CustomMetricRecorder = - if (continuous) - new CustomMetricRecorder(identity, ContinuousHighDynamicRangeRecorder(Configuration(highestTrackableValue, significantValueDigits))) - else - new CustomMetricRecorder(identity, HighDynamicRangeRecorder(Configuration(highestTrackableValue, significantValueDigits))) - } + type GroupRecorder = CustomMetricRecorder - class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HighDynamicRangeRecorder) - extends MetricSingleGroupRecorder { + def create(config: Config): CustomMetricRecorder = { + val recorder = + if (continuous) ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) + else HdrRecorder(highestTrackableValue, significantValueDigits, scale) - def collect: MetricGroupSnapshot = new MetricGroupSnapshot { - val metrics: Map[MetricIdentity, MetricSnapshot] = Map((identity, underlyingRecorder.collect())) + new CustomMetricRecorder(RecordedValues, recorder) + } } + class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HdrRecorder) + extends MetricGroupRecorder { + def record(value: Long): Unit = underlyingRecorder.record(value) + + def collect: MetricGroupSnapshot = DefaultMetricGroupSnapshot(Map((identity, underlyingRecorder.collect()))) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala index f7ee833b..b82f429f 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala @@ -34,35 +34,30 @@ trait MetricIdentity { def tag: String } -sealed trait MetricGroupRecorder { +trait MetricGroupRecorder { def collect: MetricGroupSnapshot } -trait MetricMultiGroupRecorder extends MetricGroupRecorder { - def record(identity: MetricIdentity, value: Long) -} - -trait MetricSingleGroupRecorder extends MetricGroupRecorder { - def record(value: Long) -} - trait MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] + def metrics: Map[MetricIdentity, MetricSnapshotLike] } +case class DefaultMetricGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot + trait MetricRecorder { def record(value: Long) - def collect(): MetricSnapshot + def collect(): MetricSnapshotLike } -trait MetricSnapshot { +trait MetricSnapshotLike { def numberOfMeasurements: Long - def measurementLevels: Vector[Measurement] + def scale: Scale + def measurements: Vector[Measurement] - def max: Long = measurementLevels.lastOption.map(_.value).getOrElse(0) - def min: Long = measurementLevels.headOption.map(_.value).getOrElse(0) + def max: Long = measurements.lastOption.map(_.value).getOrElse(0) + def min: Long = measurements.headOption.map(_.value).getOrElse(0) - def merge(that: MetricSnapshot): MetricSnapshot = { + def merge(that: MetricSnapshotLike): MetricSnapshotLike = { val mergedMeasurements = Vector.newBuilder[Measurement] @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = { @@ -96,23 +91,19 @@ trait MetricSnapshot { } } - val totalNrOfMeasurements = go(measurementLevels, that.measurementLevels, 0) - DefaultMetricSnapshot(totalNrOfMeasurements, mergedMeasurements.result()) + val totalNrOfMeasurements = go(measurements, that.measurements, 0) + MetricSnapshot(totalNrOfMeasurements, scale, mergedMeasurements.result()) } } +case class MetricSnapshot(numberOfMeasurements: Long, scale: Scale, measurements: Vector[MetricSnapshot.Measurement]) extends MetricSnapshotLike + object MetricSnapshot { case class Measurement(value: Long, count: Long) { def merge(that: Measurement) = Measurement(value, count + that.count) } } -case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot - -object DefaultMetricSnapshot { - val empty = DefaultMetricSnapshot(0, Vector.empty) -} - trait MetricGroupFactory { type GroupRecorder <: MetricGroupRecorder def create(config: Config): GroupRecorder diff --git a/kamon-core/src/main/scala/kamon/metrics/Scale.scala b/kamon-core/src/main/scala/kamon/metrics/Scale.scala new file mode 100644 index 00000000..6899490a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Scale.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +class Scale(val numericValue: Double) extends AnyVal + +object Scale { + val Nano = new Scale(1E-9) + val Micro = new Scale(1E-6) + val Milli = new Scale(1E-3) + val Unit = new Scale(1) + val Kilo = new Scale(1E3) + val Mega = new Scale(1E6) + val Giga = new Scale(1E9) + + def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue +} diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala index 4b022377..e398ebe0 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -22,7 +22,7 @@ import kamon.util.GlobPathFilter import scala.concurrent.duration.{ FiniteDuration, Duration } import java.util.concurrent.TimeUnit import kamon.Kamon -import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer +import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer } class Subscriptions extends Actor { import context.system @@ -116,14 +116,14 @@ class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef super.postStop() } - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { - val metrics = combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r)) - } + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r))) } object TickMetricSnapshotBuffer { case object FlushBuffer + case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) } diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala index abea3d82..9d787428 100644 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala @@ -16,7 +16,7 @@ package kamon.metrics -import org.HdrHistogram.HighDynamicRangeRecorder +import org.HdrHistogram.HdrRecorder import scala.collection.concurrent.TrieMap import com.typesafe.config.Config @@ -30,37 +30,36 @@ object TraceMetrics extends MetricGroupCategory { case object ElapsedTime extends MetricIdentity { val name, tag = "ElapsedTime" } case class HttpClientRequest(name: String, tag: String) extends MetricIdentity - class TraceMetricRecorder(val elapsedTime: HighDynamicRangeRecorder, private val segmentRecorderFactory: () ⇒ HighDynamicRangeRecorder) - extends MetricMultiGroupRecorder { + class TraceMetricRecorder(val elapsedTime: HdrRecorder, private val segmentRecorderFactory: () ⇒ HdrRecorder) + extends MetricGroupRecorder { - private val segments = TrieMap[MetricIdentity, HighDynamicRangeRecorder]() + private val segments = TrieMap[MetricIdentity, HdrRecorder]() - def record(identity: MetricIdentity, value: Long): Unit = identity match { - case ElapsedTime ⇒ elapsedTime.record(value) - case id: MetricIdentity ⇒ segments.getOrElseUpdate(id, segmentRecorderFactory.apply()).record(value) - } + def segmentRecorder(segmentIdentity: MetricIdentity): HdrRecorder = + segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(), segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap) } - case class TraceMetricSnapshot(elapsedTime: MetricSnapshot, segments: Map[MetricIdentity, MetricSnapshot]) + case class TraceMetricSnapshot(elapsedTime: MetricSnapshotLike, segments: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + def metrics: Map[MetricIdentity, MetricSnapshotLike] = segments + (ElapsedTime -> elapsedTime) } val Factory = new MetricGroupFactory { type GroupRecorder = TraceMetricRecorder def create(config: Config): TraceMetricRecorder = { - import HighDynamicRangeRecorder.Configuration val settings = config.getConfig("kamon.metrics.precision.trace") - val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time")) - val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment")) + val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time")) + val segmentConfig = extractPrecisionConfig(settings.getConfig("segment")) - new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) + new TraceMetricRecorder( + HdrRecorder(elapsedTimeConfig.highestTrackableValue, elapsedTimeConfig.significantValueDigits, Scale.Nano), + () ⇒ HdrRecorder(segmentConfig.highestTrackableValue, segmentConfig.significantValueDigits, Scale.Nano)) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala new file mode 100644 index 00000000..3a39ec69 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala @@ -0,0 +1,52 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics.instruments + +import org.HdrHistogram.HdrRecorder +import kamon.metrics.{ Scale, MetricSnapshotLike } + +/** + * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is + * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this + * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue, + * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing + * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value + * after a new event is processed. + * + */ +class ContinuousHdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) + extends HdrRecorder(highestTrackableValue, significantValueDigits, scale) { + + @volatile private var lastRecordedValue: Long = 0 + + override def record(value: Long): Unit = { + lastRecordedValue = value + super.record(value) + } + + override def collect(): MetricSnapshotLike = { + val snapshot = super.collect() + super.record(lastRecordedValue) + + snapshot + } +} + +object ContinuousHdrRecorder { + def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) = + new ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala deleted file mode 100644 index 533e06b1..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics.instruments - -import org.HdrHistogram.HighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration -import kamon.metrics.MetricSnapshot - -/** - * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is - * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this - * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue, - * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing - * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value - * after a new event is processed. - * - */ -class ContinuousHighDynamicRangeRecorder(configuration: Configuration) extends HighDynamicRangeRecorder(configuration) { - @volatile private var lastRecordedValue: Long = 0 - - override def record(value: Long): Unit = { - lastRecordedValue = value - super.record(value) - } - - override def collect(): MetricSnapshot = { - val snapshot = super.collect() - super.record(lastRecordedValue) - - snapshot - } -} - -object ContinuousHighDynamicRangeRecorder { - def apply(configuration: Configuration) = new ContinuousHighDynamicRangeRecorder(configuration) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala new file mode 100644 index 00000000..28520030 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala @@ -0,0 +1,78 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package org.HdrHistogram + +import java.util.concurrent.atomic.AtomicLongFieldUpdater +import scala.annotation.tailrec +import kamon.metrics.{ Scale, MetricSnapshot, MetricSnapshotLike, MetricRecorder } + +/** + * This implementation aims to be used for real time data collection where data snapshots are taken often over time. + * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still + * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. + */ +class HdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) + extends AtomicHistogram(1L, highestTrackableValue, significantValueDigits) with MetricRecorder { + + import HdrRecorder.totalCountUpdater + + def record(value: Long): Unit = recordValue(value) + + def collect(): MetricSnapshotLike = { + val entries = Vector.newBuilder[MetricSnapshot.Measurement] + val countsLength = counts.length() + + @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { + if (index < countsLength) { + val currentValue = previousValue + increment + val countAtValue = counts.getAndSet(index, 0) + + if (countAtValue > 0) + entries += MetricSnapshot.Measurement(currentValue, countAtValue) + + if (currentValue == bucketLimit) + iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) + else + iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) + } else { + nrOfRecordings + } + } + + val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) + + def tryUpdateTotalCount: Boolean = { + val previousTotalCount = getTotalCount + val newTotalCount = previousTotalCount - nrOfRecordings + + totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) + } + + while (!tryUpdateTotalCount) {} + + MetricSnapshot(nrOfRecordings, scale, entries.result()) + } + +} + +object HdrRecorder { + val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") + + def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale): HdrRecorder = + new HdrRecorder(highestTrackableValue, significantValueDigits, scale) + +} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala deleted file mode 100644 index e31d0e11..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package org.HdrHistogram - -import java.util.concurrent.atomic.AtomicLongFieldUpdater -import scala.annotation.tailrec -import kamon.metrics.{ DefaultMetricSnapshot, MetricSnapshot, MetricRecorder } -import com.typesafe.config.Config -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration - -/** - * This implementation aims to be used for real time data collection where data snapshots are taken often over time. - * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still - * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. - */ -class HighDynamicRangeRecorder(configuration: Configuration) - extends AtomicHistogram(1L, configuration.highestTrackableValue, configuration.significantValueDigits) with MetricRecorder { - - import HighDynamicRangeRecorder.totalCountUpdater - - def record(value: Long): Unit = recordValue(value) - - def collect(): MetricSnapshot = { - val entries = Vector.newBuilder[MetricSnapshot.Measurement] - val countsLength = counts.length() - - @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { - if (index < countsLength) { - val currentValue = previousValue + increment - val countAtValue = counts.getAndSet(index, 0) - - if (countAtValue > 0) - entries += MetricSnapshot.Measurement(currentValue, countAtValue) - - if (currentValue == bucketLimit) - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) - else - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) - } else { - nrOfRecordings - } - } - - val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) - - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = getTotalCount - val newTotalCount = previousTotalCount - nrOfRecordings - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - - DefaultMetricSnapshot(nrOfRecordings, entries.result()) - } - -} - -object HighDynamicRangeRecorder { - val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") - - def apply(configuration: Configuration): HighDynamicRangeRecorder = new HighDynamicRangeRecorder(configuration) - - case class Configuration(highestTrackableValue: Long, significantValueDigits: Int) - - case object Configuration { - def fromConfig(config: Config): Configuration = { - Configuration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala index f89d84a3..640157a9 100644 --- a/kamon-core/src/main/scala/kamon/metrics/package.scala +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -16,11 +16,24 @@ package kamon +import scala.annotation.tailrec +import com.typesafe.config.Config + package object metrics { - def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - (left ++ right) map { - case (key, rightValue) ⇒ key -> left.get(key).map(leftValue ⇒ valueMerger(leftValue, rightValue)).getOrElse(rightValue) + case class HdrPrecisionConfig(highestTrackableValue: Long, significantValueDigits: Int) + + def extractPrecisionConfig(config: Config): HdrPrecisionConfig = + HdrPrecisionConfig(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) + + @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { + if (right.isEmpty) + left + else { + val (key, rightValue) = right.head + val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + + combineMaps(left.updated(key, value), right.tail)(valueMerger) } } } diff --git a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala index ee1f949e..9980a022 100644 --- a/kamon-core/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/TraceContext.scala @@ -22,7 +22,7 @@ import kamon.metrics._ import java.util.concurrent.ConcurrentLinkedQueue import kamon.trace.TraceContextAware.DefaultTraceContextAware import kamon.trace.TraceContext.SegmentIdentity -import kamon.trace.SegmentData +import kamon.metrics.TraceMetrics.TraceMetricRecorder trait TraceContext { def name: String @@ -50,7 +50,7 @@ case object SimpleTrace extends TracingLevelOfDetail case object FullTrace extends TracingLevelOfDetail trait TraceContextAware { - def captureMark: Long + def captureNanoTime: Long def traceContext: Option[TraceContext] } @@ -58,7 +58,7 @@ object TraceContextAware { def default: TraceContextAware = new DefaultTraceContextAware class DefaultTraceContextAware extends TraceContextAware { - val captureMark = System.nanoTime() + val captureNanoTime = System.nanoTime() val traceContext = TraceRecorder.currentContext } } @@ -98,10 +98,10 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok } } - private def drainFinishedSegments(metricRecorder: MetricMultiGroupRecorder): Unit = { + private def drainFinishedSegments(metricRecorder: TraceMetricRecorder): Unit = { while (!finishedSegments.isEmpty) { val segmentData = finishedSegments.poll() - metricRecorder.record(segmentData.identity, segmentData.duration) + metricRecorder.segmentRecorder(segmentData.identity).record(segmentData.duration) } } @@ -119,11 +119,11 @@ class SimpleMetricCollectionContext(@volatile private var _name: String, val tok new SimpleMetricCollectionCompletionHandle(identity, metadata) class SimpleMetricCollectionCompletionHandle(identity: MetricIdentity, startMetadata: Map[String, String]) extends SegmentCompletionHandle { - val segmentStartMark = System.nanoTime() + val segmentStartNanoTime = System.nanoTime() def finish(metadata: Map[String, String] = Map.empty): Unit = { - val segmentFinishMark = System.nanoTime() - finishSegment(identity, (segmentFinishMark - segmentStartMark), startMetadata ++ metadata) + val segmentFinishNanoTime = System.nanoTime() + finishSegment(identity, (segmentFinishNanoTime - segmentStartNanoTime), startMetadata ++ metadata) } } } diff --git a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala b/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala index f5caf6e9..1e072f71 100644 --- a/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala +++ b/kamon-core/src/test/scala/kamon/metrics/CustomMetricSpec.scala @@ -23,6 +23,7 @@ import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement class CustomMetricSpec extends TestKitBase with WordSpecLike with Matchers { implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( @@ -41,20 +42,36 @@ class CustomMetricSpec extends TestKitBase with WordSpecLike with Matchers { "the Kamon custom metrics support" should { "allow registering a custom metric with the Metrics extension" in { - val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.withConfig(100, 2)) + val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit)) recorder should be('defined) } "allow subscriptions to custom metrics using the default subscription protocol" in { - val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.withConfig(100, 2)) - recorder.map(_.record(100)) + val recorder = Kamon(Metrics).register(CustomMetric("test/sample-counter"), CustomMetric.histogram(100, 2, Scale.Unit)) + + recorder.map { r ⇒ + r.record(100) + r.record(15) + r.record(0) + r.record(50) + } Kamon(Metrics).subscribe(CustomMetric, "test/sample-counter", testActor) - println(within(5 seconds) { - expectMsgType[TickMetricSnapshot] - }.metrics(CustomMetric("test/sample-counter"))) + val recordedValues = within(5 seconds) { + val snapshot = expectMsgType[TickMetricSnapshot] + snapshot.metrics(CustomMetric("test/sample-counter")).metrics(CustomMetric.RecordedValues) + } + + recordedValues.min should equal(0) + recordedValues.max should equal(100) + recordedValues.numberOfMeasurements should equal(4) + recordedValues.measurements should contain allOf ( + Measurement(0, 1), + Measurement(15, 1), + Measurement(50, 1), + Measurement(100, 1)) } } diff --git a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala index 1c5a4b21..c273aff1 100644 --- a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala +++ b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala @@ -38,31 +38,30 @@ class MetricSnapshotSpec extends WordSpec with Matchers { merged.min should be(1) merged.max should be(17) merged.numberOfMeasurements should be(200) - merged.measurementLevels.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17) + merged.measurements.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17) } "be able to merge with empty snapshots" in new SnapshotFixtures { snapshotA.merge(emptySnapshot) should be(snapshotA) + emptySnapshot.merge(snapshotA).merge(emptySnapshot) should be(snapshotA) } } trait SnapshotFixtures { - val emptySnapshot = DefaultMetricSnapshot(0, Vector.empty) + val emptySnapshot = MetricSnapshot(0, Scale.Unit, Vector.empty) - val snapshotA = DefaultMetricSnapshot(100, Vector( + val snapshotA = MetricSnapshot(100, Scale.Unit, Vector( Measurement(1, 3), Measurement(2, 15), Measurement(5, 68), Measurement(7, 13), Measurement(17, 1))) - val snapshotB = DefaultMetricSnapshot(100, Vector( + val snapshotB = MetricSnapshot(100, Scale.Unit, Vector( Measurement(2, 6), Measurement(4, 48), Measurement(5, 39), Measurement(10, 7))) - } - } diff --git a/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala b/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala new file mode 100644 index 00000000..33200e2d --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metrics/TickMetricSnapshotBufferSpec.scala @@ -0,0 +1,81 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.scalatest.{ Matchers, WordSpecLike } +import akka.testkit.TestKit +import akka.actor.ActorSystem +import scala.concurrent.duration._ +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement + +class TickMetricSnapshotBufferSpec extends TestKit(ActorSystem("tick-metric-snapshot-buffer")) with WordSpecLike with Matchers { + + "the TickMetricSnapshotBuffer" should { + "merge TickMetricSnapshots received until the flush timeout is reached and fix the from/to fields" in new SnapshotFixtures { + val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) + + buffer ! firstEmpty + buffer ! secondEmpty + buffer ! thirdEmpty + + within(2 seconds)(expectNoMsg()) + val mergedSnapshot = expectMsgType[TickMetricSnapshot] + + mergedSnapshot.from should equal(1000) + mergedSnapshot.to should equal(4000) + mergedSnapshot.metrics should be('empty) + } + + "merge empty and non-empty snapshots" in new SnapshotFixtures { + val buffer = system.actorOf(TickMetricSnapshotBuffer.props(3 seconds, testActor)) + + buffer ! firstNonEmpty + buffer ! secondNonEmpty + buffer ! thirdEmpty + + within(2 seconds)(expectNoMsg()) + val mergedSnapshot = expectMsgType[TickMetricSnapshot] + + mergedSnapshot.from should equal(1000) + mergedSnapshot.to should equal(4000) + mergedSnapshot.metrics should not be ('empty) + + val testMetricSnapshot = mergedSnapshot.metrics(CustomMetric("test-metric")).metrics(CustomMetric.RecordedValues) + testMetricSnapshot.min should equal(1) + testMetricSnapshot.max should equal(10) + testMetricSnapshot.numberOfMeasurements should equal(35) + testMetricSnapshot.measurements should contain allOf (Measurement(1, 10), Measurement(4, 9), Measurement(10, 16)) + + } + } + + trait SnapshotFixtures { + val firstEmpty = TickMetricSnapshot(1000, 2000, Map.empty) + val secondEmpty = TickMetricSnapshot(2000, 3000, Map.empty) + val thirdEmpty = TickMetricSnapshot(3000, 4000, Map.empty) + + val firstNonEmpty = TickMetricSnapshot(1000, 2000, + Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(20, Scale.Unit, Vector(Measurement(1, 10), Measurement(10, 10)))))))) + + val secondNonEmpty = TickMetricSnapshot(1000, 2000, + Map((CustomMetric("test-metric") -> SimpleGroupSnapshot(Map(CustomMetric.RecordedValues -> MetricSnapshot(15, Scale.Unit, Vector(Measurement(4, 9), Measurement(10, 6)))))))) + + } + + case class SimpleGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot +} -- cgit v1.2.3