summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-17 15:55:46 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-17 15:55:46 +0000
commitd3cc8c2190d265ca02a72d02ddde3579baf58667 (patch)
treea5611111a3833746eb6b3e08c5d729c1fc9f94ad /src/actors
parent0d6d353412537b3b51ee4b48ad0bbbc54ed0c01b (diff)
downloadscala-d3cc8c2190d265ca02a72d02ddde3579baf58667.tar.gz
scala-d3cc8c2190d265ca02a72d02ddde3579baf58667.tar.bz2
scala-d3cc8c2190d265ca02a72d02ddde3579baf58667.zip
Added scheduler based on JDK5 thread pool.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/JDK5Scheduler.scala149
-rw-r--r--src/actors/scala/actors/Reaction.scala19
-rw-r--r--src/actors/scala/actors/Scheduler.scala5
-rw-r--r--src/actors/scala/actors/TimerThread.scala19
4 files changed, 151 insertions, 41 deletions
diff --git a/src/actors/scala/actors/JDK5Scheduler.scala b/src/actors/scala/actors/JDK5Scheduler.scala
new file mode 100644
index 0000000000..c71a504d50
--- /dev/null
+++ b/src/actors/scala/actors/JDK5Scheduler.scala
@@ -0,0 +1,149 @@
+
+package scala.actors
+
+import compat.Platform
+
+import java.lang.{Runnable, Thread, InterruptedException}
+
+import scala.collection.Set
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
+
+import java.util.concurrent.{ThreadPoolExecutor,
+ LinkedBlockingQueue,
+ TimeUnit}
+
+/**
+ *
+ * @version 0.9.2
+ * @author Philipp Haller
+ */
+class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with IScheduler {
+
+ //Debug.info("using JDK5Scheduler")
+
+ /* Note:
+ * When using an unbounded queue such as a
+ * LinkedBlockingQueue, the executor never creates
+ * more than coreSize threads. Therefore, we pass
+ * coreSize also as the maxPoolSize parameter.
+ *
+ * Our maxSize controls how much we dynamically increase
+ * the pool's coreSize.
+ */
+ private val executor =
+ new ThreadPoolExecutor(initCoreSize,
+ initCoreSize,
+ 5000,
+ TimeUnit.NANOSECONDS,
+ new LinkedBlockingQueue)
+
+ private var coreSize = initCoreSize
+
+ private var terminating = false
+
+ private var lastActivity = Platform.currentTime
+
+ private var pendingReactions = 0
+ def pendReaction: unit = synchronized {
+ //Debug.info("pend reaction")
+ pendingReactions = pendingReactions + 1
+ }
+ def unPendReaction: unit = synchronized {
+ //Debug.info("unpend reaction")
+ pendingReactions = pendingReactions - 1
+ }
+
+ def printActorDump {}
+
+ def start(task: Reaction): unit = synchronized {
+ //Debug.info("Starting " + task.actor)
+ execute(task)
+ }
+
+ def terminated(a: Actor) {}
+
+ private var TICK_FREQ = 5
+ private var CHECK_FREQ = 50
+
+ def onLockup(handler: () => unit) =
+ lockupHandler = handler
+
+ def onLockup(millis: int)(handler: () => unit) = {
+ //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
+ lockupHandler = handler
+ }
+
+ private var lockupHandler: () => unit = null
+
+ override def run(): unit = {
+ try {
+ while (!terminating) {
+ this.synchronized {
+ try {
+ wait(CHECK_FREQ)
+ } catch {
+ case _: InterruptedException =>
+ if (terminating) throw new QuitException
+ }
+
+ //Debug.info("tasks.length: "+executor.getQueue().size())
+ //Debug.info("pendingReactions: "+pendingReactions)
+
+ // check if we need more threads
+ if (executor.getQueue().size() > 0
+ && Platform.currentTime - lastActivity >= TICK_FREQ
+ && coreSize < maxSize) {
+ coreSize = coreSize + 1
+ // increase corePoolSize of thread pool
+ executor.setCorePoolSize(coreSize)
+ }
+ else {
+ if (pendingReactions == 0) {
+ // if all worker threads idle terminate
+ if (executor.getActiveCount() == 0) {
+ //Debug.info("all threads idle, terminating")
+ executor.shutdown()
+ // terminate timer thread
+ TimerThread.t.interrupt()
+ Console.println("threads used: "+coreSize)
+ throw new QuitException
+ }
+ }
+ }
+ } // sync
+
+ } // while (!terminating)
+ } catch {
+ case _: QuitException =>
+ // allow thread to exit
+ }
+ }
+
+ /**
+ * @param item the task to be executed.
+ */
+ def execute(item: Reaction): unit = synchronized {
+ executor.execute(item)
+ }
+
+ /**
+ * @param worker the worker thread executing tasks
+ * @return the executed task
+ */
+ def getTask(worker: WorkerThread) = null
+
+ /**
+ * @param a the actor
+ */
+ def tick(a: Actor) {
+ lastActivity = Platform.currentTime
+ }
+
+ /** Shuts down all idle worker threads.
+ */
+ def shutdown(): unit = synchronized {
+ terminating = true
+
+ executor.shutdown()
+ }
+}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 107b5e7847..f3510d7303 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -12,7 +12,6 @@
package scala.actors
import java.lang.{InterruptedException, Runnable}
-import java.util.logging.{Level, Logger}
class ExitActorException extends Throwable
@@ -31,22 +30,6 @@ private[actors] class Reaction(a: Actor,
def actor = a
- /**
- * @param t ...
- */
- def log(t: Throwable): unit = {
- Debug.info("logging "+t)
- val logger = Logger.getLogger("Scheduler")
- val buf = new StringBuffer
- buf.append("Exception caught by task:\n")
- buf.append(t.toString()+"\n")
- val trace = t.getStackTrace()
- for (val elem <- trace) {
- buf.append(elem.toString() + "\n")
- }
- logger.log(Level.FINE, buf.toString())
- }
-
def run(): Unit = {
val t = currentThread
val saved = Actor.selfs.get(t).asInstanceOf[Actor]
@@ -67,14 +50,12 @@ private[actors] class Reaction(a: Actor,
}
catch {
case ie: InterruptedException => {
- log(ie)
a.exitLinked()
}
case d: SuspendActorException => {
// do nothing (continuation is already saved)
}
case t: Throwable => {
- log(t)
a.exitLinked()
}
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 83d5202974..eda32bcd23 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -30,10 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
object Scheduler {
private var sched: IScheduler =
{
- val logger = Logger.getLogger("Scheduler")
- logger.addHandler(new FileHandler("sched.log"))
- logger.setLevel(Level.FINE)
- val s = new TickedScheduler
+ val s = new JDK5Scheduler(4, 32)
s.start()
s
}
diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala
index b28953ee7a..c835865002 100644
--- a/src/actors/scala/actors/TimerThread.scala
+++ b/src/actors/scala/actors/TimerThread.scala
@@ -13,7 +13,6 @@ package scala.actors
import java.lang.{InterruptedException, Runnable, Thread}
import java.util.Date
-import java.util.logging.{Logger, Level}
import scala.collection.mutable.PriorityQueue
@@ -57,22 +56,6 @@ object TimerThread extends AnyRef with Runnable {
}
}
- /**
- * @param t ...
- */
- def log(t: Throwable): unit = {
- Debug.info("logging "+t)
- val logger = Logger.getLogger("Scheduler")
- val buf = new StringBuffer
- buf.append("Exception caught by task:\n")
- buf.append(t.toString()+"\n")
- val trace = t.getStackTrace()
- for (val elem <- trace) {
- buf.append(elem.toString() + "\n")
- }
- logger.log(Level.FINE, buf.toString())
- }
-
override def run = {
try {
while(true) {
@@ -81,7 +64,7 @@ object TimerThread extends AnyRef with Runnable {
val sleepTime = dequeueLateAndGetSleepTime
if (lateList.isEmpty) wait(sleepTime)
} catch {
- case t: Throwable => { log(t); throw t }
+ case t: Throwable => { throw t }
}
}