aboutsummaryrefslogtreecommitdiff
path: root/kamon-system-metrics/src/main/scala/kamon/system
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-system-metrics/src/main/scala/kamon/system')
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala77
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala30
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala107
3 files changed, 184 insertions, 30 deletions
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala
new file mode 100644
index 00000000..ae2f50cf
--- /dev/null
+++ b/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala
@@ -0,0 +1,77 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.system
+
+import java.lang.management.GarbageCollectorMXBean
+
+import akka.actor.{ Actor, Props }
+import kamon.metrics.GCMetrics.GCMetricRecorder
+
+import scala.concurrent.duration.FiniteDuration
+
+class GcMetricsCollector(collectInterval: FiniteDuration, recorder: Option[GCMetricRecorder], extractor: GcMetricExtractor) extends Actor {
+ import kamon.system.GcMetricsCollector._
+
+ val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher)
+
+ def receive: Receive = {
+ case Collect ⇒ collectMetrics()
+ }
+
+ override def postStop() = collectSchedule.cancel()
+
+ def collectMetrics(): Unit = recorder.map(recordGc)
+
+ private def recordGc(gcr: GCMetricRecorder) = {
+ val gcMeasure = extractor.extract()
+
+ gcr.count.record(gcMeasure.collectionCount)
+ gcr.time.record(gcMeasure.collectionTime)
+ }
+}
+
+object GcMetricsCollector {
+ case object Collect
+
+ def props(collectInterval: FiniteDuration, recorder: Option[GCMetricRecorder], extractor: GcMetricExtractor): Props = Props(classOf[GcMetricsCollector], collectInterval, recorder, extractor)
+}
+
+case class GcMeasure(collectionCount: Long, collectionTime: Long)
+
+case class GcMetricExtractor(gc: GarbageCollectorMXBean) {
+ var previousGcCount = 0L
+ var previousGcTime = 0L
+
+ def extract(): GcMeasure = {
+ var diffCollectionCount = 0L
+ var diffCollectionTime = 0L
+
+ val collectionCount = gc.getCollectionCount
+ val collectionTime = gc.getCollectionTime
+
+ if (collectionCount > 0)
+ diffCollectionCount = collectionCount - previousGcCount
+
+ if (collectionTime > 0)
+ diffCollectionTime = collectionTime - previousGcTime
+
+ previousGcCount = collectionCount
+ previousGcTime = collectionTime
+
+ GcMeasure(diffCollectionCount, diffCollectionTime)
+ }
+} \ No newline at end of file
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
index 62ffdb33..cb3e2695 100644
--- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
@@ -16,19 +16,16 @@
package kamon.system
import java.lang.management.ManagementFactory
-
import akka.actor._
import akka.event.Logging
import kamon.Kamon
import kamon.metric.Metrics
import kamon.metrics._
-
import scala.collection.JavaConverters._
import scala.concurrent.duration._
object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics
-
override def createExtension(system: ExtendedActorSystem): SystemMetricsExtension = new SystemMetricsExtension(system)
}
@@ -38,14 +35,25 @@ class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Ka
val log = Logging(system, classOf[SystemMetricsExtension])
log.info(s"Starting the Kamon(SystemMetrics) extension")
+ val config = system.settings.config.getConfig("kamon.system-metrics")
+ val dispatcher = system.dispatchers.lookup(config.getString("dispatcher"))
+ val sigarFolder = system.settings.config.getString("kamon.sigar.folder")
val systemMetricsExtension = Kamon(Metrics)(system)
+ //System Metrics
+ system.actorOf(SystemMetricsCollector.props(1 second), "system-metrics-collector")
+
//JVM Metrics
systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory)
- garbageCollectors.map { gc ⇒ systemMetricsExtension.register(GCMetrics(gc.getName), GCMetrics.Factory(gc)) }
+ systemMetricsExtension.register(NonHeapMetrics(NonHeap), NonHeapMetrics.Factory)
+ systemMetricsExtension.register(ClassLoadingMetrics(Classes), ClassLoadingMetrics.Factory)
+ systemMetricsExtension.register(ThreadMetrics(Threads), ThreadMetrics.Factory)
- //System Metrics
- system.actorOf(SystemMetricsCollector.props(1 second), "system-metrics-collector")
+ garbageCollectors.map { gc ⇒
+ val gcName = sanitize(gc.getName)
+ val recorder = systemMetricsExtension.register(GCMetrics(gcName), GCMetrics.Factory(gc))
+ system.actorOf(GcMetricsCollector.props(1 second, recorder, GcMetricExtractor(gc)), s"$gcName-collector")
+ }
}
object SystemMetricsExtension {
@@ -54,11 +62,17 @@ object SystemMetricsExtension {
val Network = "network"
val Memory = "memory"
val Heap = "heap"
+ val NonHeap = "non-heap"
+ val Classes = "classes"
+ val Threads = "thread"
val ContextSwitches = "context-switches"
+ val Disk = "disk"
+ val LoadAverage = "load-average"
- def toKB(value: Long): Long = (value / 1024)
- def toMB(value: Long): Long = (value / 1024 / 1024)
+ def toKB(value: Long): Long = value / 1024
+ def toMB(value: Long): Long = value / 1024 / 1024
def toLong(value: Double): Long = math round (value * 100L)
+ def sanitize(str: String): String = str.replaceAll("""[^\w]""", "-")
val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans.asScala.filter(_.isValid)
}
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
index c200091e..4391240a 100644
--- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
@@ -22,16 +22,19 @@ import kamon.Kamon
import kamon.metric.Metrics
import kamon.metrics.CPUMetrics.CPUMetricRecorder
import kamon.metrics.ContextSwitchesMetrics.ContextSwitchesMetricsRecorder
+import kamon.metrics.DiskMetrics.DiskMetricsRecorder
+import kamon.metrics.LoadAverageMetrics.LoadAverageMetricsRecorder
import kamon.metrics.MemoryMetrics.MemoryMetricRecorder
import kamon.metrics.NetworkMetrics.NetworkMetricRecorder
import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder
import kamon.metrics._
import kamon.sigar.SigarProvisioner
-import org.hyperic.sigar.{ Sigar, Mem, NetInterfaceStat, SigarProxy }
+import org.hyperic.sigar._
+import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.FiniteDuration
import scala.io.Source
-import scala.util.control.NonFatal
+import scala.collection.mutable
class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with ActorLogging with SystemMetricsBanner {
import kamon.system.SystemMetricsCollector._
@@ -40,16 +43,19 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
lazy val sigar = createSigarInstance
def pid = sigar.getPid
- val interfaces: Set[String] = sigar.getNetInterfaceList.toSet
+ val interfaces = sigar.getNetInterfaceList.filterNot(NetworkFilter).toSet
+ val fileSystems = sigar.getFileSystemList.filter(_.getType == FileSystem.TYPE_LOCAL_DISK).map(_.getDevName).toSet
- val systemMetricsExtension = Kamon(Metrics)(context.system)
- val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher)
+ val metricExtension = Kamon(Metrics)(context.system)
+ val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher)
- val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
- val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
- val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
- val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
- val contextSwitchesRecorder = systemMetricsExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory)
+ val cpuRecorder = metricExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
+ val processCpuRecorder = metricExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
+ val memoryRecorder = metricExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
+ val networkRecorder = metricExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
+ val contextSwitchesRecorder = metricExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory)
+ val diskRecorder = metricExtension.register(DiskMetrics(Disk), DiskMetrics.Factory)
+ val loadAverageRecorder = metricExtension.register(LoadAverageMetrics(LoadAverage), LoadAverageMetrics.Factory)
def receive: Receive = {
case Collect ⇒ collectMetrics()
@@ -62,6 +68,8 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
processCpuRecorder.map(recordProcessCpu)
memoryRecorder.map(recordMemory)
networkRecorder.map(recordNetwork)
+ diskRecorder.map(recordDisk)
+ loadAverageRecorder.map(recordLoadAverage)
if (OsUtils.isLinux)
contextSwitchesRecorder.map(recordContextSwitches)
@@ -100,23 +108,60 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
}
private def recordNetwork(nr: NetworkMetricRecorder) = {
- nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getRxBytes)))
- nr.txBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getTxBytes)))
- nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors))
- nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors))
- nr.rxDropped.record(collect(sigar, interfaces)(net ⇒ net.getRxDropped))
- nr.txDropped.record(collect(sigar, interfaces)(net ⇒ net.getTxDropped))
-
- def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = {
- interfaces.foldLeft(0L) { (totalBytes, interface) ⇒
+ import Networks._
+ nr.rxBytes.record(collect(sigar, interfaces, RxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getRxBytes)))
+ nr.txBytes.record(collect(sigar, interfaces, TxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getTxBytes)))
+ nr.rxErrors.record(collect(sigar, interfaces, RxErrors, previousNetworkMetrics)(net ⇒ net.getRxErrors))
+ nr.txErrors.record(collect(sigar, interfaces, TxErrors, previousNetworkMetrics)(net ⇒ net.getTxErrors))
+ nr.rxDropped.record(collect(sigar, interfaces, RxDropped, previousNetworkMetrics)(net ⇒ net.getRxDropped))
+ nr.txDropped.record(collect(sigar, interfaces, TxDropped, previousNetworkMetrics)(net ⇒ net.getTxDropped))
+
+ def collect(sigar: SigarProxy, interfaces: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: NetInterfaceStat ⇒ Long): Long = {
+ interfaces.foldLeft(0L) { (acc, interface) ⇒
{
val net = sigar.getNetInterfaceStat(interface)
- totalBytes + block(net)
+ val previous = previousMetrics.getOrElse(interface, mutable.Map.empty[String, Long])
+ val current = thunk(net)
+ val delta = current - previous.getOrElse(name, 0L)
+ previousMetrics.put(interface, previous += name -> current)
+ acc + delta
}
}
}
}
+ private def recordDisk(rd: DiskMetricsRecorder) = {
+ import Disks._
+
+ rd.reads.record(collect(sigar, fileSystems, Reads, previousDiskMetrics)(disk ⇒ disk.getReads))
+ rd.writes.record(collect(sigar, fileSystems, Writes, previousDiskMetrics)(disk ⇒ disk.getWrites))
+ rd.queue.record(collect(sigar, fileSystems, Queue, previousDiskMetrics)(disk ⇒ toLong(disk.getQueue)))
+ rd.serviceTime.record(collect(sigar, fileSystems, Service, previousDiskMetrics)(disk ⇒ toLong(disk.getServiceTime)))
+ }
+
+ def collect(sigar: SigarProxy, fileSystems: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: DiskUsage ⇒ Long): Long = {
+ fileSystems.foldLeft(0L) { (acc, fileSystem) ⇒
+ {
+ val disk = sigar.getDiskUsage(fileSystem)
+ val previous = previousMetrics.getOrElse(fileSystem, mutable.Map.empty[String, Long])
+ val value = thunk(disk)
+ val current = if (value == Sigar.FIELD_NOTIMPL) 0L else value
+ val delta = current - previous.getOrElse(name, 0L)
+ previousMetrics.put(fileSystem, previous += name -> current)
+ acc + delta
+ }
+ }
+ }
+
+ private def recordLoadAverage(lar: LoadAverageMetricsRecorder) = {
+ val loadAverage = sigar.getLoadAverage
+ val (one, five, fifteen) = (loadAverage(0), loadAverage(1), loadAverage(2))
+
+ lar.one.record(toLong(one))
+ lar.five.record(toLong(five))
+ lar.fifteen.record(toLong(fifteen))
+ }
+
private def recordContextSwitches(rcs: ContextSwitchesMetricsRecorder) = {
def contextSwitchesByProcess(pid: Long): (Long, Long) = {
val filename = s"/proc/$pid/status"
@@ -167,12 +212,11 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
}
def provisionSigarLibrary: Unit = {
- val folder = context.system.settings.config.getString("kamon.sigar.folder")
+ val folder = SystemMetrics(context.system).sigarFolder
SigarProvisioner.provision(new File(folder))
}
def createSigarInstance: SigarProxy = {
-
// 1) Assume that library is already provisioned.
try {
return verifiedSigarInstance
@@ -193,6 +237,25 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
}
object SystemMetricsCollector {
+ val NetworkFilter = Set("lo")
+ val previousDiskMetrics = TrieMap[String, mutable.Map[String, Long]]()
+ val previousNetworkMetrics = TrieMap[String, mutable.Map[String, Long]]()
+
+ object Networks {
+ val RxBytes = "rxBytes"
+ val TxBytes = "txBytes"
+ val RxErrors = "rxErrors"
+ val TxErrors = "txErrors"
+ val RxDropped = "rxDropped"
+ val TxDropped = "txDropped"
+ }
+
+ object Disks {
+ val Reads = "reads"
+ val Writes = "writes"
+ val Queue = "queue"
+ val Service = "service"
+ }
case object Collect
object OsUtils {