From ef5e1c0de024c4d768b202754045dcfc4843b0d8 Mon Sep 17 00:00:00 2001 From: Diego Date: Wed, 23 Jul 2014 23:51:10 -0300 Subject: = kamon-system-metrics: introduce actor system collector --- .../scala/kamon/metric/instrument/Histogram.scala | 6 +- .../src/main/scala/kamon/metrics/CPUMetrics.scala | 17 ++-- .../main/scala/kamon/metrics/MemoryMetrics.scala | 26 +++--- .../main/scala/kamon/metrics/NetworkMetrics.scala | 28 ++----- .../scala/kamon/metrics/ProcessCPUMetrics.scala | 13 ++- .../main/scala/kamon/system/SystemMetrics.scala | 93 ++++++++++++++++++++-- .../scala/kamon/metrics/SystemMetricsSpec.scala | 8 +- 7 files changed, 125 insertions(+), 66 deletions(-) diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index 8c81e717..9ce11f49 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -35,10 +35,14 @@ object Histogram { new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) def fromConfig(config: Config): Histogram = { + fromConfig(config, Scale.Unit) + } + + def fromConfig(config: Config, scale: Scale): Histogram = { val highest = config.getLong("highest-trackable-value") val significantDigits = config.getInt("significant-value-digits") - new HdrHistogram(1L, highest, significantDigits) + new HdrHistogram(1L, highest, significantDigits, scale) } object HighestTrackableValue { diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala index 89ec5729..99288f94 100644 --- a/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala @@ -18,8 +18,7 @@ package kamon.metrics import akka.actor.ActorSystem import com.typesafe.config.Config import kamon.metric._ -import kamon.metric.instrument.{ Gauge, Histogram } -import kamon.system.SigarExtensionProvider +import kamon.metric.instrument.Histogram case class CPUMetrics(name: String) extends MetricGroupIdentity { val category = CPUMetrics @@ -33,7 +32,7 @@ object CPUMetrics extends MetricGroupCategory { case object Wait extends MetricIdentity { val name = "wait" } case object Idle extends MetricIdentity { val name = "idle" } - case class CPUMetricRecorder(user: Gauge, system: Gauge, cpuWait: Gauge, idle: Gauge) + case class CPUMetricRecorder(user: Histogram, system: Histogram, cpuWait: Histogram, idle: Histogram) extends MetricGroupRecorder { def collect(context: CollectionContext): MetricGroupSnapshot = { @@ -59,9 +58,7 @@ object CPUMetrics extends MetricGroupCategory { Idle -> idle) } - val Factory = new MetricGroupFactory with SigarExtensionProvider { - - def cpu = sigar.getCpu + val Factory = new MetricGroupFactory { type GroupRecorder = CPUMetricRecorder @@ -74,10 +71,10 @@ object CPUMetrics extends MetricGroupCategory { val idleConfig = settings.getConfig("idle") new CPUMetricRecorder( - Gauge.fromConfig(userConfig, system)(() ⇒ cpu.getUser), - Gauge.fromConfig(systemConfig, system)(() ⇒ cpu.getSys), - Gauge.fromConfig(cpuWaitConfig, system)(() ⇒ cpu.getWait), - Gauge.fromConfig(idleConfig, system)(() ⇒ cpu.getIdle)) + Histogram.fromConfig(userConfig), + Histogram.fromConfig(systemConfig), + Histogram.fromConfig(cpuWaitConfig), + Histogram.fromConfig(idleConfig)) } } } diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala index 5591fb97..b2b713af 100644 --- a/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala @@ -18,10 +18,7 @@ package kamon.metrics import akka.actor.ActorSystem import com.typesafe.config.Config import kamon.metric._ -import kamon.metric.instrument.Gauge.CurrentValueCollector -import kamon.metric.instrument.{ Gauge, Histogram } -import kamon.system.SigarExtensionProvider -import org.hyperic.sigar.Mem +import kamon.metric.instrument.Histogram case class MemoryMetrics(name: String) extends MetricGroupIdentity { val category = MemoryMetrics @@ -37,7 +34,7 @@ object MemoryMetrics extends MetricGroupCategory { case object SwapUsed extends MetricIdentity { val name = "swap-used" } case object SwapFree extends MetricIdentity { val name = "swap-free" } - case class MemoryMetricRecorder(used: Gauge, free: Gauge, buffer: Gauge, cache: Gauge, swapUsed: Gauge, swapFree: Gauge) + case class MemoryMetricRecorder(used: Histogram, free: Histogram, buffer: Histogram, cache: Histogram, swapUsed: Histogram, swapFree: Histogram) extends MetricGroupRecorder { def collect(context: CollectionContext): MetricGroupSnapshot = { @@ -65,9 +62,7 @@ object MemoryMetrics extends MetricGroupCategory { SwapFree -> swapFree) } - val Factory = new MetricGroupFactory with SigarExtensionProvider { - def mem = sigar.getMem - def swap = sigar.getSwap + val Factory = new MetricGroupFactory { type GroupRecorder = MemoryMetricRecorder @@ -82,15 +77,12 @@ object MemoryMetrics extends MetricGroupCategory { val swapFreeConfig = settings.getConfig("swap-free") new MemoryMetricRecorder( - Gauge.fromConfig(usedConfig, system, Scale.Kilo)(() ⇒ mem.getUsed), - Gauge.fromConfig(freeConfig, system, Scale.Kilo)(() ⇒ mem.getFree), - Gauge.fromConfig(bufferConfig, system, Scale.Kilo)(() ⇒ swap.getUsed), - Gauge.fromConfig(cacheConfig, system, Scale.Kilo)(() ⇒ swap.getFree), - Gauge.fromConfig(swapUsedConfig, system, Scale.Kilo)(collectBuffer(mem)), - Gauge.fromConfig(swapFreeConfig, system, Scale.Kilo)(collectCache(mem))) + Histogram.fromConfig(usedConfig, Scale.Kilo), + Histogram.fromConfig(freeConfig, Scale.Kilo), + Histogram.fromConfig(swapUsedConfig, Scale.Kilo), + Histogram.fromConfig(swapFreeConfig, Scale.Kilo), + Histogram.fromConfig(bufferConfig, Scale.Kilo), + Histogram.fromConfig(cacheConfig, Scale.Kilo)) } - - private def collectBuffer(mem: Mem) = () ⇒ if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L - private def collectCache(mem: Mem) = () ⇒ if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L } } \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala index f6230695..831a06e3 100644 --- a/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala @@ -18,10 +18,7 @@ package kamon.metrics import akka.actor.ActorSystem import com.typesafe.config.Config import kamon.metric._ -import kamon.metric.instrument.Gauge.CurrentValueCollector -import kamon.metric.instrument.{ Gauge, Histogram } -import kamon.system.SigarExtensionProvider -import org.hyperic.sigar.{ NetInterfaceStat, SigarProxy } +import kamon.metric.instrument.Histogram case class NetworkMetrics(name: String) extends MetricGroupIdentity { val category = NetworkMetrics @@ -35,7 +32,7 @@ object NetworkMetrics extends MetricGroupCategory { case object RxErrors extends MetricIdentity { val name = "rx-errors" } case object TxErrors extends MetricIdentity { val name = "tx-errors" } - case class NetworkMetricRecorder(rxBytes: Gauge, txBytes: Gauge, rxErrors: Gauge, txErrors: Gauge) + case class NetworkMetricRecorder(rxBytes: Histogram, txBytes: Histogram, rxErrors: Histogram, txErrors: Histogram) extends MetricGroupRecorder { def collect(context: CollectionContext): MetricGroupSnapshot = { @@ -61,9 +58,7 @@ object NetworkMetrics extends MetricGroupCategory { TxErrors -> txErrors) } - val Factory = new MetricGroupFactory with SigarExtensionProvider { - - val interfaces: Set[String] = sigar.getNetInterfaceList.toSet + val Factory = new MetricGroupFactory { type GroupRecorder = NetworkMetricRecorder @@ -76,19 +71,10 @@ object NetworkMetrics extends MetricGroupCategory { val txErrorsConfig = settings.getConfig("tx-errors") new NetworkMetricRecorder( - Gauge.fromConfig(rxBytesConfig, system, Scale.Kilo)(collect(sigar, interfaces)(net ⇒ net.getRxBytes)), - Gauge.fromConfig(txBytesConfig, system, Scale.Kilo)(collect(sigar, interfaces)(net ⇒ net.getTxBytes)), - Gauge.fromConfig(rxErrorsConfig, system)(collect(sigar, interfaces)(net ⇒ net.getRxErrors)), - Gauge.fromConfig(txErrorsConfig, system)(collect(sigar, interfaces)(net ⇒ net.getTxErrors))) - } - - private def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long) = () ⇒ { - interfaces.foldLeft(0L) { (totalBytes, interface) ⇒ - { - val net = sigar.getNetInterfaceStat(interface) - totalBytes + block(net) - } - } + Histogram.fromConfig(rxBytesConfig, Scale.Kilo), + Histogram.fromConfig(txBytesConfig, Scale.Kilo), + Histogram.fromConfig(rxErrorsConfig), + Histogram.fromConfig(txErrorsConfig)) } } } \ No newline at end of file diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala index 356504b7..61f7ddb2 100644 --- a/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala @@ -17,9 +17,8 @@ package kamon.metrics import akka.actor.ActorSystem import com.typesafe.config.Config -import kamon.metric.instrument.{ Gauge, Histogram } import kamon.metric._ -import kamon.system.SigarExtensionProvider +import kamon.metric.instrument.Histogram case class ProcessCPUMetrics(name: String) extends MetricGroupIdentity { val category = ProcessCPUMetrics @@ -31,7 +30,7 @@ object ProcessCPUMetrics extends MetricGroupCategory { case object User extends MetricIdentity { val name = "user" } case object System extends MetricIdentity { val name = "system" } - case class ProcessCPUMetricsRecorder(user: Gauge, system: Gauge) + case class ProcessCPUMetricsRecorder(user: Histogram, system: Histogram) extends MetricGroupRecorder { def collect(context: CollectionContext): MetricGroupSnapshot = { @@ -55,9 +54,7 @@ object ProcessCPUMetrics extends MetricGroupCategory { System -> system) } - val Factory = new MetricGroupFactory with SigarExtensionProvider { - def pid = sigar.getPid - def cpu = sigar.getProcCpu(pid) + val Factory = new MetricGroupFactory { type GroupRecorder = ProcessCPUMetricsRecorder @@ -68,8 +65,8 @@ object ProcessCPUMetrics extends MetricGroupCategory { val systemConfig = settings.getConfig("system") new ProcessCPUMetricsRecorder( - Gauge.fromConfig(userConfig, system)(() ⇒ cpu.getUser), - Gauge.fromConfig(systemConfig, system)(() ⇒ cpu.getSys)) + Histogram.fromConfig(userConfig), + Histogram.fromConfig(systemConfig)) } } } diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala index bd156c2a..15566d7b 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala @@ -22,8 +22,12 @@ import akka.event.Logging import kamon.Kamon import kamon.metric.Metrics import kamon.metrics._ +import kamon.system.SystemMetricsCollectorActor.Collect +import org.hyperic.sigar.{ NetInterfaceStat, SigarProxy, Mem } +import scala.concurrent.duration._ import scala.collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics @@ -40,13 +44,12 @@ class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Ka val systemMetricsExtension = Kamon(Metrics)(system) - systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) - systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) - systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) - systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) + //JVM Metrics systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory) - garbageCollectors.map { gc ⇒ systemMetricsExtension.register(GCMetrics(gc.getName), GCMetrics.Factory(gc)) } + + //System Metrics + system.actorOf(SystemMetricsCollectorActor.props(1 second), "system-metrics-collector") } object SystemMetricsExtension { @@ -63,3 +66,83 @@ trait SigarExtensionProvider { lazy val sigar = SigarHolder.instance() } +class SystemMetricsCollectorActor(collectInterval: FiniteDuration) extends Actor with SigarExtensionProvider { + import kamon.system.SystemMetricsExtension._ + + val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher) + + val systemMetricsExtension = Kamon(Metrics)(context.system) + + val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory) + val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory) + val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory) + val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory) + + def receive: Receive = { + case Collect ⇒ collectMetrics() + case anything ⇒ + } + + override def postStop() = collectSchedule.cancel() + + def pid = sigar.getPid + def procCpu = sigar.getProcCpu(pid) + def cpu = sigar.getCpuPerc + def mem = sigar.getMem + def swap = sigar.getSwap + + val interfaces: Set[String] = sigar.getNetInterfaceList.toSet + + def collectMetrics() = { + cpuRecorder.map { + cpur ⇒ + cpur.user.record((cpu.getUser * 100L).toLong) + cpur.system.record((cpu.getSys * 100L).toLong) + cpur.cpuWait.record((cpu.getWait() * 100L).toLong) + cpur.idle.record((cpu.getIdle * 100L).toLong) + } + + processCpuRecorder.map { + pcpur ⇒ + pcpur.user.record(procCpu.getUser) + pcpur.system.record(procCpu.getSys) + } + + memoryRecorder.map { + mr ⇒ + mr.used.record(mem.getUsed) + mr.free.record(mem.getFree) + mr.swapUsed.record(swap.getUsed) + mr.swapFree.record(swap.getFree) + mr.buffer.record(collectBuffer(mem)) + mr.cache.record(collectCache(mem)) + } + + networkRecorder.map { + nr ⇒ + nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ net.getRxBytes)) + nr.txBytes.record(collect(sigar, interfaces)(net ⇒ net.getTxBytes)) + nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors)) + nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors)) + } + } + + private def collectBuffer(mem: Mem): Long = if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L + private def collectCache(mem: Mem): Long = if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L + + private def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = { + interfaces.foldLeft(0L) { (totalBytes, interface) ⇒ + { + val net = sigar.getNetInterfaceStat(interface) + totalBytes + block(net) + } + } + } +} + +object SystemMetricsCollectorActor { + case object Collect + + def props(collectInterval: FiniteDuration): Props = + Props[SystemMetricsCollectorActor](new SystemMetricsCollectorActor(collectInterval)) +} \ No newline at end of file diff --git a/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala b/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala index ed10903f..8115c2e9 100644 --- a/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala +++ b/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala @@ -151,10 +151,10 @@ class SystemMetricsSpec extends TestKitBase with WordSpecLike with Matchers { val metricsListener = subscribeToMetrics() val CPUMetrics = expectCPUMetrics(metricsListener, 3 seconds) - CPUMetrics.user.max should be > 0L - CPUMetrics.system.max should be > 0L - CPUMetrics.cpuWait.max should be > 0L - CPUMetrics.idle.max should be > 0L + CPUMetrics.user.max should be >= 0L + CPUMetrics.system.max should be >= 0L + CPUMetrics.cpuWait.max should be >= 0L + CPUMetrics.idle.max should be >= 0L } } "the Kamon GC Metrics" should { -- cgit v1.2.3