summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/TickedScheduler.scala
blob: 69b4eaa050f2e599dd16cfd474dff1e92f293061 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2007, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id$

package scala.actors

import java.lang.{Thread, InterruptedException}

import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue}
import scala.compat.Platform

/**
 * <p>This scheduler uses a thread pool to execute tasks that are generated
 * by the execution of actors.</p>
 *
 * <p>The scheduler is itself a non-daemon thread. Its <code>run</code> loop
 * wakes up every <code>CHECK_FREQ</code> milliseconds and then either</p>
 * <ul>
 *   <li>starts one additional worker thread, if tasks are queued and
 *       <code>tick</code> has not been called for at least
 *       <code>TICK_FREQ</code> milliseconds, or</li>
 *   <li>stops all workers and exits, if no tasks are queued, all actors
 *       have terminated and every worker is idle.</li>
 * </ul>
 *
 * <p>The task queue and the idle-worker queue are only accessed while
 * holding this scheduler's monitor.</p>
 *
 * Use class <code>FJTaskScheduler2</code> instead.
 *
 * @version 0.9.18
 * @author Philipp Haller
 */
@deprecated
class TickedScheduler extends Thread with WorkerThreadScheduler {
  // as long as this thread runs, JVM should not exit
  setDaemon(false)

  // Tasks submitted while no worker was idle, in submission order.
  private val tasks = new Queue[Runnable]

  // Worker threads
  // `workers` holds every worker ever created; `idle` holds those that
  // asked for a task via `getTask` and found the queue empty.
  private val workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
  private val idle = new Queue[WorkerThread]
  // NOTE(review): `ticks` is never read or written in this class.
  private val ticks = new HashMap[WorkerThread, Long]

  // Set by `shutdown()`; once true no new tasks are accepted and workers
  // asking for work are handed `QUIT_TASK`.
  private var terminating = false

  // Time of the most recent `tick` call.
  // NOTE(review): written by `tick` without holding the monitor but read in
  // `run` under the monitor -- confirm whether this needs to be volatile.
  private var lastActivity = Platform.currentTime

  /** Does nothing in this scheduler. */
  def printActorDump {}

  // Minimum time (ms) without a `tick` before an extra worker is started.
  private val TICK_FREQ = 5
  // Period (ms) of the scheduler thread's wake-ups.
  private val CHECK_FREQ = 50

  // Start with an initial pool of two workers.
  for (i <- List.range(0, 2)) {
    val worker = new WorkerThread(this)
    workers += worker
    worker.start()
  }

  /** Registers a lockup handler.
   *
   *  The handler is only stored; no code in this class invokes it.
   *
   *  @param handler the handler to store
   */
  def onLockup(handler: () => Unit) {
    lockupHandler = handler
  }

  /** Registers a lockup handler.
   *
   *  The handler is only stored; no code in this class invokes it.
   *
   *  @param millis  currently ignored
   *  @param handler the handler to store
   */
  def onLockup(millis: Int)(handler: () => Unit) {
    //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
    lockupHandler = handler
  }

  private var lockupHandler: () => Unit = null

  /** Stops every currently idle worker and cancels the actor timer.
   *
   *  Must be called while holding this scheduler's monitor, because it
   *  iterates over <code>idle</code>.
   */
  private def stopIdleWorkersAndTimer() {
    for (worker <- idle) {
      worker.running = false
      worker.interrupt()
    }
    // terminate timer thread
    Actor.timer.cancel()
  }

  /** Main loop of the scheduler thread.
   *
   *  Runs until <code>shutdown()</code> has been called or until all actors
   *  have terminated and all workers are idle.
   */
  override def run() {
    try {
      while (!terminating) {
        this.synchronized {
          try {
            wait(CHECK_FREQ)
          } catch {
            // An interrupt only ends the loop when shutting down;
            // otherwise it is treated like a normal wake-up.
            case _: InterruptedException =>
              if (terminating) throw new QuitException
          }

          ActorGC.gc()

          if (tasks.length > 0) {
            // check if we need more threads: tasks are waiting and no actor
            // has ticked recently, so the existing workers may be blocked
            if (Platform.currentTime - lastActivity >= TICK_FREQ) {
              val newWorker = new WorkerThread(this)
              workers += newWorker

              // hand the oldest queued task to the new worker before
              // starting it
              newWorker.execute(tasks.dequeue)
              newWorker.start()
            }
          }
          else if (ActorGC.allTerminated && workers.length == idle.length) {
            // no queued tasks, all actors terminated, all workers idle:
            // nothing is left to do, so terminate
            Debug.info(this+": initiating shutdown...")
            stopIdleWorkersAndTimer()
            throw new QuitException
          }
        } // sync

      } // while (!terminating)
    } catch {
      case _: QuitException =>
        // allow thread to exit
    }
  }

  /** Submits a task for execution.
   *
   *  The task is handed to an idle worker if there is one, otherwise it is
   *  queued. Tasks submitted after <code>shutdown()</code> are dropped.
   *
   *  @param item the task to be executed.
   */
  def execute(item: Runnable): Unit = synchronized {
    if (!terminating) {
      if (idle.length > 0)
        idle.dequeue.execute(item)
      else
        tasks += item
    }
  }

  /** Submits a block of code for execution by wrapping it in a
   *  <code>Runnable</code>.
   *
   *  @param fun the code to be executed; evaluated when the task runs.
   */
  def execute(fun: => Unit): Unit =
    execute(new Runnable {
      def run() { fun }
    })

  /** Called by a worker thread to obtain its next task.
   *
   *  @param worker the worker thread executing tasks
   *  @return       <code>QUIT_TASK</code> if the scheduler is terminating;
   *                otherwise the next queued task; otherwise
   *                <code>null</code>, after the worker has been registered
   *                as idle.
   */
  def getTask(worker: WorkerThread) = synchronized {
    // The termination check must be a branch of the result expression:
    // as a standalone statement its value would be discarded and the
    // worker would never be told to quit.
    if (terminating)
      QUIT_TASK
    else if (tasks.length > 0)
      tasks.dequeue
    else {
      idle += worker
      null
    }
  }

  /** Records that an actor is making progress.
   *
   *  @param a the actor (not used)
   */
  def tick(a: Actor) {
    lastActivity = Platform.currentTime
  }

  /** Shuts down all idle worker threads.
   *
   *  Also marks the scheduler as terminating, so that no further tasks are
   *  accepted and workers that later ask for a task receive
   *  <code>QUIT_TASK</code>, and cancels the actor timer.
   */
  def shutdown(): Unit = synchronized {
    terminating = true
    stopIdleWorkersAndTimer()
  }
}