summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-07-17 16:30:42 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-07-17 16:30:42 +0000
commitf1f6d7c6a6de28b6e1aceff271a4977251c0c629 (patch)
tree92076e1e724a51c16929e572c8be683b616e64c8 /src/actors
parent6581c02a2ecfe72ba9403c805c94f7d73027b80c (diff)
downloadscala-f1f6d7c6a6de28b6e1aceff271a4977251c0c629.tar.gz
scala-f1f6d7c6a6de28b6e1aceff271a4977251c0c629.tar.bz2
scala-f1f6d7c6a6de28b6e1aceff271a4977251c0c629.zip
Updated to newest revision of ForkJoinPool.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala2
-rw-r--r--src/actors/scala/actors/DelegatingScheduler.scala2
-rw-r--r--src/actors/scala/actors/ExecutorScheduler.scala3
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala3
-rw-r--r--src/actors/scala/actors/ForkJoinScheduler.scala12
-rw-r--r--src/actors/scala/actors/IScheduler.scala2
-rw-r--r--src/actors/scala/actors/Reactor.scala9
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala3
-rw-r--r--src/actors/scala/actors/SimpleExecutorScheduler.scala3
-rw-r--r--src/actors/scala/actors/SingleThreadedScheduler.scala3
-rw-r--r--src/actors/scala/actors/ThreadPoolScheduler.scala3
-rw-r--r--src/actors/scala/actors/forkjoin/ForkJoinPool.java247
-rw-r--r--src/actors/scala/actors/forkjoin/ForkJoinTask.java53
-rw-r--r--src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java55
14 files changed, 275 insertions, 125 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 43d1f7c707..0f0a5f6390 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -640,7 +640,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
val task = new Reaction(this,
if (f eq null) continuation else f,
msg)
- scheduler execute task
+ scheduler executeFromActor task
}
class ActorBlocker(timeout: Long) extends ManagedBlocker {
diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala
index 50fad31606..fd8e932b05 100644
--- a/src/actors/scala/actors/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/DelegatingScheduler.scala
@@ -36,6 +36,8 @@ trait DelegatingScheduler extends IScheduler {
def execute(task: Runnable) = impl.execute(task)
+ def executeFromActor(task: Runnable) = impl.executeFromActor(task)
+
def shutdown(): Unit = synchronized {
if (sched ne null) {
sched.shutdown()
diff --git a/src/actors/scala/actors/ExecutorScheduler.scala b/src/actors/scala/actors/ExecutorScheduler.scala
index c414d80eba..16a88c1539 100644
--- a/src/actors/scala/actors/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/ExecutorScheduler.scala
@@ -42,6 +42,9 @@ class ExecutorScheduler(protected var executor: ExecutorService) extends Schedul
}
}
+ def executeFromActor(task: Runnable) =
+ execute(task)
+
/** This method is called when the <code>SchedulerService</code>
* shuts down.
*/
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 3f336c4536..8bd4d176b6 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -119,6 +119,9 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean)
def execute(task: Runnable): Unit =
executor execute task
+ def executeFromActor(task: Runnable) =
+ execute(task)
+
def execute(fun: => Unit): Unit =
executor.execute(new Runnable {
def run() { fun }
diff --git a/src/actors/scala/actors/ForkJoinScheduler.scala b/src/actors/scala/actors/ForkJoinScheduler.scala
index 74595acde6..66122654a4 100644
--- a/src/actors/scala/actors/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/ForkJoinScheduler.scala
@@ -7,6 +7,8 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
private val pool = {
val p = new ForkJoinPool()
+ // enable locally FIFO scheduling mode
+ p.setAsyncMode(true)
Debug.info(this+": parallelism "+p.getParallelism())
Debug.info(this+": max pool size "+p.getMaximumPoolSize())
p
@@ -54,14 +56,14 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
}
def execute(task: Runnable) {
+ pool.execute(task)
+ }
+
+ def executeFromActor(task: Runnable) {
val recAction = new RecursiveAction {
def compute() = task.run()
}
- val thread = Thread.currentThread()
- if (thread.isInstanceOf[ForkJoinWorkerThread])
- recAction.fork()
- else
- pool.execute(task)
+ recAction.fork()
}
/** Submits a closure for execution.
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 3c87c64b1f..8f3b243b71 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -35,6 +35,8 @@ trait IScheduler {
*/
def execute(task: Runnable): Unit
+ def executeFromActor(task: Runnable): Unit
+
/** Shuts down the scheduler.
*/
def shutdown(): Unit
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index d99d91aac3..967dd52a86 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -144,10 +144,13 @@ trait Reactor extends OutputChannel[Any] {
throw Actor.suspendException
}
+ /* This method is guaranteed to be executed from inside
+ an actors act method.
+ */
protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
- scheduler execute (new LightReaction(this,
- if (f eq null) continuation else f,
- msg))
+ scheduler executeFromActor (new LightReaction(this,
+ if (f eq null) continuation else f,
+ msg))
}
def start(): Reactor = {
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index 749847d6b9..6355ee1ace 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -28,6 +28,9 @@ trait SchedulerAdapter extends IScheduler {
def execute(task: Runnable): Unit =
execute { task.run() }
+ def executeFromActor(task: Runnable): Unit =
+ execute(task)
+
/** Shuts down the scheduler.
*/
def shutdown(): Unit =
diff --git a/src/actors/scala/actors/SimpleExecutorScheduler.scala b/src/actors/scala/actors/SimpleExecutorScheduler.scala
index ea9a3ae629..ffca87f87d 100644
--- a/src/actors/scala/actors/SimpleExecutorScheduler.scala
+++ b/src/actors/scala/actors/SimpleExecutorScheduler.scala
@@ -53,6 +53,9 @@ class SimpleExecutorScheduler(protected var executor: ExecutorService,
}
}
+ def executeFromActor(task: Runnable) =
+ execute(task)
+
def onShutdown() {
executor.shutdown()
}
diff --git a/src/actors/scala/actors/SingleThreadedScheduler.scala b/src/actors/scala/actors/SingleThreadedScheduler.scala
index ae75478cd7..b5f83b3416 100644
--- a/src/actors/scala/actors/SingleThreadedScheduler.scala
+++ b/src/actors/scala/actors/SingleThreadedScheduler.scala
@@ -23,6 +23,9 @@ class SingleThreadedScheduler extends IScheduler {
task.run()
}
+ def executeFromActor(task: Runnable) =
+ execute(task)
+
def execute(fun: => Unit): Unit =
execute(new Runnable {
def run() { fun }
diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala
index c76d2c4f5c..42cedf5f54 100644
--- a/src/actors/scala/actors/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/ThreadPoolScheduler.scala
@@ -53,6 +53,9 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
}
}
+ def executeFromActor(task: Runnable) =
+ execute(task)
+
def onShutdown() {
executor.shutdown()
}
diff --git a/src/actors/scala/actors/forkjoin/ForkJoinPool.java b/src/actors/scala/actors/forkjoin/ForkJoinPool.java
index 765e2e1284..2783c249a3 100644
--- a/src/actors/scala/actors/forkjoin/ForkJoinPool.java
+++ b/src/actors/scala/actors/forkjoin/ForkJoinPool.java
@@ -12,22 +12,10 @@ import java.util.concurrent.atomic.*;
import sun.misc.Unsafe;
import java.lang.reflect.*;
-
interface RunnableFuture<T> extends Runnable {
//TR placeholder for java.util.concurrent.RunnableFuture
}
-class Arrays {
- //TR placeholder for java.util.Arrays.copyOf
- public static <T> T[] copyOf(T[] original, int newLength) {
- T[] copy = (T[])new Object[newLength];
- System.arraycopy(original, 0, copy, 0, Math.min(newLength, original.length));
- return copy;
- }
-
-}
-
-
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s. A
* ForkJoinPool provides the entry point for submissions from
@@ -43,7 +31,9 @@ class Arrays {
* (eventually blocking if none exist). This makes them efficient when
* most tasks spawn other subtasks (as do most ForkJoinTasks), as well
* as the mixed execution of some plain Runnable- or Callable- based
- * activities along with ForkJoinTasks. Otherwise, other
+ * activities along with ForkJoinTasks. When setting
+ * <tt>setAsyncMode</tt>, a ForkJoinPools may also be appropriate for
+ * use with fine-grained tasks that are never joined. Otherwise, other
* ExecutorService implementations are typically more appropriate
* choices.
*
@@ -55,7 +45,7 @@ class Arrays {
* nested <code>ManagedBlocker</code> interface enables extension of
* the kinds of synchronization accommodated. The target parallelism
* level may also be changed dynamically (<code>setParallelism</code>)
- * and dynamically thread construction can be limited using methods
+ * and thread construction can be limited using methods
* <code>setMaximumPoolSize</code> and/or
* <code>setMaintainsParallelism</code>.
*
@@ -147,13 +137,13 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
new AtomicInteger();
/**
- * Array holding all worker threads in the pool. Array size must
- * be a power of two. Updates and replacements are protected by
- * workerLock, but it is always kept in a consistent enough state
- * to be randomly accessed without locking by workers performing
- * work-stealing.
+ * Array holding all worker threads in the pool. Initialized upon
+ * first use. Array size must be a power of two. Updates and
+ * replacements are protected by workerLock, but it is always kept
+ * in a consistent enough state to be randomly accessed without
+ * locking by workers performing work-stealing.
*/
- public volatile ForkJoinWorkerThread[] workers;
+ volatile ForkJoinWorkerThread[] workers;
/**
* Lock protecting access to workers.
@@ -220,6 +210,11 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
private volatile int parallelism;
/**
+ * True if use local fifo, not default lifo, for local polling
+ */
+ private volatile boolean locallyFifo;
+
+ /**
* Holds number of total (i.e., created and not yet terminated)
* and running (i.e., not blocked on joins or other managed sync)
* threads, packed into one int to ensure consistent snapshot when
@@ -407,7 +402,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
this.termination = workerLock.newCondition();
this.stealCount = new AtomicLong();
this.submissionQueue = new LinkedTransferQueue<ForkJoinTask<?>>();
- createAndStartInitialWorkers(parallelism);
+ // worker array and workers are lazily constructed
}
/**
@@ -421,6 +416,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
if (w != null) {
w.poolIndex = index;
w.setDaemon(true);
+ w.setAsyncMode(locallyFifo);
w.setName("ForkJoinPool-" + poolNumber + "-worker-" + index);
if (h != null)
w.setUncaughtExceptionHandler(h);
@@ -443,7 +439,8 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
}
/**
- * Create or resize array if necessary to hold newLength
+ * Create or resize array if necessary to hold newLength.
+ * Call only under exlusion or lock
* @return the array
*/
private ForkJoinWorkerThread[] ensureWorkerArrayCapacity(int newLength) {
@@ -461,35 +458,42 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
*/
private void tryShrinkWorkerArray() {
ForkJoinWorkerThread[] ws = workers;
- int len = ws.length;
- int last = len - 1;
- while (last >= 0 && ws[last] == null)
- --last;
- int newLength = arraySizeFor(last+1);
- if (newLength < len)
- workers = Arrays.copyOf(ws, newLength);
+ if (ws != null) {
+ int len = ws.length;
+ int last = len - 1;
+ while (last >= 0 && ws[last] == null)
+ --last;
+ int newLength = arraySizeFor(last+1);
+ if (newLength < len)
+ workers = Arrays.copyOf(ws, newLength);
+ }
}
/**
- * Initial worker array and worker creation and startup. (This
- * must be done under lock to avoid interference by some of the
- * newly started threads while creating others.)
+ * Initialize workers if necessary
*/
- private void createAndStartInitialWorkers(int ps) {
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- ForkJoinWorkerThread[] ws = ensureWorkerArrayCapacity(ps);
- for (int i = 0; i < ps; ++i) {
- ForkJoinWorkerThread w = createWorker(i);
- if (w != null) {
- ws[i] = w;
- w.start();
- updateWorkerCount(1);
+ final void ensureWorkerInitialization() {
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws == null) {
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ ws = workers;
+ if (ws == null) {
+ int ps = parallelism;
+ ws = ensureWorkerArrayCapacity(ps);
+ for (int i = 0; i < ps; ++i) {
+ ForkJoinWorkerThread w = createWorker(i);
+ if (w != null) {
+ ws[i] = w;
+ w.start();
+ updateWorkerCount(1);
+ }
+ }
}
+ } finally {
+ lock.unlock();
}
- } finally {
- lock.unlock();
}
}
@@ -535,6 +539,8 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
private <T> void doSubmit(ForkJoinTask<T> task) {
if (isShutdown())
throw new RejectedExecutionException();
+ if (workers == null)
+ ensureWorkerInitialization();
submissionQueue.offer(task);
signalIdleWorkers();
}
@@ -637,7 +643,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
public void run() { invoke(); }
}
- public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks) {
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
ArrayList<ForkJoinTask<T>> ts =
new ArrayList<ForkJoinTask<T>>(tasks.size());
for (Callable<T> c : tasks)
@@ -705,10 +711,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
old = ueh;
ueh = h;
ForkJoinWorkerThread[] ws = workers;
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread w = ws[i];
- if (w != null)
- w.setUncaughtExceptionHandler(h);
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null)
+ w.setUncaughtExceptionHandler(h);
+ }
}
} finally {
lock.unlock();
@@ -815,6 +823,42 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
}
/**
+ * Establishes local first-in-first-out scheduling mode for forked
+ * tasks that are never joined. This mode may be more appropriate
+ * than default locally stack-based mode in applications in which
+ * worker threads only process asynchronous tasks. This method is
+ * designed to be invoked only when pool is quiescent, and
+ * typically only before any tasks are submitted. The effects of
+ * invocations at ather times may be unpredictable.
+ *
+ * @param async if true, use locally FIFO scheduling
+ * @return the previous mode.
+ */
+ public boolean setAsyncMode(boolean async) {
+ boolean oldMode = locallyFifo;
+ locallyFifo = async;
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ t.setAsyncMode(async);
+ }
+ }
+ return oldMode;
+ }
+
+ /**
+ * Returns true if this pool uses local first-in-first-out
+ * scheduling mode for forked tasks that are never joined.
+ *
+ * @return true if this pool uses async mode.
+ */
+ public boolean getAsyncMode() {
+ return locallyFifo;
+ }
+
+ /**
* Returns an estimate of the number of worker threads that are
* not blocked waiting to join tasks or for other managed
* synchronization.
@@ -896,10 +940,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
public long getQueuedTaskCount() {
long count = 0;
ForkJoinWorkerThread[] ws = workers;
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null)
- count += t.getQueueSize();
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ count += t.getQueueSize();
+ }
}
return count;
}
@@ -934,6 +980,35 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
}
/**
+ * Removes all available unexecuted submitted and forked tasks
+ * from scheduling queues and adds them to the given collection,
+ * without altering their execution status. These may include
+ * artifically generated or wrapped tasks. This method id designed
+ * to be invoked only when the pool is known to be
+ * quiescent. Invocations at other times may not remove all
+ * tasks. A failure encountered while attempting to add elements
+ * to collection <tt>c</tt> may result in elements being in
+ * neither, either or both collections when the associated
+ * exception is thrown. The behavior of this operation is
+ * undefined if the specified collection is modified while the
+ * operation is in progress.
+ * @param c the collection to transfer elements into
+ * @return the number of elements transferred
+ */
+ protected int drainTasksTo(Collection<ForkJoinTask<?>> c) {
+ int n = submissionQueue.drainTo(c);
+ ForkJoinWorkerThread[] ws = workers;
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null)
+ n += w.drainTasksTo(c);
+ }
+ }
+ return n;
+ }
+
+ /**
* Returns a string identifying this pool, as well as its state,
* including indications of run state, parallelism level, and
* worker and task counts.
@@ -994,8 +1069,10 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
* waiting tasks. Tasks that are in the process of being
* submitted or executed concurrently during the course of this
* method may or may not be rejected. Unlike some other executors,
- * this method cancels rather than collects non-executed tasks,
- * so always returns an empty list.
+ * this method cancels rather than collects non-executed tasks
+ * upon termination, so always returns an empty list. However, you
+ * can use method <code>drainTasksTo</code> before invoking this
+ * method to transfer unexecuted tasks to another collection.
* @return an empty list
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
@@ -1080,17 +1157,19 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
lock.lock();
try {
ForkJoinWorkerThread[] ws = workers;
- int idx = w.poolIndex;
- if (idx >= 0 && idx < ws.length && ws[idx] == w)
- ws[idx] = null;
- if (totalCountOf(workerCounts) == 0) {
- terminate(); // no-op if already terminating
- transitionRunStateTo(TERMINATED);
- termination.signalAll();
- }
- else if (!isTerminating()) {
- tryShrinkWorkerArray();
- tryResumeSpare(true); // allow replacement
+ if (ws != null) {
+ int idx = w.poolIndex;
+ if (idx >= 0 && idx < ws.length && ws[idx] == w)
+ ws[idx] = null;
+ if (totalCountOf(workerCounts) == 0) {
+ terminate(); // no-op if already terminating
+ transitionRunStateTo(TERMINATED);
+ termination.signalAll();
+ }
+ else if (!isTerminating()) {
+ tryShrinkWorkerArray();
+ tryResumeSpare(true); // allow replacement
+ }
}
} finally {
lock.unlock();
@@ -1138,10 +1217,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
lock.lock();
try {
ForkJoinWorkerThread[] ws = workers;
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null)
- t.cancelTasks();
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ t.cancelTasks();
+ }
}
} finally {
lock.unlock();
@@ -1157,10 +1238,12 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
lock.lock();
try {
ForkJoinWorkerThread[] ws = workers;
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null)
- t.shutdownNow();
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null)
+ t.shutdownNow();
+ }
}
} finally {
lock.unlock();
@@ -1177,12 +1260,14 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ {
lock.lock();
try {
ForkJoinWorkerThread[] ws = workers;
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread t = ws[i];
- if (t != null && !t.isTerminated()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
+ if (ws != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread t = ws[i];
+ if (t != null && !t.isTerminated()) {
+ try {
+ t.interrupt();
+ } catch (SecurityException ignore) {
+ }
}
}
}
diff --git a/src/actors/scala/actors/forkjoin/ForkJoinTask.java b/src/actors/scala/actors/forkjoin/ForkJoinTask.java
index 5e40940ff3..230c7a0a20 100644
--- a/src/actors/scala/actors/forkjoin/ForkJoinTask.java
+++ b/src/actors/scala/actors/forkjoin/ForkJoinTask.java
@@ -926,15 +926,14 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
protected abstract boolean exec();
/**
- * Returns, but does not unschedule or execute, the task most
- * recently forked by the current thread but not yet executed, if
- * one is available. There is no guarantee that this task will
- * actually be polled or executed next.
- * This method is designed primarily to support extensions,
- * and is unlikely to be useful otherwise.
- * This method may be invoked only from within
- * ForkJoinTask computations. Attempts to invoke in other contexts
- * result in exceptions or errors possibly including ClassCastException.
+ * Returns, but does not unschedule or execute, the task queued by
+ * the current thread but not yet executed, if one is
+ * available. There is no guarantee that this task will actually
+ * be polled or executed next. This method is designed primarily
+ * to support extensions, and is unlikely to be useful otherwise.
+ * This method may be invoked only from within ForkJoinTask
+ * computations. Attempts to invoke in other contexts result in
+ * exceptions or errors possibly including ClassCastException.
*
* @return the next task, or null if none are available
*/
@@ -943,32 +942,32 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
- * Unschedules and returns, without executing, the task most
- * recently forked by the current thread but not yet executed.
- * This method is designed primarily to support extensions,
- * and is unlikely to be useful otherwise.
- * This method may be invoked only from within
- * ForkJoinTask computations. Attempts to invoke in other contexts
- * result in exceptions or errors possibly including ClassCastException.
+ * Unschedules and returns, without executing, the next task
+ * queued by the current thread but not yet executed. This method
+ * is designed primarily to support extensions, and is unlikely to
+ * be useful otherwise. This method may be invoked only from
+ * within ForkJoinTask computations. Attempts to invoke in other
+ * contexts result in exceptions or errors possibly including
+ * ClassCastException.
*
* @return the next task, or null if none are available
*/
protected static ForkJoinTask<?> pollNextLocalTask() {
- return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
+ return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
}
/**
- * Unschedules and returns, without executing, the task most
- * recently forked by the current thread but not yet executed, if
- * one is available, or if not available, a task that was forked
- * by some other thread, if available. Availability may be
- * transient, so a <code>null</code> result does not necessarily
- * imply quiecence of the pool this task is operating in.
- * This method is designed primarily to support extensions,
- * and is unlikely to be useful otherwise.
- * This method may be invoked only from within
+ * Unschedules and returns, without executing, the next task
+ * queued by the current thread but not yet executed, if one is
+ * available, or if not available, a task that was forked by some
+ * other thread, if available. Availability may be transient, so a
+ * <code>null</code> result does not necessarily imply quiecence
+ * of the pool this task is operating in. This method is designed
+ * primarily to support extensions, and is unlikely to be useful
+ * otherwise. This method may be invoked only from within
* ForkJoinTask computations. Attempts to invoke in other contexts
- * result in exceptions or errors possibly including ClassCastException.
+ * result in exceptions or errors possibly including
+ * ClassCastException.
*
* @return a task, or null if none are available
*/
diff --git a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java
index ad22a94afa..2d75987f91 100644
--- a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java
+++ b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java
@@ -17,8 +17,8 @@ import java.lang.reflect.*;
* subclassable solely for the sake of adding functionality -- there
* are no overridable methods dealing with scheduling or
* execution. However, you can override initialization and termination
- * cleanup methods surrounding the main task processing loop. If you
- * do create such a subclass, you will also need to supply a custom
+ * methods surrounding the main task processing loop. If you do
+ * create such a subclass, you will also need to supply a custom
* ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
*
*/
@@ -199,6 +199,11 @@ public class ForkJoinWorkerThread extends Thread {
long lastEventCount;
/**
+ * True if use local fifo, not default lifo, for local polling
+ */
+ private boolean locallyFifo;
+
+ /**
* Creates a ForkJoinWorkerThread operating in the given pool.
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
@@ -232,6 +237,14 @@ public class ForkJoinWorkerThread extends Thread {
return poolIndex;
}
+ /**
+ * Establishes local first-in-first-out scheduling mode for forked
+ * tasks that are never joined.
+ * @param async if true, use locally FIFO scheduling
+ */
+ void setAsyncMode(boolean async) {
+ locallyFifo = async;
+ }
// Runstate management
@@ -392,7 +405,7 @@ public class ForkJoinWorkerThread extends Thread {
*/
private static void setSlot(ForkJoinTask<?>[] q, int i,
ForkJoinTask<?> t){
-//TR _unsafe.putOrderedObject((Object)q, (i << qShift) + qBase, (Object)t);
+//TR _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
_unsafe.putObjectVolatile((Object)q, (i << qShift) + qBase, (Object)t);
}
@@ -436,7 +449,7 @@ public class ForkJoinWorkerThread extends Thread {
* either empty or contended.
* @return a task, or null if none or contended.
*/
- private ForkJoinTask<?> deqTask() {
+ final ForkJoinTask<?> deqTask() {
ForkJoinTask<?> t;
ForkJoinTask<?>[] q;
int i;
@@ -490,11 +503,15 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
- * Returns next task to pop.
+ * Returns next task.
*/
final ForkJoinTask<?> peekTask() {
ForkJoinTask<?>[] q = queue;
- return q == null? null : q[(sp - 1) & (q.length - 1)];
+ if (q == null)
+ return null;
+ int mask = q.length - 1;
+ int i = locallyFifo? base : (sp - 1);
+ return q[i & mask];
}
/**
@@ -572,17 +589,25 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
- * Pops or steals a task
+ * gets and removes a local or stolen a task
* @return a task, if available
*/
final ForkJoinTask<?> pollTask() {
- ForkJoinTask<?> t = popTask();
+ ForkJoinTask<?> t = locallyFifo? deqTask() : popTask();
if (t == null && (t = scan()) != null)
++stealCount;
return t;
}
/**
+ * gets a local task
+ * @return a task, if available
+ */
+ final ForkJoinTask<?> pollLocalTask() {
+ return locallyFifo? deqTask() : popTask();
+ }
+
+ /**
* Returns a pool submission, if one exists, activating first.
* @return a submission, if available
*/
@@ -609,6 +634,20 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
+ * Drains tasks to given collection c
+ * @return the number of tasks drained
+ */
+ final int drainTasksTo(Collection<ForkJoinTask<?>> c) {
+ int n = 0;
+ ForkJoinTask<?> t;
+ while (base != sp && (t = deqTask()) != null) {
+ c.add(t);
+ ++n;
+ }
+ return n;
+ }
+
+ /**
* Get and clear steal count for accumulation by pool. Called
* only when known to be idle (in pool.sync and termination).
*/