aboutsummaryrefslogtreecommitdiff
path: root/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
blob: a5a2f411df5663c3355c8bafeba9acb1602fcb80 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */
package kamon.system

import akka.actor.{ Actor, Props }
import kamon.Kamon
import kamon.metric.Metrics
import kamon.metrics.CPUMetrics.CPUMetricRecorder
import kamon.metrics.MemoryMetrics.MemoryMetricRecorder
import kamon.metrics.NetworkMetrics.NetworkMetricRecorder
import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder
import kamon.metrics.{ CPUMetrics, MemoryMetrics, NetworkMetrics, ProcessCPUMetrics }
import kamon.system.sigar.SigarHolder
import org.hyperic.sigar.{ Mem, NetInterfaceStat, SigarProxy }

import scala.concurrent.duration.FiniteDuration

class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with SigarExtensionProvider {
  import kamon.system.SystemMetricsCollector._
  import kamon.system.SystemMetricsExtension._

  val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher)

  val systemMetricsExtension = Kamon(Metrics)(context.system)

  val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
  val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
  val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
  val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)

  def receive: Receive = {
    case Collect   collectMetrics()
    case anything 
  }

  override def postStop() = collectSchedule.cancel()

  def collectMetrics() = {
    cpuRecorder.map(recordCpu)
    processCpuRecorder.map(recordProcessCpu)
    memoryRecorder.map(recordMemory)
    networkRecorder.map(recordNetwork)
  }

  private def recordCpu(cpur: CPUMetricRecorder) = {
    cpur.user.record(toLong(cpu.getUser))
    cpur.system.record(toLong(cpu.getSys))
    cpur.cpuWait.record(toLong(cpu.getWait()))
    cpur.idle.record(toLong(cpu.getIdle))
  }

  private def recordProcessCpu(pcpur: ProcessCPUMetricsRecorder) = {
    pcpur.user.record(procCpu.getUser)
    pcpur.system.record(procCpu.getSys)
  }

  private def recordMemory(mr: MemoryMetricRecorder) = {
    mr.used.record(toMB(mem.getUsed))
    mr.free.record(toMB(mem.getFree))
    mr.swapUsed.record(toMB(swap.getUsed))
    mr.swapFree.record(toMB(swap.getFree))
    mr.buffer.record(toMB(collectBuffer(mem)))
    mr.cache.record(toMB(collectCache(mem)))

    def collectBuffer(mem: Mem): Long = if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L
    def collectCache(mem: Mem): Long = if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L
  }

  private def recordNetwork(nr: NetworkMetricRecorder) = {
    nr.rxBytes.record(collect(sigar, interfaces)(net  toKB(net.getRxBytes)))
    nr.txBytes.record(collect(sigar, interfaces)(net  toKB(net.getTxBytes)))
    nr.rxErrors.record(collect(sigar, interfaces)(net  net.getRxErrors))
    nr.txErrors.record(collect(sigar, interfaces)(net  net.getTxErrors))

    def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat  Long): Long = {
      interfaces.foldLeft(0L) { (totalBytes, interface) 
        {
          val net = sigar.getNetInterfaceStat(interface)
          totalBytes + block(net)
        }
      }
    }
  }
}

object SystemMetricsCollector {
  case object Collect

  def props(collectInterval: FiniteDuration): Props = Props[SystemMetricsCollector](new SystemMetricsCollector(collectInterval))
}

trait SigarExtensionProvider {
  lazy val sigar = SigarHolder.instance()

  def pid = sigar.getPid
  def procCpu = sigar.getProcCpu(pid)
  def cpu = sigar.getCpuPerc
  def mem = sigar.getMem
  def swap = sigar.getSwap

  val interfaces: Set[String] = sigar.getNetInterfaceList.toSet
}