diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-09-11 15:50:39 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-09-11 15:50:39 +0000 |
commit | 514ff83e3983d81f8bf948abebbe5b9141d9690d (patch) | |
tree | 28acd359da510bdff60fa15d9917670446b931aa /src/library | |
parent | 61ff261346289f7886350a8a4da5688574070e59 (diff) | |
download | scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.gz scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.bz2 scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.zip |
Split TaskRunner into FutureTaskRunner and Task...
Split TaskRunner into FutureTaskRunner and TaskRunner. FutureTaskRunner
has an abstract Future[T] type member and inherits an abstract Task[T]
type member from TaskRunner. Implicit conversions enable tasks and
futures to be treated as parameter-less functions. This allows
TaskRunners to be used by actor schedulers without creating lots of
wrapper objects.
Diffstat (limited to 'src/library')
-rw-r--r-- | src/library/scala/concurrent/FutureTaskRunner.scala | 17 | ||||
-rw-r--r-- | src/library/scala/concurrent/JavaConversions.scala | 25 | ||||
-rw-r--r-- | src/library/scala/concurrent/TaskRunner.scala | 11 | ||||
-rw-r--r-- | src/library/scala/concurrent/TaskRunners.scala | 6 | ||||
-rw-r--r-- | src/library/scala/concurrent/ThreadPoolRunner.scala | 31 | ||||
-rw-r--r-- | src/library/scala/concurrent/ThreadRunner.scala | 19 | ||||
-rw-r--r-- | src/library/scala/concurrent/ops.scala | 16 |
7 files changed, 80 insertions, 45 deletions
diff --git a/src/library/scala/concurrent/FutureTaskRunner.scala b/src/library/scala/concurrent/FutureTaskRunner.scala new file mode 100644 index 0000000000..48ad0817a2 --- /dev/null +++ b/src/library/scala/concurrent/FutureTaskRunner.scala @@ -0,0 +1,17 @@ +package scala.concurrent + +/** The <code>FutureTaskRunner</code> trait... + * + * @author Philipp Haller + */ +trait FutureTaskRunner extends TaskRunner { + + type Future[T] + + implicit def futureAsFunction[S](x: Future[S]): () => S + + def submit[S](task: Task[S]): Future[S] + + def managedBlock(blocker: ManagedBlocker): Unit + +} diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala index 7ee9096127..7810cee669 100644 --- a/src/library/scala/concurrent/JavaConversions.scala +++ b/src/library/scala/concurrent/JavaConversions.scala @@ -8,8 +8,8 @@ import java.util.concurrent.{ExecutorService, Executor} */ object JavaConversions { - implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] = - new ThreadPoolRunner[Unit] { + implicit def asTaskRunner(exec: ExecutorService): FutureTaskRunner = + new ThreadPoolRunner { override protected def executor = exec @@ -17,17 +17,16 @@ object JavaConversions { exec.shutdown() } - implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] = - new TaskRunner[Unit] { - type Future[+R] = () => R - - def submit(task: () => Unit): this.Future[Unit] = { - val result = new SyncVar[Either[Throwable, Unit]] - val runnable = new Runnable { - def run() { result set tryCatch(task()) } - } - exec.execute(runnable) - () => ops getOrThrow result.get + implicit def asTaskRunner(exec: Executor): TaskRunner = + new TaskRunner { + type Task[T] = Runnable + + implicit def functionAsTask[T](fun: () => T): Task[T] = new Runnable { + def run() { fun() } + } + + def execute[S](task: Task[S]) { + exec.execute(task) } def managedBlock(blocker: ManagedBlocker) { diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala index a393b065fa..e9b7c6916b 100644 --- a/src/library/scala/concurrent/TaskRunner.scala +++ b/src/library/scala/concurrent/TaskRunner.scala @@ -4,16 +4,15 @@ package scala.concurrent * * @author Philipp Haller */ -trait TaskRunner[T] extends AsyncInvokable[() => T, T] { +trait TaskRunner { - def submit(task: () => T): Future[T] + type Task[T] - def shutdown(): Unit + implicit def functionAsTask[S](fun: () => S): Task[S] - def !!(task: () => T): Future[T] = - submit(task) + def execute[S](task: Task[S]): Unit - def managedBlock(blocker: ManagedBlocker): Unit + def shutdown(): Unit /** If expression computed successfully return it in <code>Right</code>, * otherwise return exception in <code>Left</code>. diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala index 8219d9d169..cc5c1a5131 100644 --- a/src/library/scala/concurrent/TaskRunners.scala +++ b/src/library/scala/concurrent/TaskRunners.scala @@ -8,10 +8,10 @@ import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit} */ object TaskRunners { - implicit val threadRunner: TaskRunner[Unit] = - new ThreadRunner[Unit] + implicit val threadRunner: FutureTaskRunner = + new ThreadRunner - implicit val threadPoolRunner: TaskRunner[Unit] = { + implicit val threadPoolRunner: FutureTaskRunner = { val numCores = Runtime.getRuntime().availableProcessors() val keepAliveTime = 60000L val workQueue = new LinkedBlockingQueue[Runnable] diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala index cbc5ebb293..e532d2bfff 100644 --- a/src/library/scala/concurrent/ThreadPoolRunner.scala +++ b/src/library/scala/concurrent/ThreadPoolRunner.scala @@ -2,29 +2,36 @@ package scala.concurrent import java.util.concurrent.{ExecutorService, Callable, TimeUnit} -import scala.annotation.unchecked.uncheckedVariance - /** The <code>ThreadPoolRunner</code> trait... * * @author Philipp Haller */ -trait ThreadPoolRunner[T] extends TaskRunner[T] { +trait ThreadPoolRunner extends FutureTaskRunner { + + type Task[T] = Callable[T] with Runnable + type Future[T] = RichFuture[T] + + private class RunCallable[S](fun: () => S) extends Runnable with Callable[S] { + def run() = fun() + def call() = fun() + } + + implicit def functionAsTask[S](fun: () => S): Task[S] = + new RunCallable(fun) - type Future[+R] = RichFuture[R] + implicit def futureAsFunction[S](x: Future[S]): () => S = + () => x.get() - trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance] - with (() => S) + trait RichFuture[S] extends java.util.concurrent.Future[S] + with (() => S) protected def executor: ExecutorService - def submit(task: () => T): this.Future[T] = { - val callable = new Callable[T] { - def call() = task() - } - toRichFuture(executor.submit[T](callable)) + def submit[S](task: Task[S]): Future[S] = { + toRichFuture(executor.submit[S](task)) } - def execute(task: Runnable): Unit = + def execute[S](task: Task[S]): Unit = executor execute task def managedBlock(blocker: ManagedBlocker) { diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala index f48e0384b7..16269fa65f 100644 --- a/src/library/scala/concurrent/ThreadRunner.scala +++ b/src/library/scala/concurrent/ThreadRunner.scala @@ -6,12 +6,23 @@ import java.lang.Thread * * @author Philipp Haller */ -class ThreadRunner[T] extends TaskRunner[T] { +class ThreadRunner extends FutureTaskRunner { - type Future[+S] = () => S + type Task[T] = () => T + type Future[T] = () => T - def submit(task: () => T): this.Future[T] = { - val result = new SyncVar[Either[Exception, T]] + implicit def functionAsTask[S](fun: () => S): Task[S] = fun + implicit def futureAsFunction[S](x: Future[S]): () => S = x + + def execute[S](task: Task[S]) { + val runnable = new Runnable { + def run() { tryCatch(task()) } + } + (new Thread(runnable)).start() + } + + def submit[S](task: Task[S]): Future[S] = { + val result = new SyncVar[Either[Exception, S]] val runnable = new Runnable { def run() { result set tryCatch(task()) } } diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala index e2fb8f0ceb..4ff26d4465 100644 --- a/src/library/scala/concurrent/ops.scala +++ b/src/library/scala/concurrent/ops.scala @@ -20,21 +20,25 @@ import scala.util.control.Exception.allCatch */ object ops { - implicit val defaultRunner: TaskRunner[Unit] = + + implicit val defaultRunner: FutureTaskRunner = TaskRunners.threadRunner /** * If expression computed successfully return it in <code>Right</code>, * otherwise return exception in <code>Left</code>. */ + //TODO: make private def tryCatch[A](body: => A): Either[Throwable, A] = allCatch[A] either body + //TODO: make private def tryCatchEx[A](body: => A): Either[Exception, A] = try Right(body) catch { case ex: Exception => Left(ex) } + //TODO: make private def getOrThrow[T <: Throwable, A](x: Either[T, A]): A = x.fold[A](throw _, identity _) @@ -42,18 +46,16 @@ object ops * * @param p the expression to evaluate */ - def spawn(p: => Unit)(implicit runner: TaskRunner[Unit] = defaultRunner): Unit = { - runner submit (() => p) + def spawn(p: => Unit)(implicit runner: TaskRunner = defaultRunner): Unit = { + runner execute runner.functionAsTask(() => p) } /** * @param p ... * @return ... */ - def future[A](p: => A)(implicit runner: TaskRunner[Unit] = defaultRunner): () => A = { - val result = new SyncVar[Either[Throwable, A]] - spawn({ result set tryCatch(p) })(runner) - () => getOrThrow(result.get) + def future[A](p: => A)(implicit runner: FutureTaskRunner = defaultRunner): () => A = { + runner.futureAsFunction(runner submit runner.functionAsTask(() => p)) } /** |