aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-07-03 14:36:42 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-07-03 14:36:18 -0300
commit29068fc70a3e5a17a630c2c7fff951572bb5fa21 (patch)
tree7ec2632f36e9493cb559f510fa3cc3ead7443511 /kamon-core/src/main/scala/kamon/metric
parent4d5803e579e223c4f4f5cb37ab79ca069a007949 (diff)
downloadKamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.tar.gz
Kamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.tar.bz2
Kamon-29068fc70a3e5a17a630c2c7fff951572bb5fa21.zip
! all: refactor the core metric recording instruments and accomodate UserMetrics
This PR is including several changes to the kamon-core, most notably: - Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure that the interfaces are as clean as possible. - Move away from the all Vector[Measurement] based Histogram snapshot to a new approach in which we use a single long to store both the index in the counts array and the frequency on that bucket. The leftmost 2 bytes of each long are used for storing the counts array index and the remaining 6 bytes are used for the actual count, and everything is put into a simple long array. This way only the buckets that actually have values will be included in the snapshot with the smallest possible memory footprint. - Introduce Gauges. - Reorganize the instrumentation for Akka and Scala and rewrite most of the tests of this components to avoid going through the subscription protocol to test. - Introduce trace tests and fixes on various tests. - Necessary changes on new relic, datadog and statsd modules to compile with the new codebase. Pending: - Finish the upgrade of the new relic to the current model. - Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes limits. - More testing, more testing, more testing. - Create the KamonStandalone module.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala89
-rw-r--r--kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala88
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala75
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala110
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Scale.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Subscriptions.scala128
-rw-r--r--kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala77
-rw-r--r--kamon-core/src/main/scala/kamon/metric/UserMetrics.scala139
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala59
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala78
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala246
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala116
-rw-r--r--kamon-core/src/main/scala/kamon/metric/package.scala34
13 files changed, 1270 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala
new file mode 100644
index 00000000..bb412f79
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/ActorMetrics.scala
@@ -0,0 +1,89 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import kamon.metric.instrument.{ MinMaxCounter, Counter, Histogram }
+
+case class ActorMetrics(name: String) extends MetricGroupIdentity {
+ val category = ActorMetrics
+}
+
+object ActorMetrics extends MetricGroupCategory {
+ val name = "actor"
+
+ case object ProcessingTime extends MetricIdentity { val name = "processing-time" }
+ case object MailboxSize extends MetricIdentity { val name = "mailbox-size" }
+ case object TimeInMailbox extends MetricIdentity { val name = "time-in-mailbox" }
+ case object Errors extends MetricIdentity { val name = "errors" }
+
+ case class ActorMetricsRecorder(processingTime: Histogram, timeInMailbox: Histogram, mailboxSize: MinMaxCounter,
+ errors: Counter) extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): ActorMetricSnapshot =
+ ActorMetricSnapshot(
+ processingTime.collect(context),
+ timeInMailbox.collect(context),
+ mailboxSize.collect(context),
+ errors.collect(context))
+
+ def cleanup: Unit = {
+ processingTime.cleanup
+ mailboxSize.cleanup
+ timeInMailbox.cleanup
+ errors.cleanup
+ }
+ }
+
+ case class ActorMetricSnapshot(processingTime: Histogram.Snapshot, timeInMailbox: Histogram.Snapshot,
+ mailboxSize: Histogram.Snapshot, errors: Counter.Snapshot) extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = ActorMetricSnapshot
+
+ def merge(that: ActorMetricSnapshot, context: CollectionContext): ActorMetricSnapshot =
+ ActorMetricSnapshot(
+ processingTime.merge(that.processingTime, context),
+ timeInMailbox.merge(that.timeInMailbox, context),
+ mailboxSize.merge(that.mailboxSize, context),
+ errors.merge(that.errors, context))
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ (ProcessingTime -> processingTime),
+ (MailboxSize -> mailboxSize),
+ (TimeInMailbox -> timeInMailbox),
+ (Errors -> errors))
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = ActorMetricsRecorder
+
+ def create(config: Config, system: ActorSystem): ActorMetricsRecorder = {
+ val settings = config.getConfig("precision.actor")
+
+ val processingTimeConfig = settings.getConfig("processing-time")
+ val timeInMailboxConfig = settings.getConfig("time-in-mailbox")
+ val mailboxSizeConfig = settings.getConfig("mailbox-size")
+
+ new ActorMetricsRecorder(
+ Histogram.fromConfig(processingTimeConfig),
+ Histogram.fromConfig(timeInMailboxConfig),
+ MinMaxCounter.fromConfig(mailboxSizeConfig, system),
+ Counter())
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala
new file mode 100644
index 00000000..fbce783c
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/DispatcherMetrics.scala
@@ -0,0 +1,88 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import kamon.metric.instrument.{ Histogram, HdrHistogram }
+
+case class DispatcherMetrics(name: String) extends MetricGroupIdentity {
+ val category = DispatcherMetrics
+}
+
+object DispatcherMetrics extends MetricGroupCategory {
+ val name = "dispatcher"
+
+ case object MaximumPoolSize extends MetricIdentity { val name = "maximum-pool-size" }
+ case object RunningThreadCount extends MetricIdentity { val name = "running-thread-count" }
+ case object QueueTaskCount extends MetricIdentity { val name = "queued-task-count" }
+ case object PoolSize extends MetricIdentity { val name = "pool-size" }
+
+ case class DispatcherMetricRecorder(maximumPoolSize: Histogram, runningThreadCount: Histogram,
+ queueTaskCount: Histogram, poolSize: Histogram)
+ extends MetricGroupRecorder {
+
+ def collect(context: CollectionContext): MetricGroupSnapshot =
+ DispatcherMetricSnapshot(
+ maximumPoolSize.collect(context),
+ runningThreadCount.collect(context),
+ queueTaskCount.collect(context),
+ poolSize.collect(context))
+
+ def cleanup: Unit = {}
+
+ }
+
+ case class DispatcherMetricSnapshot(maximumPoolSize: Histogram.Snapshot, runningThreadCount: Histogram.Snapshot,
+ queueTaskCount: Histogram.Snapshot, poolSize: Histogram.Snapshot) extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = DispatcherMetricSnapshot
+
+ def merge(that: DispatcherMetricSnapshot, context: CollectionContext): DispatcherMetricSnapshot =
+ DispatcherMetricSnapshot(
+ maximumPoolSize.merge(that.maximumPoolSize, context),
+ runningThreadCount.merge(that.runningThreadCount, context),
+ queueTaskCount.merge(that.queueTaskCount, context),
+ poolSize.merge(that.poolSize, context))
+
+ lazy val metrics: Map[MetricIdentity, MetricSnapshot] = Map(
+ (MaximumPoolSize -> maximumPoolSize),
+ (RunningThreadCount -> runningThreadCount),
+ (QueueTaskCount -> queueTaskCount),
+ (PoolSize -> poolSize))
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = DispatcherMetricRecorder
+
+ def create(config: Config, system: ActorSystem): DispatcherMetricRecorder = {
+ val settings = config.getConfig("precision.dispatcher")
+
+ val maximumPoolSizeConfig = settings.getConfig("maximum-pool-size")
+ val runningThreadCountConfig = settings.getConfig("running-thread-count")
+ val queueTaskCountConfig = settings.getConfig("queued-task-count")
+ val poolSizeConfig = settings.getConfig("pool-size")
+
+ new DispatcherMetricRecorder(
+ Histogram.fromConfig(maximumPoolSizeConfig),
+ Histogram.fromConfig(runningThreadCountConfig),
+ Histogram.fromConfig(queueTaskCountConfig),
+ Histogram.fromConfig(poolSizeConfig))
+ }
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
new file mode 100644
index 00000000..325dd216
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala
@@ -0,0 +1,75 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import java.nio.{ LongBuffer }
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+
+trait MetricGroupCategory {
+ def name: String
+}
+
+trait MetricGroupIdentity {
+ def name: String
+ def category: MetricGroupCategory
+}
+
+trait MetricIdentity {
+ def name: String
+}
+
+trait CollectionContext {
+ def buffer: LongBuffer
+}
+
+object CollectionContext {
+ def default: CollectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(10000)
+ }
+}
+
+trait MetricGroupRecorder {
+ def collect(context: CollectionContext): MetricGroupSnapshot
+ def cleanup: Unit
+}
+
+trait MetricSnapshot {
+ type SnapshotType
+
+ def merge(that: SnapshotType, context: CollectionContext): SnapshotType
+}
+
+trait MetricGroupSnapshot {
+ type GroupSnapshotType
+
+ def metrics: Map[MetricIdentity, MetricSnapshot]
+ def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType
+}
+
+private[kamon] trait MetricRecorder {
+ type SnapshotType <: MetricSnapshot
+
+ def collect(context: CollectionContext): SnapshotType
+ def cleanup: Unit
+}
+
+trait MetricGroupFactory {
+ type GroupRecorder <: MetricGroupRecorder
+ def create(config: Config, system: ActorSystem): GroupRecorder
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
new file mode 100644
index 00000000..1025f0de
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -0,0 +1,110 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import java.nio.{ LongBuffer, ByteBuffer }
+
+import scala.collection.concurrent.TrieMap
+import akka.actor._
+import com.typesafe.config.Config
+import kamon.util.GlobPathFilter
+import kamon.Kamon
+import akka.actor
+import kamon.metric.Metrics.MetricGroupFilter
+import kamon.metric.Subscriptions.Subscribe
+import java.util.concurrent.TimeUnit
+
+class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ val metricsExtConfig = system.settings.config.getConfig("kamon.metrics")
+
+ /** Configured Dispatchers */
+ val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions"))
+ val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings"))
+
+ /** Configuration Settings */
+ val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS)
+
+ val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
+ val filters = loadFilters(metricsExtConfig)
+ lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
+
+ def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = {
+ if (shouldTrack(identity))
+ Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder])
+ else
+ None
+ }
+
+ def unregister(identity: MetricGroupIdentity): Unit = {
+ storage.remove(identity)
+ }
+
+ def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = {
+ subscriptions.tell(Subscribe(category, selection, permanently), receiver)
+ }
+
+ def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = {
+ // TODO: Improve the way in which we are getting the context.
+ val context = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(50000)
+ }
+ (for ((identity, recorder) ← storage) yield (identity, recorder.collect(context))).toMap
+ }
+
+ def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
+ import scala.concurrent.duration._
+
+ system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) {
+ body
+ }(gaugeRecordingsDispatcher)
+ }
+
+ private def shouldTrack(identity: MetricGroupIdentity): Boolean = {
+ filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true)
+ }
+
+ def loadFilters(config: Config): Map[String, MetricGroupFilter] = {
+ import scala.collection.JavaConverters._
+
+ val filters = config.getObjectList("filters").asScala
+
+ val allFilters =
+ for (
+ filter ← filters;
+ entry ← filter.entrySet().asScala
+ ) yield {
+ val key = entry.getKey
+ val keyBasedConfig = entry.getValue.atKey(key)
+
+ val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList
+ val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList
+
+ (key, MetricGroupFilter(includes, excludes))
+ }
+
+ allFilters.toMap
+ }
+}
+
+object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
+ def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system)
+
+ case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) {
+ def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala
new file mode 100644
index 00000000..2f27c1a3
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Scale.scala
@@ -0,0 +1,31 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+class Scale(val numericValue: Double) extends AnyVal
+
+object Scale {
+ val Nano = new Scale(1E-9)
+ val Micro = new Scale(1E-6)
+ val Milli = new Scale(1E-3)
+ val Unit = new Scale(1)
+ val Kilo = new Scale(1E3)
+ val Mega = new Scale(1E6)
+ val Giga = new Scale(1E9)
+
+ def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
new file mode 100644
index 00000000..a9f4c721
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
@@ -0,0 +1,128 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.{ Props, ActorRef, Actor }
+import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
+import kamon.util.GlobPathFilter
+import scala.concurrent.duration.{ FiniteDuration, Duration }
+import java.util.concurrent.TimeUnit
+import kamon.Kamon
+import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
+
+class Subscriptions extends Actor {
+ import context.system
+
+ val config = context.system.settings.config
+ val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
+ val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+
+ var lastTick: Long = System.currentTimeMillis()
+ var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+ var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+
+ def receive = {
+ case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent)
+ case FlushMetrics ⇒ flush()
+ }
+
+ def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = {
+ val filter = MetricGroupFilter(category, new GlobPathFilter(selection))
+ if (permanent) {
+ val receivers = subscribedPermanently.get(filter).getOrElse(Nil)
+ subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers)
+
+ } else {
+ val receivers = subscribedForOneShot.get(filter).getOrElse(Nil)
+ subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers)
+ }
+
+ }
+
+ def flush(): Unit = {
+ val currentTick = System.currentTimeMillis()
+ val snapshots = Kamon(Metrics).collect
+
+ dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots)
+ dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots)
+
+ lastTick = currentTick
+ subscribedForOneShot = Map.empty
+ }
+
+ def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
+ snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
+
+ for ((filter, receivers) ← subscriptions) yield {
+ val selection = snapshots.filter(group ⇒ filter.accept(group._1))
+ val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
+
+ receivers.foreach(_ ! tickMetrics)
+ }
+ }
+}
+
+object Subscriptions {
+ case object FlushMetrics
+ case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false)
+ case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])
+
+ case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) {
+ def accept(identity: MetricGroupIdentity): Boolean = {
+ category.equals(identity.category) && globFilter.accept(identity.name)
+ }
+ }
+}
+
+class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
+ val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
+ val collectionContext = CollectionContext.default
+
+ def receive = empty
+
+ def empty: Actor.Receive = {
+ case tick: TickMetricSnapshot ⇒ context become (buffering(tick))
+ case FlushBuffer ⇒ // Nothing to flush.
+ }
+
+ def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
+ case TickMetricSnapshot(_, to, tickMetrics) ⇒
+ val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup)
+ val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
+
+ context become (buffering(combinedSnapshot))
+
+ case FlushBuffer ⇒
+ receiver ! buffered
+ context become (empty)
+
+ }
+
+ override def postStop(): Unit = {
+ flushSchedule.cancel()
+ super.postStop()
+ }
+
+ def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext)))
+}
+
+object TickMetricSnapshotBuffer {
+ case object FlushBuffer
+
+ def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
+ Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
new file mode 100644
index 00000000..1ee1eab4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala
@@ -0,0 +1,77 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.ActorSystem
+import kamon.metric.instrument.{ Histogram }
+
+import scala.collection.concurrent.TrieMap
+import com.typesafe.config.Config
+
+case class TraceMetrics(name: String) extends MetricGroupIdentity {
+ val category = TraceMetrics
+}
+
+object TraceMetrics extends MetricGroupCategory {
+ val name = "trace"
+
+ case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" }
+ case class HttpClientRequest(name: String) extends MetricIdentity
+
+ case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram)
+ extends MetricGroupRecorder {
+
+ private val segments = TrieMap[MetricIdentity, Histogram]()
+
+ def segmentRecorder(segmentIdentity: MetricIdentity): Histogram =
+ segments.getOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply())
+
+ def collect(context: CollectionContext): TraceMetricsSnapshot =
+ TraceMetricsSnapshot(
+ elapsedTime.collect(context),
+ segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap)
+
+ def cleanup: Unit = {}
+ }
+
+ case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot])
+ extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = TraceMetricsSnapshot
+
+ def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot =
+ TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), Map.empty)
+
+ def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime)
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = TraceMetricRecorder
+
+ def create(config: Config, system: ActorSystem): TraceMetricRecorder = {
+
+ val settings = config.getConfig("precision.trace")
+ val elapsedTimeConfig = settings.getConfig("elapsed-time")
+ val segmentConfig = settings.getConfig("segment")
+
+ new TraceMetricRecorder(
+ Histogram.fromConfig(elapsedTimeConfig),
+ () ⇒ Histogram.fromConfig(segmentConfig))
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
new file mode 100644
index 00000000..dea03968
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala
@@ -0,0 +1,139 @@
+package kamon.metric
+
+import akka.actor
+import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import com.typesafe.config.Config
+import kamon.Kamon
+import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram }
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+
+class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ lazy val userMetricsRecorder = Kamon(Metrics)(system).register(UserMetrics, UserMetrics.Factory).get
+
+ def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram =
+ userMetricsRecorder.buildHistogram(name, precision, highestTrackableValue)
+
+ def registerHistogram(name: String): Histogram =
+ userMetricsRecorder.buildHistogram(name)
+
+ def registerCounter(name: String): Counter =
+ userMetricsRecorder.buildCounter(name)
+
+ def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration): MinMaxCounter = {
+ userMetricsRecorder.buildMinMaxCounter(name, precision, highestTrackableValue, refreshInterval)
+ }
+
+ def registerMinMaxCounter(name: String): MinMaxCounter =
+ userMetricsRecorder.buildMinMaxCounter(name)
+
+ def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ userMetricsRecorder.buildGauge(name)(currentValueCollector)
+
+ def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ userMetricsRecorder.buildGauge(name, precision, highestTrackableValue, refreshInterval, currentValueCollector)
+}
+
+object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider with MetricGroupIdentity {
+ def lookup(): ExtensionId[_ <: actor.Extension] = Metrics
+ def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system)
+
+ val name: String = "user-metrics-recorder"
+ val category = new MetricGroupCategory {
+ val name: String = "user-metrics"
+ }
+
+ val Factory = new MetricGroupFactory {
+ type GroupRecorder = UserMetricsRecorder
+ def create(config: Config, system: ActorSystem): UserMetricsRecorder = new UserMetricsRecorder(system)
+ }
+
+ class UserMetricsRecorder(system: ActorSystem) extends MetricGroupRecorder {
+ val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision")
+ val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision")
+ val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision")
+ val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision")
+
+ val histograms = TrieMap[String, Histogram]()
+ val counters = TrieMap[String, Counter]()
+ val minMaxCounters = TrieMap[String, MinMaxCounter]()
+ val gauges = TrieMap[String, Gauge]()
+
+ def buildHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram =
+ histograms.getOrElseUpdate(name, Histogram(highestTrackableValue, precision, Scale.Unit))
+
+ def buildHistogram(name: String): Histogram =
+ histograms.getOrElseUpdate(name, Histogram.fromConfig(defaultHistogramPrecisionConfig))
+
+ def buildCounter(name: String): Counter =
+ counters.getOrElseUpdate(name, Counter())
+
+ def buildMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration): MinMaxCounter = {
+ minMaxCounters.getOrElseUpdate(name, MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system))
+ }
+
+ def buildMinMaxCounter(name: String): MinMaxCounter =
+ minMaxCounters.getOrElseUpdate(name, MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system))
+
+ def buildGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long,
+ refreshInterval: FiniteDuration, currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ gauges.getOrElseUpdate(name, Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector))
+
+ def buildGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge =
+ gauges.getOrElseUpdate(name, Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector))
+
+ def collect(context: CollectionContext): UserMetricsSnapshot = {
+ val histogramSnapshots = histograms.map {
+ case (name, histogram) ⇒
+ (UserHistogram(name), histogram.collect(context))
+ } toMap
+
+ val counterSnapshots = counters.map {
+ case (name, counter) ⇒
+ (UserCounter(name), counter.collect(context))
+ } toMap
+
+ val minMaxCounterSnapshots = minMaxCounters.map {
+ case (name, minMaxCounter) ⇒
+ (UserMinMaxCounter(name), minMaxCounter.collect(context))
+ } toMap
+
+ val gaugeSnapshots = gauges.map {
+ case (name, gauge) ⇒
+ (UserGauge(name), gauge.collect(context))
+ } toMap
+
+ UserMetricsSnapshot(histogramSnapshots, counterSnapshots, minMaxCounterSnapshots, gaugeSnapshots)
+ }
+
+ def cleanup: Unit = {}
+ }
+
+ case class UserHistogram(name: String) extends MetricIdentity
+ case class UserCounter(name: String) extends MetricIdentity
+ case class UserMinMaxCounter(name: String) extends MetricIdentity
+ case class UserGauge(name: String) extends MetricIdentity
+
+ case class UserMetricsSnapshot(histograms: Map[UserHistogram, Histogram.Snapshot],
+ counters: Map[UserCounter, Counter.Snapshot],
+ minMaxCounters: Map[UserMinMaxCounter, Histogram.Snapshot],
+ gauges: Map[UserGauge, Histogram.Snapshot])
+ extends MetricGroupSnapshot {
+
+ type GroupSnapshotType = UserMetricsSnapshot
+
+ def merge(that: UserMetricsSnapshot, context: CollectionContext): UserMetricsSnapshot =
+ UserMetricsSnapshot(
+ combineMaps(histograms, that.histograms)((l, r) ⇒ l.merge(r, context)),
+ combineMaps(counters, that.counters)((l, r) ⇒ l.merge(r, context)),
+ combineMaps(minMaxCounters, that.minMaxCounters)((l, r) ⇒ l.merge(r, context)),
+ combineMaps(gauges, that.gauges)((l, r) ⇒ l.merge(r, context)))
+
+ def metrics: Map[MetricIdentity, MetricSnapshot] = histograms ++ counters ++ minMaxCounters ++ gauges
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
new file mode 100644
index 00000000..b592bcd3
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
@@ -0,0 +1,59 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import jsr166e.LongAdder
+import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder }
+
+trait Counter extends MetricRecorder {
+ type SnapshotType = Counter.Snapshot
+
+ def increment(): Unit
+ def increment(times: Long): Unit
+}
+
+object Counter {
+
+ def apply(): Counter = new LongAdderCounter
+
+ trait Snapshot extends MetricSnapshot {
+ type SnapshotType = Counter.Snapshot
+
+ def count: Long
+ def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot
+ }
+}
+
+class LongAdderCounter extends Counter {
+ private val counter = new LongAdder
+
+ def increment(): Unit = counter.increment()
+
+ def increment(times: Long): Unit = {
+ if (times < 0)
+ throw new UnsupportedOperationException("Counters cannot be decremented")
+ counter.add(times)
+ }
+
+ def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumThenReset())
+
+ def cleanup: Unit = {}
+}
+
+case class CounterSnapshot(count: Long) extends Counter.Snapshot {
+ def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
new file mode 100644
index 00000000..1efff2bc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
@@ -0,0 +1,78 @@
+package kamon.metric.instrument
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.actor.{ Cancellable, ActorSystem }
+import com.typesafe.config.Config
+import kamon.metric.{ CollectionContext, Scale, MetricRecorder }
+
+import scala.concurrent.duration.FiniteDuration
+
+trait Gauge extends MetricRecorder {
+ type SnapshotType = Histogram.Snapshot
+
+ def record(value: Long)
+ def record(value: Long, count: Long)
+}
+
+object Gauge {
+
+ trait CurrentValueCollector {
+ def currentValue: Long
+ }
+
+ def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration,
+ system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
+
+ val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector)
+
+ val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ gauge.refreshValue()
+ }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+
+ gauge.refreshValuesSchedule.set(refreshValuesSchedule)
+ gauge
+ }
+
+ def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge =
+ fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction))
+
+ def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = {
+ val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision")
+ fromConfig(config, system)(currentValueCollector)
+ }
+
+ def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
+ import scala.concurrent.duration._
+
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+ val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+
+ Gauge(Histogram.Precision(significantDigits), highest, Scale.Unit, refreshInterval.millis, system)(currentValueCollector)
+ }
+
+ implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f.apply()
+ }
+}
+
+class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge {
+ val refreshValuesSchedule = new AtomicReference[Cancellable]()
+
+ def record(value: Long): Unit = underlyingHistogram.record(value)
+
+ def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count)
+
+ def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context)
+
+ def cleanup: Unit = {
+ if (refreshValuesSchedule.get() != null)
+ refreshValuesSchedule.get().cancel()
+ }
+
+ def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue)
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
new file mode 100644
index 00000000..9ae077f4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -0,0 +1,246 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+import com.typesafe.config.Config
+import org.HdrHistogram.AtomicHistogramFieldsAccessor
+import org.HdrHistogram.AtomicHistogram
+import kamon.metric._
+
+trait Histogram extends MetricRecorder {
+ type SnapshotType = Histogram.Snapshot
+
+ def record(value: Long)
+ def record(value: Long, count: Long)
+}
+
+object Histogram {
+
+ def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram =
+ new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale)
+
+ def fromConfig(config: Config): Histogram = {
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+
+ new HdrHistogram(1L, highest, significantDigits)
+ }
+
+ object HighestTrackableValue {
+ val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L
+ }
+
+ case class Precision(significantDigits: Int)
+ object Precision {
+ val Low = Precision(1)
+ val Normal = Precision(2)
+ val Fine = Precision(3)
+ }
+
+ trait Record {
+ def level: Long
+ def count: Long
+
+ private[kamon] def rawCompactRecord: Long
+ }
+
+ case class MutableRecord(var level: Long, var count: Long) extends Record {
+ var rawCompactRecord: Long = 0L
+ }
+
+ trait Snapshot extends MetricSnapshot {
+ type SnapshotType = Histogram.Snapshot
+
+ def isEmpty: Boolean = numberOfMeasurements == 0
+ def scale: Scale
+ def numberOfMeasurements: Long
+ def min: Long
+ def max: Long
+ def recordsIterator: Iterator[Record]
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot
+ }
+}
+
+/**
+ * This implementation is meant to be used for real time data collection where data snapshots are taken often over time.
+ * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still
+ * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
+ */
+class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit)
+ extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits)
+ with Histogram with AtomicHistogramFieldsAccessor {
+
+ import AtomicHistogramFieldsAccessor.totalCountUpdater
+
+ def record(value: Long): Unit = recordValue(value)
+
+ def record(value: Long, count: Long): Unit = recordValueWithCount(value, count)
+
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ import context.buffer
+ buffer.clear()
+ val nrOfMeasurements = writeSnapshotTo(buffer)
+
+ buffer.flip()
+
+ val measurementsArray = Array.ofDim[Long](buffer.limit())
+ buffer.get(measurementsArray, 0, measurementsArray.length)
+ new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
+ }
+
+ def cleanup: Unit = {}
+
+ private def writeSnapshotTo(buffer: LongBuffer): Long = {
+ val counts = countsArray()
+ val countsLength = counts.length()
+
+ var nrOfMeasurements = 0L
+ var index = 0L
+ while (index < countsLength) {
+ val countAtIndex = counts.getAndSet(index.toInt, 0L)
+
+ if (countAtIndex > 0) {
+ buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex))
+ nrOfMeasurements += countAtIndex
+ }
+
+ index += 1
+ }
+
+ reestablishTotalCount(nrOfMeasurements)
+ nrOfMeasurements
+ }
+
+ private def reestablishTotalCount(diff: Long): Unit = {
+ def tryUpdateTotalCount: Boolean = {
+ val previousTotalCount = getTotalCount
+ val newTotalCount = previousTotalCount - diff
+
+ totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount)
+ }
+
+ while (!tryUpdateTotalCount) {}
+ }
+
+}
+
+class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
+ subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot {
+
+ def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0))
+ def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1))
+
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = {
+ if (that.isEmpty) this else if (this.isEmpty) that else {
+ import context.buffer
+ buffer.clear()
+
+ val selfIterator = recordsIterator
+ val thatIterator = that.recordsIterator
+ var thatCurrentRecord: Histogram.Record = null
+ var mergedNumberOfMeasurements = 0L
+
+ def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
+ def addToBuffer(compactRecord: Long): Unit = {
+ mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
+ buffer.put(compactRecord)
+ }
+
+ while (selfIterator.hasNext) {
+ val selfCurrentRecord = selfIterator.next()
+
+ // Advance that to no further than the level of selfCurrentRecord
+ thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
+ while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
+ addToBuffer(thatCurrentRecord.rawCompactRecord)
+ thatCurrentRecord = nextOrNull(thatIterator)
+ }
+
+ // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
+ if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
+ addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
+ thatCurrentRecord = nextOrNull(thatIterator)
+ } else {
+ addToBuffer(selfCurrentRecord.rawCompactRecord)
+ }
+ }
+
+ // Include everything that might have been left from that
+ if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
+ while (thatIterator.hasNext) {
+ addToBuffer(thatIterator.next().rawCompactRecord)
+ }
+
+ buffer.flip()
+ val compactRecords = Array.ofDim[Long](buffer.limit())
+ buffer.get(compactRecords)
+
+ new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
+ }
+ }
+
+ @inline private def mergeCompactRecords(left: Long, right: Long): Long = {
+ val index = left >> 48
+ val leftCount = countFromCompactRecord(left)
+ val rightCount = countFromCompactRecord(right)
+
+ CompactHdrSnapshot.compactRecord(index, leftCount + rightCount)
+ }
+
+ @inline private def levelFromCompactRecord(compactRecord: Long): Long = {
+ val countsArrayIndex = (compactRecord >> 48).toInt
+ var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1
+ var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount
+ if (bucketIndex < 0) {
+ subBucketIndex -= subBucketHalfCount
+ bucketIndex = 0
+ }
+
+ subBucketIndex.toLong << (bucketIndex + unitMagnitude)
+ }
+
+ @inline private def countFromCompactRecord(compactRecord: Long): Long =
+ compactRecord & CompactHdrSnapshot.CompactRecordCountMask
+
+ def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] {
+ var currentIndex = 0
+ val mutableRecord = Histogram.MutableRecord(0, 0)
+
+ override def hasNext: Boolean = currentIndex < compactRecords.length
+
+ override def next(): Histogram.Record = {
+ if (hasNext) {
+ val measurement = compactRecords(currentIndex)
+ mutableRecord.rawCompactRecord = measurement
+ mutableRecord.level = levelFromCompactRecord(measurement)
+ mutableRecord.count = countFromCompactRecord(measurement)
+ currentIndex += 1
+
+ mutableRecord
+ } else {
+ throw new IllegalStateException("The iterator has already been consumed.")
+ }
+ }
+ }
+}
+
+object CompactHdrSnapshot {
+ val CompactRecordCountMask = 0xFFFFFFFFFFFFL
+
+ def compactRecord(index: Long, count: Long): Long = (index << 48) | count
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
new file mode 100644
index 00000000..471e7bd4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
@@ -0,0 +1,116 @@
+package kamon.metric.instrument
+
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.lang.Math.abs
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+import akka.actor.{ ActorSystem, Cancellable }
+import com.typesafe.config.Config
+import jsr166e.LongMaxUpdater
+import kamon.metric.{ Scale, MetricRecorder, CollectionContext }
+import kamon.util.PaddedAtomicLong
+import scala.concurrent.duration.FiniteDuration
+
+trait MinMaxCounter extends MetricRecorder {
+ override type SnapshotType = Histogram.Snapshot
+
+ def increment(): Unit
+ def increment(times: Long): Unit
+ def decrement()
+ def decrement(times: Long)
+}
+
+object MinMaxCounter {
+
+ def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration,
+ system: ActorSystem): MinMaxCounter = {
+
+ val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram)
+
+ val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ minMaxCounter.refreshValues()
+ }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+
+ minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule)
+ minMaxCounter
+ }
+
+ def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = {
+ import scala.concurrent.duration._
+
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+ val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+
+ apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system)
+ }
+}
+
+class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter {
+ private val min = new LongMaxUpdater
+ private val max = new LongMaxUpdater
+ private val sum = new PaddedAtomicLong
+ val refreshValuesSchedule = new AtomicReference[Cancellable]()
+
+ min.update(0L)
+ max.update(0L)
+
+ def increment(): Unit = increment(1L)
+
+ def increment(times: Long): Unit = {
+ val currentValue = sum.addAndGet(times)
+ max.update(currentValue)
+ }
+
+ def decrement(): Unit = decrement(1L)
+
+ def decrement(times: Long): Unit = {
+ val currentValue = sum.addAndGet(-times)
+ min.update(-currentValue)
+ }
+
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ refreshValues()
+ underlyingHistogram.collect(context)
+ }
+
+ def cleanup: Unit = {
+ if (refreshValuesSchedule.get() != null)
+ refreshValuesSchedule.get().cancel()
+ }
+
+ def refreshValues(): Unit = {
+ val currentValue = {
+ val value = sum.get()
+ if (value < 0) 0 else value
+ }
+
+ val currentMin = {
+ val minAbs = abs(min.maxThenReset())
+ if (minAbs <= currentValue) minAbs else 0
+ }
+
+ underlyingHistogram.record(currentValue)
+ underlyingHistogram.record(currentMin)
+ underlyingHistogram.record(max.maxThenReset())
+
+ max.update(currentValue)
+ min.update(-currentValue)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/metric/package.scala
new file mode 100644
index 00000000..43166058
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/package.scala
@@ -0,0 +1,34 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+
+import scala.annotation.tailrec
+import com.typesafe.config.Config
+
+package object metric {
+
+ @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = {
+ if (right.isEmpty)
+ left
+ else {
+ val (key, rightValue) = right.head
+ val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue)
+
+ combineMaps(left.updated(key, value), right.tail)(valueMerger)
+ }
+ }
+}