summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-08-03 15:56:44 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-08-03 15:56:44 +0000
commitac89702827559cab8835edecb35cc09a1ca3fe10 (patch)
tree7d8b01c12aeb8f42192ac68cce3289b3a4078310 /src
parentcf7a2f64f1357dcfa8ecf78ae8f29880c9fab214 (diff)
downloadscala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.gz
scala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.bz2
scala-ac89702827559cab8835edecb35cc09a1ca3fe10.zip
Added the scala.concurrent.TaskRunner and scala...
Added the scala.concurrent.TaskRunner and scala.concurrent.AsyncInvokable abstractions with corresponding refactorings in scala.actors and scala.concurrent.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Actor.scala4
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala6
-rw-r--r--src/actors/scala/actors/IScheduler.scala10
-rw-r--r--src/actors/scala/actors/ManagedBlocker.scala20
-rw-r--r--src/actors/scala/actors/Replyable.scala15
-rw-r--r--src/actors/scala/actors/ReplyableActor.scala1
-rw-r--r--src/actors/scala/actors/ReplyableReactor.scala2
-rw-r--r--src/actors/scala/actors/Scheduler.scala7
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala7
-rw-r--r--src/actors/scala/actors/remote/Proxy.scala2
-rw-r--r--src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala4
-rw-r--r--src/actors/scala/actors/scheduler/DelegatingScheduler.scala4
-rw-r--r--src/actors/scala/actors/scheduler/ExecutorScheduler.scala27
-rw-r--r--src/actors/scala/actors/scheduler/ForkJoinScheduler.scala9
-rw-r--r--src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala7
-rw-r--r--src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala5
-rw-r--r--src/compiler/scala/tools/nsc/InterpreterLoop.scala2
-rw-r--r--src/library/scala/concurrent/AsyncInvokable.scala13
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala5
-rw-r--r--src/library/scala/concurrent/JavaConversions.scala44
-rw-r--r--src/library/scala/concurrent/ManagedBlocker.scala24
-rw-r--r--src/library/scala/concurrent/TaskRunner.scala29
-rw-r--r--src/library/scala/concurrent/TaskRunners.scala27
-rw-r--r--src/library/scala/concurrent/ThreadPoolRunner.scala44
-rw-r--r--src/library/scala/concurrent/ThreadRunner.scala33
-rw-r--r--src/library/scala/concurrent/jolib.scala3
-rw-r--r--src/library/scala/concurrent/ops.scala16
-rw-r--r--src/library/scala/concurrent/pilib.scala3
28 files changed, 285 insertions, 88 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index f9e79ce98f..aa65f5a6ac 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -643,7 +643,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
scheduler executeFromActor task
}
- private class ActorBlocker(timeout: Long) extends ManagedBlocker {
+ private class ActorBlocker(timeout: Long) extends scala.concurrent.ManagedBlocker {
def block() = {
if (timeout > 0)
Actor.this.suspendActorFor(timeout)
@@ -651,7 +651,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
Actor.this.suspendActor()
true
}
- def isReleasable() =
+ def isReleasable =
!Actor.this.isSuspended
}
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index e789e96be8..b12ef2d925 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -121,9 +121,6 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean)
def execute(task: Runnable): Unit =
executor execute task
- def executeFromActor(task: Runnable) =
- execute(task)
-
def execute(fun: => Unit): Unit =
executor.execute(new Runnable {
def run() { fun }
@@ -142,4 +139,7 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean)
def isActive = !terminating && !suspending
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ blocker.block()
+ }
}
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 8f3b243b71..f4034fbfab 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -16,8 +16,7 @@ package scala.actors
*
* Subclasses of <code>Actor</code> that override its
* <code>scheduler</code> member must provide
- * an implementation of the <code>IScheduler</code>
- * trait.
+ * an <code>IScheduler</code> implementation.
*
* @author Philipp Haller
*/
@@ -35,7 +34,8 @@ trait IScheduler {
*/
def execute(task: Runnable): Unit
- def executeFromActor(task: Runnable): Unit
+ def executeFromActor(task: Runnable): Unit =
+ execute(task)
/** Shuts down the scheduler.
*/
@@ -66,7 +66,5 @@ trait IScheduler {
*/
def onTerminate(a: Reactor)(f: => Unit): Unit
- def managedBlock(blocker: ManagedBlocker) {
- blocker.block()
- }
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker): Unit
}
diff --git a/src/actors/scala/actors/ManagedBlocker.scala b/src/actors/scala/actors/ManagedBlocker.scala
deleted file mode 100644
index f3fd08301b..0000000000
--- a/src/actors/scala/actors/ManagedBlocker.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors
-
-import forkjoin.ForkJoinPool
-
-/**
- * The <code>ManagedBlocker</code> trait...
- *
- * @author Philipp Haller
- */
-trait ManagedBlocker extends ForkJoinPool.ManagedBlocker
diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala
index 99c65d29f3..39d87241a6 100644
--- a/src/actors/scala/actors/Replyable.scala
+++ b/src/actors/scala/actors/Replyable.scala
@@ -16,8 +16,11 @@ package scala.actors
*
* @author Philipp Haller
*/
-trait Replyable[-T, +R] {
-
+trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] {
+/*
+ def apply(msg: T): this.Future[R] =
+ this !! msg
+*/
/**
* Sends <code>msg</code> to this Replyable and awaits reply
* (synchronous).
@@ -45,8 +48,8 @@ trait Replyable[-T, +R] {
* @param msg the message to be sent
* @return the future
*/
- def !!(msg: T): (() => R) =
- () => this !? msg
+ def !!(msg: T): this.Future[R]
+// () => this !? msg
/**
* Sends <code>msg</code> to this actor and immediately
@@ -59,7 +62,7 @@ trait Replyable[-T, +R] {
* @param f the function to be applied to the response
* @return the future
*/
- def !![P](msg: T, f: PartialFunction[R, P]): (() => P) =
- () => f(this !? msg)
+ def !![P](msg: T, f: PartialFunction[R, P]): this.Future[P]
+// () => f(this !? msg)
}
diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala
index 1e1487bf39..8ba4f13842 100644
--- a/src/actors/scala/actors/ReplyableActor.scala
+++ b/src/actors/scala/actors/ReplyableActor.scala
@@ -84,6 +84,7 @@ trait ReplyableActor extends ReplyableReactor {
override def !!(msg: Any): Future[Any] = {
val ftch = new Channel[Any](Actor.self(thiz.scheduler))
val linkedChannel = new AbstractActor {
+ type Future[+R] = scala.actors.Future[R]
def !(msg: Any) =
ftch ! msg
def send(msg: Any, replyTo: OutputChannel[Any]) =
diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala
index 84168abe0a..6752fbb7b4 100644
--- a/src/actors/scala/actors/ReplyableReactor.scala
+++ b/src/actors/scala/actors/ReplyableReactor.scala
@@ -20,6 +20,8 @@ package scala.actors
trait ReplyableReactor extends Replyable[Any, Any] {
thiz: ReplyReactor =>
+ type Future[+S] = scala.actors.Future[S]
+
/**
* Sends <code>msg</code> to this actor and awaits reply
* (synchronous).
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index cb764e963b..894dbc93e8 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -24,12 +24,13 @@ object Scheduler extends DelegatingScheduler {
Debug.info("initializing "+this+"...")
def makeNewScheduler: IScheduler = {
- val workQueue = new LinkedBlockingQueue[Runnable](100000)
+ val workQueue = new LinkedBlockingQueue[Runnable]
val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
ThreadPoolConfig.maxPoolSize,
- 50L,
+ 60000L,
TimeUnit.MILLISECONDS,
- workQueue)
+ workQueue,
+ new ThreadPoolExecutor.CallerRunsPolicy)
val s = new ThreadPoolScheduler(threadPool, true)
//val s = new ForkJoinScheduler
Debug.info(this+": starting new "+s+" ["+s.getClass+"]")
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index 6355ee1ace..ba7822c372 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -28,9 +28,6 @@ trait SchedulerAdapter extends IScheduler {
def execute(task: Runnable): Unit =
execute { task.run() }
- def executeFromActor(task: Runnable): Unit =
- execute(task)
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit =
@@ -64,4 +61,8 @@ trait SchedulerAdapter extends IScheduler {
*/
def onTerminate(a: Reactor)(f: => Unit) =
Scheduler.onTerminate(a)(f)
+
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ blocker.block()
+ }
}
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala
index d99a8d3c1c..60bcd34221 100644
--- a/src/actors/scala/actors/remote/Proxy.scala
+++ b/src/actors/scala/actors/remote/Proxy.scala
@@ -20,6 +20,8 @@ import scala.collection.mutable.HashMap
private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor {
import java.io.{IOException, ObjectOutputStream, ObjectInputStream}
+ type Future[+R] = scala.actors.Future[R]
+
@transient
private[remote] var del: Actor = null
startDelegate()
diff --git a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
index 88ccb6dce8..4a8f9d034d 100644
--- a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
@@ -42,12 +42,12 @@ class DefaultExecutorScheduler(daemon: Boolean)
private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
ThreadPoolConfig.maxPoolSize,
- 50L,
+ 60000L,
TimeUnit.MILLISECONDS,
workQueue,
threadFactory)
val executor = threadPool
- override val CHECK_FREQ = 50
+ override val CHECK_FREQ = 10
}
diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
index 4bd2d09bd0..e72acd09d8 100644
--- a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
@@ -9,6 +9,8 @@
package scala.actors
package scheduler
+import scala.concurrent.ManagedBlocker
+
/**
* @author Erik Engbrecht
*/
@@ -37,7 +39,7 @@ trait DelegatingScheduler extends IScheduler {
def execute(task: Runnable) = impl.execute(task)
- def executeFromActor(task: Runnable) = impl.executeFromActor(task)
+ override def executeFromActor(task: Runnable) = impl.executeFromActor(task)
def shutdown(): Unit = synchronized {
if (sched ne null) {
diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
index 8e032242f6..07a014f92d 100644
--- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
@@ -11,7 +11,7 @@
package scala.actors
package scheduler
-import java.util.concurrent.{ExecutorService, RejectedExecutionException}
+import scala.concurrent.ThreadPoolRunner
/**
* The <code>ExecutorScheduler</code> class uses an
@@ -19,29 +19,9 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException}
*
* @author Philipp Haller
*/
-trait ExecutorScheduler extends IScheduler {
+trait ExecutorScheduler extends IScheduler with ThreadPoolRunner[Unit] {
- protected def executor: ExecutorService
-
- /** Submits a <code>Runnable</code> for execution.
- *
- * @param task the task to be executed
- */
- def execute(task: Runnable) {
- try {
- executor execute task
- } catch {
- case ree: RejectedExecutionException =>
- // run task on current thread
- task.run()
- }
- }
-
- def executeFromActor(task: Runnable) =
- execute(task)
-
- /** This method is called when the <code>SchedulerService</code>
- * shuts down.
+ /** This method is called when the scheduler shuts down.
*/
def onShutdown(): Unit =
executor.shutdown()
@@ -51,4 +31,5 @@ trait ExecutorScheduler extends IScheduler {
*/
def isActive =
(executor ne null) && !executor.isShutdown
+
}
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
index 5d6d676b1a..82e8f5c2fd 100644
--- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
@@ -94,7 +94,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
pool.execute(task)
}
- def executeFromActor(task: Runnable) {
+ override def executeFromActor(task: Runnable) {
val recAction = new RecursiveAction {
def compute() = task.run()
}
@@ -110,8 +110,11 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
def run() { fun }
})
- override def managedBlock(blocker: ManagedBlocker) {
- ForkJoinPool.managedBlock(blocker, true)
+ override def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ def block = blocker.block()
+ def isReleasable() = blocker.isReleasable
+ }, true)
}
/** Shuts down the scheduler.
diff --git a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
index c7e588f2ed..4ad865a15d 100644
--- a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
+++ b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
@@ -24,9 +24,6 @@ class SingleThreadedScheduler extends IScheduler {
task.run()
}
- def executeFromActor(task: Runnable) =
- execute(task)
-
def execute(fun: => Unit): Unit =
execute(new Runnable {
def run() { fun }
@@ -39,4 +36,8 @@ class SingleThreadedScheduler extends IScheduler {
def onTerminate(actor: Reactor)(f: => Unit) {}
def isActive = true
+
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ blocker.block()
+ }
}
diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
index 4826d44383..fd8a0e6a64 100644
--- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
@@ -12,6 +12,7 @@ package scala.actors
package scheduler
import java.util.concurrent.ThreadPoolExecutor
+import scala.concurrent.ManagedBlocker
/**
* The <code>ThreadPoolScheduler</code> class uses an
@@ -44,7 +45,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
override def managedBlock(blocker: ManagedBlocker) {
val coreSize = executor.getCorePoolSize()
- if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
+ if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) {
executor.setCorePoolSize(coreSize + 1)
}
blocker.block()
@@ -67,7 +68,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
throw new QuitException
val coreSize = executor.getCorePoolSize()
- if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
+ if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) {
executor.setCorePoolSize(coreSize + 1)
}
}
diff --git a/src/compiler/scala/tools/nsc/InterpreterLoop.scala b/src/compiler/scala/tools/nsc/InterpreterLoop.scala
index 9d4e734a40..69db5c98b2 100644
--- a/src/compiler/scala/tools/nsc/InterpreterLoop.scala
+++ b/src/compiler/scala/tools/nsc/InterpreterLoop.scala
@@ -55,6 +55,8 @@ object InterpreterControl {
}
import InterpreterControl._
+import scala.concurrent.ops.defaultRunner
+
/** The
* <a href="http://scala-lang.org/" target="_top">Scala</a>
* interactive shell. It provides a read-eval-print loop around
diff --git a/src/library/scala/concurrent/AsyncInvokable.scala b/src/library/scala/concurrent/AsyncInvokable.scala
new file mode 100644
index 0000000000..ae84042689
--- /dev/null
+++ b/src/library/scala/concurrent/AsyncInvokable.scala
@@ -0,0 +1,13 @@
+package scala.concurrent
+
+/** The <code>AsyncInvokable</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait AsyncInvokable[-T, +R] {
+
+ type Future[+S] <: () => S
+
+ def !!(task: T): Future[R]
+
+}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index 0fa3c1660b..63477b4b3c 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -11,6 +11,7 @@
package scala.concurrent
import annotation.experimental
+import ops._
/** A <code>DelayedLazyVal</code> is a wrapper for lengthy
* computations which have a valid partially computed result.
@@ -37,8 +38,8 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- ops.future {
+ future {
body
isDone = true
}
-} \ No newline at end of file
+}
diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala
new file mode 100644
index 0000000000..9fde489ced
--- /dev/null
+++ b/src/library/scala/concurrent/JavaConversions.scala
@@ -0,0 +1,44 @@
+package scala.concurrent
+
+import java.util.concurrent.{ExecutorService, Executor}
+
+/** The <code>JavaConversions</code> object...
+ *
+ * @author Philipp Haller
+ */
+object JavaConversions {
+
+ implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] =
+ new ThreadPoolRunner[Unit] {
+ override protected def executor =
+ exec
+
+ def shutdown() =
+ exec.shutdown()
+ }
+
+ implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] =
+ new TaskRunner[Unit] {
+ type Future[+R] = () => R
+
+ def submit(task: () => Unit): this.Future[Unit] = {
+ val result = new SyncVar[Either[Unit, Throwable]]
+ val runnable = new Runnable {
+ def run() { result set tryCatch(task()) }
+ }
+ exec.execute(runnable)
+ () => result.get match {
+ case Left(a) => a
+ case Right(t) => throw t
+ }
+ }
+
+ def managedBlock(blocker: ManagedBlocker) {
+ blocker.block()
+ }
+
+ def shutdown() {
+ // do nothing
+ }
+ }
+}
diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala
new file mode 100644
index 0000000000..c77f97285e
--- /dev/null
+++ b/src/library/scala/concurrent/ManagedBlocker.scala
@@ -0,0 +1,24 @@
+package scala.concurrent
+
+/** The <code>ManagedBlocker</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait ManagedBlocker {
+
+ /**
+ * Possibly blocks the current thread, for example waiting for
+ * a lock or condition.
+ * @return true if no additional blocking is necessary (i.e.,
+ * if isReleasable would return true).
+ * @throws InterruptedException if interrupted while waiting
+ * (the method is not required to do so, but is allowed to).
+ */
+ def block(): Boolean
+
+ /**
+ * Returns true if blocking is unnecessary.
+ */
+ def isReleasable: Boolean
+
+}
diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala
new file mode 100644
index 0000000000..d29e8ff12f
--- /dev/null
+++ b/src/library/scala/concurrent/TaskRunner.scala
@@ -0,0 +1,29 @@
+package scala.concurrent
+
+/** The <code>TaskRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait TaskRunner[T] extends AsyncInvokable[() => T, T] {
+
+ def submit(task: () => T): Future[T]
+
+ def shutdown(): Unit
+
+ def !!(task: () => T): Future[T] =
+ submit(task)
+
+ def managedBlock(blocker: ManagedBlocker): Unit
+
+ /** If expression computed successfully return it in <code>Left</code>,
+ * otherwise return exception in <code>Right</code>.
+ */
+ protected def tryCatch[A](left: => A): Either[A, Exception] = {
+ try {
+ Left(left)
+ } catch {
+ case e: Exception => Right(e)
+ }
+ }
+
+}
diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala
new file mode 100644
index 0000000000..8219d9d169
--- /dev/null
+++ b/src/library/scala/concurrent/TaskRunners.scala
@@ -0,0 +1,27 @@
+package scala.concurrent
+
+import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit}
+
+/** The <code>TaskRunners</code> object...
+ *
+ * @author Philipp Haller
+ */
+object TaskRunners {
+
+ implicit val threadRunner: TaskRunner[Unit] =
+ new ThreadRunner[Unit]
+
+ implicit val threadPoolRunner: TaskRunner[Unit] = {
+ val numCores = Runtime.getRuntime().availableProcessors()
+ val keepAliveTime = 60000L
+ val workQueue = new LinkedBlockingQueue[Runnable]
+ val exec = new ThreadPoolExecutor(numCores,
+ numCores,
+ keepAliveTime,
+ TimeUnit.MILLISECONDS,
+ workQueue,
+ new ThreadPoolExecutor.CallerRunsPolicy)
+ JavaConversions.asTaskRunner(exec)
+ }
+
+}
diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala
new file mode 100644
index 0000000000..cbc5ebb293
--- /dev/null
+++ b/src/library/scala/concurrent/ThreadPoolRunner.scala
@@ -0,0 +1,44 @@
+package scala.concurrent
+
+import java.util.concurrent.{ExecutorService, Callable, TimeUnit}
+
+import scala.annotation.unchecked.uncheckedVariance
+
+/** The <code>ThreadPoolRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait ThreadPoolRunner[T] extends TaskRunner[T] {
+
+ type Future[+R] = RichFuture[R]
+
+ trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance]
+ with (() => S)
+
+ protected def executor: ExecutorService
+
+ def submit(task: () => T): this.Future[T] = {
+ val callable = new Callable[T] {
+ def call() = task()
+ }
+ toRichFuture(executor.submit[T](callable))
+ }
+
+ def execute(task: Runnable): Unit =
+ executor execute task
+
+ def managedBlock(blocker: ManagedBlocker) {
+ blocker.block()
+ }
+
+ private def toRichFuture[S](future: java.util.concurrent.Future[S]) =
+ new RichFuture[S] {
+ def cancel(mayInterrupt: Boolean) = future cancel mayInterrupt
+ def get() = future.get()
+ def get(timeout: Long, unit: TimeUnit) = future.get(timeout, unit)
+ def isCancelled() = future.isCancelled()
+ def isDone() = future.isDone()
+ def apply() = future.get()
+ }
+
+}
diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala
new file mode 100644
index 0000000000..7fb653a326
--- /dev/null
+++ b/src/library/scala/concurrent/ThreadRunner.scala
@@ -0,0 +1,33 @@
+package scala.concurrent
+
+import java.lang.Thread
+
+/** The <code>ThreadRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+class ThreadRunner[T] extends TaskRunner[T] {
+
+ type Future[+S] = () => S
+
+ def submit(task: () => T): this.Future[T] = {
+ val result = new SyncVar[Either[T, Exception]]
+ val runnable = new Runnable {
+ def run() { result set tryCatch(task()) }
+ }
+ (new Thread(runnable)).start()
+ () => result.get match {
+ case Left(a) => a
+ case Right(t) => throw t
+ }
+ }
+
+ def managedBlock(blocker: ManagedBlocker) {
+ blocker.block()
+ }
+
+ def shutdown() {
+ // do nothing
+ }
+
+}
diff --git a/src/library/scala/concurrent/jolib.scala b/src/library/scala/concurrent/jolib.scala
index 615996695b..d53f90f744 100644
--- a/src/library/scala/concurrent/jolib.scala
+++ b/src/library/scala/concurrent/jolib.scala
@@ -11,6 +11,7 @@
package scala.concurrent
+import ops._
/**
* Library for using join-calculus concurrent primitives in Scala.
@@ -44,7 +45,7 @@ package scala.concurrent
case None => () => ()
case Some((p, r)) => {
val args = values(p)
- () => concurrent.ops.spawn(r(args))
+ () => spawn(r(args))
}
}
diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala
index 939ed6f575..f3be1475a7 100644
--- a/src/library/scala/concurrent/ops.scala
+++ b/src/library/scala/concurrent/ops.scala
@@ -16,10 +16,13 @@ import java.lang.Thread
/** The object <code>ops</code> ...
*
- * @author Martin Odersky, Stepan Koltsov
- * @version 1.0, 12/03/2003
+ * @author Martin Odersky, Stepan Koltsov, Philipp Haller
*/
object ops {
+
+ implicit val defaultRunner: TaskRunner[Unit] =
+ TaskRunners.threadRunner
+
/**
* If expression computed successfully return it in <code>Left</code>,
* otherwise return exception in <code>Right</code>.
@@ -36,18 +39,17 @@ object ops {
*
* @param p the expression to evaluate
*/
- def spawn(p: => Unit) = {
- val t = new Thread() { override def run() = p }
- t.start()
+ def spawn(p: => Unit)(implicit runner: TaskRunner[Unit]): Unit = {
+ runner submit (() => p)
}
/**
* @param p ...
* @return ...
*/
- def future[A](p: => A): () => A = {
+ def future[A](p: => A)(implicit runner: TaskRunner[Unit]): () => A = {
val result = new SyncVar[Either[A, Throwable]]
- spawn { result set tryCatch(p) }
+ spawn({ result set tryCatch(p) })(runner)
() => result.get match {
case Left(a) => a
case Right(t) => throw t
diff --git a/src/library/scala/concurrent/pilib.scala b/src/library/scala/concurrent/pilib.scala
index a510f41055..246f7e2c54 100644
--- a/src/library/scala/concurrent/pilib.scala
+++ b/src/library/scala/concurrent/pilib.scala
@@ -11,7 +11,6 @@
package scala.concurrent
-
/** <p>
* Library for using Pi-calculus concurrent primitives in
* <a href="http://scala-lang.org/" target="_top">Scala</a>. As an
@@ -33,6 +32,8 @@ package scala.concurrent
*/
object pilib {
+ import TaskRunners.threadRunner
+
//////////////////////////////// SPAWN /////////////////////////////////
/**