blob: 69b4eaa050f2e599dd16cfd474dff1e92f293061 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
// $Id$
package scala.actors
import java.lang.{Thread, InterruptedException}
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue}
import scala.compat.Platform
/**
* <p>This scheduler uses a thread pool to execute tasks that are generated
* by the execution of actors.</p>
*
* Use class <code>FJTaskScheduler2</code> instead.
*
* @version 0.9.18
* @author Philipp Haller
*/
@deprecated
class TickedScheduler extends Thread with WorkerThreadScheduler {
// as long as this thread runs, JVM should not exit
setDaemon(false)
private val tasks = new Queue[Runnable]
// Worker threads
private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
private val idle = new Queue[WorkerThread]
private val ticks = new HashMap[WorkerThread, Long]
private var terminating = false
private var lastActivity = Platform.currentTime
def printActorDump {}
private var TICK_FREQ = 5
private var CHECK_FREQ = 50
for (i <- List.range(0, 2)) {
val worker = new WorkerThread(this)
workers += worker
worker.start()
}
def onLockup(handler: () => Unit) {
lockupHandler = handler
}
def onLockup(millis: Int)(handler: () => Unit) {
//LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
lockupHandler = handler
}
private var lockupHandler: () => Unit = null
override def run() {
try {
while (!terminating) {
this.synchronized {
try {
wait(CHECK_FREQ)
} catch {
case _: InterruptedException =>
if (terminating) throw new QuitException
}
ActorGC.gc()
if (tasks.length > 0) {
// check if we need more threads
if (Platform.currentTime - lastActivity >= TICK_FREQ) {
val newWorker = new WorkerThread(this)
workers += newWorker
// dequeue item to be processed
val item = tasks.dequeue
newWorker.execute(item)
newWorker.start()
}
} // tasks.length > 0
else {
if (ActorGC.allTerminated) {
// if all worker threads idle terminate
if (workers.length == idle.length) {
Debug.info(this+": initiating shutdown...")
val idleThreads = idle.elements
while (idleThreads.hasNext) {
val worker = idleThreads.next
worker.running = false
worker.interrupt()
}
// terminate timer thread
Actor.timer.cancel()
throw new QuitException
}
}
}
} // sync
} // while (!terminating)
} catch {
case _: QuitException =>
// allow thread to exit
}
}
/**
* @param item the task to be executed.
*/
def execute(item: Runnable): Unit = synchronized {
if (!terminating) {
if (idle.length > 0) {
val wt = idle.dequeue
wt.execute(item)
}
else
tasks += item
}
}
def execute(fun: => Unit): Unit =
execute(new Runnable {
def run() { fun }
})
/**
* @param worker the worker thread executing tasks
* @return the executed task
*/
def getTask(worker: WorkerThread) = synchronized {
if (terminating)
QUIT_TASK
if (tasks.length > 0) {
val item = tasks.dequeue
item
}
else {
idle += worker
null
}
}
/**
* @param a the actor
*/
def tick(a: Actor) {
lastActivity = Platform.currentTime
}
/** Shuts down all idle worker threads.
*/
def shutdown(): Unit = synchronized {
terminating = true
val idleThreads = idle.elements
while (idleThreads.hasNext) {
val worker = idleThreads.next
worker.running = false
worker.interrupt()
}
// terminate timer thread
Actor.timer.cancel()
}
}
|