aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiego Parra <diegolparra@gmail.com>2013-05-09 00:44:02 -0300
committerDiego Parra <diegolparra@gmail.com>2013-05-09 00:44:02 -0300
commitaf23f1cd919194fe59d6fa31a612c003b5c609d2 (patch)
treedf76c36febaebf8638153aeff775dca532eaf130 /src
parent6ee6ea75f1f230b5156a546c1e0f16f6952f99a0 (diff)
downloadKamon-af23f1cd919194fe59d6fa31a612c003b5c609d2.tar.gz
Kamon-af23f1cd919194fe59d6fa31a612c003b5c609d2.tar.bz2
Kamon-af23f1cd919194fe59d6fa31a612c003b5c609d2.zip
first approach to send mailbox and pool metrics
Diffstat (limited to 'src')
-rw-r--r--src/main/scala/akka/ActorSystem.scala9
-rw-r--r--src/main/scala/akka/ActorSystemAspect.scala24
-rw-r--r--src/main/scala/akka/MailboxAspect.scala37
-rw-r--r--src/main/scala/akka/PoolMonitorAspect.scala41
4 files changed, 61 insertions, 50 deletions
diff --git a/src/main/scala/akka/ActorSystem.scala b/src/main/scala/akka/ActorSystem.scala
new file mode 100644
index 00000000..7c381e68
--- /dev/null
+++ b/src/main/scala/akka/ActorSystem.scala
@@ -0,0 +1,9 @@
+package akka
+
+import org.aspectj.lang.Aspects
+
+trait ActorSystem {
+ lazy val actorSystemAspect = Aspects.aspectOf(classOf[ActorSystemAspect])
+ lazy val actorSystem = actorSystemAspect.currentActorSystem
+ implicit val dispatcher = actorSystem.dispatcher
+} \ No newline at end of file
diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala
index 11524a2f..ce00d973 100644
--- a/src/main/scala/akka/ActorSystemAspect.scala
+++ b/src/main/scala/akka/ActorSystemAspect.scala
@@ -1,28 +1,18 @@
package akka
-import org.aspectj.lang.annotation.{Pointcut, Before, Aspect}
+import org.aspectj.lang.annotation._
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
-import java.util.concurrent.{TimeUnit, Executors}
@Aspect
+@DeclarePrecedence("ActorSystemAspect")
class ActorSystemAspect {
- @Pointcut("execution(akka.actor.ActorSystemImpl.new(..))")
- protected def actorSystem:Unit = {}
+ var currentActorSystem:ActorSystemImpl = _
- @Before("actorSystem() && this(system) && args(name, config, classLoader)")
- def beforeInitialize(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) {
-
- val scheduler = Executors.newScheduledThreadPool(1);
+ @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && !within(ActorSystemAspect)")
+ protected def actorSystem():Unit = {}
- scheduler.scheduleAtFixedRate(new Runnable {
- def run() {
- println("ActorSystemImpl" + system.name)
- println("Thread Factory" + system.threadFactory.name)
- println("Dispatchers" + system.dispatchers.defaultDispatcherConfig.resolve)
- println("Dispatcher" + system.dispatcher.throughput)
- }
- }, 4, 4, TimeUnit.SECONDS)
- }
+ @Before("actorSystem() && this(system) && args(name, config, classLoader)")
+ def collectActorSystem(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) { currentActorSystem = system }
}
diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala
index 3dfc9c6a..f52a2eee 100644
--- a/src/main/scala/akka/MailboxAspect.scala
+++ b/src/main/scala/akka/MailboxAspect.scala
@@ -1,26 +1,31 @@
package akka
-import org.aspectj.lang.annotation.{Pointcut, Before, Aspect}
-import java.util.concurrent.{TimeUnit, Executors}
+import org.aspectj.lang.annotation._
+import scala.concurrent.duration._
+import com.newrelic.api.agent.NewRelic
-@Aspect
-class MailboxAspect {
+@Aspect("perthis(mailboxMonitor())")
+class MailboxAspect extends ActorSystem {
- @Pointcut("execution(akka.dispatch.Mailbox.new(..))")
- protected def mailboxMonitor:Unit = {}
+ @Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)")
+ protected def mailboxMonitor():Unit = {}
@Before("mailboxMonitor() && this(mb)")
def before(mb: akka.dispatch.Mailbox) : Unit = {
- val scheduler = Executors.newScheduledThreadPool(1);
-
- scheduler.scheduleAtFixedRate(new Runnable {
+ actorSystem.scheduler.schedule(5 seconds, 6 second, new Runnable {
def run() {
- println("Mailbox: " + mb.actor.self.path)
- println("NOM: " + mb.numberOfMessages)
- println("Messages: " + mb.hasMessages)
- print("Dispatcher throughput: " + mb.dispatcher.throughput)
- println(mb.dispatcher.id)
+
+ val actorName = mb.actor.self.path.toString
+
+ println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}")
+
+
+ NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages)
+ NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput)
+
+ NewRelic.addCustomParameter(s"${actorName}:Mailbox:Status", mb.hasMessages.toString)
+ NewRelic.addCustomParameter(s"${actorName}:Mailbox:HasMessages", mb.hasMessages.toString)
}
- }, 6, 4, TimeUnit.SECONDS)
+ })
}
-}
+} \ No newline at end of file
diff --git a/src/main/scala/akka/PoolMonitorAspect.scala b/src/main/scala/akka/PoolMonitorAspect.scala
index c6c4e4a5..36861a93 100644
--- a/src/main/scala/akka/PoolMonitorAspect.scala
+++ b/src/main/scala/akka/PoolMonitorAspect.scala
@@ -1,29 +1,36 @@
package akka
import org.aspectj.lang.annotation.{Pointcut, Before, Aspect}
-import java.util.concurrent.{TimeUnit, Executors}
+import scala.concurrent.duration._
+import com.newrelic.api.agent.NewRelic
-@Aspect
-class PoolMonitorAspect {
+@Aspect("perthis(poolMonitor())")
+class PoolMonitorAspect extends ActorSystem {
- @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..))")
+ @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && !within(PoolMonitorAspect)")
protected def poolMonitor:Unit = {}
@Before("poolMonitor() && this(pool)")
def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool) {
+ actorSystem.scheduler.schedule(10 seconds, 6 second, new Runnable {
+ def run() {
+ val poolName = pool.getClass.getSimpleName
- val scheduler = Executors.newScheduledThreadPool(1);
+ println(s"Sending metrics to Newrelic PoolMonitor -> ${poolName}")
+ PoolConverter.toMap(pool).map{case(k,v) => NewRelic.recordMetric(s"${poolName}:${k}",v)}
+ }
+ })
+ }
+}
- scheduler.scheduleAtFixedRate(new Runnable {
- def run() {
- println("PoolName : " + pool.getClass.getSimpleName)
- println("ThreadCount :" + pool.getActiveThreadCount)
- println("Parallelism :" + pool.getParallelism)
- println("PoolSize :" + pool.getPoolSize())
- println("Submission :" + pool.getQueuedSubmissionCount())
- println("Steals :" + pool.getStealCount())
- println("All :" + pool.toString)
- }
- }, 4, 4, TimeUnit.SECONDS)
- }
+object PoolConverter {
+ def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int](
+ "ActiveThreadCount" -> pool.getActiveThreadCount,
+ "Parallelism" -> pool.getParallelism,
+ "PoolSize" -> pool.getPoolSize,
+ "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount,
+ "StealCount" -> pool.getStealCount.toInt,
+ "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt,
+ "RunningThreadCount" -> pool.getRunningThreadCount
+ )
}