1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.system
import akka.actor.{ Actor, Props }
import kamon.Kamon
import kamon.metric.Metrics
import kamon.metrics.CPUMetrics.CPUMetricRecorder
import kamon.metrics.MemoryMetrics.MemoryMetricRecorder
import kamon.metrics.NetworkMetrics.NetworkMetricRecorder
import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder
import kamon.metrics.{ CPUMetrics, MemoryMetrics, NetworkMetrics, ProcessCPUMetrics }
import kamon.system.sigar.SigarHolder
import org.hyperic.sigar.{ Mem, NetInterfaceStat, SigarProxy }
import scala.concurrent.duration.FiniteDuration
/**
 * Actor that samples CPU, process-CPU, memory and network readings through Sigar each time it
 * receives a `Collect` message, and stores them in the metric recorders registered with the Kamon
 * metrics extension.
 *
 * @param collectInterval used both as the initial delay and as the period of the self-addressed
 *                        `Collect` schedule
 */
class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with SigarExtensionProvider {
  import kamon.system.SystemMetricsCollector._
  import kamon.system.SystemMetricsExtension._

  // Sends Collect to this actor on every tick; cancelled again in postStop.
  val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher)

  val systemMetricsExtension = Kamon(Metrics)(context.system)

  // One recorder per metric group; each registration result is only used when it is present.
  val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
  val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
  val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
  val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)

  def receive: Receive = {
    case Collect ⇒ collectMetrics()
  }

  /** Stops the periodic `Collect` schedule when the actor is stopped. */
  override def postStop(): Unit = collectSchedule.cancel()

  /** Takes one sample for every metric group whose recorder is available. */
  def collectMetrics(): Unit = {
    // foreach rather than map: the recorders are used purely for their side effects.
    cpuRecorder.foreach(recordCpu)
    processCpuRecorder.foreach(recordProcessCpu)
    memoryRecorder.foreach(recordMemory)
    networkRecorder.foreach(recordNetwork)
  }

  /** Records the user, system, wait and idle readings of a single `getCpuPerc` call. */
  private def recordCpu(cpur: CPUMetricRecorder): Unit = {
    val cpuPerc = sigar.getCpuPerc

    cpur.user.record(toLong(cpuPerc.getUser))
    cpur.system.record(toLong(cpuPerc.getSys))
    cpur.cpuWait.record(toLong(cpuPerc.getWait))
    cpur.idle.record(toLong(cpuPerc.getIdle))
  }

  /** Records the CPU percentage and total process time reported for this process id. */
  private def recordProcessCpu(pcpur: ProcessCPUMetricsRecorder): Unit = {
    val procCpu = sigar.getProcCpu(pid)
    val procTime = sigar.getProcTime(pid)

    pcpur.cpuPercent.record(toLong(procCpu.getPercent))
    pcpur.totalProcessTime.record(procTime.getTotal) // gives an idea of what is really measured and then interpreted as %
  }

  /** Records memory and swap usage, each value passed through `toMB` before recording. */
  private def recordMemory(mr: MemoryMetricRecorder): Unit = {
    // NOTE(review): the value recorded as `buffer` is Sigar's actual-used figure and `cache` its
    // actual-free figure, each reported only when it differs from the raw used/free value (0
    // otherwise) — confirm that this is the intended definition of both metrics.
    def collectBuffer(mem: Mem): Long = if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L
    def collectCache(mem: Mem): Long = if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L

    val mem = sigar.getMem
    val swap = sigar.getSwap

    mr.used.record(toMB(mem.getUsed))
    mr.free.record(toMB(mem.getFree))
    mr.swapUsed.record(toMB(swap.getUsed))
    mr.swapFree.record(toMB(swap.getFree))
    mr.buffer.record(toMB(collectBuffer(mem)))
    mr.cache.record(toMB(collectCache(mem)))
  }

  /** Records received/transmitted byte and error counts, summed over all known interfaces. */
  private def recordNetwork(nr: NetworkMetricRecorder): Unit = {
    val proxy: SigarProxy = sigar

    // Read every interface exactly once per tick so that the four totals below are derived from
    // the same readings, instead of querying Sigar again for each metric. The names are moved
    // into a List first so that mapping cannot collapse entries the way mapping a Set would.
    val stats: List[NetInterfaceStat] = interfaces.toList.map(name ⇒ proxy.getNetInterfaceStat(name))

    // Sums one counter over all interfaces; `reading` is applied per interface before adding.
    def total(reading: NetInterfaceStat ⇒ Long): Long =
      stats.foldLeft(0L)((sum, stat) ⇒ sum + reading(stat))

    nr.rxBytes.record(total(stat ⇒ toKB(stat.getRxBytes)))
    nr.txBytes.record(total(stat ⇒ toKB(stat.getTxBytes)))
    nr.rxErrors.record(total(stat ⇒ stat.getRxErrors))
    nr.txErrors.record(total(stat ⇒ stat.getTxErrors))
  }
}
/** Companion holding the collector's trigger message and its `Props` factory. */
object SystemMetricsCollector {

  /** Message that makes a [[SystemMetricsCollector]] take one sample of all its metrics. */
  case object Collect

  /**
   * Builds the `Props` used to create a [[SystemMetricsCollector]].
   *
   * @param collectInterval interval handed to the collector's constructor
   */
  def props(collectInterval: FiniteDuration): Props =
    Props(new SystemMetricsCollector(collectInterval))
}
/** Mix-in exposing the shared Sigar instance together with a few values read from it. */
trait SigarExtensionProvider {

  /** Network interface names, read from Sigar once when the trait is initialised. */
  val interfaces: Set[String] = Set(sigar.getNetInterfaceList: _*)

  /** Sigar handle obtained from `SigarHolder` on first access. */
  lazy val sigar = SigarHolder.instance()

  /** Process id as reported by Sigar; queried again on every call. */
  def pid = sigar.getPid
}
|