diff options
Diffstat (limited to 'src/main/scala')
-rw-r--r-- | src/main/scala/akka/actor/ActorAspect.scala | 25 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala | 63 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 82 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/AkkaMetrics.scala | 1 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/Metrics.scala | 49 | ||||
-rw-r--r-- | src/main/scala/kamon/metric/NewRelicReporter.scala | 41 |
6 files changed, 261 insertions, 0 deletions
diff --git a/src/main/scala/akka/actor/ActorAspect.scala b/src/main/scala/akka/actor/ActorAspect.scala new file mode 100644 index 00000000..b028d8c6 --- /dev/null +++ b/src/main/scala/akka/actor/ActorAspect.scala @@ -0,0 +1,25 @@ +package akka.actor + +import org.aspectj.lang.annotation.{Around, Pointcut, Before, Aspect} +import org.aspectj.lang.ProceedingJoinPoint +import kamon.metric.Metrics + +@Aspect +class ActorAspect extends Metrics { + + + @Pointcut("execution(* ActorCell+.receiveMessage(..))") + private def actorReceive:Unit = {} + + @Around("actorReceive() && target(actor)") + def around(pjp: ProceedingJoinPoint, actor: ActorCell): AnyRef = { + + + val actorName:String = actor.self.path.toString + + markAndCountMeter(actorName){ + pjp.proceed + } + + } +}
\ No newline at end of file diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala new file mode 100644 index 00000000..62f90da8 --- /dev/null +++ b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala @@ -0,0 +1,63 @@ +package kamon.executor + +import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites} +import com.typesafe.config.Config +import scala.concurrent.forkjoin.ForkJoinPool +import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService} +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool +import java.util + +class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ForkJoinExecutorConfigurator(config, prerequisites) { + + println("Created the instrumented executor") + + + class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) + extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) { + + + override def createExecutorService: ExecutorService = { + super.createExecutorService match { + case fjp: AkkaForkJoinPool => new WrappedPool(fjp) + case other => other + } + } + } + +} + +case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long) + +class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService { + + + def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount) + + def shutdown = fjp.shutdown() + + def shutdownNow(): util.List[Runnable] = fjp.shutdownNow() + + def isShutdown: Boolean = fjp.isShutdown + + def isTerminated: Boolean = fjp.isTerminated + + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit) + + def submit[T](task: Callable[T]): Future[T] = fjp.submit(task) + + def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result) + + def submit(task: Runnable): Future[_] = fjp.submit(task) + + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks) + + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit) + + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks) + + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks) + + def execute(command: Runnable) = fjp.execute(command) +} + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala new file mode 100644 index 00000000..d64ff444 --- /dev/null +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -0,0 +1,82 @@ +package kamon.executor + +import akka.event.ActorEventBus +import akka.event.LookupClassification +import akka.actor.{ActorRef, ActorSystem, Props, Actor} +import java.util.concurrent.TimeUnit +import kamon.metric.NewRelicReporter + +import com.yammer.metrics.core.{MetricName, MetricsRegistry} +import com.yammer.metrics.reporting.ConsoleReporter + + +trait Message + +case class PostMessage(text:String) extends Message + +case class MessageEvent(val channel:String, val message:Message) + +class AppActorEventBus extends ActorEventBus with LookupClassification{ + type Event = MessageEvent + type Classifier=String + protected def mapSize(): Int={ + 10 + } + + protected def classify(event: Event): Classifier={ + event.channel + } + + protected def publish(event: Event, subscriber: Subscriber): Unit={ + subscriber ! event + } +} + +object TryAkka extends App{ + val system = ActorSystem("MySystem") + val appActorEventBus=new AppActorEventBus + val NEW_POST_CHANNEL="/posts/new" + val subscriber = system.actorOf(Props(new Actor { + def receive = { + case d: MessageEvent => println(d) + } + })) + + + + + case class Ping() + case class Pong() + + class PingActor(val target: ActorRef) extends Actor { + def receive = { + case Pong() => target ! Ping() + } + } + + class PongActor extends Actor { + var i = 0 + def receive = { + case Ping() => { + i=i+1 + sender ! Pong() + } + } + } + + + /* + val newRelicReporter = new NewRelicReporter(registry) + newRelicReporter.start(1, TimeUnit.SECONDS) + +*/ + + for(i <- 1 to 8) { + val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-actor-${i}"))), s"pong-actor-${i}") + ping ! Pong() + } + + +/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) + appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ +}
\ No newline at end of file diff --git a/src/main/scala/kamon/metric/AkkaMetrics.scala b/src/main/scala/kamon/metric/AkkaMetrics.scala new file mode 100644 index 00000000..de647c07 --- /dev/null +++ b/src/main/scala/kamon/metric/AkkaMetrics.scala @@ -0,0 +1 @@ +package kamon.metric diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala new file mode 100644 index 00000000..ecfa0ec6 --- /dev/null +++ b/src/main/scala/kamon/metric/Metrics.scala @@ -0,0 +1,49 @@ +package kamon.metric + +import com.yammer.metrics.core.{MetricName, MetricsRegistry} +import scala.collection.mutable.{HashMap,SynchronizedMap} +import com.yammer.metrics.scala.{Meter, Counter, MetricsGroup, Timer} +import com.yammer.metrics.reporting.{ConsoleReporter, JmxReporter} +import scala.collection.mutable +import java.util.concurrent.TimeUnit + +class Metrics { + private lazy val metricsRegistry: MetricsRegistry = new MetricsRegistry() + private lazy val metricsGroup = new MetricsGroup(this.getClass, metricsRegistry) + + private lazy val meters = new mutable.HashMap[String, Meter] + private lazy val timers = new HashMap[String, Timer] with SynchronizedMap[String, Timer] + private lazy val counters = new HashMap[String, Counter] with SynchronizedMap[String, Counter] + + + + val consoleReporter = ConsoleReporter.enable(metricsRegistry, 1, TimeUnit.SECONDS) + val newrelicReport = new NewRelicReporter(metricsRegistry, "newrelic-reporter"); + newrelicReport.run() + newrelicReport.start(1, TimeUnit.SECONDS) + + def incrementCounter(key: String) { + counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count + } + + def markMeter(key: String) { + meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark() + } + + def trace[T](key: String)(f: => T): T = { + val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) ) + timer.time(f) + } + + def markAndCountMeter[T](key: String)(f: => T): T = { + markMeter(key) + f + } + + def traceAndCount[T](key: String)(f: => T): T = { + incrementCounter(key) + trace(key) { + f + } + } +}
\ No newline at end of file diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala new file mode 100644 index 00000000..9aa374aa --- /dev/null +++ b/src/main/scala/kamon/metric/NewRelicReporter.scala @@ -0,0 +1,41 @@ +package kamon.metric + +import com.newrelic.api.agent.NewRelic +import com.yammer.metrics.reporting.AbstractPollingReporter +import com.yammer.metrics.core._ + + +class NewRelicReporter(registry: MetricsRegistry, name: String) extends AbstractPollingReporter(registry, name) with MetricProcessor[String] { + + + + def processMeter(name: MetricName, meter: Metered, context: String) { + println(s"Logging to NewRelic: ${meter.count()}") + NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.count()) + } + + + def processCounter(name: MetricName, counter: Counter, context: String) {} + + def processHistogram(name: MetricName, histogram: Histogram, context: String) {} + + def processTimer(name: MetricName, timer: Timer, context: String) {} + + def processGauge(name: MetricName, gauge: Gauge[_], context: String) {} + + private final val predicate: MetricPredicate = null + + + def run() { + import scala.collection.JavaConversions._ + for (entry <- getMetricsRegistry.groupedMetrics(predicate).entrySet) { + import scala.collection.JavaConversions._ + for (subEntry <- entry.getValue.entrySet) { + subEntry.getValue.processWith(this, subEntry.getKey, "") + } + + } + + } + +} |