From 3c6a81b6f2d8bbc3618fe9175be5e13a6ff6c1db Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 29 Jan 2014 13:20:11 -0300 Subject: max, min and merge operations for MetricSnapshot --- .../src/main/scala/kamon/metrics/Metrics.scala | 112 +++++++++++++++++++++ .../scala/kamon/metrics/MetricsExtension.scala | 50 --------- .../main/scala/kamon/metrics/Subscriptions.scala | 16 +-- .../scala/kamon/metrics/ActorMetricsSpec.scala | 1 - .../scala/kamon/metrics/MetricSnapshotSpec.scala | 68 +++++++++++++ 5 files changed, 188 insertions(+), 59 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metrics/Metrics.scala create mode 100644 kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/scala/kamon/metrics/Metrics.scala b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala new file mode 100644 index 00000000..61f79a29 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Metrics.scala @@ -0,0 +1,112 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import annotation.tailrec +import com.typesafe.config.Config +import kamon.metrics.MetricSnapshot.Measurement + +case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category) + +trait MetricIdentity { + def name: String +} + +trait MetricGroupRecorder { + def record(identity: MetricIdentity, value: Long) + def collect: MetricGroupSnapshot +} + +trait MetricGroupSnapshot { + def metrics: Map[MetricIdentity, MetricSnapshot] +} + +trait MetricRecorder { + def record(value: Long) + def collect(): MetricSnapshot +} + +trait MetricSnapshot { + def numberOfMeasurements: Long + def measurementLevels: Vector[Measurement] + + def max: Long = measurementLevels.lastOption.map(_.value).getOrElse(0) + def min: Long = measurementLevels.headOption.map(_.value).getOrElse(0) + + def merge(that: MetricSnapshot): MetricSnapshot = { + val mergedMeasurements = Vector.newBuilder[Measurement] + + @tailrec def go(left: Vector[Measurement], right: Vector[Measurement], totalNrOfMeasurements: Long): Long = { + if (left.nonEmpty && right.nonEmpty) { + val leftValue = left.head + val rightValue = right.head + + if (rightValue.value == leftValue.value) { + val merged = rightValue.merge(leftValue) + mergedMeasurements += merged + go(left.tail, right.tail, totalNrOfMeasurements + merged.count) + } else { + if (leftValue.value < rightValue.value) { + mergedMeasurements += leftValue + go(left.tail, right, totalNrOfMeasurements + leftValue.count) + } else { + mergedMeasurements += rightValue + go(left, right.tail, totalNrOfMeasurements + rightValue.count) + } + } + } else { + if (left.isEmpty && right.nonEmpty) { + mergedMeasurements += right.head + go(left, right.tail, totalNrOfMeasurements + right.head.count) + } else { + if (left.nonEmpty && right.isEmpty) { + mergedMeasurements += left.head + go(left.tail, right, totalNrOfMeasurements + left.head.count) + } else totalNrOfMeasurements + } + } + } + + val totalNrOfMeasurements = go(measurementLevels, that.measurementLevels, 0) + DefaultMetricSnapshot(totalNrOfMeasurements, mergedMeasurements.result()) + } +} + +object MetricSnapshot { + case class Measurement(value: Long, count: Long) { + def merge(that: Measurement) = Measurement(value, count + that.count) + } +} + +case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot + +object MetricGroupIdentity { + trait Category { + def name: String + } + + val AnyCategory = new Category { + def name: String = "match-all" + override def equals(that: Any): Boolean = that.isInstanceOf[Category] + } +} + +trait MetricGroupFactory { + type Group <: MetricGroupRecorder + def create(config: Config): Group +} + diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala index 11e3ebfc..4d7ff354 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala @@ -22,61 +22,11 @@ import com.typesafe.config.Config import kamon.util.GlobPathFilter import kamon.Kamon import akka.actor -import kamon.metrics.Metrics.MetricGroupFilter import kamon.metrics.MetricGroupIdentity.Category import kamon.metrics.Metrics.MetricGroupFilter import scala.Some import kamon.metrics.Subscriptions.Subscribe -case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category) - -trait MetricIdentity { - def name: String -} - -trait MetricGroupRecorder { - def record(identity: MetricIdentity, value: Long) - def collect: MetricGroupSnapshot -} - -trait MetricGroupSnapshot { - def metrics: Map[MetricIdentity, MetricSnapshot] -} - -trait MetricRecorder { - def record(value: Long) - def collect(): MetricSnapshot -} - -trait MetricSnapshot { - def numberOfMeasurements: Long - def measurementLevels: Vector[MetricSnapshot.Measurement] -} - -object MetricSnapshot { - case class Measurement(value: Long, count: Long) -} - -case class DefaultMetricSnapshot(numberOfMeasurements: Long, measurementLevels: Vector[MetricSnapshot.Measurement]) extends MetricSnapshot - -object MetricGroupIdentity { - trait Category { - def name: String - } - - val AnyCategory = new Category { - def name: String = "match-all" - override def equals(that: Any): Boolean = that.isInstanceOf[Category] - } -} - -trait MetricGroupFactory { - type Group <: MetricGroupRecorder - def create(config: Config): Group -} - - - class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala index 5b2a902d..3151bdc1 100644 --- a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -16,8 +16,8 @@ package kamon.metrics -import akka.actor.{ActorRef, Actor} -import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe} +import akka.actor.{ ActorRef, Actor } +import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } import kamon.util.GlobPathFilter import scala.concurrent.duration.Duration import java.util.concurrent.TimeUnit @@ -36,13 +36,13 @@ class Subscriptions extends Actor { var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty def receive = { - case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent) - case FlushMetrics => flush() + case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) + case FlushMetrics ⇒ flush() } def subscribe(category: Category, selection: String, permanent: Boolean): Unit = { val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) - if(permanent) { + if (permanent) { val receivers = subscribedPermanently.get(filter).getOrElse(Nil) subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) @@ -67,8 +67,8 @@ class Subscriptions extends Actor { def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - for((filter, receivers) <- subscriptions) yield { - val selection = snapshots.filter(group => filter.accept(group._1)) + for ((filter, receivers) ← subscriptions) yield { + val selection = snapshots.filter(group ⇒ filter.accept(group._1)) val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) receivers.foreach(_ ! tickMetrics) @@ -86,5 +86,5 @@ object Subscriptions { category.equals(identity.category) && globFilter.accept(identity.name) } } - + } diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala index 91fb3a69..03c1f323 100644 --- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala @@ -39,7 +39,6 @@ class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { |} """.stripMargin)) - "the Kamon actor metrics" should { "track configured actors" in { Kamon(Metrics).subscribe(ActorMetrics, "user/test-tracked-actor", testActor) diff --git a/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala new file mode 100644 index 00000000..1c5a4b21 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metrics/MetricSnapshotSpec.scala @@ -0,0 +1,68 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import org.scalatest.{ Matchers, WordSpec } +import kamon.metrics.MetricSnapshot.Measurement + +class MetricSnapshotSpec extends WordSpec with Matchers { + + "a metric snapshot" should { + "support a max operation" in new SnapshotFixtures { + snapshotA.max should be(17) + snapshotB.max should be(10) + } + + "support a min operation" in new SnapshotFixtures { + snapshotA.min should be(1) + snapshotB.min should be(2) + } + + "be able to merge with other snapshot" in new SnapshotFixtures { + val merged = snapshotA.merge(snapshotB) + + merged.min should be(1) + merged.max should be(17) + merged.numberOfMeasurements should be(200) + merged.measurementLevels.map(_.value) should contain inOrderOnly (1, 2, 4, 5, 7, 10, 17) + } + + "be able to merge with empty snapshots" in new SnapshotFixtures { + snapshotA.merge(emptySnapshot) should be(snapshotA) + } + + } + + trait SnapshotFixtures { + val emptySnapshot = DefaultMetricSnapshot(0, Vector.empty) + + val snapshotA = DefaultMetricSnapshot(100, Vector( + Measurement(1, 3), + Measurement(2, 15), + Measurement(5, 68), + Measurement(7, 13), + Measurement(17, 1))) + + val snapshotB = DefaultMetricSnapshot(100, Vector( + Measurement(2, 6), + Measurement(4, 48), + Measurement(5, 39), + Measurement(10, 7))) + + } + +} -- cgit v1.2.3