summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-03-16 15:25:00 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-03-16 15:25:00 +0000
commit00a38096af091c0daef84b687c20601834644b0a (patch)
treee0009fe204fe5e1ec96372704034ec321fb62d4f
parent5de3ead55f9828c986f0369038d4e87432890f66 (diff)
downloadscala-00a38096af091c0daef84b687c20601834644b0a.tar.gz
scala-00a38096af091c0daef84b687c20601834644b0a.tar.bz2
scala-00a38096af091c0daef84b687c20601834644b0a.zip
scala.actors: added snapshot/restart for Schedu...
scala.actors: added snapshot/restart for Scheduler.
-rw-r--r--src/actors/scala/actors/FJTaskRunner.java24
-rw-r--r--src/actors/scala/actors/FJTaskRunnerGroup.java29
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala54
-rw-r--r--src/actors/scala/actors/IFJTaskRunnerGroup.java8
-rw-r--r--src/actors/scala/actors/Scheduler.scala51
-rw-r--r--src/actors/scala/actors/ThreadPoolScheduler.scala6
-rw-r--r--src/actors/scala/actors/TickedScheduler.scala6
7 files changed, 135 insertions, 43 deletions
diff --git a/src/actors/scala/actors/FJTaskRunner.java b/src/actors/scala/actors/FJTaskRunner.java
index 991ff97083..73bc9ff1dd 100644
--- a/src/actors/scala/actors/FJTaskRunner.java
+++ b/src/actors/scala/actors/FJTaskRunner.java
@@ -378,6 +378,12 @@ public class FJTaskRunner extends Thread {
+ /* -------- Suspending -------- */
+ protected boolean suspending = false;
+
+ synchronized void setSuspending(boolean susp) {
+ suspending = susp;
+ }
/* ------------ DEQ operations ------------------- */
@@ -793,7 +799,6 @@ public class FJTaskRunner extends Thread {
public void run() {
try{
while (!interrupted()) {
-
FJTask task = pop();
if (task != null) {
if (!task.isDone()) {
@@ -806,6 +811,23 @@ public class FJTaskRunner extends Thread {
else
scanWhileIdling();
}
+ // check for suspending
+ if (suspending) {
+ synchronized(this) {
+ // move all local tasks to group-wide entry queue
+ for (int i = 0; i < deq.length; ++i) {
+ synchronized(group) {
+ try {
+ FJTask task = (FJTask)deq[i].take();
+ if (task != null)
+ group.getEntryQueue().put(task);
+ } catch (InterruptedException ie) {
+ System.err.println("Suspend: when transferring task to entryQueue: "+ie);
+ }
+ }
+ }
+ }
+ }
}
finally {
group.setInactive(this);
diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java
index 27c870453f..6fd53d5f97 100644
--- a/src/actors/scala/actors/FJTaskRunnerGroup.java
+++ b/src/actors/scala/actors/FJTaskRunnerGroup.java
@@ -125,7 +125,11 @@ public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
protected /*final*/ FJTaskRunner[] threads;
/** Group-wide queue for tasks entered via execute() **/
- protected final LinkedQueue entryQueue = new LinkedQueue();
+ /*protected*/ final LinkedQueue entryQueue = new LinkedQueue();
+
+ public LinkedQueue getEntryQueue() {
+ return entryQueue;
+ }
/** Number of threads that are not waiting for work **/
protected int activeCount = 0;
@@ -155,6 +159,29 @@ public class FJTaskRunnerGroup implements IFJTaskRunnerGroup {
static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
+ /* -------- Suspending -------- */
+
+ void snapshot() throws InterruptedException {
+ // set flag in all task runners to suspend
+ for (int i = 0; i < threads.length; ++i) {
+ FJTaskRunner t = threads[i];
+ t.setSuspending(true);
+ }
+
+ // interrupt all task runners
+ // assume: current thread not in threads (scheduler)
+ for (int i = 0; i < threads.length; ++i) {
+ Thread t = threads[i];
+ t.interrupt();
+ }
+
+ // wait until all of them have terminated
+ for (int i = 0; i < threads.length; ++i) {
+ Thread t = threads[i];
+ t.join();
+ }
+ }
+
/**
* Create a FJTaskRunnerGroup with the indicated number
* of FJTaskRunner threads. Normally, the best size to use is
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index ddb0908fd7..6fed4bdd3a 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
/**
* FJTaskScheduler2
*
- * @version 0.9.4
+ * @version 0.9.5
* @author Philipp Haller
*/
class FJTaskScheduler2 extends Thread with IScheduler {
@@ -36,6 +36,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
new FJTaskRunnerGroup(coreSize)
private var terminating = false
+ private var suspending = false
private var lastActivity = Platform.currentTime
@@ -76,24 +77,26 @@ class FJTaskScheduler2 extends Thread with IScheduler {
if (terminating) throw new QuitException
}
- // check if we need more threads
- if (Platform.currentTime - lastActivity >= TICK_FREQ
- && coreSize < maxSize
- && executor.checkPoolSize()) {
- // do nothing
- }
- else {
- if (pendingReactions == 0) {
- // if all worker threads idle terminate
- if (executor.getActiveCount() == 0) {
- // Note that we don't have to shutdown
- // the FJTaskRunnerGroup since there is
- // no separate thread associated with it,
- // and FJTaskRunner threads have daemon status.
-
- // terminate timer thread
- TimerThread.t.interrupt()
- throw new QuitException
+ if (!suspending) {
+ // check if we need more threads
+ if (Platform.currentTime - lastActivity >= TICK_FREQ
+ && coreSize < maxSize
+ && executor.checkPoolSize()) {
+ // do nothing
+ }
+ else {
+ if (pendingReactions <= 0) {
+ // if all worker threads idle terminate
+ if (executor.getActiveCount() == 0) {
+ // Note that we don't have to shutdown
+ // the FJTaskRunnerGroup since there is
+ // no separate thread associated with it,
+ // and FJTaskRunner threads have daemon status.
+
+ // terminate timer thread
+ TimerThread.t.interrupt()
+ throw new QuitException
+ }
}
}
}
@@ -114,6 +117,10 @@ class FJTaskScheduler2 extends Thread with IScheduler {
executor.execute(task)
}
+ def execute(task: FJTask) {
+ executor.execute(task)
+ }
+
def start(task: Reaction) {
this.synchronized {
pendingReactions = pendingReactions + 1
@@ -141,4 +148,13 @@ class FJTaskScheduler2 extends Thread with IScheduler {
// terminate timer thread
TimerThread.t.interrupt()
}
+
+ def snapshot(): LinkedQueue = synchronized {
+ suspending = true
+ executor.snapshot()
+ // grab tasks from executor
+ executor.entryQueue
+ }
+
+
}
diff --git a/src/actors/scala/actors/IFJTaskRunnerGroup.java b/src/actors/scala/actors/IFJTaskRunnerGroup.java
index 2e9f3359b8..121c55fbea 100644
--- a/src/actors/scala/actors/IFJTaskRunnerGroup.java
+++ b/src/actors/scala/actors/IFJTaskRunnerGroup.java
@@ -1,6 +1,12 @@
package scala.actors;
+/**
+ * IFJTaskRunnerGroup
+ *
+ * @version 0.9.5
+ * @author Philipp Haller
+ */
interface IFJTaskRunnerGroup {
public void executeTask(FJTask t);
public FJTaskRunner[] getArray();
@@ -8,5 +14,5 @@ interface IFJTaskRunnerGroup {
public void setActive(FJTaskRunner t);
public void checkActive(FJTaskRunner t, long scans);
public void setInactive(FJTaskRunner t);
-
+ public LinkedQueue getEntryQueue();
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index fe9fe9d52b..b259817f04 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -22,29 +22,13 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* The <code>Scheduler</code> object is used by
* <code>Actor</code> to execute tasks of an execution of an actor.
*
- * @version 0.9.4
+ * @version 0.9.5
* @author Philipp Haller
*/
object Scheduler {
private var sched: IScheduler =
{
var s: IScheduler = new FJTaskScheduler2
-
-/*
- // Check for JDK version >= 1.5
- var olderThanJDK5 = false
- try {
- java.lang.Class.forName("java.util.concurrent.ThreadPoolExecutor")
- } catch {
- case _: ClassNotFoundException =>
- olderThanJDK5 = true
- }
-
- s = if (olderThanJDK5)
- new TickedScheduler
- else
- Class.forName("scala.actors.ThreadPoolScheduler").newInstance().asInstanceOf[IScheduler]
-*/
s.start()
s
}
@@ -54,6 +38,25 @@ object Scheduler {
sched = scheduler
}
+ var tasks: LinkedQueue = null
+
+ def snapshot(): unit = synchronized {
+ tasks = sched.snapshot()
+ sched.shutdown()
+ }
+
+ def restart(): unit = synchronized {
+ sched = {
+ var s: IScheduler = new FJTaskScheduler2
+ s.start()
+ s
+ }
+ while (!tasks.isEmpty()) {
+ sched.execute(tasks.take().asInstanceOf[FJTask])
+ }
+ tasks = null
+ }
+
def start(task: Reaction) = sched.start(task)
def execute(task: Reaction) = {
val t = currentThread
@@ -84,19 +87,23 @@ object Scheduler {
* This abstract class provides a common interface for all
* schedulers used to execute actor tasks.
*
- * @version 0.9.4
+ * @version 0.9.5
* @author Philipp Haller
*/
trait IScheduler {
def start(): unit
+
def start(task: Reaction): unit
def execute(task: Reaction): unit
+ def execute(task: FJTask): unit
+
def getTask(worker: WorkerThread): Runnable
def tick(a: Actor): unit
def terminated(a: Actor): unit
def pendReaction: unit
def unPendReaction: unit
+ def snapshot(): LinkedQueue
def shutdown(): unit
def onLockup(handler: () => unit): unit
@@ -114,7 +121,7 @@ trait IScheduler {
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
*
- * @version 0.9.4
+ * @version 0.9.5
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
@@ -130,6 +137,11 @@ class SingleThreadedScheduler extends IScheduler {
task.run()
}
+ def execute(task: FJTask) {
+ // execute task immediately on same thread
+ task.run()
+ }
+
def getTask(worker: WorkerThread): Runnable = null
def tick(a: Actor): Unit = {}
def terminated(a: Actor): unit = {}
@@ -137,6 +149,7 @@ class SingleThreadedScheduler extends IScheduler {
def unPendReaction: unit = {}
def shutdown(): Unit = {}
+ def snapshot(): LinkedQueue = { null }
def onLockup(handler: () => unit): unit = {}
def onLockup(millis: int)(handler: () => unit): unit = {}
diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala
index b7d68cfae5..caa18ecbdc 100644
--- a/src/actors/scala/actors/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/ThreadPoolScheduler.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{ThreadPoolExecutor,
* This handler executes rejected tasks on the thread of
* the scheduler.
*
- * @version 0.9.4
+ * @version 0.9.5
* @author Philipp Haller
*/
private class TaskRejectedHandler(sched: ThreadPoolScheduler) extends RejectedExecutionHandler {
@@ -156,6 +156,10 @@ class ThreadPoolScheduler extends Thread with IScheduler {
executor.execute(item)
}
+ def execute(task: FJTask) { }
+
+ def snapshot(): LinkedQueue = null
+
/**
* @param worker the worker thread executing tasks
* @return the executed task
diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala
index adc84b00c8..e17c1232c9 100644
--- a/src/actors/scala/actors/TickedScheduler.scala
+++ b/src/actors/scala/actors/TickedScheduler.scala
@@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
* by the execution of actors. Unlike <code>ThreadPoolScheduler</code>, this
* scheduler is available on all Java versions >= 1.4.</p>
*
- * @version 0.9.4
+ * @version 0.9.5
* @author Philipp Haller
*/
class TickedScheduler extends Thread with IScheduler {
@@ -127,6 +127,10 @@ class TickedScheduler extends Thread with IScheduler {
}
}
+ def execute(task: FJTask) { }
+
+ def snapshot(): LinkedQueue = null
+
/**
* @param worker the worker thread executing tasks
* @return the executed task