diff options
author | Diego Parra <diegolparra@gmail.com> | 2013-05-09 00:44:02 -0300 |
---|---|---|
committer | Diego Parra <diegolparra@gmail.com> | 2013-05-09 00:44:02 -0300 |
commit | af23f1cd919194fe59d6fa31a612c003b5c609d2 (patch) | |
tree | df76c36febaebf8638153aeff775dca532eaf130 | |
parent | 6ee6ea75f1f230b5156a546c1e0f16f6952f99a0 (diff) | |
download | Kamon-af23f1cd919194fe59d6fa31a612c003b5c609d2.tar.gz Kamon-af23f1cd919194fe59d6fa31a612c003b5c609d2.tar.bz2 Kamon-af23f1cd919194fe59d6fa31a612c003b5c609d2.zip |
first approach to send mailbox and pool metrics
-rw-r--r-- | src/main/scala/akka/ActorSystem.scala | 9 | ||||
-rw-r--r-- | src/main/scala/akka/ActorSystemAspect.scala | 24 | ||||
-rw-r--r-- | src/main/scala/akka/MailboxAspect.scala | 37 | ||||
-rw-r--r-- | src/main/scala/akka/PoolMonitorAspect.scala | 41 |
4 files changed, 61 insertions, 50 deletions
diff --git a/src/main/scala/akka/ActorSystem.scala b/src/main/scala/akka/ActorSystem.scala new file mode 100644 index 00000000..7c381e68 --- /dev/null +++ b/src/main/scala/akka/ActorSystem.scala @@ -0,0 +1,9 @@ +package akka + +import org.aspectj.lang.Aspects + +trait ActorSystem { + lazy val actorSystemAspect = Aspects.aspectOf(classOf[ActorSystemAspect]) + lazy val actorSystem = actorSystemAspect.currentActorSystem + implicit val dispatcher = actorSystem.dispatcher +}
\ No newline at end of file diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala index 11524a2f..ce00d973 100644 --- a/src/main/scala/akka/ActorSystemAspect.scala +++ b/src/main/scala/akka/ActorSystemAspect.scala @@ -1,28 +1,18 @@ package akka -import org.aspectj.lang.annotation.{Pointcut, Before, Aspect} +import org.aspectj.lang.annotation._ import akka.actor.ActorSystemImpl import com.typesafe.config.Config -import java.util.concurrent.{TimeUnit, Executors} @Aspect +@DeclarePrecedence("ActorSystemAspect") class ActorSystemAspect { - @Pointcut("execution(akka.actor.ActorSystemImpl.new(..))") - protected def actorSystem:Unit = {} + var currentActorSystem:ActorSystemImpl = _ - @Before("actorSystem() && this(system) && args(name, config, classLoader)") - def beforeInitialize(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) { - - val scheduler = Executors.newScheduledThreadPool(1); + @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && !within(ActorSystemAspect)") + protected def actorSystem():Unit = {} - scheduler.scheduleAtFixedRate(new Runnable { - def run() { - println("ActorSystemImpl" + system.name) - println("Thread Factory" + system.threadFactory.name) - println("Dispatchers" + system.dispatchers.defaultDispatcherConfig.resolve) - println("Dispatcher" + system.dispatcher.throughput) - } - }, 4, 4, TimeUnit.SECONDS) - } + @Before("actorSystem() && this(system) && args(name, config, classLoader)") + def collectActorSystem(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) { currentActorSystem = system } } diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala index 3dfc9c6a..f52a2eee 100644 --- a/src/main/scala/akka/MailboxAspect.scala +++ b/src/main/scala/akka/MailboxAspect.scala @@ -1,26 +1,31 @@ package akka -import org.aspectj.lang.annotation.{Pointcut, Before, Aspect} -import java.util.concurrent.{TimeUnit, Executors} +import org.aspectj.lang.annotation._ +import scala.concurrent.duration._ +import com.newrelic.api.agent.NewRelic -@Aspect -class MailboxAspect { +@Aspect("perthis(mailboxMonitor())") +class MailboxAspect extends ActorSystem { - @Pointcut("execution(akka.dispatch.Mailbox.new(..))") - protected def mailboxMonitor:Unit = {} + @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)") + protected def mailboxMonitor():Unit = {} @Before("mailboxMonitor() && this(mb)") def before(mb: akka.dispatch.Mailbox) : Unit = { - val scheduler = Executors.newScheduledThreadPool(1); - - scheduler.scheduleAtFixedRate(new Runnable { + actorSystem.scheduler.schedule(5 seconds, 6 second, new Runnable { def run() { - println("Mailbox: " + mb.actor.self.path) - println("NOM: " + mb.numberOfMessages) - println("Messages: " + mb.hasMessages) - print("Dispatcher throughput: " + mb.dispatcher.throughput) - println(mb.dispatcher.id) + + val actorName = mb.actor.self.path.toString + + println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}") + + + NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages) + NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput) + + NewRelic.addCustomParameter(s"${actorName}:Mailbox:Status", mb.hasMessages.toString) + NewRelic.addCustomParameter(s"${actorName}:Mailbox:HasMessages", mb.hasMessages.toString) } - }, 6, 4, TimeUnit.SECONDS) + }) } -} +}
\ No newline at end of file diff --git a/src/main/scala/akka/PoolMonitorAspect.scala b/src/main/scala/akka/PoolMonitorAspect.scala index c6c4e4a5..36861a93 100644 --- a/src/main/scala/akka/PoolMonitorAspect.scala +++ b/src/main/scala/akka/PoolMonitorAspect.scala @@ -1,29 +1,36 @@ package akka import org.aspectj.lang.annotation.{Pointcut, Before, Aspect} -import java.util.concurrent.{TimeUnit, Executors} +import scala.concurrent.duration._ +import com.newrelic.api.agent.NewRelic -@Aspect -class PoolMonitorAspect { +@Aspect("perthis(poolMonitor())") +class PoolMonitorAspect extends ActorSystem { - @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..))") + @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && !within(PoolMonitorAspect)") protected def poolMonitor:Unit = {} @Before("poolMonitor() && this(pool)") def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool) { + actorSystem.scheduler.schedule(10 seconds, 6 second, new Runnable { + def run() { + val poolName = pool.getClass.getSimpleName - val scheduler = Executors.newScheduledThreadPool(1); + println(s"Sending metrics to Newrelic PoolMonitor -> ${poolName}") + PoolConverter.toMap(pool).map{case(k,v) => NewRelic.recordMetric(s"${poolName}:${k}",v)} + } + }) + } +} - scheduler.scheduleAtFixedRate(new Runnable { - def run() { - println("PoolName : " + pool.getClass.getSimpleName) - println("ThreadCount :" + pool.getActiveThreadCount) - println("Parallelism :" + pool.getParallelism) - println("PoolSize :" + pool.getPoolSize()) - println("Submission :" + pool.getQueuedSubmissionCount()) - println("Steals :" + pool.getStealCount()) - println("All :" + pool.toString) - } - }, 4, 4, TimeUnit.SECONDS) - } +object PoolConverter { + def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int]( + "ActiveThreadCount" -> pool.getActiveThreadCount, + "Parallelism" -> pool.getParallelism, + "PoolSize" -> pool.getPoolSize, + "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount, + "StealCount" -> pool.getStealCount.toInt, + "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt, + "RunningThreadCount" -> pool.getRunningThreadCount + ) } |