diff options
-rw-r--r-- | src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java | 414 | ||||
-rw-r--r-- | src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java | 195 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 43 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/AbstractPromise.java | 6 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 6 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Future.scala | 20 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/NonFatal.scala | 37 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Promise.scala | 127 | ||||
-rw-r--r-- | src/library/scala/concurrent/impl/Unsafe.java | 32 | ||||
-rw-r--r-- | test/files/jvm/scala-concurrent-tck.scala | 11 |
10 files changed, 477 insertions, 414 deletions
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java index e9389e9acb..4b5b3382f5 100644 --- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java +++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java @@ -1,5 +1,4 @@ /* - * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ @@ -12,22 +11,18 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; -//import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -//import java.util.concurrent.RunnableFuture; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; -interface RunnableFuture<T> extends Runnable { - //TR placeholder for java.util.concurrent.RunnableFuture -} - /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. * A {@code ForkJoinPool} provides the entry point for submissions @@ -127,7 +122,7 @@ interface RunnableFuture<T> extends Runnable { * @since 1.7 * @author Doug Lea */ -public class ForkJoinPool /*extends AbstractExecutorService*/ { +public class ForkJoinPool extends AbstractExecutorService { /* * Implementation Overview @@ -634,7 +629,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared volatile Thread parker; // == owner during call to park; else null - ForkJoinTask<?> currentJoin; // task being joined in awaitJoin + volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin ForkJoinTask<?> currentSteal; // current non-local task being executed // Heuristic padding to ameliorate unfortunate memory placements Object p00, p01, p02, p03, p04, p05, p06, p07; @@ -726,12 +721,11 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * version of this method because it is never needed.) */ final ForkJoinTask<?> pop() { - ForkJoinTask<?> t; int m; - ForkJoinTask<?>[] a = array; - if (a != null && (m = a.length - 1) >= 0) { + ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m; + if ((a = array) != null && (m = a.length - 1) >= 0) { for (int s; (s = top - 1) - base >= 0;) { - int j = ((m & s) << ASHIFT) + ABASE; - if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) == null) + long j = ((m & s) << ASHIFT) + ABASE; + if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) break; if (U.compareAndSwapObject(a, j, t, null)) { top = s; @@ -835,54 +829,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { } /** - * If present, removes from queue and executes the given task, or - * any other cancelled task. Returns (true) immediately on any CAS - * or consistency check failure so caller can retry. - * - * @return false if no progress can be made - */ - final boolean tryRemoveAndExec(ForkJoinTask<?> task) { - boolean removed = false, empty = true, progress = true; - ForkJoinTask<?>[] a; int m, s, b, n; - if ((a = array) != null && (m = a.length - 1) >= 0 && - (n = (s = top) - (b = base)) > 0) { - for (ForkJoinTask<?> t;;) { // traverse from s to b - int j = ((--s & m) << ASHIFT) + ABASE; - t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); - if (t == null) // inconsistent length - break; - else if (t == task) { - if (s + 1 == top) { // pop - if (!U.compareAndSwapObject(a, j, task, null)) - break; - top = s; - removed = true; - } - else if (base == b) // replace with proxy - removed = U.compareAndSwapObject(a, j, task, - new EmptyTask()); - break; - } - else if (t.status >= 0) - empty = false; - else if (s + 1 == top) { // pop and throw away - if (U.compareAndSwapObject(a, j, t, null)) - top = s; - break; - } - if (--n == 0) { - if (!empty && base == b) - progress = false; - break; - } - } - } - if (removed) - task.doExec(); - return progress; - } - - /** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. @@ -944,69 +890,98 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { // Execution methods /** - * Removes and runs tasks until empty, using local mode - * ordering. Normally called only after checking for apparent - * non-emptiness. + * Pops and runs tasks until empty. */ - final void runLocalTasks() { - // hoist checks from repeated pop/poll - ForkJoinTask<?>[] a; int m; - if ((a = array) != null && (m = a.length - 1) >= 0) { - if (mode == 0) { - for (int s; (s = top - 1) - base >= 0;) { - int j = ((m & s) << ASHIFT) + ABASE; - ForkJoinTask<?> t = - (ForkJoinTask<?>)U.getObjectVolatile(a, j); - if (t != null) { - if (U.compareAndSwapObject(a, j, t, null)) { - top = s; - t.doExec(); - } - } - else - break; - } + private void popAndExecAll() { + // A bit faster than repeated pop calls + ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t; + while ((a = array) != null && (m = a.length - 1) >= 0 && + (s = top - 1) - base >= 0 && + (t = ((ForkJoinTask<?>) + U.getObject(a, j = ((m & s) << ASHIFT) + ABASE))) + != null) { + if (U.compareAndSwapObject(a, j, t, null)) { + top = s; + t.doExec(); } - else { - for (int b; (b = base) - top < 0;) { - int j = ((m & b) << ASHIFT) + ABASE; - ForkJoinTask<?> t = - (ForkJoinTask<?>)U.getObjectVolatile(a, j); - if (t != null) { - if (base == b && - U.compareAndSwapObject(a, j, t, null)) { - base = b + 1; - t.doExec(); - } - } else if (base == b) { - if (b + 1 == top) + } + } + + /** + * Polls and runs tasks until empty. + */ + private void pollAndExecAll() { + for (ForkJoinTask<?> t; (t = poll()) != null;) + t.doExec(); + } + + /** + * If present, removes from queue and executes the given task, or + * any other cancelled task. Returns (true) immediately on any CAS + * or consistency check failure so caller can retry. + * + * @return 0 if no progress can be made, else positive + * (this unusual convention simplifies use with tryHelpStealer.) + */ + final int tryRemoveAndExec(ForkJoinTask<?> task) { + int stat = 1; + boolean removed = false, empty = true; + ForkJoinTask<?>[] a; int m, s, b, n; + if ((a = array) != null && (m = a.length - 1) >= 0 && + (n = (s = top) - (b = base)) > 0) { + for (ForkJoinTask<?> t;;) { // traverse from s to b + int j = ((--s & m) << ASHIFT) + ABASE; + t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); + if (t == null) // inconsistent length + break; + else if (t == task) { + if (s + 1 == top) { // pop + if (!U.compareAndSwapObject(a, j, task, null)) break; - Thread.yield(); // wait for lagging update + top = s; + removed = true; } + else if (base == b) // replace with proxy + removed = U.compareAndSwapObject(a, j, task, + new EmptyTask()); + break; + } + else if (t.status >= 0) + empty = false; + else if (s + 1 == top) { // pop and throw away + if (U.compareAndSwapObject(a, j, t, null)) + top = s; + break; + } + if (--n == 0) { + if (!empty && base == b) + stat = 0; + break; } } } + if (removed) + task.doExec(); + return stat; } /** * Executes a top-level task and any local tasks remaining * after execution. - * - * @return true unless terminating */ - final boolean runTask(ForkJoinTask<?> t) { - boolean alive = true; + final void runTask(ForkJoinTask<?> t) { if (t != null) { currentSteal = t; t.doExec(); - if (top != base) // conservative guard - runLocalTasks(); + if (top != base) { // process remaining local tasks + if (mode == 0) + popAndExecAll(); + else + pollAndExecAll(); + } ++nsteals; currentSteal = null; } - else if (runState < 0) // terminating - alive = false; - return alive; } /** @@ -1072,7 +1047,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } } - /** * Per-thread records for threads that submit to pools. Currently * holds only pseudo-random seed / index that is used to choose @@ -1165,7 +1139,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * traversal parameters at the expense of sometimes blocking when * we could be helping. */ - private static final int MAX_HELP = 32; + private static final int MAX_HELP = 64; /** * Secondary time-based bound (in nanosecs) for helping attempts @@ -1175,7 +1149,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * value should roughly approximate the time required to create * and/or activate a worker thread. */ - private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec + private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec /** * Increment for seed generators. See class ThreadLocal for @@ -1326,22 +1300,28 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * * @param w the worker's queue */ + final void registerWorker(WorkQueue w) { Mutex lock = this.lock; lock.lock(); try { WorkQueue[] ws = workQueues; if (w != null && ws != null) { // skip on shutdown/failure - int rs, n; - while ((n = ws.length) < // ensure can hold total - (parallelism + (short)(ctl >>> TC_SHIFT) << 1)) - workQueues = ws = Arrays.copyOf(ws, n << 1); - int m = n - 1; + int rs, n = ws.length, m = n - 1; int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence w.seed = (s == 0) ? 1 : s; // ensure non-zero seed int r = (s << 1) | 1; // use odd-numbered indices - while (ws[r &= m] != null) // step by approx half size - r += ((n >>> 1) & SQMASK) + 2; + if (ws[r &= m] != null) { // collision + int probes = 0; // step by approx half size + int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2; + while (ws[r = (r + step) & m] != null) { + if (++probes >= n) { + workQueues = ws = Arrays.copyOf(ws, n <<= 1); + m = n - 1; + probes = 0; + } + } + } w.eventCount = w.poolIndex = r; // establish before recording ws[r] = w; // also update seq runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN); @@ -1488,7 +1468,6 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { } } - // Scanning for tasks /** @@ -1496,7 +1475,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ final void runWorker(WorkQueue w) { w.growArray(false); // initialize queue array in this thread - do {} while (w.runTask(scan(w))); + do { w.runTask(scan(w)); } while (w.runState >= 0); } /** @@ -1559,8 +1538,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { q.base = b + 1; // specialization of pollAt return t; } - else if ((t != null || b + 1 != q.top) && - (ec < 0 || j <= m)) { + else if (ec < 0 || j <= m) { rs = 0; // mark scan as imcomplete break; // caller can retry after release } @@ -1568,6 +1546,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { if (--j < 0) break; } + long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns; if (e < 0) // decode ctl on empty scan w.runState = -1; // pool is terminating @@ -1635,7 +1614,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { if (w.eventCount < 0 && !tryTerminate(false, false) && - (int)prevCtl != 0 && ctl == currentCtl) { + (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) { Thread wt = Thread.currentThread(); Thread.yield(); // yield before block while (ctl == currentCtl) { @@ -1670,70 +1649,79 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * leaves hints in workers to speed up subsequent calls. The * implementation is very branchy to cope with potential * inconsistencies or loops encountering chains that are stale, - * unknown, or so long that they are likely cyclic. All of these - * cases are dealt with by just retrying by caller. + * unknown, or so long that they are likely cyclic. * * @param joiner the joining worker * @param task the task to join - * @return true if found or ran a task (and so is immediately retryable) - */ - private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { - WorkQueue[] ws; - int m, depth = MAX_HELP; // remaining chain depth - boolean progress = false; - if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && - task.status >= 0) { - ForkJoinTask<?> subtask = task; // current target - outer: for (WorkQueue j = joiner;;) { - WorkQueue stealer = null; // find stealer of subtask - WorkQueue v = ws[j.stealHint & m]; // try hint - if (v != null && v.currentSteal == subtask) - stealer = v; - else { // scan - for (int i = 1; i <= m; i += 2) { - if ((v = ws[i]) != null && v.currentSteal == subtask && - v != joiner) { - stealer = v; - j.stealHint = i; // save hint - break; - } + * @return 0 if no progress can be made, negative if task + * known complete, else positive + */ + private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { + int stat = 0, steps = 0; // bound to avoid cycles + if (joiner != null && task != null) { // hoist null checks + restart: for (;;) { + ForkJoinTask<?> subtask = task; // current target + for (WorkQueue j = joiner, v;;) { // v is stealer of subtask + WorkQueue[] ws; int m, s, h; + if ((s = task.status) < 0) { + stat = s; + break restart; } - if (stealer == null) - break; - } - - for (WorkQueue q = stealer;;) { // try to help stealer - ForkJoinTask[] a; ForkJoinTask<?> t; int b; - if (task.status < 0) - break outer; - if ((b = q.base) - q.top < 0 && (a = q.array) != null) { - progress = true; - int i = (((a.length - 1) & b) << ASHIFT) + ABASE; - t = (ForkJoinTask<?>)U.getObjectVolatile(a, i); - if (subtask.status < 0) // must recheck before taking - break outer; - if (t != null && - q.base == b && - U.compareAndSwapObject(a, i, t, null)) { - q.base = b + 1; - joiner.runSubtask(t); + if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) + break restart; // shutting down + if ((v = ws[h = (j.stealHint | 1) & m]) == null || + v.currentSteal != subtask) { + for (int origin = h;;) { // find stealer + if (((h = (h + 2) & m) & 15) == 1 && + (subtask.status < 0 || j.currentJoin != subtask)) + continue restart; // occasional staleness check + if ((v = ws[h]) != null && + v.currentSteal == subtask) { + j.stealHint = h; // save hint + break; + } + if (h == origin) + break restart; // cannot find stealer } - else if (q.base == b) - break outer; // possibly stalled } - else { // descend - ForkJoinTask<?> next = stealer.currentJoin; - if (--depth <= 0 || subtask.status < 0 || - next == null || next == subtask) - break outer; // stale, dead-end, or cyclic - subtask = next; - j = stealer; - break; + for (;;) { // help stealer or descend to its stealer + ForkJoinTask[] a; int b; + if (subtask.status < 0) // surround probes with + continue restart; // consistency checks + if ((b = v.base) - v.top < 0 && (a = v.array) != null) { + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + ForkJoinTask<?> t = + (ForkJoinTask<?>)U.getObjectVolatile(a, i); + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + stat = 1; // apparent progress + if (t != null && v.base == b && + U.compareAndSwapObject(a, i, t, null)) { + v.base = b + 1; // help stealer + joiner.runSubtask(t); + } + else if (v.base == b && ++steps == MAX_HELP) + break restart; // v apparently stalled + } + else { // empty -- try to descend + ForkJoinTask<?> next = v.currentJoin; + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + else if (next == null || ++steps == MAX_HELP) + break restart; // dead-end or maybe cyclic + else { + subtask = next; + j = v; + break; + } + } } } } } - return progress; + return stat; } /** @@ -1833,44 +1821,50 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { * @return task status on exit */ final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) { - ForkJoinTask<?> prevJoin = joiner.currentJoin; - joiner.currentJoin = task; - long startTime = 0L; - for (int k = 0, s; ; ++k) { - if ((joiner.isEmpty() ? // try to help - !tryHelpStealer(joiner, task) : - !joiner.tryRemoveAndExec(task))) { - if (k == 0) { - startTime = System.nanoTime(); - tryPollForAndExec(joiner, task); // check uncommon case - } - else if ((k & (MAX_HELP - 1)) == 0 && - System.nanoTime() - startTime >= COMPENSATION_DELAY && - tryCompensate(task, null)) { - if (task.trySetSignal() && task.status >= 0) { - synchronized (task) { - if (task.status >= 0) { - try { // see ForkJoinTask - task.wait(); // for explanation - } catch (InterruptedException ie) { + int s; + if ((s = task.status) >= 0) { + ForkJoinTask<?> prevJoin = joiner.currentJoin; + joiner.currentJoin = task; + long startTime = 0L; + for (int k = 0;;) { + if ((s = (joiner.isEmpty() ? // try to help + tryHelpStealer(joiner, task) : + joiner.tryRemoveAndExec(task))) == 0 && + (s = task.status) >= 0) { + if (k == 0) { + startTime = System.nanoTime(); + tryPollForAndExec(joiner, task); // check uncommon case + } + else if ((k & (MAX_HELP - 1)) == 0 && + System.nanoTime() - startTime >= + COMPENSATION_DELAY && + tryCompensate(task, null)) { + if (task.trySetSignal()) { + synchronized (task) { + if (task.status >= 0) { + try { // see ForkJoinTask + task.wait(); // for explanation + } catch (InterruptedException ie) { + } } + else + task.notifyAll(); } - else - task.notifyAll(); } + long c; // re-activate + do {} while (!U.compareAndSwapLong + (this, CTL, c = ctl, c + AC_UNIT)); } - long c; // re-activate - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, c + AC_UNIT)); } + if (s < 0 || (s = task.status) < 0) { + joiner.currentJoin = prevJoin; + break; + } + else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1) + Thread.yield(); // for politeness } - if ((s = task.status) < 0) { - joiner.currentJoin = prevJoin; - return s; - } - else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1) - Thread.yield(); // for politeness } + return s; } /** @@ -1887,7 +1881,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { while ((s = task.status) >= 0 && (joiner.isEmpty() ? tryHelpStealer(joiner, task) : - joiner.tryRemoveAndExec(task))) + joiner.tryRemoveAndExec(task)) != 0) ; return s; } @@ -1919,6 +1913,7 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { } } + /** * Runs tasks until {@code isQuiescent()}. We piggyback on * active count ctl maintenance, but rather than blocking @@ -1927,8 +1922,9 @@ public class ForkJoinPool /*extends AbstractExecutorService*/ { */ final void helpQuiescePool(WorkQueue w) { for (boolean active = true;;) { - if (w.base - w.top < 0) - w.runLocalTasks(); // exhaust local queue + ForkJoinTask<?> localTask; // exhaust local queue + while ((localTask = w.nextLocalTask()) != null) + localTask.doExec(); WorkQueue q = findNonEmptyStealQueue(w); if (q != null) { ForkJoinTask<?> t; int b; diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java index 344f6887a6..2ba146c4da 100644 --- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java +++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java @@ -16,7 +16,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -//import java.util.concurrent.RunnableFuture; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; @@ -115,18 +115,19 @@ import java.lang.reflect.Constructor; * <p>The ForkJoinTask class is not usually directly subclassed. * Instead, you subclass one of the abstract classes that support a * particular style of fork/join processing, typically {@link - * RecursiveAction} for computations that do not return results, or - * {@link RecursiveTask} for those that do. Normally, a concrete - * ForkJoinTask subclass declares fields comprising its parameters, - * established in a constructor, and then defines a {@code compute} - * method that somehow uses the control methods supplied by this base - * class. While these methods have {@code public} access (to allow - * instances of different task subclasses to call each other's - * methods), some of them may only be called from within other - * ForkJoinTasks (as may be determined using method {@link - * #inForkJoinPool}). Attempts to invoke them in other contexts - * result in exceptions or errors, possibly including - * {@code ClassCastException}. + * RecursiveAction} for most computations that do not return results, + * {@link RecursiveTask} for those that do, and {@link + * CountedCompleter} for those in which completed actions trigger + * other actions. Normally, a concrete ForkJoinTask subclass declares + * fields comprising its parameters, established in a constructor, and + * then defines a {@code compute} method that somehow uses the control + * methods supplied by this base class. While these methods have + * {@code public} access (to allow instances of different task + * subclasses to call each other's methods), some of them may only be + * called from within other ForkJoinTasks (as may be determined using + * method {@link #inForkJoinPool}). Attempts to invoke them in other + * contexts result in exceptions or errors, possibly including {@code + * ClassCastException}. * * <p>Method {@link #join} and its variants are appropriate for use * only when completion dependencies are acyclic; that is, the @@ -137,17 +138,17 @@ import java.lang.reflect.Constructor; * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that * may be of use in constructing custom subclasses for problems that * are not statically structured as DAGs. To support such usages a - * ForkJoinTask may be atomically <em>marked</em> using {@link - * #markForkJoinTask} and checked for marking using {@link - * #isMarkedForkJoinTask}. The ForkJoinTask implementation does not - * use these {@code protected} methods or marks for any purpose, but + * ForkJoinTask may be atomically <em>tagged</em> with a {@code + * short} value using {@link #setForkJoinTaskTag} or {@link + * #compareAndSetForkJoinTaskTag} and checked using {@link + * #getForkJoinTaskTag}. The ForkJoinTask implementation does not + * use these {@code protected} methods or tags for any purpose, but * they may be of use in the construction of specialized subclasses. * For example, parallel graph traversals can use the supplied methods * to avoid revisiting nodes/tasks that have already been processed. - * Also, completion based designs can use them to record that one - * subtask has completed. (Method names for marking are bulky in part - * to encourage definition of methods that reflect their usage - * patterns.) + * Also, completion based designs can use them to record that subtasks + * have completed. (Method names for tagging are bulky in part to + * encourage definition of methods that reflect their usage patterns.) * * <p>Most base support methods are {@code final}, to prevent * overriding of implementations that are intrinsically tied to the @@ -213,6 +214,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * thin-lock techniques, so use some odd coding idioms that tend * to avoid them, mainly by arranging that every synchronized * block performs a wait, notifyAll or both. + * + * These control bits occupy only (some of) the upper half (16 + * bits) of status field. The lower bits are used for user-defined + * tags. */ /** The run status of this task */ @@ -221,13 +226,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { static final int NORMAL = 0xf0000000; // must be negative static final int CANCELLED = 0xc0000000; // must be < NORMAL static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED - static final int SIGNAL = 0x00000001; - static final int MARKED = 0x00000002; + static final int SIGNAL = 0x00010000; // must be >= 1 << 16 + static final int SMASK = 0x0000ffff; // short bits for tags /** * Marks completion and wakes up threads waiting to join this - * task. A specialization for NORMAL completion is in method - * doExec. + * task. * * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL * @return completion status on exit @@ -237,7 +241,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { if ((s = status) < 0) return s; if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { - if ((s & SIGNAL) != 0) + if ((s >>> 16) != 0) synchronized (this) { notifyAll(); } return completion; } @@ -259,26 +263,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } catch (Throwable rex) { return setExceptionalCompletion(rex); } - while ((s = status) >= 0 && completed) { - if (U.compareAndSwapInt(this, STATUS, s, s | NORMAL)) { - if ((s & SIGNAL) != 0) - synchronized (this) { notifyAll(); } - return NORMAL; - } - } + if (completed) + s = setCompletion(NORMAL); } return s; } /** - * Tries to set SIGNAL status. Used by ForkJoinPool. Other - * variants are directly incorporated into externalAwaitDone etc. + * Tries to set SIGNAL status unless already completed. Used by + * ForkJoinPool. Other variants are directly incorporated into + * externalAwaitDone etc. * * @return true if successful */ final boolean trySetSignal() { - int s; - return U.compareAndSwapInt(this, STATUS, s = status, s | SIGNAL); + int s = status; + return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); } /** @@ -328,7 +328,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { return s; } - /** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and @@ -417,25 +416,39 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * @return status on exit */ private int setExceptionalCompletion(Throwable ex) { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); - try { - expungeStaleExceptions(); - ExceptionNode[] t = exceptionTable; - int i = h & (t.length - 1); - for (ExceptionNode e = t[i]; ; e = e.next) { - if (e == null) { - t[i] = new ExceptionNode(this, ex, t[i]); - break; + int s; + if ((s = status) >= 0) { + int h = System.identityHashCode(this); + final ReentrantLock lock = exceptionTableLock; + lock.lock(); + try { + expungeStaleExceptions(); + ExceptionNode[] t = exceptionTable; + int i = h & (t.length - 1); + for (ExceptionNode e = t[i]; ; e = e.next) { + if (e == null) { + t[i] = new ExceptionNode(this, ex, t[i]); + break; + } + if (e.get() == this) // already present + break; } - if (e.get() == this) // already present - break; + } finally { + lock.unlock(); } - } finally { - lock.unlock(); + s = setCompletion(EXCEPTIONAL); } - return setCompletion(EXCEPTIONAL); + ForkJoinTask<?> p = internalGetCompleter(); // propagate + if (p != null && p.status >= 0) + p.setExceptionalCompletion(ex); + return s; + } + + /** + * Exception propagation support for tasks with completers. + */ + ForkJoinTask<?> internalGetCompleter() { + return null; } /** @@ -517,7 +530,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { Throwable ex; if (e == null || (ex = e.ex) == null) return null; - if (e.thrower != Thread.currentThread().getId()) { + if (false && e.thrower != Thread.currentThread().getId()) { Class<? extends Throwable> ec = ex.getClass(); try { Constructor<?> noArgCtor = null; @@ -907,6 +920,18 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } /** + * Completes this task normally without setting a value. The most + * recent value established by {@link #setRawResult} (or {@code + * null} by default) will be returned as the result of subsequent + * invocations of {@code join} and related operations. + * + * @since 1.8 + */ + public final void quietlyComplete() { + setCompletion(NORMAL); + } + + /** * Waits if necessary for the computation to complete, and then * retrieves its result. * @@ -1225,15 +1250,18 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { protected abstract void setRawResult(V value); /** - * Immediately performs the base action of this task. This method - * is designed to support extensions, and should not in general be - * called otherwise. The return value controls whether this task - * is considered to be done normally. It may return false in + * Immediately performs the base action of this task and returns + * true if, upon return from this method, this task is guaranteed + * to have completed normally. This method may return false + * otherwise, to indicate that this task is not necessarily + * complete (or is not known to be complete), for example in * asynchronous actions that require explicit invocations of - * {@link #complete} to become joinable. It may also throw an - * (unchecked) exception to indicate abnormal exit. + * completion methods. This method may also throw an (unchecked) + * exception to indicate abnormal exit. This method is designed to + * support extensions, and should not in general be called + * otherwise. * - * @return {@code true} if completed normally + * @return {@code true} if this task is known to have completed normally */ protected abstract boolean exec(); @@ -1302,44 +1330,53 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { return wt.pool.nextTaskFor(wt.workQueue); } - // Mark-bit operations + // tag operations /** - * Returns true if this task is marked. + * Returns the tag for this task. * - * @return true if this task is marked + * @return the tag for this task * @since 1.8 */ - public final boolean isMarkedForkJoinTask() { - return (status & MARKED) != 0; + public final short getForkJoinTaskTag() { + return (short)status; } /** - * Atomically sets the mark on this task. + * Atomically sets the tag value for this task. * - * @return true if this task was previously unmarked + * @param tag the tag value + * @return the previous value of the tag * @since 1.8 */ - public final boolean markForkJoinTask() { + public final short setForkJoinTaskTag(short tag) { for (int s;;) { - if (((s = status) & MARKED) != 0) - return false; - if (U.compareAndSwapInt(this, STATUS, s, s | MARKED)) - return true; + if (U.compareAndSwapInt(this, STATUS, s = status, + (s & ~SMASK) | (tag & SMASK))) + return (short)s; } } /** - * Atomically clears the mark on this task. + * Atomically conditionally sets the tag value for this task. + * Among other applications, tags can be used as visit markers + * in tasks operating on graphs, as in methods that check: {@code + * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))} + * before processing, otherwise exiting because the node has + * already been visited. * - * @return true if this task was previously marked + * @param e the expected tag value + * @param tag the new tag value + * @return true if successful; i.e., the current value was + * equal to e and is now tag. * @since 1.8 */ - public final boolean unmarkForkJoinTask() { + public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { - if (((s = status) & MARKED) == 0) + if ((short)(s = status) != e) return false; - if (U.compareAndSwapInt(this, STATUS, s, s & ~MARKED)) + if (U.compareAndSwapInt(this, STATUS, s, + (s & ~SMASK) | (tag & SMASK))) return true; } } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 0d76c23c25..f5f0c5cdfd 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -18,6 +18,7 @@ import java.{ lang => jl } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } import scala.concurrent.util.Duration +import scala.concurrent.impl.NonFatal import scala.Option import scala.annotation.tailrec @@ -117,7 +118,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => // do nothing } - /** When this future is completed, either through an exception, a timeout, or a value, + /** When this future is completed, either through an exception, or a value, * apply the provided function. * * If the future has already been completed, @@ -204,7 +205,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => try p success f(v) catch { - case t => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -230,7 +231,7 @@ trait Future[+T] extends Awaitable[T] { case Right(v) => p success v } } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -242,7 +243,7 @@ trait Future[+T] extends Awaitable[T] { * If the current future contains a value which satisfies the predicate, the new future will also hold that value. * Otherwise, the resulting future will fail with a `NoSuchElementException`. * - * If the current future fails or times out, the resulting future also fails or times out, respectively. + * If the current future fails, then the resulting future also fails. * * Example: * {{{ @@ -263,7 +264,7 @@ trait Future[+T] extends Awaitable[T] { if (pred(v)) p success v else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -282,12 +283,12 @@ trait Future[+T] extends Awaitable[T] { // def withFilter(q: S => Boolean): FutureWithFilter[S] = new FutureWithFilter[S](self, x => p(x) && q(x)) // } - /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. + /** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value. * * If the current future contains a value for which the partial function is defined, the new future will also hold that value. * Otherwise, the resulting future will fail with a `NoSuchElementException`. * - * If the current future fails or times out, the resulting future also fails or times out, respectively. + * If the current future fails, then the resulting future also fails. * * Example: * {{{ @@ -312,7 +313,7 @@ trait Future[+T] extends Awaitable[T] { if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } } @@ -337,7 +338,7 @@ trait Future[+T] extends Awaitable[T] { onComplete { case Left(t) if pf isDefinedAt t => try { p success pf(t) } - catch { case t: Throwable => p complete resolver(t) } + catch { case NonFatal(t) => p complete resolver(t) } case otherwise => p complete otherwise } @@ -365,7 +366,7 @@ trait Future[+T] extends Awaitable[T] { try { p completeWith pf(t) } catch { - case t: Throwable => p complete resolver(t) + case NonFatal(t) => p complete resolver(t) } case otherwise => p complete otherwise } @@ -512,17 +513,16 @@ trait Future[+T] extends Awaitable[T] { * Note: using this method yields nondeterministic dataflow programs. */ object Future { - - /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. - * - * The result becomes available once the asynchronous computation is completed. - * - * @tparam T the type of the result - * @param body the asychronous computation - * @param execctx the execution context on which the future is run - * @return the `Future` holding the result of the computation - */ - def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body) + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. + * + * The result becomes available once the asynchronous computation is completed. + * + * @tparam T the type of the result + * @param body the asychronous computation + * @param execctx the execution context on which the future is run + * @return the `Future` holding the result of the computation + */ + def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -615,4 +615,3 @@ object Future { - diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java index 5280d67854..8aac5de042 100644 --- a/src/library/scala/concurrent/impl/AbstractPromise.java +++ b/src/library/scala/concurrent/impl/AbstractPromise.java @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; abstract class AbstractPromise { - private volatile Object _ref = null; + private volatile Object _ref; protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater = - AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); -} + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +}
\ No newline at end of file diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index c308a59297..2b929d91ab 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -93,8 +93,12 @@ private[scala] class ExecutionContextImpl(es: AnyRef) extends ExecutionContext w } def reportFailure(t: Throwable) = t match { + /*TODO: clarify that resolver should wrap Errors, in which case we do not want + Error to be re-thrown here */ case e: Error => throw e // rethrow serious errors - case t => t.printStackTrace() + case t => + //println(t.toString) + t.printStackTrace() } } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index a3c8ed3095..9743c37403 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -28,7 +28,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa /** Tests whether this Future has been completed. */ - final def isCompleted: Boolean = value.isDefined + def isCompleted: Boolean /** The contained value of this Future. Before this Future is completed * the value will be None. After completion the value will be Some(Right(t)) @@ -71,13 +71,19 @@ object Future { def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { val promise = new Promise.DefaultPromise[T]() + + //TODO: shouldn't the following be: + //dispatchFuture(executor, () => { promise complete Right(body) }) + executor.execute(new Runnable { def run = { promise complete { try { Right(body) } catch { - case e => scala.concurrent.resolver(e) + case e => + //executor.reportFailure(e) + scala.concurrent.resolver(e) } } } @@ -107,7 +113,7 @@ object Future { private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit = _taskStack.get match { - case stack if (stack ne null) && !force => stack push task + case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373 case _ => executor.execute(new Runnable { def run() { try { @@ -115,13 +121,7 @@ object Future { _taskStack set taskStack while (taskStack.nonEmpty) { val next = taskStack.pop() - try { - next.apply() - } catch { - case e => - // TODO catching all and continue isn't good for OOME - executor.reportFailure(e) - } + try next() catch { case NonFatal(e) => executor reportFailure e } } } finally { _taskStack.remove() diff --git a/src/library/scala/concurrent/impl/NonFatal.scala b/src/library/scala/concurrent/impl/NonFatal.scala new file mode 100644 index 0000000000..bc509e664c --- /dev/null +++ b/src/library/scala/concurrent/impl/NonFatal.scala @@ -0,0 +1,37 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent +package impl + +/** + * Extractor of non-fatal Throwables. Will not match fatal errors + * like VirtualMachineError (OutOfMemoryError) + * ThreadDeath, LinkageError and InterruptedException. + * StackOverflowError is matched, i.e. considered non-fatal. + * + * Usage to catch all harmless throwables: + * {{{ + * try { + * // dangerous stuff + * } catch { + * case NonFatal(e) => log.error(e, "Something not that bad") + * } + * }}} + */ +private[concurrent] object NonFatal { + + def unapply(t: Throwable): Option[Throwable] = t match { + case e: StackOverflowError ⇒ Some(e) // StackOverflowError ok even though it is a VirtualMachineError + // VirtualMachineError includes OutOfMemoryError and other fatal errors + case _: VirtualMachineError | _: ThreadDeath | _: InterruptedException | _: LinkageError ⇒ None + case e ⇒ Some(e) + } + +} + diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 07b6d1f278..ee1841aaff 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -74,37 +74,10 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu object Promise { - - def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue - - def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] - - /** Represents the internal state. - * - * [adriaan] it's unsound to make FState covariant (tryComplete won't type check) - */ - sealed trait FState[T] { def value: Option[Either[Throwable, T]] } - - case class Pending[T](listeners: List[Either[Throwable, T] => Any] = Nil) extends FState[T] { - def value: Option[Either[Throwable, T]] = None - } - - case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def result: T = value.get.right.get - } - - case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { - def exception: Throwable = value.get.left.get - } - - private val emptyPendingValue = Pending[Nothing](Nil) - /** Default promise implementation. */ - class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { - self => - - updater.set(this, Promise.EmptyPending()) + class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self => + updater.set(this, Nil) // Start at "No callbacks" //FIXME switch to Unsafe instead of ARFU protected final def tryAwait(atMost: Duration): Boolean = { @tailrec @@ -115,7 +88,7 @@ object Promise { val start = System.nanoTime() try { synchronized { - while (value.isEmpty) wait(ms, ns) + while (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException => @@ -123,93 +96,91 @@ object Promise { awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) } else - value.isDefined + isCompleted } - - blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) + //FIXME do not do this if there'll be no waiting + blocking(Future.body2awaitable(awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)), atMost) } + @throws(classOf[TimeoutException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (value.isDefined || tryAwait(atMost)) this + if (isCompleted || tryAwait(atMost)) this else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = ready(atMost).value.get match { case Left(e) => throw e case Right(r) => r } - def value: Option[Either[Throwable, T]] = getState.value + def value: Option[Either[Throwable, T]] = getState match { + case c: Either[_, _] => Some(c.asInstanceOf[Either[Throwable, T]]) + case _ => None + } + + override def isCompleted(): Boolean = getState match { // Cheaper than boxing result into Option due to "def value" + case _: Either[_, _] => true + case _ => false + } @inline - private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]] @inline - protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState) @inline - protected final def getState: FState[T] = updater.get(this) + protected final def getState: AnyRef = updater.get(this) def tryComplete(value: Either[Throwable, T]): Boolean = { - val callbacks: List[Either[Throwable, T] => Any] = { + val callbacks: List[Either[Throwable, T] => Unit] = { try { @tailrec - def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Any] = { + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { getState match { - case cur @ Pending(listeners) => - val newState = - if (v.isLeft) Failure(Some(v.asInstanceOf[Left[Throwable, T]])) - else Success(Some(v.asInstanceOf[Right[Throwable, T]])) - - if (updateState(cur, newState)) listeners - else tryComplete(v) + case raw: List[_] => + val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]] + if (updateState(cur, v)) cur else tryComplete(v) case _ => null } } tryComplete(resolveEither(value)) } finally { - synchronized { notifyAll() } // notify any blockers from `tryAwait` + synchronized { notifyAll() } //Notify any evil blockers } } callbacks match { case null => false case cs if cs.isEmpty => true - case cs => - Future.dispatchFuture(executor, { - () => cs.foreach(f => notifyCompleted(f, value)) - }) - true + case cs => Future.dispatchFuture(executor, () => cs.foreach(f => notifyCompleted(f, value))); true } } def onComplete[U](func: Either[Throwable, T] => U): this.type = { - @tailrec // Returns whether the future has already been completed or not - def tryAddCallback(): Boolean = { + @tailrec //Returns the future's results if it has already been completed, or null otherwise. + def tryAddCallback(): Either[Throwable, T] = { val cur = getState cur match { - case _: Success[_] | _: Failure[_] => true - case p: Pending[_] => - val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + case r: Either[_, _] => r.asInstanceOf[Either[Throwable, T]] + case listeners: List[_] => if (updateState(listeners, func :: listeners)) null else tryAddCallback() } } - if (tryAddCallback()) { - val result = value.get - Future.dispatchFuture(executor, { - () => notifyCompleted(func, result) - }) + tryAddCallback() match { + case null => this + case completed => + Future.dispatchFuture(executor, () => notifyCompleted(func, completed)) + this } - - this } private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T]) { try { func(result) } catch { - case e => executor.reportFailure(e) + case NonFatal(e) => executor reportFailure e } } } @@ -222,13 +193,13 @@ object Promise { val value = Some(resolveEither(suppliedValue)) + override def isCompleted(): Boolean = true + def tryComplete(value: Either[Throwable, T]): Boolean = false def onComplete[U](func: Either[Throwable, T] => U): this.type = { val completedAs = value.get - Future.dispatchFuture(executor, { - () => func(completedAs) - }) + Future.dispatchFuture(executor, () => func(completedAs)) this } @@ -241,19 +212,3 @@ object Promise { } } - - - - - - - - - - - - - - - - diff --git a/src/library/scala/concurrent/impl/Unsafe.java b/src/library/scala/concurrent/impl/Unsafe.java new file mode 100644 index 0000000000..21f7e638e5 --- /dev/null +++ b/src/library/scala/concurrent/impl/Unsafe.java @@ -0,0 +1,32 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + + +package scala.concurrent; + +import java.lang.reflect.Field; + +final class Unsafe { + public final static sun.misc.Unsafe instance; + static { + try { + sun.misc.Unsafe found = null; + for(Field field : sun.misc.Unsafe.class.getDeclaredFields()) { + if (field.getType() == sun.misc.Unsafe.class) { + field.setAccessible(true); + found = (sun.misc.Unsafe) field.get(null); + break; + } + } + if (found == null) throw new IllegalStateException("Can't find instance of sun.misc.Unsafe"); + else instance = found; + } catch(Throwable t) { + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index f0ca438774..8fcaceb3f0 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -64,7 +64,7 @@ trait FutureCallbacks extends TestBase { } } } - + def testOnSuccessWhenFailed(): Unit = once { done => val f = future[Unit] { @@ -94,7 +94,7 @@ trait FutureCallbacks extends TestBase { assert(x == 1) } } - + def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { done => val f = future[Unit] { @@ -289,6 +289,9 @@ trait FutureCombinators extends TestBase { } } + /* TODO: Test for NonFatal in collect (more of a regression test at this point). + */ + def testForeachSuccess(): Unit = once { done => val p = promise[Int]() @@ -473,8 +476,8 @@ trait FutureCombinators extends TestBase { def testFallbackToFailure(): Unit = once { done => val cause = new Exception - val f = future { throw cause } - val g = future { sys.error("failed") } + val f = future { /*throw cause*/ sys.error("failed") } + val g = future { /*sys.error("failed")*/ throw cause } val h = f fallbackTo g h onSuccess { |