aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-01-28 01:25:51 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-01-28 01:25:51 -0300
commit01450abea84a4c0f9f4efe73201a8ca041acea2b (patch)
tree601b76380bd5c3ba405d6b480ddd1090b574ea98 /kamon-core
parentb6ea0a93e6be8e1f355f1bc993618d178d0c9372 (diff)
downloadKamon-01450abea84a4c0f9f4efe73201a8ca041acea2b.tar.gz
Kamon-01450abea84a4c0f9f4efe73201a8ca041acea2b.tar.bz2
Kamon-01450abea84a4c0f9f4efe73201a8ca041acea2b.zip
store actor metrics in the new metrics extension
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala140
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala90
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala (renamed from kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala)0
-rw-r--r--kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala35
6 files changed, 137 insertions, 161 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index 68d606ba..d43de311 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -17,12 +17,14 @@ package akka.instrumentation
import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
-import akka.actor.{ Cell, Props, ActorSystem, ActorRef }
+import akka.actor._
import akka.dispatch.{ Envelope, MessageDispatcher }
import kamon.trace.{ TraceContext, ContextAware, Trace }
-import kamon.metrics.{ ActorMetrics, HdrActorMetricsRecorder, Metrics }
+import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
+import kamon.trace.TraceContext
+import kamon.metrics.ActorMetrics.ActorMetricRecorder
@Aspect("perthis(actorCellCreation(*, *, *, *, *))")
class BehaviourInvokeTracing {
@@ -40,11 +42,11 @@ class BehaviourInvokeTracing {
actorMetrics = metricsExtension.register(path, ActorMetrics)
}
- @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && args(envelope)")
- def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}
+ @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
+ def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {}
- @Around("invokingActorBehaviourAtActorCell(envelope)")
- def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
+ @Around("invokingActorBehaviourAtActorCell(cell, envelope)")
+ def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Unit = {
val timestampBeforeProcessing = System.nanoTime()
val contextAndTimestamp = envelope.asInstanceOf[ContextAndTimestampAware]
@@ -55,6 +57,7 @@ class BehaviourInvokeTracing {
actorMetrics.map { am ⇒
am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.timestamp)
+ am.mailboxSize.record(cell.numberOfMessages)
}
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
deleted file mode 100644
index 0e3af5fd..00000000
--- a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.metrics
-
-import org.HdrHistogram.{ HighDynamicRangeRecorder, AbstractHistogram, AtomicHistogram }
-import kamon.util.GlobPathFilter
-import scala.collection.concurrent.TrieMap
-import scala.collection.JavaConversions.iterableAsScalaIterable
-import akka.actor._
-import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics }
-import kamon.Kamon
-import scala.concurrent.duration._
-import java.util.concurrent.TimeUnit
-import kamon.metrics.ActorMetricsDispatcher.Subscribe
-
-trait ActorMetricsOps {
- self: MetricsExtension ⇒
-
- val config = system.settings.config.getConfig("kamon.metrics.actors")
- val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]()
-
- val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector
- val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector
-
- val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = {
- val settings = config.getConfig("hdr-settings")
- val processingTimeHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("processing-time"))
- val timeInMailboxHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("time-in-mailbox"))
- val mailboxSizeHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("mailbox-size"))
-
- () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig)
- }
-
- def shouldTrackActor(path: String): Boolean =
- trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path))
-
- def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory())
-
- def unregisterActor(path: String): Unit = actorMetrics.remove(path)
-}
-
-class HdrActorMetricsRecorder(processingTimeHdrConfig: HighDynamicRangeRecorder.Configuration, timeInMailboxHdrConfig: HighDynamicRangeRecorder.Configuration,
- mailboxSizeHdrConfig: HighDynamicRangeRecorder.Configuration) {
-
- val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits)
- val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits)
- val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits)
-
- def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime)
-
- def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime)
-
- def snapshot(): HdrActorMetricsSnapshot = {
- HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy())
- }
-
- def reset(): Unit = {
- processingTimeHistogram.reset()
- timeInMailboxHistogram.reset()
- mailboxSizeHistogram.reset()
- }
-}
-
-case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram,
- mailboxSizeHistogram: AbstractHistogram)
-
-class ActorMetricsDispatcher extends Actor {
- val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS)
- val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
-
- var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty
- var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty
- var lastTick = System.currentTimeMillis()
-
- def receive = {
- case Subscribe(path, true) ⇒ subscribeForever(path, sender)
- case Subscribe(path, false) ⇒ subscribeOneOff(path, sender)
- case FlushMetrics ⇒ flushMetrics()
- }
-
- def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever)
-
- def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne)
-
- def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = {
- val pathFilter = new GlobPathFilter(path)
- val oldReceivers = target.get(pathFilter).getOrElse(Nil)
- target.updated(pathFilter, receiver :: oldReceivers)
- }
-
- def flushMetrics(): Unit = {
- /* val currentTick = System.currentTimeMillis()
- val snapshots = Kamon(Metrics)(context.system).actorMetrics.map {
- case (path, metrics) ⇒
- val snapshot = metrics.snapshot()
- metrics.reset()
-
- (path, snapshot)
- }.toMap
-
- dispatchMetricsTo(subscribedForOne, snapshots, currentTick)
- dispatchMetricsTo(subscribedForever, snapshots, currentTick)
-
- subscribedForOne = Map.empty
- lastTick = currentTick*/
- }
-
- def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot],
- currentTick: Long): Unit = {
-
- for ((subscribedPath, receivers) ← subscribers) {
- val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath))
- val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics)
-
- receivers.foreach(ref ⇒ ref ! actorMetrics)
- }
- }
-}
-
-object ActorMetricsDispatcher {
- case class Subscribe(path: String, forever: Boolean = false)
- case class UnSubscribe(path: String)
-
- case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot])
- case object FlushMetrics
-}
diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
index 4bac3519..11e3ebfc 100644
--- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala
@@ -17,12 +17,16 @@
package kamon.metrics
import scala.collection.concurrent.TrieMap
-import akka.actor.{ ExtensionIdProvider, ExtensionId, ExtendedActorSystem }
+import akka.actor._
import com.typesafe.config.Config
import kamon.util.GlobPathFilter
import kamon.Kamon
import akka.actor
import kamon.metrics.Metrics.MetricGroupFilter
+import kamon.metrics.MetricGroupIdentity.Category
+import kamon.metrics.Metrics.MetricGroupFilter
+import scala.Some
+import kamon.metrics.Subscriptions.Subscribe
case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category)
@@ -59,6 +63,11 @@ object MetricGroupIdentity {
trait Category {
def name: String
}
+
+ val AnyCategory = new Category {
+ def name: String = "match-all"
+ override def equals(that: Any): Boolean = that.isInstanceOf[Category]
+ }
}
trait MetricGroupFactory {
@@ -66,10 +75,13 @@ trait MetricGroupFactory {
def create(config: Config): Group
}
+
+
class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension {
val config = system.settings.config
val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]()
val filters = loadFilters(config)
+ lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions")
def register(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Option[category.Group] = {
if (shouldTrack(name, category))
@@ -82,6 +94,10 @@ class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension
storage.remove(MetricGroupIdentity(name, category))
}
+ def subscribe(category: Category, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = {
+ subscriptions.tell(Subscribe(category, selection, permanently), receiver)
+ }
+
def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = {
(for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap
}
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
new file mode 100644
index 00000000..5b2a902d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
@@ -0,0 +1,90 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import akka.actor.{ActorRef, Actor}
+import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe}
+import kamon.util.GlobPathFilter
+import scala.concurrent.duration.Duration
+import java.util.concurrent.TimeUnit
+import kamon.metrics.MetricGroupIdentity.Category
+import kamon.Kamon
+
+class Subscriptions extends Actor {
+ import context.system
+
+ val config = context.system.settings.config
+ val tickInterval = Duration(config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS)
+ val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+
+ var lastTick: Long = System.currentTimeMillis()
+ var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+ var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+
+ def receive = {
+ case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent)
+ case FlushMetrics => flush()
+ }
+
+ def subscribe(category: Category, selection: String, permanent: Boolean): Unit = {
+ val filter = MetricGroupFilter(category, new GlobPathFilter(selection))
+ if(permanent) {
+ val receivers = subscribedPermanently.get(filter).getOrElse(Nil)
+ subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers)
+
+ } else {
+ val receivers = subscribedForOneShot.get(filter).getOrElse(Nil)
+ subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers)
+ }
+
+ }
+
+ def flush(): Unit = {
+ val currentTick = System.currentTimeMillis()
+ val snapshots = Kamon(Metrics).collect
+
+ dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots)
+ dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots)
+
+ lastTick = currentTick
+ subscribedForOneShot = Map.empty
+ }
+
+ def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
+ snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
+
+ for((filter, receivers) <- subscriptions) yield {
+ val selection = snapshots.filter(group => filter.accept(group._1))
+ val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
+
+ receivers.foreach(_ ! tickMetrics)
+ }
+ }
+}
+
+object Subscriptions {
+ case object FlushMetrics
+ case class Subscribe(category: Category, selection: String, permanently: Boolean = false)
+ case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])
+
+ case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) {
+ def accept(identity: MetricGroupIdentity): Boolean = {
+ category.equals(identity.category) && globFilter.accept(identity.name)
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala
index e31d0e11..e31d0e11 100644
--- a/kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala
diff --git a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
index 127de4c4..91fb3a69 100644
--- a/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metrics/ActorMetricsSpec.scala
@@ -16,36 +16,43 @@
package kamon.metrics
-import org.scalatest.{ WordSpecLike, Matchers, WordSpec }
+import org.scalatest.{ WordSpecLike, Matchers }
import akka.testkit.TestKitBase
import akka.actor.{ Actor, Props, ActorSystem }
import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, Subscribe }
import scala.concurrent.duration._
+import kamon.Kamon
+import kamon.metrics.Subscriptions.TickMetricSnapshot
class ActorMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
"""
- |kamon.metrics.actors.tracked = ["user/test*"]
- """.stripMargin))
+ |kamon.metrics {
+ | filters = [
+ | {
+ | actor {
+ | includes = [ "user/*" ]
+ | excludes = [ ]
+ | }
+ | }
+ | ]
+ |}
+ """.stripMargin))
- implicit def self = testActor
-
- lazy val metricsExtension = Actor.noSender
"the Kamon actor metrics" should {
"track configured actors" in {
- system.actorOf(Props[Other], "test-tracked-actor") ! "nothing"
- metricsExtension ! Subscribe("user/test-tracked-actor")
+ Kamon(Metrics).subscribe(ActorMetrics, "user/test-tracked-actor", testActor)
+
+ system.actorOf(Props[Discard], "test-tracked-actor") ! "nothing"
- within(5 seconds) {
- expectMsgType[ActorMetricsDispatcher.ActorMetricsSnapshot]
- }
+ println(within(5 seconds) {
+ expectMsgType[TickMetricSnapshot]
+ })
}
}
}
-class Other extends Actor {
+class Discard extends Actor {
def receive = { case a ⇒ }
}