aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metrics
diff options
context:
space:
mode:
authorIvan Topolnak <itopolnak@despegar.com>2014-01-02 18:09:53 -0300
committerIvan Topolnak <itopolnak@despegar.com>2014-01-13 17:37:20 -0300
commit7a10c0ef2a6566229e8571f6d385ca2ff794cc20 (patch)
treececd7ce6eb7a71f967eaa1605615780fa94d346c /kamon-core/src/main/scala/kamon/metrics
parent54143e4af6182b967736abc60a7fb20c88dd6587 (diff)
downloadKamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.gz
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.tar.bz2
Kamon-7a10c0ef2a6566229e8571f6d385ca2ff794cc20.zip
integrate trace and metrics into the base project
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metrics')
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala148
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/package.scala31
3 files changed, 210 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
new file mode 100644
index 00000000..72e473e8
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala
@@ -0,0 +1,31 @@
+/* ===================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================================================== */
+
+package kamon.metrics
+
+import akka.actor.{ Props, ExtendedActorSystem, ExtensionIdProvider, ExtensionId }
+import akka.actor
+import kamon.Kamon
+
+object ActorMetrics extends ExtensionId[ActorMetricsExtension] with ExtensionIdProvider {
+ def lookup(): ExtensionId[_ <: actor.Extension] = ActorMetrics
+ def createExtension(system: ExtendedActorSystem): ActorMetricsExtension = new ActorMetricsExtension(system)
+
+}
+
+class ActorMetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension with ActorMetricsOps {
+ lazy val metricsDispatcher = system.actorOf(Props[ActorMetricsDispatcher], "kamon-actor-metrics")
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
new file mode 100644
index 00000000..dc4abde0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala
@@ -0,0 +1,148 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import org.HdrHistogram.{ AbstractHistogram, AtomicHistogram }
+import kamon.util.GlobPathFilter
+import scala.collection.concurrent.TrieMap
+import scala.collection.JavaConversions.iterableAsScalaIterable
+import akka.actor._
+import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics }
+import kamon.Kamon
+import scala.concurrent.duration._
+import java.util.concurrent.TimeUnit
+import kamon.metrics.ActorMetricsDispatcher.Subscribe
+
+trait ActorMetricsOps {
+ self: ActorMetricsExtension ⇒
+
+ val config = system.settings.config.getConfig("kamon.metrics.actors")
+ val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]()
+
+ val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector
+ val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector
+
+ val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = {
+ val settings = config.getConfig("hdr-settings")
+ val processingTimeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("processing-time"))
+ val timeInMailboxHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("time-in-mailbox"))
+ val mailboxSizeHdrConfig = HdrConfiguration.fromConfig(settings.getConfig("mailbox-size"))
+
+ () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig)
+ }
+
+ import scala.concurrent.duration._
+ system.scheduler.schedule(0.seconds, 10.seconds)(
+ actorMetrics.collect {
+ case (name, recorder: HdrActorMetricsRecorder) ⇒
+ println(s"Actor: $name")
+ recorder.processingTimeHistogram.copy.getHistogramData.outputPercentileDistribution(System.out, 1000000D)
+ })(system.dispatcher)
+
+ def shouldTrackActor(path: String): Boolean =
+ trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path))
+
+ def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory())
+
+ def unregisterActor(path: String): Unit = actorMetrics.remove(path)
+}
+
+class HdrActorMetricsRecorder(processingTimeHdrConfig: HdrConfiguration, timeInMailboxHdrConfig: HdrConfiguration,
+ mailboxSizeHdrConfig: HdrConfiguration) {
+
+ val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits)
+ val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits)
+ val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits)
+
+ def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime)
+
+ def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime)
+
+ def snapshot(): HdrActorMetricsSnapshot = {
+ HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy())
+ }
+
+ def reset(): Unit = {
+ processingTimeHistogram.reset()
+ timeInMailboxHistogram.reset()
+ mailboxSizeHistogram.reset()
+ }
+}
+
+case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram,
+ mailboxSizeHistogram: AbstractHistogram)
+
+class ActorMetricsDispatcher extends Actor {
+ val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS)
+ val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+
+ var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty
+ var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty
+ var lastTick = System.currentTimeMillis()
+
+ def receive = {
+ case Subscribe(path, true) ⇒ subscribeForever(path, sender)
+ case Subscribe(path, false) ⇒ subscribeOneOff(path, sender)
+ case FlushMetrics ⇒ flushMetrics()
+ }
+
+ def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever)
+
+ def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne)
+
+ def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = {
+ val pathFilter = new GlobPathFilter(path)
+ val oldReceivers = target.get(pathFilter).getOrElse(Nil)
+ target.updated(pathFilter, receiver :: oldReceivers)
+ }
+
+ def flushMetrics(): Unit = {
+ val currentTick = System.currentTimeMillis()
+ val snapshots = Kamon(ActorMetrics)(context.system).actorMetrics.map {
+ case (path, metrics) ⇒
+ val snapshot = metrics.snapshot()
+ metrics.reset()
+
+ (path, snapshot)
+ }.toMap
+
+ dispatchMetricsTo(subscribedForOne, snapshots, currentTick)
+ dispatchMetricsTo(subscribedForever, snapshots, currentTick)
+
+ subscribedForOne = Map.empty
+ lastTick = currentTick
+ }
+
+ def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot],
+ currentTick: Long): Unit = {
+
+ for ((subscribedPath, receivers) ← subscribers) {
+ val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath))
+ val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics)
+
+ receivers.foreach(ref ⇒ ref ! actorMetrics)
+ }
+ }
+}
+
+object ActorMetricsDispatcher {
+ case class Subscribe(path: String, forever: Boolean = false)
+ case class UnSubscribe(path: String)
+
+ case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot])
+ case object FlushMetrics
+}
diff --git a/kamon-core/src/main/scala/kamon/metrics/package.scala b/kamon-core/src/main/scala/kamon/metrics/package.scala
new file mode 100644
index 00000000..d6359ead
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/package.scala
@@ -0,0 +1,31 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+
+import scala.concurrent.duration._
+import com.typesafe.config.Config
+
+package object metrics {
+ val OneHour = 1.hour.toNanos
+
+ case class HdrConfiguration(highestTrackableValue: Long, significantValueDigits: Int)
+ case object HdrConfiguration {
+ def fromConfig(config: Config): HdrConfiguration = {
+ HdrConfiguration(config.getLong("highest-trackable-value"), config.getInt("significant-value-digits"))
+ }
+ }
+}