/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package scala.concurrent.forkjoin;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import sun.misc.Unsafe;
import java.lang.reflect.*;

/**
 * A thread managed by a {@link ForkJoinPool}.  This class is
 * subclassable solely for the sake of adding functionality -- there
 * are no overridable methods dealing with scheduling or
 * execution. However, you can override initialization and termination
 * methods surrounding the main task processing loop.  If you do
 * create such a subclass, you will also need to supply a custom
 * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool.
 *
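 * <p>For illustration, a minimal subclass and a matching factory
 * might look like the sketch below (hypothetical names; it assumes
 * the {@code ForkJoinPool.ForkJoinWorkerThreadFactory} interface and
 * a two-argument {@code ForkJoinPool} constructor as declared in
 * this package):
 *
 * <pre> {@code
 * class LoggingWorkerThread extends ForkJoinWorkerThread {
 *     LoggingWorkerThread(ForkJoinPool pool) { super(pool); }
 *
 *     protected void onStart() {
 *         super.onStart();  // must run first
 *         System.out.println("worker " + getPoolIndex() + " started");
 *     }
 *
 *     protected void onTermination(Throwable exception) {
 *         System.out.println("worker " + getPoolIndex() + " terminating");
 *         super.onTermination(exception);  // must run last
 *     }
 * }
 *
 * ForkJoinPool pool = new ForkJoinPool(
 *     Runtime.getRuntime().availableProcessors(),
 *     new ForkJoinPool.ForkJoinWorkerThreadFactory() {
 *         public ForkJoinWorkerThread newThread(ForkJoinPool p) {
 *             return new LoggingWorkerThread(p);
 *         }
 *     });
 * }</pre>
 *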
 */
public class ForkJoinWorkerThread extends Thread {
    /*
     * Algorithm overview:
     *
     * 1. Work-Stealing: Work-stealing queues are special forms of
     * Deques that support only three of the four possible
     * end-operations -- push, pop, and deq (aka steal), and only do
     * so under the constraints that push and pop are called only from
     * the owning thread, while deq may be called from other threads.
     * (If you are unfamiliar with them, you probably want to read
     * Herlihy and Shavit's book "The Art of Multiprocessor
     * Programming", chapter 16 describing these in more detail before
     * proceeding.)  The main work-stealing queue design is roughly
     * similar to "Dynamic Circular Work-Stealing Deque" by David
     * Chase and Yossi Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html).  The main
     * difference ultimately stems from gc requirements that we null
     * out taken slots as soon as we can, to maintain as small a
     * footprint as possible even in programs generating huge numbers
     * of tasks. To accomplish this, we shift the CAS arbitrating pop
     * vs deq (steal) from being on the indices ("base" and "sp") to
     * the slots themselves (mainly via method "casSlotNull()"). So,
     * both a successful pop and deq mainly entail CAS'ing a nonnull
     * slot to null.  Because we rely on CASes of references, we do
     * not need tag bits on base or sp.  They are simple ints as used
     * in any circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that sp == base means the queue is empty, but
     * otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq have not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes.  However, in the
     * aggregate, we ensure at least probabilistic non-blockingness. If
     * an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete. One reason this works well here is
     * that apparently-nonempty often means soon-to-be-stealable,
     * which gives threads a chance to activate if necessary before
     * stealing (see below).
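     *
     * As a simplified sketch of that slot-based arbitration (the
     * actual popTask and deqTask methods below add null checks and
     * activation handling), both ends race to CAS the same slot from
     * a task reference to null:
     *
     *   // owner pop, at the sp end (simplified):
     *   int s = sp - 1;
     *   ForkJoinTask<?> t = queue[s & mask];
     *   if (t != null && casSlotNull(queue, s & mask, t)) {
     *       storeSp(s); // only then publish the new sp
     *   }
     *
     *   // thief deq/steal, at the base end (simplified):
     *   int b = base;
     *   ForkJoinTask<?> t = queue[b & mask];
     *   if (t != null && casSlotNull(queue, b & mask, t)) {
     *       base = b + 1; // only then advance base
     *   }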
     *
     * Efficient implementation of this approach currently relies on
     * an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable base require
     * volatile ordering.  Variable sp does not require volatile write
     * but needs cheaper store-ordering on writes.  Because they are
     * protected by volatile base reads, reads of the queue array and
     * its slots do not need volatile load semantics, but writes (in
     * push) require store order and CASes (in pop and deq) require
     * (volatile) CAS semantics. Since these combinations aren't
     * supported using ordinary volatiles, the only way to accomplish
     * these efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.) Further, performance on most platforms is
     * very sensitive to placement and sizing of the (resizable) queue
     * array.  Even though these queues don't usually become all that
     * big, the initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are currently initialized immediately after the thread
     * gets the initial signal to start processing tasks.  However,
     * all queue-related methods except pushTask are written in a way
     * that allows them to instead be lazily allocated and/or disposed
     * of when empty. All together, these low-level implementation
     * choices produce as much as a factor of 4 performance
     * improvement compared to naive implementations, and enable the
     * processing of billions of tasks per second, sometimes at the
     * expense of ugliness.
     *
     * 2. Run control: The primary run control is based on a global
     * counter (activeCount) held by the pool. It uses an algorithm
     * similar to that in Herlihy and Shavit section 17.6 to cause
     * threads to eventually block when all threads declare they are
     * inactive. (See variable "scans".)  For this to work, threads
     * must be declared active when executing tasks, and before
     * stealing a task. They must be inactive before blocking on the
     * Pool Barrier (awaiting a new submission or other Pool
     * event). In between, there is some free play which we take
     * advantage of to avoid contention and rapid flickering of the
     * global activeCount: If inactive, we activate only if a victim
     * queue appears to be nonempty (see above).  Similarly, a thread
     * tries to inactivate only after a full scan of other threads.
     * The net effect is that contention on activeCount is rarely a
     * measurable performance issue. (There are also a few other cases
     * where we scan for work rather than retry/block upon
     * contention.)
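     *
     * In code terms, the protocol reduces to: workers call
     * tryActivate() before taking a task (see scan and popTask) and
     * tryInactivate() before blocking in pool.sync (see mainLoop);
     * either call may fail under contention, in which case the
     * worker simply rescans rather than blocking.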
     *
     * 3. Selection control. We maintain the policy of always choosing
     * to
     * run local tasks rather than stealing, and always trying to
     * steal tasks before trying to run a new submission. All steals
     * are currently performed in randomly-chosen deq-order. It may be
     * worthwhile to bias these with locality / anti-locality
     * information, but doing this well probably requires more
     * lower-level information from JVMs than currently provided.
     */

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 2, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum work-stealing queue array size.  Must be less than or
     * equal to 1 << 28 to ensure lack of index wraparound. (This
     * is less than usual bounds because we need the left-shift by 3
     * to be in int range).
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask
     */
    final ForkJoinPool pool;

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when thread starts, to improve memory locality.
     */
    private ForkJoinTask<?>[] queue;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by the owner thread, via ordered store.
     * Both sp and base are allowed to wrap around on overflow, but
     * (sp - base) still estimates size.
     */
    private volatile int sp;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    private volatile int base;

    /**
     * Activity status. When true, this worker is considered active.
     * Must be false upon construction. It must be true when executing
     * tasks, and BEFORE stealing a task. It must be false before
     * calling pool.sync
     */
    private boolean active;

    /**
     * Run state of this worker. Supports simple versions of the usual
     * shutdown/shutdownNow control.
     */
    private volatile int runState;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be nonzero upon initialization.
     */
    private int seed;

    /**
     * Number of steals, transferred to pool when idle
     */
    private int stealCount;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool during cleanup etc
     */
    int poolIndex;

    /**
     * The last barrier event waited for. Accessed in pool callback
     * methods, but only by current thread.
     */
    long lastEventCount;

    /**
     * True if this worker uses local FIFO rather than the default
     * LIFO order for local polling.
     */
    private boolean locallyFifo;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        if (pool == null) throw new NullPointerException();
        this.pool = pool;
        // Note: poolIndex is set by pool during construction
        // Remaining initialization is deferred to onStart
    }

    // Public access methods

    /**
     * Returns the pool hosting this thread
     * @return the pool
     */
    public ForkJoinPool getPool() {
        return pool;
    }

    /**
     * Returns the index number of this thread in its pool.  The
     * returned value ranges from zero to the maximum number of
     * threads (minus one) that have ever been created in the pool.
     * This method may be useful for applications that track status or
     * collect results per-worker rather than per-task.
     * @return the index number.
     */
    public int getPoolIndex() {
        return poolIndex;
    }

    /**
     * Establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined.
     * @param async if true, use locally FIFO scheduling
     */
    void setAsyncMode(boolean async) {
        locallyFifo = async;
    }

    // Runstate management

    // Runstate values. Order matters
    private static final int RUNNING     = 0;
    private static final int SHUTDOWN    = 1;
    private static final int TERMINATING = 2;
    private static final int TERMINATED  = 3;

    final boolean isShutdown()    { return runState >= SHUTDOWN;  }
    final boolean isTerminating() { return runState >= TERMINATING;  }
    final boolean isTerminated()  { return runState == TERMINATED; }
    final boolean shutdown()      { return transitionRunStateTo(SHUTDOWN); }
    final boolean shutdownNow()   { return transitionRunStateTo(TERMINATING); }

    /**
     * Transitions to at least the given state. Returns true if this
     * call performed the transition (i.e., the state was not already
     * at least the given state).
     */
    private boolean transitionRunStateTo(int state) {
        for (;;) {
            int s = runState;
            if (s >= state)
                return false;
            if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state))
                return true;
        }
    }

    /**
     * Tries to set status to active; fails on contention.
     */
    private boolean tryActivate() {
        if (!active) {
            if (!pool.tryIncrementActiveCount())
                return false;
            active = true;
        }
        return true;
    }

    /**
     * Tries to set status to inactive; fails on contention.
     */
    private boolean tryInactivate() {
        if (active) {
            if (!pool.tryDecrementActiveCount())
                return false;
            active = false;
        }
        return true;
    }

    /**
     * Computes next value for random victim probe. Scans don't
     * require a very high quality generator, but also not a crummy
     * one. Marsaglia xor-shift is cheap and works well.
     */
    private static int xorShift(int r) {
        r ^= r << 1;
        r ^= r >>> 3;
        r ^= r << 10;
        return r;
    }

    // Lifecycle methods

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * ForkJoinTasks.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.sync(this); // await first pool event
            mainLoop();
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }

    /**
     * Execute tasks until shut down.
     */
    private void mainLoop() {
        while (!isShutdown()) {
            ForkJoinTask<?> t = pollTask();
            if (t != null || (t = pollSubmission()) != null)
                t.quietlyExec();
            else if (tryInactivate())
                pool.sync(this);
        }
    }

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke super.onStart() at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        // Allocate while starting to improve chances of thread-local
        // isolation
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        // Initial value of seed need not be especially random but
        // should differ across workers and must be nonzero
        int p = poolIndex + 1;
        seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits
    }

    /**
     * Performs cleanup associated with termination of this worker
     * thread.  If you override this method, you must invoke
     * super.onTermination at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or null if completed normally.
     */
    protected void onTermination(Throwable exception) {
        // Execute remaining local tasks unless aborting or terminating
        while (exception == null && !pool.isTerminating() && base != sp) {
            try {
                ForkJoinTask<?> t = popTask();
                if (t != null)
                    t.quietlyExec();
            } catch (Throwable ex) {
                exception = ex;
            }
        }
        // Cancel other tasks, transition status, notify pool, and
        // propagate exception to uncaught exception handler
        try {
            do {} while (!tryInactivate()); // ensure inactive
            cancelTasks();
            runState = TERMINATED;
            pool.workerTerminated(this);
        } catch (Throwable ex) {        // Shouldn't ever happen
            if (exception == null)      // but if so, at least rethrow it
                exception = ex;
        } finally {
            if (exception != null)
                ForkJoinTask.rethrowException(exception);
        }
    }

    // Intrinsics-based support for queue operations.
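    // A slot index i maps to the byte offset qBase + (i << qShift):
    // qBase is the base offset of ForkJoinTask[] arrays and qShift is
    // log2 of the array index scale, both obtained from Unsafe in the
    // static initializer at the bottom of this file. For example, with
    // a 4-byte reference scale (qShift == 2) and qBase == 16, slot 3
    // lives at byte offset 16 + (3 << 2) = 28. (Figures illustrative
    // only; actual values are JVM-dependent.)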

    /**
     * Sets, in store-order, the given slot of q to the given task.
     * Caller must ensure q is nonnull and index is in range.
     */
    private static void setSlot(ForkJoinTask<?>[] q, int i,
                                ForkJoinTask<?> t){
        _unsafe.putOrderedObject(q, (i << qShift) + qBase, t);
    }

    /**
     * CAS given slot of q to null. Caller must ensure q is nonnull
     * and index is in range.
     */
    private static boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                       ForkJoinTask<?> t) {
        return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null);
    }

    /**
     * Sets sp in store-order.
     */
    private void storeSp(int s) {
        _unsafe.putOrderedInt(this, spOffset, s);
    }

    // Main queue methods

    /**
     * Pushes a task. Called only by current thread.
     * @param t the task. Caller must ensure nonnull
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1;
        int s = sp;
        setSlot(q, s & mask, t);
        storeSp(++s);
        if ((s -= base) == 1)
            pool.signalWork();
        else if (s >= mask)
            growQueue();
    }

    /**
     * Tries to take a task from the base of the queue, failing if
     * either empty or contended.
     * @return a task, or null if none or contended.
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t;
        ForkJoinTask<?>[] q;
        int i;
        int b;
        if (sp != (b = base) &&
            (q = queue) != null && // must read q after b
            (t = q[i = (q.length - 1) & b]) != null &&
            casSlotNull(q, i, t)) {
            base = b + 1;
            return t;
        }
        return null;
    }

    /**
     * Returns a popped task, or null if empty. Ensures active status
     * if nonnull. Called only by current thread.
     */
    final ForkJoinTask<?> popTask() {
        int s = sp;
        while (s != base) {
            if (tryActivate()) {
                ForkJoinTask<?>[] q = queue;
                int mask = q.length - 1;
                int i = (s - 1) & mask;
                ForkJoinTask<?> t = q[i];
                if (t == null || !casSlotNull(q, i, t))
                    break;
                storeSp(s - 1);
                return t;
            }
        }
        return null;
    }

    /**
     * Specialized version of popTask to pop only if
     * topmost element is the given task. Called only
     * by current thread while active.
     * @param t the task. Caller must ensure nonnull
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q = queue;
        int mask = q.length - 1;
        int s = sp - 1;
        if (casSlotNull(q, s & mask, t)) {
            storeSp(s);
            return true;
        }
        return false;
    }

    /**
     * Returns next task.
     */
    final ForkJoinTask<?> peekTask() {
        ForkJoinTask<?>[] q = queue;
        if (q == null)
            return null;
        int mask = q.length - 1;
        int i = locallyFifo? base : (sp - 1);
        return q[i & mask];
    }

    /**
     * Doubles queue array size. Transfers elements by emulating
     * steals (deqs) from old array and placing, oldest first, into
     * new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int oldSize = oldQ.length;
        int newSize = oldSize << 1;
        if (newSize > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        ForkJoinTask<?>[] newQ = queue = new ForkJoinTask<?>[newSize];

        int b = base;
        int bf = b + oldSize;
        int oldMask = oldSize - 1;
        int newMask = newSize - 1;
        do {
            int oldIndex = b & oldMask;
            ForkJoinTask<?> t = oldQ[oldIndex];
            if (t != null && !casSlotNull(oldQ, oldIndex, t))
                t = null;
            setSlot(newQ, b & newMask, t);
        } while (++b != bf);
        pool.signalWork();
    }

    /**
     * Tries to steal a task from another worker. Starts at a random
     * index of the workers array, and probes workers until finding
     * one with a non-empty queue or finding that all are empty. The
     * first n probes are at randomly chosen indices; if these are all
     * empty, it resorts to a full circular traversal, which is
     * necessary for the caller to accurately set active status. It
     * also restarts if pool events occurred since the last scan,
     * which forces a refresh of the workers array, in case the
     * barrier was associated with a resize.
     *
     * This method must be both fast and quiet -- usually avoiding
     * memory accesses that could disrupt cache sharing, etc., other
     * than those needed to check for and take tasks. This accounts
     * for, among other things, updating the random seed in place
     * without storing it until exit.
     *
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan() {
        ForkJoinTask<?> t = null;
        int r = seed;                    // extract once to keep scan quiet
        ForkJoinWorkerThread[] ws;       // refreshed on outer loop
        int mask;                        // must be power 2 minus 1 and > 0
        outer:do {
            if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) {
                int idx = r;
                int probes = ~mask;      // use random index while negative
                for (;;) {
                    r = xorShift(r);     // update random seed
                    ForkJoinWorkerThread v = ws[mask & idx];
                    if (v == null || v.sp == v.base) {
                        if (probes <= mask)
                            idx = (probes++ < 0)? r : (idx + 1);
                        else
                            break;
                    }
                    else if (!tryActivate() || (t = v.deqTask()) == null)
                        continue outer;  // restart on contention
                    else
                        break outer;
                }
            }
        } while (pool.hasNewSyncEvent(this)); // retry on pool events
        seed = r;
        return t;
    }

    /**
     * Gets and removes a local or stolen task.
     * @return a task, if available
     */
    final ForkJoinTask<?> pollTask() {
        ForkJoinTask<?> t = locallyFifo? deqTask() : popTask();
        if (t == null && (t = scan()) != null)
            ++stealCount;
        return t;
    }

    /**
     * Gets a local task.
     * @return a task, if available
     */
    final ForkJoinTask<?> pollLocalTask() {
        return locallyFifo? deqTask() : popTask();
    }

    /**
     * Returns a pool submission, if one exists, activating first.
     * @return a submission, if available
     */
    private ForkJoinTask<?> pollSubmission() {
        ForkJoinPool p = pool;
        while (p.hasQueuedSubmissions()) {
            ForkJoinTask<?> t;
            if (tryActivate() && (t = p.pollSubmission()) != null)
                return t;
        }
        return null;
    }

    // Methods accessed only by Pool

    /**
     * Removes and cancels all tasks in queue.  Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> t;
        while (base != sp && (t = deqTask()) != null)
            t.cancelIgnoringExceptions();
    }

    /**
     * Drains tasks to the given collection c.
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<ForkJoinTask<?>> c) {
        int n = 0;
        ForkJoinTask<?> t;
        while (base != sp && (t = deqTask()) != null) {
            c.add(t);
            ++n;
        }
        return n;
    }

    /**
     * Gets and clears the steal count for accumulation by the pool.
     * Called only when known to be idle (in pool.sync and termination).
     */
    final int getAndClearStealCount() {
        int sc = stealCount;
        stealCount = 0;
        return sc;
    }

    /**
     * Returns true if at least one worker in the given array appears
     * to have at least one queued task.
     * @param ws array of workers
     */
    static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) {
        if (ws != null) {
            int len = ws.length;
            for (int j = 0; j < 2; ++j) { // need two passes for clean sweep
                for (int i = 0; i < len; ++i) {
                    ForkJoinWorkerThread w = ws[i];
                    if (w != null && w.sp != w.base)
                        return true;
                }
            }
        }
        return false;
    }

    // Support methods for ForkJoinTask

    /**
     * Returns an estimate of the number of tasks in the queue.
     */
    final int getQueueSize() {
        int n = sp - base;
        return n < 0? 0 : n; // suppress momentarily negative values
    }

    /**
     * Returns an estimate of the number of tasks, offset by a
     * function of the number of idle workers.
     */
    final int getEstimatedSurplusTaskCount() {
        // The halving approximates weighting idle vs non-idle workers
        return (sp - base) - (pool.getIdleThreadCount() >>> 1);
    }

    /**
     * Scans for a task to run, returning early if joinMe is done.
     */
    final ForkJoinTask<?> scanWhileJoining(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> t = pollTask();
        if (t != null && joinMe.status < 0 && sp == base) {
            pushTask(t); // unsteal if done and this task would be stealable
            t = null;
        }
        return t;
    }

    /**
     * Runs tasks until {@code pool.isQuiescent()}.
     */
    final void helpQuiescePool() {
        for (;;) {
            ForkJoinTask<?> t = pollTask();
            if (t != null)
                t.quietlyExec();
            else if (tryInactivate() && pool.isQuiescent())
                break;
        }
        do {} while (!tryActivate()); // re-activate on exit
    }

    // Temporary Unsafe mechanics for preliminary release
    private static Unsafe getUnsafe() throws Throwable {
        try {
            return Unsafe.getUnsafe();
        } catch (SecurityException se) {
            try {
                return java.security.AccessController.doPrivileged
                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
                        public Unsafe run() throws Exception {
                            return getUnsafePrivileged();
                        }});
            } catch (java.security.PrivilegedActionException e) {
                throw e.getCause();
            }
        }
    }

    private static Unsafe getUnsafePrivileged()
            throws NoSuchFieldException, IllegalAccessException {
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        return (Unsafe) f.get(null);
    }

    private static long fieldOffset(String fieldName)
            throws NoSuchFieldException {
        return _unsafe.objectFieldOffset
            (ForkJoinWorkerThread.class.getDeclaredField(fieldName));
    }

    static final Unsafe _unsafe;
    static final long baseOffset;
    static final long spOffset;
    static final long runStateOffset;
    static final long qBase;
    static final int qShift;
    static {
        try {
            _unsafe = getUnsafe();
            baseOffset = fieldOffset("base");
            spOffset = fieldOffset("sp");
            runStateOffset = fieldOffset("runState");
            qBase = _unsafe.arrayBaseOffset(ForkJoinTask[].class);
            int s = _unsafe.arrayIndexScale(ForkJoinTask[].class);
            if ((s & (s-1)) != 0)
                throw new Error("data type scale not a power of two");
            qShift = 31 - Integer.numberOfLeadingZeros(s);
        } catch (Throwable e) {
            throw new RuntimeException("Could not initialize intrinsics", e);
        }
    }
}