summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-07-11 21:07:42 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-07-11 21:07:42 +0000
commitfdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa (patch)
tree2da2ef33c673a5ea2bd02d142ea6aa3c5e852669 /src/actors
parenta25a8c309a6ec7dfa23d039d0f2ed1110eb12652 (diff)
downloadscala-fdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa.tar.gz
scala-fdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa.tar.bz2
scala-fdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa.zip
Added ThreadPoolScheduler supporting managedBlock.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala4
-rw-r--r--src/actors/scala/actors/DelegatingScheduler.scala4
-rw-r--r--src/actors/scala/actors/ForkJoinScheduler.scala9
-rw-r--r--src/actors/scala/actors/IScheduler.scala4
-rw-r--r--src/actors/scala/actors/ManagedBlocker.scala20
-rw-r--r--src/actors/scala/actors/Scheduler.scala8
-rw-r--r--src/actors/scala/actors/ThreadPoolScheduler.scala73
7 files changed, 107 insertions, 15 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 64e6e31eab..015ec917b6 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -14,8 +14,6 @@ import scala.compat.Platform
import java.util.{Timer, TimerTask}
import java.util.concurrent.ExecutionException
-import forkjoin.ForkJoinPool
-
/**
* The <code>Actor</code> object provides functions for the definition of
* actors, as well as actor operations, such as
@@ -644,7 +642,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
scheduler execute task
}
- class ActorBlocker(timeout: Long) extends ForkJoinPool.ManagedBlocker {
+ class ActorBlocker(timeout: Long) extends ManagedBlocker {
def block() = {
if (timeout > 0)
Actor.this.suspendActorFor(timeout)
diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala
index 78c4020bb0..50fad31606 100644
--- a/src/actors/scala/actors/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/DelegatingScheduler.scala
@@ -8,8 +8,6 @@
package scala.actors
-import forkjoin.ForkJoinPool
-
/**
* @author Erik Engbrecht
*/
@@ -51,6 +49,6 @@ trait DelegatingScheduler extends IScheduler {
def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f)
- override def managedBlock(blocker: ForkJoinPool.ManagedBlocker): Unit =
+ override def managedBlock(blocker: ManagedBlocker): Unit =
impl.managedBlock(blocker)
}
diff --git a/src/actors/scala/actors/ForkJoinScheduler.scala b/src/actors/scala/actors/ForkJoinScheduler.scala
index 59cc14fbb4..74595acde6 100644
--- a/src/actors/scala/actors/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/ForkJoinScheduler.scala
@@ -16,7 +16,7 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
private val CHECK_FREQ = 50
- override def managedBlock(blocker: ForkJoinPool.ManagedBlocker) {
+ override def managedBlock(blocker: ManagedBlocker) {
ForkJoinPool.managedBlock(blocker, true)
}
@@ -41,6 +41,13 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
} catch {
case _: QuitException =>
Debug.info(this+": initiating shutdown...")
+ while (!pool.isQuiescent()) {
+ try {
+ Thread.sleep(10)
+ } catch {
+ case ignore: InterruptedException =>
+ }
+ }
pool.shutdown()
// allow thread to exit
}
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 811fd47ce3..3c87c64b1f 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -10,8 +10,6 @@
package scala.actors
-import forkjoin.ForkJoinPool
-
/**
* The <code>IScheduler</code> trait provides a common interface
* for all schedulers used to execute actor tasks.
@@ -66,7 +64,7 @@ trait IScheduler {
*/
def onTerminate(a: Reactor)(f: => Unit): Unit
- def managedBlock(blocker: ForkJoinPool.ManagedBlocker) {
+ def managedBlock(blocker: ManagedBlocker) {
blocker.block()
}
}
diff --git a/src/actors/scala/actors/ManagedBlocker.scala b/src/actors/scala/actors/ManagedBlocker.scala
new file mode 100644
index 0000000000..f3fd08301b
--- /dev/null
+++ b/src/actors/scala/actors/ManagedBlocker.scala
@@ -0,0 +1,20 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+import forkjoin.ForkJoinPool
+
+/**
+ * The <code>ManagedBlocker</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait ManagedBlocker extends ForkJoinPool.ManagedBlocker
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index bd083691e0..d81a9d1bc5 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -25,16 +25,14 @@ object Scheduler extends DelegatingScheduler {
Debug.info("initializing "+this+"...")
def makeNewScheduler: IScheduler = {
-/*
val workQueue = new LinkedBlockingQueue[Runnable](100000)
val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
ThreadPoolConfig.maxPoolSize,
50L,
TimeUnit.MILLISECONDS,
workQueue)
- val s = new SimpleExecutorScheduler(threadPool, true)
-*/
- val s = new ForkJoinScheduler
+ val s = new ThreadPoolScheduler(threadPool, true)
+ //val s = new ForkJoinScheduler
Debug.info(this+": starting new "+s+" ["+s.getClass+"]")
s.start()
s
@@ -45,7 +43,7 @@ object Scheduler extends DelegatingScheduler {
/* Assumes <code>sched</code> holds an instance
* of <code>FJTaskScheduler2</code>.
*/
- def snapshot(): Unit = synchronized {
+ @deprecated def snapshot(): Unit = synchronized {
if (sched.isInstanceOf[FJTaskScheduler2]) {
val fjts = sched.asInstanceOf[FJTaskScheduler2]
tasks = fjts.snapshot()
diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala
new file mode 100644
index 0000000000..c76d2c4f5c
--- /dev/null
+++ b/src/actors/scala/actors/ThreadPoolScheduler.scala
@@ -0,0 +1,73 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
+
+package scala.actors
+
+import scala.collection.mutable.HashMap
+import java.util.concurrent.{ThreadPoolExecutor, RejectedExecutionException}
+
+/**
+ * The <code>ThreadPoolScheduler</code> class uses an
+ * <code>ExecutorService</code> to execute <code>Actor</code>s. It
+ * does not start an additional thread.
+ *
+ * A <code>ThreadPoolScheduler</code> attempts to shut down
+ * the underlying <code>ExecutorService</code> only if
+ * <code>terminate</code> is set to true.
+ *
+ * Otherwise, the <code>ExecutorService</code> must be shut down either
+ * directly or by shutting down the
+ * <code>ThreadPoolScheduler</code> instance.
+ *
+ * @author Philipp Haller
+ */
+class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
+ protected var terminate: Boolean) extends TerminationService(terminate) {
+
+ /* This constructor (and the var above) is currently only used to work
+ * around a bug in scaladoc, which cannot deal with early initializers
+ * (to be used in subclasses such as DefaultExecutorScheduler) properly.
+ */
+ def this() {
+ this(null, true)
+ }
+
+ /** Submits a <code>Runnable</code> for execution.
+ *
+ * @param task the task to be executed
+ */
+ def execute(task: Runnable) {
+ try {
+ executor execute task
+ } catch {
+ case ree: RejectedExecutionException =>
+ // run task on current thread
+ task.run()
+ }
+ }
+
+ def onShutdown() {
+ executor.shutdown()
+ }
+
+ /** The scheduler is active if the underlying <code>ExecutorService</code>
+ * has not been shut down.
+ */
+ def isActive =
+ (executor ne null) && !executor.isShutdown()
+
+ override def managedBlock(blocker: ManagedBlocker) {
+ val coreSize = executor.getCorePoolSize()
+ if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) {
+ executor.setCorePoolSize(coreSize + 1)
+ }
+ blocker.block()
+ }
+}