diff options
author | Diego Parra <dparra@despegar.com> | 2013-05-03 17:06:10 -0300 |
---|---|---|
committer | Diego Parra <dparra@despegar.com> | 2013-05-03 17:06:10 -0300 |
commit | 2caece9ef7574406c548b4a1f333de4c9579b3a2 (patch) | |
tree | 9536c6b95fac7dca34a5c2dae27d931b88694632 /src/main/scala/kamon/executor | |
parent | fc8a371548d9de3c5e0719dcfcf041ca31bc2227 (diff) | |
download | Kamon-2caece9ef7574406c548b4a1f333de4c9579b3a2.tar.gz Kamon-2caece9ef7574406c548b4a1f333de4c9579b3a2.tar.bz2 Kamon-2caece9ef7574406c548b4a1f333de4c9579b3a2.zip |
Initial Commit Kamon
Diffstat (limited to 'src/main/scala/kamon/executor')
-rw-r--r-- | src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala | 63 | ||||
-rw-r--r-- | src/main/scala/kamon/executor/eventbus.scala | 82 |
2 files changed, 145 insertions, 0 deletions
diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala new file mode 100644 index 00000000..62f90da8 --- /dev/null +++ b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala @@ -0,0 +1,63 @@ +package kamon.executor + +import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites} +import com.typesafe.config.Config +import scala.concurrent.forkjoin.ForkJoinPool +import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService} +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool +import java.util + +class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ForkJoinExecutorConfigurator(config, prerequisites) { + + println("Created the instrumented executor") + + + class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) + extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) { + + + override def createExecutorService: ExecutorService = { + super.createExecutorService match { + case fjp: AkkaForkJoinPool => new WrappedPool(fjp) + case other => other + } + } + } + +} + +case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long) + +class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService { + + + def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount) + + def shutdown = fjp.shutdown() + + def shutdownNow(): util.List[Runnable] = fjp.shutdownNow() + + def isShutdown: Boolean = fjp.isShutdown + + def isTerminated: Boolean = fjp.isTerminated + + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit) + + def submit[T](task: Callable[T]): Future[T] = fjp.submit(task) + + def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result) + + def submit(task: Runnable): Future[_] = fjp.submit(task) + + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks) + + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit) + + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks) + + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks) + + def execute(command: Runnable) = fjp.execute(command) +} + diff --git a/src/main/scala/kamon/executor/eventbus.scala b/src/main/scala/kamon/executor/eventbus.scala new file mode 100644 index 00000000..d64ff444 --- /dev/null +++ b/src/main/scala/kamon/executor/eventbus.scala @@ -0,0 +1,82 @@ +package kamon.executor + +import akka.event.ActorEventBus +import akka.event.LookupClassification +import akka.actor.{ActorRef, ActorSystem, Props, Actor} +import java.util.concurrent.TimeUnit +import kamon.metric.NewRelicReporter + +import com.yammer.metrics.core.{MetricName, MetricsRegistry} +import com.yammer.metrics.reporting.ConsoleReporter + + +trait Message + +case class PostMessage(text:String) extends Message + +case class MessageEvent(val channel:String, val message:Message) + +class AppActorEventBus extends ActorEventBus with LookupClassification{ + type Event = MessageEvent + type Classifier=String + protected def mapSize(): Int={ + 10 + } + + protected def classify(event: Event): Classifier={ + event.channel + } + + protected def publish(event: Event, subscriber: Subscriber): Unit={ + subscriber ! event + } +} + +object TryAkka extends App{ + val system = ActorSystem("MySystem") + val appActorEventBus=new AppActorEventBus + val NEW_POST_CHANNEL="/posts/new" + val subscriber = system.actorOf(Props(new Actor { + def receive = { + case d: MessageEvent => println(d) + } + })) + + + + + case class Ping() + case class Pong() + + class PingActor(val target: ActorRef) extends Actor { + def receive = { + case Pong() => target ! Ping() + } + } + + class PongActor extends Actor { + var i = 0 + def receive = { + case Ping() => { + i=i+1 + sender ! Pong() + } + } + } + + + /* + val newRelicReporter = new NewRelicReporter(registry) + newRelicReporter.start(1, TimeUnit.SECONDS) + +*/ + + for(i <- 1 to 8) { + val ping = system.actorOf(Props(new PingActor(system.actorOf(Props[PongActor], s"ping-actor-${i}"))), s"pong-actor-${i}") + ping ! Pong() + } + + +/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) + appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ +}
\ No newline at end of file |