summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/Scheduler.scala
blob: 764236f9d814931f02a55f29d276153e454d9ec7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2007, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id$

package scala.actors

import compat.Platform

import java.lang.{Runnable, Thread, InterruptedException}

import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}

/**
 * The <code>Scheduler</code> object is used by <code>Actor</code>
 * to execute the tasks that make up the execution of an actor.
 *
 * @version 0.9.18
 * @author Philipp Haller
 */
object Scheduler extends IScheduler {

  // Scheduler that all operations of this object are delegated to.
  // Defaults to a freshly started FJTaskScheduler2.
  private var sched: IScheduler = {
    val s = new FJTaskScheduler2
    s.start()
    s
  }

  /** Returns the scheduler currently used by this object. */
  def impl: IScheduler = sched

  /** Replaces the scheduler used by this object.
   *  The previously installed scheduler is not shut down by this method.
   *
   *  @param  scheduler  the scheduler to delegate to from now on
   */
  def impl_= (scheduler: IScheduler): Unit = {
    sched = scheduler
  }

  // Tasks captured by the last snapshot(); null when there is no snapshot
  // waiting to be resubmitted by restart().
  private var tasks: LinkedQueue = null
  // Pending count read from ActorGC by the last snapshot().
  private var pendingCount = 0

  /** Captures the tasks of the current scheduler together with the
   *  pending count of <code>ActorGC</code>, then shuts the scheduler down.
   *
   *  Only supported when the current scheduler is an instance of
   *  <code>FJTaskScheduler2</code>; otherwise <code>error</code> is
   *  called with "snapshot operation not supported.".
   */
  def snapshot(): Unit = sched match {
    case fjts: FJTaskScheduler2 =>
      tasks = fjts.snapshot()
      pendingCount = ActorGC.getPendingCount
      fjts.shutdown()
    case _ =>
      error("snapshot operation not supported.")
  }

  /** Creates and starts a new instance of <code>FJTaskScheduler2</code>,
   *  installs a new <code>Actor.timer</code>, and submits the tasks
   *  captured by the last <code>snapshot()</code> for execution.
   *
   *  If there is no snapshot to resubmit (no <code>snapshot()</code> was
   *  taken, or it was already consumed by an earlier
   *  <code>restart()</code>), only the scheduler and the timer are
   *  replaced. Previously this case failed with a
   *  <code>NullPointerException</code> after the new scheduler had
   *  already been installed.
   */
  def restart(): Unit = synchronized {
    sched = {
      val s = new FJTaskScheduler2
      // Restore the pending count before the new scheduler starts running.
      ActorGC.setPendingCount(pendingCount)
      s.start()
      s
    }
    Actor.timer = new java.util.Timer
    if (tasks ne null) {
      while (!tasks.isEmpty()) {
        sched.execute(tasks.take().asInstanceOf[FJTask])
      }
      // The snapshot has been consumed; a later restart() must not replay it.
      tasks = null
    }
  }

  /** Submits a <code>Runnable</code> for execution.
   *
   *  When the calling thread is an <code>FJTaskRunner</code>, the task is
   *  wrapped in an <code>FJTask</code> and pushed directly onto that
   *  runner; otherwise it is handed to the current scheduler.
   *
   *  @param  task  the task to be executed
   */
  def execute(task: Runnable) {
    currentThread match {
      case runner: FJTaskRunner =>
        runner.push(new FJTask {
          def run() { task.run() }
        })
      case _ =>
        sched execute task
    }
  }

  /** Submits a closure for execution.
   *
   *  When the calling thread is an <code>FJTaskRunner</code>, the closure
   *  is wrapped in an <code>FJTask</code> and pushed directly onto that
   *  runner; otherwise it is handed to the current scheduler.
   *
   *  @param  fun  the closure to be executed
   */
  def execute(fun: => Unit) {
    currentThread match {
      case runner: FJTaskRunner =>
        runner.push(new FJTask {
          def run() { fun }
        })
      case _ =>
        sched execute { fun }
    }
  }

  /** Notifies the current scheduler of library activity
   *  by the argument actor.
   *
   *  @param  a  the active actor
   */
  def tick(a: Actor): Unit = sched tick a

  /** Shuts down the current scheduler. */
  def shutdown(): Unit = sched.shutdown()

  /** Forwards the lockup handler to the current scheduler. */
  def onLockup(handler: () => Unit): Unit = sched.onLockup(handler)

  /** Forwards the lockup handler and its <code>millis</code> argument
   *  to the current scheduler.
   */
  def onLockup(millis: Int)(handler: () => Unit): Unit = sched.onLockup(millis)(handler)

