aboutsummaryrefslogtreecommitdiff
path: root/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala')
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala107
1 files changed, 85 insertions, 22 deletions
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
index c200091e..4391240a 100644
--- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala
@@ -22,16 +22,19 @@ import kamon.Kamon
import kamon.metric.Metrics
import kamon.metrics.CPUMetrics.CPUMetricRecorder
import kamon.metrics.ContextSwitchesMetrics.ContextSwitchesMetricsRecorder
+import kamon.metrics.DiskMetrics.DiskMetricsRecorder
+import kamon.metrics.LoadAverageMetrics.LoadAverageMetricsRecorder
import kamon.metrics.MemoryMetrics.MemoryMetricRecorder
import kamon.metrics.NetworkMetrics.NetworkMetricRecorder
import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder
import kamon.metrics._
import kamon.sigar.SigarProvisioner
-import org.hyperic.sigar.{ Sigar, Mem, NetInterfaceStat, SigarProxy }
+import org.hyperic.sigar._
+import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.FiniteDuration
import scala.io.Source
-import scala.util.control.NonFatal
+import scala.collection.mutable
class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with ActorLogging with SystemMetricsBanner {
import kamon.system.SystemMetricsCollector._
@@ -40,16 +43,19 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
lazy val sigar = createSigarInstance
def pid = sigar.getPid
- val interfaces: Set[String] = sigar.getNetInterfaceList.toSet
+ val interfaces = sigar.getNetInterfaceList.filterNot(NetworkFilter).toSet
+ val fileSystems = sigar.getFileSystemList.filter(_.getType == FileSystem.TYPE_LOCAL_DISK).map(_.getDevName).toSet
- val systemMetricsExtension = Kamon(Metrics)(context.system)
- val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher)
+ val metricExtension = Kamon(Metrics)(context.system)
+ val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher)
- val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
- val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
- val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
- val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
- val contextSwitchesRecorder = systemMetricsExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory)
+ val cpuRecorder = metricExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
+ val processCpuRecorder = metricExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
+ val memoryRecorder = metricExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
+ val networkRecorder = metricExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
+ val contextSwitchesRecorder = metricExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory)
+ val diskRecorder = metricExtension.register(DiskMetrics(Disk), DiskMetrics.Factory)
+ val loadAverageRecorder = metricExtension.register(LoadAverageMetrics(LoadAverage), LoadAverageMetrics.Factory)
def receive: Receive = {
case Collect ⇒ collectMetrics()
@@ -62,6 +68,8 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
processCpuRecorder.map(recordProcessCpu)
memoryRecorder.map(recordMemory)
networkRecorder.map(recordNetwork)
+ diskRecorder.map(recordDisk)
+ loadAverageRecorder.map(recordLoadAverage)
if (OsUtils.isLinux)
contextSwitchesRecorder.map(recordContextSwitches)
@@ -100,23 +108,60 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
}
private def recordNetwork(nr: NetworkMetricRecorder) = {
- nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getRxBytes)))
- nr.txBytes.record(collect(sigar, interfaces)(net ⇒ toKB(net.getTxBytes)))
- nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors))
- nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors))
- nr.rxDropped.record(collect(sigar, interfaces)(net ⇒ net.getRxDropped))
- nr.txDropped.record(collect(sigar, interfaces)(net ⇒ net.getTxDropped))
-
- def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = {
- interfaces.foldLeft(0L) { (totalBytes, interface) ⇒
+ import Networks._
+ nr.rxBytes.record(collect(sigar, interfaces, RxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getRxBytes)))
+ nr.txBytes.record(collect(sigar, interfaces, TxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getTxBytes)))
+ nr.rxErrors.record(collect(sigar, interfaces, RxErrors, previousNetworkMetrics)(net ⇒ net.getRxErrors))
+ nr.txErrors.record(collect(sigar, interfaces, TxErrors, previousNetworkMetrics)(net ⇒ net.getTxErrors))
+ nr.rxDropped.record(collect(sigar, interfaces, RxDropped, previousNetworkMetrics)(net ⇒ net.getRxDropped))
+ nr.txDropped.record(collect(sigar, interfaces, TxDropped, previousNetworkMetrics)(net ⇒ net.getTxDropped))
+
+ def collect(sigar: SigarProxy, interfaces: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: NetInterfaceStat ⇒ Long): Long = {
+ interfaces.foldLeft(0L) { (acc, interface) ⇒
{
val net = sigar.getNetInterfaceStat(interface)
- totalBytes + block(net)
+ val previous = previousMetrics.getOrElse(interface, mutable.Map.empty[String, Long])
+ val current = thunk(net)
+ val delta = current - previous.getOrElse(name, 0L)
+ previousMetrics.put(interface, previous += name -> current)
+ acc + delta
}
}
}
}
+ private def recordDisk(rd: DiskMetricsRecorder) = {
+ import Disks._
+
+ rd.reads.record(collect(sigar, fileSystems, Reads, previousDiskMetrics)(disk ⇒ disk.getReads))
+ rd.writes.record(collect(sigar, fileSystems, Writes, previousDiskMetrics)(disk ⇒ disk.getWrites))
+ rd.queue.record(collect(sigar, fileSystems, Queue, previousDiskMetrics)(disk ⇒ toLong(disk.getQueue)))
+ rd.serviceTime.record(collect(sigar, fileSystems, Service, previousDiskMetrics)(disk ⇒ toLong(disk.getServiceTime)))
+ }
+
+ def collect(sigar: SigarProxy, fileSystems: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: DiskUsage ⇒ Long): Long = {
+ fileSystems.foldLeft(0L) { (acc, fileSystem) ⇒
+ {
+ val disk = sigar.getDiskUsage(fileSystem)
+ val previous = previousMetrics.getOrElse(fileSystem, mutable.Map.empty[String, Long])
+ val value = thunk(disk)
+ val current = if (value == Sigar.FIELD_NOTIMPL) 0L else value
+ val delta = current - previous.getOrElse(name, 0L)
+ previousMetrics.put(fileSystem, previous += name -> current)
+ acc + delta
+ }
+ }
+ }
+
+ private def recordLoadAverage(lar: LoadAverageMetricsRecorder) = {
+ val loadAverage = sigar.getLoadAverage
+ val (one, five, fifteen) = (loadAverage(0), loadAverage(1), loadAverage(2))
+
+ lar.one.record(toLong(one))
+ lar.five.record(toLong(five))
+ lar.fifteen.record(toLong(fifteen))
+ }
+
private def recordContextSwitches(rcs: ContextSwitchesMetricsRecorder) = {
def contextSwitchesByProcess(pid: Long): (Long, Long) = {
val filename = s"/proc/$pid/status"
@@ -167,12 +212,11 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
}
def provisionSigarLibrary: Unit = {
- val folder = context.system.settings.config.getString("kamon.sigar.folder")
+ val folder = SystemMetrics(context.system).sigarFolder
SigarProvisioner.provision(new File(folder))
}
def createSigarInstance: SigarProxy = {
-
// 1) Assume that library is already provisioned.
try {
return verifiedSigarInstance
@@ -193,6 +237,25 @@ class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with
}
object SystemMetricsCollector {
+ val NetworkFilter = Set("lo")
+ val previousDiskMetrics = TrieMap[String, mutable.Map[String, Long]]()
+ val previousNetworkMetrics = TrieMap[String, mutable.Map[String, Long]]()
+
+ object Networks {
+ val RxBytes = "rxBytes"
+ val TxBytes = "txBytes"
+ val RxErrors = "rxErrors"
+ val TxErrors = "txErrors"
+ val RxDropped = "rxDropped"
+ val TxDropped = "txDropped"
+ }
+
+ object Disks {
+ val Reads = "reads"
+ val Writes = "writes"
+ val Queue = "queue"
+ val Service = "service"
+ }
case object Collect
object OsUtils {