summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-21 14:18:54 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-21 14:18:54 +0000
commit9e58ed4d399947b263b46bb250ca0b645ea55c80 (patch)
tree1c73ae7b43ccf560ab4a97b89cf8e42bc8c0c629
parente4282e0148efd03f06eb03d347d110c15ab28ab9 (diff)
downloadscala-9e58ed4d399947b263b46bb250ca0b645ea55c80.tar.gz
scala-9e58ed4d399947b263b46bb250ca0b645ea55c80.tar.bz2
scala-9e58ed4d399947b263b46bb250ca0b645ea55c80.zip
Changed sender stack and links to be simple Lists.
-rw-r--r--src/actors/scala/actors/Actor.scala291
-rw-r--r--src/actors/scala/actors/JDK5Scheduler.scala2
-rw-r--r--src/actors/scala/actors/Reaction.scala39
-rw-r--r--src/actors/scala/actors/Scheduler.scala13
4 files changed, 142 insertions, 203 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 3fce66b1a8..f07cdd92db 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -13,8 +13,6 @@ package scala.actors
import scala.collection.mutable.{HashSet, Queue}
import compat.Platform
-import java.util.Stack
-
/**
* The <code>Actor</code> object provides functions for the definition of
* actors, as well as all actor operations, such as
@@ -264,15 +262,14 @@ trait Actor extends OutputChannel[Any] {
private[actors] var waitingFor: Any => boolean = waitingForNone
private[actors] var isSuspended = false
- private val sessions = new Stack//[Channel[Any]]
-
- private val mailbox = new Queue[Pair[Any, Channel[Any]]]
+ private val mailbox = new MessageQueue
+ private var sessions: List[Channel[Any]] = Nil
private def send(msg: Any, session: Channel[Any]) = synchronized {
tick()
if (waitingFor(msg)) {
received = Some(msg)
- sessions push session
+ sessions = session :: sessions
waitingFor = waitingForNone
if (timeoutPending) {
@@ -285,90 +282,84 @@ trait Actor extends OutputChannel[Any] {
else
scheduleActor(null, msg)
} else {
- mailbox += Pair(msg, session)
+ mailbox.append(msg, session)
}
}
def receive[R](f: PartialFunction[Any, R]): R = {
assert(Actor.self == this, "receive from channel belonging to other actor")
+ // links
+ if (shouldExit) exit()
this.synchronized {
tick()
- mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
- f.isDefinedAt(p._1)
- }) match {
- case Some(Pair(msg, session)) => {
- received = Some(msg)
- sessions push session
- }
- case None => {
- waitingFor = f.isDefinedAt
- isSuspended = true
- suspendActor()
- }
+ val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ if (null eq qel) {
+ waitingFor = f.isDefinedAt
+ isSuspended = true
+ suspendActor()
+ } else {
+ received = Some(qel.msg)
+ sessions = qel.session :: sessions
}
waitingFor = waitingForNone
isSuspended = false
}
val result = f(received.get)
- sessions.pop
+ sessions = sessions.tail
result
}
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
assert(Actor.self == this, "receive from channel belonging to other actor")
+ // links
+ if (shouldExit) exit()
this.synchronized {
tick()
- mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
- f.isDefinedAt(p._1)
- }) match {
- case Some(Pair(msg, session)) => {
- received = Some(msg)
- sessions push session
- }
- case None => {
- waitingFor = f.isDefinedAt
- isSuspended = true
- received = None
- suspendActorFor(msec)
- Debug.info("received: "+received)
- if (received.isEmpty) {
- Debug.info("no message received after "+msec+" millis")
- if (f.isDefinedAt(TIMEOUT)) {
- Debug.info("executing TIMEOUT action")
- waitingFor = waitingForNone
- isSuspended = false
- val result = f(TIMEOUT)
- return result
- }
- else
- error("unhandled timeout")
+ val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ if (null eq qel) {
+ waitingFor = f.isDefinedAt
+ isSuspended = true
+ received = None
+ suspendActorFor(msec)
+ Debug.info("received: "+received)
+ if (received.isEmpty) {
+ Debug.info("no message received after "+msec+" millis")
+ if (f.isDefinedAt(TIMEOUT)) {
+ Debug.info("executing TIMEOUT action")
+ waitingFor = waitingForNone
+ isSuspended = false
+ val result = f(TIMEOUT)
+ return result
}
+ else
+ error("unhandled timeout")
}
+ } else {
+ received = Some(qel.msg)
+ sessions = qel.session :: sessions
}
waitingFor = waitingForNone
isSuspended = false
}
val result = f(received.get)
- sessions.pop
+ sessions = sessions.tail
result
}
def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == this, "react on channel belonging to other actor")
+ // links
+ if (shouldExit) exit()
Scheduler.pendReaction
this.synchronized {
tick()
- mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
- f.isDefinedAt(p._1)
- }) match {
- case Some(Pair(msg, session)) => {
- sessions push session
- scheduleActor(f, msg)
- }
- case None => {
- waitingFor = f.isDefinedAt
- detachActor(f)
- }
+ val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ if (null eq qel) {
+ waitingFor = f.isDefinedAt
+ detachActor(f)
+ } else {
+ sessions = qel.session :: sessions
+ scheduleActor(f, qel.msg)
}
throw new SuspendActorException
}
@@ -376,22 +367,20 @@ trait Actor extends OutputChannel[Any] {
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == this, "react on channel belonging to other actor")
+ // links
+ if (shouldExit) exit()
Scheduler.pendReaction
this.synchronized {
tick()
- mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
- f.isDefinedAt(p._1)
- }) match {
- case Some(Pair(msg, session)) => {
- sessions push session
- scheduleActor(f, msg)
- }
- case None => {
- waitingFor = f.isDefinedAt
- TimerThread.requestTimeout(this, f, msec)
- timeoutPending = true
- detachActor(f)
- }
+ val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
+ if (null eq qel) {
+ waitingFor = f.isDefinedAt
+ TimerThread.requestTimeout(this, f, msec)
+ timeoutPending = true
+ detachActor(f)
+ } else {
+ sessions = qel.session :: sessions
+ scheduleActor(f, qel.msg)
}
throw new SuspendActorException
}
@@ -444,12 +433,12 @@ trait Actor extends OutputChannel[Any] {
}
private[actors] def sender: Actor =
- if (sessions.empty) null
- else sessions.peek.asInstanceOf[Channel[Any]].receiver
+ if (sessions.isEmpty) null
+ else sessions.head.asInstanceOf[Channel[Any]].receiver
private[actors] def session: Channel[Any] =
- if (sessions.empty) null
- else sessions.peek.asInstanceOf[Channel[Any]]
+ if (sessions.isEmpty) null
+ else sessions.head.asInstanceOf[Channel[Any]]
private[actors] var continuation: PartialFunction[Any, Unit] = null
@@ -490,6 +479,8 @@ trait Actor extends OutputChannel[Any] {
case _: InterruptedException =>
}
}
+ // links: check if we should exit
+ if (shouldExit) exit()
}
def suspendActorFor(msec: long) {
@@ -515,6 +506,8 @@ trait Actor extends OutputChannel[Any] {
if (!fromExc) throw new ExitSuspendLoop
}
} catch { case _: ExitSuspendLoop => }
+ // links: check if we should exit
+ if (shouldExit) exit()
}
def resumeActor() {
@@ -530,7 +523,7 @@ trait Actor extends OutputChannel[Any] {
Scheduler start new Reaction(this)
}
- private val links = new HashSet[Actor]
+ private[actors] var links: List[Actor] = Nil
/**
* Links <code>self</code> to actor <code>to</code>.
@@ -539,7 +532,7 @@ trait Actor extends OutputChannel[Any] {
* @return ...
*/
def link(to: Actor): Actor = {
- links += to
+ links = to :: links
to.linkTo(this)
to
}
@@ -556,23 +549,26 @@ trait Actor extends OutputChannel[Any] {
actor
}
- private[actors] def linkTo(to: Actor): Unit =
- links += to
+ private[actors] def linkTo(to: Actor) {
+ links = to :: links
+ }
/**
Unlinks <code>self</code> from actor <code>from</code>.
*/
- def unlink(from: Actor): Unit = {
- links -= from
+ def unlink(from: Actor) {
+ links = links.remove(from.==)
from.unlinkFrom(this)
}
- private[actors] def unlinkFrom(from: Actor): Unit =
- links -= from
+ private[actors] def unlinkFrom(from: Actor) {
+ links = links.remove(from.==)
+ }
var trapExit = false
-
- private[actors] var exitReason: String = ""
+ private[actors] var exitReason: String = "normal"
+ private[actors] var exiting = false
+ private[actors] var shouldExit = false
/**
* <p>
@@ -593,49 +589,51 @@ trait Actor extends OutputChannel[Any] {
*/
def exit(reason: String): Nothing = {
kill()
- exitReason = reason
- //currentThread.interrupt()
+ // links
+ if (!links.isEmpty) {
+ exitReason = reason
+ exitLinked()
+ }
throw new ExitActorException
}
- private[actors] def exit(from: Actor, reason: String): Unit = {
- if (from == this) {
- exit(reason)
- }
- else {
- if (trapExit)
- this ! Exit(from, reason)
- else if (!reason.equals("normal"))
- exit(reason)
+ def exit(): Nothing = {
+ kill()
+ // links
+ if (!links.isEmpty) {
+ exitLinked()
}
+ throw new ExitActorException
}
- private[actors] def exitLinked(): Unit =
- exitLinked(exitReason, new HashSet[Actor])
-
- private[actors] def exitLinked(reason: String): Unit =
- exitLinked(reason, new HashSet[Actor])
+ // Assume !links.isEmpty
+ private[actors] def exitLinked() {
+ exiting = true
+ // remove this from links
+ links = links.remove(this.==)
+ // exit linked processes
+ links.foreach((linked: Actor) => {
+ unlink(linked)
+ if (!linked.exiting)
+ linked.exit(this, exitReason)
+ })
+ }
- private[actors] def exitLinked(reason: String,
- exitMarks: HashSet[Actor]): Unit = {
- if (exitMarks contains this) {
- // we are marked, do nothing
- }
- else {
- exitMarks += this // mark this as exiting
- // exit linked processes
- val iter = links.elements
- while (iter.hasNext) {
- val linked = iter.next
- unlink(linked)
- linked.exit(this, reason)
+ // Assume !this.exiting
+ private[actors] def exit(from: Actor, reason: String) {
+ if (trapExit)
+ this ! Exit(from, reason)
+ else if (!reason.equals("normal"))
+ this.synchronized {
+ shouldExit = true
+ exitReason = reason
+ if (isSuspended)
+ resumeActor()
+ else if (isDetached)
+ scheduleActor(null, null)
}
- exitMarks -= this
-
- // unregister in scheduler
- Scheduler terminated this
- }
}
+
}
@@ -650,62 +648,3 @@ trait Actor extends OutputChannel[Any] {
*/
case class Exit(from: Actor, reason: String)
-
-/**
- * This class is used by our efficient message queue
- * implementation.
- */
-private[actors] abstract class MessageQueueResult[Msg] {
- def msg: Msg
- def sender: Actor
-}
-
-/**
- * The class <code>MessageQueue</code> provides an efficient
- * implementation of a message queue specialized for this actor
- * library. Classes in this package are supposed to be the only
- * clients of this class.
- *
- * @author Martin Odersky, Philipp Haller
- */
-private[actors] class MessageQueue[Msg] extends MessageQueueResult[Msg] {
- var msg: Msg = _
- var sender: Actor = _
- private var next: MessageQueue[Msg] = this
-
- def append(msg: Msg, sender: Actor) = {
- val q = new MessageQueue[Msg]
- q.msg = msg
- q.sender = sender
- q.next = next
- next = q
- }
-
- def extractFirst(p: Msg => boolean): MessageQueueResult[Msg] = {
- var q = this
- var qnext = q.next
- while (qnext != this) {
- if (p(qnext.msg)) {
- q.next = qnext.next
- return qnext
- }
- q = qnext
- qnext = qnext.next
- }
- null
- }
-
- def dequeueFirst(p: MessageQueueResult[Msg] => boolean): MessageQueueResult[Msg] = {
- var q = this
- var qnext = q.next
- while (qnext != this) {
- if (p(qnext)) {
- q.next = qnext.next
- return qnext
- }
- q = qnext
- qnext = qnext.next
- }
- null
- }
-}
diff --git a/src/actors/scala/actors/JDK5Scheduler.scala b/src/actors/scala/actors/JDK5Scheduler.scala
index f3496585fb..8361fc43fb 100644
--- a/src/actors/scala/actors/JDK5Scheduler.scala
+++ b/src/actors/scala/actors/JDK5Scheduler.scala
@@ -115,7 +115,7 @@ class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with ISchedu
executor.shutdown()
// terminate timer thread
TimerThread.t.interrupt()
- Console.println("threads used: "+coreSize)
+ //Debug.info("threads used: "+coreSize)
throw new QuitException
}
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 9bb6c849f5..e8d9ee7c72 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -20,7 +20,7 @@ class ExitActorException extends Throwable
* an instance of an <code>Actor</code> with a
* <code>java.lang.Runnable</code>.
*
- * @version 0.9.0
+ * @version 0.9.2
* @author Philipp Haller
*/
private[actors] class Reaction(a: Actor,
@@ -39,25 +39,29 @@ private[actors] class Reaction(a: Actor,
a.isDetached = false
try {
try {
- if (f == null)
- a.act()
- else
- f(msg)
- a.exit("normal")
+ // links
+ if (a.shouldExit)
+ a.exit()
+ else {
+ if (f == null)
+ a.act()
+ else
+ f(msg)
+ a.exit()
+ }
} catch {
case _: ExitActorException =>
- throw new InterruptedException
}
}
catch {
- case ie: InterruptedException => {
- a.exitLinked()
- }
- case d: SuspendActorException => {
+ case _: SuspendActorException => {
// do nothing (continuation is already saved)
}
- case t: Throwable => {
- a.exitLinked()
+ case _: Throwable => {
+ // links
+ if (!a.links.isEmpty) {
+ a.exitLinked()
+ }
}
}
/*finally {
@@ -65,13 +69,4 @@ private[actors] class Reaction(a: Actor,
}*/
}
- private var runnable = false
-
- def isRunnable = synchronized {
- runnable
- }
-
- def setRunnable(on: boolean) = synchronized {
- runnable = on
- }
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index f61d44a5a7..eb5e92ef88 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -43,10 +43,15 @@ object Scheduler {
s = if (olderThanJDK5)
new TickedScheduler
else {
- val corePoolSize =
- Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize"))
- val maxPoolSize =
- Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize"))
+ var corePoolSize = 4
+ var maxPoolSize = 16
+ val prop = java.lang.System.getProperty("actors.corePoolSize")
+ if (null ne prop) {
+ corePoolSize =
+ Integer.parseInt(java.lang.System.getProperty("actors.corePoolSize"))
+ maxPoolSize =
+ Integer.parseInt(java.lang.System.getProperty("actors.maxPoolSize"))
+ }
new JDK5Scheduler(corePoolSize, maxPoolSize)
}
s.start()