diff options
Diffstat (limited to 'kamon-system-metrics/src/main/scala/kamon/system')
3 files changed, 184 insertions, 30 deletions
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala new file mode 100644 index 00000000..ae2f50cf --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala @@ -0,0 +1,77 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.system + +import java.lang.management.GarbageCollectorMXBean + +import akka.actor.{ Actor, Props } +import kamon.metrics.GCMetrics.GCMetricRecorder + +import scala.concurrent.duration.FiniteDuration + +class GcMetricsCollector(collectInterval: FiniteDuration, recorder: Option[GCMetricRecorder], extractor: GcMetricExtractor) extends Actor { + import kamon.system.GcMetricsCollector._ + + val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher) + + def receive: Receive = { + case Collect ⇒ collectMetrics() + } + + override def postStop() = collectSchedule.cancel() + + def collectMetrics(): Unit = recorder.map(recordGc) + + private def recordGc(gcr: GCMetricRecorder) = { + val gcMeasure = extractor.extract() + + gcr.count.record(gcMeasure.collectionCount) + gcr.time.record(gcMeasure.collectionTime) + } +} + +object GcMetricsCollector { + case object Collect + + def props(collectInterval: FiniteDuration, recorder: Option[GCMetricRecorder], extractor: GcMetricExtractor): Props = Props(classOf[GcMetricsCollector], collectInterval, recorder, extractor) +} + +case class GcMeasure(collectionCount: Long, collectionTime: Long) + +case class GcMetricExtractor(gc: GarbageCollectorMXBean) { + var previousGcCount = 0L + var previousGcTime = 0L + + def extract(): GcMeasure = { + var diffCollectionCount = 0L + var diffCollectionTime = 0L + + val collectionCount = gc.getCollectionCount + val collectionTime = gc.getCollectionTime + + if (collectionCount > 0) + diffCollectionCount = collectionCount - previousGcCount + + if (collectionTime > 0) + diffCollectionTime = collectionTime - previousGcTime + + previousGcCount = collectionCount + previousGcTime = collectionTime + + GcMeasure(diffCollectionCount, diffCollectionTime) + } +}
\ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala index 62ffdb33..cb3e2695 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala @@ -16,19 +16,16 @@ package kamon.system import java.lang.management.ManagementFactory - import akka.actor._ import akka.event.Logging import kamon.Kamon import kamon.metric.Metrics import kamon.metrics._ - import scala.collection.JavaConverters._ import scala.concurrent.duration._ object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics - override def createExtension(system: ExtendedActorSystem): SystemMetricsExtension = new SystemMetricsExtension(system) } @@ -38,14 +35,25 @@ class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Ka val log = Logging(system, classOf[SystemMetricsExtension]) log.info(s"Starting the Kamon(SystemMetrics) extension") + val config = system.settings.config.getConfig("kamon.system-metrics") + val dispatcher = system.dispatchers.lookup(config.getString("dispatcher")) + val sigarFolder = system.settings.config.getString("kamon.sigar.folder") val systemMetricsExtension = Kamon(Metrics)(system) + //System Metrics + system.actorOf(SystemMetricsCollector.props(1 second), "system-metrics-collector") + //JVM Metrics systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory) - garbageCollectors.map { gc ⇒ systemMetricsExtension.register(GCMetrics(gc.getName), GCMetrics.Factory(gc)) } + systemMetricsExtension.register(NonHeapMetrics(NonHeap), NonHeapMetrics.Factory) + systemMetricsExtension.register(ClassLoadingMetrics(Classes), ClassLoadingMetrics.Factory) + systemMetricsExtension.register(ThreadMetrics(Threads), ThreadMetrics.Factory) - //System Metrics - system.actorOf(SystemMetricsCollector.props(1 second), "system-metrics-collector") + garbageCollectors.map { gc ⇒ + val gcName = sanitize(gc.getName) + val recorder = systemMetricsExtension.register(GCMetrics(gcName), GCMetrics.Factory(gc)) + system.actorOf(GcMetricsCollector.props(1 second, recorder, GcMetricExtractor(gc)), s"$gcName-collector") + } } object SystemMetricsExtension { @@ -54,11 +62,17 @@ object SystemMetricsExtension { val Network = "network" val Memory = "memory" val Heap = "heap" + val NonHeap = "non-heap" + val Classes = "classes" + val Threads = "thread" val ContextSwitches = "context-switches" + val Disk = "disk" + val LoadAverage = "load-average" - def toKB(value: Long): Long = (value / 1024) - def toMB(value: Long): Long = (value / 1024 / 1024) + def toKB(value: Long): Long = value / 1024 + def toMB(value: Long): Long = value / 1024 / 1024 def toLong(value: Double): Long = math round (value * 100L) + def sanitize(str: String): String = str.replaceAll("""[^\w]""", "-") val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans.asScala.filter(_.isValid) } diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala index c200091e..4391240a 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala @@ -22,16 +22,19 @@ import kamon.Kamon import kamon.metric.Metrics import kamon.metrics.CPUMetrics.CPUMetricRecorder import kamon.metrics.ContextSwitchesMetrics.ContextSwitchesMetricsRecorder +import kamon.metrics.DiskMetrics.DiskMetricsRecorder +import kamon.metrics.LoadAverageMetrics.LoadAverageMetricsRecorder import kamon.metrics.MemoryMetrics.MemoryMetricRecorder import kamon.metrics.NetworkMetrics.NetworkMetricRecorder import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder import kamon.metrics._ import kamon.sigar.SigarProvisioner -import org.hyperic.sigar.{ Sigar, Mem, NetInterfaceStat, SigarProxy } +import org.hyperic.sigar._ +import scala.collection.concurrent.TrieMap import scala.concurrent.duration.FiniteDuration import scala.io.Source -import scala.util.control.NonFatal +import scala.collection.mutable class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with ActorLogging with SystemMetricsBanner { import kamon.system.SystemMetricsCollector._ @@ -40,16 +43,19 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with lazy val sigar = createSigarInstance def pid = sigar.getPid - val interfaces: Set[String] = sigar.getNetInterfaceList.toSet + val interfaces = sigar.getNetInterfaceList.filterNot(NetworkFilter).toSet + val fileSystems = sigar.getFileSystemList.filter(_.getType == FileSystem.TYPE_LOCAL_DISK).map(_.getDevName).toSet - val systemMetricsExtension = Kamon(Metrics)(context.system) - val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher) + val metricExtension = Kamon(Metrics)(context.system) + val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher) - val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) - val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) - val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) - val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) - val contextSwitchesRecorder = systemMetricsExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory) + val cpuRecorder = metricExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) + val processCpuRecorder = metricExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) + val memoryRecorder = metricExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) + val networkRecorder = metricExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) + val contextSwitchesRecorder = metricExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory) + val diskRecorder = metricExtension.register(DiskMetrics(Disk), DiskMetrics.Factory) + val loadAverageRecorder = metricExtension.register(LoadAverageMetrics(LoadAverage), LoadAverageMetrics.Factory) def receive: Receive = { case Collect ⇒ collectMetrics() @@ -62,6 +68,8 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with processCpuRecorder.map(recordProcessCpu) memoryRecorder.map(recordMemory) networkRecorder.map(recordNetwork) + diskRecorder.map(recordDisk) + loadAverageRecorder.map(recordLoadAverage) if (OsUtils.isLinux) contextSwitchesRecorder.map(recordContextSwitches) @@ -100,23 +108,60 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with } private def recordNetwork(nr: NetworkMetricRecorder) = { - nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getRxBytes))) - nr.txBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getTxBytes))) - nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors)) - nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors)) - nr.rxDropped.record(collect(sigar, interfaces)(net ⇒ net.getRxDropped)) - nr.txDropped.record(collect(sigar, interfaces)(net ⇒ net.getTxDropped)) - - def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = { - interfaces.foldLeft(0L) { (totalBytes, interface) ⇒ + import Networks._ + nr.rxBytes.record(collect(sigar, interfaces, RxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getRxBytes))) + nr.txBytes.record(collect(sigar, interfaces, TxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getTxBytes))) + nr.rxErrors.record(collect(sigar, interfaces, RxErrors, previousNetworkMetrics)(net ⇒ net.getRxErrors)) + nr.txErrors.record(collect(sigar, interfaces, TxErrors, previousNetworkMetrics)(net ⇒ net.getTxErrors)) + nr.rxDropped.record(collect(sigar, interfaces, RxDropped, previousNetworkMetrics)(net ⇒ net.getRxDropped)) + nr.txDropped.record(collect(sigar, interfaces, TxDropped, previousNetworkMetrics)(net ⇒ net.getTxDropped)) + + def collect(sigar: SigarProxy, interfaces: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: NetInterfaceStat ⇒ Long): Long = { + interfaces.foldLeft(0L) { (acc, interface) ⇒ { val net = sigar.getNetInterfaceStat(interface) - totalBytes + block(net) + val previous = previousMetrics.getOrElse(interface, mutable.Map.empty[String, Long]) + val current = thunk(net) + val delta = current - previous.getOrElse(name, 0L) + previousMetrics.put(interface, previous += name -> current) + acc + delta } } } } + private def recordDisk(rd: DiskMetricsRecorder) = { + import Disks._ + + rd.reads.record(collect(sigar, fileSystems, Reads, previousDiskMetrics)(disk ⇒ disk.getReads)) + rd.writes.record(collect(sigar, fileSystems, Writes, previousDiskMetrics)(disk ⇒ disk.getWrites)) + rd.queue.record(collect(sigar, fileSystems, Queue, previousDiskMetrics)(disk ⇒ toLong(disk.getQueue))) + rd.serviceTime.record(collect(sigar, fileSystems, Service, previousDiskMetrics)(disk ⇒ toLong(disk.getServiceTime))) + } + + def collect(sigar: SigarProxy, fileSystems: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: DiskUsage ⇒ Long): Long = { + fileSystems.foldLeft(0L) { (acc, fileSystem) ⇒ + { + val disk = sigar.getDiskUsage(fileSystem) + val previous = previousMetrics.getOrElse(fileSystem, mutable.Map.empty[String, Long]) + val value = thunk(disk) + val current = if (value == Sigar.FIELD_NOTIMPL) 0L else value + val delta = current - previous.getOrElse(name, 0L) + previousMetrics.put(fileSystem, previous += name -> current) + acc + delta + } + } + } + + private def recordLoadAverage(lar: LoadAverageMetricsRecorder) = { + val loadAverage = sigar.getLoadAverage + val (one, five, fifteen) = (loadAverage(0), loadAverage(1), loadAverage(2)) + + lar.one.record(toLong(one)) + lar.five.record(toLong(five)) + lar.fifteen.record(toLong(fifteen)) + } + private def recordContextSwitches(rcs: ContextSwitchesMetricsRecorder) = { def contextSwitchesByProcess(pid: Long): (Long, Long) = { val filename = s"/proc/$pid/status" @@ -167,12 +212,11 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with } def provisionSigarLibrary: Unit = { - val folder = context.system.settings.config.getString("kamon.sigar.folder") + val folder = SystemMetrics(context.system).sigarFolder SigarProvisioner.provision(new File(folder)) } def createSigarInstance: SigarProxy = { - // 1) Assume that library is already provisioned. try { return verifiedSigarInstance @@ -193,6 +237,25 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with } object SystemMetricsCollector { + val NetworkFilter = Set("lo") + val previousDiskMetrics = TrieMap[String, mutable.Map[String, Long]]() + val previousNetworkMetrics = TrieMap[String, mutable.Map[String, Long]]() + + object Networks { + val RxBytes = "rxBytes" + val TxBytes = "txBytes" + val RxErrors = "rxErrors" + val TxErrors = "txErrors" + val RxDropped = "rxDropped" + val TxDropped = "txDropped" + } + + object Disks { + val Reads = "reads" + val Writes = "writes" + val Queue = "queue" + val Service = "service" + } case object Collect object OsUtils { |