From 923b88e8adef2f66b43e551fa4a0a1bbae5af7ff Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 7 Aug 2013 19:06:33 -0300 Subject: upgrading to akka 2.2 --- .../metric/ExecutorServiceMetricCollector.scala | 67 ++++++++++ .../main/scala/kamon/metric/GaugeGenerator.scala | 12 ++ .../src/main/scala/kamon/metric/MetricFilter.scala | 6 + .../src/main/scala/kamon/metric/Metrics.scala | 146 +++++++++++++++++++++ .../src/main/scala/kamon/metric/MetricsUtils.scala | 51 +++++++ .../main/scala/kamon/metric/NewRelicReporter.scala | 51 +++++++ 6 files changed, 333 insertions(+) create mode 100644 kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricFilter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Metrics.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala (limited to 'kamon-core/src/main/scala/kamon/metric') diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala new file mode 100644 index 00000000..54a13f39 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -0,0 +1,67 @@ +package kamon.metric + +import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} +import scala.concurrent.forkjoin.ForkJoinPool +import com.codahale.metrics.{Metric, MetricFilter} + +object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { + + def register(fullName: String, executorService: ExecutorService) = executorService match { + case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) + case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) + case _ => // If it is a unknown Executor then just do nothing. + } + + def deregister(fullName: String) = { + Metrics.registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + + +trait ForkJoinPoolMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + + def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { + val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ + + val allMetrics = Map( + fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), + fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), + fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + +trait ThreadPoolExecutorMetricCollector { + import GaugeGenerator._ + import BasicExecutorMetricNames._ + + def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = { + val tpeGauge = newNumericGaugeFor(tpe) _ + + val allMetrics = Map( + fullName + queueSize -> tpeGauge(_.getQueue.size()), + fullName + poolSize -> tpeGauge(_.getPoolSize), + fullName + activeThreads -> tpeGauge(_.getActiveCount) + ) + + allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + } +} + + +object BasicExecutorMetricNames { + val queueSize = "queueSize" + val poolSize = "threads/poolSize" + val activeThreads = "threads/activeThreads" +} + + + + diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala new file mode 100644 index 00000000..30635432 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala @@ -0,0 +1,12 @@ +package kamon.metric + +import com.codahale.metrics.Gauge + +trait GaugeGenerator { + + def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { + def getValue: V = generator(target) + } +} + +object GaugeGenerator extends GaugeGenerator diff --git a/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala new file mode 100644 index 00000000..fb117968 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricFilter.scala @@ -0,0 +1,6 @@ +package kamon.metric + +object MetricFilter { + def actorSystem(system: String): Boolean = !system.startsWith("kamon") + def actor(path: String, system: String): Boolean = true +} diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala new file mode 100644 index 00000000..cdc0a334 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -0,0 +1,146 @@ +package kamon.metric + +import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet, TimeUnit} +import akka.actor.ActorRef +import com.codahale.metrics +import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} + + +object Metrics { + val registry: MetricRegistry = new MetricRegistry + + val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) + //consoleReporter.build().start(45, TimeUnit.SECONDS) + + //val newrelicReporter = NewRelicReporter(registry) + //newrelicReporter.start(5, TimeUnit.SECONDS) + + def include(name: String, metric: Metric) = { + //registry.register(name, metric) + } + + def exclude(name: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(name) + }) + } + + + + def deregister(fullName: String) = { + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) + }) + } +} + +object Watched { + case object Actor + case object Dispatcher +} + +object MetricDirectory { + def nameForDispatcher(actorSystem: String, dispatcher: String) = s"/ActorSystem/${actorSystem}/Dispatcher/${dispatcher}/" + + def nameForMailbox(actorSystem: String, actor: String) = s"/ActorSystem/$actorSystem/Actor/$actor/Mailbox" + + def nameForActor(actorRef: ActorRef) = actorRef.path.elements.mkString("/") + + def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") + + + def shouldInstrumentActor(actorPath: String): Boolean = { + !(actorPath.isEmpty || actorPath.startsWith("system")) + } + + +} + + + + + + + + + + + + +case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) + + + + +trait Histogram { + def update(value: Long): Unit + def snapshot: HistogramSnapshot +} + +trait HistogramSnapshot { + def median: Double + def max: Double + def min: Double +} + + +case class ActorSystemMetrics(actorSystemName: String) { + import scala.collection.JavaConverters._ + val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala + + private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) + + def registerDispatcher(dispatcherName: String): Option[DispatcherMetricCollector] = { + val stats = createDispatcherCollector + dispatchers.put(dispatcherName, stats) + Some(stats) + } + +} + + +case class CodahaleHistogram() extends Histogram { + private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) + + def update(value: Long) = histogram.update(value) + def snapshot: HistogramSnapshot = { + val snapshot = histogram.getSnapshot + + CodahaleHistogramSnapshot(snapshot.getMedian, snapshot.getMax, snapshot.getMin) + } +} + +case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot + + + + + + + +/** + * Dispatcher Metrics that we care about currently with a histogram-like nature: + * - Work Queue Size + * - Total/Active Thread Count + */ + + + +import annotation.tailrec +import java.util.concurrent.atomic.AtomicReference + +object Atomic { + def apply[T]( obj : T) = new Atomic(new AtomicReference(obj)) + implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref) +} + +class Atomic[T](val atomic : AtomicReference[T]) { + @tailrec + final def update(f: T => T) : T = { + val oldValue = atomic.get() + val newValue = f(oldValue) + if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) + } + + def get() = atomic.get() +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala new file mode 100644 index 00000000..5b4ceaf4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala @@ -0,0 +1,51 @@ +package kamon.metric + +import com.codahale.metrics._ + +object MetricsUtils { + + def markMeter[T](meter:Meter)(f: => T): T = { + meter.mark() + f + } +// +// def incrementCounter(key: String) { +// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count +// } +// +// def markMeter(key: String) { +// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark() +// } +// +// def trace[T](key: String)(f: => T): T = { +// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) ) +// timer.time(f) +// } + +// def markAndCountMeter[T](key: String)(f: => T): T = { +// markMeter(key) +// f +// } +// +// def traceAndCount[T](key: String)(f: => T): T = { +// incrementCounter(key) +// trace(key) { +// f +// } + //} + +// implicit def runnable(f: () => Unit): Runnable = +// new Runnable() { def run() = f() } +// +// +// import java.util.concurrent.Callable +// +// implicit def callable[T](f: () => T): Callable[T] = +// new Callable[T]() { def call() = f() } + +// private val actorCounter:Counter = new Counter +// private val actorTimer:Timer = new Timer +// +// metricsRegistry.register(s"counter-for-${actorName}", actorCounter) +// metricsRegistry.register(s"timer-for-${actorName}", actorTimer) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala new file mode 100644 index 00000000..70f3e54a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -0,0 +1,51 @@ +package kamon.metric + +import com.codahale.metrics +import metrics._ +import java.util.concurrent.TimeUnit +import java.util +import com.newrelic.api.agent.NewRelic +import scala.collection.JavaConverters._ + + +class NewRelicReporter(registry: MetricRegistry, name: String,filter: MetricFilter, rateUnit: TimeUnit, durationUnit: TimeUnit) extends ScheduledReporter(registry, name, filter, rateUnit, durationUnit) { + + + + private[NewRelicReporter] def processMeter(name: String, meter: Meter) { + NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.getMeanRate().toFloat) + } + + private[NewRelicReporter] def processCounter(name:String, counter:Counter) { + println(s"Logging to NewRelic: ${counter.getCount}") + + } + + +/* def processGauge(name: String, gauge: Gauge[_]) = { + println(s"the value is: "+gauge.getValue) + NewRelic.recordMetric("Custom/ActorSystem/activeCount", gauge.getValue.asInstanceOf[Float]) + }*/ + + + def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, metrics.Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { + //Process Meters + meters.asScala.map{case(name, meter) => processMeter(name, meter)} + + //Process Meters + counters.asScala.map{case(name, counter) => processCounter(name, counter)} + + // Gauges + gauges.asScala.foreach{ case (name, gauge) => { + val measure: Float = gauge.getValue.asInstanceOf[Number].floatValue() + val fullMetricName = "Custom" + name + NewRelic.recordMetric(fullMetricName, measure) + }} + } + + +} + +object NewRelicReporter { + def apply(registry: MetricRegistry) = new NewRelicReporter(registry, "NewRelic-reporter", metrics.MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.MILLISECONDS) +} \ No newline at end of file -- cgit v1.2.3