summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/actors/scala/actors/Actor.scala5
-rw-r--r--src/actors/scala/actors/ActorGC.scala17
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala2
-rw-r--r--src/actors/scala/actors/Future.scala4
-rw-r--r--src/actors/scala/actors/Scheduler.scala11
-rw-r--r--src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala49
-rw-r--r--src/actors/scala/actors/scheduler/SchedulerService.scala3
-rw-r--r--src/actors/scala/actors/scheduler/TerminationMonitor.scala5
-rw-r--r--src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala42
-rw-r--r--test/files/jvm/t2359.check5
-rw-r--r--test/files/jvm/t2359.scala21
11 files changed, 120 insertions, 44 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 92111d6bf8..06b8af788f 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -398,6 +398,9 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
*/
private var onTimeout: Option[TimerTask] = None
+ /* Used for notifying scheduler when blocking inside <code>receive</code>. */
+ private lazy val blocker = new ActorBlocker(0)
+
private class RunCallable(fun: () => Unit) extends Callable[Unit] with Runnable {
def call() = fun()
def run() = fun()
@@ -457,7 +460,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
} else {
waitingFor = f.isDefinedAt
isSuspended = true
- scheduler.managedBlock(new ActorBlocker(0))
+ scheduler.managedBlock(blocker)
done = true
}
}
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index 67493e91cd..2725e29109 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -13,6 +13,7 @@ package scala.actors
import java.lang.ref.{Reference, WeakReference, ReferenceQueue}
import scala.collection.mutable.{HashMap, HashSet}
+import scala.actors.scheduler.TerminationMonitor
/**
* ActorGC keeps track of the number of live actors being managed by a
@@ -24,10 +25,8 @@ import scala.collection.mutable.{HashMap, HashSet}
* (e.g. act method finishes, exit explicitly called, an exception is thrown),
* the ActorGC is informed via the <code>terminated</code> method.
*/
-trait ActorGC extends IScheduler {
-
- private var pendingReactions = 0
- private val termHandlers = new HashMap[Reactor, () => Unit]
+trait ActorGC extends TerminationMonitor {
+ self: IScheduler =>
/** Actors are added to refQ in newActor. */
private val refQ = new ReferenceQueue[Reactor]
@@ -40,7 +39,7 @@ trait ActorGC extends IScheduler {
private val refSet = new HashSet[Reference[t] forSome { type t <: Reactor }]
/** newActor is invoked whenever a new actor is started. */
- def newActor(a: Reactor) = synchronized {
+ override def newActor(a: Reactor) = synchronized {
// registers a reference to the actor with the ReferenceQueue
val wr = new WeakReference[Reactor](a, refQ)
refSet += wr
@@ -48,7 +47,7 @@ trait ActorGC extends IScheduler {
}
/** Removes unreachable actors from refSet. */
- protected def gc() = synchronized {
+ protected override def gc() = synchronized {
// check for unreachable actors
def drainRefQ() {
val wr = refQ.poll
@@ -66,17 +65,17 @@ trait ActorGC extends IScheduler {
println(this+": size of refSet: "+refSet.size)
}
- protected def allTerminated: Boolean = synchronized {
+ protected override def allTerminated: Boolean = synchronized {
pendingReactions <= 0
}
- def onTerminate(a: Reactor)(f: => Unit) = synchronized {
+ override def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized {
termHandlers += (a -> (() => f))
}
/* Called only from <code>Reaction</code>.
*/
- def terminated(a: Reactor) = synchronized {
+ override def terminated(a: Reactor) = synchronized {
// execute registered termination handler (if any)
termHandlers.get(a) match {
case Some(handler) =>
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index b12ef2d925..9a2832d85f 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -26,7 +26,7 @@ import scheduler.{ThreadPoolConfig, QuitException}
* @version 0.9.18
* @author Philipp Haller
*/
-class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Thread with ActorGC {
+class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Thread with IScheduler with ActorGC {
setDaemon(daemon)
/** Default constructor creates a non-daemon thread. */
diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala
index bc7ae1c402..5ed758a65b 100644
--- a/src/actors/scala/actors/Future.scala
+++ b/src/actors/scala/actors/Future.scala
@@ -10,7 +10,7 @@
package scala.actors
-import scheduler.DefaultExecutorScheduler
+import scheduler.DefaultThreadPoolScheduler
/**
* <p>
@@ -40,7 +40,7 @@ abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T
*/
object Futures {
- private lazy val sched = new DefaultExecutorScheduler(true)
+ private lazy val sched = new DefaultThreadPoolScheduler(true)
def future[T](body: => T): Future[T] = {
case object Eval
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 894dbc93e8..f6b82e1bb9 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -11,7 +11,7 @@
package scala.actors
import java.util.concurrent._
-import scheduler.{DelegatingScheduler, ThreadPoolConfig, ThreadPoolScheduler, ForkJoinScheduler}
+import scheduler.{DelegatingScheduler, ThreadPoolConfig, ThreadPoolScheduler, ForkJoinScheduler, DefaultThreadPoolScheduler}
/**
* The <code>Scheduler</code> object is used by <code>Actor</code> to
@@ -24,14 +24,7 @@ object Scheduler extends DelegatingScheduler {
Debug.info("initializing "+this+"...")
def makeNewScheduler: IScheduler = {
- val workQueue = new LinkedBlockingQueue[Runnable]
- val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
- ThreadPoolConfig.maxPoolSize,
- 60000L,
- TimeUnit.MILLISECONDS,
- workQueue,
- new ThreadPoolExecutor.CallerRunsPolicy)
- val s = new ThreadPoolScheduler(threadPool, true)
+ val s = new DefaultThreadPoolScheduler(false)
//val s = new ForkJoinScheduler
Debug.info(this+": starting new "+s+" ["+s.getClass+"]")
s.start()
diff --git a/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala
new file mode 100644
index 0000000000..f5eb6c4884
--- /dev/null
+++ b/src/actors/scala/actors/scheduler/DefaultThreadPoolScheduler.scala
@@ -0,0 +1,49 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors.scheduler
+
+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue,
+ ThreadFactory}
+
+/**
+ * The <code>DefaultThreadPoolScheduler</code> class uses a default
+ * <code>ThreadPoolExecutor</code> for executing <code>Actor</code>s.
+ *
+ * It can be configured using the two JVM properties
+ * <code>actors.corePoolSize</code> and
+ * <code>actors.maxPoolSize</code> that control the initial and
+ * maximum size of the thread pool, respectively.
+ *
+ * @author Philipp Haller
+ */
+private[actors] class DefaultThreadPoolScheduler(daemon: Boolean) extends ThreadPoolScheduler(daemon) {
+
+ executor = {
+ val workQueue = new LinkedBlockingQueue[Runnable]
+
+ val threadFactory = new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ val t = new Thread(r)
+ t setDaemon daemon
+ t
+ }
+ }
+
+ new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
+ ThreadPoolConfig.maxPoolSize,
+ 60000L,
+ TimeUnit.MILLISECONDS,
+ workQueue,
+ threadFactory,
+ new ThreadPoolExecutor.CallerRunsPolicy)
+ }
+
+}
diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala
index d1528c9e5b..2663118a06 100644
--- a/src/actors/scala/actors/scheduler/SchedulerService.scala
+++ b/src/actors/scala/actors/scheduler/SchedulerService.scala
@@ -23,7 +23,8 @@ import java.lang.{Runnable, Thread, InterruptedException}
* @version 0.9.18
* @author Philipp Haller
*/
-abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC {
+abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler with ActorGC {
+
setDaemon(daemon)
def this() =
diff --git a/src/actors/scala/actors/scheduler/TerminationMonitor.scala b/src/actors/scala/actors/scheduler/TerminationMonitor.scala
index 47d408dc1a..43173479e8 100644
--- a/src/actors/scala/actors/scheduler/TerminationMonitor.scala
+++ b/src/actors/scala/actors/scheduler/TerminationMonitor.scala
@@ -15,8 +15,8 @@ import scala.collection.mutable.HashMap
trait TerminationMonitor {
- private var pendingReactions = 0
- private val termHandlers = new HashMap[Reactor, () => Unit]
+ protected var pendingReactions = 0
+ protected val termHandlers = new HashMap[Reactor, () => Unit]
private var started = false
/** newActor is invoked whenever a new actor is started. */
@@ -60,4 +60,5 @@ trait TerminationMonitor {
started && pendingReactions <= 0
}
+ protected def gc() {}
}
diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
index c43f541cbd..1a68dc669b 100644
--- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
@@ -8,46 +8,54 @@
// $Id$
-package scala.actors
-package scheduler
+package scala.actors.scheduler
import java.util.concurrent.ThreadPoolExecutor
+import scala.actors.Debug
import scala.concurrent.ManagedBlocker
/**
- * The <code>ThreadPoolScheduler</code> class uses an
+ * The <code>ThreadPoolScheduler</code> class uses a
* <code>ThreadPoolExecutor</code> to execute <code>Actor</code>s.
*
* A <code>ThreadPoolScheduler</code> attempts to shut down
- * the underlying <code>ExecutorService</code> only if
+ * the underlying <code>ThreadPoolExecutor</code> only if
* <code>terminate</code> is set to true.
*
- * Otherwise, the <code>ExecutorService</code> must be shut down either
- * directly or by shutting down the
+ * Otherwise, the <code>ThreadPoolExecutor</code> must be shut down
+ * either directly or by shutting down the
* <code>ThreadPoolScheduler</code> instance.
*
* @author Philipp Haller
*/
class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
- protected var terminate: Boolean)
- extends Thread with TerminationMonitor with ExecutorScheduler {
+ protected var terminate: Boolean,
+ protected val daemon: Boolean)
+ extends Thread with ExecutorScheduler with TerminationMonitor {
+
+ setDaemon(daemon)
private var terminating = false
protected val CHECK_FREQ = 10
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers
- * (to be used in subclasses such as DefaultExecutorScheduler) properly.
+ * (to be used in subclasses such as DefaultThreadPoolScheduler)
+ * properly.
*/
- def this() {
- this(null, true)
+ def this(d: Boolean) {
+ this(null, true, d)
}
- override def managedBlock(blocker: ManagedBlocker) {
+ protected def adjustCorePoolSize() {
val coreSize = executor.getCorePoolSize()
if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) {
executor.setCorePoolSize(coreSize + 1)
}
+ }
+
+ override def managedBlock(blocker: ManagedBlocker) {
+ adjustCorePoolSize()
blocker.block()
}
@@ -61,16 +69,12 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
case _: InterruptedException =>
}
- if (terminating)
+ if (terminating || (terminate && allTerminated))
throw new QuitException
- if (terminate && allTerminated)
- throw new QuitException
+ gc()
- val coreSize = executor.getCorePoolSize()
- if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) {
- executor.setCorePoolSize(coreSize + 1)
- }
+ adjustCorePoolSize()
}
}
} catch {
diff --git a/test/files/jvm/t2359.check b/test/files/jvm/t2359.check
new file mode 100644
index 0000000000..8a1218a102
--- /dev/null
+++ b/test/files/jvm/t2359.check
@@ -0,0 +1,5 @@
+1
+2
+3
+4
+5
diff --git a/test/files/jvm/t2359.scala b/test/files/jvm/t2359.scala
new file mode 100644
index 0000000000..1b4d5e0a27
--- /dev/null
+++ b/test/files/jvm/t2359.scala
@@ -0,0 +1,21 @@
+import scala.actors.Futures._
+
+object Test {
+ def main(args: Array[String]) {
+ val x = future {
+ System.out.println(1)
+ future {
+ System.out.println(2)
+ future {
+ System.out.println(3)
+ future {
+ System.out.println(4)
+ future {
+ System.out.println(5)
+ }()
+ }()
+ }()
+ }()
+ }()
+ }
+}