aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-01-29 13:20:11 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-01-29 13:20:11 -0300
commit3c6a81b6f2d8bbc3618fe9175be5e13a6ff6c1db (patch)
treefab61102adc95106145ae8f2b27a641811b8034f /kamon-core/src
parent01450abea84a4c0f9f4efe73201a8ca041acea2b (diff)
downloadKamon-3c6a81b6f2d8bbc3618fe9175be5e13a6ff6c1db.tar.gz
Kamon-3c6a81b6f2d8bbc3618fe9175be5e13a6ff6c1db.tar.bz2
Kamon-3c6a81b6f2d8bbc3618fe9175be5e13a6ff6c1db.zip
max, min and merge operations for MetricSnapshot
Diffstat (limited to 'kamon-core/src')
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/Metrics.scala112
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala50
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala16
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala1
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala68
5 files changed, 188 insertions, 59 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala
new file mode 100644
index 00000000..61f79a29
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala
@@ -0,0 +1,112 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import annotation.tailrec
+import com.typesafe.config.Config
+import kamon.metrics.MetricSnapshot.Measurement
+
+case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category)
+
+trait MetricIdentity {
+ def name: String
+}
+
+trait MetricGroupRecorder {
+ def record(identity: MetricIdentity, value: Long)
+ def collect: MetricGroupSnapshot
+}
+
+trait MetricGroupSnapshot {
+ def metrics: Map[MetricIdentity, MetricSnapshot]
+}
+
+trait MetricRecorder {
+ def record(value: Long)
+ def collect(): MetricSnapshot
+}
+
+trait MetricSnapshot {
+ def numberOfMeasurements: Long
+ def measurementLevels: Vector[Measurement]
+
+ def max: Long = measurementLevels.lastOption.map(_.value).getOrElse(0)
+ def min: Long = measurementLevels.headOption.map(_.value).getOrElse(0)
+
+ def merge(that: MetricSnapshot): MetricSnapshot = {
+ val mergedMeasurements = Vector.newBuilder[Measurement]
+
+ @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = {
+ if (left.nonEmpty && right.nonEmpty) {
+ val leftValue = left.head
+ val rightValue = right.head
+
+ if (rightValue.value == leftValue.value) {
+ val merged = rightValue.merge(leftValue)
+ mergedMeasurements += merged
+ go(left.tail, right.tail, totalNrOfMeasurements + merged.count)
+ } else {
+ if (leftValue.value < rightValue.value) {
+ mergedMeasurements += leftValue
+ go(left.tail, right, totalNrOfMeasurements + leftValue.count)
+ } else {
+ mergedMeasurements += rightValue
+ go(left, right.tail, totalNrOfMeasurements + rightValue.count)
+ }
+ }
+ } else {
+ if (left.isEmpty && right.nonEmpty) {
+ mergedMeasurements += right.head
+ go(left, right.tail, totalNrOfMeasurements + right.head.count)
+ } else {
+ if (left.nonEmpty && right.isEmpty) {
+ mergedMeasurements += left.head
+ go(left.tail, right, totalNrOfMeasurements + left.head.count)
+ } else totalNrOfMeasurements
+ }
+ }
+ }
+
+ val totalNrOfMeasurements = go(measurementLevels, that.measurementLevels, 0)
+ DefaultMetricSnapshot(totalNrOfMeasurements, mergedMeasurements.result())
+ }
+}
+
+object MetricSnapshot {
+ case class Measurement(value: Long, count: Long) {
+ def merge(that: Measurement) = Measurement(value, count + that.count)
+ }
+}
+
+case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot
+
+object MetricGroupIdentity {
+ trait Category {
+ def name: String
+ }
+
+ val AnyCategory = new Category {
+ def name: String = "match-all"
+ override def equals(that: Any): Boolean = that.isInstanceOf[Category]
+ }
+}
+
+trait MetricGroupFactory {
+ type Group <: MetricGroupRecorder
+ def create(config: Config): Group
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
index 11e3ebfc..4d7ff354 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
@@ -22,61 +22,11 @@ import com.typesafe.config.Config
import kamon.util.GlobPathFilter
import kamon.Kamon
import akka.actor
-import kamon.metrics.Metrics.MetricGroupFilter
import kamon.metrics.MetricGroupIdentity.Category
import kamon.metrics.Metrics.MetricGroupFilter
import scala.Some
import kamon.metrics.Subscriptions.Subscribe
-case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category)
-
-trait MetricIdentity {
- def name: String
-}
-
-trait MetricGroupRecorder {
- def record(identity: MetricIdentity, value: Long)
- def collect: MetricGroupSnapshot
-}
-
-trait MetricGroupSnapshot {
- def metrics: Map[MetricIdentity, MetricSnapshot]
-}
-
-trait MetricRecorder {
- def record(value: Long)
- def collect(): MetricSnapshot
-}
-
-trait MetricSnapshot {
- def numberOfMeasurements: Long
- def measurementLevels: Vector[MetricSnapshot.Measurement]
-}
-
-object MetricSnapshot {
- case class Measurement(value: Long, count: Long)
-}
-
-case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot
-
-object MetricGroupIdentity {
- trait Category {
- def name: String
- }
-
- val AnyCategory = new Category {
- def name: String = "match-all"
- override def equals(that: Any): Boolean = that.isInstanceOf[Category]
- }
-}
-
-trait MetricGroupFactory {
- type Group <: MetricGroupRecorder
- def create(config: Config): Group
-}
-
-
-
class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension {
val config = system.settings.config
val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
index 5b2a902d..3151bdc1 100644
--- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
@@ -16,8 +16,8 @@
package kamon.metrics
-import akka.actor.{ActorRef, Actor}
-import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe}
+import akka.actor.{ ActorRef, Actor }
+import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
import kamon.util.GlobPathFilter
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit
@@ -36,13 +36,13 @@ class Subscriptions extends Actor {
var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
def receive = {
- case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent)
- case FlushMetrics => flush()
+ case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent)
+ case FlushMetrics ⇒ flush()
}
def subscribe(category: Category, selection: String, permanent: Boolean): Unit = {
val filter = MetricGroupFilter(category, new GlobPathFilter(selection))
- if(permanent) {
+ if (permanent) {
val receivers = subscribedPermanently.get(filter).getOrElse(Nil)
subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers)
@@ -67,8 +67,8 @@ class Subscriptions extends Actor {
def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
- for((filter, receivers) <- subscriptions) yield {
- val selection = snapshots.filter(group => filter.accept(group._1))
+ for ((filter, receivers) ← subscriptions) yield {
+ val selection = snapshots.filter(group ⇒ filter.accept(group._1))
val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
receivers.foreach(_ ! tickMetrics)
@@ -86,5 +86,5 @@ object Subscriptions {
category.equals(identity.category) && globFilter.accept(identity.name)
}
}
-
+
}
diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
index 91fb3a69..03c1f323 100644
--- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
@@ -39,7 +39,6 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
|}
""".stripMargin))
-
"the Kamon actor metrics" should {
"track configured actors" in {
Kamon(Metrics).subscribe(ActorMetrics, "user/test-tracked-actor", testActor)
diff --git a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala
new file mode 100644
index 00000000..1c5a4b21
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import org.scalatest.{ Matchers, WordSpec }
+import kamon.metrics.MetricSnapshot.Measurement
+
+class MetricSnapshotSpec extends WordSpec with Matchers {
+
+ "a metric snapshot" should {
+ "support a max operation" in new SnapshotFixtures {
+ snapshotA.max should be(17)
+ snapshotB.max should be(10)
+ }
+
+ "support a min operation" in new SnapshotFixtures {
+ snapshotA.min should be(1)
+ snapshotB.min should be(2)
+ }
+
+ "be able to merge with other snapshot" in new SnapshotFixtures {
+ val merged = snapshotA.merge(snapshotB)
+
+ merged.min should be(1)
+ merged.max should be(17)
+ merged.numberOfMeasurements should be(200)
+ merged.measurementLevels.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17)
+ }
+
+ "be able to merge with empty snapshots" in new SnapshotFixtures {
+ snapshotA.merge(emptySnapshot) should be(snapshotA)
+ }
+
+ }
+
+ trait SnapshotFixtures {
+ val emptySnapshot = DefaultMetricSnapshot(0, Vector.empty)
+
+ val snapshotA = DefaultMetricSnapshot(100, Vector(
+ Measurement(1, 3),
+ Measurement(2, 15),
+ Measurement(5, 68),
+ Measurement(7, 13),
+ Measurement(17, 1)))
+
+ val snapshotB = DefaultMetricSnapshot(100, Vector(
+ Measurement(2, 6),
+ Measurement(4, 48),
+ Measurement(5, 39),
+ Measurement(10, 7)))
+
+ }
+
+}