summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2010-01-14 23:55:41 +0000
committerPhilipp Haller <hallerp@gmail.com>2010-01-14 23:55:41 +0000
commitdf4d259938d2a522ba0726e86ded78d2a09982ce (patch)
tree0515918ac587200c46d3a7cdd09bebd651b1a8f8 /src
parenteaa949005f8bf28d563d347755c07e96c2866e87 (diff)
downloadscala-df4d259938d2a522ba0726e86ded78d2a09982ce.tar.gz
scala-df4d259938d2a522ba0726e86ded78d2a09982ce.tar.bz2
scala-df4d259938d2a522ba0726e86ded78d2a09982ce.zip
Some optimizations to actor message queues and ...
Some optimizations to actor message queues and event handling.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Actor.scala26
-rw-r--r--src/actors/scala/actors/MessageQueue.scala65
-rw-r--r--src/actors/scala/actors/Reaction.scala2
-rw-r--r--src/actors/scala/actors/Reactor.scala58
-rw-r--r--src/actors/scala/actors/ReactorTask.scala2
-rw-r--r--src/actors/scala/actors/ReplyReactor.scala16
6 files changed, 109 insertions, 60 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 907389b9f0..2a526346cc 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -400,7 +400,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
protected[actors] override def scheduler: IScheduler = Scheduler
- private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: Any => Boolean) =
+ private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
if (isSuspended) {
() => synchronized {
mailbox.append(msg, replyTo)
@@ -411,7 +411,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
private[actors] override def makeReaction(fun: () => Unit): Runnable =
new ActorTask(this, fun)
- private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
+ private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
synchronized {
if (!onTimeout.isEmpty) {
onTimeout.get.cancel()
@@ -419,7 +419,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
}
senders = List(item._2)
- super.resumeReceiver(item, onSameThread)
+ super.resumeReceiver(item, handler, onSameThread)
}
/**
@@ -451,7 +451,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
drainSendBuffer(mailbox)
// keep going
} else {
- waitingFor = f.isDefinedAt
+ waitingFor = f
isSuspended = true
scheduler.managedBlock(blocker)
drainSendBuffer(mailbox)
@@ -517,7 +517,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
done = true
receiveTimeout
} else {
- waitingFor = f.isDefinedAt
+ waitingFor = f
received = None
isSuspended = true
val thisActor = this
@@ -565,8 +565,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
if (shouldExit) exit() // links
drainSendBuffer(mailbox)
}
- continuation = f
- searchMailbox(mailbox, f.isDefinedAt, false)
+ searchMailbox(mailbox, f, false)
throw Actor.suspendException
}
@@ -616,8 +615,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
done = true
receiveTimeout
} else {
- waitingFor = f.isDefinedAt
- continuation = f
+ waitingFor = f
val thisActor = this
onTimeout = Some(new TimerTask {
def run() { thisActor.send(TIMEOUT, thisActor) }
@@ -647,14 +645,12 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
// guarded by lock of this
// never throws SuspendActorException
- private[actors] override def scheduleActor(f: Any =>? Unit, msg: Any) =
- if ((f eq null) && (continuation eq null)) {
+ private[actors] override def scheduleActor(f: Any =>? Any, msg: Any) =
+ if (f eq null) {
// do nothing (timeout is handled instead)
}
else {
- val task = new Reaction(this,
- if (f eq null) continuation else f,
- msg)
+ val task = new Reaction(this, f, msg)
scheduler executeFromActor task
}
@@ -825,7 +821,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
if (isSuspended)
resumeActor()
else if (waitingFor ne waitingForNone) {
- scheduleActor(continuation, null)
+ scheduleActor(waitingFor, null)
/* Here we should not throw a SuspendActorException,
since the current method is called from an actor that
is in the process of exiting.
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala
index fd43e36fff..000ff1bfc6 100644
--- a/src/actors/scala/actors/MessageQueue.scala
+++ b/src/actors/scala/actors/MessageQueue.scala
@@ -62,6 +62,15 @@ private[actors] class MQueue(protected val label: String) {
last = el
}
+ def append(el: MQueueElement) {
+ changeSize(1) // size always increases by 1
+
+ if (isEmpty) first = el
+ else last.next = el
+
+ last = el
+ }
+
def foreach(f: (Any, OutputChannel[Any]) => Unit) {
var curr = first
while (curr != null) {
@@ -70,6 +79,25 @@ private[actors] class MQueue(protected val label: String) {
}
}
+ def foreachAppend(target: MQueue) {
+ var curr = first
+ while (curr != null) {
+ target.append(curr)
+ curr = curr.next
+ }
+ }
+
+ def foreachDequeue(target: MQueue) {
+ var curr = first
+ while (curr != null) {
+ target.append(curr)
+ curr = curr.next
+ }
+ first = null
+ last = null
+ _size = 0
+ }
+
def foldLeft[B](z: B)(f: (B, Any) => B): B = {
var acc = z
var curr = first
@@ -108,6 +136,43 @@ private[actors] class MQueue(protected val label: String) {
def extractFirst(p: (Any, OutputChannel[Any]) => Boolean): MQueueElement =
removeInternal(0)(p) orNull
+ def extractFirst(pf: PartialFunction[Any, Any]): MQueueElement = {
+ if (isEmpty) // early return
+ return null
+
+ // special handling if returning the head
+ if (pf.isDefinedAt(first.msg)) {
+ val res = first
+ first = first.next
+ if (res eq last)
+ last = null
+
+ changeSize(-1)
+ res
+ }
+ else {
+ var curr = first.next // init to element #2
+ var prev = first
+
+ while (curr != null) {
+ if (pf.isDefinedAt(curr.msg)) {
+ prev.next = curr.next
+ if (curr eq last)
+ last = prev
+
+ changeSize(-1)
+ return curr // early return
+ }
+ else {
+ prev = curr
+ curr = curr.next
+ }
+ }
+ // not found
+ null
+ }
+ }
+
private def removeInternal(n: Int)(p: (Any, OutputChannel[Any]) => Boolean): Option[MQueueElement] = {
var pos = 0
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index a4736f9489..176906ebe0 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -26,7 +26,7 @@ private[actors] class KillActorException extends Throwable with ControlException
* @deprecated("this class is going to be removed in a future release")
* @author Philipp Haller
*/
-class Reaction(a: Actor, f: Any =>? Unit, msg: Any) extends ActorTask(a, () => {
+class Reaction(a: Actor, f: Any =>? Any, msg: Any) extends ActorTask(a, () => {
if (f == null)
a.act()
else
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 8545b92d1e..c982142a91 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -35,25 +35,24 @@ trait Reactor extends OutputChannel[Any] {
private[actors] val mailbox = new MQueue("Reactor")
// guarded by this
- private[actors] val sendBuffer = new Queue[(Any, OutputChannel[Any])]
+ private[actors] val sendBuffer = new MQueue("SendBuffer")
- /* If the actor waits in a react, continuation holds the
- * message handler that react was called with.
- */
- @volatile
- private[actors] var continuation: Any =>? Unit = null
-
- /* Whenever this Actor executes on some thread, waitingFor is
+ /* Whenever this actor executes on some thread, waitingFor is
* guaranteed to be equal to waitingForNone.
*
* In other words, whenever waitingFor is not equal to
- * waitingForNone, this Actor is guaranteed not to execute on some
+ * waitingForNone, this actor is guaranteed not to execute on some
* thread.
*/
- private[actors] val waitingForNone = (m: Any) => false
+ private[actors] val waitingForNone = new PartialFunction[Any, Unit] {
+ def isDefinedAt(x: Any) = false
+ def apply(x: Any) {}
+ }
- // guarded by lock of this
- private[actors] var waitingFor: Any => Boolean = waitingForNone
+ /* If the actor waits in a react, waitingFor holds the
+ * message handler that react was called with.
+ */
+ private[actors] var waitingFor: PartialFunction[Any, Any] = waitingForNone // guarded by lock of this
/**
* The behavior of an actor is specified by implementing this
@@ -84,14 +83,14 @@ trait Reactor extends OutputChannel[Any] {
waitingFor = waitingForNone
startSearch(msg, replyTo, savedWaitingFor)
} else {
- sendBuffer.enqueue((msg, replyTo))
+ sendBuffer.append(msg, replyTo)
() => { /* do nothing */ }
}
}
todo()
}
- private[actors] def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: Any => Boolean) =
+ private[actors] def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
() => scheduler execute (makeReaction(() => {
val startMbox = new MQueue("Start")
synchronized { startMbox.append(msg, replyTo) }
@@ -101,15 +100,11 @@ trait Reactor extends OutputChannel[Any] {
private[actors] def makeReaction(fun: () => Unit): Runnable =
new ReactorTask(this, fun)
- /* Note that this method is called without holding a lock.
- * Therefore, to read an up-to-date continuation, it must be @volatile.
- */
- private[actors] def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
- // assert continuation != null
+ private[actors] def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
if (onSameThread)
- continuation(item._1)
+ handler(item._1)
else {
- scheduleActor(continuation, item._1)
+ scheduleActor(handler, item._1)
/* Here, we throw a SuspendActorException to avoid
terminating this actor when the current ReactorTask
is finished.
@@ -133,22 +128,18 @@ trait Reactor extends OutputChannel[Any] {
// guarded by this
private[actors] def drainSendBuffer(mbox: MQueue) {
- while (!sendBuffer.isEmpty) {
- val item = sendBuffer.dequeue()
- mbox.append(item._1, item._2)
- }
+ sendBuffer.foreachDequeue(mbox)
}
- // assume continuation != null
private[actors] def searchMailbox(startMbox: MQueue,
- handlesMessage: Any => Boolean,
+ handler: PartialFunction[Any, Any],
resumeOnSameThread: Boolean) {
var tmpMbox = startMbox
var done = false
while (!done) {
- val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => handlesMessage(msg))
+ val qel = tmpMbox.extractFirst(handler)
if (tmpMbox ne mailbox)
- tmpMbox.foreach((m, s) => mailbox.append(m, s))
+ tmpMbox.foreachAppend(mailbox)
if (null eq qel) {
synchronized {
// in mean time new stuff might have arrived
@@ -157,7 +148,7 @@ trait Reactor extends OutputChannel[Any] {
drainSendBuffer(tmpMbox)
// keep going
} else {
- waitingFor = handlesMessage
+ waitingFor = handler
/* Here, we throw a SuspendActorException to avoid
terminating this actor when the current ReactorTask
is finished.
@@ -169,7 +160,7 @@ trait Reactor extends OutputChannel[Any] {
}
}
} else {
- resumeReceiver((qel.msg, qel.session), resumeOnSameThread)
+ resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
done = true
}
}
@@ -178,8 +169,7 @@ trait Reactor extends OutputChannel[Any] {
protected[actors] def react(f: Any =>? Unit): Nothing = {
assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
synchronized { drainSendBuffer(mailbox) }
- continuation = f
- searchMailbox(mailbox, f.isDefinedAt, false)
+ searchMailbox(mailbox, f, false)
throw Actor.suspendException
}
@@ -190,7 +180,7 @@ trait Reactor extends OutputChannel[Any] {
*
* never throws SuspendActorException
*/
- private[actors] def scheduleActor(handler: Any =>? Unit, msg: Any) = {
+ private[actors] def scheduleActor(handler: Any =>? Any, msg: Any) = {
val fun = () => handler(msg)
val task = new ReactorTask(this, fun)
scheduler executeFromActor task
diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala
index f6ec67e94c..37aec0f8ec 100644
--- a/src/actors/scala/actors/ReactorTask.scala
+++ b/src/actors/scala/actors/ReactorTask.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.Callable
*
* @author Philipp Haller
*/
-private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun: () => Unit)
+private[actors] class ReactorTask[T >: Null <: Reactor](var reactor: T, var fun: () => Any)
extends Callable[Unit] with Runnable {
def run() {
diff --git a/src/actors/scala/actors/ReplyReactor.scala b/src/actors/scala/actors/ReplyReactor.scala
index 64860f4d38..d5936ae662 100644
--- a/src/actors/scala/actors/ReplyReactor.scala
+++ b/src/actors/scala/actors/ReplyReactor.scala
@@ -52,28 +52,26 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
send(msg, Actor.sender)
}
- private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
+ private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), handler: PartialFunction[Any, Any], onSameThread: Boolean) {
senders = List(item._2)
- // assert continuation != null
if (onSameThread)
- continuation(item._1)
+ handler(item._1)
else {
- scheduleActor(continuation, item._1)
+ scheduleActor(handler, item._1)
// see Reactor.resumeReceiver
throw Actor.suspendException
}
}
- // assume continuation != null
private[actors] override def searchMailbox(startMbox: MQueue,
- handlesMessage: Any => Boolean,
+ handler: PartialFunction[Any, Any],
resumeOnSameThread: Boolean) {
var tmpMbox = startMbox
var done = false
while (!done) {
val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
senders = List(replyTo)
- handlesMessage(msg)
+ handler.isDefinedAt(msg)
})
if (tmpMbox ne mailbox)
tmpMbox.foreach((m, s) => mailbox.append(m, s))
@@ -85,13 +83,13 @@ trait ReplyReactor extends Reactor with ReplyableReactor {
drainSendBuffer(tmpMbox)
// keep going
} else {
- waitingFor = handlesMessage
+ waitingFor = handler
// see Reactor.searchMailbox
throw Actor.suspendException
}
}
} else {
- resumeReceiver((qel.msg, qel.session), resumeOnSameThread)
+ resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
done = true
}
}