  /** Forwards the actor dump request to the current scheduler. */
  def printActorDump: Unit = sched.printActorDump
}


/**
 * The <code>IScheduler</code> trait provides a common interface
 * for all schedulers used to execute actor tasks.
 *
 * Subclasses of <code>Actor</code> that override its
 * <code>scheduler</code> member value must provide
 * an implementation of the <code>IScheduler</code>
 * trait.
 *
 * @version 0.9.18
 * @author Philipp Haller
 */
trait IScheduler {

  /** Submits a by-name block of code for execution.
   *
   *  @param  fun  the code to be executed
   */
  def execute(fun: => Unit): Unit

  /** Submits a <code>Runnable</code> for execution.
   *
   *  @param  task  the task to be executed
   */
  def execute(task: Runnable): Unit

  /** Informs the scheduler that the executing actor
   *  has been active.
   *
   *  @param  a  the active actor
   */
  def tick(a: Actor): Unit

  /** Shuts down the scheduler.
   */
  def shutdown(): Unit

  /** Registers a lockup handler with the scheduler.
   *
   *  NOTE(review): presumably the handler is invoked when the scheduler
   *  considers its actors locked up; the exact trigger is left to the
   *  implementation -- confirm against the implementing schedulers.
   *
   *  @param  handler  the handler to register
   */
  def onLockup(handler: () => Unit): Unit

  /** Registers a lockup handler together with a <code>millis</code> value.
   *
   *  NOTE(review): <code>millis</code> is presumably a time span in
   *  milliseconds used for lockup detection -- confirm against the
   *  implementing schedulers.
   *
   *  @param  millis   the value passed along with the handler
   *  @param  handler  the handler to register
   */
  def onLockup(millis: Int)(handler: () => Unit): Unit

  /** Asks the scheduler for an actor dump.
   *
   *  NOTE(review): judging by the name this prints diagnostic information
   *  about actors; the output is left to the implementation -- confirm.
   */
  def printActorDump: Unit

  /** Sentinel task: a <code>Reaction</code> whose <code>run</code> does
   *  nothing and whose string form is "QUIT_TASK". A
   *  <code>WorkerThread</code> that obtains this task stops its run loop.
   */
  val QUIT_TASK = new Reaction(null) {
    override def run() {}
    override def toString(): String = "QUIT_TASK"
  }
}


/**
 * An <code>IScheduler</code> that additionally lets a
 * <code>WorkerThread</code> ask for the next task it should execute.
 */
trait WorkerThreadScheduler extends IScheduler {
  /**
   *  Called by a worker thread to obtain its next task.
   *  <code>WorkerThread.run</code> treats a <code>null</code> result as
   *  "no task available" and waits until a task is handed to it through
   *  <code>WorkerThread.execute</code>.
   *
   *  @param  worker the worker thread executing tasks
   *  @return        the task to be executed
   */
  def getTask(worker: WorkerThread): Runnable
}


/**
 * This scheduler executes the tasks of an actor on a single
 * thread (the current thread).
 *
 * @version 0.9.18
 * @author Philipp Haller
 */
class SingleThreadedScheduler extends IScheduler {

  /** Runs <code>task</code> immediately on the calling thread.
   *
   *  @param  task  the task to be executed
   */
  def execute(task: Runnable): Unit = task.run()

  /** Wraps the by-name block in a <code>Runnable</code> and runs it
   *  through <code>execute(task: Runnable)</code>.
   *
   *  @param  fun  the code to be executed
   */
  def execute(fun: => Unit) {
    val wrapped: Runnable = new Runnable {
      def run(): Unit = fun
    }
    execute(wrapped)
  }

  /** Does nothing. */
  def tick(a: Actor): Unit = ()

  /** Does nothing. */
  def shutdown(): Unit = ()

  /** Does nothing; the handler is ignored. */
  def onLockup(handler: () => Unit): Unit = ()

  /** Does nothing; both arguments are ignored. */
  def onLockup(millis: Int)(handler: () => Unit): Unit = ()

  /** Does nothing. */
  def printActorDump: Unit = ()
}


/**
 * The <code>QuitException</code> class is used to manage control flow
 * of certain schedulers and worker threads.
 *
 * @version 0.9.8
 * @author Philipp Haller
 */
private[actors] class QuitException extends Throwable {
  /*
   Returns this throwable unchanged instead of recording the execution
   stack trace. For efficiency reasons the trace is not filled in: the
   exception only serves as a control-flow signal (WorkerThread.run
   throws it to leave its loop and catches it again itself), so the
   trace would never be inspected.
   */
  override def fillInStackTrace(): Throwable = this
}


