summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-09-11 15:50:39 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-09-11 15:50:39 +0000
commit514ff83e3983d81f8bf948abebbe5b9141d9690d (patch)
tree28acd359da510bdff60fa15d9917670446b931aa /src/library
parent61ff261346289f7886350a8a4da5688574070e59 (diff)
downloadscala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.gz
scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.bz2
scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.zip
Split TaskRunner into FutureTaskRunner and Task...
Split TaskRunner into FutureTaskRunner and TaskRunner. FutureTaskRunner has an abstract Future[T] type member and inherits an abstract Task[T] type member from TaskRunner. Implicit conversions enable tasks and futures to be treated as parameter-less functions. This allows TaskRunners to be used by actor schedulers without creating lots of wrapper objects.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/concurrent/FutureTaskRunner.scala17
-rw-r--r--src/library/scala/concurrent/JavaConversions.scala25
-rw-r--r--src/library/scala/concurrent/TaskRunner.scala11
-rw-r--r--src/library/scala/concurrent/TaskRunners.scala6
-rw-r--r--src/library/scala/concurrent/ThreadPoolRunner.scala31
-rw-r--r--src/library/scala/concurrent/ThreadRunner.scala19
-rw-r--r--src/library/scala/concurrent/ops.scala16
7 files changed, 80 insertions, 45 deletions
diff --git a/src/library/scala/concurrent/FutureTaskRunner.scala b/src/library/scala/concurrent/FutureTaskRunner.scala
new file mode 100644
index 0000000000..48ad0817a2
--- /dev/null
+++ b/src/library/scala/concurrent/FutureTaskRunner.scala
@@ -0,0 +1,17 @@
+package scala.concurrent
+
+/** The <code>FutureTaskRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait FutureTaskRunner extends TaskRunner {
+
+ type Future[T]
+
+ implicit def futureAsFunction[S](x: Future[S]): () => S
+
+ def submit[S](task: Task[S]): Future[S]
+
+ def managedBlock(blocker: ManagedBlocker): Unit
+
+}
diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala
index 7ee9096127..7810cee669 100644
--- a/src/library/scala/concurrent/JavaConversions.scala
+++ b/src/library/scala/concurrent/JavaConversions.scala
@@ -8,8 +8,8 @@ import java.util.concurrent.{ExecutorService, Executor}
*/
object JavaConversions {
- implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] =
- new ThreadPoolRunner[Unit] {
+ implicit def asTaskRunner(exec: ExecutorService): FutureTaskRunner =
+ new ThreadPoolRunner {
override protected def executor =
exec
@@ -17,17 +17,16 @@ object JavaConversions {
exec.shutdown()
}
- implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] =
- new TaskRunner[Unit] {
- type Future[+R] = () => R
-
- def submit(task: () => Unit): this.Future[Unit] = {
- val result = new SyncVar[Either[Throwable, Unit]]
- val runnable = new Runnable {
- def run() { result set tryCatch(task()) }
- }
- exec.execute(runnable)
- () => ops getOrThrow result.get
+ implicit def asTaskRunner(exec: Executor): TaskRunner =
+ new TaskRunner {
+ type Task[T] = Runnable
+
+ implicit def functionAsTask[T](fun: () => T): Task[T] = new Runnable {
+ def run() { fun() }
+ }
+
+ def execute[S](task: Task[S]) {
+ exec.execute(task)
}
def managedBlock(blocker: ManagedBlocker) {
diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala
index a393b065fa..e9b7c6916b 100644
--- a/src/library/scala/concurrent/TaskRunner.scala
+++ b/src/library/scala/concurrent/TaskRunner.scala
@@ -4,16 +4,15 @@ package scala.concurrent
*
* @author Philipp Haller
*/
-trait TaskRunner[T] extends AsyncInvokable[() => T, T] {
+trait TaskRunner {
- def submit(task: () => T): Future[T]
+ type Task[T]
- def shutdown(): Unit
+ implicit def functionAsTask[S](fun: () => S): Task[S]
- def !!(task: () => T): Future[T] =
- submit(task)
+ def execute[S](task: Task[S]): Unit
- def managedBlock(blocker: ManagedBlocker): Unit
+ def shutdown(): Unit
/** If expression computed successfully return it in <code>Right</code>,
* otherwise return exception in <code>Left</code>.
diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala
index 8219d9d169..cc5c1a5131 100644
--- a/src/library/scala/concurrent/TaskRunners.scala
+++ b/src/library/scala/concurrent/TaskRunners.scala
@@ -8,10 +8,10 @@ import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit}
*/
object TaskRunners {
- implicit val threadRunner: TaskRunner[Unit] =
- new ThreadRunner[Unit]
+ implicit val threadRunner: FutureTaskRunner =
+ new ThreadRunner
- implicit val threadPoolRunner: TaskRunner[Unit] = {
+ implicit val threadPoolRunner: FutureTaskRunner = {
val numCores = Runtime.getRuntime().availableProcessors()
val keepAliveTime = 60000L
val workQueue = new LinkedBlockingQueue[Runnable]
diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala
index cbc5ebb293..e532d2bfff 100644
--- a/src/library/scala/concurrent/ThreadPoolRunner.scala
+++ b/src/library/scala/concurrent/ThreadPoolRunner.scala
@@ -2,29 +2,36 @@ package scala.concurrent
import java.util.concurrent.{ExecutorService, Callable, TimeUnit}
-import scala.annotation.unchecked.uncheckedVariance
-
/** The <code>ThreadPoolRunner</code> trait...
*
* @author Philipp Haller
*/
-trait ThreadPoolRunner[T] extends TaskRunner[T] {
+trait ThreadPoolRunner extends FutureTaskRunner {
+
+ type Task[T] = Callable[T] with Runnable
+ type Future[T] = RichFuture[T]
+
+ private class RunCallable[S](fun: () => S) extends Runnable with Callable[S] {
+ def run() = fun()
+ def call() = fun()
+ }
+
+ implicit def functionAsTask[S](fun: () => S): Task[S] =
+ new RunCallable(fun)
- type Future[+R] = RichFuture[R]
+ implicit def futureAsFunction[S](x: Future[S]): () => S =
+ () => x.get()
- trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance]
- with (() => S)
+ trait RichFuture[S] extends java.util.concurrent.Future[S]
+ with (() => S)
protected def executor: ExecutorService
- def submit(task: () => T): this.Future[T] = {
- val callable = new Callable[T] {
- def call() = task()
- }
- toRichFuture(executor.submit[T](callable))
+ def submit[S](task: Task[S]): Future[S] = {
+ toRichFuture(executor.submit[S](task))
}
- def execute(task: Runnable): Unit =
+ def execute[S](task: Task[S]): Unit =
executor execute task
def managedBlock(blocker: ManagedBlocker) {
diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala
index f48e0384b7..16269fa65f 100644
--- a/src/library/scala/concurrent/ThreadRunner.scala
+++ b/src/library/scala/concurrent/ThreadRunner.scala
@@ -6,12 +6,23 @@ import java.lang.Thread
*
* @author Philipp Haller
*/
-class ThreadRunner[T] extends TaskRunner[T] {
+class ThreadRunner extends FutureTaskRunner {
- type Future[+S] = () => S
+ type Task[T] = () => T
+ type Future[T] = () => T
- def submit(task: () => T): this.Future[T] = {
- val result = new SyncVar[Either[Exception, T]]
+ implicit def functionAsTask[S](fun: () => S): Task[S] = fun
+ implicit def futureAsFunction[S](x: Future[S]): () => S = x
+
+ def execute[S](task: Task[S]) {
+ val runnable = new Runnable {
+ def run() { tryCatch(task()) }
+ }
+ (new Thread(runnable)).start()
+ }
+
+ def submit[S](task: Task[S]): Future[S] = {
+ val result = new SyncVar[Either[Exception, S]]
val runnable = new Runnable {
def run() { result set tryCatch(task()) }
}
diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala
index e2fb8f0ceb..4ff26d4465 100644
--- a/src/library/scala/concurrent/ops.scala
+++ b/src/library/scala/concurrent/ops.scala
@@ -20,21 +20,25 @@ import scala.util.control.Exception.allCatch
*/
object ops
{
- implicit val defaultRunner: TaskRunner[Unit] =
+
+ implicit val defaultRunner: FutureTaskRunner =
TaskRunners.threadRunner
/**
* If expression computed successfully return it in <code>Right</code>,
* otherwise return exception in <code>Left</code>.
*/
+ //TODO: make private
def tryCatch[A](body: => A): Either[Throwable, A] =
allCatch[A] either body
+ //TODO: make private
def tryCatchEx[A](body: => A): Either[Exception, A] =
try Right(body) catch {
case ex: Exception => Left(ex)
}
+ //TODO: make private
def getOrThrow[T <: Throwable, A](x: Either[T, A]): A =
x.fold[A](throw _, identity _)
@@ -42,18 +46,16 @@ object ops
*
* @param p the expression to evaluate
*/
- def spawn(p: => Unit)(implicit runner: TaskRunner[Unit] = defaultRunner): Unit = {
- runner submit (() => p)
+ def spawn(p: => Unit)(implicit runner: TaskRunner = defaultRunner): Unit = {
+ runner execute runner.functionAsTask(() => p)
}
/**
* @param p ...
* @return ...
*/
- def future[A](p: => A)(implicit runner: TaskRunner[Unit] = defaultRunner): () => A = {
- val result = new SyncVar[Either[Throwable, A]]
- spawn({ result set tryCatch(p) })(runner)
- () => getOrThrow(result.get)
+ def future[A](p: => A)(implicit runner: FutureTaskRunner = defaultRunner): () => A = {
+ runner.futureAsFunction(runner submit runner.functionAsTask(() => p))
}
/**