From 1e6665e30d96772eab92aca4d23e176adcd88dc5 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 7 Aug 2013 11:25:08 -0300 Subject: upgraded to akka 2.2 --- src/main/scala/kamon/Kamon.scala | 64 +++++++++++++++++++++- src/main/scala/kamon/TraceContext.scala | 1 + .../ActorRefTellInstrumentation.scala | 13 +++-- .../instrumentation/ExecutorServiceMetrics.scala | 4 +- .../instrumentation/MessageQueueMetrics.scala | 4 +- src/main/scala/kamon/metric/Metrics.scala | 8 +-- 6 files changed, 80 insertions(+), 14 deletions(-) (limited to 'src/main/scala/kamon') diff --git a/src/main/scala/kamon/Kamon.scala b/src/main/scala/kamon/Kamon.scala index 9946a1fd..5a1382a4 100644 --- a/src/main/scala/kamon/Kamon.scala +++ b/src/main/scala/kamon/Kamon.scala @@ -1,9 +1,11 @@ package kamon -import akka.actor.{Props, ActorSystem} +import akka.actor.{Actor, Props, ActorSystem} import scala.collection.JavaConverters._ import java.util.concurrent.ConcurrentHashMap -import kamon.metric.{Atomic, ActorSystemMetrics} +import kamon.metric.{HistogramSnapshot, Histogram, Atomic, ActorSystemMetrics} +import scala.concurrent.duration.{FiniteDuration, Duration} +import com.newrelic.api.agent.NewRelic object Kamon { @@ -42,6 +44,11 @@ object Kamon { def actorSystem(name: String): Option[ActorSystemMetrics] = actorSystems.get(name) } + + + val metricManager = actorSystem.actorOf(Props[MetricManager], "metric-manager") + val newrelicReporter = actorSystem.actorOf(Props[NewrelicReporterActor], "newrelic-reporter") + } @@ -68,5 +75,58 @@ object Tracer { } //def newTraceContext(): TraceContext = TraceContext() +} + + +class MetricManager extends Actor { + implicit val ec = context.system.dispatcher + def receive = { + case RegisterForAllDispatchers(frequency) => { + val subscriber = sender + context.system.scheduler.schedule(frequency, frequency) { + Kamon.Metric.actorSystems.foreach { + case (asName, actorSystemMetrics) => actorSystemMetrics.dispatchers.foreach { + case (dispatcherName, dispatcherMetrics) => { + val activeThreads = dispatcherMetrics.activeThreadCount.snapshot + val poolSize = dispatcherMetrics.poolSize.snapshot + val queueSize = dispatcherMetrics.queueSize.snapshot + + subscriber ! DispatcherMetrics(asName, dispatcherName, activeThreads, poolSize, queueSize) + + } + } + } + } + } + } } + +case class RegisterForAllDispatchers(frequency: FiniteDuration) +case class DispatcherMetrics(actorSystem: String, dispatcher: String, activeThreads: HistogramSnapshot, poolSize: HistogramSnapshot, queueSize: HistogramSnapshot) + + + + + + +class NewrelicReporterActor extends Actor { + import scala.concurrent.duration._ + + Kamon.metricManager ! RegisterForAllDispatchers(5 seconds) + + def receive = { + case DispatcherMetrics(actorSystem, dispatcher, activeThreads, poolSize, queueSize) => { + println("PUBLISHED DISPATCHER STATS") + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active =>" + activeThreads.median.toFloat) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive =>" + (poolSize.median.toFloat-activeThreads.median.toFloat)) + println(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue =>" + queueSize.median.toFloat) + + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/active", activeThreads.median.toFloat) + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Threads/inactive", (poolSize.median.toFloat-activeThreads.median.toFloat)) + + NewRelic.recordMetric(s"Custom/$actorSystem/Dispatcher/$dispatcher/Queue", queueSize.median.toFloat) + } + } +} \ No newline at end of file diff --git a/src/main/scala/kamon/TraceContext.scala b/src/main/scala/kamon/TraceContext.scala index 0bfcd74b..6b32550f 100644 --- a/src/main/scala/kamon/TraceContext.scala +++ b/src/main/scala/kamon/TraceContext.scala @@ -20,6 +20,7 @@ case class TraceContext(id: UUID, private val entries: Agent[List[TraceEntry]], } object TraceContext { + implicit val as2 = Kamon.actorSystem.dispatcher def apply()(implicit actorSystem: ActorSystem) = new TraceContext(UUID.randomUUID(), Agent[List[TraceEntry]](Nil)) } diff --git a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala index 6677f0f7..7398a2bd 100644 --- a/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala +++ b/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala @@ -4,7 +4,7 @@ import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import akka.actor.{Props, ActorSystem, ActorRef} import kamon.{Kamon, TraceContext} -import akka.dispatch.Envelope +import akka.dispatch.{MessageDispatcher, Envelope} import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram} import kamon.metric.{MetricDirectory, Metrics} import com.codahale.metrics @@ -38,11 +38,13 @@ class ActorCellInvokeInstrumentation { var processingTimeTimer: Timer = _ var shouldTrack = false - @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, parent)") - def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {} + // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut. - @After("actorCellCreation(system, ref, props, parent)") - def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { + @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)") + def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} + + @After("actorCellCreation(system, ref, props, dispatcher, parent)") + def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { val actorName = MetricDirectory.nameForActor(ref) val histogramName = MetricDirectory.nameForMailbox(system.name, actorName) @@ -73,6 +75,7 @@ class ActorCellInvokeInstrumentation { ctx match { case Some(c) => { Kamon.set(c) + //println("ENVELOPE ORIGINAL:---------------->"+originalEnvelope) pjp.proceedWith(originalEnvelope) Kamon.clear } diff --git a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index 1f3564d3..b4f8a475 100644 --- a/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -28,6 +28,7 @@ class ActorSystemInstrumentation { @Aspect("perthis(forkJoinPoolInstantiation(int, scala.concurrent.forkjoin.ForkJoinPool.ForkJoinWorkerThreadFactory, java.lang.Thread.UncaughtExceptionHandler))") class ForkJoinPoolInstrumentation { var activeThreadsHistogram: Histogram = _ + var poolSizeHistogram: Histogram = _ @Pointcut("execution(akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool.new(..)) && args(parallelism, threadFactory, exceptionHandler)") def forkJoinPoolInstantiation(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, exceptionHandler: Thread.UncaughtExceptionHandler) = {} @@ -42,6 +43,7 @@ class ForkJoinPoolInstrumentation { val metrics = Kamon.Metric.actorSystem(actorSystemName).get.registerDispatcher(dispatcherName) for(m <- metrics) { activeThreadsHistogram = m.activeThreadCount + poolSizeHistogram = m.poolSize println(s"Registered $dispatcherName for actor system $actorSystemName") } } @@ -59,7 +61,7 @@ class ForkJoinPoolInstrumentation { @After("forkJoinScan(fjp)") def updateMetrics(fjp: AkkaForkJoinPool): Unit = { activeThreadsHistogram.update(fjp.getActiveThreadCount) - println("UPDATED THE COUNT TWOOOO!!!") + poolSizeHistogram.update(fjp.getPoolSize) } diff --git a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index 75d6189c..c21502ac 100644 --- a/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -1,7 +1,7 @@ package kamon.instrumentation import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import akka.dispatch.{Envelope, MessageQueue} +import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue} import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} import akka.actor.{ActorSystem, ActorRef} import kamon.metric.{Metrics, MetricDirectory} @@ -44,7 +44,7 @@ class MessageQueueInstrumentation { } -class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue { +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{ def enqueue(receiver: ActorRef, handle: Envelope) = { delegate.enqueue(receiver, handle) diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index 46809d8f..3992ab43 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -10,11 +10,10 @@ object Metrics { val registry: MetricRegistry = new MetricRegistry val consoleReporter = ConsoleReporter.forRegistry(registry).convertDurationsTo(TimeUnit.NANOSECONDS) - val newrelicReporter = NewRelicReporter(registry) + //consoleReporter.build().start(45, TimeUnit.SECONDS) + //val newrelicReporter = NewRelicReporter(registry) //newrelicReporter.start(5, TimeUnit.SECONDS) - consoleReporter.build().start(10, TimeUnit.SECONDS) - def include(name: String, metric: Metric) = registry.register(name, metric) @@ -84,7 +83,8 @@ trait HistogramSnapshot { case class ActorSystemMetrics(actorSystemName: String) { - val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] + import scala.collection.JavaConverters._ + val dispatchers = new ConcurrentHashMap[String, DispatcherMetricCollector] asScala private[this] def createDispatcherCollector: DispatcherMetricCollector = DispatcherMetricCollector(CodahaleHistogram(), CodahaleHistogram(), CodahaleHistogram()) -- cgit v1.2.3