aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric
diff options
context:
space:
mode:
authorIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
committerIvan Topolnak <ivantopo@gmail.com>2013-08-07 19:06:33 -0300
commit923b88e8adef2f66b43e551fa4a0a1bbae5af7ff (patch)
treed555199f0c63b690ec51805b496ee2d54eb014da /kamon-core/src/main/scala/kamon/metric
parent1e6665e30d96772eab92aca4d23e176adcd88dc5 (diff)
downloadKamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.gz
Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.tar.bz2
Kamon-923b88e8adef2f66b43e551fa4a0a1bbae5af7ff.zip
upgrading to akka 2.2
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala67
-rw-r--r--kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala12
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricFilter.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Metrics.scala146
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala51
-rw-r--r--kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala51
6 files changed, 333 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
new file mode 100644
index 00000000..54a13f39
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
@@ -0,0 +1,67 @@
+package kamon.metric
+
+import java.util.concurrent.{ThreadPoolExecutor, ExecutorService}
+import scala.concurrent.forkjoin.ForkJoinPool
+import com.codahale.metrics.{Metric, MetricFilter}
+
+object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector {
+
+ def register(fullName: String, executorService: ExecutorService) = executorService match {
+ case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp)
+ case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe)
+ case _ => // If it is a unknown Executor then just do nothing.
+ }
+
+ def deregister(fullName: String) = {
+ Metrics.registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+
+trait ForkJoinPoolMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+
+ def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = {
+ val forkJoinPoolGauge = newNumericGaugeFor(fjp) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt),
+ fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize),
+ fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+trait ThreadPoolExecutorMetricCollector {
+ import GaugeGenerator._
+ import BasicExecutorMetricNames._
+
+ def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = {
+ val tpeGauge = newNumericGaugeFor(tpe) _
+
+ val allMetrics = Map(
+ fullName + queueSize -> tpeGauge(_.getQueue.size()),
+ fullName + poolSize -> tpeGauge(_.getPoolSize),
+ fullName + activeThreads -> tpeGauge(_.getActiveCount)
+ )
+
+ allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) }
+ }
+}
+
+
+object BasicExecutorMetricNames {
+ val queueSize = "queueSize"
+ val poolSize = "threads/poolSize"
+ val activeThreads = "threads/activeThreads"
+}
+
+
+
+
diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
new file mode 100644
index 00000000..30635432
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala
@@ -0,0 +1,12 @@
+package kamon.metric
+
+import com.codahale.metrics.Gauge
+
+trait GaugeGenerator {
+
+ def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] {
+ def getValue: V = generator(target)
+ }
+}
+
+object GaugeGenerator extends GaugeGenerator
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
new file mode 100644
index 00000000..fb117968
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala
@@ -0,0 +1,6 @@
+package kamon.metric
+
+object MetricFilter {
+ def actorSystem(system: String): Boolean = !system.startsWith("kamon")
+ def actor(path: String, system: String): Boolean = true
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
new file mode 100644
index 00000000..cdc0a334
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
@@ -0,0 +1,146 @@
+package kamon.metric
+
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit}
+import akka.actor.ActorRef
+import com.codahale.metrics
+import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry}
+
+
+object Metrics {
+ val registry: MetricRegistry = new MetricRegistry
+
+ val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS)
+ //consoleReporter.build().start(45, TimeUnit.SECONDS)
+
+ //val newrelicReporter = NewRelicReporter(registry)
+ //newrelicReporter.start(5, TimeUnit.SECONDS)
+
+ def include(name: String, metric: Metric) = {
+ //registry.register(name, metric)
+ }
+
+ def exclude(name: String) = {
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(name)
+ })
+ }
+
+
+
+ def deregister(fullName: String) = {
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
+ })
+ }
+}
+
+object Watched {
+ case object Actor
+ case object Dispatcher
+}
+
+object MetricDirectory {
+ def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/"
+
+ def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox"
+
+ def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/")
+
+ def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon")
+
+
+ def shouldInstrumentActor(actorPath: String): Boolean = {
+ !(actorPath.isEmpty || actorPath.startsWith("system"))
+ }
+
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram)
+
+
+
+
+trait Histogram {
+ def update(value: Long): Unit
+ def snapshot: HistogramSnapshot
+}
+
+trait HistogramSnapshot {
+ def median: Double
+ def max: Double
+ def min: Double
+}
+
+
+case class ActorSystemMetrics(actorSystemName: String) {
+ import scala.collection.JavaConverters._
+ val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala
+
+ private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram())
+
+ def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = {
+ val stats = createDispatcherCollector
+ dispatchers.put(dispatcherName, stats)
+ Some(stats)
+ }
+
+}
+
+
+case class CodahaleHistogram() extends Histogram {
+ private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir())
+
+ def update(value: Long) = histogram.update(value)
+ def snapshot: HistogramSnapshot = {
+ val snapshot = histogram.getSnapshot
+
+ CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin)
+ }
+}
+
+case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot
+
+
+
+
+
+
+
+/**
+ * Dispatcher Metrics that we care about currently with a histogram-like nature:
+ * - Work Queue Size
+ * - Total/Active Thread Count
+ */
+
+
+
+import annotation.tailrec
+import java.util.concurrent.atomic.AtomicReference
+
+object Atomic {
+ def apply[T]( obj : T) = new Atomic(new AtomicReference(obj))
+ implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref)
+}
+
+class Atomic[T](val atomic : AtomicReference[T]) {
+ @tailrec
+ final def update(f: T => T) : T = {
+ val oldValue = atomic.get()
+ val newValue = f(oldValue)
+ if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
+ }
+
+ def get() = atomic.get()
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
new file mode 100644
index 00000000..5b4ceaf4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala
@@ -0,0 +1,51 @@
+package kamon.metric
+
+import com.codahale.metrics._
+
+object MetricsUtils {
+
+ def markMeter[T](meter:Meter)(f: => T): T = {
+ meter.mark()
+ f
+ }
+//
+// def incrementCounter(key: String) {
+// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count
+// }
+//
+// def markMeter(key: String) {
+// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark()
+// }
+//
+// def trace[T](key: String)(f: => T): T = {
+// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) )
+// timer.time(f)
+// }
+
+// def markAndCountMeter[T](key: String)(f: => T): T = {
+// markMeter(key)
+// f
+// }
+//
+// def traceAndCount[T](key: String)(f: => T): T = {
+// incrementCounter(key)
+// trace(key) {
+// f
+// }
+ //}
+
+// implicit def runnable(f: () => Unit): Runnable =
+// new Runnable() { def run() = f() }
+//
+//
+// import java.util.concurrent.Callable
+//
+// implicit def callable[T](f: () => T): Callable[T] =
+// new Callable[T]() { def call() = f() }
+
+// private val actorCounter:Counter = new Counter
+// private val actorTimer:Timer = new Timer
+//
+// metricsRegistry.register(s"counter-for-${actorName}", actorCounter)
+// metricsRegistry.register(s"timer-for-${actorName}", actorTimer)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
new file mode 100644
index 00000000..70f3e54a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -0,0 +1,51 @@
+package kamon.metric
+
+import com.codahale.metrics
+import metrics._
+import java.util.concurrent.TimeUnit
+import java.util
+import com.newrelic.api.agent.NewRelic
+import scala.collection.JavaConverters._
+
+
+class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) {
+
+
+
+ private[NewRelicReporter] def processMeter(name: String, meter: Meter) {
+ NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat)
+ }
+
+ private[NewRelicReporter] def processCounter(name:String, counter:Counter) {
+ println(s"Logging to NewRelic: ${counter.getCount}")
+
+ }
+
+
+/* def processGauge(name: String, gauge: Gauge[_]) = {
+ println(s"the value is: "+gauge.getValue)
+ NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float])
+ }*/
+
+
+ def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
+ //Process Meters
+ meters.asScala.map{case(name, meter) => processMeter(name, meter)}
+
+ //Process Meters
+ counters.asScala.map{case(name, counter) => processCounter(name, counter)}
+
+ // Gauges
+ gauges.asScala.foreach{ case (name, gauge) => {
+ val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue()
+ val fullMetricName = "Custom" + name
+ NewRelic.recordMetric(fullMetricName, measure)
+ }}
+ }
+
+
+}
+
+object NewRelicReporter {
+ def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS)
+} \ No newline at end of file