aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego Parra <diegolparra@gmail.com>2013-05-05 20:33:28 -0300
committerDiego Parra <diegolparra@gmail.com>2013-05-05 20:33:28 -0300
commit2f81f03742860a0a57cebeea6f0160f3c0d80a48 (patch)
tree2715ea8499f8245d31976ff2450c47da3cf64f4c
parent2caece9ef7574406c548b4a1f333de4c9579b3a2 (diff)
downloadKamon-2f81f03742860a0a57cebeea6f0160f3c0d80a48.tar.gz
Kamon-2f81f03742860a0a57cebeea6f0160f3c0d80a48.tar.bz2
Kamon-2f81f03742860a0a57cebeea6f0160f3c0d80a48.zip
added Mailbox, PoolMonitor and ActorSystem Aspects and fix newRelicReporter
-rw-r--r--src/main/resources/META-INF/aop.xml5
-rw-r--r--src/main/scala/akka/ActorAspect.scala24
-rw-r--r--src/main/scala/akka/ActorSystemAspect.scala28
-rw-r--r--src/main/scala/akka/MailboxAspect.scala26
-rw-r--r--src/main/scala/akka/PoolMonitorAspect.scala29
-rw-r--r--src/main/scala/akka/actor/ActorAspect.scala25
-rw-r--r--src/main/scala/kamon/metric/Metrics.scala5
-rw-r--r--src/main/scala/kamon/metric/NewRelicReporter.scala12
8 files changed, 115 insertions, 39 deletions
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
index 03ccb0e8..d678e4ad 100644
--- a/src/main/resources/META-INF/aop.xml
+++ b/src/main/resources/META-INF/aop.xml
@@ -6,7 +6,10 @@
<weaver options="-verbose"/>
<aspects>
- <aspect name="akka.actor.ActorAspect"/>
+ <aspect name="akka.ActorSystemAspect"/>
+ <aspect name="akka.MailboxAspect"/>
+ <aspect name="akka.PoolMonitorAspect"/>
+ <aspect name="akka.ActorAspect"/>
<include within="*"/>
<exclude within="javax.*"/>
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala
new file mode 100644
index 00000000..2550752b
--- /dev/null
+++ b/src/main/scala/akka/ActorAspect.scala
@@ -0,0 +1,24 @@
+package akka
+
+import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
+import org.aspectj.lang.ProceedingJoinPoint
+import kamon.metric.Metrics
+import akka.actor.ActorCell
+
+@Aspect
+class ActorAspect extends Metrics {
+
+ @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))")
+ protected def actorReceive:Unit = {}
+
+ @Around("actorReceive() && this(actor)")
+ def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = {
+
+ val actorName:String = actor.self.path.toString
+
+ markAndCountMeter(actorName){
+ pjp.proceed
+ }
+
+ }
+} \ No newline at end of file
diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala
new file mode 100644
index 00000000..11524a2f
--- /dev/null
+++ b/src/main/scala/akka/ActorSystemAspect.scala
@@ -0,0 +1,28 @@
+package akka
+
+import org.aspectj.lang.annotation.{Pointcut, Before, Aspect}
+import akka.actor.ActorSystemImpl
+import com.typesafe.config.Config
+import java.util.concurrent.{TimeUnit, Executors}
+
+@Aspect
+class ActorSystemAspect {
+
+ @Pointcut("execution(akka.actor.ActorSystemImpl.new(..))")
+ protected def actorSystem:Unit = {}
+
+ @Before("actorSystem() && this(system) && args(name, config, classLoader)")
+ def beforeInitialize(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) {
+
+ val scheduler = Executors.newScheduledThreadPool(1);
+
+ scheduler.scheduleAtFixedRate(new Runnable {
+ def run() {
+ println("ActorSystemImpl" + system.name)
+ println("Thread Factory" + system.threadFactory.name)
+ println("Dispatchers" + system.dispatchers.defaultDispatcherConfig.resolve)
+ println("Dispatcher" + system.dispatcher.throughput)
+ }
+ }, 4, 4, TimeUnit.SECONDS)
+ }
+}
diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala
new file mode 100644
index 00000000..3dfc9c6a
--- /dev/null
+++ b/src/main/scala/akka/MailboxAspect.scala
@@ -0,0 +1,26 @@
+package akka
+
+import org.aspectj.lang.annotation.{Pointcut, Before, Aspect}
+import java.util.concurrent.{TimeUnit, Executors}
+
+@Aspect
+class MailboxAspect {
+
+ @Pointcut("execution(akka.dispatch.Mailbox.new(..))")
+ protected def mailboxMonitor:Unit = {}
+
+ @Before("mailboxMonitor() && this(mb)")
+ def before(mb: akka.dispatch.Mailbox) : Unit = {
+ val scheduler = Executors.newScheduledThreadPool(1);
+
+ scheduler.scheduleAtFixedRate(new Runnable {
+ def run() {
+ println("Mailbox: " + mb.actor.self.path)
+ println("NOM: " + mb.numberOfMessages)
+ println("Messages: " + mb.hasMessages)
+ print("Dispatcher throughput: " + mb.dispatcher.throughput)
+ println(mb.dispatcher.id)
+ }
+ }, 6, 4, TimeUnit.SECONDS)
+ }
+}
diff --git a/src/main/scala/akka/PoolMonitorAspect.scala b/src/main/scala/akka/PoolMonitorAspect.scala
new file mode 100644
index 00000000..c6c4e4a5
--- /dev/null
+++ b/src/main/scala/akka/PoolMonitorAspect.scala
@@ -0,0 +1,29 @@
+package akka
+
+import org.aspectj.lang.annotation.{Pointcut, Before, Aspect}
+import java.util.concurrent.{TimeUnit, Executors}
+
+@Aspect
+class PoolMonitorAspect {
+
+ @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..))")
+ protected def poolMonitor:Unit = {}
+
+ @Before("poolMonitor() && this(pool)")
+ def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool) {
+
+ val scheduler = Executors.newScheduledThreadPool(1);
+
+ scheduler.scheduleAtFixedRate(new Runnable {
+ def run() {
+ println("PoolName : " + pool.getClass.getSimpleName)
+ println("ThreadCount :" + pool.getActiveThreadCount)
+ println("Parallelism :" + pool.getParallelism)
+ println("PoolSize :" + pool.getPoolSize())
+ println("Submission :" + pool.getQueuedSubmissionCount())
+ println("Steals :" + pool.getStealCount())
+ println("All :" + pool.toString)
+ }
+ }, 4, 4, TimeUnit.SECONDS)
+ }
+}
diff --git a/src/main/scala/akka/actor/ActorAspect.scala b/src/main/scala/akka/actor/ActorAspect.scala
deleted file mode 100644
index b028d8c6..00000000
--- a/src/main/scala/akka/actor/ActorAspect.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package akka.actor
-
-import org.aspectj.lang.annotation.{Around, Pointcut, Before, Aspect}
-import org.aspectj.lang.ProceedingJoinPoint
-import kamon.metric.Metrics
-
-@Aspect
-class ActorAspect extends Metrics {
-
-
- @Pointcut("execution(* ActorCell+.receiveMessage(..))")
- private def actorReceive:Unit = {}
-
- @Around("actorReceive() && target(actor)")
- def around(pjp: ProceedingJoinPoint, actor: ActorCell): AnyRef = {
-
-
- val actorName:String = actor.self.path.toString
-
- markAndCountMeter(actorName){
- pjp.proceed
- }
-
- }
-} \ No newline at end of file
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
index ecfa0ec6..4cbe25e1 100644
--- a/src/main/scala/kamon/metric/Metrics.scala
+++ b/src/main/scala/kamon/metric/Metrics.scala
@@ -11,14 +11,13 @@ class Metrics {
private lazy val metricsRegistry: MetricsRegistry = new MetricsRegistry()
private lazy val metricsGroup = new MetricsGroup(this.getClass, metricsRegistry)
- private lazy val meters = new mutable.HashMap[String, Meter]
+ private lazy val meters = new mutable.HashMap[String, Meter] with SynchronizedMap[String, Meter]
private lazy val timers = new HashMap[String, Timer] with SynchronizedMap[String, Timer]
private lazy val counters = new HashMap[String, Counter] with SynchronizedMap[String, Counter]
-
-
val consoleReporter = ConsoleReporter.enable(metricsRegistry, 1, TimeUnit.SECONDS)
val newrelicReport = new NewRelicReporter(metricsRegistry, "newrelic-reporter");
+
newrelicReport.run()
newrelicReport.start(1, TimeUnit.SECONDS)
diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala
index 9aa374aa..fa6b29f3 100644
--- a/src/main/scala/kamon/metric/NewRelicReporter.scala
+++ b/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -3,12 +3,11 @@ package kamon.metric
import com.newrelic.api.agent.NewRelic
import com.yammer.metrics.reporting.AbstractPollingReporter
import com.yammer.metrics.core._
+import scala.collection.JavaConversions._
class NewRelicReporter(registry: MetricsRegistry, name: String) extends AbstractPollingReporter(registry, name) with MetricProcessor[String] {
-
-
def processMeter(name: MetricName, meter: Metered, context: String) {
println(s"Logging to NewRelic: ${meter.count()}")
NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.count())
@@ -23,19 +22,12 @@ class NewRelicReporter(registry: MetricsRegistry, name: String) extends Abstract
def processGauge(name: MetricName, gauge: Gauge[_], context: String) {}
- private final val predicate: MetricPredicate = null
-
def run() {
- import scala.collection.JavaConversions._
- for (entry <- getMetricsRegistry.groupedMetrics(predicate).entrySet) {
- import scala.collection.JavaConversions._
+ for (entry <- getMetricsRegistry.groupedMetrics(MetricPredicate.ALL).entrySet) {
for (subEntry <- entry.getValue.entrySet) {
subEntry.getValue.processWith(this, subEntry.getKey, "")
}
-
}
-
}
-
}