From b6af84ab6b60b4ca6c0389c8c3622db3d3c27915 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Fri, 7 Mar 2014 18:08:01 -0300 Subject: multiple fixes to the custom metrics collection facilities --- .../main/scala/kamon/metrics/ActorMetrics.scala | 31 +++----- .../main/scala/kamon/metrics/CustomMetric.scala | 33 +++++---- .../src/main/scala/kamon/metrics/Metrics.scala | 39 ++++------ .../src/main/scala/kamon/metrics/Scale.scala | 31 ++++++++ .../main/scala/kamon/metrics/Subscriptions.scala | 8 +- .../main/scala/kamon/metrics/TraceMetrics.scala | 27 ++++--- .../instruments/ContinuousHdrRecorder.scala | 52 +++++++++++++ .../ContinuousHighDynamicRangeRecorder.scala | 50 ------------- .../kamon/metrics/instruments/HdrRecorder.scala | 78 ++++++++++++++++++++ .../instruments/HighDynamicRangeRecorder.scala | 86 ---------------------- .../src/main/scala/kamon/metrics/package.scala | 19 ++++- 11 files changed, 238 insertions(+), 216 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metrics/Scale.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala (limited to 'kamon-core/src/main/scala/kamon/metrics') diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala index b3685a93..86558375 100644 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -17,8 +17,8 @@ package kamon.metrics import com.typesafe.config.Config -import kamon.metrics.instruments.ContinuousHighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder +import kamon.metrics.instruments.ContinuousHdrRecorder +import org.HdrHistogram.HdrRecorder case class ActorMetrics(name: String) extends MetricGroupIdentity { val category = ActorMetrics @@ -32,23 +32,17 @@ object ActorMetrics extends MetricGroupCategory { case object TimeInMailbox extends MetricIdentity { val name, tag = "TimeInMailbox" } case class ActorMetricRecorder(processingTime: MetricRecorder, mailboxSize: MetricRecorder, timeInMailbox: MetricRecorder) - extends MetricMultiGroupRecorder { - - def record(identity: MetricIdentity, value: Long): Unit = identity match { - case ProcessingTime ⇒ processingTime.record(value) - case MailboxSize ⇒ mailboxSize.record(value) - case TimeInMailbox ⇒ timeInMailbox.record(value) - } + extends MetricGroupRecorder { def collect: MetricGroupSnapshot = { ActorMetricSnapshot(processingTime.collect(), mailboxSize.collect(), timeInMailbox.collect()) } } - case class ActorMetricSnapshot(processingTime: MetricSnapshot, mailboxSize: MetricSnapshot, timeInMailbox: MetricSnapshot) + case class ActorMetricSnapshot(processingTime: MetricSnapshotLike, mailboxSize: MetricSnapshotLike, timeInMailbox: MetricSnapshotLike) extends MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] = Map( + val metrics: Map[MetricIdentity, MetricSnapshotLike] = Map( (ProcessingTime -> processingTime), (MailboxSize -> mailboxSize), (TimeInMailbox -> timeInMailbox)) @@ -58,17 +52,16 @@ object ActorMetrics extends MetricGroupCategory { type GroupRecorder = ActorMetricRecorder def create(config: Config): ActorMetricRecorder = { - import HighDynamicRangeRecorder.Configuration - val settings = config.getConfig("kamon.metrics.precision.actor") - val processingTimeHdrConfig = Configuration.fromConfig(settings.getConfig("processing-time")) - val mailboxSizeHdrConfig = Configuration.fromConfig(settings.getConfig("mailbox-size")) - val timeInMailboxHdrConfig = Configuration.fromConfig(settings.getConfig("time-in-mailbox")) + + val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time")) + val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size")) + val timeInMailboxConfig = extractPrecisionConfig(settings.getConfig("time-in-mailbox")) new ActorMetricRecorder( - HighDynamicRangeRecorder(processingTimeHdrConfig), - ContinuousHighDynamicRangeRecorder(mailboxSizeHdrConfig), - HighDynamicRangeRecorder(timeInMailboxHdrConfig)) + HdrRecorder(processingTimeConfig.highestTrackableValue, processingTimeConfig.significantValueDigits, Scale.Nano), + ContinuousHdrRecorder(mailboxSizeConfig.highestTrackableValue, mailboxSizeConfig.significantValueDigits, Scale.Unit), + HdrRecorder(timeInMailboxConfig.highestTrackableValue, timeInMailboxConfig.significantValueDigits, Scale.Nano)) } } } diff --git a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala index 11cb6c3d..82affe47 100644 --- a/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala +++ b/kamon-core/src/main/scala/kamon/metrics/CustomMetric.scala @@ -16,9 +16,8 @@ package kamon.metrics -import kamon.metrics.instruments.ContinuousHighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration -import org.HdrHistogram.HighDynamicRangeRecorder +import kamon.metrics.instruments.ContinuousHdrRecorder +import org.HdrHistogram.HdrRecorder import com.typesafe.config.Config case class CustomMetric(name: String) extends MetricGroupIdentity { @@ -27,25 +26,27 @@ case class CustomMetric(name: String) extends MetricGroupIdentity { object CustomMetric extends MetricGroupCategory { val name = "custom-metric" - private val identity = new MetricIdentity { val name, tag = "RecordedValues" } + val RecordedValues = new MetricIdentity { val name, tag = "RecordedValues" } - def withConfig(highestTrackableValue: Long, significantValueDigits: Int, continuous: Boolean = false) = new MetricGroupFactory { - type GroupRecorder = CustomMetricRecorder + def histogram(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale, continuous: Boolean = false) = + new MetricGroupFactory { - def create(config: Config): CustomMetricRecorder = - if (continuous) - new CustomMetricRecorder(identity, ContinuousHighDynamicRangeRecorder(Configuration(highestTrackableValue, significantValueDigits))) - else - new CustomMetricRecorder(identity, HighDynamicRangeRecorder(Configuration(highestTrackableValue, significantValueDigits))) - } + type GroupRecorder = CustomMetricRecorder - class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HighDynamicRangeRecorder) - extends MetricSingleGroupRecorder { + def create(config: Config): CustomMetricRecorder = { + val recorder = + if (continuous) ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) + else HdrRecorder(highestTrackableValue, significantValueDigits, scale) - def collect: MetricGroupSnapshot = new MetricGroupSnapshot { - val metrics: Map[MetricIdentity, MetricSnapshot] = Map((identity, underlyingRecorder.collect())) + new CustomMetricRecorder(RecordedValues, recorder) + } } + class CustomMetricRecorder(identity: MetricIdentity, underlyingRecorder: HdrRecorder) + extends MetricGroupRecorder { + def record(value: Long): Unit = underlyingRecorder.record(value) + + def collect: MetricGroupSnapshot = DefaultMetricGroupSnapshot(Map((identity, underlyingRecorder.collect()))) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala index f7ee833b..b82f429f 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala @@ -34,35 +34,30 @@ trait MetricIdentity { def tag: String } -sealed trait MetricGroupRecorder { +trait MetricGroupRecorder { def collect: MetricGroupSnapshot } -trait MetricMultiGroupRecorder extends MetricGroupRecorder { - def record(identity: MetricIdentity, value: Long) -} - -trait MetricSingleGroupRecorder extends MetricGroupRecorder { - def record(value: Long) -} - trait MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] + def metrics: Map[MetricIdentity, MetricSnapshotLike] } +case class DefaultMetricGroupSnapshot(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot + trait MetricRecorder { def record(value: Long) - def collect(): MetricSnapshot + def collect(): MetricSnapshotLike } -trait MetricSnapshot { +trait MetricSnapshotLike { def numberOfMeasurements: Long - def measurementLevels: Vector[Measurement] + def scale: Scale + def measurements: Vector[Measurement] - def max: Long = measurementLevels.lastOption.map(_.value).getOrElse(0) - def min: Long = measurementLevels.headOption.map(_.value).getOrElse(0) + def max: Long = measurements.lastOption.map(_.value).getOrElse(0) + def min: Long = measurements.headOption.map(_.value).getOrElse(0) - def merge(that: MetricSnapshot): MetricSnapshot = { + def merge(that: MetricSnapshotLike): MetricSnapshotLike = { val mergedMeasurements = Vector.newBuilder[Measurement] @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = { @@ -96,23 +91,19 @@ trait MetricSnapshot { } } - val totalNrOfMeasurements = go(measurementLevels, that.measurementLevels, 0) - DefaultMetricSnapshot(totalNrOfMeasurements, mergedMeasurements.result()) + val totalNrOfMeasurements = go(measurements, that.measurements, 0) + MetricSnapshot(totalNrOfMeasurements, scale, mergedMeasurements.result()) } } +case class MetricSnapshot(numberOfMeasurements: Long, scale: Scale, measurements: Vector[MetricSnapshot.Measurement]) extends MetricSnapshotLike + object MetricSnapshot { case class Measurement(value: Long, count: Long) { def merge(that: Measurement) = Measurement(value, count + that.count) } } -case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot - -object DefaultMetricSnapshot { - val empty = DefaultMetricSnapshot(0, Vector.empty) -} - trait MetricGroupFactory { type GroupRecorder <: MetricGroupRecorder def create(config: Config): GroupRecorder diff --git a/kamon-core/src/main/scala/kamon/metrics/Scale.scala b/kamon-core/src/main/scala/kamon/metrics/Scale.scala new file mode 100644 index 00000000..6899490a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Scale.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +class Scale(val numericValue: Double) extends AnyVal + +object Scale { + val Nano = new Scale(1E-9) + val Micro = new Scale(1E-6) + val Milli = new Scale(1E-3) + val Unit = new Scale(1) + val Kilo = new Scale(1E3) + val Mega = new Scale(1E6) + val Giga = new Scale(1E9) + + def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue +} diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala index 4b022377..e398ebe0 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -22,7 +22,7 @@ import kamon.util.GlobPathFilter import scala.concurrent.duration.{ FiniteDuration, Duration } import java.util.concurrent.TimeUnit import kamon.Kamon -import kamon.metrics.TickMetricSnapshotBuffer.FlushBuffer +import kamon.metrics.TickMetricSnapshotBuffer.{ Combined, FlushBuffer } class Subscriptions extends Actor { import context.system @@ -116,14 +116,14 @@ class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef super.postStop() } - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = new MetricGroupSnapshot { - val metrics = combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r)) - } + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r))) } object TickMetricSnapshotBuffer { case object FlushBuffer + case class Combined(metrics: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) } diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala index abea3d82..9d787428 100644 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala @@ -16,7 +16,7 @@ package kamon.metrics -import org.HdrHistogram.HighDynamicRangeRecorder +import org.HdrHistogram.HdrRecorder import scala.collection.concurrent.TrieMap import com.typesafe.config.Config @@ -30,37 +30,36 @@ object TraceMetrics extends MetricGroupCategory { case object ElapsedTime extends MetricIdentity { val name, tag = "ElapsedTime" } case class HttpClientRequest(name: String, tag: String) extends MetricIdentity - class TraceMetricRecorder(val elapsedTime: HighDynamicRangeRecorder, private val segmentRecorderFactory: () ⇒ HighDynamicRangeRecorder) - extends MetricMultiGroupRecorder { + class TraceMetricRecorder(val elapsedTime: HdrRecorder, private val segmentRecorderFactory: () ⇒ HdrRecorder) + extends MetricGroupRecorder { - private val segments = TrieMap[MetricIdentity, HighDynamicRangeRecorder]() + private val segments = TrieMap[MetricIdentity, HdrRecorder]() - def record(identity: MetricIdentity, value: Long): Unit = identity match { - case ElapsedTime ⇒ elapsedTime.record(value) - case id: MetricIdentity ⇒ segments.getOrElseUpdate(id, segmentRecorderFactory.apply()).record(value) - } + def segmentRecorder(segmentIdentity: MetricIdentity): HdrRecorder = + segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) def collect: MetricGroupSnapshot = TraceMetricSnapshot(elapsedTime.collect(), segments.map { case (identity, recorder) ⇒ (identity, recorder.collect()) }.toMap) } - case class TraceMetricSnapshot(elapsedTime: MetricSnapshot, segments: Map[MetricIdentity, MetricSnapshot]) + case class TraceMetricSnapshot(elapsedTime: MetricSnapshotLike, segments: Map[MetricIdentity, MetricSnapshotLike]) extends MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + def metrics: Map[MetricIdentity, MetricSnapshotLike] = segments + (ElapsedTime -> elapsedTime) } val Factory = new MetricGroupFactory { type GroupRecorder = TraceMetricRecorder def create(config: Config): TraceMetricRecorder = { - import HighDynamicRangeRecorder.Configuration val settings = config.getConfig("kamon.metrics.precision.trace") - val elapsedTimeHdrConfig = Configuration.fromConfig(settings.getConfig("elapsed-time")) - val segmentHdrConfig = Configuration.fromConfig(settings.getConfig("segment")) + val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time")) + val segmentConfig = extractPrecisionConfig(settings.getConfig("segment")) - new TraceMetricRecorder(HighDynamicRangeRecorder(elapsedTimeHdrConfig), () ⇒ HighDynamicRangeRecorder(segmentHdrConfig)) + new TraceMetricRecorder( + HdrRecorder(elapsedTimeConfig.highestTrackableValue, elapsedTimeConfig.significantValueDigits, Scale.Nano), + () ⇒ HdrRecorder(segmentConfig.highestTrackableValue, segmentConfig.significantValueDigits, Scale.Nano)) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala new file mode 100644 index 00000000..3a39ec69 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHdrRecorder.scala @@ -0,0 +1,52 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics.instruments + +import org.HdrHistogram.HdrRecorder +import kamon.metrics.{ Scale, MetricSnapshotLike } + +/** + * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is + * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this + * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue, + * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing + * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value + * after a new event is processed. + * + */ +class ContinuousHdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) + extends HdrRecorder(highestTrackableValue, significantValueDigits, scale) { + + @volatile private var lastRecordedValue: Long = 0 + + override def record(value: Long): Unit = { + lastRecordedValue = value + super.record(value) + } + + override def collect(): MetricSnapshotLike = { + val snapshot = super.collect() + super.record(lastRecordedValue) + + snapshot + } +} + +object ContinuousHdrRecorder { + def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) = + new ContinuousHdrRecorder(highestTrackableValue, significantValueDigits, scale) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala deleted file mode 100644 index 533e06b1..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/ContinuousHighDynamicRangeRecorder.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics.instruments - -import org.HdrHistogram.HighDynamicRangeRecorder -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration -import kamon.metrics.MetricSnapshot - -/** - * This recorder keeps track of the last value recoded and automatically adds it after collecting a snapshot. This is - * useful in cases where the absence of recordings does not necessarily mean the absence of values. For example, if this - * recorder is used for recording the mailbox size of an actor, and it only gets updated upon message enqueue o dequeue, - * the absence of recordings during 1 second means that the size hasn't change (example: the actor being blocked doing - * some work) and it should keep its last known value, instead of dropping to zero and then going back to the real value - * after a new event is processed. - * - */ -class ContinuousHighDynamicRangeRecorder(configuration: Configuration) extends HighDynamicRangeRecorder(configuration) { - @volatile private var lastRecordedValue: Long = 0 - - override def record(value: Long): Unit = { - lastRecordedValue = value - super.record(value) - } - - override def collect(): MetricSnapshot = { - val snapshot = super.collect() - super.record(lastRecordedValue) - - snapshot - } -} - -object ContinuousHighDynamicRangeRecorder { - def apply(configuration: Configuration) = new ContinuousHighDynamicRangeRecorder(configuration) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala new file mode 100644 index 00000000..28520030 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/HdrRecorder.scala @@ -0,0 +1,78 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package org.HdrHistogram + +import java.util.concurrent.atomic.AtomicLongFieldUpdater +import scala.annotation.tailrec +import kamon.metrics.{ Scale, MetricSnapshot, MetricSnapshotLike, MetricRecorder } + +/** + * This implementation aims to be used for real time data collection where data snapshots are taken often over time. + * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still + * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. + */ +class HdrRecorder(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale) + extends AtomicHistogram(1L, highestTrackableValue, significantValueDigits) with MetricRecorder { + + import HdrRecorder.totalCountUpdater + + def record(value: Long): Unit = recordValue(value) + + def collect(): MetricSnapshotLike = { + val entries = Vector.newBuilder[MetricSnapshot.Measurement] + val countsLength = counts.length() + + @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { + if (index < countsLength) { + val currentValue = previousValue + increment + val countAtValue = counts.getAndSet(index, 0) + + if (countAtValue > 0) + entries += MetricSnapshot.Measurement(currentValue, countAtValue) + + if (currentValue == bucketLimit) + iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) + else + iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) + } else { + nrOfRecordings + } + } + + val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) + + def tryUpdateTotalCount: Boolean = { + val previousTotalCount = getTotalCount + val newTotalCount = previousTotalCount - nrOfRecordings + + totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) + } + + while (!tryUpdateTotalCount) {} + + MetricSnapshot(nrOfRecordings, scale, entries.result()) + } + +} + +object HdrRecorder { + val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") + + def apply(highestTrackableValue: Long, significantValueDigits: Int, scale: Scale): HdrRecorder = + new HdrRecorder(highestTrackableValue, significantValueDigits, scale) + +} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala deleted file mode 100644 index e31d0e11..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package org.HdrHistogram - -import java.util.concurrent.atomic.AtomicLongFieldUpdater -import scala.annotation.tailrec -import kamon.metrics.{ DefaultMetricSnapshot, MetricSnapshot, MetricRecorder } -import com.typesafe.config.Config -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration - -/** - * This implementation aims to be used for real time data collection where data snapshots are taken often over time. - * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still - * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. - */ -class HighDynamicRangeRecorder(configuration: Configuration) - extends AtomicHistogram(1L, configuration.highestTrackableValue, configuration.significantValueDigits) with MetricRecorder { - - import HighDynamicRangeRecorder.totalCountUpdater - - def record(value: Long): Unit = recordValue(value) - - def collect(): MetricSnapshot = { - val entries = Vector.newBuilder[MetricSnapshot.Measurement] - val countsLength = counts.length() - - @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { - if (index < countsLength) { - val currentValue = previousValue + increment - val countAtValue = counts.getAndSet(index, 0) - - if (countAtValue > 0) - entries += MetricSnapshot.Measurement(currentValue, countAtValue) - - if (currentValue == bucketLimit) - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) - else - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) - } else { - nrOfRecordings - } - } - - val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) - - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = getTotalCount - val newTotalCount = previousTotalCount - nrOfRecordings - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - - DefaultMetricSnapshot(nrOfRecordings, entries.result()) - } - -} - -object HighDynamicRangeRecorder { - val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") - - def apply(configuration: Configuration): HighDynamicRangeRecorder = new HighDynamicRangeRecorder(configuration) - - case class Configuration(highestTrackableValue: Long, significantValueDigits: Int) - - case object Configuration { - def fromConfig(config: Config): Configuration = { - Configuration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala index f89d84a3..640157a9 100644 --- a/kamon-core/src/main/scala/kamon/metrics/package.scala +++ b/kamon-core/src/main/scala/kamon/metrics/package.scala @@ -16,11 +16,24 @@ package kamon +import scala.annotation.tailrec +import com.typesafe.config.Config + package object metrics { - def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - (left ++ right) map { - case (key, rightValue) ⇒ key -> left.get(key).map(leftValue ⇒ valueMerger(leftValue, rightValue)).getOrElse(rightValue) + case class HdrPrecisionConfig(highestTrackableValue: Long, significantValueDigits: Int) + + def extractPrecisionConfig(config: Config): HdrPrecisionConfig = + HdrPrecisionConfig(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) + + @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { + if (right.isEmpty) + left + else { + val (key, rightValue) = right.head + val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + + combineMaps(left.updated(key, value), right.tail)(valueMerger) } } } -- cgit v1.2.3