/**
 * <p>
 *   The class <code>WorkerThread</code> is used by schedulers to execute
 *   actor tasks on multiple threads.
 * </p>
 * <p>
 *   !!ACHTUNG: If you change this, make sure you understand the following
 *   proof of deadlock-freedom!!
 * </p>
 * <p>
 *   We prove that no deadlock is possible between the scheduler and
 *   any worker thread. For this, note that the scheduler
 *   only acquires the lock of a worker thread by calling
 *   <code>execute</code>.  This method is only called when the worker thread
 *   is in the idle queue of the scheduler. On the other hand, a
 *   worker thread only acquires the lock of the scheduler when it
 *   calls <code>getTask</code>. At the only callsite of <code>getTask</code>,
 *   the worker thread holds its own lock.
 * </p>
 * <p>
 *   Thus, deadlock can only occur when a worker thread calls
 *   <code>getTask</code> while it is in the idle queue of the scheduler,
 *   because then the scheduler might call (at any time!) <code>execute</code>
 *   which tries to acquire the lock of the worker thread. In such
 *   a situation the worker thread would be waiting to acquire the
 *   lock of the scheduler and vice versa.
 * </p>
 * <p>
 *   Therefore, to prove deadlock-freedom, it suffices to ensure
 *   that a worker thread will never call <code>getTask</code> when
 *   it is in the idle queue of the scheduler.
 * </p>
 * <p>
 *   A worker thread enters the idle queue of the scheduler when
 *   <code>getTask</code> returns <code>null</code>. Then it will also stay
 *   in the while-loop W (<code>while (task eq null)</code>) until
 *   <code>task</code> becomes non-null. The only way this can happen is
 *   through a call of <code>execute</code> by the scheduler. Before every
 *   call of <code>execute</code> the worker thread is removed from the idle
 *   queue of the scheduler. Only then--after executing its task--
 *   the worker thread may call <code>getTask</code>. However, the scheduler
 *   is unable to call <code>execute</code> as the worker thread is not in
 *   the idle queue any more. In fact, the scheduler made sure
 *   that this is the case even _before_ calling <code>execute</code> and
 *   thus releasing the worker thread from the while-loop W. Thus,
 *   the property holds for every possible interleaving of thread
 *   execution. QED
 * </p>
 *
 * @version 0.9.18
 * @author Philipp Haller
 */
class WorkerThread(sched: WorkerThreadScheduler) extends Thread {
  // Task to run on the next iteration of run(). After construction it is
  // only assigned while holding this thread's lock: by execute() and by
  // run() (from sched.getTask).
  private var task: Runnable = null
  // Loop flag of run(); cleared by run() itself when it obtains QUIT_TASK.
  // NOTE(review): package-visible, so presumably schedulers also clear it
  // before interrupting the thread -- confirm against the callers.
  private[actors] var running = true

  /** Hands the task <code>r</code> to this worker and wakes it up if it
   *  is waiting inside <code>run</code>. Acquires this thread's lock.
   *
   *  @param  r  the task this worker should execute next
   */
  def execute(r: Runnable) = synchronized {
    task = r
    notify()
  }

  /** Main loop of the worker: runs the current task (if any), then asks
   *  the scheduler for the next one and waits while none is available.
   *  The loop ends when <code>running</code> becomes false, either
   *  because <code>QUIT_TASK</code> was obtained or because a
   *  <code>QuitException</code> was raised after an interrupt.
   */
  override def run(): Unit =
    try {
      while (running) {
        if (task ne null) {
          try {
            task.run()
          } catch {
            // An interrupt during the task is swallowed while the worker is
            // still supposed to run; otherwise it is turned into a
            // QuitException to leave the loop. Any other exception thrown by
            // the task is not handled here and propagates out of run().
            case consumed: InterruptedException =>
              if (!running) throw new QuitException
          }
        }
        // getTask is called while holding this thread's own lock; the class
        // documentation explains why this cannot deadlock with the scheduler.
        this.synchronized {
          task = sched getTask this

          // No task available: wait until execute() assigns one and calls
          // notify(). The condition is re-checked after every wake-up.
          while (task eq null) {
            try {
              wait()
            } catch {
              // Same policy as above: ignore the interrupt unless the worker
              // has been told to stop.
              case consumed: InterruptedException =>
                if (!running) throw new QuitException
            }
          }

          // QUIT_TASK is a sentinel: clearing the flag ends the while loop
          // before the task would be run.
          if (task == sched.QUIT_TASK) {
            running = false
          }
        }
      }
    } catch {
      case consumed: QuitException =>
        // allow thread to quit
    }

}