diff options
Diffstat (limited to 'kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala')
-rw-r--r-- | kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala | 93 |
1 files changed, 88 insertions, 5 deletions
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala index bd156c2a..15566d7b 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala @@ -22,8 +22,12 @@ import akka.event.Logging import kamon.Kamon import kamon.metric.Metrics import kamon.metrics._ +import kamon.system.SystemMetricsCollectorActor.Collect +import org.hyperic.sigar.{ NetInterfaceStat, SigarProxy, Mem } +import scala.concurrent.duration._ import scala.collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics @@ -40,13 +44,12 @@ class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Ka val systemMetricsExtension = Kamon(Metrics)(system) - systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) - systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) - systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) - systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) + //JVM Metrics systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory) - garbageCollectors.map { gc ⇒ systemMetricsExtension.register(GCMetrics(gc.getName), GCMetrics.Factory(gc)) } + + //System Metrics + system.actorOf(SystemMetricsCollectorActor.props(1 second), "system-metrics-collector") } object SystemMetricsExtension { @@ -63,3 +66,83 @@ trait SigarExtensionProvider { lazy val sigar = SigarHolder.instance() } +class SystemMetricsCollectorActor(collectInterval: FiniteDuration) extends Actor with SigarExtensionProvider { + import kamon.system.SystemMetricsExtension._ + + val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher) + + val systemMetricsExtension = Kamon(Metrics)(context.system) + + val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) + val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) + val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) + val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) + + def receive: Receive = { + case Collect ⇒ collectMetrics() + case anything ⇒ + } + + override def postStop() = collectSchedule.cancel() + + def pid = sigar.getPid + def procCpu = sigar.getProcCpu(pid) + def cpu = sigar.getCpuPerc + def mem = sigar.getMem + def swap = sigar.getSwap + + val interfaces: Set[String] = sigar.getNetInterfaceList.toSet + + def collectMetrics() = { + cpuRecorder.map { + cpur ⇒ + cpur.user.record((cpu.getUser * 100L).toLong) + cpur.system.record((cpu.getSys * 100L).toLong) + cpur.cpuWait.record((cpu.getWait() * 100L).toLong) + cpur.idle.record((cpu.getIdle * 100L).toLong) + } + + processCpuRecorder.map { + pcpur ⇒ + pcpur.user.record(procCpu.getUser) + pcpur.system.record(procCpu.getSys) + } + + memoryRecorder.map { + mr ⇒ + mr.used.record(mem.getUsed) + mr.free.record(mem.getFree) + mr.swapUsed.record(swap.getUsed) + mr.swapFree.record(swap.getFree) + mr.buffer.record(collectBuffer(mem)) + mr.cache.record(collectCache(mem)) + } + + networkRecorder.map { + nr ⇒ + nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ net.getRxBytes)) + nr.txBytes.record(collect(sigar, interfaces)(net ⇒ net.getTxBytes)) + nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors)) + nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors)) + } + } + + private def collectBuffer(mem: Mem): Long = if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L + private def collectCache(mem: Mem): Long = if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L + + private def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = { + interfaces.foldLeft(0L) { (totalBytes, interface) ⇒ + { + val net = sigar.getNetInterfaceStat(interface) + totalBytes + block(net) + } + } + } +} + +object SystemMetricsCollectorActor { + case object Collect + + def props(collectInterval: FiniteDuration): Props = + Props[SystemMetricsCollectorActor](new SystemMetricsCollectorActor(collectInterval)) +}
\ No newline at end of file |