aboutsummaryrefslogtreecommitdiff
path: root/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala')
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala93
1 files changed, 88 insertions, 5 deletions
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
index bd156c2a..15566d7b 100644
--- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
@@ -22,8 +22,12 @@ import akka.event.Logging
import kamon.Kamon
import kamon.metric.Metrics
import kamon.metrics._
+import kamon.system.SystemMetricsCollectorActor.Collect
+import org.hyperic.sigar.{ NetInterfaceStat, SigarProxy, Mem }
+import scala.concurrent.duration._
import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics
@@ -40,13 +44,12 @@ class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Ka
val systemMetricsExtension = Kamon(Metrics)(system)
- systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
- systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
- systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
- systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
+ //JVM Metrics
systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory)
-
garbageCollectors.map { gc ⇒ systemMetricsExtension.register(GCMetrics(gc.getName), GCMetrics.Factory(gc)) }
+
+ //System Metrics
+ system.actorOf(SystemMetricsCollectorActor.props(1 second), "system-metrics-collector")
}
object SystemMetricsExtension {
@@ -63,3 +66,83 @@ trait SigarExtensionProvider {
lazy val sigar = SigarHolder.instance()
}
+class SystemMetricsCollectorActor(collectInterval: FiniteDuration) extends Actor with SigarExtensionProvider {
+ import kamon.system.SystemMetricsExtension._
+
+ val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher)
+
+ val systemMetricsExtension = Kamon(Metrics)(context.system)
+
+ val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
+ val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
+ val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
+ val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
+
+ def receive: Receive = {
+ case Collect ⇒ collectMetrics()
+ case anything ⇒
+ }
+
+ override def postStop() = collectSchedule.cancel()
+
+ def pid = sigar.getPid
+ def procCpu = sigar.getProcCpu(pid)
+ def cpu = sigar.getCpuPerc
+ def mem = sigar.getMem
+ def swap = sigar.getSwap
+
+ val interfaces: Set[String] = sigar.getNetInterfaceList.toSet
+
+ def collectMetrics() = {
+ cpuRecorder.map {
+ cpur ⇒
+ cpur.user.record((cpu.getUser * 100L).toLong)
+ cpur.system.record((cpu.getSys * 100L).toLong)
+ cpur.cpuWait.record((cpu.getWait() * 100L).toLong)
+ cpur.idle.record((cpu.getIdle * 100L).toLong)
+ }
+
+ processCpuRecorder.map {
+ pcpur ⇒
+ pcpur.user.record(procCpu.getUser)
+ pcpur.system.record(procCpu.getSys)
+ }
+
+ memoryRecorder.map {
+ mr ⇒
+ mr.used.record(mem.getUsed)
+ mr.free.record(mem.getFree)
+ mr.swapUsed.record(swap.getUsed)
+ mr.swapFree.record(swap.getFree)
+ mr.buffer.record(collectBuffer(mem))
+ mr.cache.record(collectCache(mem))
+ }
+
+ networkRecorder.map {
+ nr ⇒
+ nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ net.getRxBytes))
+ nr.txBytes.record(collect(sigar, interfaces)(net ⇒ net.getTxBytes))
+ nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors))
+ nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors))
+ }
+ }
+
+ private def collectBuffer(mem: Mem): Long = if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L
+ private def collectCache(mem: Mem): Long = if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L
+
+ private def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = {
+ interfaces.foldLeft(0L) { (totalBytes, interface) ⇒
+ {
+ val net = sigar.getNetInterfaceStat(interface)
+ totalBytes + block(net)
+ }
+ }
+ }
+}
+
+object SystemMetricsCollectorActor {
+ case object Collect
+
+ def props(collectInterval: FiniteDuration): Props =
+ Props[SystemMetricsCollectorActor](new SystemMetricsCollectorActor(collectInterval))
+} \ No newline at end of file