blob: 4f7a9ff8786b71198ddee48a394bcd0df5b97985 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
package scala.tools.nsc
package util
import scala.collection.mutable
/** A monitor-protected hand-off point between client threads and a server thread.
 *
 *  Three FIFO queues are maintained: plain actions (`todo`), throwables posted by
 *  the client (`throwables`) and interrupt requests (`interruptReqs`). Every read
 *  or write of these queues happens while holding this object's monitor; clients
 *  call `notify()` after enqueueing so that a thread blocked in
 *  [[waitForMoreWork]] re-checks the queues.
 */
class WorkScheduler {

  /** A unit of work: a parameterless, side-effecting function. */
  type Action = () => Unit

  private val todo = new mutable.Queue[Action]
  private val throwables = new mutable.Queue[Throwable]
  private val interruptReqs = new mutable.Queue[InterruptReq]

  /** Called from server: block until one of todo list, throwables or interruptReqs is nonempty.
   *
   *  The condition is re-tested after every wake-up, so a wake-up that finds all
   *  three queues empty simply waits again.
   */
  def waitForMoreWork(): Unit = synchronized {
    while (todo.isEmpty && throwables.isEmpty && interruptReqs.isEmpty) {
      wait()
    }
  }

  /** Called from server: test whether one of todo list, throwables or interruptReqs is nonempty.
   *
   *  @return `true` if at least one of the three queues holds an element
   */
  def moreWork: Boolean = synchronized {
    todo.nonEmpty || throwables.nonEmpty || interruptReqs.nonEmpty
  }

  /** Called from server: get first action in todo list, and pop it off.
   *
   *  @return the oldest queued action, or `None` if the todo list is empty
   */
  def nextWorkItem(): Option[Action] = synchronized {
    if (todo.isEmpty) None else Some(todo.dequeue())
  }

  /** Removes every queued action for which `f` yields a defined value and
   *  returns those values.
   *
   *  The value produced by `f` is captured while the action is being tested, so
   *  `f` is not applied a second time to the actions that get removed. This
   *  matters when `f` has side effects.
   *
   *  @param f  selector; actions mapped to `None` stay in the todo list
   *  @tparam T type of the values extracted from the removed actions
   *  @return   the defined results of `f`, in the order the actions were tested
   */
  def dequeueAll[T](f: Action => Option[T]): Seq[T] = synchronized {
    val extracted = new mutable.ListBuffer[T]
    todo.dequeueAll { action =>
      val outcome = f(action)
      outcome.foreach(extracted += _)
      outcome.isDefined
    }
    extracted.toList
  }

  /** Applies `f` to every pending interrupt request and removes them all.
   *
   *  @param f callback invoked once for each queued request
   */
  def dequeueAllInterrupts(f: InterruptReq => Unit): Unit = synchronized {
    // The predicate always answers true, so every request is handed to `f`
    // and then dropped from the queue.
    interruptReqs.dequeueAll { iq => f(iq); true }
  }

  /** Called from server: return optional exception posted by client.
   *  The returned exception is removed from the queue.
   *
   *  If further throwables remain after the one returned, an empty work item is
   *  posted; this makes the todo list non-empty and issues a `notify()`.
   *
   *  @return the oldest posted throwable, or `None` if there is none
   */
  def pollThrowable(): Option[Throwable] = synchronized {
    if (throwables.isEmpty) None
    else {
      val result = Some(throwables.dequeue())
      if (throwables.nonEmpty)
        postWorkItem(() => ())
      result
    }
  }

  /** Removes and returns the oldest pending interrupt request, if any.
   *
   *  @return the oldest queued request, or `None` if none is pending
   */
  def pollInterrupt(): Option[InterruptReq] = synchronized {
    if (interruptReqs.isEmpty) None else Some(interruptReqs.dequeue())
  }

  /** Called from client: have interrupt executed by server and return result.
   *
   *  Queues `op` through [[askDoQuickly]] and then asks the request for its result.
   *  NOTE(review): `getResult()` is defined on `InterruptReq` outside this file;
   *  presumably it blocks until the server has run `op` -- confirm there.
   *
   *  @param op the computation to hand to the server
   *  @return   whatever the request's `getResult()` yields
   */
  def doQuickly[A](op: () => A): A = {
    val ir = askDoQuickly(op)
    ir.getResult()
  }

  /** Called from client: queue `op` as an interrupt request without asking for its result.
   *
   *  @param op the computation to hand to the server
   *  @return   the queued request, typed so that its result type `R` is `A`
   */
  def askDoQuickly[A](op: () => A): InterruptReq { type R = A } = {
    val ir = new InterruptReq {
      type R = A
      val todo = op
    }
    synchronized {
      interruptReqs.enqueue(ir)
      notify()
    }
    ir
  }

  /** Called from client: have action executed by server.
   *
   *  @param action the action to append to the todo list
   */
  def postWorkItem(action: Action): Unit = synchronized {
    todo.enqueue(action)
    notify()
  }

  /** Called from client: cancel all queued actions.
   *  Only the todo list is cleared; throwables and interrupt requests are kept.
   */
  def cancelQueued(): Unit = synchronized {
    todo.clear()
  }

  /** Called from client:
   *  Require an exception to be thrown on next poll.
   *
   *  The throwable is queued and an [[EmptyAction]] is posted as a work item,
   *  which also issues a `notify()`.
   *
   *  @param exc the throwable to hand to the server
   */
  def raise(exc: Throwable): Unit = synchronized {
    throwables.enqueue(exc)
    postWorkItem(new EmptyAction)
  }
}
/** An action that does nothing when applied.
 *
 *  `WorkScheduler.raise` posts an instance of this class as a work item. Being
 *  a named class, instances can be recognised with a type test, unlike an
 *  anonymous `() => ()` literal.
 */
class EmptyAction extends (() => Unit) {
  /** Does nothing. */
  def apply(): Unit = ()
}
|