summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-08-03 15:56:44 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-08-03 15:56:44 +0000
commitac89702827559cab8835edecb35cc09a1ca3fe10 (patch)
tree7d8b01c12aeb8f42192ac68cce3289b3a4078310 /src/actors
parentcf7a2f64f1357dcfa8ecf78ae8f29880c9fab214 (diff)
downloadscala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.gz
scala-ac89702827559cab8835edecb35cc09a1ca3fe10.tar.bz2
scala-ac89702827559cab8835edecb35cc09a1ca3fe10.zip
Added the scala.concurrent.TaskRunner and scala...
Added the scala.concurrent.TaskRunner and scala.concurrent.AsyncInvokable abstractions with corresponding refactorings in scala.actors and scala.concurrent.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala4
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala6
-rw-r--r--src/actors/scala/actors/IScheduler.scala10
-rw-r--r--src/actors/scala/actors/ManagedBlocker.scala20
-rw-r--r--src/actors/scala/actors/Replyable.scala15
-rw-r--r--src/actors/scala/actors/ReplyableActor.scala1
-rw-r--r--src/actors/scala/actors/ReplyableReactor.scala2
-rw-r--r--src/actors/scala/actors/Scheduler.scala7
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala7
-rw-r--r--src/actors/scala/actors/remote/Proxy.scala2
-rw-r--r--src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala4
-rw-r--r--src/actors/scala/actors/scheduler/DelegatingScheduler.scala4
-rw-r--r--src/actors/scala/actors/scheduler/ExecutorScheduler.scala27
-rw-r--r--src/actors/scala/actors/scheduler/ForkJoinScheduler.scala9
-rw-r--r--src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala7
-rw-r--r--src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala5
16 files changed, 53 insertions, 77 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index f9e79ce98f..aa65f5a6ac 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -643,7 +643,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
scheduler executeFromActor task
}
- private class ActorBlocker(timeout: Long) extends ManagedBlocker {
+ private class ActorBlocker(timeout: Long) extends scala.concurrent.ManagedBlocker {
def block() = {
if (timeout > 0)
Actor.this.suspendActorFor(timeout)
@@ -651,7 +651,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
Actor.this.suspendActor()
true
}
- def isReleasable() =
+ def isReleasable =
!Actor.this.isSuspended
}
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index e789e96be8..b12ef2d925 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -121,9 +121,6 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean)
def execute(task: Runnable): Unit =
executor execute task
- def executeFromActor(task: Runnable) =
- execute(task)
-
def execute(fun: => Unit): Unit =
executor.execute(new Runnable {
def run() { fun }
@@ -142,4 +139,7 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean)
def isActive = !terminating && !suspending
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ blocker.block()
+ }
}
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 8f3b243b71..f4034fbfab 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -16,8 +16,7 @@ package scala.actors
*
* Subclasses of <code>Actor</code> that override its
* <code>scheduler</code> member must provide
- * an implementation of the <code>IScheduler</code>
- * trait.
+ * an <code>IScheduler</code> implementation.
*
* @author Philipp Haller
*/
@@ -35,7 +34,8 @@ trait IScheduler {
*/
def execute(task: Runnable): Unit
- def executeFromActor(task: Runnable): Unit
+ def executeFromActor(task: Runnable): Unit =
+ execute(task)
/** Shuts down the scheduler.
*/
@@ -66,7 +66,5 @@ trait IScheduler {
*/
def onTerminate(a: Reactor)(f: => Unit): Unit
- def managedBlock(blocker: ManagedBlocker) {
- blocker.block()
- }
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker): Unit
}
diff --git a/src/actors/scala/actors/ManagedBlocker.scala b/src/actors/scala/actors/ManagedBlocker.scala
deleted file mode 100644
index f3fd08301b..0000000000
--- a/src/actors/scala/actors/ManagedBlocker.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors
-
-import forkjoin.ForkJoinPool
-
-/**
- * The <code>ManagedBlocker</code> trait...
- *
- * @author Philipp Haller
- */
-trait ManagedBlocker extends ForkJoinPool.ManagedBlocker
diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala
index 99c65d29f3..39d87241a6 100644
--- a/src/actors/scala/actors/Replyable.scala
+++ b/src/actors/scala/actors/Replyable.scala
@@ -16,8 +16,11 @@ package scala.actors
*
* @author Philipp Haller
*/
-trait Replyable[-T, +R] {
-
+trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] {
+/*
+ def apply(msg: T): this.Future[R] =
+ this !! msg
+*/
/**
* Sends <code>msg</code> to this Replyable and awaits reply
* (synchronous).
@@ -45,8 +48,8 @@ trait Replyable[-T, +R] {
* @param msg the message to be sent
* @return the future
*/
- def !!(msg: T): (() => R) =
- () => this !? msg
+ def !!(msg: T): this.Future[R]
+// () => this !? msg
/**
* Sends <code>msg</code> to this actor and immediately
@@ -59,7 +62,7 @@ trait Replyable[-T, +R] {
* @param f the function to be applied to the response
* @return the future
*/
- def !![P](msg: T, f: PartialFunction[R, P]): (() => P) =
- () => f(this !? msg)
+ def !![P](msg: T, f: PartialFunction[R, P]): this.Future[P]
+// () => f(this !? msg)
}
diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala
index 1e1487bf39..8ba4f13842 100644
--- a/src/actors/scala/actors/ReplyableActor.scala
+++ b/src/actors/scala/actors/ReplyableActor.scala
@@ -84,6 +84,7 @@ trait ReplyableActor extends ReplyableReactor {
override def !!(msg: Any): Future[Any] = {
val ftch = new Channel[Any](Actor.self(thiz.scheduler))
val linkedChannel = new AbstractActor {
+ type Future[+R] = scala.actors.Future[R]
def !(msg: Any) =
ftch ! msg
def send(msg: Any, replyTo: OutputChannel[Any]) =
diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala
index 84168abe0a..6752fbb7b4 100644
--- a/src/actors/scala/actors/ReplyableReactor.scala
+++ b/src/actors/scala/actors/ReplyableReactor.scala
@@ -20,6 +20,8 @@ package scala.actors
trait ReplyableReactor extends Replyable[Any, Any] {
thiz: ReplyReactor =>
+ type Future[+S] = scala.actors.Future[S]
+
/**
* Sends <code>msg</code> to this actor and awaits reply
* (synchronous).
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index cb764e963b..894dbc93e8 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -24,12 +24,13 @@ object Scheduler extends DelegatingScheduler {
Debug.info("initializing "+this+"...")
def makeNewScheduler: IScheduler = {
- val workQueue = new LinkedBlockingQueue[Runnable](100000)
+ val workQueue = new LinkedBlockingQueue[Runnable]
val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
ThreadPoolConfig.maxPoolSize,
- 50L,
+ 60000L,
TimeUnit.MILLISECONDS,
- workQueue)
+ workQueue,
+ new ThreadPoolExecutor.CallerRunsPolicy)
val s = new ThreadPoolScheduler(threadPool, true)
//val s = new ForkJoinScheduler
Debug.info(this+": starting new "+s+" ["+s.getClass+"]")
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index 6355ee1ace..ba7822c372 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -28,9 +28,6 @@ trait SchedulerAdapter extends IScheduler {
def execute(task: Runnable): Unit =
execute { task.run() }
- def executeFromActor(task: Runnable): Unit =
- execute(task)
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit =
@@ -64,4 +61,8 @@ trait SchedulerAdapter extends IScheduler {
*/
def onTerminate(a: Reactor)(f: => Unit) =
Scheduler.onTerminate(a)(f)
+
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ blocker.block()
+ }
}
diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala
index d99a8d3c1c..60bcd34221 100644
--- a/src/actors/scala/actors/remote/Proxy.scala
+++ b/src/actors/scala/actors/remote/Proxy.scala
@@ -20,6 +20,8 @@ import scala.collection.mutable.HashMap
private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor {
import java.io.{IOException, ObjectOutputStream, ObjectInputStream}
+ type Future[+R] = scala.actors.Future[R]
+
@transient
private[remote] var del: Actor = null
startDelegate()
diff --git a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
index 88ccb6dce8..4a8f9d034d 100644
--- a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
@@ -42,12 +42,12 @@ class DefaultExecutorScheduler(daemon: Boolean)
private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
ThreadPoolConfig.maxPoolSize,
- 50L,
+ 60000L,
TimeUnit.MILLISECONDS,
workQueue,
threadFactory)
val executor = threadPool
- override val CHECK_FREQ = 50
+ override val CHECK_FREQ = 10
}
diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
index 4bd2d09bd0..e72acd09d8 100644
--- a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
@@ -9,6 +9,8 @@
package scala.actors
package scheduler
+import scala.concurrent.ManagedBlocker
+
/**
* @author Erik Engbrecht
*/
@@ -37,7 +39,7 @@ trait DelegatingScheduler extends IScheduler {
def execute(task: Runnable) = impl.execute(task)
- def executeFromActor(task: Runnable) = impl.executeFromActor(task)
+ override def executeFromActor(task: Runnable) = impl.executeFromActor(task)
def shutdown(): Unit = synchronized {
if (sched ne null) {
diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
index 8e032242f6..07a014f92d 100644
--- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
@@ -11,7 +11,7 @@
package scala.actors
package scheduler
-import java.util.concurrent.{ExecutorService, RejectedExecutionException}
+import scala.concurrent.ThreadPoolRunner
/**
* The <code>ExecutorScheduler</code> class uses an
@@ -19,29 +19,9 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException}
*
* @author Philipp Haller
*/
-trait ExecutorScheduler extends IScheduler {
+trait ExecutorScheduler extends IScheduler with ThreadPoolRunner[Unit] {
- protected def executor: ExecutorService
-
- /** Submits a <code>Runnable</code> for execution.
- *
- * @param task the task to be executed
- */
- def execute(task: Runnable) {
- try {
- executor execute task
- } catch {
- case ree: RejectedExecutionException =>
- // run task on current thread
- task.run()
- }
- }
-
- def executeFromActor(task: Runnable) =
- execute(task)
-
- /** This method is called when the <code>SchedulerService</code>
- * shuts down.
+ /** This method is called when the scheduler shuts down.
*/
def onShutdown(): Unit =
executor.shutdown()
@@ -51,4 +31,5 @@ trait ExecutorScheduler extends IScheduler {
*/
def isActive =
(executor ne null) && !executor.isShutdown
+
}
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
index 5d6d676b1a..82e8f5c2fd 100644
--- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
@@ -94,7 +94,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
pool.execute(task)
}
- def executeFromActor(task: Runnable) {
+ override def executeFromActor(task: Runnable) {
val recAction = new RecursiveAction {
def compute() = task.run()
}
@@ -110,8 +110,11 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
def run() { fun }
})
- override def managedBlock(blocker: ManagedBlocker) {
- ForkJoinPool.managedBlock(blocker, true)
+ override def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ def block = blocker.block()
+ def isReleasable() = blocker.isReleasable
+ }, true)
}
/** Shuts down the scheduler.
diff --git a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
index c7e588f2ed..4ad865a15d 100644
--- a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
+++ b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala
@@ -24,9 +24,6 @@ class SingleThreadedScheduler extends IScheduler {
task.run()
}
- def executeFromActor(task: Runnable) =
- execute(task)
-
def execute(fun: => Unit): Unit =
execute(new Runnable {
def run() { fun }
@@ -39,4 +36,8 @@ class SingleThreadedScheduler extends IScheduler {
def onTerminate(actor: Reactor)(f: => Unit) {}
def isActive = true
+
+ def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ blocker.block()
+ }
}
diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
index 4826d44383..fd8a0e6a64 100644
--- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
@@ -12,6 +12,7 @@ package scala.actors
package scheduler
import java.util.concurrent.ThreadPoolExecutor
+import scala.concurrent.ManagedBlocker
/**
* The <code>ThreadPoolScheduler</code> class uses an
@@ -44,7 +45,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
override def managedBlock(blocker: ManagedBlocker) {
val coreSize = executor.getCorePoolSize()
- if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
+ if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) {
executor.setCorePoolSize(coreSize + 1)
}
blocker.block()
@@ -67,7 +68,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
throw new QuitException
val coreSize = executor.getCorePoolSize()
- if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
+ if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) {
executor.setCorePoolSize(coreSize + 1)
}
}