From f1f6d7c6a6de28b6e1aceff271a4977251c0c629 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Fri, 17 Jul 2009 16:30:42 +0000 Subject: Updated to newest revision of ForkJoinPool. --- src/actors/scala/actors/Actor.scala | 2 +- src/actors/scala/actors/DelegatingScheduler.scala | 2 + src/actors/scala/actors/ExecutorScheduler.scala | 3 + src/actors/scala/actors/FJTaskScheduler2.scala | 3 + src/actors/scala/actors/ForkJoinScheduler.scala | 12 +- src/actors/scala/actors/IScheduler.scala | 2 + src/actors/scala/actors/Reactor.scala | 9 +- src/actors/scala/actors/SchedulerAdapter.scala | 3 + .../scala/actors/SimpleExecutorScheduler.scala | 3 + .../scala/actors/SingleThreadedScheduler.scala | 3 + src/actors/scala/actors/ThreadPoolScheduler.scala | 3 + src/actors/scala/actors/forkjoin/ForkJoinPool.java | 247 ++++++++++++++------- src/actors/scala/actors/forkjoin/ForkJoinTask.java | 53 +++-- .../actors/forkjoin/ForkJoinWorkerThread.java | 55 ++++- 14 files changed, 275 insertions(+), 125 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 43d1f7c707..0f0a5f6390 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -640,7 +640,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { val task = new Reaction(this, if (f eq null) continuation else f, msg) - scheduler execute task + scheduler executeFromActor task } class ActorBlocker(timeout: Long) extends ManagedBlocker { diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala index 50fad31606..fd8e932b05 100644 --- a/src/actors/scala/actors/DelegatingScheduler.scala +++ b/src/actors/scala/actors/DelegatingScheduler.scala @@ -36,6 +36,8 @@ trait DelegatingScheduler extends IScheduler { def execute(task: Runnable) = impl.execute(task) + def executeFromActor(task: Runnable) = impl.executeFromActor(task) + def shutdown(): Unit = synchronized { if (sched ne null) { sched.shutdown() diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala index c414d80eba..16a88c1539 100644 --- a/src/actors/scala/actors/ExecutorScheduler.scala +++ b/src/actors/scala/actors/ExecutorScheduler.scala @@ -42,6 +42,9 @@ class ExecutorScheduler(protected var executor: ExecutorService) extends Schedul } } + def executeFromActor(task: Runnable) = + execute(task) + /** This method is called when the SchedulerService * shuts down. */ diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index 3f336c4536..8bd4d176b6 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -119,6 +119,9 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) def execute(task: Runnable): Unit = executor execute task + def executeFromActor(task: Runnable) = + execute(task) + def execute(fun: => Unit): Unit = executor.execute(new Runnable { def run() { fun } diff --git a/src/actors/scala/actors/ForkJoinScheduler.scala b/src/actors/scala/actors/ForkJoinScheduler.scala index 74595acde6..66122654a4 100644 --- a/src/actors/scala/actors/ForkJoinScheduler.scala +++ b/src/actors/scala/actors/ForkJoinScheduler.scala @@ -7,6 +7,8 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor { private val pool = { val p = new ForkJoinPool() + // enable locally FIFO scheduling mode + p.setAsyncMode(true) Debug.info(this+": parallelism "+p.getParallelism()) Debug.info(this+": max pool size "+p.getMaximumPoolSize()) p @@ -54,14 +56,14 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor { } def execute(task: Runnable) { + pool.execute(task) + } + + def executeFromActor(task: Runnable) { val recAction = new RecursiveAction { def compute() = task.run() } - val thread = Thread.currentThread() - if (thread.isInstanceOf[ForkJoinWorkerThread]) - recAction.fork() - else - pool.execute(task) + recAction.fork() } /** Submits a closure for execution. diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala index 3c87c64b1f..8f3b243b71 100644 --- a/src/actors/scala/actors/IScheduler.scala +++ b/src/actors/scala/actors/IScheduler.scala @@ -35,6 +35,8 @@ trait IScheduler { */ def execute(task: Runnable): Unit + def executeFromActor(task: Runnable): Unit + /** Shuts down the scheduler. */ def shutdown(): Unit diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index d99d91aac3..967dd52a86 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -144,10 +144,13 @@ trait Reactor extends OutputChannel[Any] { throw Actor.suspendException } + /* This method is guaranteed to be executed from inside + an actors act method. + */ protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { - scheduler execute (new LightReaction(this, - if (f eq null) continuation else f, - msg)) + scheduler executeFromActor (new LightReaction(this, + if (f eq null) continuation else f, + msg)) } def start(): Reactor = { diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index 749847d6b9..6355ee1ace 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -28,6 +28,9 @@ trait SchedulerAdapter extends IScheduler { def execute(task: Runnable): Unit = execute { task.run() } + def executeFromActor(task: Runnable): Unit = + execute(task) + /** Shuts down the scheduler. */ def shutdown(): Unit = diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala index ea9a3ae629..ffca87f87d 100644 --- a/src/actors/scala/actors/SimpleExecutorScheduler.scala +++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala @@ -53,6 +53,9 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService, } } + def executeFromActor(task: Runnable) = + execute(task) + def onShutdown() { executor.shutdown() } diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala index ae75478cd7..b5f83b3416 100644 --- a/src/actors/scala/actors/SingleThreadedScheduler.scala +++ b/src/actors/scala/actors/SingleThreadedScheduler.scala @@ -23,6 +23,9 @@ class SingleThreadedScheduler extends IScheduler { task.run() } + def executeFromActor(task: Runnable) = + execute(task) + def execute(fun: => Unit): Unit = execute(new Runnable { def run() { fun } diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala index c76d2c4f5c..42cedf5f54 100644 --- a/src/actors/scala/actors/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/ThreadPoolScheduler.scala @@ -53,6 +53,9 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, } } + def executeFromActor(task: Runnable) = + execute(task) + def onShutdown() { executor.shutdown() } diff --git a/src/actors/scala/actors/forkjoin/ForkJoinPool.java b/src/actors/scala/actors/forkjoin/ForkJoinPool.java index 765e2e1284..2783c249a3 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinPool.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinPool.java @@ -12,22 +12,10 @@ import java.util.concurrent.atomic.*; import sun.misc.Unsafe; import java.lang.reflect.*; - interface RunnableFuture extends Runnable { //TR placeholder for java.util.concurrent.RunnableFuture } -class Arrays { - //TR placeholder for java.util.Arrays.copyOf - public static T[] copyOf(T[] original, int newLength) { - T[] copy = (T[])new Object[newLength]; - System.arraycopy(original, 0, copy, 0, Math.min(newLength, original.length)); - return copy; - } - -} - - /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. A * ForkJoinPool provides the entry point for submissions from @@ -43,7 +31,9 @@ class Arrays { * (eventually blocking if none exist). This makes them efficient when * most tasks spawn other subtasks (as do most ForkJoinTasks), as well * as the mixed execution of some plain Runnable- or Callable- based - * activities along with ForkJoinTasks. Otherwise, other + * activities along with ForkJoinTasks. When setting + * setAsyncMode, a ForkJoinPools may also be appropriate for + * use with fine-grained tasks that are never joined. Otherwise, other * ExecutorService implementations are typically more appropriate * choices. * @@ -55,7 +45,7 @@ class Arrays { * nested ManagedBlocker interface enables extension of * the kinds of synchronization accommodated. The target parallelism * level may also be changed dynamically (setParallelism) - * and dynamically thread construction can be limited using methods + * and thread construction can be limited using methods * setMaximumPoolSize and/or * setMaintainsParallelism. * @@ -147,13 +137,13 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { new AtomicInteger(); /** - * Array holding all worker threads in the pool. Array size must - * be a power of two. Updates and replacements are protected by - * workerLock, but it is always kept in a consistent enough state - * to be randomly accessed without locking by workers performing - * work-stealing. + * Array holding all worker threads in the pool. Initialized upon + * first use. Array size must be a power of two. Updates and + * replacements are protected by workerLock, but it is always kept + * in a consistent enough state to be randomly accessed without + * locking by workers performing work-stealing. */ - public volatile ForkJoinWorkerThread[] workers; + volatile ForkJoinWorkerThread[] workers; /** * Lock protecting access to workers. @@ -219,6 +209,11 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ private volatile int parallelism; + /** + * True if use local fifo, not default lifo, for local polling + */ + private volatile boolean locallyFifo; + /** * Holds number of total (i.e., created and not yet terminated) * and running (i.e., not blocked on joins or other managed sync) @@ -407,7 +402,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { this.termination = workerLock.newCondition(); this.stealCount = new AtomicLong(); this.submissionQueue = new LinkedTransferQueue>(); - createAndStartInitialWorkers(parallelism); + // worker array and workers are lazily constructed } /** @@ -421,6 +416,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { if (w != null) { w.poolIndex = index; w.setDaemon(true); + w.setAsyncMode(locallyFifo); w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index); if (h != null) w.setUncaughtExceptionHandler(h); @@ -443,7 +439,8 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { } /** - * Create or resize array if necessary to hold newLength + * Create or resize array if necessary to hold newLength. + * Call only under exlusion or lock * @return the array */ private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) { @@ -461,35 +458,42 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ private void tryShrinkWorkerArray() { ForkJoinWorkerThread[] ws = workers; - int len = ws.length; - int last = len - 1; - while (last >= 0 && ws[last] == null) - --last; - int newLength = arraySizeFor(last+1); - if (newLength < len) - workers = Arrays.copyOf(ws, newLength); + if (ws != null) { + int len = ws.length; + int last = len - 1; + while (last >= 0 && ws[last] == null) + --last; + int newLength = arraySizeFor(last+1); + if (newLength < len) + workers = Arrays.copyOf(ws, newLength); + } } /** - * Initial worker array and worker creation and startup. (This - * must be done under lock to avoid interference by some of the - * newly started threads while creating others.) + * Initialize workers if necessary */ - private void createAndStartInitialWorkers(int ps) { - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps); - for (int i = 0; i < ps; ++i) { - ForkJoinWorkerThread w = createWorker(i); - if (w != null) { - ws[i] = w; - w.start(); - updateWorkerCount(1); + final void ensureWorkerInitialization() { + ForkJoinWorkerThread[] ws = workers; + if (ws == null) { + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + ws = workers; + if (ws == null) { + int ps = parallelism; + ws = ensureWorkerArrayCapacity(ps); + for (int i = 0; i < ps; ++i) { + ForkJoinWorkerThread w = createWorker(i); + if (w != null) { + ws[i] = w; + w.start(); + updateWorkerCount(1); + } + } } + } finally { + lock.unlock(); } - } finally { - lock.unlock(); } } @@ -535,6 +539,8 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { private void doSubmit(ForkJoinTask task) { if (isShutdown()) throw new RejectedExecutionException(); + if (workers == null) + ensureWorkerInitialization(); submissionQueue.offer(task); signalIdleWorkers(); } @@ -637,7 +643,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { public void run() { invoke(); } } - public List> invokeAll(Collection> tasks) { + public List> invokeAll(Collection> tasks) { ArrayList> ts = new ArrayList>(tasks.size()); for (Callable c : tasks) @@ -705,10 +711,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { old = ueh; ueh = h; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setUncaughtExceptionHandler(h); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + w.setUncaughtExceptionHandler(h); + } } } finally { lock.unlock(); @@ -814,6 +822,42 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { maintainsParallelism = enable; } + /** + * Establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. This mode may be more appropriate + * than default locally stack-based mode in applications in which + * worker threads only process asynchronous tasks. This method is + * designed to be invoked only when pool is quiescent, and + * typically only before any tasks are submitted. The effects of + * invocations at ather times may be unpredictable. + * + * @param async if true, use locally FIFO scheduling + * @return the previous mode. + */ + public boolean setAsyncMode(boolean async) { + boolean oldMode = locallyFifo; + locallyFifo = async; + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.setAsyncMode(async); + } + } + return oldMode; + } + + /** + * Returns true if this pool uses local first-in-first-out + * scheduling mode for forked tasks that are never joined. + * + * @return true if this pool uses async mode. + */ + public boolean getAsyncMode() { + return locallyFifo; + } + /** * Returns an estimate of the number of worker threads that are * not blocked waiting to join tasks or for other managed @@ -896,10 +940,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { public long getQueuedTaskCount() { long count = 0; ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - count += t.getQueueSize(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + count += t.getQueueSize(); + } } return count; } @@ -933,6 +979,35 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { return submissionQueue.poll(); } + /** + * Removes all available unexecuted submitted and forked tasks + * from scheduling queues and adds them to the given collection, + * without altering their execution status. These may include + * artifically generated or wrapped tasks. This method id designed + * to be invoked only when the pool is known to be + * quiescent. Invocations at other times may not remove all + * tasks. A failure encountered while attempting to add elements + * to collection c may result in elements being in + * neither, either or both collections when the associated + * exception is thrown. The behavior of this operation is + * undefined if the specified collection is modified while the + * operation is in progress. + * @param c the collection to transfer elements into + * @return the number of elements transferred + */ + protected int drainTasksTo(Collection> c) { + int n = submissionQueue.drainTo(c); + ForkJoinWorkerThread[] ws = workers; + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + n += w.drainTasksTo(c); + } + } + return n; + } + /** * Returns a string identifying this pool, as well as its state, * including indications of run state, parallelism level, and @@ -994,8 +1069,10 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * waiting tasks. Tasks that are in the process of being * submitted or executed concurrently during the course of this * method may or may not be rejected. Unlike some other executors, - * this method cancels rather than collects non-executed tasks, - * so always returns an empty list. + * this method cancels rather than collects non-executed tasks + * upon termination, so always returns an empty list. However, you + * can use method drainTasksTo before invoking this + * method to transfer unexecuted tasks to another collection. * @return an empty list * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads @@ -1080,17 +1157,19 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - int idx = w.poolIndex; - if (idx >= 0 && idx < ws.length && ws[idx] == w) - ws[idx] = null; - if (totalCountOf(workerCounts) == 0) { - terminate(); // no-op if already terminating - transitionRunStateTo(TERMINATED); - termination.signalAll(); - } - else if (!isTerminating()) { - tryShrinkWorkerArray(); - tryResumeSpare(true); // allow replacement + if (ws != null) { + int idx = w.poolIndex; + if (idx >= 0 && idx < ws.length && ws[idx] == w) + ws[idx] = null; + if (totalCountOf(workerCounts) == 0) { + terminate(); // no-op if already terminating + transitionRunStateTo(TERMINATED); + termination.signalAll(); + } + else if (!isTerminating()) { + tryShrinkWorkerArray(); + tryResumeSpare(true); // allow replacement + } } } finally { lock.unlock(); @@ -1138,10 +1217,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.cancelTasks(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.cancelTasks(); + } } } finally { lock.unlock(); @@ -1157,10 +1238,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null) - t.shutdownNow(); + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null) + t.shutdownNow(); + } } } finally { lock.unlock(); @@ -1177,12 +1260,14 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { lock.lock(); try { ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread t = ws[i]; - if (t != null && !t.isTerminated()) { - try { - t.interrupt(); - } catch (SecurityException ignore) { + if (ws != null) { + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread t = ws[i]; + if (t != null && !t.isTerminated()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } } } } diff --git a/src/actors/scala/actors/forkjoin/ForkJoinTask.java b/src/actors/scala/actors/forkjoin/ForkJoinTask.java index 5e40940ff3..230c7a0a20 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinTask.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinTask.java @@ -926,15 +926,14 @@ public abstract class ForkJoinTask implements Future, Serializable { protected abstract boolean exec(); /** - * Returns, but does not unschedule or execute, the task most - * recently forked by the current thread but not yet executed, if - * one is available. There is no guarantee that this task will - * actually be polled or executed next. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. - * This method may be invoked only from within - * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors possibly including ClassCastException. + * Returns, but does not unschedule or execute, the task queued by + * the current thread but not yet executed, if one is + * available. There is no guarantee that this task will actually + * be polled or executed next. This method is designed primarily + * to support extensions, and is unlikely to be useful otherwise. + * This method may be invoked only from within ForkJoinTask + * computations. Attempts to invoke in other contexts result in + * exceptions or errors possibly including ClassCastException. * * @return the next task, or null if none are available */ @@ -943,32 +942,32 @@ public abstract class ForkJoinTask implements Future, Serializable { } /** - * Unschedules and returns, without executing, the task most - * recently forked by the current thread but not yet executed. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. - * This method may be invoked only from within - * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors possibly including ClassCastException. + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed. This method + * is designed primarily to support extensions, and is unlikely to + * be useful otherwise. This method may be invoked only from + * within ForkJoinTask computations. Attempts to invoke in other + * contexts result in exceptions or errors possibly including + * ClassCastException. * * @return the next task, or null if none are available */ protected static ForkJoinTask pollNextLocalTask() { - return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask(); + return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask(); } /** - * Unschedules and returns, without executing, the task most - * recently forked by the current thread but not yet executed, if - * one is available, or if not available, a task that was forked - * by some other thread, if available. Availability may be - * transient, so a null result does not necessarily - * imply quiecence of the pool this task is operating in. - * This method is designed primarily to support extensions, - * and is unlikely to be useful otherwise. - * This method may be invoked only from within + * Unschedules and returns, without executing, the next task + * queued by the current thread but not yet executed, if one is + * available, or if not available, a task that was forked by some + * other thread, if available. Availability may be transient, so a + * null result does not necessarily imply quiecence + * of the pool this task is operating in. This method is designed + * primarily to support extensions, and is unlikely to be useful + * otherwise. This method may be invoked only from within * ForkJoinTask computations. Attempts to invoke in other contexts - * result in exceptions or errors possibly including ClassCastException. + * result in exceptions or errors possibly including + * ClassCastException. * * @return a task, or null if none are available */ diff --git a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java index ad22a94afa..2d75987f91 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java @@ -17,8 +17,8 @@ import java.lang.reflect.*; * subclassable solely for the sake of adding functionality -- there * are no overridable methods dealing with scheduling or * execution. However, you can override initialization and termination - * cleanup methods surrounding the main task processing loop. If you - * do create such a subclass, you will also need to supply a custom + * methods surrounding the main task processing loop. If you do + * create such a subclass, you will also need to supply a custom * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. * */ @@ -198,6 +198,11 @@ public class ForkJoinWorkerThread extends Thread { */ long lastEventCount; + /** + * True if use local fifo, not default lifo, for local polling + */ + private boolean locallyFifo; + /** * Creates a ForkJoinWorkerThread operating in the given pool. * @param pool the pool this thread works in @@ -232,6 +237,14 @@ public class ForkJoinWorkerThread extends Thread { return poolIndex; } + /** + * Establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. + * @param async if true, use locally FIFO scheduling + */ + void setAsyncMode(boolean async) { + locallyFifo = async; + } // Runstate management @@ -392,7 +405,7 @@ public class ForkJoinWorkerThread extends Thread { */ private static void setSlot(ForkJoinTask[] q, int i, ForkJoinTask t){ -//TR _unsafe.putOrderedObject((Object)q, (i << qShift) + qBase, (Object)t); +//TR _unsafe.putOrderedObject(q, (i << qShift) + qBase, t); _unsafe.putObjectVolatile((Object)q, (i << qShift) + qBase, (Object)t); } @@ -436,7 +449,7 @@ public class ForkJoinWorkerThread extends Thread { * either empty or contended. * @return a task, or null if none or contended. */ - private ForkJoinTask deqTask() { + final ForkJoinTask deqTask() { ForkJoinTask t; ForkJoinTask[] q; int i; @@ -490,11 +503,15 @@ public class ForkJoinWorkerThread extends Thread { } /** - * Returns next task to pop. + * Returns next task. */ final ForkJoinTask peekTask() { ForkJoinTask[] q = queue; - return q == null? null : q[(sp - 1) & (q.length - 1)]; + if (q == null) + return null; + int mask = q.length - 1; + int i = locallyFifo? base : (sp - 1); + return q[i & mask]; } /** @@ -572,16 +589,24 @@ public class ForkJoinWorkerThread extends Thread { } /** - * Pops or steals a task + * gets and removes a local or stolen a task * @return a task, if available */ final ForkJoinTask pollTask() { - ForkJoinTask t = popTask(); + ForkJoinTask t = locallyFifo? deqTask() : popTask(); if (t == null && (t = scan()) != null) ++stealCount; return t; } + /** + * gets a local task + * @return a task, if available + */ + final ForkJoinTask pollLocalTask() { + return locallyFifo? deqTask() : popTask(); + } + /** * Returns a pool submission, if one exists, activating first. * @return a submission, if available @@ -608,6 +633,20 @@ public class ForkJoinWorkerThread extends Thread { t.cancelIgnoringExceptions(); } + /** + * Drains tasks to given collection c + * @return the number of tasks drained + */ + final int drainTasksTo(Collection> c) { + int n = 0; + ForkJoinTask t; + while (base != sp && (t = deqTask()) != null) { + c.add(t); + ++n; + } + return n; + } + /** * Get and clear steal count for accumulation by pool. Called * only when known to be idle (in pool.sync and termination). -- cgit v1.2.3