// path: root/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
// blob: 9a03b1d5dfec9f4c7df40463459bfa5ea6c35321
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.concurrent.impl



import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor }
import java.util.Collection
import scala.concurrent.forkjoin._
import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
import scala.util.control.NonFatal



/** An `ExecutionContextExecutor` backed either by a `ForkJoinPool` that this class
  * creates itself or by an executor handed in by the caller.
  *
  * @param es       executor to run tasks on; when `null`, `createExecutor` builds one
  * @param reporter callback that `reportFailure` forwards every throwable to
  */
private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: Throwable => Unit) extends ExecutionContextExecutor {

  /** The backing executor: whatever `createExecutor` returns when no executor
    * was supplied, otherwise the supplied executor wrapped in `Right`.
    */
  val executor: Either[ForkJoinPool, Executor] =
    if (es eq null) createExecutor else Right(es)

  /** Thread factory usable both by plain executors and by a `ForkJoinPool`.
    * The fork/join worker threads it creates also implement `BlockContext`.
    *
    * @param daemonic daemon flag applied to every thread the factory creates
    */
  class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {

    /** Applies the factory's settings to `thread` and hands the same instance back. */
    def wire[T <: Thread](thread: T): T = {
      // Other per-thread settings (uncaught exception handler, name, ...) could be applied here.
      thread.setDaemon(daemonic)
      thread
    }

    /** Creates a plain thread for `runnable`. */
    def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))

    /** Creates a worker thread for `fjp` whose `blockOn` runs the blocking code
      * through `ForkJoinPool.managedBlock`.
      */
    def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = {
      val worker = new ForkJoinWorkerThread(fjp) with BlockContext {
        override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
          // Filled in by the blocker below; stays at T's default value until `thunk` returns.
          var outcome: T = null.asInstanceOf[T]
          val blocker: ForkJoinPool.ManagedBlocker = new ForkJoinPool.ManagedBlocker {
            @volatile var finished = false
            override def isReleasable = finished
            override def block(): Boolean = {
              // `finished` is set even when `thunk` throws, so the blocker always becomes releasable.
              outcome = try thunk finally { finished = true }
              true
            }
          }
          ForkJoinPool.managedBlock(blocker, true)
          outcome
        }
      }
      wire(worker)
    }
  }

  /** Builds the executor used when none was supplied.
    *
    * The parallelism is the `scala.concurrent.context.numThreads` setting clamped
    * between `scala.concurrent.context.minThreads` and
    * `scala.concurrent.context.maxThreads`; any setting that cannot be read or
    * parsed falls back to the number of available processors. A `ForkJoinPool`
    * is attempted first; if creating it fails with a non-fatal error, the error is
    * printed to `System.err` and a `ThreadPoolExecutor` is returned instead.
    */
  def createExecutor: Either[ForkJoinPool, Executor] = {

    // Processor count reported by the runtime; also the fallback for every setting.
    def processors: Int = Runtime.getRuntime.availableProcessors

    // Reads the system property `name` and converts it with `parse`.
    def readSetting(name: String, parse: String => Int): Int =
      try parse(System.getProperty(name)) catch { case e: Exception => processors }

    // Restricts `desired` to the interval spanned by the two bounds, whichever order they come in.
    def clamp(floor: Int, desired: Int, ceiling: Int): Int = {
      val lower = scala.math.min(floor, ceiling)
      val upper = scala.math.max(floor, ceiling)
      scala.math.min(scala.math.max(desired, lower), upper)
    }

    // Accepts an absolute thread count, or "x<factor>" meaning a multiple of the processor count.
    def parseNumThreads(spec: String): Int =
      if (spec == null || spec == "") processors
      else if (spec.charAt(0) == 'x') scala.math.ceil(processors * spec.substring(1).toDouble).toInt
      else spec.toInt

    val minThreads = readSetting("scala.concurrent.context.minThreads", _.toInt)
    val numThreads = readSetting("scala.concurrent.context.numThreads", parseNumThreads _)
    val maxThreads = readSetting("scala.concurrent.context.maxThreads", _.toInt)
    val desiredParallelism = clamp(minThreads, numThreads, maxThreads)

    val threadFactory = new DefaultThreadFactory(daemonic = true)

    try {
      val pool = new ForkJoinPool(desiredParallelism, threadFactory)
      pool.setAsyncMode(true)
      Left(pool)
    } catch {
      case NonFatal(t) =>
        System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to ThreadPoolExecutor")
        t.printStackTrace(System.err)
        val exec = new ThreadPoolExecutor(
          desiredParallelism,
          desiredParallelism,
          5L,
          TimeUnit.MINUTES,
          new LinkedBlockingQueue[Runnable],
          threadFactory)
        exec.allowCoreThreadTimeOut(true)
        Right(exec)
    }
  }

  /** Runs `runnable` on the backing executor.
    *
    * When the backing executor is the fork/join pool and the caller is one of
    * that pool's own worker threads, the task is forked from the current worker
    * instead of being submitted through the pool's `execute`.
    */
  def execute(runnable: Runnable): Unit = executor match {
    case Right(generic) => generic execute runnable
    case Left(fj) =>
      val onOwnWorker = Thread.currentThread match {
        case worker: ForkJoinWorkerThread => worker.getPool eq fj
        case _                            => false
      }
      if (onOwnWorker) {
        runnable match {
          case task: ForkJoinTask[_] => task.fork()
          case _                     => new AdaptedRunnableAction(runnable).fork()
        }
      } else fj.execute(runnable)
  }

  /** Forwards `t` to the reporter supplied at construction. */
  def reportFailure(t: Throwable) = reporter(t)
}


