From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- .../scala/kamon/system/GcMetricsCollector.scala | 77 ------ .../main/scala/kamon/system/SystemMetrics.scala | 78 ------ .../scala/kamon/system/SystemMetricsBanner.scala | 91 ------- .../kamon/system/SystemMetricsCollector.scala | 266 --------------------- .../kamon/system/SystemMetricsExtension.scala | 70 ++++++ .../system/custom/ContextSwitchesMetrics.scala | 96 ++++++++ .../kamon/system/jmx/ClassLoadingMetrics.scala | 28 +++ .../system/jmx/GarbageCollectionMetrics.scala | 34 +++ .../scala/kamon/system/jmx/HeapMemoryMetrics.scala | 29 +++ .../jmx/JmxSystemMetricRecorderCompanion.scala | 13 + .../kamon/system/jmx/NonHeapMemoryMetrics.scala | 33 +++ .../scala/kamon/system/jmx/ThreadsMetrics.scala | 28 +++ .../main/scala/kamon/system/sigar/CpuMetrics.scala | 29 +++ .../system/sigar/DiffRecordingHistogram.scala | 41 ++++ .../kamon/system/sigar/FileSystemMetrics.scala | 25 ++ .../kamon/system/sigar/LoadAverageMetrics.scala | 25 ++ .../scala/kamon/system/sigar/MemoryMetrics.scala | 36 +++ .../scala/kamon/system/sigar/NetworkMetrics.scala | 33 +++ .../kamon/system/sigar/ProcessCpuMetrics.scala | 39 +++ .../kamon/system/sigar/SigarMetricsUpdater.scala | 59 +++++ 20 files changed, 618 insertions(+), 512 deletions(-) delete mode 100644 kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala delete mode 100644 kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala delete mode 100644 kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsBanner.scala delete mode 100644 kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/custom/ContextSwitchesMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/jmx/ClassLoadingMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/jmx/GarbageCollectionMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/jmx/HeapMemoryMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/jmx/JmxSystemMetricRecorderCompanion.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/jmx/NonHeapMemoryMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/jmx/ThreadsMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/CpuMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/DiffRecordingHistogram.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/FileSystemMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/LoadAverageMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/MemoryMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/NetworkMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/ProcessCpuMetrics.scala create mode 100644 kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala (limited to 'kamon-system-metrics/src/main/scala/kamon/system') diff --git a/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala deleted file mode 100644 index ae2f50cf..00000000 --- a/kamon-system-metrics/src/main/scala/kamon/system/GcMetricsCollector.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.system - -import java.lang.management.GarbageCollectorMXBean - -import akka.actor.{ Actor, Props } -import kamon.metrics.GCMetrics.GCMetricRecorder - -import scala.concurrent.duration.FiniteDuration - -class GcMetricsCollector(collectInterval: FiniteDuration, recorder: Option[GCMetricRecorder], extractor: GcMetricExtractor) extends Actor { - import kamon.system.GcMetricsCollector._ - - val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher) - - def receive: Receive = { - case Collect ⇒ collectMetrics() - } - - override def postStop() = collectSchedule.cancel() - - def collectMetrics(): Unit = recorder.map(recordGc) - - private def recordGc(gcr: GCMetricRecorder) = { - val gcMeasure = extractor.extract() - - gcr.count.record(gcMeasure.collectionCount) - gcr.time.record(gcMeasure.collectionTime) - } -} - -object GcMetricsCollector { - case object Collect - - def props(collectInterval: FiniteDuration, recorder: Option[GCMetricRecorder], extractor: GcMetricExtractor): Props = Props(classOf[GcMetricsCollector], collectInterval, recorder, extractor) -} - -case class GcMeasure(collectionCount: Long, collectionTime: Long) - -case class GcMetricExtractor(gc: GarbageCollectorMXBean) { - var previousGcCount = 0L - var previousGcTime = 0L - - def extract(): GcMeasure = { - var diffCollectionCount = 0L - var diffCollectionTime = 0L - - val collectionCount = gc.getCollectionCount - val collectionTime = gc.getCollectionTime - - if (collectionCount > 0) - diffCollectionCount = collectionCount - previousGcCount - - if (collectionTime > 0) - diffCollectionTime = collectionTime - previousGcTime - - previousGcCount = collectionCount - previousGcTime = collectionTime - - GcMeasure(diffCollectionCount, diffCollectionTime) - } -} \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala deleted file mode 100644 index cb3e2695..00000000 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.system - -import java.lang.management.ManagementFactory -import akka.actor._ -import akka.event.Logging -import kamon.Kamon -import kamon.metric.Metrics -import kamon.metrics._ -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ - -object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider { - override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics - override def createExtension(system: ExtendedActorSystem): SystemMetricsExtension = new SystemMetricsExtension(system) -} - -class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { - import kamon.system.SystemMetricsExtension._ - - val log = Logging(system, classOf[SystemMetricsExtension]) - log.info(s"Starting the Kamon(SystemMetrics) extension") - - val config = system.settings.config.getConfig("kamon.system-metrics") - val dispatcher = system.dispatchers.lookup(config.getString("dispatcher")) - val sigarFolder = system.settings.config.getString("kamon.sigar.folder") - val systemMetricsExtension = Kamon(Metrics)(system) - - //System Metrics - system.actorOf(SystemMetricsCollector.props(1 second), "system-metrics-collector") - - //JVM Metrics - systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory) - systemMetricsExtension.register(NonHeapMetrics(NonHeap), NonHeapMetrics.Factory) - systemMetricsExtension.register(ClassLoadingMetrics(Classes), ClassLoadingMetrics.Factory) - systemMetricsExtension.register(ThreadMetrics(Threads), ThreadMetrics.Factory) - - garbageCollectors.map { gc ⇒ - val gcName = sanitize(gc.getName) - val recorder = systemMetricsExtension.register(GCMetrics(gcName), GCMetrics.Factory(gc)) - system.actorOf(GcMetricsCollector.props(1 second, recorder, GcMetricExtractor(gc)), s"$gcName-collector") - } -} - -object SystemMetricsExtension { - val CPU = "cpu" - val ProcessCPU = "process-cpu" - val Network = "network" - val Memory = "memory" - val Heap = "heap" - val NonHeap = "non-heap" - val Classes = "classes" - val Threads = "thread" - val ContextSwitches = "context-switches" - val Disk = "disk" - val LoadAverage = "load-average" - - def toKB(value: Long): Long = value / 1024 - def toMB(value: Long): Long = value / 1024 / 1024 - def toLong(value: Double): Long = math round (value * 100L) - def sanitize(str: String): String = str.replaceAll("""[^\w]""", "-") - - val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans.asScala.filter(_.isValid) -} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsBanner.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsBanner.scala deleted file mode 100644 index 99e09da9..00000000 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsBanner.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.system - -import akka.actor.ActorLogging -import org.hyperic.sigar._ - -import scala.util.control.NoStackTrace - -trait SystemMetricsBanner { - self: ActorLogging ⇒ - - def printBanner(sigar: Sigar) = { - val os = OperatingSystem.getInstance - - def loadAverage(sigar: Sigar) = try { - val average = sigar.getLoadAverage - (average(0), average(1), average(2)) - } catch { - case s: org.hyperic.sigar.SigarNotImplementedException ⇒ (0d, 0d, 0d) - } - - def uptime(sigar: Sigar) = { - def formatUptime(uptime: Double): String = { - var retval: String = "" - val days: Int = uptime.toInt / (60 * 60 * 24) - var minutes: Int = 0 - var hours: Int = 0 - - if (days != 0) { - retval += s"$days ${if (days > 1) "days" else "day"}, " - } - - minutes = uptime.toInt / 60 - hours = minutes / 60 - hours %= 24 - minutes %= 60 - - if (hours != 0) { - retval += hours + ":" + minutes - } else { - retval += minutes + " min" - } - retval - } - - val uptime = sigar.getUptime - val now = System.currentTimeMillis() - - s"up ${formatUptime(uptime.getUptime)}" - } - - val message = - """ - | - | _____ _ __ __ _ _ _ _ _ - | / ____| | | | \/ | | | (_) | | | | | | - || (___ _ _ ___| |_ ___ _ __ ___ | \ / | ___| |_ _ __ _ ___ ___| | ___ __ _ __| | ___ __| | - | \___ \| | | / __| __/ _ \ '_ ` _ \| |\/| |/ _ \ __| '__| |/ __/ __| | / _ \ / _` |/ _` |/ _ \/ _` | - | ____) | |_| \__ \ || __/ | | | | | | | | __/ |_| | | | (__\__ \ |___| (_) | (_| | (_| | __/ (_| | - ||_____/ \__, |___/\__\___|_| |_| |_|_| |_|\___|\__|_| |_|\___|___/______\___/ \__,_|\__,_|\___|\__,_| - | __/ | - | |___/ - | - | [System Status] [OS Information] - | |--------------------------------| |----------------------------------------| - | Up Time: %-10s Description: %s - | Load Average: %-16s Name: %s - | Version: %s - | Arch: %s - | - """.stripMargin.format(uptime(sigar), os.getDescription, loadAverage(sigar), os.getName, os.getVersion, os.getArch) - log.info(message) - } - - class UnexpectedSigarException(message: String) extends RuntimeException(message) with NoStackTrace -} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala deleted file mode 100644 index 4391240a..00000000 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsCollector.scala +++ /dev/null @@ -1,266 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.system - -import java.io.{ File, IOException } - -import akka.actor.{ Actor, ActorLogging, Props } -import kamon.Kamon -import kamon.metric.Metrics -import kamon.metrics.CPUMetrics.CPUMetricRecorder -import kamon.metrics.ContextSwitchesMetrics.ContextSwitchesMetricsRecorder -import kamon.metrics.DiskMetrics.DiskMetricsRecorder -import kamon.metrics.LoadAverageMetrics.LoadAverageMetricsRecorder -import kamon.metrics.MemoryMetrics.MemoryMetricRecorder -import kamon.metrics.NetworkMetrics.NetworkMetricRecorder -import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsRecorder -import kamon.metrics._ -import kamon.sigar.SigarProvisioner -import org.hyperic.sigar._ - -import scala.collection.concurrent.TrieMap -import scala.concurrent.duration.FiniteDuration -import scala.io.Source -import scala.collection.mutable - -class SystemMetricsCollector(collectInterval: FiniteDuration) extends Actor with ActorLogging with SystemMetricsBanner { - import kamon.system.SystemMetricsCollector._ - import kamon.system.SystemMetricsExtension._ - - lazy val sigar = createSigarInstance - def pid = sigar.getPid - - val interfaces = sigar.getNetInterfaceList.filterNot(NetworkFilter).toSet - val fileSystems = sigar.getFileSystemList.filter(_.getType == FileSystem.TYPE_LOCAL_DISK).map(_.getDevName).toSet - - val metricExtension = Kamon(Metrics)(context.system) - val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(SystemMetrics(context.system).dispatcher) - - val cpuRecorder = metricExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) - val processCpuRecorder = metricExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) - val memoryRecorder = metricExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) - val networkRecorder = metricExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) - val contextSwitchesRecorder = metricExtension.register(ContextSwitchesMetrics(ContextSwitches), ContextSwitchesMetrics.Factory) - val diskRecorder = metricExtension.register(DiskMetrics(Disk), DiskMetrics.Factory) - val loadAverageRecorder = metricExtension.register(LoadAverageMetrics(LoadAverage), LoadAverageMetrics.Factory) - - def receive: Receive = { - case Collect ⇒ collectMetrics() - } - - override def postStop() = collectSchedule.cancel() - - def collectMetrics() = { - cpuRecorder.map(recordCpu) - processCpuRecorder.map(recordProcessCpu) - memoryRecorder.map(recordMemory) - networkRecorder.map(recordNetwork) - diskRecorder.map(recordDisk) - loadAverageRecorder.map(recordLoadAverage) - - if (OsUtils.isLinux) - contextSwitchesRecorder.map(recordContextSwitches) - } - - private def recordCpu(cpur: CPUMetricRecorder) = { - val cpuPerc = sigar.getCpuPerc - cpur.user.record(toLong(cpuPerc.getUser)) - cpur.system.record(toLong(cpuPerc.getSys)) - cpur.cpuWait.record(toLong(cpuPerc.getWait)) - cpur.idle.record(toLong(cpuPerc.getIdle)) - cpur.stolen.record(toLong(cpuPerc.getStolen)) - } - - private def recordProcessCpu(pcpur: ProcessCPUMetricsRecorder) = { - val procCpu = sigar.getProcCpu(pid) - val procTime = sigar.getProcTime(pid) - - pcpur.cpuPercent.record(toLong(procCpu.getPercent)) - pcpur.totalProcessTime.record(procTime.getTotal) // gives an idea of what is really measured and then interpreted as % - } - - private def recordMemory(mr: MemoryMetricRecorder) = { - val mem = sigar.getMem - val swap = sigar.getSwap - - mr.used.record(toMB(mem.getUsed)) - mr.free.record(toMB(mem.getFree)) - mr.swapUsed.record(toMB(swap.getUsed)) - mr.swapFree.record(toMB(swap.getFree)) - mr.buffer.record(toMB(collectBuffer(mem))) - mr.cache.record(toMB(collectCache(mem))) - - def collectBuffer(mem: Mem): Long = if (mem.getUsed != mem.getActualUsed) mem.getActualUsed else 0L - def collectCache(mem: Mem): Long = if (mem.getFree != mem.getActualFree) mem.getActualFree else 0L - } - - private def recordNetwork(nr: NetworkMetricRecorder) = { - import Networks._ - nr.rxBytes.record(collect(sigar, interfaces, RxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getRxBytes))) - nr.txBytes.record(collect(sigar, interfaces, TxBytes, previousNetworkMetrics)(net ⇒ toKB(net.getTxBytes))) - nr.rxErrors.record(collect(sigar, interfaces, RxErrors, previousNetworkMetrics)(net ⇒ net.getRxErrors)) - nr.txErrors.record(collect(sigar, interfaces, TxErrors, previousNetworkMetrics)(net ⇒ net.getTxErrors)) - nr.rxDropped.record(collect(sigar, interfaces, RxDropped, previousNetworkMetrics)(net ⇒ net.getRxDropped)) - nr.txDropped.record(collect(sigar, interfaces, TxDropped, previousNetworkMetrics)(net ⇒ net.getTxDropped)) - - def collect(sigar: SigarProxy, interfaces: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: NetInterfaceStat ⇒ Long): Long = { - interfaces.foldLeft(0L) { (acc, interface) ⇒ - { - val net = sigar.getNetInterfaceStat(interface) - val previous = previousMetrics.getOrElse(interface, mutable.Map.empty[String, Long]) - val current = thunk(net) - val delta = current - previous.getOrElse(name, 0L) - previousMetrics.put(interface, previous += name -> current) - acc + delta - } - } - } - } - - private def recordDisk(rd: DiskMetricsRecorder) = { - import Disks._ - - rd.reads.record(collect(sigar, fileSystems, Reads, previousDiskMetrics)(disk ⇒ disk.getReads)) - rd.writes.record(collect(sigar, fileSystems, Writes, previousDiskMetrics)(disk ⇒ disk.getWrites)) - rd.queue.record(collect(sigar, fileSystems, Queue, previousDiskMetrics)(disk ⇒ toLong(disk.getQueue))) - rd.serviceTime.record(collect(sigar, fileSystems, Service, previousDiskMetrics)(disk ⇒ toLong(disk.getServiceTime))) - } - - def collect(sigar: SigarProxy, fileSystems: Set[String], name: String, previousMetrics: TrieMap[String, mutable.Map[String, Long]])(thunk: DiskUsage ⇒ Long): Long = { - fileSystems.foldLeft(0L) { (acc, fileSystem) ⇒ - { - val disk = sigar.getDiskUsage(fileSystem) - val previous = previousMetrics.getOrElse(fileSystem, mutable.Map.empty[String, Long]) - val value = thunk(disk) - val current = if (value == Sigar.FIELD_NOTIMPL) 0L else value - val delta = current - previous.getOrElse(name, 0L) - previousMetrics.put(fileSystem, previous += name -> current) - acc + delta - } - } - } - - private def recordLoadAverage(lar: LoadAverageMetricsRecorder) = { - val loadAverage = sigar.getLoadAverage - val (one, five, fifteen) = (loadAverage(0), loadAverage(1), loadAverage(2)) - - lar.one.record(toLong(one)) - lar.five.record(toLong(five)) - lar.fifteen.record(toLong(fifteen)) - } - - private def recordContextSwitches(rcs: ContextSwitchesMetricsRecorder) = { - def contextSwitchesByProcess(pid: Long): (Long, Long) = { - val filename = s"/proc/$pid/status" - var voluntaryContextSwitches = 0L - var nonVoluntaryContextSwitches = 0L - - try { - for (line ← Source.fromFile(filename).getLines()) { - if (line.startsWith("voluntary_ctxt_switches")) { - voluntaryContextSwitches = line.substring(line.indexOf(":") + 1).trim.toLong - } - if (line.startsWith("nonvoluntary_ctxt_switches")) { - nonVoluntaryContextSwitches = line.substring(line.indexOf(":") + 1).trim.toLong - } - } - } catch { - case ex: IOException ⇒ log.error("Error trying to read [{}]", filename) - } - (voluntaryContextSwitches, nonVoluntaryContextSwitches) - } - - def contextSwitches: Long = { - val filename = "/proc/stat" - var contextSwitches = 0L - - try { - for (line ← Source.fromFile(filename).getLines()) { - if (line.startsWith("rcs")) { - contextSwitches = line.substring(line.indexOf(" ") + 1).toLong - } - } - } catch { - case ex: IOException ⇒ log.error("Error trying to read [{}]", filename) - } - contextSwitches - } - - val (perProcessVoluntary, perProcessNonVoluntary) = contextSwitchesByProcess(pid) - rcs.perProcessVoluntary.record(perProcessVoluntary) - rcs.perProcessNonVoluntary.record(perProcessNonVoluntary) - rcs.global.record(contextSwitches) - } - - def verifiedSigarInstance: SigarProxy = { - val sigar = new Sigar() - printBanner(sigar) - sigar - } - - def provisionSigarLibrary: Unit = { - val folder = SystemMetrics(context.system).sigarFolder - SigarProvisioner.provision(new File(folder)) - } - - def createSigarInstance: SigarProxy = { - // 1) Assume that library is already provisioned. - try { - return verifiedSigarInstance - } catch { - // Not using [[Try]] - any error is non-fatal in this case. - case e: Throwable ⇒ log.info(s"Sigar is not yet provisioned: ${e}") - } - - // 2) Attempt to provision library via sigar-loader. - try { - provisionSigarLibrary - return verifiedSigarInstance - } catch { - // Not using [[Try]] - any error is non-fatal in this case. - case e: Throwable ⇒ throw new UnexpectedSigarException(s"Failed to load Sigar: ${e}") - } - } -} - -object SystemMetricsCollector { - val NetworkFilter = Set("lo") - val previousDiskMetrics = TrieMap[String, mutable.Map[String, Long]]() - val previousNetworkMetrics = TrieMap[String, mutable.Map[String, Long]]() - - object Networks { - val RxBytes = "rxBytes" - val TxBytes = "txBytes" - val RxErrors = "rxErrors" - val TxErrors = "txErrors" - val RxDropped = "rxDropped" - val TxDropped = "txDropped" - } - - object Disks { - val Reads = "reads" - val Writes = "writes" - val Queue = "queue" - val Service = "service" - } - case object Collect - - object OsUtils { - def isLinux: Boolean = System.getProperty("os.name").indexOf("Linux") != -1 - } - - def props(collectInterval: FiniteDuration): Props = Props(classOf[SystemMetricsCollector], collectInterval) -} \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala new file mode 100644 index 00000000..df120611 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala @@ -0,0 +1,70 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.system + +import java.io.File +import akka.actor._ +import akka.event.Logging +import kamon.system.custom.{ ContextSwitchesUpdater, ContextSwitchesMetrics } +import kamon.system.jmx._ +import kamon.{ ModuleSupervisor, Kamon } +import kamon.metric._ +import kamon.sigar.SigarProvisioner +import kamon.system.sigar.SigarMetricsUpdater + +import kamon.util.ConfigTools.Syntax + +object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics + override def createExtension(system: ExtendedActorSystem): SystemMetricsExtension = new SystemMetricsExtension(system) +} + +class SystemMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { + + val log = Logging(system, classOf[SystemMetricsExtension]) + log.info(s"Starting the Kamon(SystemMetrics) extension") + + val config = system.settings.config.getConfig("kamon.system-metrics") + val sigarFolder = config.getString("sigar-native-folder") + val sigarRefreshInterval = config.getFiniteDuration("sigar-metrics-refresh-interval") + val contextSwitchesRefreshInterval = config.getFiniteDuration("context-switches-refresh-interval") + val metricsExtension = Kamon(Metrics)(system) + + // Sigar-based metrics + SigarProvisioner.provision(new File(sigarFolder)) + val sigarMetricsRecorder = ModuleSupervisor.get(system).createModule("sigar-metrics-recorder", + SigarMetricsUpdater.props(sigarRefreshInterval).withDispatcher("kamon.system-metrics.sigar-dispatcher")) + + // JMX Metrics + ClassLoadingMetrics.register(metricsExtension) + GarbageCollectionMetrics.register(metricsExtension) + HeapMemoryMetrics.register(metricsExtension) + NonHeapMemoryMetrics.register(metricsExtension) + ThreadsMetrics.register(metricsExtension) + + // If we are in Linux, add ContextSwitchesMetrics as well. + if (isLinux) { + val contextSwitchesRecorder = ContextSwitchesMetrics.register(system, contextSwitchesRefreshInterval) + + ModuleSupervisor.get(system).createModule("context-switches-metrics-recorder", + ContextSwitchesUpdater.props(contextSwitchesRecorder, sigarRefreshInterval) + .withDispatcher("kamon.system-metrics.context-switches-dispatcher")) + } + + def isLinux: Boolean = + System.getProperty("os.name").indexOf("Linux") != -1 + +} \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/custom/ContextSwitchesMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/custom/ContextSwitchesMetrics.scala new file mode 100644 index 00000000..a3c56733 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/custom/ContextSwitchesMetrics.scala @@ -0,0 +1,96 @@ +package kamon.system.custom + +import java.io.IOException +import java.nio.charset.StandardCharsets +import java.nio.file.{ Paths, Files } + +import akka.actor.{ Props, Actor, ActorSystem } +import akka.event.{ Logging, LoggingAdapter } +import kamon.Kamon +import kamon.metric._ +import kamon.metric.instrument.InstrumentFactory +import kamon.system.custom.ContextSwitchesUpdater.UpdateContextSwitches +import org.hyperic.sigar.Sigar +import scala.collection.JavaConverters.iterableAsScalaIterableConverter +import scala.concurrent.duration.FiniteDuration + +class ContextSwitchesMetrics(pid: Long, log: LoggingAdapter, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val perProcessVoluntary = histogram("context-switches-process-voluntary") + val perProcessNonVoluntary = histogram("context-switches-process-non-voluntary") + val global = histogram("context-switches-global") + + def update(): Unit = { + def contextSwitchesByProcess(pid: Long): (Long, Long) = { + val filename = s"/proc/$pid/status" + var voluntaryContextSwitches = 0L + var nonVoluntaryContextSwitches = 0L + + try { + for (line ← Files.readAllLines(Paths.get(filename), StandardCharsets.US_ASCII).asScala.toList) { + if (line.startsWith("voluntary_ctxt_switches")) { + voluntaryContextSwitches = line.substring(line.indexOf(":") + 1).trim.toLong + } + if (line.startsWith("nonvoluntary_ctxt_switches")) { + nonVoluntaryContextSwitches = line.substring(line.indexOf(":") + 1).trim.toLong + } + } + } catch { + case ex: IOException ⇒ log.error("Error trying to read [{}]", filename) + } + (voluntaryContextSwitches, nonVoluntaryContextSwitches) + } + + def contextSwitches: Long = { + val filename = "/proc/stat" + var contextSwitches = 0L + + try { + for (line ← Files.readAllLines(Paths.get(filename), StandardCharsets.US_ASCII).asScala.toList) { + if (line.startsWith("rcs")) { + contextSwitches = line.substring(line.indexOf(" ") + 1).toLong + } + } + } catch { + case ex: IOException ⇒ log.error("Error trying to read [{}]", filename) + } + contextSwitches + } + + val (voluntary, nonVoluntary) = contextSwitchesByProcess(pid) + perProcessVoluntary.record(voluntary) + perProcessNonVoluntary.record(nonVoluntary) + global.record(contextSwitches) + } +} + +object ContextSwitchesMetrics { + + def register(system: ActorSystem, refreshInterval: FiniteDuration): ContextSwitchesMetrics = { + val metricsExtension = Kamon(Metrics)(system) + val log = Logging(system, "ContextSwitchesMetrics") + val pid = (new Sigar).getPid + + val instrumentFactory = metricsExtension.instrumentFactory("system-metric") + metricsExtension.register(Entity("context-switches", "system-metric"), new ContextSwitchesMetrics(pid, log, instrumentFactory)).recorder + } +} + +class ContextSwitchesUpdater(csm: ContextSwitchesMetrics, refreshInterval: FiniteDuration) extends Actor { + val schedule = context.system.scheduler.schedule(refreshInterval, refreshInterval, self, UpdateContextSwitches)(context.dispatcher) + + def receive = { + case UpdateContextSwitches ⇒ csm.update() + } + + override def postStop(): Unit = { + schedule.cancel() + super.postStop() + } +} + +object ContextSwitchesUpdater { + case object UpdateContextSwitches + + def props(csm: ContextSwitchesMetrics, refreshInterval: FiniteDuration): Props = + Props(new ContextSwitchesUpdater(csm, refreshInterval)) +} \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/jmx/ClassLoadingMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/jmx/ClassLoadingMetrics.scala new file mode 100644 index 00000000..d9379738 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/jmx/ClassLoadingMetrics.scala @@ -0,0 +1,28 @@ +package kamon.system.jmx + +import java.lang.management.ManagementFactory + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.{ Memory, InstrumentFactory } + +class ClassLoadingMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val classLoadingBean = ManagementFactory.getClassLoadingMXBean + + gauge("classes-loaded", Memory.Bytes, () ⇒ { + classLoadingBean.getTotalLoadedClassCount + }) + + gauge("classes-unloaded", Memory.Bytes, () ⇒ { + classLoadingBean.getUnloadedClassCount + }) + + gauge("classes-currently-loaded", Memory.Bytes, () ⇒ { + classLoadingBean.getLoadedClassCount.toLong + }) + +} + +object ClassLoadingMetrics extends JmxSystemMetricRecorderCompanion("class-loading") { + def apply(instrumentFactory: InstrumentFactory): ClassLoadingMetrics = + new ClassLoadingMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/jmx/GarbageCollectionMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/jmx/GarbageCollectionMetrics.scala new file mode 100644 index 00000000..b7d2fe6a --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/jmx/GarbageCollectionMetrics.scala @@ -0,0 +1,34 @@ +package kamon.system.jmx + +import java.lang.management.{ GarbageCollectorMXBean, ManagementFactory } + +import kamon.metric.{ Entity, MetricsExtension, GenericEntityRecorder } +import kamon.metric.instrument.{ DifferentialValueCollector, Time, InstrumentFactory } +import scala.collection.JavaConverters._ + +class GarbageCollectionMetrics(gc: GarbageCollectorMXBean, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + + gauge("garbage-collection-count", DifferentialValueCollector(() ⇒ { + gc.getCollectionCount + })) + + gauge("garbage-collection-time", Time.Milliseconds, DifferentialValueCollector(() ⇒ { + gc.getCollectionTime + })) + +} + +object GarbageCollectionMetrics { + + def sanitizeCollectorName(name: String): String = + name.replaceAll("""[^\w]""", "-").toLowerCase + + def register(metricsExtension: MetricsExtension): Unit = { + + val instrumentFactory = metricsExtension.instrumentFactory("system-metric") + ManagementFactory.getGarbageCollectorMXBeans.asScala.filter(_.isValid) map { gc ⇒ + val gcName = sanitizeCollectorName(gc.getName) + metricsExtension.register(Entity(s"$gcName-garbage-collector", "system-metric"), new GarbageCollectionMetrics(gc, instrumentFactory)) + } + } +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/jmx/HeapMemoryMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/jmx/HeapMemoryMetrics.scala new file mode 100644 index 00000000..a96b5319 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/jmx/HeapMemoryMetrics.scala @@ -0,0 +1,29 @@ +package kamon.system.jmx + +import java.lang.management.ManagementFactory + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.{ Memory, InstrumentFactory } + +class HeapMemoryMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val memoryBean = ManagementFactory.getMemoryMXBean + def nonHeapUsage = memoryBean.getHeapMemoryUsage + + gauge("heap-used", Memory.Bytes, () ⇒ { + nonHeapUsage.getUsed + }) + + gauge("heap-max", Memory.Bytes, () ⇒ { + nonHeapUsage.getMax + }) + + gauge("heap-committed", Memory.Bytes, () ⇒ { + nonHeapUsage.getCommitted + }) + +} + +object HeapMemoryMetrics extends JmxSystemMetricRecorderCompanion("heap-memory") { + def apply(instrumentFactory: InstrumentFactory): HeapMemoryMetrics = + new HeapMemoryMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/jmx/JmxSystemMetricRecorderCompanion.scala b/kamon-system-metrics/src/main/scala/kamon/system/jmx/JmxSystemMetricRecorderCompanion.scala new file mode 100644 index 00000000..d19622e6 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/jmx/JmxSystemMetricRecorderCompanion.scala @@ -0,0 +1,13 @@ +package kamon.system.jmx + +import kamon.metric.instrument.InstrumentFactory +import kamon.metric.{ Entity, EntityRecorder, MetricsExtension } + +abstract class JmxSystemMetricRecorderCompanion(metricName: String) { + def register(metricsExtension: MetricsExtension): EntityRecorder = { + val instrumentFactory = metricsExtension.instrumentFactory("system-metric") + metricsExtension.register(Entity(metricName, "system-metric"), apply(instrumentFactory)).recorder + } + + def apply(instrumentFactory: InstrumentFactory): EntityRecorder +} \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/jmx/NonHeapMemoryMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/jmx/NonHeapMemoryMetrics.scala new file mode 100644 index 00000000..34a23d4f --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/jmx/NonHeapMemoryMetrics.scala @@ -0,0 +1,33 @@ +package kamon.system.jmx + +import java.lang.management.ManagementFactory + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.{ Memory, InstrumentFactory } + +class NonHeapMemoryMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val memoryBean = ManagementFactory.getMemoryMXBean + def nonHeapUsage = memoryBean.getNonHeapMemoryUsage + + gauge("non-heap-used", Memory.Bytes, () ⇒ { + nonHeapUsage.getUsed + }) + + gauge("non-heap-max", Memory.Bytes, () ⇒ { + val max = nonHeapUsage.getMax + + // .getMax can return -1 if the max is not defined. + if (max >= 0) max + else 0 + }) + + gauge("non-heap-committed", Memory.Bytes, () ⇒ { + nonHeapUsage.getCommitted + }) + +} + +object NonHeapMemoryMetrics extends JmxSystemMetricRecorderCompanion("non-heap-memory") { + def apply(instrumentFactory: InstrumentFactory): NonHeapMemoryMetrics = + new NonHeapMemoryMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/jmx/ThreadsMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/jmx/ThreadsMetrics.scala new file mode 100644 index 00000000..b33eb3e6 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/jmx/ThreadsMetrics.scala @@ -0,0 +1,28 @@ +package kamon.system.jmx + +import java.lang.management.ManagementFactory + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.InstrumentFactory + +class ThreadsMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val threadsBean = ManagementFactory.getThreadMXBean + + gauge("daemon-thread-count", () ⇒ { + threadsBean.getDaemonThreadCount.toLong + }) + + gauge("peak-thread-count", () ⇒ { + threadsBean.getPeakThreadCount.toLong + }) + + gauge("thread-count", () ⇒ { + threadsBean.getThreadCount.toLong + }) + +} + +object ThreadsMetrics extends JmxSystemMetricRecorderCompanion("threads") { + def apply(instrumentFactory: InstrumentFactory): ThreadsMetrics = + new ThreadsMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/CpuMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/CpuMetrics.scala new file mode 100644 index 00000000..0a5f6494 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/CpuMetrics.scala @@ -0,0 +1,29 @@ +package kamon.system.sigar + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.InstrumentFactory +import org.hyperic.sigar.Sigar + +class CpuMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with SigarMetric { + val user = histogram("cpu-user") + val system = histogram("cpu-system") + val Wait = histogram("cpu-wait") + val idle = histogram("cpu-idle") + val stolen = histogram("cpu-stolen") + + def update(sigar: Sigar): Unit = { + val cpuPerc = sigar.getCpuPerc + + user.record((cpuPerc.getUser * 100L).toLong) + system.record((cpuPerc.getSys * 100L).toLong) + Wait.record((cpuPerc.getWait * 100L).toLong) + idle.record((cpuPerc.getIdle * 100L).toLong) + stolen.record((cpuPerc.getStolen * 100L).toLong) + } +} + +object CpuMetrics extends SigarMetricRecorderCompanion("cpu") { + + def apply(instrumentFactory: InstrumentFactory): CpuMetrics = + new CpuMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/DiffRecordingHistogram.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/DiffRecordingHistogram.scala new file mode 100644 index 00000000..94aa76d1 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/DiffRecordingHistogram.scala @@ -0,0 +1,41 @@ +package kamon.system.sigar + +import java.util.concurrent.atomic.AtomicLong + +import kamon.metric.instrument.{ CollectionContext, Histogram } + +/** + * Wrapper Histogram for cases in which the recorded values should always be the difference + * between the current value and the last recorded value. This is not thread-safe and only + * to be used with Sigar-based metrics that are securely updated within an actor. + */ +class DiffRecordingHistogram(wrappedHistogram: Histogram) extends Histogram { + @volatile private var _recordedAtLeastOnce = false + private val _lastObservedValue = new AtomicLong(0) + + private def processRecording(value: Long, count: Long): Unit = { + if (_recordedAtLeastOnce) + wrappedHistogram.record(value - _lastObservedValue.getAndSet(value), count) + else { + _lastObservedValue.set(value) + _recordedAtLeastOnce = true + } + } + + def record(value: Long): Unit = + processRecording(value, 1) + + def record(value: Long, count: Long): Unit = + processRecording(value, count) + + def cleanup: Unit = + wrappedHistogram.cleanup + + def collect(context: CollectionContext): Histogram.Snapshot = + wrappedHistogram.collect(context) +} + +object DiffRecordingHistogram { + def apply(histogram: Histogram): DiffRecordingHistogram = + new DiffRecordingHistogram(histogram) +} \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/FileSystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/FileSystemMetrics.scala new file mode 100644 index 00000000..dffebf5a --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/FileSystemMetrics.scala @@ -0,0 +1,25 @@ +package kamon.system.sigar + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.{ Memory, InstrumentFactory } +import org.hyperic.sigar.{ DiskUsage, FileSystem, Sigar } + +class FileSystemMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with SigarMetric { + val reads = DiffRecordingHistogram(histogram("file-system-reads", Memory.Bytes)) + val writes = DiffRecordingHistogram(histogram("file-system-writes", Memory.Bytes)) + + def sumOfAllFileSystems(sigar: Sigar, thunk: DiskUsage ⇒ Long): Long = { + val fileSystems = sigar.getFileSystemList.filter(_.getType == FileSystem.TYPE_LOCAL_DISK).map(_.getDevName).toSet + fileSystems.map(i ⇒ thunk(sigar.getDiskUsage(i))).fold(0L)(_ + _) + } + + def update(sigar: Sigar): Unit = { + reads.record(sumOfAllFileSystems(sigar, _.getReadBytes)) + writes.record(sumOfAllFileSystems(sigar, _.getWriteBytes)) + } +} + +object FileSystemMetrics extends SigarMetricRecorderCompanion("file-system") { + def apply(instrumentFactory: InstrumentFactory): FileSystemMetrics = + new FileSystemMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/LoadAverageMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/LoadAverageMetrics.scala new file mode 100644 index 00000000..3e02cc8f --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/LoadAverageMetrics.scala @@ -0,0 +1,25 @@ +package kamon.system.sigar + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.InstrumentFactory +import org.hyperic.sigar.Sigar + +class LoadAverageMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with SigarMetric { + val oneMinute = histogram("one-minute") + val fiveMinutes = histogram("five-minutes") + val fifteenMinutes = histogram("fifteen-minutes") + + def update(sigar: Sigar): Unit = { + val loadAverage = sigar.getLoadAverage + + oneMinute.record(loadAverage(0).toLong) + fiveMinutes.record(loadAverage(1).toLong) + fifteenMinutes.record(loadAverage(2).toLong) + } +} + +object LoadAverageMetrics extends SigarMetricRecorderCompanion("load-average") { + + def apply(instrumentFactory: InstrumentFactory): LoadAverageMetrics = + new LoadAverageMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/MemoryMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/MemoryMetrics.scala new file mode 100644 index 00000000..ab7fcd88 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/MemoryMetrics.scala @@ -0,0 +1,36 @@ +package kamon.system.sigar + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.{ Memory, InstrumentFactory } +import org.hyperic.sigar.Sigar + +/** + * System memory usage metrics, as reported by Sigar: + * - used: Total used system memory. + * - free: Total free system memory (e.g. Linux plus cached). + * - swap-used: Total used system swap.. + * - swap-free: Total free system swap. + */ +class MemoryMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with SigarMetric { + val used = histogram("memory-used", Memory.Bytes) + val free = histogram("memory-free", Memory.Bytes) + val swapUsed = histogram("swap-used", Memory.Bytes) + val swapFree = histogram("swap-free", Memory.Bytes) + + def update(sigar: Sigar): Unit = { + val mem = sigar.getMem + val swap = sigar.getSwap + + used.record(mem.getUsed) + free.record(mem.getFree) + swapUsed.record(swap.getUsed) + swapFree.record(swap.getFree) + } +} + +object MemoryMetrics extends SigarMetricRecorderCompanion("memory") { + + def apply(instrumentFactory: InstrumentFactory): MemoryMetrics = + new MemoryMetrics(instrumentFactory) +} + diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/NetworkMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/NetworkMetrics.scala new file mode 100644 index 00000000..fb33b7e4 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/NetworkMetrics.scala @@ -0,0 +1,33 @@ +package kamon.system.sigar + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument._ +import org.hyperic.sigar.{ NetInterfaceStat, Sigar } + +class NetworkMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with SigarMetric { + val receivedBytes = DiffRecordingHistogram(histogram("rx-bytes", Memory.Bytes)) + val transmittedBytes = DiffRecordingHistogram(histogram("tx-bytes", Memory.Bytes)) + val receiveErrors = DiffRecordingHistogram(histogram("rx-errors")) + val transmitErrors = DiffRecordingHistogram(histogram("tx-errors")) + val receiveDrops = DiffRecordingHistogram(histogram("rx-dropped")) + val transmitDrops = DiffRecordingHistogram(histogram("tx-dropped")) + + def sumOfAllInterfaces(sigar: Sigar, thunk: NetInterfaceStat ⇒ Long): Long = { + val interfaces = sigar.getNetInterfaceList.toList.filter(_ != "lo") + interfaces.map(i ⇒ thunk(sigar.getNetInterfaceStat(i))).fold(0L)(_ + _) + } + + def update(sigar: Sigar): Unit = { + receivedBytes.record(sumOfAllInterfaces(sigar, _.getRxBytes)) + transmittedBytes.record(sumOfAllInterfaces(sigar, _.getTxBytes)) + receiveErrors.record(sumOfAllInterfaces(sigar, _.getRxErrors)) + transmitErrors.record(sumOfAllInterfaces(sigar, _.getTxErrors)) + receiveDrops.record(sumOfAllInterfaces(sigar, _.getRxDropped)) + transmitDrops.record(sumOfAllInterfaces(sigar, _.getTxDropped)) + } +} + +object NetworkMetrics extends SigarMetricRecorderCompanion("network") { + def apply(instrumentFactory: InstrumentFactory): NetworkMetrics = + new NetworkMetrics(instrumentFactory) +} \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/ProcessCpuMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/ProcessCpuMetrics.scala new file mode 100644 index 00000000..0ca5c1c8 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/ProcessCpuMetrics.scala @@ -0,0 +1,39 @@ +package kamon.system.sigar + +import kamon.metric.GenericEntityRecorder +import kamon.metric.instrument.InstrumentFactory +import org.hyperic.sigar.{ ProcCpu, Sigar } + +class ProcessCpuMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with SigarMetric { + val processUserCpu = histogram("process-user-cpu") + val processSystemCpu = histogram("process-system-cpu") + val processTotalCpu = histogram("process-cpu") + + var lastProcCpu: Option[ProcCpu] = None + + def update(sigar: Sigar): Unit = { + val pid = sigar.getPid + val procCpu = sigar.getProcCpu(pid) + + lastProcCpu.map { last ⇒ + val timeDiff = procCpu.getLastTime - last.getLastTime + if (timeDiff > 0) { + val userPercent = (((procCpu.getUser - last.getUser) / timeDiff.toDouble) * 100).toLong + val systemPercent = (((procCpu.getSys - last.getSys) / timeDiff.toDouble) * 100).toLong + + processUserCpu.record(userPercent) + processSystemCpu.record(systemPercent) + processTotalCpu.record(userPercent + systemPercent) + } + } + + lastProcCpu = Some(procCpu) + + } +} + +object ProcessCpuMetrics extends SigarMetricRecorderCompanion("process-cpu") { + + def apply(instrumentFactory: InstrumentFactory): ProcessCpuMetrics = + new ProcessCpuMetrics(instrumentFactory) +} diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala new file mode 100644 index 00000000..8a430427 --- /dev/null +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarMetricsUpdater.scala @@ -0,0 +1,59 @@ +package kamon.system.sigar + +import akka.actor.{ Props, Actor } +import kamon.Kamon +import kamon.metric.instrument.InstrumentFactory +import kamon.metric.{ Entity, EntityRecorder, MetricsExtension, Metrics } +import kamon.system.sigar.SigarMetricsUpdater.UpdateSigarMetrics +import org.hyperic.sigar.Sigar + +import scala.concurrent.duration.FiniteDuration + +class SigarMetricsUpdater(refreshInterval: FiniteDuration) extends Actor { + val sigar = new Sigar + val metricsExtension = Kamon(Metrics)(context.system) + + val sigarMetrics = List( + CpuMetrics.register(metricsExtension), + FileSystemMetrics.register(metricsExtension), + LoadAverageMetrics.register(metricsExtension), + MemoryMetrics.register(metricsExtension), + NetworkMetrics.register(metricsExtension), + ProcessCpuMetrics.register(metricsExtension)) + + val refreshSchedule = context.system.scheduler.schedule(refreshInterval, refreshInterval, self, UpdateSigarMetrics)(context.dispatcher) + + def receive = { + case UpdateSigarMetrics ⇒ updateMetrics() + } + + def updateMetrics(): Unit = { + sigarMetrics.foreach(_.update(sigar)) + } + + override def postStop(): Unit = { + refreshSchedule.cancel() + super.postStop() + } +} + +object SigarMetricsUpdater { + def props(refreshInterval: FiniteDuration): Props = + Props(new SigarMetricsUpdater((refreshInterval))) + + case object UpdateSigarMetrics +} + +trait SigarMetric extends EntityRecorder { + def update(sigar: Sigar): Unit +} + +abstract class SigarMetricRecorderCompanion(metricName: String) { + def register(metricsExtension: MetricsExtension): SigarMetric = { + val instrumentFactory = metricsExtension.instrumentFactory("system-metric") + metricsExtension.register(Entity(metricName, "system-metric"), apply(instrumentFactory)).recorder + } + + def apply(instrumentFactory: InstrumentFactory): SigarMetric +} + -- cgit v1.2.3