aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/kamon/executor/InstrumentedExecutorServiceConfigurator.scala
blob: 62f90da8f3bfbfdd1e9bafa42357a936dc9d7852 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package kamon.executor

import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites}
import com.typesafe.config.Config
import scala.concurrent.forkjoin.ForkJoinPool
import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService}
import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
import java.util

/**
 * Executor configurator that extends Akka's `ForkJoinExecutorConfigurator`,
 * passing `config` and `prerequisites` straight through to it.
 *
 * Constructing an instance prints a single diagnostic line to standard output.
 *
 * NOTE(review): nothing visible in this class instantiates
 * `InstrumentedExecutorServiceFactory`; confirm how (or whether) it is wired
 * into dispatcher creation.
 *
 * @param config        configuration forwarded to the parent configurator
 * @param prerequisites dispatcher prerequisites forwarded to the parent configurator
 */
class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
  extends ForkJoinExecutorConfigurator(config, prerequisites) {

  println("Created the instrumented executor")

  /**
   * Factory that builds the executor service via the parent
   * `ForkJoinExecutorServiceFactory` and wraps it in a [[WrappedPool]] when the
   * result is an `AkkaForkJoinPool`.
   *
   * @param threadFactory worker-thread factory forwarded to the parent factory
   * @param parallelism   parallelism level forwarded to the parent factory
   */
  class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int)
    extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) {

    /**
     * Creates the underlying executor service through the parent factory.
     *
     * @return a [[WrappedPool]] around the created pool when it is an
     *         `AkkaForkJoinPool`; otherwise the created service unchanged
     */
    override def createExecutorService: ExecutorService = {
      val delegate = super.createExecutorService
      delegate match {
        case pool: AkkaForkJoinPool => new WrappedPool(pool)
        // Anything that is not an AkkaForkJoinPool is handed back as created.
        case _ => delegate
      }
    }
  }

}

/**
 * Pair of readings describing a fork-join pool.
 *
 * @param activeThreads active thread count (populated from `getActiveThreadCount` by `WrappedPool.metrics`)
 * @param queueSize     queued task count (populated from `getQueuedTaskCount` by `WrappedPool.metrics`)
 */
case class ForkJoinPoolMetrics(
  activeThreads: Int,
  queueSize: Long
)

/**
 * `java.util.concurrent.ExecutorService` implementation that forwards every
 * operation to the wrapped `AkkaForkJoinPool` and additionally exposes a
 * [[ForkJoinPoolMetrics]] reading of that pool through [[metrics]].
 *
 * @param fjp the pool every call is delegated to
 */
class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService {

  /**
   * Reads the wrapped pool's current figures.
   *
   * @return the pool's active thread count and queued task count at the time of the call
   */
  def metrics: ForkJoinPoolMetrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount)

  /** Delegates to the wrapped pool's `shutdown()`. */
  def shutdown(): Unit = fjp.shutdown()

  /** Delegates to the wrapped pool's `shutdownNow()` and returns its result. */
  def shutdownNow(): util.List[Runnable] = fjp.shutdownNow()

  /** @return the wrapped pool's `isShutdown` value */
  def isShutdown: Boolean = fjp.isShutdown

  /** @return the wrapped pool's `isTerminated` value */
  def isTerminated: Boolean = fjp.isTerminated

  /** Delegates to the wrapped pool's `awaitTermination` with the same timeout and unit. */
  def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit)

  /** Submits a `Callable` to the wrapped pool. */
  def submit[T](task: Callable[T]): Future[T] = fjp.submit(task)

  /** Submits a `Runnable` with a fixed result value to the wrapped pool. */
  def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result)

  /** Submits a `Runnable` to the wrapped pool. */
  def submit(task: Runnable): Future[_] = fjp.submit(task)

  /** Delegates to the wrapped pool's untimed `invokeAll`. */
  def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
    fjp.invokeAll(tasks)

  /** Delegates to the wrapped pool's timed `invokeAll`, forwarding the timeout and unit. */
  def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] =
    fjp.invokeAll(tasks, timeout, unit)

  /** Delegates to the wrapped pool's untimed `invokeAny`. */
  def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
    fjp.invokeAny(tasks)

  /**
   * Delegates to the wrapped pool's timed `invokeAny`.
   *
   * The timeout and unit are forwarded so the caller's deadline is honoured;
   * calling the untimed overload here would silently ignore it.
   */
  def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
    fjp.invokeAny(tasks, timeout, unit)

  /** Hands the command to the wrapped pool's `execute`. */
  def execute(command: Runnable): Unit = fjp.execute(command)
}