summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-28 19:08:24 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-28 19:08:24 +0000
commite1424d97d5386275003b40c60a7b36c5f5fd87d7 (patch)
tree14a9d52409c635f0b3665505240a219824081ab5 /src/actors
parent9b2430c7766882b55994fda01bc59130e256d022 (diff)
downloadscala-e1424d97d5386275003b40c60a7b36c5f5fd87d7.tar.gz
scala-e1424d97d5386275003b40c60a7b36c5f5fd87d7.tar.bz2
scala-e1424d97d5386275003b40c60a7b36c5f5fd87d7.zip
Improved SimpleExecutorScheduler with non-leaki...
Improved SimpleExecutorScheduler with non-leaking termination monitor.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/ExecutorScheduler.scala5
-rw-r--r--src/actors/scala/actors/Scheduler.scala24
-rw-r--r--src/actors/scala/actors/SimpleExecutorScheduler.scala62
-rw-r--r--src/actors/scala/actors/TerminationMonitor.scala59
-rw-r--r--src/actors/scala/actors/TerminationService.scala70
5 files changed, 160 insertions, 60 deletions
diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala
index 72d38d1b2f..c414d80eba 100644
--- a/src/actors/scala/actors/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/ExecutorScheduler.scala
@@ -37,7 +37,8 @@ class ExecutorScheduler(protected var executor: ExecutorService) extends Schedul
executor execute task
} catch {
case ree: RejectedExecutionException =>
- Debug.info("caught "+ree)
+ // run task on current thread
+ task.run()
}
}
@@ -48,5 +49,5 @@ class ExecutorScheduler(protected var executor: ExecutorService) extends Schedul
executor.shutdown()
def isActive =
- !executor.isShutdown
+ (executor ne null) && !executor.isShutdown
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 50b8ecbbea..a7935edabf 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -10,8 +10,8 @@
package scala.actors
-import compat.Platform
import java.lang.Runnable
+import java.util.concurrent._
/**
* The <code>Scheduler</code> object is used by <code>Actor</code> to
@@ -25,9 +25,15 @@ object Scheduler extends DelegatingScheduler {
Debug.info("initializing "+this+"...")
def makeNewScheduler: IScheduler = {
- val sched = new DefaultExecutorScheduler
- sched.start()
- sched
+ val workQueue = new LinkedBlockingQueue[Runnable](100000)
+ val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
+ ThreadPoolConfig.maxPoolSize,
+ 50L,
+ TimeUnit.MILLISECONDS,
+ workQueue)
+ val s = new SimpleExecutorScheduler(threadPool, true)
+ s.start()
+ s
}
private var tasks: LinkedQueue = null
@@ -56,8 +62,9 @@ object Scheduler extends DelegatingScheduler {
*/
def restart(): Unit = synchronized {
// 1. shut down current scheduler
- if (sched ne null)
+ if (sched ne null) {
sched.shutdown()
+ }
// 2. create and start new scheduler
if ((sched ne null) && sched.isInstanceOf[FJTaskScheduler2]) {
@@ -73,11 +80,8 @@ object Scheduler extends DelegatingScheduler {
tasks = null
}
} else {
- sched = {
- val s = new DefaultExecutorScheduler
- s.start()
- s
- }
+ // will trigger creation of new delegate scheduler
+ sched = null
}
}
diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala
index e83cb5cfb9..dfa1bdaf73 100644
--- a/src/actors/scala/actors/SimpleExecutorScheduler.scala
+++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala
@@ -16,27 +16,34 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException}
/**
* The <code>SimpleExecutorScheduler</code> class uses an
* <code>ExecutorService</code> to execute <code>Actor</code>s. It
- * does not start an additional thread. Also, the underlying
- * <code>ExecutorService</code> is not shut down automatically;
- * instead, the <code>ExecutorService</code> must be shut down either
+ * does not start an additional thread.
+ *
+ * A <code>SimpleExecutorScheduler</code> attempts to shut down
+ * the underlying <code>ExecutorService</code> only if
+ * <code>terminate</code> is set to true.
+ *
+ * Otherwise, the <code>ExecutorService</code> must be shut down either
* directly or by shutting down the
* <code>SimpleExecutorScheduler</code> instance.
*
* @author Philipp Haller
*/
-class SimpleExecutorScheduler(protected var executor: ExecutorService) extends IScheduler {
+class SimpleExecutorScheduler(protected var executor: ExecutorService,
+ protected var terminate: Boolean) extends TerminationService(terminate) {
- /* Maintains at most one closure per actor that is executed
+ /* Maintains per actor one closure that is executed
* when the actor terminates.
*/
protected val termHandlers = new HashMap[OutputChannelActor, () => Unit]
+ private var pendingReactions = 0
+
/* This constructor (and the var above) is currently only used to work
* around a bug in scaladoc, which cannot deal with early initializers
* (to be used in subclasses such as DefaultExecutorScheduler) properly.
*/
def this() {
- this(null)
+ this(null, true)
}
/** Submits a <code>Runnable</code> for execution.
@@ -53,20 +60,7 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I
}
}
- /** Submits a closure for execution.
- *
- * @param block the closure to be executed
- */
- def execute(block: => Unit) {
- val task = new Runnable {
- def run() { block }
- }
- execute(task)
- }
-
- /** Shuts down the scheduler.
- */
- def shutdown() {
+ def onShutdown() {
executor.shutdown()
}
@@ -75,32 +69,4 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService) extends I
*/
def isActive =
(executor ne null) && !executor.isShutdown()
-
- def newActor(a: OutputChannelActor) {}
-
- def terminated(a: OutputChannelActor) {
- // obtain termination handler (if any)
- val todo = synchronized {
- termHandlers.get(a) match {
- case Some(handler) =>
- termHandlers -= a
- () => handler
- case None =>
- () => { /* do nothing */ }
- }
- }
-
- // invoke termination handler (if any)
- todo()
- }
-
- /** Registers a closure to be executed when the specified
- * actor terminates.
- *
- * @param a the actor
- * @param f the closure to be registered
- */
- def onTerminate(a: OutputChannelActor)(block: => Unit) = synchronized {
- termHandlers += (a -> (() => block))
- }
}
diff --git a/src/actors/scala/actors/TerminationMonitor.scala b/src/actors/scala/actors/TerminationMonitor.scala
new file mode 100644
index 0000000000..b7cb748bf0
--- /dev/null
+++ b/src/actors/scala/actors/TerminationMonitor.scala
@@ -0,0 +1,59 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id:$
+
+package scala.actors
+
+import scala.collection.mutable.HashMap
+
+trait TerminationMonitor extends IScheduler {
+
+ private var pendingReactions = 0
+ private val termHandlers = new HashMap[OutputChannelActor, () => Unit]
+
+ /** newActor is invoked whenever a new actor is started. */
+ def newActor(a: OutputChannelActor) = synchronized {
+ pendingReactions += 1
+ }
+
+ /** Registers a closure to be executed when the specified
+ * actor terminates.
+ *
+ * @param a the actor
+ * @param f the closure to be registered
+ */
+ def onTerminate(a: OutputChannelActor)(f: => Unit) = synchronized {
+ termHandlers += (a -> (() => f))
+ }
+
+ def terminated(a: OutputChannelActor) = synchronized {
+ // obtain termination handler (if any)
+ val todo = synchronized {
+ termHandlers.get(a) match {
+ case Some(handler) =>
+ termHandlers -= a
+ () => handler
+ case None =>
+ () => { /* do nothing */ }
+ }
+ }
+
+ // invoke termination handler (if any)
+ todo()
+
+ synchronized {
+ pendingReactions -= 1
+ }
+ }
+
+ protected def allTerminated: Boolean = synchronized {
+ pendingReactions <= 0
+ }
+
+}
diff --git a/src/actors/scala/actors/TerminationService.scala b/src/actors/scala/actors/TerminationService.scala
new file mode 100644
index 0000000000..c983d2d558
--- /dev/null
+++ b/src/actors/scala/actors/TerminationService.scala
@@ -0,0 +1,70 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id: $
+
+package scala.actors
+
+import java.lang.{Runnable, Thread, InterruptedException}
+
+/**
+ * The <code>TerminationService</code> class starts a new thread
+ * that is used to check regularly if the scheduler can be
+ * shut down, because all started actors are known to
+ * have terminated.
+ *
+ * @author Philipp Haller
+ */
+abstract class TerminationService(terminate: Boolean) extends Thread with TerminationMonitor {
+
+ private var terminating = false
+
+ protected val CHECK_FREQ = 50
+
+ def onShutdown(): Unit
+
+ override def run() {
+ try {
+ while (true) {
+ this.synchronized {
+ try {
+ wait(CHECK_FREQ)
+ } catch {
+ case _: InterruptedException =>
+ }
+ if (terminating)
+ throw new QuitException
+
+ if (terminate && allTerminated)
+ throw new QuitException
+ }
+ }
+ } catch {
+ case _: QuitException =>
+ Debug.info(this+": initiating shutdown...")
+ // invoke shutdown hook
+ onShutdown()
+ // allow thread to exit
+ }
+ }
+
+ /** Submits a closure for execution.
+ *
+ * @param fun the closure to be executed
+ */
+ def execute(fun: => Unit): Unit =
+ execute(new Runnable {
+ def run() { fun }
+ })
+
+ /** Shuts down the scheduler.
+ */
+ def shutdown(): Unit = synchronized {
+ terminating = true
+ }
+}