/** Factory methods for [[ExecutionContextImpl]]. */
private[concurrent] object ExecutionContextImpl {

  /** Creates an execution context that runs its tasks on `e`.
    *
    * @param e        executor to run tasks on; when `null` the context creates its own executor
    * @param reporter callback the context's `reportFailure` forwards throwables to
    */
  def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)

  /** Creates an execution context that is also an `ExecutorService`; every
    * `ExecutorService` method is delegated to the underlying service.
    *
    * @param es       service to run tasks on; when `null` the context creates its own executor
    * @param reporter callback the context's `reportFailure` forwards throwables to
    */
  def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
    new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {
      // `executor` is an Either and must be unwrapped before use: casting its
      // `RightProjection` to ExecutorService can never succeed, and `.right.get`
      // fails when the context created its own ForkJoinPool (the `Left` case).
      final def asExecutorService: ExecutorService = executor match {
        case Left(pool)     => pool
        // Right holds either `es` itself or the ThreadPoolExecutor fallback built
        // by `createExecutor`; both are ExecutorServices.
        case Right(generic) => generic.asInstanceOf[ExecutorService]
      }
      override def execute(command: Runnable) = asExecutorService.execute(command)
      override def shutdown(): Unit = asExecutorService.shutdown()
      override def shutdownNow() = asExecutorService.shutdownNow()
      override def isShutdown = asExecutorService.isShutdown
      override def isTerminated = asExecutorService.isTerminated
      override def awaitTermination(l: Long, timeUnit: TimeUnit) = asExecutorService.awaitTermination(l, timeUnit)
      override def submit[T](callable: Callable[T]) = asExecutorService.submit(callable)
      override def submit[T](runnable: Runnable, t: T) = asExecutorService.submit(runnable, t)
      override def submit(runnable: Runnable) = asExecutorService.submit(runnable)
      override def invokeAll[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAll(callables)
      override def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAll(callables, l, timeUnit)
      override def invokeAny[T](callables: Collection[_ <: Callable[T]]) = asExecutorService.invokeAny(callables)
      override def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = asExecutorService.invokeAny(callables, l, timeUnit)
    }
}