diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-07-03 14:36:42 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-07-03 14:36:18 -0300 |
commit | a3353d3e3fcb1dfab3e8f401187e236e99df2202 (patch) | |
tree | 4e9e246201cf169f1496bc72928ea2d35d03fcd0 /kamon-core/src/main/scala/kamon/metric | |
parent | 6d7970c6dd5b96b512c846181771bb11a43bc82a (diff) | |
download | Kamon-a3353d3e3fcb1dfab3e8f401187e236e99df2202.tar.gz Kamon-a3353d3e3fcb1dfab3e8f401187e236e99df2202.tar.bz2 Kamon-a3353d3e3fcb1dfab3e8f401187e236e99df2202.zip |
! all: refactor the core metric recording instruments and accomodate UserMetrics
This PR is including several changes to the kamon-core, most notably:
- Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure
that the interfaces are as clean as possible.
- Move away from the all Vector[Measurement] based Histogram snapshot to a new approach
in which we use a single long to store both the index in the counts array and the
frequency on that bucket. The leftmost 2 bytes of each long are used for storing the
counts array index and the remaining 6 bytes are used for the actual count, and
everything is put into a simple long array. This way only the buckets that actually
have values will be included in the snapshot with the smallest possible memory
footprint.
- Introduce Gauges.
- Reorganize the instrumentation for Akka and Scala and rewrite most of the tests
of this components to avoid going through the subscription protocol to test.
- Introduce trace tests and fixes on various tests.
- Necessary changes on new relic, datadog and statsd modules to compile with the new
codebase.
Pending:
- Finish the upgrade of the new relic to the current model.
- Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes
limits.
- More testing, more testing, more testing.
- Create the KamonStandalone module.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
13 files changed, 1270 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala new file mode 100644 index 00000000..bb412f79 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala @@ -0,0 +1,89 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram } + +case class ActorMetrics(name: String) extends MetricGroupIdentity { + val category = ActorMetrics +} + +object ActorMetrics extends MetricGroupCategory { + val name = "actor" + + case object ProcessingTime extends MetricIdentity { val name = "processing-time" } + case object MailboxSize extends MetricIdentity { val name = "mailbox-size" } + case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" } + case object Errors extends MetricIdentity { val name = "errors" } + + case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter, + errors: Counter) extends MetricGroupRecorder { + + def collect(context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.collect(context), + timeInMailbox.collect(context), + mailboxSize.collect(context), + errors.collect(context)) + + def cleanup: Unit = { + processingTime.cleanup + mailboxSize.cleanup + timeInMailbox.cleanup + errors.cleanup + } + } + + case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot, + mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = ActorMetricSnapshot + + def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot = + ActorMetricSnapshot( + processingTime.merge(that.processingTime, context), + timeInMailbox.merge(that.timeInMailbox, context), + mailboxSize.merge(that.mailboxSize, context), + errors.merge(that.errors, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (ProcessingTime -> processingTime), + (MailboxSize -> mailboxSize), + (TimeInMailbox -> timeInMailbox), + (Errors -> errors)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = ActorMetricsRecorder + + def create(config: Config, system: ActorSystem): ActorMetricsRecorder = { + val settings = config.getConfig("precision.actor") + + val processingTimeConfig = settings.getConfig("processing-time") + val timeInMailboxConfig = settings.getConfig("time-in-mailbox") + val mailboxSizeConfig = settings.getConfig("mailbox-size") + + new ActorMetricsRecorder( + Histogram.fromConfig(processingTimeConfig), + Histogram.fromConfig(timeInMailboxConfig), + MinMaxCounter.fromConfig(mailboxSizeConfig, system), + Counter()) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala new file mode 100644 index 00000000..fbce783c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala @@ -0,0 +1,88 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ Histogram, HdrHistogram } + +case class DispatcherMetrics(name: String) extends MetricGroupIdentity { + val category = DispatcherMetrics +} + +object DispatcherMetrics extends MetricGroupCategory { + val name = "dispatcher" + + case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" } + case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" } + case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" } + case object PoolSize extends MetricIdentity { val name = "pool-size" } + + case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram, + queueTaskCount: Histogram, poolSize: Histogram) + extends MetricGroupRecorder { + + def collect(context: CollectionContext): MetricGroupSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.collect(context), + runningThreadCount.collect(context), + queueTaskCount.collect(context), + poolSize.collect(context)) + + def cleanup: Unit = {} + + } + + case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot, + queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot { + + type GroupSnapshotType = DispatcherMetricSnapshot + + def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot = + DispatcherMetricSnapshot( + maximumPoolSize.merge(that.maximumPoolSize, context), + runningThreadCount.merge(that.runningThreadCount, context), + queueTaskCount.merge(that.queueTaskCount, context), + poolSize.merge(that.poolSize, context)) + + lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map( + (MaximumPoolSize -> maximumPoolSize), + (RunningThreadCount -> runningThreadCount), + (QueueTaskCount -> queueTaskCount), + (PoolSize -> poolSize)) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = DispatcherMetricRecorder + + def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = { + val settings = config.getConfig("precision.dispatcher") + + val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size") + val runningThreadCountConfig = settings.getConfig("running-thread-count") + val queueTaskCountConfig = settings.getConfig("queued-task-count") + val poolSizeConfig = settings.getConfig("pool-size") + + new DispatcherMetricRecorder( + Histogram.fromConfig(maximumPoolSizeConfig), + Histogram.fromConfig(runningThreadCountConfig), + Histogram.fromConfig(queueTaskCountConfig), + Histogram.fromConfig(poolSizeConfig)) + } + } +} + diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala new file mode 100644 index 00000000..325dd216 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala @@ -0,0 +1,75 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.{ LongBuffer } +import akka.actor.ActorSystem +import com.typesafe.config.Config + +trait MetricGroupCategory { + def name: String +} + +trait MetricGroupIdentity { + def name: String + def category: MetricGroupCategory +} + +trait MetricIdentity { + def name: String +} + +trait CollectionContext { + def buffer: LongBuffer +} + +object CollectionContext { + def default: CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(10000) + } +} + +trait MetricGroupRecorder { + def collect(context: CollectionContext): MetricGroupSnapshot + def cleanup: Unit +} + +trait MetricSnapshot { + type SnapshotType + + def merge(that: SnapshotType, context: CollectionContext): SnapshotType +} + +trait MetricGroupSnapshot { + type GroupSnapshotType + + def metrics: Map[MetricIdentity, MetricSnapshot] + def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType +} + +private[kamon] trait MetricRecorder { + type SnapshotType <: MetricSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit +} + +trait MetricGroupFactory { + type GroupRecorder <: MetricGroupRecorder + def create(config: Config, system: ActorSystem): GroupRecorder +} + diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala new file mode 100644 index 00000000..1025f0de --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -0,0 +1,110 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import java.nio.{ LongBuffer, ByteBuffer } + +import scala.collection.concurrent.TrieMap +import akka.actor._ +import com.typesafe.config.Config +import kamon.util.GlobPathFilter +import kamon.Kamon +import akka.actor +import kamon.metric.Metrics.MetricGroupFilter +import kamon.metric.Subscriptions.Subscribe +import java.util.concurrent.TimeUnit + +class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") + + /** Configured Dispatchers */ + val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) + val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) + + /** Configuration Settings */ + val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) + + val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() + val filters = loadFilters(metricsExtConfig) + lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") + + def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { + if (shouldTrack(identity)) + Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) + else + None + } + + def unregister(identity: MetricGroupIdentity): Unit = { + storage.remove(identity) + } + + def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { + subscriptions.tell(Subscribe(category, selection, permanently), receiver) + } + + def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { + // TODO: Improve the way in which we are getting the context. + val context = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(50000) + } + (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap + } + + def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { + import scala.concurrent.duration._ + + system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { + body + }(gaugeRecordingsDispatcher) + } + + private def shouldTrack(identity: MetricGroupIdentity): Boolean = { + filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) + } + + def loadFilters(config: Config): Map[String, MetricGroupFilter] = { + import scala.collection.JavaConverters._ + + val filters = config.getObjectList("filters").asScala + + val allFilters = + for ( + filter ← filters; + entry ← filter.entrySet().asScala + ) yield { + val key = entry.getKey + val keyBasedConfig = entry.getValue.atKey(key) + + val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList + val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + + (key, MetricGroupFilter(includes, excludes)) + } + + allFilters.toMap + } +} + +object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) + + case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { + def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala new file mode 100644 index 00000000..2f27c1a3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Scale.scala @@ -0,0 +1,31 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +class Scale(val numericValue: Double) extends AnyVal + +object Scale { + val Nano = new Scale(1E-9) + val Micro = new Scale(1E-6) + val Milli = new Scale(1E-3) + val Unit = new Scale(1) + val Kilo = new Scale(1E3) + val Mega = new Scale(1E6) + val Giga = new Scale(1E9) + + def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue +} diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala new file mode 100644 index 00000000..a9f4c721 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala @@ -0,0 +1,128 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.{ Props, ActorRef, Actor } +import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } +import kamon.util.GlobPathFilter +import scala.concurrent.duration.{ FiniteDuration, Duration } +import java.util.concurrent.TimeUnit +import kamon.Kamon +import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer + +class Subscriptions extends Actor { + import context.system + + val config = context.system.settings.config + val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var lastTick: Long = System.currentTimeMillis() + var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + + def receive = { + case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) + case FlushMetrics ⇒ flush() + } + + def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { + val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) + if (permanent) { + val receivers = subscribedPermanently.get(filter).getOrElse(Nil) + subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + + } else { + val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) + subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + } + + } + + def flush(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(Metrics).collect + + dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + + lastTick = currentTick + subscribedForOneShot = Map.empty + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + + for ((filter, receivers) ← subscriptions) yield { + val selection = snapshots.filter(group ⇒ filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + receivers.foreach(_ ! tickMetrics) + } + } +} + +object Subscriptions { + case object FlushMetrics + case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) + case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) + + case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { + def accept(identity: MetricGroupIdentity): Boolean = { + category.equals(identity.category) && globFilter.accept(identity.name) + } + } +} + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + val collectionContext = CollectionContext.default + + def receive = empty + + def empty: Actor.Receive = { + case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) + case FlushBuffer ⇒ // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) ⇒ + val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become (buffering(combinedSnapshot)) + + case FlushBuffer ⇒ + receiver ! buffered + context become (empty) + + } + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } + + def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext))) +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala new file mode 100644 index 00000000..1ee1eab4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.ActorSystem +import kamon.metric.instrument.{ Histogram } + +import scala.collection.concurrent.TrieMap +import com.typesafe.config.Config + +case class TraceMetrics(name: String) extends MetricGroupIdentity { + val category = TraceMetrics +} + +object TraceMetrics extends MetricGroupCategory { + val name = "trace" + + case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } + case class HttpClientRequest(name: String) extends MetricIdentity + + case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) + extends MetricGroupRecorder { + + private val segments = TrieMap[MetricIdentity, Histogram]() + + def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = + segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) + + def collect(context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot( + elapsedTime.collect(context), + segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap) + + def cleanup: Unit = {} + } + + case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = TraceMetricsSnapshot + + def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = + TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty) + + def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = TraceMetricRecorder + + def create(config: Config, system: ActorSystem): TraceMetricRecorder = { + + val settings = config.getConfig("precision.trace") + val elapsedTimeConfig = settings.getConfig("elapsed-time") + val segmentConfig = settings.getConfig("segment") + + new TraceMetricRecorder( + Histogram.fromConfig(elapsedTimeConfig), + () ⇒ Histogram.fromConfig(segmentConfig)) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala new file mode 100644 index 00000000..dea03968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -0,0 +1,139 @@ +package kamon.metric + +import akka.actor +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import com.typesafe.config.Config +import kamon.Kamon +import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + lazy val userMetricsRecorder = Kamon(Metrics)(system).register(UserMetrics, UserMetrics.Factory).get + + def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + userMetricsRecorder.buildHistogram(name, precision, highestTrackableValue) + + def registerHistogram(name: String): Histogram = + userMetricsRecorder.buildHistogram(name) + + def registerCounter(name: String): Counter = + userMetricsRecorder.buildCounter(name) + + def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + userMetricsRecorder.buildMinMaxCounter(name, precision, highestTrackableValue, refreshInterval) + } + + def registerMinMaxCounter(name: String): MinMaxCounter = + userMetricsRecorder.buildMinMaxCounter(name) + + def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name)(currentValueCollector) + + def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + userMetricsRecorder.buildGauge(name, precision, highestTrackableValue, refreshInterval, currentValueCollector) +} + +object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider with MetricGroupIdentity { + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system) + + val name: String = "user-metrics-recorder" + val category = new MetricGroupCategory { + val name: String = "user-metrics" + } + + val Factory = new MetricGroupFactory { + type GroupRecorder = UserMetricsRecorder + def create(config: Config, system: ActorSystem): UserMetricsRecorder = new UserMetricsRecorder(system) + } + + class UserMetricsRecorder(system: ActorSystem) extends MetricGroupRecorder { + val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision") + val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision") + val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision") + val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") + + val histograms = TrieMap[String, Histogram]() + val counters = TrieMap[String, Counter]() + val minMaxCounters = TrieMap[String, MinMaxCounter]() + val gauges = TrieMap[String, Gauge]() + + def buildHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = + histograms.getOrElseUpdate(name, Histogram(highestTrackableValue, precision, Scale.Unit)) + + def buildHistogram(name: String): Histogram = + histograms.getOrElseUpdate(name, Histogram.fromConfig(defaultHistogramPrecisionConfig)) + + def buildCounter(name: String): Counter = + counters.getOrElseUpdate(name, Counter()) + + def buildMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration): MinMaxCounter = { + minMaxCounters.getOrElseUpdate(name, MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) + } + + def buildMinMaxCounter(name: String): MinMaxCounter = + minMaxCounters.getOrElseUpdate(name, MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) + + def buildGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, + refreshInterval: FiniteDuration, currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) + + def buildGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = + gauges.getOrElseUpdate(name, Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) + + def collect(context: CollectionContext): UserMetricsSnapshot = { + val histogramSnapshots = histograms.map { + case (name, histogram) ⇒ + (UserHistogram(name), histogram.collect(context)) + } toMap + + val counterSnapshots = counters.map { + case (name, counter) ⇒ + (UserCounter(name), counter.collect(context)) + } toMap + + val minMaxCounterSnapshots = minMaxCounters.map { + case (name, minMaxCounter) ⇒ + (UserMinMaxCounter(name), minMaxCounter.collect(context)) + } toMap + + val gaugeSnapshots = gauges.map { + case (name, gauge) ⇒ + (UserGauge(name), gauge.collect(context)) + } toMap + + UserMetricsSnapshot(histogramSnapshots, counterSnapshots, minMaxCounterSnapshots, gaugeSnapshots) + } + + def cleanup: Unit = {} + } + + case class UserHistogram(name: String) extends MetricIdentity + case class UserCounter(name: String) extends MetricIdentity + case class UserMinMaxCounter(name: String) extends MetricIdentity + case class UserGauge(name: String) extends MetricIdentity + + case class UserMetricsSnapshot(histograms: Map[UserHistogram, Histogram.Snapshot], + counters: Map[UserCounter, Counter.Snapshot], + minMaxCounters: Map[UserMinMaxCounter, Histogram.Snapshot], + gauges: Map[UserGauge, Histogram.Snapshot]) + extends MetricGroupSnapshot { + + type GroupSnapshotType = UserMetricsSnapshot + + def merge(that: UserMetricsSnapshot, context: CollectionContext): UserMetricsSnapshot = + UserMetricsSnapshot( + combineMaps(histograms, that.histograms)((l, r) ⇒ l.merge(r, context)), + combineMaps(counters, that.counters)((l, r) ⇒ l.merge(r, context)), + combineMaps(minMaxCounters, that.minMaxCounters)((l, r) ⇒ l.merge(r, context)), + combineMaps(gauges, that.gauges)((l, r) ⇒ l.merge(r, context))) + + def metrics: Map[MetricIdentity, MetricSnapshot] = histograms ++ counters ++ minMaxCounters ++ gauges + } + +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala new file mode 100644 index 00000000..b592bcd3 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -0,0 +1,59 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import jsr166e.LongAdder +import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } + +trait Counter extends MetricRecorder { + type SnapshotType = Counter.Snapshot + + def increment(): Unit + def increment(times: Long): Unit +} + +object Counter { + + def apply(): Counter = new LongAdderCounter + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Counter.Snapshot + + def count: Long + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + } +} + +class LongAdderCounter extends Counter { + private val counter = new LongAdder + + def increment(): Unit = counter.increment() + + def increment(times: Long): Unit = { + if (times < 0) + throw new UnsupportedOperationException("Counters cannot be decremented") + counter.add(times) + } + + def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumThenReset()) + + def cleanup: Unit = {} +} + +case class CounterSnapshot(count: Long) extends Counter.Snapshot { + def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala new file mode 100644 index 00000000..1efff2bc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -0,0 +1,78 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ Cancellable, ActorSystem } +import com.typesafe.config.Config +import kamon.metric.{ CollectionContext, Scale, MetricRecorder } + +import scala.concurrent.duration.FiniteDuration + +trait Gauge extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Gauge { + + trait CurrentValueCollector { + def currentValue: Long + } + + def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + gauge.refreshValue() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge + } + + def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = + fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + + def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { + val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") + fromConfig(config, system)(currentValueCollector) + } + + def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + Gauge(Histogram.Precision(significantDigits), highest, Scale.Unit, refreshInterval.millis, system)(currentValueCollector) + } + + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} + +class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + def record(value: Long): Unit = underlyingHistogram.record(value) + + def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala new file mode 100644 index 00000000..9ae077f4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -0,0 +1,246 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric.instrument + +import java.nio.LongBuffer +import com.typesafe.config.Config +import org.HdrHistogram.AtomicHistogramFieldsAccessor +import org.HdrHistogram.AtomicHistogram +import kamon.metric._ + +trait Histogram extends MetricRecorder { + type SnapshotType = Histogram.Snapshot + + def record(value: Long) + def record(value: Long, count: Long) +} + +object Histogram { + + def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = + new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) + + def fromConfig(config: Config): Histogram = { + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + + new HdrHistogram(1L, highest, significantDigits) + } + + object HighestTrackableValue { + val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L + } + + case class Precision(significantDigits: Int) + object Precision { + val Low = Precision(1) + val Normal = Precision(2) + val Fine = Precision(3) + } + + trait Record { + def level: Long + def count: Long + + private[kamon] def rawCompactRecord: Long + } + + case class MutableRecord(var level: Long, var count: Long) extends Record { + var rawCompactRecord: Long = 0L + } + + trait Snapshot extends MetricSnapshot { + type SnapshotType = Histogram.Snapshot + + def isEmpty: Boolean = numberOfMeasurements == 0 + def scale: Scale + def numberOfMeasurements: Long + def min: Long + def max: Long + def recordsIterator: Iterator[Record] + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot + } +} + +/** + * This implementation is meant to be used for real time data collection where data snapshots are taken often over time. + * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still + * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. + */ +class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) + extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) + with Histogram with AtomicHistogramFieldsAccessor { + + import AtomicHistogramFieldsAccessor.totalCountUpdater + + def record(value: Long): Unit = recordValue(value) + + def record(value: Long, count: Long): Unit = recordValueWithCount(value, count) + + def collect(context: CollectionContext): Histogram.Snapshot = { + import context.buffer + buffer.clear() + val nrOfMeasurements = writeSnapshotTo(buffer) + + buffer.flip() + + val measurementsArray = Array.ofDim[Long](buffer.limit()) + buffer.get(measurementsArray, 0, measurementsArray.length) + new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + } + + def cleanup: Unit = {} + + private def writeSnapshotTo(buffer: LongBuffer): Long = { + val counts = countsArray() + val countsLength = counts.length() + + var nrOfMeasurements = 0L + var index = 0L + while (index < countsLength) { + val countAtIndex = counts.getAndSet(index.toInt, 0L) + + if (countAtIndex > 0) { + buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex)) + nrOfMeasurements += countAtIndex + } + + index += 1 + } + + reestablishTotalCount(nrOfMeasurements) + nrOfMeasurements + } + + private def reestablishTotalCount(diff: Long): Unit = { + def tryUpdateTotalCount: Boolean = { + val previousTotalCount = getTotalCount + val newTotalCount = previousTotalCount - diff + + totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) + } + + while (!tryUpdateTotalCount) {} + } + +} + +class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, + subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { + + def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) + def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1)) + + def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { + if (that.isEmpty) this else if (this.isEmpty) that else { + import context.buffer + buffer.clear() + + val selfIterator = recordsIterator + val thatIterator = that.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L + + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } + + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } + } + + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) + } + + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) + } + } + + @inline private def mergeCompactRecords(left: Long, right: Long): Long = { + val index = left >> 48 + val leftCount = countFromCompactRecord(left) + val rightCount = countFromCompactRecord(right) + + CompactHdrSnapshot.compactRecord(index, leftCount + rightCount) + } + + @inline private def levelFromCompactRecord(compactRecord: Long): Long = { + val countsArrayIndex = (compactRecord >> 48).toInt + var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1 + var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount + bucketIndex = 0 + } + + subBucketIndex.toLong << (bucketIndex + unitMagnitude) + } + + @inline private def countFromCompactRecord(compactRecord: Long): Long = + compactRecord & CompactHdrSnapshot.CompactRecordCountMask + + def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] { + var currentIndex = 0 + val mutableRecord = Histogram.MutableRecord(0, 0) + + override def hasNext: Boolean = currentIndex < compactRecords.length + + override def next(): Histogram.Record = { + if (hasNext) { + val measurement = compactRecords(currentIndex) + mutableRecord.rawCompactRecord = measurement + mutableRecord.level = levelFromCompactRecord(measurement) + mutableRecord.count = countFromCompactRecord(measurement) + currentIndex += 1 + + mutableRecord + } else { + throw new IllegalStateException("The iterator has already been consumed.") + } + } + } +} + +object CompactHdrSnapshot { + val CompactRecordCountMask = 0xFFFFFFFFFFFFL + + def compactRecord(index: Long, count: Long): Long = (index << 48) | count +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala new file mode 100644 index 00000000..471e7bd4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -0,0 +1,116 @@ +package kamon.metric.instrument + +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +import java.lang.Math.abs +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference +import akka.actor.{ ActorSystem, Cancellable } +import com.typesafe.config.Config +import jsr166e.LongMaxUpdater +import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.util.PaddedAtomicLong +import scala.concurrent.duration.FiniteDuration + +trait MinMaxCounter extends MetricRecorder { + override type SnapshotType = Histogram.Snapshot + + def increment(): Unit + def increment(times: Long): Unit + def decrement() + def decrement(times: Long) +} + +object MinMaxCounter { + + def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, + system: ActorSystem): MinMaxCounter = { + + val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) + + val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + minMaxCounter.refreshValues() + }(system.dispatcher) // TODO: Move this to Kamon dispatchers + + minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) + minMaxCounter + } + + def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { + import scala.concurrent.duration._ + + val highest = config.getLong("highest-trackable-value") + val significantDigits = config.getInt("significant-value-digits") + val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) + + apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) + } +} + +class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { + private val min = new LongMaxUpdater + private val max = new LongMaxUpdater + private val sum = new PaddedAtomicLong + val refreshValuesSchedule = new AtomicReference[Cancellable]() + + min.update(0L) + max.update(0L) + + def increment(): Unit = increment(1L) + + def increment(times: Long): Unit = { + val currentValue = sum.addAndGet(times) + max.update(currentValue) + } + + def decrement(): Unit = decrement(1L) + + def decrement(times: Long): Unit = { + val currentValue = sum.addAndGet(-times) + min.update(-currentValue) + } + + def collect(context: CollectionContext): Histogram.Snapshot = { + refreshValues() + underlyingHistogram.collect(context) + } + + def cleanup: Unit = { + if (refreshValuesSchedule.get() != null) + refreshValuesSchedule.get().cancel() + } + + def refreshValues(): Unit = { + val currentValue = { + val value = sum.get() + if (value < 0) 0 else value + } + + val currentMin = { + val minAbs = abs(min.maxThenReset()) + if (minAbs <= currentValue) minAbs else 0 + } + + underlyingHistogram.record(currentValue) + underlyingHistogram.record(currentMin) + underlyingHistogram.record(max.maxThenReset()) + + max.update(currentValue) + min.update(-currentValue) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala new file mode 100644 index 00000000..43166058 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/package.scala @@ -0,0 +1,34 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon + +import scala.annotation.tailrec +import com.typesafe.config.Config + +package object metric { + + @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { + if (right.isEmpty) + left + else { + val (key, rightValue) = right.head + val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + + combineMaps(left.updated(key, value), right.tail)(valueMerger) + } + } +} |