1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package kamon.executor
import akka.dispatch.{ExecutorServiceFactory, ForkJoinExecutorConfigurator, DispatcherPrerequisites}
import com.typesafe.config.Config
import scala.concurrent.forkjoin.ForkJoinPool
import java.util.concurrent.{Future, TimeUnit, Callable, ExecutorService}
import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool
import java.util
class InstrumentedExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends ForkJoinExecutorConfigurator(config, prerequisites) {
println("Created the instrumented executor")
class InstrumentedExecutorServiceFactory(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int)
extends ForkJoinExecutorServiceFactory(threadFactory, parallelism) {
override def createExecutorService: ExecutorService = {
super.createExecutorService match {
case fjp: AkkaForkJoinPool => new WrappedPool(fjp)
case other => other
}
}
}
}
case class ForkJoinPoolMetrics(activeThreads: Int, queueSize: Long)
class WrappedPool(val fjp: AkkaForkJoinPool) extends ExecutorService {
def metrics = ForkJoinPoolMetrics(fjp.getActiveThreadCount(), fjp.getQueuedTaskCount)
def shutdown = fjp.shutdown()
def shutdownNow(): util.List[Runnable] = fjp.shutdownNow()
def isShutdown: Boolean = fjp.isShutdown
def isTerminated: Boolean = fjp.isTerminated
def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = fjp.awaitTermination(timeout, unit)
def submit[T](task: Callable[T]): Future[T] = fjp.submit(task)
def submit[T](task: Runnable, result: T): Future[T] = fjp.submit(task, result)
def submit(task: Runnable): Future[_] = fjp.submit(task)
def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = fjp.invokeAll(tasks)
def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = fjp.invokeAll(tasks, timeout, unit)
def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = fjp.invokeAny(tasks)
def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = fjp.invokeAny(tasks)
def execute(command: Runnable) = fjp.execute(command)
}
|