From 01450abea84a4c0f9f4efe73201a8ca041acea2b Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Tue, 28 Jan 2014 01:25:51 -0300 Subject: store actor metrics in the new metrics extension --- .../ActorMessagePassingTracing.scala | 15 ++- .../main/scala/kamon/metrics/ActorMetricsOps.scala | 140 --------------------- .../kamon/metrics/HighDynamicRangeRecorder.scala | 86 ------------- .../scala/kamon/metrics/MetricsExtension.scala | 18 ++- .../main/scala/kamon/metrics/Subscriptions.scala | 90 +++++++++++++ .../instruments/HighDynamicRangeRecorder.scala | 86 +++++++++++++ .../scala/kamon/metrics/ActorMetricsSpec.scala | 35 +++--- 7 files changed, 223 insertions(+), 247 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala delete mode 100644 kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala create mode 100644 kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index 68d606ba..d43de311 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -17,12 +17,14 @@ package akka.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{ Cell, Props, ActorSystem, ActorRef } +import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } import kamon.trace.{ TraceContext, ContextAware, Trace } -import kamon.metrics.{ ActorMetrics, HdrActorMetricsRecorder, Metrics } +import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon import kamon.metrics.ActorMetrics.ActorMetricRecorder +import kamon.trace.TraceContext +import kamon.metrics.ActorMetrics.ActorMetricRecorder @Aspect("perthis(actorCellCreation(*, *, *, *, *))") class BehaviourInvokeTracing { @@ -40,11 +42,11 @@ class BehaviourInvokeTracing { actorMetrics = metricsExtension.register(path, ActorMetrics) } - @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)") - def invokingActorBehaviourAtActorCell(envelope: Envelope) = {} + @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") + def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {} - @Around("invokingActorBehaviourAtActorCell(envelope)") - def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = { + @Around("invokingActorBehaviourAtActorCell(cell, envelope)") + def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Unit = { val timestampBeforeProcessing = System.nanoTime() val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware] @@ -55,6 +57,7 @@ class BehaviourInvokeTracing { actorMetrics.map { am ⇒ am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.timestamp) + am.mailboxSize.record(cell.numberOfMessages) } } diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala deleted file mode 100644 index 0e3af5fd..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.HdrHistogram.{ HighDynamicRangeRecorder, AbstractHistogram, AtomicHistogram } -import kamon.util.GlobPathFilter -import scala.collection.concurrent.TrieMap -import scala.collection.JavaConversions.iterableAsScalaIterable -import akka.actor._ -import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics } -import kamon.Kamon -import scala.concurrent.duration._ -import java.util.concurrent.TimeUnit -import kamon.metrics.ActorMetricsDispatcher.Subscribe - -trait ActorMetricsOps { - self: MetricsExtension ⇒ - - val config = system.settings.config.getConfig("kamon.metrics.actors") - val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]() - - val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector - val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector - - val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = { - val settings = config.getConfig("hdr-settings") - val processingTimeHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("processing-time")) - val timeInMailboxHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("time-in-mailbox")) - val mailboxSizeHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("mailbox-size")) - - () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig) - } - - def shouldTrackActor(path: String): Boolean = - trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path)) - - def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory()) - - def unregisterActor(path: String): Unit = actorMetrics.remove(path) -} - -class HdrActorMetricsRecorder(processingTimeHdrConfig: HighDynamicRangeRecorder.Configuration, timeInMailboxHdrConfig: HighDynamicRangeRecorder.Configuration, - mailboxSizeHdrConfig: HighDynamicRangeRecorder.Configuration) { - - val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits) - val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits) - val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits) - - def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime) - - def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime) - - def snapshot(): HdrActorMetricsSnapshot = { - HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy()) - } - - def reset(): Unit = { - processingTimeHistogram.reset() - timeInMailboxHistogram.reset() - mailboxSizeHistogram.reset() - } -} - -case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram, - mailboxSizeHistogram: AbstractHistogram) - -class ActorMetricsDispatcher extends Actor { - val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) - val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - - var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty - var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty - var lastTick = System.currentTimeMillis() - - def receive = { - case Subscribe(path, true) ⇒ subscribeForever(path, sender) - case Subscribe(path, false) ⇒ subscribeOneOff(path, sender) - case FlushMetrics ⇒ flushMetrics() - } - - def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever) - - def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne) - - def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = { - val pathFilter = new GlobPathFilter(path) - val oldReceivers = target.get(pathFilter).getOrElse(Nil) - target.updated(pathFilter, receiver :: oldReceivers) - } - - def flushMetrics(): Unit = { - /* val currentTick = System.currentTimeMillis() - val snapshots = Kamon(Metrics)(context.system).actorMetrics.map { - case (path, metrics) ⇒ - val snapshot = metrics.snapshot() - metrics.reset() - - (path, snapshot) - }.toMap - - dispatchMetricsTo(subscribedForOne, snapshots, currentTick) - dispatchMetricsTo(subscribedForever, snapshots, currentTick) - - subscribedForOne = Map.empty - lastTick = currentTick*/ - } - - def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot], - currentTick: Long): Unit = { - - for ((subscribedPath, receivers) ← subscribers) { - val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath)) - val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics) - - receivers.foreach(ref ⇒ ref ! actorMetrics) - } - } -} - -object ActorMetricsDispatcher { - case class Subscribe(path: String, forever: Boolean = false) - case class UnSubscribe(path: String) - - case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot]) - case object FlushMetrics -} diff --git a/kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala deleted file mode 100644 index e31d0e11..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package org.HdrHistogram - -import java.util.concurrent.atomic.AtomicLongFieldUpdater -import scala.annotation.tailrec -import kamon.metrics.{ DefaultMetricSnapshot, MetricSnapshot, MetricRecorder } -import com.typesafe.config.Config -import org.HdrHistogram.HighDynamicRangeRecorder.Configuration - -/** - * This implementation aims to be used for real time data collection where data snapshots are taken often over time. - * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still - * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. - */ -class HighDynamicRangeRecorder(configuration: Configuration) - extends AtomicHistogram(1L, configuration.highestTrackableValue, configuration.significantValueDigits) with MetricRecorder { - - import HighDynamicRangeRecorder.totalCountUpdater - - def record(value: Long): Unit = recordValue(value) - - def collect(): MetricSnapshot = { - val entries = Vector.newBuilder[MetricSnapshot.Measurement] - val countsLength = counts.length() - - @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { - if (index < countsLength) { - val currentValue = previousValue + increment - val countAtValue = counts.getAndSet(index, 0) - - if (countAtValue > 0) - entries += MetricSnapshot.Measurement(currentValue, countAtValue) - - if (currentValue == bucketLimit) - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) - else - iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) - } else { - nrOfRecordings - } - } - - val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) - - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = getTotalCount - val newTotalCount = previousTotalCount - nrOfRecordings - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - - DefaultMetricSnapshot(nrOfRecordings, entries.result()) - } - -} - -object HighDynamicRangeRecorder { - val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") - - def apply(configuration: Configuration): HighDynamicRangeRecorder = new HighDynamicRangeRecorder(configuration) - - case class Configuration(highestTrackableValue: Long, significantValueDigits: Int) - - case object Configuration { - def fromConfig(config: Config): Configuration = { - Configuration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala index 4bac3519..11e3ebfc 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala @@ -17,12 +17,16 @@ package kamon.metrics import scala.collection.concurrent.TrieMap -import akka.actor.{ ExtensionIdProvider, ExtensionId, ExtendedActorSystem } +import akka.actor._ import com.typesafe.config.Config import kamon.util.GlobPathFilter import kamon.Kamon import akka.actor import kamon.metrics.Metrics.MetricGroupFilter +import kamon.metrics.MetricGroupIdentity.Category +import kamon.metrics.Metrics.MetricGroupFilter +import scala.Some +import kamon.metrics.Subscriptions.Subscribe case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category) @@ -59,6 +63,11 @@ object MetricGroupIdentity { trait Category { def name: String } + + val AnyCategory = new Category { + def name: String = "match-all" + override def equals(that: Any): Boolean = that.isInstanceOf[Category] + } } trait MetricGroupFactory { @@ -66,10 +75,13 @@ trait MetricGroupFactory { def create(config: Config): Group } + + class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() val filters = loadFilters(config) + lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") def register(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Option[category.Group] = { if (shouldTrack(name, category)) @@ -82,6 +94,10 @@ class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension storage.remove(MetricGroupIdentity(name, category)) } + def subscribe(category: Category, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { + subscriptions.tell(Subscribe(category, selection, permanently), receiver) + } + def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap } diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala new file mode 100644 index 00000000..5b2a902d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -0,0 +1,90 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import akka.actor.{ActorRef, Actor} +import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe} +import kamon.util.GlobPathFilter +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeUnit +import kamon.metrics.MetricGroupIdentity.Category +import kamon.Kamon + +class Subscriptions extends Actor { + import context.system + + val config = context.system.settings.config + val tickInterval = Duration(config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var lastTick: Long = System.currentTimeMillis() + var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + + def receive = { + case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent) + case FlushMetrics => flush() + } + + def subscribe(category: Category, selection: String, permanent: Boolean): Unit = { + val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) + if(permanent) { + val receivers = subscribedPermanently.get(filter).getOrElse(Nil) + subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + + } else { + val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) + subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + } + + } + + def flush(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(Metrics).collect + + dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + + lastTick = currentTick + subscribedForOneShot = Map.empty + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + + for((filter, receivers) <- subscriptions) yield { + val selection = snapshots.filter(group => filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + receivers.foreach(_ ! tickMetrics) + } + } +} + +object Subscriptions { + case object FlushMetrics + case class Subscribe(category: Category, selection: String, permanently: Boolean = false) + case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) + + case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) { + def accept(identity: MetricGroupIdentity): Boolean = { + category.equals(identity.category) && globFilter.accept(identity.name) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala new file mode 100644 index 00000000..e31d0e11 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala @@ -0,0 +1,86 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package org.HdrHistogram + +import java.util.concurrent.atomic.AtomicLongFieldUpdater +import scala.annotation.tailrec +import kamon.metrics.{ DefaultMetricSnapshot, MetricSnapshot, MetricRecorder } +import com.typesafe.config.Config +import org.HdrHistogram.HighDynamicRangeRecorder.Configuration + +/** + * This implementation aims to be used for real time data collection where data snapshots are taken often over time. + * The snapshotAndReset() operation extracts all the recorded values from the histogram and resets the counts, but still + * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. + */ +class HighDynamicRangeRecorder(configuration: Configuration) + extends AtomicHistogram(1L, configuration.highestTrackableValue, configuration.significantValueDigits) with MetricRecorder { + + import HighDynamicRangeRecorder.totalCountUpdater + + def record(value: Long): Unit = recordValue(value) + + def collect(): MetricSnapshot = { + val entries = Vector.newBuilder[MetricSnapshot.Measurement] + val countsLength = counts.length() + + @tailrec def iterate(index: Int, previousValue: Long, nrOfRecordings: Long, bucketLimit: Long, increment: Long): Long = { + if (index < countsLength) { + val currentValue = previousValue + increment + val countAtValue = counts.getAndSet(index, 0) + + if (countAtValue > 0) + entries += MetricSnapshot.Measurement(currentValue, countAtValue) + + if (currentValue == bucketLimit) + iterate(index + 1, currentValue, nrOfRecordings + countAtValue, (bucketLimit << 1) + 1, increment << 1) + else + iterate(index + 1, currentValue, nrOfRecordings + countAtValue, bucketLimit, increment) + } else { + nrOfRecordings + } + } + + val nrOfRecordings = iterate(0, -1, 0, subBucketMask, 1) + + def tryUpdateTotalCount: Boolean = { + val previousTotalCount = getTotalCount + val newTotalCount = previousTotalCount - nrOfRecordings + + totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) + } + + while (!tryUpdateTotalCount) {} + + DefaultMetricSnapshot(nrOfRecordings, entries.result()) + } + +} + +object HighDynamicRangeRecorder { + val totalCountUpdater = AtomicLongFieldUpdater.newUpdater(classOf[AtomicHistogram], "totalCount") + + def apply(configuration: Configuration): HighDynamicRangeRecorder = new HighDynamicRangeRecorder(configuration) + + case class Configuration(highestTrackableValue: Long, significantValueDigits: Int) + + case object Configuration { + def fromConfig(config: Config): Configuration = { + Configuration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits")) + } + } +} diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala index 127de4c4..91fb3a69 100644 --- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala @@ -16,36 +16,43 @@ package kamon.metrics -import org.scalatest.{ WordSpecLike, Matchers, WordSpec } +import org.scalatest.{ WordSpecLike, Matchers } import akka.testkit.TestKitBase import akka.actor.{ Actor, Props, ActorSystem } import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, Subscribe } import scala.concurrent.duration._ +import kamon.Kamon +import kamon.metrics.Subscriptions.TickMetricSnapshot class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers { implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( """ - |kamon.metrics.actors.tracked = ["user/test*"] - """.stripMargin)) + |kamon.metrics { + | filters = [ + | { + | actor { + | includes = [ "user/*" ] + | excludes = [ ] + | } + | } + | ] + |} + """.stripMargin)) - implicit def self = testActor - - lazy val metricsExtension = Actor.noSender "the Kamon actor metrics" should { "track configured actors" in { - system.actorOf(Props[Other], "test-tracked-actor") ! "nothing" - metricsExtension ! Subscribe("user/test-tracked-actor") + Kamon(Metrics).subscribe(ActorMetrics, "user/test-tracked-actor", testActor) + + system.actorOf(Props[Discard], "test-tracked-actor") ! "nothing" - within(5 seconds) { - expectMsgType[ActorMetricsDispatcher.ActorMetricsSnapshot] - } + println(within(5 seconds) { + expectMsgType[TickMetricSnapshot] + }) } } } -class Other extends Actor { +class Discard extends Actor { def receive = { case a ⇒ } } -- cgit v1.2.3