diff options
Diffstat (limited to 'kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala')
-rw-r--r-- | kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala | 107 |
1 files changed, 85 insertions, 22 deletions
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala index c200091e..4391240a 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala @@ -22,16 +22,19 @@ import kamon.Kamon import kamon.metric.Metrics import kamon.metrics.CPUMetrics.CPUMetricRecorder import kamon.metrics.ContextSwitchesMetrics.ContextSwitchesMetricsRecorder +import kamon.metrics.DiskMetrics.DiskMetricsRecorder +import kamon.metrics.LoadAverageMetrics.LoadAverageMetricsRecorder import kamon.metrics.MemoryMetrics.MemoryMetricRecorder import kamon.metrics.NetworkMetrics.NetworkMetricRecorder import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder import kamon.metrics._ import kamon.sigar.SigarProvisioner -import org.hyperic.sigar.{ Sigar, Mem, NetInterfaceStat, SigarProxy } +import org.hyperic.sigar._ +import scala.collection.concurrent.TrieMap import scala.concurrent.duration.FiniteDuration import scala.io.Source -import scala.util.control.NonFatal +import scala.collection.mutable class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with ActorLogging with SystemMetricsBanner { import kamon.system.SystemMetricsCollector._ @@ -40,16 +43,19 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with lazy val sigar = createSigarInstance def pid = sigar.getPid - val interfaces: Set[String] = sigar.getNetInterfaceList.toSet + val interfaces = sigar.getNetInterfaceList.filterNot(NetworkFilter).toSet + val fileSystems = sigar.getFileSystemList.filter(_.getType == FileSystem.TYPE_LOCAL_DISK).map(_.getDevName).toSet - val systemMetricsExtension = Kamon(Metrics)(context.system) - val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher) + val metricExtension = Kamon(Metrics)(context.system) + val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher) - val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) - val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) - val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) - val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) - val contextSwitchesRecorder = systemMetricsExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory) + val cpuRecorder = metricExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) + val processCpuRecorder = metricExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) + val memoryRecorder = metricExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) + val networkRecorder = metricExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) + val contextSwitchesRecorder = metricExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory) + val diskRecorder = metricExtension.register(DiskMetrics(Disk), DiskMetrics.Factory) + val loadAverageRecorder = metricExtension.register(LoadAverageMetrics(LoadAverage), LoadAverageMetrics.Factory) def receive: Receive = { case Collect ⇒ collectMetrics() @@ -62,6 +68,8 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with processCpuRecorder.map(recordProcessCpu) memoryRecorder.map(recordMemory) networkRecorder.map(recordNetwork) + diskRecorder.map(recordDisk) + loadAverageRecorder.map(recordLoadAverage) if (OsUtils.isLinux) contextSwitchesRecorder.map(recordContextSwitches) @@ -100,23 +108,60 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with } private def recordNetwork(nr: NetworkMetricRecorder) = { - nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getRxBytes))) - nr.txBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getTxBytes))) - nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors)) - nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors)) - nr.rxDropped.record(collect(sigar, interfaces)(net ⇒ net.getRxDropped)) - nr.txDropped.record(collect(sigar, interfaces)(net ⇒ net.getTxDropped)) - - def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = { - interfaces.foldLeft(0L) { (totalBytes, interface) ⇒ + import Networks._ + nr.rxBytes.record(collect(sigar, interfaces, RxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getRxBytes))) + nr.txBytes.record(collect(sigar, interfaces, TxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getTxBytes))) + nr.rxErrors.record(collect(sigar, interfaces, RxErrors, previousNetworkMetrics)(net ⇒ net.getRxErrors)) + nr.txErrors.record(collect(sigar, interfaces, TxErrors, previousNetworkMetrics)(net ⇒ net.getTxErrors)) + nr.rxDropped.record(collect(sigar, interfaces, RxDropped, previousNetworkMetrics)(net ⇒ net.getRxDropped)) + nr.txDropped.record(collect(sigar, interfaces, TxDropped, previousNetworkMetrics)(net ⇒ net.getTxDropped)) + + def collect(sigar: SigarProxy, interfaces: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: NetInterfaceStat ⇒ Long): Long = { + interfaces.foldLeft(0L) { (acc, interface) ⇒ { val net = sigar.getNetInterfaceStat(interface) - totalBytes + block(net) + val previous = previousMetrics.getOrElse(interface, mutable.Map.empty[String, Long]) + val current = thunk(net) + val delta = current - previous.getOrElse(name, 0L) + previousMetrics.put(interface, previous += name -> current) + acc + delta } } } } + private def recordDisk(rd: DiskMetricsRecorder) = { + import Disks._ + + rd.reads.record(collect(sigar, fileSystems, Reads, previousDiskMetrics)(disk ⇒ disk.getReads)) + rd.writes.record(collect(sigar, fileSystems, Writes, previousDiskMetrics)(disk ⇒ disk.getWrites)) + rd.queue.record(collect(sigar, fileSystems, Queue, previousDiskMetrics)(disk ⇒ toLong(disk.getQueue))) + rd.serviceTime.record(collect(sigar, fileSystems, Service, previousDiskMetrics)(disk ⇒ toLong(disk.getServiceTime))) + } + + def collect(sigar: SigarProxy, fileSystems: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: DiskUsage ⇒ Long): Long = { + fileSystems.foldLeft(0L) { (acc, fileSystem) ⇒ + { + val disk = sigar.getDiskUsage(fileSystem) + val previous = previousMetrics.getOrElse(fileSystem, mutable.Map.empty[String, Long]) + val value = thunk(disk) + val current = if (value == Sigar.FIELD_NOTIMPL) 0L else value + val delta = current - previous.getOrElse(name, 0L) + previousMetrics.put(fileSystem, previous += name -> current) + acc + delta + } + } + } + + private def recordLoadAverage(lar: LoadAverageMetricsRecorder) = { + val loadAverage = sigar.getLoadAverage + val (one, five, fifteen) = (loadAverage(0), loadAverage(1), loadAverage(2)) + + lar.one.record(toLong(one)) + lar.five.record(toLong(five)) + lar.fifteen.record(toLong(fifteen)) + } + private def recordContextSwitches(rcs: ContextSwitchesMetricsRecorder) = { def contextSwitchesByProcess(pid: Long): (Long, Long) = { val filename = s"/proc/$pid/status" @@ -167,12 +212,11 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with } def provisionSigarLibrary: Unit = { - val folder = context.system.settings.config.getString("kamon.sigar.folder") + val folder = SystemMetrics(context.system).sigarFolder SigarProvisioner.provision(new File(folder)) } def createSigarInstance: SigarProxy = { - // 1) Assume that library is already provisioned. try { return verifiedSigarInstance @@ -193,6 +237,25 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with } object SystemMetricsCollector { + val NetworkFilter = Set("lo") + val previousDiskMetrics = TrieMap[String, mutable.Map[String, Long]]() + val previousNetworkMetrics = TrieMap[String, mutable.Map[String, Long]]() + + object Networks { + val RxBytes = "rxBytes" + val TxBytes = "txBytes" + val RxErrors = "rxErrors" + val TxErrors = "txErrors" + val RxDropped = "rxDropped" + val TxDropped = "txDropped" + } + + object Disks { + val Reads = "reads" + val Writes = "writes" + val Queue = "queue" + val Service = "service" + } case object Collect object OsUtils { |