aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiego Parra <dparra@despegar.com>2013-05-10 16:24:26 -0300
committerDiego Parra <dparra@despegar.com>2013-05-10 16:24:26 -0300
commit588f9820601aa4f48962009ab346b8696e93c7f6 (patch)
treea0e2f2d4460826da003935b293c4445ac0ec4866 /src
parentb9ff1a5ee5d3e1418e8d5bfe4e2cee48ec12bc30 (diff)
downloadKamon-588f9820601aa4f48962009ab346b8696e93c7f6.tar.gz
Kamon-588f9820601aa4f48962009ab346b8696e93c7f6.tar.bz2
Kamon-588f9820601aa4f48962009ab346b8696e93c7f6.zip
remove ActorSystemHolder and introduce Tracer
Diffstat (limited to 'src')
-rw-r--r--src/main/scala/akka/ActorAspect.scala1
-rw-r--r--src/main/scala/akka/ActorSystemAspect.scala16
-rw-r--r--src/main/scala/akka/ActorSystemHolder.scala9
-rw-r--r--src/main/scala/akka/MailboxAspect.scala25
-rw-r--r--src/main/scala/akka/PoolMonitorAspect.scala32
-rw-r--r--src/main/scala/akka/Tracer.scala68
6 files changed, 88 insertions, 63 deletions
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala
index db96279c..744b0aea 100644
--- a/src/main/scala/akka/ActorAspect.scala
+++ b/src/main/scala/akka/ActorAspect.scala
@@ -7,6 +7,7 @@ import akka.actor.ActorCell
@Aspect
class ActorAspect extends Metrics {
+ println("Created ActorAspect")
@Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
protected def actorReceive:Unit = {}
diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala
index ce00d973..9d1d515d 100644
--- a/src/main/scala/akka/ActorSystemAspect.scala
+++ b/src/main/scala/akka/ActorSystemAspect.scala
@@ -1,18 +1,18 @@
package akka
import org.aspectj.lang.annotation._
-import akka.actor.ActorSystemImpl
-import com.typesafe.config.Config
+import actor.ActorSystemImpl
@Aspect
-@DeclarePrecedence("ActorSystemAspect")
class ActorSystemAspect {
+ println("Created ActorSystemAspect")
- var currentActorSystem:ActorSystemImpl = _
-
- @Pointcut("execution(akka.actor.ActorSystemImpl.new(..)) && !within(ActorSystemAspect)")
+ @Pointcut("execution(* akka.actor.ActorRefProvider+.init(..)) && !within(ActorSystemAspect)")
protected def actorSystem():Unit = {}
- @Before("actorSystem() && this(system) && args(name, config, classLoader)")
- def collectActorSystem(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) { currentActorSystem = system }
+ @After("actorSystem() && args(system)")
+ def collectActorSystem(system: ActorSystemImpl):Unit = {
+ Tracer.collectActorSystem(system)
+ Tracer.start()
+ }
}
diff --git a/src/main/scala/akka/ActorSystemHolder.scala b/src/main/scala/akka/ActorSystemHolder.scala
deleted file mode 100644
index 0be6c89e..00000000
--- a/src/main/scala/akka/ActorSystemHolder.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package akka
-
-import org.aspectj.lang.Aspects
-
-trait ActorSystemHolder {
- lazy val actorSystemAspect = Aspects.aspectOf(classOf[ActorSystemAspect])
- lazy val actorSystem = actorSystemAspect.currentActorSystem
- lazy implicit val dispatcher = actorSystem.dispatcher
-} \ No newline at end of file
diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala
index a823d5b9..781ba055 100644
--- a/src/main/scala/akka/MailboxAspect.scala
+++ b/src/main/scala/akka/MailboxAspect.scala
@@ -1,31 +1,16 @@
package akka
import org.aspectj.lang.annotation._
-import scala.concurrent.duration._
-import com.newrelic.api.agent.NewRelic
@Aspect("perthis(mailboxMonitor())")
-class MailboxAspect extends ActorSystemHolder {
+class MailboxAspect {
+ println("Created MailboxAspect")
@Pointcut("execution(akka.dispatch.Mailbox.new(..)) && !within(MailboxAspect)")
protected def mailboxMonitor():Unit = {}
- @Before("mailboxMonitor() && this(mb)")
- def before(mb: akka.dispatch.Mailbox) : Unit = {
- actorSystem.scheduler.schedule(5 seconds, 6 second, new Runnable {
- def run() {
-
- val actorName = mb.actor.self.path.toString
-
- println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}")
-
-
- NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages)
- NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput)
-
- NewRelic.addCustomParameter(s"${actorName}:Mailbox:Status", mb.hasMessages.toString)
- NewRelic.addCustomParameter(s"${actorName}:Mailbox:HasMessages", mb.hasMessages.toString)
- }
- })
+ @After("mailboxMonitor() && this(mb)")
+ def afterInitialization(mb: akka.dispatch.Mailbox) : Unit = {
+ Tracer.collectMailBox(mb)
}
} \ No newline at end of file
diff --git a/src/main/scala/akka/PoolMonitorAspect.scala b/src/main/scala/akka/PoolMonitorAspect.scala
index 20d002a2..c83defa9 100644
--- a/src/main/scala/akka/PoolMonitorAspect.scala
+++ b/src/main/scala/akka/PoolMonitorAspect.scala
@@ -1,36 +1,16 @@
package akka
-import org.aspectj.lang.annotation.{Pointcut, Before, Aspect}
-import scala.concurrent.duration._
-import com.newrelic.api.agent.NewRelic
+import org.aspectj.lang.annotation._
@Aspect("perthis(poolMonitor())")
-class PoolMonitorAspect extends ActorSystemHolder {
+class PoolMonitorAspect {
+ println("Created PoolMonitorAspect")
@Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && !within(PoolMonitorAspect)")
protected def poolMonitor:Unit = {}
@Before("poolMonitor() && this(pool)")
- def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool) {
- actorSystem.scheduler.schedule(10 seconds, 6 second, new Runnable {
- def run() {
- val poolName = pool.getClass.getSimpleName
-
- println(s"Sending metrics to Newrelic PoolMonitor -> ${poolName}")
- PoolConverter.toMap(pool).map{case(k,v) => NewRelic.recordMetric(s"${poolName}:${k}",v)}
- }
- })
- }
-}
-
-object PoolConverter {
- def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int](
- "ActiveThreadCount" -> pool.getActiveThreadCount,
- "Parallelism" -> pool.getParallelism,
- "PoolSize" -> pool.getPoolSize,
- "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount,
- "StealCount" -> pool.getStealCount.toInt,
- "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt,
- "RunningThreadCount" -> pool.getRunningThreadCount
- )
+ def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {
+ Tracer.collectPool(pool)
+ }
}
diff --git a/src/main/scala/akka/Tracer.scala b/src/main/scala/akka/Tracer.scala
new file mode 100644
index 00000000..4b2542b7
--- /dev/null
+++ b/src/main/scala/akka/Tracer.scala
@@ -0,0 +1,68 @@
+package akka
+
+import actor.{Props, Actor, ActorSystemImpl}
+import concurrent.forkjoin.ForkJoinPool
+import scala.concurrent.duration._
+import com.newrelic.api.agent.NewRelic
+import akka.dispatch.Mailbox
+
+object Tracer {
+ var system: ActorSystemImpl = _
+ var forkJoinPool:ForkJoinPool = _
+ var mailbox:Mailbox = _
+
+ def collectPool(pool: ForkJoinPool) = forkJoinPool = pool
+ def collectActorSystem(actorSystem: ActorSystemImpl) = system = actorSystem
+
+ def collectMailBox(mb: akka.dispatch.Mailbox) = {
+ mailbox = mb
+ }
+
+ def start():Unit ={
+ implicit val dispatcher = system.dispatcher
+ val metricsActor = system.actorOf(Props[MetricsActor], "PoolActor")
+
+ system.scheduler.schedule(10 seconds, 6 second, metricsActor, PoolMetrics(forkJoinPool))
+ system.scheduler.schedule(10 seconds, 6 second, metricsActor, MailboxMetrics(mailbox))
+ }
+}
+
+case class PoolMetrics(poolName:String, data:Map[String,Int])
+object PoolMetrics {
+ def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool))
+
+ def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int](
+ "ActiveThreadCount" -> pool.getActiveThreadCount,
+ "Parallelism" -> pool.getParallelism,
+ "PoolSize" -> pool.getPoolSize,
+ "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount,
+ "StealCount" -> pool.getStealCount.toInt,
+ "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt,
+ "RunningThreadCount" -> pool.getRunningThreadCount
+ )
+}
+case class MailboxMetrics(mbName:String, mailBox:Mailbox)
+object MailboxMetrics {
+ def apply(mb: Mailbox) = new MailboxMetrics(mb.actor.self.path.toString,mb)
+}
+
+class MetricsActor extends Actor {
+ def receive = {
+ case poolMetrics:PoolMetrics => {
+ println(poolMetrics)
+ poolMetrics.data.map{case(k,v) => NewRelic.recordMetric(s"${poolMetrics.poolName}:${k}",v)}
+ }
+ case mailboxMetrics:MailboxMetrics => {
+ val actorName = mailboxMetrics.mbName
+ val mb = mailboxMetrics.mailBox
+ println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}")
+
+
+ NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages)
+ NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput)
+
+ NewRelic.addCustomParameter(s"${actorName}:Mailbox:Status", mb.hasMessages.toString)
+ NewRelic.addCustomParameter(s"${actorName}:Mailbox:HasMessages", mb.hasMessages.toString)
+ }
+ }
+} \ No newline at end of file