summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2010-02-22 17:43:40 +0000
committerPhilipp Haller <hallerp@gmail.com>2010-02-22 17:43:40 +0000
commit669ce2013ec01091760ab8ed5cd8814471869a96 (patch)
tree190bbc000b4bed62a797a480d63e7419048ad472
parent57d38b321e73f78e03469ad5dab23e84999a817d (diff)
downloadscala-669ce2013ec01091760ab8ed5cd8814471869a96.tar.gz
scala-669ce2013ec01091760ab8ed5cd8814471869a96.tar.bz2
scala-669ce2013ec01091760ab8ed5cd8814471869a96.zip
- Added fair mode to ForkJoinScheduler, which s...
- Added fair mode to ForkJoinScheduler, which submits tasks to global - queue with a 2% chance Reactor uses ForkJoinScheduler by default Moved - ActorGC to scheduler package Various clean-ups
-rw-r--r--src/actors/scala/actors/Actor.scala2
-rw-r--r--src/actors/scala/actors/ActorGC.scala2
-rw-r--r--src/actors/scala/actors/IScheduler.scala4
-rw-r--r--src/actors/scala/actors/OutputChannel.scala2
-rw-r--r--src/actors/scala/actors/Reactor.scala62
-rw-r--r--src/actors/scala/actors/ReactorTask.scala6
-rw-r--r--src/actors/scala/actors/Scheduler.scala2
-rw-r--r--src/actors/scala/actors/package.scala2
-rw-r--r--src/actors/scala/actors/scheduler/DaemonScheduler.scala2
-rw-r--r--src/actors/scala/actors/scheduler/ForkJoinScheduler.scala16
-rw-r--r--src/actors/scala/actors/scheduler/ThreadPoolConfig.scala3
11 files changed, 64 insertions, 39 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 838d3a8f63..99a23c980f 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -820,7 +820,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
// (because shouldExit == true)
if (isSuspended)
resumeActor()
- else if (waitingFor ne waitingForNone) {
+ else if (waitingFor ne Reactor.waitingForNone) {
scheduleActor(waitingFor, null)
/* Here we should not throw a SuspendActorException,
since the current method is called from an actor that
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index af1b0ed82d..09fe2f1209 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -9,11 +9,11 @@
// $Id$
package scala.actors
+package scheduler
import java.lang.ref.{Reference, WeakReference, ReferenceQueue}
import scala.collection.mutable.HashSet
-import scala.actors.scheduler.TerminationMonitor
/**
* ActorGC keeps track of the number of live actors being managed by a
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 7a47c670e7..9197c44eeb 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -11,12 +11,12 @@
package scala.actors
/**
- * The <code>IScheduler</code> trait provides a common interface
+ * The <code>AbstractScheduler</code> trait provides a common interface
* for all schedulers used to execute actor tasks.
*
* Subclasses of <code>Actor</code> that override its
* <code>scheduler</code> member must provide
- * an <code>IScheduler</code> implementation.
+ * an <code>AbstractScheduler</code> implementation.
*
* @author Philipp Haller
*/
diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala
index 4fdd606a40..d1ce2761df 100644
--- a/src/actors/scala/actors/OutputChannel.scala
+++ b/src/actors/scala/actors/OutputChannel.scala
@@ -42,7 +42,7 @@ trait OutputChannel[-Msg] extends AbstractReactor[Msg] {
def forward(msg: Msg): Unit
/**
- * Returns the <code>Reactor</code> that is
+ * Returns the <code>Actor</code> that is
* receiving from this <code>OutputChannel</code>.
*/
def receiver: Actor
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index eb0485263b..a5cc26949a 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -10,18 +10,34 @@
package scala.actors
-import scala.actors.scheduler.{DelegatingScheduler, DefaultThreadPoolScheduler}
-import scala.collection.mutable.Queue
+import scala.actors.scheduler.{DelegatingScheduler, DefaultThreadPoolScheduler,
+ ForkJoinScheduler, ThreadPoolConfig}
+
+private[actors] object Reactor {
-private object Reactor {
val scheduler = new DelegatingScheduler {
def makeNewScheduler: IScheduler = {
- val s = new DefaultThreadPoolScheduler(false)
- Debug.info(this+": starting new "+s+" ["+s.getClass+"]")
- s.start()
- s
+ val sched = if (!ThreadPoolConfig.useForkJoin) {
+ // default is non-daemon
+ val s = new DefaultThreadPoolScheduler(false)
+ s.start()
+ s
+ } else {
+ // default is non-daemon, non-fair
+ val s = new ForkJoinScheduler(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, false, false)
+ s.start()
+ s
+ }
+ Debug.info(this+": starting new "+sched+" ["+sched.getClass+"]")
+ sched
}
}
+
+ val waitingForNone = new PartialFunction[Any, Unit] {
+ def isDefinedAt(x: Any) = false
+ def apply(x: Any) {}
+ }
+
}
/**
@@ -37,22 +53,20 @@ trait Reactor extends OutputChannel[Any] {
// guarded by this
private[actors] val sendBuffer = new MQueue("SendBuffer")
- /* Whenever this actor executes on some thread, waitingFor is
- * guaranteed to be equal to waitingForNone.
+ /* Whenever this actor executes on some thread, `waitingFor` is
+ * guaranteed to be equal to `Reactor.waitingForNone`.
*
- * In other words, whenever waitingFor is not equal to
- * waitingForNone, this actor is guaranteed not to execute on some
- * thread.
- */
- private[actors] val waitingForNone = new PartialFunction[Any, Unit] {
- def isDefinedAt(x: Any) = false
- def apply(x: Any) {}
- }
-
- /* If the actor waits in a react, waitingFor holds the
- * message handler that react was called with.
+ * In other words, whenever `waitingFor` is not equal to
+ * `Reactor.waitingForNone`, this actor is guaranteed not to execute
+ * on some thread.
+ *
+ * If the actor waits in a `react`, `waitingFor` holds the
+ * message handler that `react` was called with.
+ *
+ * guarded by this
*/
- private[actors] var waitingFor: PartialFunction[Any, Any] = waitingForNone // guarded by lock of this
+ private[actors] var waitingFor: PartialFunction[Any, Any] =
+ Reactor.waitingForNone
/**
* The behavior of an actor is specified by implementing this
@@ -78,9 +92,9 @@ trait Reactor extends OutputChannel[Any] {
*/
def send(msg: Any, replyTo: OutputChannel[Any]) {
val todo = synchronized {
- if (waitingFor ne waitingForNone) {
+ if (waitingFor ne Reactor.waitingForNone) {
val savedWaitingFor = waitingFor
- waitingFor = waitingForNone
+ waitingFor = Reactor.waitingForNone
startSearch(msg, replyTo, savedWaitingFor)
} else {
sendBuffer.append(msg, replyTo)
@@ -195,7 +209,7 @@ trait Reactor extends OutputChannel[Any] {
/* This closure is used to implement control-flow operations
* built on top of `seq`. Note that the only invocation of
- * `kill` is supposed to be inside `Reaction.run`.
+ * `kill` is supposed to be inside `ReactorTask.run`.
*/
@volatile
private[actors] var kill: () => Unit =
diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala
index 37aec0f8ec..aab7d6b72f 100644
--- a/src/actors/scala/actors/ReactorTask.scala
+++ b/src/actors/scala/actors/ReactorTask.scala
@@ -14,6 +14,8 @@ package scala.actors
import java.lang.Runnable
import java.util.concurrent.Callable
+import scala.concurrent.forkjoin.RecursiveAction
+
/** <p>
* The class <code>ReactorTask</code>.
* </p>
@@ -21,7 +23,7 @@ import java.util.concurrent.Callable
* @author Philipp Haller
*/
private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun: () => Any)
- extends Callable[Unit] with Runnable {
+ extends RecursiveAction with Callable[Unit] with Runnable {
def run() {
val saved = Actor.tl.get
@@ -58,6 +60,8 @@ private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun:
def call() = run()
+ def compute() = run()
+
protected def beforeExecuting() {}
protected def afterExecuting(e: Exception) {}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index d653271c1e..6792b65abe 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -25,10 +25,12 @@ object Scheduler extends DelegatingScheduler {
def makeNewScheduler: IScheduler = {
val sched = if (!ThreadPoolConfig.useForkJoin) {
+ // default is non-daemon
val s = new ResizableThreadPoolScheduler(false)
s.start()
s
} else {
+ // default is non-daemon, fair
val s = new ForkJoinScheduler
s.start()
s
diff --git a/src/actors/scala/actors/package.scala b/src/actors/scala/actors/package.scala
index 7075518931..7a7ef5899e 100644
--- a/src/actors/scala/actors/package.scala
+++ b/src/actors/scala/actors/package.scala
@@ -1,6 +1,7 @@
package scala
package object actors {
+
@deprecated("use scala.actors.scheduler.ForkJoinScheduler instead")
type FJTaskScheduler2 = scala.actors.scheduler.ForkJoinScheduler
@@ -15,4 +16,5 @@ package object actors {
@deprecated("this value is going to be removed in a future release")
val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ThreadPoolScheduler]
+
}
diff --git a/src/actors/scala/actors/scheduler/DaemonScheduler.scala b/src/actors/scala/actors/scheduler/DaemonScheduler.scala
index 02f652db0b..257e847a6a 100644
--- a/src/actors/scala/actors/scheduler/DaemonScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DaemonScheduler.scala
@@ -16,7 +16,7 @@ package scheduler
*/
object DaemonScheduler extends DelegatingScheduler {
- def makeNewScheduler(): IScheduler = {
+ protected def makeNewScheduler(): IScheduler = {
val sched = if (!ThreadPoolConfig.useForkJoin) {
val s = new ResizableThreadPoolScheduler(true)
s.start()
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
index 54fab4fb11..e2537511ac 100644
--- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
@@ -3,13 +3,15 @@ package scheduler
import java.util.{Collection, ArrayList}
import scala.concurrent.forkjoin._
+import scala.util.Random
/** The <code>ForkJoinScheduler</code> is backed by a lightweight
* fork-join task execution framework.
*
* @author Philipp Haller
*/
-class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Runnable with IScheduler with TerminationMonitor {
+class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean, fair: Boolean)
+ extends Runnable with IScheduler with TerminationMonitor {
private var pool = makeNewPool() // guarded by this
private var terminating = false // guarded by this
@@ -22,7 +24,7 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean
protected val CHECK_FREQ = 10
def this(d: Boolean) {
- this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, d)
+ this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, d, true)
}
def this() {
@@ -106,11 +108,11 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean
}
override def executeFromActor(task: Runnable) {
- // TODO: only pass RecursiveAction (with Runnable), and cast to it
- val recAction = new RecursiveAction {
- def compute() = task.run()
- }
- recAction.fork()
+ // in fair mode: 2% chance of submitting to global task queue
+ if (fair && Random.nextInt(50) == 1)
+ pool.execute(task)
+ else
+ task.asInstanceOf[RecursiveAction].fork()
}
/** Submits a closure for execution.
diff --git a/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala b/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala
index 93112ae80a..c34cc83df6 100644
--- a/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala
+++ b/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala
@@ -13,6 +13,7 @@ package scheduler
/**
* @author Erik Engbrecht
+ * @author Philipp Haller
*/
object ThreadPoolConfig {
private val rt = Runtime.getRuntime()
@@ -23,7 +24,7 @@ object ThreadPoolConfig {
val prop = System.getProperty(propName)
Some(Integer.parseInt(prop))
} catch {
- case ace: java.security.AccessControlException => None
+ case se: SecurityException => None
case nfe: NumberFormatException => None
}
}