From 588f9820601aa4f48962009ab346b8696e93c7f6 Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Fri, 10 May 2013 16:24:26 -0300 Subject: remove ActorSystemHolder and introduce Tracer --- src/main/scala/akka/ActorAspect.scala | 1 + src/main/scala/akka/ActorSystemAspect.scala | 16 +++---- src/main/scala/akka/ActorSystemHolder.scala | 9 ---- src/main/scala/akka/MailboxAspect.scala | 25 +++-------- src/main/scala/akka/PoolMonitorAspect.scala | 32 +++----------- src/main/scala/akka/Tracer.scala | 68 +++++++++++++++++++++++++++++ 6 files changed, 88 insertions(+), 63 deletions(-) delete mode 100644 src/main/scala/akka/ActorSystemHolder.scala create mode 100644 src/main/scala/akka/Tracer.scala (limited to 'src') diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala index db96279c..744b0aea 100644 --- a/src/main/scala/akka/ActorAspect.scala +++ b/src/main/scala/akka/ActorAspect.scala @@ -7,6 +7,7 @@ import akka.actor.ActorCell @Aspect class ActorAspect extends Metrics { + println("Created ActorAspect") @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") protected def actorReceive:Unit = {} diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala index ce00d973..9d1d515d 100644 --- a/src/main/scala/akka/ActorSystemAspect.scala +++ b/src/main/scala/akka/ActorSystemAspect.scala @@ -1,18 +1,18 @@ package akka import org.aspectj.lang.annotation._ -import akka.actor.ActorSystemImpl -import com.typesafe.config.Config +import actor.ActorSystemImpl @Aspect -@DeclarePrecedence("ActorSystemAspect") class ActorSystemAspect { + println("Created ActorSystemAspect") - var currentActorSystem:ActorSystemImpl = _ - - @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && !within(ActorSystemAspect)") + @Pointcut("execution(* akka.actor.ActorRefProvider+.init(..)) && !within(ActorSystemAspect)") protected def actorSystem():Unit = {} - @Before("actorSystem() && this(system) && args(name, config, classLoader)") - def collectActorSystem(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) { currentActorSystem = system } + @After("actorSystem() && args(system)") + def collectActorSystem(system: ActorSystemImpl):Unit = { + Tracer.collectActorSystem(system) + Tracer.start() + } } diff --git a/src/main/scala/akka/ActorSystemHolder.scala b/src/main/scala/akka/ActorSystemHolder.scala deleted file mode 100644 index 0be6c89e..00000000 --- a/src/main/scala/akka/ActorSystemHolder.scala +++ /dev/null @@ -1,9 +0,0 @@ -package akka - -import org.aspectj.lang.Aspects - -trait ActorSystemHolder { - lazy val actorSystemAspect = Aspects.aspectOf(classOf[ActorSystemAspect]) - lazy val actorSystem = actorSystemAspect.currentActorSystem - lazy implicit val dispatcher = actorSystem.dispatcher -} \ No newline at end of file diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala index a823d5b9..781ba055 100644 --- a/src/main/scala/akka/MailboxAspect.scala +++ b/src/main/scala/akka/MailboxAspect.scala @@ -1,31 +1,16 @@ package akka import org.aspectj.lang.annotation._ -import scala.concurrent.duration._ -import com.newrelic.api.agent.NewRelic @Aspect("perthis(mailboxMonitor())") -class MailboxAspect extends ActorSystemHolder { +class MailboxAspect { + println("Created MailboxAspect") @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)") protected def mailboxMonitor():Unit = {} - @Before("mailboxMonitor() && this(mb)") - def before(mb: akka.dispatch.Mailbox) : Unit = { - actorSystem.scheduler.schedule(5 seconds, 6 second, new Runnable { - def run() { - - val actorName = mb.actor.self.path.toString - - println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}") - - - NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages) - NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput) - - NewRelic.addCustomParameter(s"${actorName}:Mailbox:Status", mb.hasMessages.toString) - NewRelic.addCustomParameter(s"${actorName}:Mailbox:HasMessages", mb.hasMessages.toString) - } - }) + @After("mailboxMonitor() && this(mb)") + def afterInitialization(mb: akka.dispatch.Mailbox) : Unit = { + Tracer.collectMailBox(mb) } } \ No newline at end of file diff --git a/src/main/scala/akka/PoolMonitorAspect.scala b/src/main/scala/akka/PoolMonitorAspect.scala index 20d002a2..c83defa9 100644 --- a/src/main/scala/akka/PoolMonitorAspect.scala +++ b/src/main/scala/akka/PoolMonitorAspect.scala @@ -1,36 +1,16 @@ package akka -import org.aspectj.lang.annotation.{Pointcut, Before, Aspect} -import scala.concurrent.duration._ -import com.newrelic.api.agent.NewRelic +import org.aspectj.lang.annotation._ @Aspect("perthis(poolMonitor())") -class PoolMonitorAspect extends ActorSystemHolder { +class PoolMonitorAspect { + println("Created PoolMonitorAspect") @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && !within(PoolMonitorAspect)") protected def poolMonitor:Unit = {} @Before("poolMonitor() && this(pool)") - def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool) { - actorSystem.scheduler.schedule(10 seconds, 6 second, new Runnable { - def run() { - val poolName = pool.getClass.getSimpleName - - println(s"Sending metrics to Newrelic PoolMonitor -> ${poolName}") - PoolConverter.toMap(pool).map{case(k,v) => NewRelic.recordMetric(s"${poolName}:${k}",v)} - } - }) - } -} - -object PoolConverter { - def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int]( - "ActiveThreadCount" -> pool.getActiveThreadCount, - "Parallelism" -> pool.getParallelism, - "PoolSize" -> pool.getPoolSize, - "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount, - "StealCount" -> pool.getStealCount.toInt, - "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt, - "RunningThreadCount" -> pool.getRunningThreadCount - ) + def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = { + Tracer.collectPool(pool) + } } diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala new file mode 100644 index 00000000..4b2542b7 --- /dev/null +++ b/src/main/scala/akka/Tracer.scala @@ -0,0 +1,68 @@ +package akka + +import actor.{Props, Actor, ActorSystemImpl} +import concurrent.forkjoin.ForkJoinPool +import scala.concurrent.duration._ +import com.newrelic.api.agent.NewRelic +import akka.dispatch.Mailbox + +object Tracer { + var system: ActorSystemImpl = _ + var forkJoinPool:ForkJoinPool = _ + var mailbox:Mailbox = _ + + def collectPool(pool: ForkJoinPool) = forkJoinPool = pool + def collectActorSystem(actorSystem: ActorSystemImpl) = system = actorSystem + + def collectMailBox(mb: akka.dispatch.Mailbox) = { + mailbox = mb + } + + def start():Unit ={ + implicit val dispatcher = system.dispatcher + val metricsActor = system.actorOf(Props[MetricsActor], "PoolActor") + + system.scheduler.schedule(10 seconds, 6 second, metricsActor, PoolMetrics(forkJoinPool)) + system.scheduler.schedule(10 seconds, 6 second, metricsActor, MailboxMetrics(mailbox)) + } +} + +case class PoolMetrics(poolName:String, data:Map[String,Int]) +object PoolMetrics { + def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool)) + + def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int]( + "ActiveThreadCount" -> pool.getActiveThreadCount, + "Parallelism" -> pool.getParallelism, + "PoolSize" -> pool.getPoolSize, + "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount, + "StealCount" -> pool.getStealCount.toInt, + "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt, + "RunningThreadCount" -> pool.getRunningThreadCount + ) +} +case class MailboxMetrics(mbName:String, mailBox:Mailbox) +object MailboxMetrics { + def apply(mb: Mailbox) = new MailboxMetrics(mb.actor.self.path.toString,mb) +} + +class MetricsActor extends Actor { + def receive = { + case poolMetrics:PoolMetrics => { + println(poolMetrics) + poolMetrics.data.map{case(k,v) => NewRelic.recordMetric(s"${poolMetrics.poolName}:${k}",v)} + } + case mailboxMetrics:MailboxMetrics => { + val actorName = mailboxMetrics.mbName + val mb = mailboxMetrics.mailBox + println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}") + + + NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages) + NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput) + + NewRelic.addCustomParameter(s"${actorName}:Mailbox:Status", mb.hasMessages.toString) + NewRelic.addCustomParameter(s"${actorName}:Mailbox:HasMessages", mb.hasMessages.toString) + } + } +} \ No newline at end of file -- cgit v1.2.3