/* * ========================================================================================= * Copyright © 2013-2014 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific language governing permissions * and limitations under the License. * ========================================================================================= */ package kamon.system import java.lang.management.ManagementFactory import akka.actor._ import akka.event.Logging import kamon.Kamon import kamon.metric.Metrics import kamon.metrics._ import kamon.system.SystemMetricsCollectorActor.Collect import org.hyperic.sigar.{ NetInterfaceStat, SigarProxy, Mem } import scala.concurrent.duration._ import scala.collection.JavaConverters._ import scala.concurrent.duration.FiniteDuration object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics override def createExtension(system: ExtendedActorSystem): SystemMetricsExtension = new SystemMetricsExtension(system) } class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { import kamon.system.SystemMetricsExtension._ val log = Logging(system, classOf[SystemMetricsExtension]) log.info(s"Starting the Kamon(SystemMetrics) extension") val systemMetricsExtension = Kamon(Metrics)(system) //JVM Metrics systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory) garbageCollectors.map { gc ⇒ systemMetricsExtension.register(GCMetrics(gc.getName), GCMetrics.Factory(gc)) } //System Metrics system.actorOf(SystemMetricsCollectorActor.props(1 second), "system-metrics-collector") } object SystemMetricsExtension { val CPU = "cpu" val ProcessCPU = "process-cpu" val Network = "network" val Memory = "memory" val Heap = "heap" val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans.asScala.filter(_.isValid) } trait SigarExtensionProvider { lazy val sigar = SigarHolder.instance() } class SystemMetricsCollectorActor(collectInterval: FiniteDuration) extends Actor with SigarExtensionProvider { import kamon.system.SystemMetricsExtension._ val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher) val systemMetricsExtension = Kamon(Metrics)(context.system) val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) def receive: Receive = { case Collect ⇒ collectMetrics() case anything ⇒ } override def postStop() = collectSchedule.cancel() def pid = sigar.getPid def procCpu = sigar.getProcCpu(pid) def cpu = sigar.getCpuPerc def mem = sigar.getMem def swap = sigar.getSwap val interfaces: Set[String] = sigar.getNetInterfaceList.toSet def collectMetrics() = { cpuRecorder.map { cpur ⇒ cpur.user.record((cpu.getUser * 100L).toLong) cpur.system.record((cpu.getSys * 100L).toLong) cpur.cpuWait.record((cpu.getWait() * 100L).toLong) cpur.idle.record((cpu.getIdle * 100L).toLong) } processCpuRecorder.map { pcpur ⇒ pcpur.user.record(procCpu.getUser) pcpur.system.record(procCpu.getSys) } memoryRecorder.map { mr ⇒ mr.used.record(mem.getUsed) mr.free.record(mem.getFree) mr.swapUsed.record(swap.getUsed) mr.swapFree.record(swap.getFree) mr.buffer.record(collectBuffer(mem)) mr.cache.record(collectCache(mem)) } networkRecorder.map { nr ⇒ nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ net.getRxBytes)) nr.txBytes.record(collect(sigar, interfaces)(net ⇒ net.getTxBytes)) nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors)) nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors)) } } private def collectBuffer(mem: Mem): Long = if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L private def collectCache(mem: Mem): Long = if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L private def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = { interfaces.foldLeft(0L) { (totalBytes, interface) ⇒ { val net = sigar.getNetInterfaceStat(interface) totalBytes + block(net) } } } } object SystemMetricsCollectorActor { case object Collect def props(collectInterval: FiniteDuration): Props = Props[SystemMetricsCollectorActor](new SystemMetricsCollectorActor(collectInterval)) }