diff options
Diffstat (limited to 'src/main/scala')
-rw-r--r-- | src/main/scala/akka/ActorAspect.scala | 24 | ||||
-rw-r--r-- | src/main/scala/akka/ActorSystemAspect.scala | 28 | ||||
-rw-r--r-- | src/main/scala/akka/MailboxAspect.scala | 26 | ||||
-rw-r--r-- | src/main/scala/akka/PoolMonitorAspect.scala | 29 | ||||
-rw-r--r-- | src/main/scala/akka/actor/ActorAspect.scala | 25 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/Metrics.scala | 5 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/NewRelicReporter.scala | 12 |
7 files changed, 111 insertions, 38 deletions
diff --git a/src/main/scala/akka/ActorAspect.scala b/src/main/scala/akka/ActorAspect.scala new file mode 100644 index 00000000..2550752b --- /dev/null +++ b/src/main/scala/akka/ActorAspect.scala @@ -0,0 +1,24 @@ +package akka + +import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import kamon.metric.Metrics +import akka.actor.ActorCell + +@Aspect +class ActorAspect extends Metrics { + + @Pointcut("execution(* akka.actor.ActorCell+.receiveMessage(..))") + protected def actorReceive:Unit = {} + + @Around("actorReceive() && this(actor)") + def around(pjp: ProceedingJoinPoint, actor: akka.actor.ActorCell): AnyRef = { + + val actorName:String = actor.self.path.toString + + markAndCountMeter(actorName){ + pjp.proceed + } + + } +}
\ No newline at end of file diff --git a/src/main/scala/akka/ActorSystemAspect.scala b/src/main/scala/akka/ActorSystemAspect.scala new file mode 100644 index 00000000..11524a2f --- /dev/null +++ b/src/main/scala/akka/ActorSystemAspect.scala @@ -0,0 +1,28 @@ +package akka + +import org.aspectj.lang.annotation.{Pointcut, Before, Aspect} +import akka.actor.ActorSystemImpl +import com.typesafe.config.Config +import java.util.concurrent.{TimeUnit, Executors} + +@Aspect +class ActorSystemAspect { + + @Pointcut("execution(akka.actor.ActorSystemImpl.new(..))") + protected def actorSystem:Unit = {} + + @Before("actorSystem() && this(system) && args(name, config, classLoader)") + def beforeInitialize(system: ActorSystemImpl, name: String, config: Config, classLoader: ClassLoader) { + + val scheduler = Executors.newScheduledThreadPool(1); + + scheduler.scheduleAtFixedRate(new Runnable { + def run() { + println("ActorSystemImpl" + system.name) + println("Thread Factory" + system.threadFactory.name) + println("Dispatchers" + system.dispatchers.defaultDispatcherConfig.resolve) + println("Dispatcher" + system.dispatcher.throughput) + } + }, 4, 4, TimeUnit.SECONDS) + } +} diff --git a/src/main/scala/akka/MailboxAspect.scala b/src/main/scala/akka/MailboxAspect.scala new file mode 100644 index 00000000..3dfc9c6a --- /dev/null +++ b/src/main/scala/akka/MailboxAspect.scala @@ -0,0 +1,26 @@ +package akka + +import org.aspectj.lang.annotation.{Pointcut, Before, Aspect} +import java.util.concurrent.{TimeUnit, Executors} + +@Aspect +class MailboxAspect { + + @Pointcut("execution(akka.dispatch.Mailbox.new(..))") + protected def mailboxMonitor:Unit = {} + + @Before("mailboxMonitor() && this(mb)") + def before(mb: akka.dispatch.Mailbox) : Unit = { + val scheduler = Executors.newScheduledThreadPool(1); + + scheduler.scheduleAtFixedRate(new Runnable { + def run() { + println("Mailbox: " + mb.actor.self.path) + println("NOM: " + mb.numberOfMessages) + println("Messages: " + mb.hasMessages) + print("Dispatcher throughput: " + mb.dispatcher.throughput) + println(mb.dispatcher.id) + } + }, 6, 4, TimeUnit.SECONDS) + } +} diff --git a/src/main/scala/akka/PoolMonitorAspect.scala b/src/main/scala/akka/PoolMonitorAspect.scala new file mode 100644 index 00000000..c6c4e4a5 --- /dev/null +++ b/src/main/scala/akka/PoolMonitorAspect.scala @@ -0,0 +1,29 @@ +package akka + +import org.aspectj.lang.annotation.{Pointcut, Before, Aspect} +import java.util.concurrent.{TimeUnit, Executors} + +@Aspect +class PoolMonitorAspect { + + @Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..))") + protected def poolMonitor:Unit = {} + + @Before("poolMonitor() && this(pool)") + def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool) { + + val scheduler = Executors.newScheduledThreadPool(1); + + scheduler.scheduleAtFixedRate(new Runnable { + def run() { + println("PoolName : " + pool.getClass.getSimpleName) + println("ThreadCount :" + pool.getActiveThreadCount) + println("Parallelism :" + pool.getParallelism) + println("PoolSize :" + pool.getPoolSize()) + println("Submission :" + pool.getQueuedSubmissionCount()) + println("Steals :" + pool.getStealCount()) + println("All :" + pool.toString) + } + }, 4, 4, TimeUnit.SECONDS) + } +} diff --git a/src/main/scala/akka/actor/ActorAspect.scala b/src/main/scala/akka/actor/ActorAspect.scala deleted file mode 100644 index b028d8c6..00000000 --- a/src/main/scala/akka/actor/ActorAspect.scala +++ /dev/null @@ -1,25 +0,0 @@ -package akka.actor - -import org.aspectj.lang.annotation.{Around, Pointcut, Before, Aspect} -import org.aspectj.lang.ProceedingJoinPoint -import kamon.metric.Metrics - -@Aspect -class ActorAspect extends Metrics { - - - @Pointcut("execution(* ActorCell+.receiveMessage(..))") - private def actorReceive:Unit = {} - - @Around("actorReceive() && target(actor)") - def around(pjp: ProceedingJoinPoint, actor: ActorCell): AnyRef = { - - - val actorName:String = actor.self.path.toString - - markAndCountMeter(actorName){ - pjp.proceed - } - - } -}
\ No newline at end of file diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala index ecfa0ec6..4cbe25e1 100644 --- a/src/main/scala/kamon/metric/Metrics.scala +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -11,14 +11,13 @@ class Metrics { private lazy val metricsRegistry: MetricsRegistry = new MetricsRegistry() private lazy val metricsGroup = new MetricsGroup(this.getClass, metricsRegistry) - private lazy val meters = new mutable.HashMap[String, Meter] + private lazy val meters = new mutable.HashMap[String, Meter] with SynchronizedMap[String, Meter] private lazy val timers = new HashMap[String, Timer] with SynchronizedMap[String, Timer] private lazy val counters = new HashMap[String, Counter] with SynchronizedMap[String, Counter] - - val consoleReporter = ConsoleReporter.enable(metricsRegistry, 1, TimeUnit.SECONDS) val newrelicReport = new NewRelicReporter(metricsRegistry, "newrelic-reporter"); + newrelicReport.run() newrelicReport.start(1, TimeUnit.SECONDS) diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala index 9aa374aa..fa6b29f3 100644 --- a/src/main/scala/kamon/metric/NewRelicReporter.scala +++ b/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -3,12 +3,11 @@ package kamon.metric import com.newrelic.api.agent.NewRelic import com.yammer.metrics.reporting.AbstractPollingReporter import com.yammer.metrics.core._ +import scala.collection.JavaConversions._ class NewRelicReporter(registry: MetricsRegistry, name: String) extends AbstractPollingReporter(registry, name) with MetricProcessor[String] { - - def processMeter(name: MetricName, meter: Metered, context: String) { println(s"Logging to NewRelic: ${meter.count()}") NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.count()) @@ -23,19 +22,12 @@ class NewRelicReporter(registry: MetricsRegistry, name: String) extends Abstract def processGauge(name: MetricName, gauge: Gauge[_], context: String) {} - private final val predicate: MetricPredicate = null - def run() { - import scala.collection.JavaConversions._ - for (entry <- getMetricsRegistry.groupedMetrics(predicate).entrySet) { - import scala.collection.JavaConversions._ + for (entry <- getMetricsRegistry.groupedMetrics(MetricPredicate.ALL).entrySet) { for (subEntry <- entry.getValue.entrySet) { subEntry.getValue.processWith(this, subEntry.getKey, "") } - } - } - } |