Diffstat (limited to 'src/main/scala/kamon')
-rw-r--r--  src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala  63
-rw-r--r--  src/main/scala/kamon/executor/eventbus.scala                                  82
-rw-r--r--  src/main/scala/kamon/metric/AkkaMetrics.scala                                  1
-rw-r--r--  src/main/scala/kamon/metric/Metrics.scala                                     49
-rw-r--r--  src/main/scala/kamon/metric/NewRelicReporter.scala                            41
5 files changed, 236 insertions, 0 deletions
diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
new file mode 100644
index 00000000..62f90da8
--- /dev/null
+++ b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
@@ -0,0 +1,63 @@
+package kamon.executor
+
+import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites}
+import com.typesafe.config.Config
+import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService}
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
+import java.util
+
+// Variant of Akka's ForkJoinExecutorConfigurator whose InstrumentedExecutorServiceFactory
+// (below) wraps the created AkkaForkJoinPool so its metrics can be read. Note that the
+// factory is not yet hooked in via createExecutorServiceFactory.
+class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
+  extends ForkJoinExecutorConfigurator(config, prerequisites) {
+
+  println("Created the instrumented executor") // temporary marker: shows the configurator was loaded
+
+ class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int)
+ extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) {
+
+
+ override def createExecutorService: ExecutorService = {
+ super.createExecutorService match {
+ case fjp: AkkaForkJoinPool => new WrappedPool(fjp)
+ case other => other
+ }
+ }
+ }
+
+}
+
+case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long)
+
+// ExecutorService wrapper that delegates every call to the underlying AkkaForkJoinPool
+// while exposing a ForkJoinPoolMetrics snapshot (active threads and queued task count).
+class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService {
+
+ def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount)
+
+ def shutdown = fjp.shutdown()
+
+ def shutdownNow(): util.List[Runnable] = fjp.shutdownNow()
+
+ def isShutdown: Boolean = fjp.isShutdown
+
+ def isTerminated: Boolean = fjp.isTerminated
+
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit)
+
+ def submit[T](task: Callable[T]): Future[T] = fjp.submit(task)
+
+ def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result)
+
+ def submit(task: Runnable): Future[_] = fjp.submit(task)
+
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks)
+
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit)
+
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks)
+
+  def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks, timeout, unit)
+
+ def execute(command: Runnable) = fjp.execute(command)
+}
+
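
For orientation, here is a minimal sketch (not part of the commit) of how the ForkJoinPoolMetrics snapshot exposed by WrappedPool could be polled periodically; the PoolMetricsSampler name and the one-second interval are illustrative assumptions.

import java.util.concurrent.{Executors, TimeUnit}
import kamon.executor.{ForkJoinPoolMetrics, WrappedPool}

// Hypothetical sampler: assumes something hands us the WrappedPool created by
// InstrumentedExecutorServiceFactory; nothing in this commit wires that up yet.
object PoolMetricsSampler {
  def schedule(pool: WrappedPool): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        val ForkJoinPoolMetrics(active, queued) = pool.metrics
        println(s"fork-join pool -> active threads: $active, queued tasks: $queued")
      }
    }, 1, 1, TimeUnit.SECONDS)
  }
}
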
diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala
new file mode 100644
index 00000000..d64ff444
--- /dev/null
+++ b/src/main/scala/kamon/executor/eventbus.scala
@@ -0,0 +1,82 @@
+package kamon.executor
+
+import akka.event.ActorEventBus
+import akka.event.LookupClassification
+import akka.actor.{ActorRef, ActorSystem, Props, Actor}
+import java.util.concurrent.TimeUnit
+import kamon.metric.NewRelicReporter
+
+import com.yammer.metrics.core.{MetricName, MetricsRegistry}
+import com.yammer.metrics.reporting.ConsoleReporter
+
+
+trait Message
+
+case class PostMessage(text: String) extends Message
+
+case class MessageEvent(channel: String, message: Message)
+
+class AppActorEventBus extends ActorEventBus with LookupClassification {
+  type Event = MessageEvent
+  type Classifier = String
+  protected def mapSize(): Int = {
+    10
+  }
+
+  protected def classify(event: Event): Classifier = {
+    event.channel
+  }
+
+  protected def publish(event: Event, subscriber: Subscriber): Unit = {
+    subscriber ! event
+  }
+}
+
+object TryAkka extends App {
+  val system = ActorSystem("MySystem")
+  val appActorEventBus = new AppActorEventBus
+  val NEW_POST_CHANNEL = "/posts/new"
+  val subscriber = system.actorOf(Props(new Actor {
+    def receive = {
+      case d: MessageEvent => println(d)
+    }
+  }))
+
+
+ case class Ping()
+ case class Pong()
+
+ class PingActor(val target: ActorRef) extends Actor {
+ def receive = {
+ case Pong() => target ! Ping()
+ }
+ }
+
+  class PongActor extends Actor {
+    var i = 0
+    def receive = {
+      case Ping() => {
+        i += 1
+        sender ! Pong()
+      }
+    }
+  }
+
+
+ /*
+ val newRelicReporter = new NewRelicReporter(registry)
+ newRelicReporter.start(1, TimeUnit.SECONDS)
+
+*/
+
+  for (i <- 1 to 8) {
+    val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"pong-actor-${i}"))), s"ping-actor-${i}")
+    ping ! Pong()
+  }
+
+
+/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
+ appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
+}
\ No newline at end of file
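
As an aside, the commented-out lines above show how the bus is meant to be driven; a small sketch reusing the subscriber and NEW_POST_CHANNEL values defined in TryAkka (the "/posts/ignored" channel is made up for illustration).

// Inside TryAkka: LookupClassification routes each MessageEvent by its channel.
appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL, PostMessage(text = "hello world")))
// Publishing to a channel that has no subscribers is simply a no-op:
appActorEventBus.publish(MessageEvent("/posts/ignored", PostMessage(text = "unseen")))
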
diff --git a/src/main/scala/kamon/metric/AkkaMetrics.scala b/src/main/scala/kamon/metric/AkkaMetrics.scala
new file mode 100644
index 00000000..de647c07
--- /dev/null
+++ b/src/main/scala/kamon/metric/AkkaMetrics.scala
@@ -0,0 +1 @@
+package kamon.metric
diff --git a/src/main/scala/kamon/metric/Metrics.scala b/src/main/scala/kamon/metric/Metrics.scala
new file mode 100644
index 00000000..ecfa0ec6
--- /dev/null
+++ b/src/main/scala/kamon/metric/Metrics.scala
@@ -0,0 +1,49 @@
+package kamon.metric
+
+import com.yammer.metrics.core.{MetricName, MetricsRegistry}
+import scala.collection.mutable.{HashMap,SynchronizedMap}
+import com.yammer.metrics.scala.{Meter, Counter, MetricsGroup, Timer}
+import com.yammer.metrics.reporting.{ConsoleReporter, JmxReporter}
+import scala.collection.mutable
+import java.util.concurrent.TimeUnit
+
+class Metrics {
+ private lazy val metricsRegistry: MetricsRegistry = new MetricsRegistry()
+ private lazy val metricsGroup = new MetricsGroup(this.getClass, metricsRegistry)
+
+  private lazy val meters = new HashMap[String, Meter] with SynchronizedMap[String, Meter]
+  private lazy val timers = new HashMap[String, Timer] with SynchronizedMap[String, Timer]
+  private lazy val counters = new HashMap[String, Counter] with SynchronizedMap[String, Counter]
+
+  val consoleReporter = ConsoleReporter.enable(metricsRegistry, 1, TimeUnit.SECONDS)
+  val newRelicReporter = new NewRelicReporter(metricsRegistry, "newrelic-reporter")
+  newRelicReporter.run()
+  newRelicReporter.start(1, TimeUnit.SECONDS)
+
+ def incrementCounter(key: String) {
+    counters.getOrElseUpdate(key, metricsGroup.counter(s"${key}-counter")) += 1
+ }
+
+ def markMeter(key: String) {
+ meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark()
+ }
+
+ def trace[T](key: String)(f: => T): T = {
+    val timer = timers.getOrElseUpdate(key, metricsGroup.timer(s"${key}-timer"))
+ timer.time(f)
+ }
+
+ def markAndCountMeter[T](key: String)(f: => T): T = {
+ markMeter(key)
+ f
+ }
+
+ def traceAndCount[T](key: String)(f: => T): T = {
+ incrementCounter(key)
+ trace(key) {
+ f
+ }
+ }
+}
\ No newline at end of file
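
To show how the facade above is meant to be called, a small usage sketch; the object name and the traced key are illustrative, not part of the commit.

import kamon.metric.Metrics

object MetricsUsageSketch extends App {
  // Instantiating Metrics also starts the console and New Relic reporters above.
  val metrics = new Metrics

  // Times the block under "message-processing-timer" and bumps "message-processing-counter".
  val result = metrics.traceAndCount("message-processing") {
    Thread.sleep(10)
    42
  }

  metrics.markMeter("messages-received")
  println(s"traced result: $result")
}
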
diff --git a/src/main/scala/kamon/metric/NewRelicReporter.scala b/src/main/scala/kamon/metric/NewRelicReporter.scala
new file mode 100644
index 00000000..9aa374aa
--- /dev/null
+++ b/src/main/scala/kamon/metric/NewRelicReporter.scala
@@ -0,0 +1,41 @@
+package kamon.metric
+
+import com.newrelic.api.agent.NewRelic
+import com.yammer.metrics.reporting.AbstractPollingReporter
+import com.yammer.metrics.core._
+
+
+class NewRelicReporter(registry: MetricsRegistry, name: String) extends AbstractPollingReporter(registry, name) with MetricProcessor[String] {
+
+
+  // Only meters are forwarded to New Relic; the other metric types are ignored below.
+ def processMeter(name: MetricName, meter: Metered, context: String) {
+ println(s"Logging to NewRelic: ${meter.count()}")
+ NewRelic.recordMetric("Custom/Actor/MessagesPerSecond", meter.count())
+ }
+
+
+ def processCounter(name: MetricName, counter: Counter, context: String) {}
+
+ def processHistogram(name: MetricName, histogram: Histogram, context: String) {}
+
+ def processTimer(name: MetricName, timer: Timer, context: String) {}
+
+ def processGauge(name: MetricName, gauge: Gauge[_], context: String) {}
+
+  private final val predicate: MetricPredicate = MetricPredicate.ALL
+
+
+  def run() {
+    import scala.collection.JavaConversions._
+    // Walk every metric group in the registry and dispatch each metric to the
+    // matching processXxx callback above via MetricProcessor.processWith.
+    for (entry <- getMetricsRegistry.groupedMetrics(predicate).entrySet) {
+      for (subEntry <- entry.getValue.entrySet) {
+        subEntry.getValue.processWith(this, subEntry.getKey, "")
+      }
+    }
+  }
+
+}
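
Finally, a sketch of how this reporter gets wired up, mirroring what Metrics.scala above already does; the registry contents and the one-second period are illustrative.

import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.MetricsRegistry
import kamon.metric.NewRelicReporter

object NewRelicReporterWiring extends App {
  val registry = new MetricsRegistry()
  val reporter = new NewRelicReporter(registry, "newrelic-reporter")
  // AbstractPollingReporter.start schedules run() at the given fixed rate.
  reporter.start(1, TimeUnit.SECONDS)
}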