aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
diff options
context:
space:
mode:
authorDiego Parra <dparra@despegar.com>2013-05-03 17:06:10 -0300
committerDiego Parra <dparra@despegar.com>2013-05-03 17:06:10 -0300
commit2caece9ef7574406c548b4a1f333de4c9579b3a2 (patch)
tree9536c6b95fac7dca34a5c2dae27d931b88694632 /src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
parentfc8a371548d9de3c5e0719dcfcf041ca31bc2227 (diff)
downloadKamon-2caece9ef7574406c548b4a1f333de4c9579b3a2.tar.gz
Kamon-2caece9ef7574406c548b4a1f333de4c9579b3a2.tar.bz2
Kamon-2caece9ef7574406c548b4a1f333de4c9579b3a2.zip
Initial Commit Kamon
Diffstat (limited to 'src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala')
-rw-r--r--src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala63
1 files changed, 63 insertions, 0 deletions
diff --git a/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
new file mode 100644
index 00000000..62f90da8
--- /dev/null
+++ b/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
@@ -0,0 +1,63 @@
+package kamon.executor
+
+import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites}
+import com.typesafe.config.Config
+import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService}
+import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
+import java.util
+
+class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
+ extends ForkJoinExecutorConfigurator(config, prerequisites) {
+
+ println("Created the instrumented executor")
+
+
+ class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int)
+ extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) {
+
+
+ override def createExecutorService: ExecutorService = {
+ super.createExecutorService match {
+ case fjp: AkkaForkJoinPool => new WrappedPool(fjp)
+ case other => other
+ }
+ }
+ }
+
+}
+
+case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long)
+
+class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService {
+
+
+ def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount)
+
+ def shutdown = fjp.shutdown()
+
+ def shutdownNow(): util.List[Runnable] = fjp.shutdownNow()
+
+ def isShutdown: Boolean = fjp.isShutdown
+
+ def isTerminated: Boolean = fjp.isTerminated
+
+ def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit)
+
+ def submit[T](task: Callable[T]): Future[T] = fjp.submit(task)
+
+ def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result)
+
+ def submit(task: Runnable): Future[_] = fjp.submit(task)
+
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks)
+
+ def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit)
+
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks)
+
+ def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks)
+
+ def execute(command: Runnable) = fjp.execute(command)
+}
+