aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-07-23 23:51:10 -0300
committerDiego <diegolparra@gmail.com>2014-07-23 23:51:10 -0300
commitef5e1c0de024c4d768b202754045dcfc4843b0d8 (patch)
treebb662fa12e474e840502e76fb663def6537e7f91
parentc7a6d66636518a73cae4b79f20e1737f3f02e2ec (diff)
downloadKamon-ef5e1c0de024c4d768b202754045dcfc4843b0d8.tar.gz
Kamon-ef5e1c0de024c4d768b202754045dcfc4843b0d8.tar.bz2
Kamon-ef5e1c0de024c4d768b202754045dcfc4843b0d8.zip
= kamon-system-metrics: introduce actor system collector
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala6
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala17
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala26
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala28
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala13
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala93
-rw-r--r--kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala8
7 files changed, 125 insertions, 66 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
index 8c81e717..9ce11f49 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -35,10 +35,14 @@ object Histogram {
new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale)
def fromConfig(config: Config): Histogram = {
+ fromConfig(config, Scale.Unit)
+ }
+
+ def fromConfig(config: Config, scale: Scale): Histogram = {
val highest = config.getLong("highest-trackable-value")
val significantDigits = config.getInt("significant-value-digits")
- new HdrHistogram(1L, highest, significantDigits)
+ new HdrHistogram(1L, highest, significantDigits, scale)
}
object HighestTrackableValue {
diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala
index 89ec5729..99288f94 100644
--- a/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/metrics/CPUMetrics.scala
@@ -18,8 +18,7 @@ package kamon.metrics
import akka.actor.ActorSystem
import com.typesafe.config.Config
import kamon.metric._
-import kamon.metric.instrument.{ Gauge, Histogram }
-import kamon.system.SigarExtensionProvider
+import kamon.metric.instrument.Histogram
case class CPUMetrics(name: String) extends MetricGroupIdentity {
val category = CPUMetrics
@@ -33,7 +32,7 @@ object CPUMetrics extends MetricGroupCategory {
case object Wait extends MetricIdentity { val name = "wait" }
case object Idle extends MetricIdentity { val name = "idle" }
- case class CPUMetricRecorder(user: Gauge, system: Gauge, cpuWait: Gauge, idle: Gauge)
+ case class CPUMetricRecorder(user: Histogram, system: Histogram, cpuWait: Histogram, idle: Histogram)
extends MetricGroupRecorder {
def collect(context: CollectionContext): MetricGroupSnapshot = {
@@ -59,9 +58,7 @@ object CPUMetrics extends MetricGroupCategory {
Idle -> idle)
}
- val Factory = new MetricGroupFactory with SigarExtensionProvider {
-
- def cpu = sigar.getCpu
+ val Factory = new MetricGroupFactory {
type GroupRecorder = CPUMetricRecorder
@@ -74,10 +71,10 @@ object CPUMetrics extends MetricGroupCategory {
val idleConfig = settings.getConfig("idle")
new CPUMetricRecorder(
- Gauge.fromConfig(userConfig, system)(() ⇒ cpu.getUser),
- Gauge.fromConfig(systemConfig, system)(() ⇒ cpu.getSys),
- Gauge.fromConfig(cpuWaitConfig, system)(() ⇒ cpu.getWait),
- Gauge.fromConfig(idleConfig, system)(() ⇒ cpu.getIdle))
+ Histogram.fromConfig(userConfig),
+ Histogram.fromConfig(systemConfig),
+ Histogram.fromConfig(cpuWaitConfig),
+ Histogram.fromConfig(idleConfig))
}
}
}
diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala
index 5591fb97..b2b713af 100644
--- a/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/metrics/MemoryMetrics.scala
@@ -18,10 +18,7 @@ package kamon.metrics
import akka.actor.ActorSystem
import com.typesafe.config.Config
import kamon.metric._
-import kamon.metric.instrument.Gauge.CurrentValueCollector
-import kamon.metric.instrument.{ Gauge, Histogram }
-import kamon.system.SigarExtensionProvider
-import org.hyperic.sigar.Mem
+import kamon.metric.instrument.Histogram
case class MemoryMetrics(name: String) extends MetricGroupIdentity {
val category = MemoryMetrics
@@ -37,7 +34,7 @@ object MemoryMetrics extends MetricGroupCategory {
case object SwapUsed extends MetricIdentity { val name = "swap-used" }
case object SwapFree extends MetricIdentity { val name = "swap-free" }
- case class MemoryMetricRecorder(used: Gauge, free: Gauge, buffer: Gauge, cache: Gauge, swapUsed: Gauge, swapFree: Gauge)
+ case class MemoryMetricRecorder(used: Histogram, free: Histogram, buffer: Histogram, cache: Histogram, swapUsed: Histogram, swapFree: Histogram)
extends MetricGroupRecorder {
def collect(context: CollectionContext): MetricGroupSnapshot = {
@@ -65,9 +62,7 @@ object MemoryMetrics extends MetricGroupCategory {
SwapFree -> swapFree)
}
- val Factory = new MetricGroupFactory with SigarExtensionProvider {
- def mem = sigar.getMem
- def swap = sigar.getSwap
+ val Factory = new MetricGroupFactory {
type GroupRecorder = MemoryMetricRecorder
@@ -82,15 +77,12 @@ object MemoryMetrics extends MetricGroupCategory {
val swapFreeConfig = settings.getConfig("swap-free")
new MemoryMetricRecorder(
- Gauge.fromConfig(usedConfig, system, Scale.Kilo)(() ⇒ mem.getUsed),
- Gauge.fromConfig(freeConfig, system, Scale.Kilo)(() ⇒ mem.getFree),
- Gauge.fromConfig(bufferConfig, system, Scale.Kilo)(() ⇒ swap.getUsed),
- Gauge.fromConfig(cacheConfig, system, Scale.Kilo)(() ⇒ swap.getFree),
- Gauge.fromConfig(swapUsedConfig, system, Scale.Kilo)(collectBuffer(mem)),
- Gauge.fromConfig(swapFreeConfig, system, Scale.Kilo)(collectCache(mem)))
+ Histogram.fromConfig(usedConfig, Scale.Kilo),
+ Histogram.fromConfig(freeConfig, Scale.Kilo),
+ Histogram.fromConfig(swapUsedConfig, Scale.Kilo),
+ Histogram.fromConfig(swapFreeConfig, Scale.Kilo),
+ Histogram.fromConfig(bufferConfig, Scale.Kilo),
+ Histogram.fromConfig(cacheConfig, Scale.Kilo))
}
-
- private def collectBuffer(mem: Mem) = () ⇒ if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L
- private def collectCache(mem: Mem) = () ⇒ if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L
}
} \ No newline at end of file
diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala
index f6230695..831a06e3 100644
--- a/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/metrics/NetworkMetrics.scala
@@ -18,10 +18,7 @@ package kamon.metrics
import akka.actor.ActorSystem
import com.typesafe.config.Config
import kamon.metric._
-import kamon.metric.instrument.Gauge.CurrentValueCollector
-import kamon.metric.instrument.{ Gauge, Histogram }
-import kamon.system.SigarExtensionProvider
-import org.hyperic.sigar.{ NetInterfaceStat, SigarProxy }
+import kamon.metric.instrument.Histogram
case class NetworkMetrics(name: String) extends MetricGroupIdentity {
val category = NetworkMetrics
@@ -35,7 +32,7 @@ object NetworkMetrics extends MetricGroupCategory {
case object RxErrors extends MetricIdentity { val name = "rx-errors" }
case object TxErrors extends MetricIdentity { val name = "tx-errors" }
- case class NetworkMetricRecorder(rxBytes: Gauge, txBytes: Gauge, rxErrors: Gauge, txErrors: Gauge)
+ case class NetworkMetricRecorder(rxBytes: Histogram, txBytes: Histogram, rxErrors: Histogram, txErrors: Histogram)
extends MetricGroupRecorder {
def collect(context: CollectionContext): MetricGroupSnapshot = {
@@ -61,9 +58,7 @@ object NetworkMetrics extends MetricGroupCategory {
TxErrors -> txErrors)
}
- val Factory = new MetricGroupFactory with SigarExtensionProvider {
-
- val interfaces: Set[String] = sigar.getNetInterfaceList.toSet
+ val Factory = new MetricGroupFactory {
type GroupRecorder = NetworkMetricRecorder
@@ -76,19 +71,10 @@ object NetworkMetrics extends MetricGroupCategory {
val txErrorsConfig = settings.getConfig("tx-errors")
new NetworkMetricRecorder(
- Gauge.fromConfig(rxBytesConfig, system, Scale.Kilo)(collect(sigar, interfaces)(net ⇒ net.getRxBytes)),
- Gauge.fromConfig(txBytesConfig, system, Scale.Kilo)(collect(sigar, interfaces)(net ⇒ net.getTxBytes)),
- Gauge.fromConfig(rxErrorsConfig, system)(collect(sigar, interfaces)(net ⇒ net.getRxErrors)),
- Gauge.fromConfig(txErrorsConfig, system)(collect(sigar, interfaces)(net ⇒ net.getTxErrors)))
- }
-
- private def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long) = () ⇒ {
- interfaces.foldLeft(0L) { (totalBytes, interface) ⇒
- {
- val net = sigar.getNetInterfaceStat(interface)
- totalBytes + block(net)
- }
- }
+ Histogram.fromConfig(rxBytesConfig, Scale.Kilo),
+ Histogram.fromConfig(txBytesConfig, Scale.Kilo),
+ Histogram.fromConfig(rxErrorsConfig),
+ Histogram.fromConfig(txErrorsConfig))
}
}
} \ No newline at end of file
diff --git a/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala
index 356504b7..61f7ddb2 100644
--- a/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/metrics/ProcessCPUMetrics.scala
@@ -17,9 +17,8 @@ package kamon.metrics
import akka.actor.ActorSystem
import com.typesafe.config.Config
-import kamon.metric.instrument.{ Gauge, Histogram }
import kamon.metric._
-import kamon.system.SigarExtensionProvider
+import kamon.metric.instrument.Histogram
case class ProcessCPUMetrics(name: String) extends MetricGroupIdentity {
val category = ProcessCPUMetrics
@@ -31,7 +30,7 @@ object ProcessCPUMetrics extends MetricGroupCategory {
case object User extends MetricIdentity { val name = "user" }
case object System extends MetricIdentity { val name = "system" }
- case class ProcessCPUMetricsRecorder(user: Gauge, system: Gauge)
+ case class ProcessCPUMetricsRecorder(user: Histogram, system: Histogram)
extends MetricGroupRecorder {
def collect(context: CollectionContext): MetricGroupSnapshot = {
@@ -55,9 +54,7 @@ object ProcessCPUMetrics extends MetricGroupCategory {
System -> system)
}
- val Factory = new MetricGroupFactory with SigarExtensionProvider {
- def pid = sigar.getPid
- def cpu = sigar.getProcCpu(pid)
+ val Factory = new MetricGroupFactory {
type GroupRecorder = ProcessCPUMetricsRecorder
@@ -68,8 +65,8 @@ object ProcessCPUMetrics extends MetricGroupCategory {
val systemConfig = settings.getConfig("system")
new ProcessCPUMetricsRecorder(
- Gauge.fromConfig(userConfig, system)(() ⇒ cpu.getUser),
- Gauge.fromConfig(systemConfig, system)(() ⇒ cpu.getSys))
+ Histogram.fromConfig(userConfig),
+ Histogram.fromConfig(systemConfig))
}
}
}
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
index bd156c2a..15566d7b 100644
--- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetrics.scala
@@ -22,8 +22,12 @@ import akka.event.Logging
import kamon.Kamon
import kamon.metric.Metrics
import kamon.metrics._
+import kamon.system.SystemMetricsCollectorActor.Collect
+import org.hyperic.sigar.{ NetInterfaceStat, SigarProxy, Mem }
+import scala.concurrent.duration._
import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
object SystemMetrics extends ExtensionId[SystemMetricsExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = SystemMetrics
@@ -40,13 +44,12 @@ class SystemMetricsExtension(private val system: ExtendedActorSystem) extends Ka
val systemMetricsExtension = Kamon(Metrics)(system)
- systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
- systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
- systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
- systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
+ //JVM Metrics
systemMetricsExtension.register(HeapMetrics(Heap), HeapMetrics.Factory)
-
garbageCollectors.map { gc ⇒ systemMetricsExtension.register(GCMetrics(gc.getName), GCMetrics.Factory(gc)) }
+
+ //System Metrics
+ system.actorOf(SystemMetricsCollectorActor.props(1 second), "system-metrics-collector")
}
object SystemMetricsExtension {
@@ -63,3 +66,83 @@ trait SigarExtensionProvider {
lazy val sigar = SigarHolder.instance()
}
+class SystemMetricsCollectorActor(collectInterval: FiniteDuration) extends Actor with SigarExtensionProvider {
+ import kamon.system.SystemMetricsExtension._
+
+ val collectSchedule = context.system.scheduler.schedule(collectInterval, collectInterval, self, Collect)(context.dispatcher)
+
+ val systemMetricsExtension = Kamon(Metrics)(context.system)
+
+ val cpuRecorder = systemMetricsExtension.register(CPUMetrics(CPU), CPUMetrics.Factory)
+ val processCpuRecorder = systemMetricsExtension.register(ProcessCPUMetrics(ProcessCPU), ProcessCPUMetrics.Factory)
+ val memoryRecorder = systemMetricsExtension.register(MemoryMetrics(Memory), MemoryMetrics.Factory)
+ val networkRecorder = systemMetricsExtension.register(NetworkMetrics(Network), NetworkMetrics.Factory)
+
+ def receive: Receive = {
+ case Collect ⇒ collectMetrics()
+ case anything ⇒
+ }
+
+ override def postStop() = collectSchedule.cancel()
+
+ def pid = sigar.getPid
+ def procCpu = sigar.getProcCpu(pid)
+ def cpu = sigar.getCpuPerc
+ def mem = sigar.getMem
+ def swap = sigar.getSwap
+
+ val interfaces: Set[String] = sigar.getNetInterfaceList.toSet
+
+ def collectMetrics() = {
+ cpuRecorder.map {
+ cpur ⇒
+ cpur.user.record((cpu.getUser * 100L).toLong)
+ cpur.system.record((cpu.getSys * 100L).toLong)
+ cpur.cpuWait.record((cpu.getWait() * 100L).toLong)
+ cpur.idle.record((cpu.getIdle * 100L).toLong)
+ }
+
+ processCpuRecorder.map {
+ pcpur ⇒
+ pcpur.user.record(procCpu.getUser)
+ pcpur.system.record(procCpu.getSys)
+ }
+
+ memoryRecorder.map {
+ mr ⇒
+ mr.used.record(mem.getUsed)
+ mr.free.record(mem.getFree)
+ mr.swapUsed.record(swap.getUsed)
+ mr.swapFree.record(swap.getFree)
+ mr.buffer.record(collectBuffer(mem))
+ mr.cache.record(collectCache(mem))
+ }
+
+ networkRecorder.map {
+ nr ⇒
+ nr.rxBytes.record(collect(sigar, interfaces)(net ⇒ net.getRxBytes))
+ nr.txBytes.record(collect(sigar, interfaces)(net ⇒ net.getTxBytes))
+ nr.rxErrors.record(collect(sigar, interfaces)(net ⇒ net.getRxErrors))
+ nr.txErrors.record(collect(sigar, interfaces)(net ⇒ net.getTxErrors))
+ }
+ }
+
+ private def collectBuffer(mem: Mem): Long = if (mem.getUsed() != mem.getActualUsed()) mem.getActualUsed() else 0L
+ private def collectCache(mem: Mem): Long = if (mem.getFree() != mem.getActualFree()) mem.getActualFree() else 0L
+
+ private def collect(sigar: SigarProxy, interfaces: Set[String])(block: NetInterfaceStat ⇒ Long): Long = {
+ interfaces.foldLeft(0L) { (totalBytes, interface) ⇒
+ {
+ val net = sigar.getNetInterfaceStat(interface)
+ totalBytes + block(net)
+ }
+ }
+ }
+}
+
+object SystemMetricsCollectorActor {
+ case object Collect
+
+ def props(collectInterval: FiniteDuration): Props =
+ Props[SystemMetricsCollectorActor](new SystemMetricsCollectorActor(collectInterval))
+} \ No newline at end of file
diff --git a/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala b/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala
index ed10903f..8115c2e9 100644
--- a/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala
+++ b/kamon-system-metrics/src/test/scala/kamon/metrics/SystemMetricsSpec.scala
@@ -151,10 +151,10 @@ class SystemMetricsSpec extends TestKitBase with WordSpecLike with Matchers {
val metricsListener = subscribeToMetrics()
val CPUMetrics = expectCPUMetrics(metricsListener, 3 seconds)
- CPUMetrics.user.max should be > 0L
- CPUMetrics.system.max should be > 0L
- CPUMetrics.cpuWait.max should be > 0L
- CPUMetrics.idle.max should be > 0L
+ CPUMetrics.user.max should be >= 0L
+ CPUMetrics.system.max should be >= 0L
+ CPUMetrics.cpuWait.max should be >= 0L
+ CPUMetrics.idle.max should be >= 0L
}
}
"the Kamon GC Metrics" should {