summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/threadpool/locks/FIFOCondVar.java
blob: 144ac54d37404e465c697144596ea5cfab4b3c5d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/*
  File: ConditionVariable.java
  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.
  History:
  Date       Who                What
  11Jun1998  dl               Create public version
 */

package scala.actors.threadpool.locks;

import java.util.Collection;
import java.util.Date;
import scala.actors.threadpool.*;
import scala.actors.threadpool.helpers.*;

/**
 * Condition variable whose waiting threads are tracked as nodes in a
 * {@link FIFOWaitQueue}. Every operation must be invoked by a thread that
 * currently holds the associated lock; otherwise
 * {@link IllegalMonitorStateException} is thrown.
 *
 * <p>The await methods enqueue a fresh wait node, release every hold the
 * caller has on the lock, block on the node, and re-acquire the same number
 * of holds before returning (also when the wait ends with an exception).
 */
class FIFOCondVar extends CondVar implements Condition, java.io.Serializable {
    private static final long serialVersionUID = -497497271881010475L;

    /**
     * Callback object handed to the wait nodes. Its {@code recheck} always
     * answers {@code false} and its {@code takeOver} does nothing.
     * NOTE(review): presumably a condition wait has no resource to re-test
     * or to hand over; confirm against WaitQueue.WaitNode.
     */
    private static final WaitQueue.QueuedSync sync = new WaitQueue.QueuedSync() {
        public boolean recheck(WaitQueue.WaitNode node) { return false; }
        public void takeOver(WaitQueue.WaitNode node) {}
    };

    // wait queue; only accessed when holding the lock
    private final WaitQueue wq = new FIFOWaitQueue();

    /**
     * Create a new condition variable bound to the given exclusive lock.
     *
     * @param lock the lock that must be held to use this condition. A thread
     *        holding it more than once is supported: awaiting releases every
     *        hold and re-acquires the same number afterwards.
     */
    FIFOCondVar(ExclusiveLock lock) {
        super(lock);
    }

    /**
     * Wait on this condition via the node's uninterruptible wait.
     *
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     */
    public void awaitUninterruptibly() {
        int holdCount = requireHoldCount();
        WaitQueue.WaitNode n = new WaitQueue.WaitNode();
        // Enqueue while still holding the lock so a signal issued right after
        // the release can already find this node.
        wq.insert(n);
        releaseHolds(holdCount);
        try {
            n.doWaitUninterruptibly(sync);
        } finally {
            reacquireHolds(holdCount);
        }
    }

    /**
     * Wait on this condition.
     *
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     * @throws InterruptedException if the thread is already interrupted on
     *         entry (the interrupt status is cleared), or if the node's wait
     *         throws it; the lock is re-acquired before it propagates
     */
    public void await() throws InterruptedException {
        int holdCount = requireHoldCount();
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        WaitQueue.WaitNode n = new WaitQueue.WaitNode();
        wq.insert(n);
        releaseHolds(holdCount);
        try {
            n.doWait(sync);
        } finally {
            reacquireHolds(holdCount);
        }
    }

    /**
     * Wait on this condition for at most the given time.
     *
     * @param timeout maximum time to wait, expressed in {@code unit}
     * @param unit time unit of {@code timeout}
     * @return the result of the node's timed wait
     *         (NOTE(review): presumably {@code false} means the time elapsed
     *         without a signal; confirm against WaitNode.doTimedWait)
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     * @throws InterruptedException if the thread is already interrupted on
     *         entry (the interrupt status is cleared), or if the node's wait
     *         throws it; the lock is re-acquired before it propagates
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        int holdCount = requireHoldCount();
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Convert before enqueueing so a failing conversion leaves no node behind.
        long nanos = unit.toNanos(timeout);
        WaitQueue.WaitNode n = new WaitQueue.WaitNode();
        wq.insert(n);
        releaseHolds(holdCount);
        try {
            return n.doTimedWait(sync, nanos);
        } finally {
            reacquireHolds(holdCount);
        }
    }

//    public long awaitNanos(long timeout) throws InterruptedException {
//        throw new UnsupportedOperationException();
//    }
//
    /**
     * Wait on this condition until the given wall-clock deadline. The
     * remaining time is computed from {@link System#currentTimeMillis()} and
     * delegated to {@link #await(long, TimeUnit)}; a deadline already in the
     * past is passed on as a negative timeout.
     *
     * @param deadline absolute time at which to stop waiting
     * @return the result of the delegated timed wait
     * @throws NullPointerException if {@code deadline} is null
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     * @throws InterruptedException as for {@link #await(long, TimeUnit)}
     */
    public boolean awaitUntil(Date deadline) throws InterruptedException {
        if (deadline == null) {
            throw new NullPointerException();
        }
        long abstime = deadline.getTime();
        long start = System.currentTimeMillis();
        long msecs = abstime - start;
        return await(msecs, TimeUnit.MILLISECONDS);
    }

    /**
     * Signal one waiter: extract nodes from the queue until one of them
     * accepts the signal or the queue is empty.
     *
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     */
    public void signal() {
        requireLockHeld();
        WaitQueue.WaitNode w;
        while ((w = wq.extract()) != null) {
            // notify if still waiting, else skip to the next queued node
            if (w.signal(sync)) {
                return;
            }
        }
        // queue exhausted: no one to signal
    }

    /**
     * Signal every queued waiter, draining the queue.
     *
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     */
    public void signalAll() {
        requireLockHeld();
        WaitQueue.WaitNode w;
        while ((w = wq.extract()) != null) {
            w.signal(sync);
        }
    }

    /**
     * Report whether the wait queue holds any nodes.
     * NOTE(review): this class removes nodes only in signal()/signalAll();
     * whether a node whose wait ended by timeout or interrupt is still
     * counted here depends on WaitQueue.hasNodes() -- confirm.
     *
     * @return the result of {@code hasNodes()} on the wait queue
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     */
    protected boolean hasWaiters() {
        requireLockHeld();
        return wq.hasNodes();
    }

    /**
     * Report the length of the wait queue.
     *
     * @return the result of {@code getLength()} on the wait queue
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     */
    protected int getWaitQueueLength() {
        requireLockHeld();
        return wq.getLength();
    }

    /**
     * Report the threads waiting on this condition.
     *
     * @return the collection produced by {@code getWaitingThreads()} on the
     *         wait queue
     * @throws IllegalMonitorStateException if the caller does not hold the lock
     */
    protected Collection getWaitingThreads() {
        requireLockHeld();
        return wq.getWaitingThreads();
    }

    /**
     * Return the number of holds the current thread has on the lock,
     * rejecting callers that hold none.
     */
    private int requireHoldCount() {
        int holdCount = lock.getHoldCount();
        if (holdCount == 0) {
            throw new IllegalMonitorStateException();
        }
        return holdCount;
    }

    /** Throw unless the current thread holds the lock. */
    private void requireLockHeld() {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalMonitorStateException();
        }
    }

    /** Unlock {@code holdCount} times, giving up every hold taken by the caller. */
    private void releaseHolds(int holdCount) {
        for (int i = holdCount; i > 0; i--) {
            lock.unlock();
        }
    }

    /** Lock {@code holdCount} times, restoring the holds released before waiting. */
    private void reacquireHolds(int holdCount) {
        for (int i = holdCount; i > 0; i--) {
            lock.lock();
        }
    }

}