diff options
-rw-r--r-- | src/actors/scala/actors/FJTaskRunner.java | 24 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskRunnerGroup.java | 29 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 54 | ||||
-rw-r--r-- | src/actors/scala/actors/IFJTaskRunnerGroup.java | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 51 | ||||
-rw-r--r-- | src/actors/scala/actors/ThreadPoolScheduler.scala | 6 | ||||
-rw-r--r-- | src/actors/scala/actors/TickedScheduler.scala | 6 |
7 files changed, 135 insertions, 43 deletions
diff --git a/src/actors/scala/actors/FJTaskRunner.java b/src/actors/scala/actors/FJTaskRunner.java index 991ff97083..73bc9ff1dd 100644 --- a/src/actors/scala/actors/FJTaskRunner.java +++ b/src/actors/scala/actors/FJTaskRunner.java @@ -378,6 +378,12 @@ public class FJTaskRunner extends Thread { + /* -------- Suspending -------- */ + protected boolean suspending = false; + + synchronized void setSuspending(boolean susp) { + suspending = susp; + } /* ------------ DEQ operations ------------------- */ @@ -793,7 +799,6 @@ public class FJTaskRunner extends Thread { public void run() { try{ while (!interrupted()) { - FJTask task = pop(); if (task != null) { if (!task.isDone()) { @@ -806,6 +811,23 @@ public class FJTaskRunner extends Thread { else scanWhileIdling(); } + // check for suspending + if (suspending) { + synchronized(this) { + // move all local tasks to group-wide entry queue + for (int i = 0; i < deq.length; ++i) { + synchronized(group) { + try { + FJTask task = (FJTask)deq[i].take(); + if (task != null) + group.getEntryQueue().put(task); + } catch (InterruptedException ie) { + System.err.println("Suspend: when transferring task to entryQueue: "+ie); + } + } + } + } + } } finally { group.setInactive(this); diff --git a/src/actors/scala/actors/FJTaskRunnerGroup.java b/src/actors/scala/actors/FJTaskRunnerGroup.java index 27c870453f..6fd53d5f97 100644 --- a/src/actors/scala/actors/FJTaskRunnerGroup.java +++ b/src/actors/scala/actors/FJTaskRunnerGroup.java @@ -125,7 +125,11 @@ public class FJTaskRunnerGroup implements IFJTaskRunnerGroup { protected /*final*/ FJTaskRunner[] threads; /** Group-wide queue for tasks entered via execute() **/ - protected final LinkedQueue entryQueue = new LinkedQueue(); + /*protected*/ final LinkedQueue entryQueue = new LinkedQueue(); + + public LinkedQueue getEntryQueue() { + return entryQueue; + } /** Number of threads that are not waiting for work **/ protected int activeCount = 0; @@ -155,6 +159,29 @@ public class FJTaskRunnerGroup implements IFJTaskRunnerGroup { static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1; + /* -------- Suspending -------- */ + + void snapshot() throws InterruptedException { + // set flag in all task runners to suspend + for (int i = 0; i < threads.length; ++i) { + FJTaskRunner t = threads[i]; + t.setSuspending(true); + } + + // interrupt all task runners + // assume: current thread not in threads (scheduler) + for (int i = 0; i < threads.length; ++i) { + Thread t = threads[i]; + t.interrupt(); + } + + // wait until all of them have terminated + for (int i = 0; i < threads.length; ++i) { + Thread t = threads[i]; + t.join(); + } + } + /** * Create a FJTaskRunnerGroup with the indicated number * of FJTaskRunner threads. Normally, the best size to use is diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index ddb0908fd7..6fed4bdd3a 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has /** * FJTaskScheduler2 * - * @version 0.9.4 + * @version 0.9.5 * @author Philipp Haller */ class FJTaskScheduler2 extends Thread with IScheduler { @@ -36,6 +36,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { new FJTaskRunnerGroup(coreSize) private var terminating = false + private var suspending = false private var lastActivity = Platform.currentTime @@ -76,24 +77,26 @@ class FJTaskScheduler2 extends Thread with IScheduler { if (terminating) throw new QuitException } - // check if we need more threads - if (Platform.currentTime - lastActivity >= TICK_FREQ - && coreSize < maxSize - && executor.checkPoolSize()) { - // do nothing - } - else { - if (pendingReactions == 0) { - // if all worker threads idle terminate - if (executor.getActiveCount() == 0) { - // Note that we don't have to shutdown - // the FJTaskRunnerGroup since there is - // no separate thread associated with it, - // and FJTaskRunner threads have daemon status. - - // terminate timer thread - TimerThread.t.interrupt() - throw new QuitException + if (!suspending) { + // check if we need more threads + if (Platform.currentTime - lastActivity >= TICK_FREQ + && coreSize < maxSize + && executor.checkPoolSize()) { + // do nothing + } + else { + if (pendingReactions <= 0) { + // if all worker threads idle terminate + if (executor.getActiveCount() == 0) { + // Note that we don't have to shutdown + // the FJTaskRunnerGroup since there is + // no separate thread associated with it, + // and FJTaskRunner threads have daemon status. + + // terminate timer thread + TimerThread.t.interrupt() + throw new QuitException + } } } } @@ -114,6 +117,10 @@ class FJTaskScheduler2 extends Thread with IScheduler { executor.execute(task) } + def execute(task: FJTask) { + executor.execute(task) + } + def start(task: Reaction) { this.synchronized { pendingReactions = pendingReactions + 1 @@ -141,4 +148,13 @@ class FJTaskScheduler2 extends Thread with IScheduler { // terminate timer thread TimerThread.t.interrupt() } + + def snapshot(): LinkedQueue = synchronized { + suspending = true + executor.snapshot() + // grab tasks from executor + executor.entryQueue + } + + } diff --git a/src/actors/scala/actors/IFJTaskRunnerGroup.java b/src/actors/scala/actors/IFJTaskRunnerGroup.java index 2e9f3359b8..121c55fbea 100644 --- a/src/actors/scala/actors/IFJTaskRunnerGroup.java +++ b/src/actors/scala/actors/IFJTaskRunnerGroup.java @@ -1,6 +1,12 @@ package scala.actors; +/** + * IFJTaskRunnerGroup + * + * @version 0.9.5 + * @author Philipp Haller + */ interface IFJTaskRunnerGroup { public void executeTask(FJTask t); public FJTaskRunner[] getArray(); @@ -8,5 +14,5 @@ interface IFJTaskRunnerGroup { public void setActive(FJTaskRunner t); public void checkActive(FJTaskRunner t, long scans); public void setInactive(FJTaskRunner t); - + public LinkedQueue getEntryQueue(); } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index fe9fe9d52b..b259817f04 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -22,29 +22,13 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has * The <code>Scheduler</code> object is used by * <code>Actor</code> to execute tasks of an execution of an actor. * - * @version 0.9.4 + * @version 0.9.5 * @author Philipp Haller */ object Scheduler { private var sched: IScheduler = { var s: IScheduler = new FJTaskScheduler2 - -/* - // Check for JDK version >= 1.5 - var olderThanJDK5 = false - try { - java.lang.Class.forName("java.util.concurrent.ThreadPoolExecutor") - } catch { - case _: ClassNotFoundException => - olderThanJDK5 = true - } - - s = if (olderThanJDK5) - new TickedScheduler - else - Class.forName("scala.actors.ThreadPoolScheduler").newInstance().asInstanceOf[IScheduler] -*/ s.start() s } @@ -54,6 +38,25 @@ object Scheduler { sched = scheduler } + var tasks: LinkedQueue = null + + def snapshot(): unit = synchronized { + tasks = sched.snapshot() + sched.shutdown() + } + + def restart(): unit = synchronized { + sched = { + var s: IScheduler = new FJTaskScheduler2 + s.start() + s + } + while (!tasks.isEmpty()) { + sched.execute(tasks.take().asInstanceOf[FJTask]) + } + tasks = null + } + def start(task: Reaction) = sched.start(task) def execute(task: Reaction) = { val t = currentThread @@ -84,19 +87,23 @@ object Scheduler { * This abstract class provides a common interface for all * schedulers used to execute actor tasks. * - * @version 0.9.4 + * @version 0.9.5 * @author Philipp Haller */ trait IScheduler { def start(): unit + def start(task: Reaction): unit def execute(task: Reaction): unit + def execute(task: FJTask): unit + def getTask(worker: WorkerThread): Runnable def tick(a: Actor): unit def terminated(a: Actor): unit def pendReaction: unit def unPendReaction: unit + def snapshot(): LinkedQueue def shutdown(): unit def onLockup(handler: () => unit): unit @@ -114,7 +121,7 @@ trait IScheduler { * This scheduler executes the tasks of an actor on a single * thread (the current thread). * - * @version 0.9.4 + * @version 0.9.5 * @author Philipp Haller */ class SingleThreadedScheduler extends IScheduler { @@ -130,6 +137,11 @@ class SingleThreadedScheduler extends IScheduler { task.run() } + def execute(task: FJTask) { + // execute task immediately on same thread + task.run() + } + def getTask(worker: WorkerThread): Runnable = null def tick(a: Actor): Unit = {} def terminated(a: Actor): unit = {} @@ -137,6 +149,7 @@ class SingleThreadedScheduler extends IScheduler { def unPendReaction: unit = {} def shutdown(): Unit = {} + def snapshot(): LinkedQueue = { null } def onLockup(handler: () => unit): unit = {} def onLockup(millis: int)(handler: () => unit): unit = {} diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala index b7d68cfae5..caa18ecbdc 100644 --- a/src/actors/scala/actors/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/ThreadPoolScheduler.scala @@ -21,7 +21,7 @@ import java.util.concurrent.{ThreadPoolExecutor, * This handler executes rejected tasks on the thread of * the scheduler. * - * @version 0.9.4 + * @version 0.9.5 * @author Philipp Haller */ private class TaskRejectedHandler(sched: ThreadPoolScheduler) extends RejectedExecutionHandler { @@ -156,6 +156,10 @@ class ThreadPoolScheduler extends Thread with IScheduler { executor.execute(item) } + def execute(task: FJTask) { } + + def snapshot(): LinkedQueue = null + /** * @param worker the worker thread executing tasks * @return the executed task diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala index adc84b00c8..e17c1232c9 100644 --- a/src/actors/scala/actors/TickedScheduler.scala +++ b/src/actors/scala/actors/TickedScheduler.scala @@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has * by the execution of actors. Unlike <code>ThreadPoolScheduler</code>, this * scheduler is available on all Java versions >= 1.4.</p> * - * @version 0.9.4 + * @version 0.9.5 * @author Philipp Haller */ class TickedScheduler extends Thread with IScheduler { @@ -127,6 +127,10 @@ class TickedScheduler extends Thread with IScheduler { } } + def execute(task: FJTask) { } + + def snapshot(): LinkedQueue = null + /** * @param worker the worker thread executing tasks * @return the executed task |