From 2caece9ef7574406c548b4a1f333de4c9579b3a2 Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Fri, 3 May 2013 17:06:10 -0300 Subject: Initial Commit Kamon --- .../InstrumentedExecutorServiceConfigurator.scala | 63 ++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala (limited to 'src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala') diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala new file mode 100644 index 00000000..62f90da8 --- /dev/null +++ b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala @@ -0,0 +1,63 @@ +package kamon.executor + +import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites} +import com.typesafe.config.Config +import scala.concurrent.forkjoin.ForkJoinPool +import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService} +import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool +import java.util + +class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ForkJoinExecutorConfigurator(config, prerequisites) { + + println("Created the instrumented executor") + + + class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) + extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) { + + + override def createExecutorService: ExecutorService = { + super.createExecutorService match { + case fjp: AkkaForkJoinPool => new WrappedPool(fjp) + case other => other + } + } + } + +} + +case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long) + +class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService { + + + def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount) + + def shutdown = fjp.shutdown() + + def shutdownNow(): util.List[Runnable] = fjp.shutdownNow() + + def isShutdown: Boolean = fjp.isShutdown + + def isTerminated: Boolean = fjp.isTerminated + + def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit) + + def submit[T](task: Callable[T]): Future[T] = fjp.submit(task) + + def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result) + + def submit(task: Runnable): Future[_] = fjp.submit(task) + + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks) + + def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit) + + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks) + + def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks) + + def execute(command: Runnable) = fjp.execute(command) +} + -- cgit v1.2.3