diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 01a34f67ff75419c440f2e69c0a0db888a670a34 (patch) | |
tree | 9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala | |
parent | 4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff) | |
download | Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2 Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala')
-rw-r--r-- | kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala | 266 |
1 files changed, 0 insertions, 266 deletions
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala deleted file mode 100644 index 4391240a..00000000 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala +++ /dev/null @@ -1,266 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.system - -import java.io.{ File, IOException } - -import akka.actor.{ Actor, ActorLogging, Props } -import kamon.Kamon -import kamon.metric.Metrics -import kamon.metrics.CPUMetrics.CPUMetricRecorder -import kamon.metrics.ContextSwitchesMetrics.ContextSwitchesMetricsRecorder -import kamon.metrics.DiskMetrics.DiskMetricsRecorder -import kamon.metrics.LoadAverageMetrics.LoadAverageMetricsRecorder -import kamon.metrics.MemoryMetrics.MemoryMetricRecorder -import kamon.metrics.NetworkMetrics.NetworkMetricRecorder -import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder -import kamon.metrics._ -import kamon.sigar.SigarProvisioner -import org.hyperic.sigar._ - -import scala.collection.concurrent.TrieMap -import scala.concurrent.duration.FiniteDuration -import scala.io.Source -import scala.collection.mutable - -class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with ActorLogging with SystemMetricsBanner { - import kamon.system.SystemMetricsCollector._ - import kamon.system.SystemMetricsExtension._ - - lazy val sigar = createSigarInstance - def pid = sigar.getPid - - val interfaces = sigar.getNetInterfaceList.filterNot(NetworkFilter).toSet - val fileSystems = sigar.getFileSystemList.filter(_.getType == FileSystem.TYPE_LOCAL_DISK).map(_.getDevName).toSet - - val metricExtension = Kamon(Metrics)(context.system) - val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher) - - val cpuRecorder = metricExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) - val processCpuRecorder = metricExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) - val memoryRecorder = metricExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) - val networkRecorder = metricExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) - val contextSwitchesRecorder = metricExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory) - val diskRecorder = metricExtension.register(DiskMetrics(Disk), DiskMetrics.Factory) - val loadAverageRecorder = metricExtension.register(LoadAverageMetrics(LoadAverage), LoadAverageMetrics.Factory) - - def receive: Receive = { - case Collect ⇒ collectMetrics() - } - - override def postStop() = collectSchedule.cancel() - - def collectMetrics() = { - cpuRecorder.map(recordCpu) - processCpuRecorder.map(recordProcessCpu) - memoryRecorder.map(recordMemory) - networkRecorder.map(recordNetwork) - diskRecorder.map(recordDisk) - loadAverageRecorder.map(recordLoadAverage) - - if (OsUtils.isLinux) - contextSwitchesRecorder.map(recordContextSwitches) - } - - private def recordCpu(cpur: CPUMetricRecorder) = { - val cpuPerc = sigar.getCpuPerc - cpur.user.record(toLong(cpuPerc.getUser)) - cpur.system.record(toLong(cpuPerc.getSys)) - cpur.cpuWait.record(toLong(cpuPerc.getWait)) - cpur.idle.record(toLong(cpuPerc.getIdle)) - cpur.stolen.record(toLong(cpuPerc.getStolen)) - } - - private def recordProcessCpu(pcpur: ProcessCPUMetricsRecorder) = { - val procCpu = sigar.getProcCpu(pid) - val procTime = sigar.getProcTime(pid) - - pcpur.cpuPercent.record(toLong(procCpu.getPercent)) - pcpur.totalProcessTime.record(procTime.getTotal) // gives an idea of what is really measured and then interpreted as % - } - - private def recordMemory(mr: MemoryMetricRecorder) = { - val mem = sigar.getMem - val swap = sigar.getSwap - - mr.used.record(toMB(mem.getUsed)) - mr.free.record(toMB(mem.getFree)) - mr.swapUsed.record(toMB(swap.getUsed)) - mr.swapFree.record(toMB(swap.getFree)) - mr.buffer.record(toMB(collectBuffer(mem))) - mr.cache.record(toMB(collectCache(mem))) - - def collectBuffer(mem: Mem): Long = if (mem.getUsed != mem.getActualUsed) mem.getActualUsed else 0L - def collectCache(mem: Mem): Long = if (mem.getFree != mem.getActualFree) mem.getActualFree else 0L - } - - private def recordNetwork(nr: NetworkMetricRecorder) = { - import Networks._ - nr.rxBytes.record(collect(sigar, interfaces, RxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getRxBytes))) - nr.txBytes.record(collect(sigar, interfaces, TxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getTxBytes))) - nr.rxErrors.record(collect(sigar, interfaces, RxErrors, previousNetworkMetrics)(net ⇒ net.getRxErrors)) - nr.txErrors.record(collect(sigar, interfaces, TxErrors, previousNetworkMetrics)(net ⇒ net.getTxErrors)) - nr.rxDropped.record(collect(sigar, interfaces, RxDropped, previousNetworkMetrics)(net ⇒ net.getRxDropped)) - nr.txDropped.record(collect(sigar, interfaces, TxDropped, previousNetworkMetrics)(net ⇒ net.getTxDropped)) - - def collect(sigar: SigarProxy, interfaces: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: NetInterfaceStat ⇒ Long): Long = { - interfaces.foldLeft(0L) { (acc, interface) ⇒ - { - val net = sigar.getNetInterfaceStat(interface) - val previous = previousMetrics.getOrElse(interface, mutable.Map.empty[String, Long]) - val current = thunk(net) - val delta = current - previous.getOrElse(name, 0L) - previousMetrics.put(interface, previous += name -> current) - acc + delta - } - } - } - } - - private def recordDisk(rd: DiskMetricsRecorder) = { - import Disks._ - - rd.reads.record(collect(sigar, fileSystems, Reads, previousDiskMetrics)(disk ⇒ disk.getReads)) - rd.writes.record(collect(sigar, fileSystems, Writes, previousDiskMetrics)(disk ⇒ disk.getWrites)) - rd.queue.record(collect(sigar, fileSystems, Queue, previousDiskMetrics)(disk ⇒ toLong(disk.getQueue))) - rd.serviceTime.record(collect(sigar, fileSystems, Service, previousDiskMetrics)(disk ⇒ toLong(disk.getServiceTime))) - } - - def collect(sigar: SigarProxy, fileSystems: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: DiskUsage ⇒ Long): Long = { - fileSystems.foldLeft(0L) { (acc, fileSystem) ⇒ - { - val disk = sigar.getDiskUsage(fileSystem) - val previous = previousMetrics.getOrElse(fileSystem, mutable.Map.empty[String, Long]) - val value = thunk(disk) - val current = if (value == Sigar.FIELD_NOTIMPL) 0L else value - val delta = current - previous.getOrElse(name, 0L) - previousMetrics.put(fileSystem, previous += name -> current) - acc + delta - } - } - } - - private def recordLoadAverage(lar: LoadAverageMetricsRecorder) = { - val loadAverage = sigar.getLoadAverage - val (one, five, fifteen) = (loadAverage(0), loadAverage(1), loadAverage(2)) - - lar.one.record(toLong(one)) - lar.five.record(toLong(five)) - lar.fifteen.record(toLong(fifteen)) - } - - private def recordContextSwitches(rcs: ContextSwitchesMetricsRecorder) = { - def contextSwitchesByProcess(pid: Long): (Long, Long) = { - val filename = s"/proc/$pid/status" - var voluntaryContextSwitches = 0L - var nonVoluntaryContextSwitches = 0L - - try { - for (line ← Source.fromFile(filename).getLines()) { - if (line.startsWith("voluntary_ctxt_switches")) { - voluntaryContextSwitches = line.substring(line.indexOf(":") + 1).trim.toLong - } - if (line.startsWith("nonvoluntary_ctxt_switches")) { - nonVoluntaryContextSwitches = line.substring(line.indexOf(":") + 1).trim.toLong - } - } - } catch { - case ex: IOException ⇒ log.error("Error trying to read [{}]", filename) - } - (voluntaryContextSwitches, nonVoluntaryContextSwitches) - } - - def contextSwitches: Long = { - val filename = "/proc/stat" - var contextSwitches = 0L - - try { - for (line ← Source.fromFile(filename).getLines()) { - if (line.startsWith("rcs")) { - contextSwitches = line.substring(line.indexOf(" ") + 1).toLong - } - } - } catch { - case ex: IOException ⇒ log.error("Error trying to read [{}]", filename) - } - contextSwitches - } - - val (perProcessVoluntary, perProcessNonVoluntary) = contextSwitchesByProcess(pid) - rcs.perProcessVoluntary.record(perProcessVoluntary) - rcs.perProcessNonVoluntary.record(perProcessNonVoluntary) - rcs.global.record(contextSwitches) - } - - def verifiedSigarInstance: SigarProxy = { - val sigar = new Sigar() - printBanner(sigar) - sigar - } - - def provisionSigarLibrary: Unit = { - val folder = SystemMetrics(context.system).sigarFolder - SigarProvisioner.provision(new File(folder)) - } - - def createSigarInstance: SigarProxy = { - // 1) Assume that library is already provisioned. - try { - return verifiedSigarInstance - } catch { - // Not using [[Try]] - any error is non-fatal in this case. - case e: Throwable ⇒ log.info(s"Sigar is not yet provisioned: ${e}") - } - - // 2) Attempt to provision library via sigar-loader. - try { - provisionSigarLibrary - return verifiedSigarInstance - } catch { - // Not using [[Try]] - any error is non-fatal in this case. - case e: Throwable ⇒ throw new UnexpectedSigarException(s"Failed to load Sigar: ${e}") - } - } -} - -object SystemMetricsCollector { - val NetworkFilter = Set("lo") - val previousDiskMetrics = TrieMap[String, mutable.Map[String, Long]]() - val previousNetworkMetrics = TrieMap[String, mutable.Map[String, Long]]() - - object Networks { - val RxBytes = "rxBytes" - val TxBytes = "txBytes" - val RxErrors = "rxErrors" - val TxErrors = "txErrors" - val RxDropped = "rxDropped" - val TxDropped = "txDropped" - } - - object Disks { - val Reads = "reads" - val Writes = "writes" - val Queue = "queue" - val Service = "service" - } - case object Collect - - object OsUtils { - def isLinux: Boolean = System.getProperty("os.name").indexOf("Linux") != -1 - } - - def props(collectInterval: FiniteDuration): Props = Props(classOf[SystemMetricsCollector], collectInterval) -}
\ No newline at end of file |