summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcremet <cremet@epfl.ch>2003-08-28 09:18:10 +0000
committercremet <cremet@epfl.ch>2003-08-28 09:18:10 +0000
commit7a9bbd21f018981ce132dd7d2d4ab1ca33bd4df2 (patch)
treeb4c3586f512dee71b54152292af0f7d5f2d7c31b
parentf10b65baef4e685cc70a5cf91cfe5cd4a7cefa20 (diff)
downloadscala-7a9bbd21f018981ce132dd7d2d4ab1ca33bd4df2.tar.gz
scala-7a9bbd21f018981ce132dd7d2d4ab1ca33bd4df2.tar.bz2
scala-7a9bbd21f018981ce132dd7d2d4ab1ca33bd4df2.zip
- Made the auction example work (it required to...
- Made the auction example work (it required to fix the mailbox implementation).
-rw-r--r--config/list/library.lst4
-rw-r--r--sources/examples/auction.scala111
-rw-r--r--sources/scala/concurrent/Actor.scala8
-rw-r--r--sources/scala/concurrent/MailBox.scala178
-rw-r--r--sources/scala/concurrent/TIMEOUT.scala2
5 files changed, 234 insertions, 69 deletions
diff --git a/config/list/library.lst b/config/list/library.lst
index 75b52c2bb0..aecd89e168 100644
--- a/config/list/library.lst
+++ b/config/list/library.lst
@@ -102,10 +102,10 @@ collection/immutable/Queue.scala
collection/immutable/Set.scala
collection/immutable/Stack.scala
-# concurrent/Actor.scala
+concurrent/Actor.scala
+concurrent/MailBox.scala
concurrent/Channel.scala
concurrent/Lock.scala
-# concurrent/MailBox.scala
concurrent/SyncChannel.scala
concurrent/SyncVar.scala
concurrent/TIMEOUT.scala
diff --git a/sources/examples/auction.scala b/sources/examples/auction.scala
index 102892b4c6..8e32831942 100644
--- a/sources/examples/auction.scala
+++ b/sources/examples/auction.scala
@@ -5,54 +5,125 @@ import scala.concurrent._;
trait AuctionMessage;
case class
- Offer(bid: Int, client: Actor), // make a bid
+ Offer(bid: int, client: Actor), // make a bid
Inquire(client: Actor) extends AuctionMessage; // inquire status
trait AuctionReply;
case class
- Status(asked: Int, expiration: Date), // asked sum, expiration date
- BestOffer, // yours is the best offer
- BeatenOffer(maxBid: Int), // offer beaten by maxBid
+ Status(asked: int, expiration: Date), // asked sum, expiration date
+ BestOffer(), // yours is the best offer
+ BeatenOffer(maxBid: int), // offer beaten by maxBid
AuctionConcluded(seller: Actor, client: Actor), // auction concluded
- AuctionFailed, // failed with no bids
- AuctionOver extends AuctionReply; // bidding is closed
+ AuctionFailed(), // failed with no bids
+ AuctionOver() extends AuctionReply; // bidding is closed
-class Auction(seller: Actor, minBid: Int, closing: Date) extends Actor {
+class Auction(seller: Actor, minBid: int, closing: Date) extends Actor {
- val timeToShutdown = 36000000; // msec
+ val timeToShutdown = 3600000; // msec
val bidIncrement = 10;
+
override def run() = {
var maxBid = minBid - bidIncrement;
- var maxBidder: Actor = _;
+ var maxBidder: Actor = null;
var running = true;
+
while (running) {
- receiveWithin ((closing.getTime() - new Date().getTime())) {
+ receiveWithin (closing.getTime() - new Date().getTime()) {
+
case Offer(bid, client) =>
if (bid >= maxBid + bidIncrement) {
- if (maxBid >= minBid) maxBidder send BeatenOffer(bid);
+ if (maxBid >= minBid)
+ maxBidder send BeatenOffer(bid);
maxBid = bid;
maxBidder = client;
- client send BestOffer;
+ client send BestOffer()
} else {
- client send BeatenOffer(maxBid);
- }
+ client send BeatenOffer(maxBid)
+ }
case Inquire(client) =>
- client send Status(maxBid, closing);
+ client send Status(maxBid, closing)
- case TIMEOUT =>
+ case TIMEOUT() =>
if (maxBid >= minBid) {
val reply = AuctionConcluded(seller, maxBidder);
maxBidder send reply;
- seller send reply;
+ seller send reply
} else {
- seller send AuctionFailed;
+ seller send AuctionFailed()
}
receiveWithin(timeToShutdown) {
- case Offer(_, client) => client send AuctionOver
- case TIMEOUT => running = false;
+ case Offer(_, client) => client send AuctionOver()
+ case TIMEOUT() => running = false
}
+
}
}
}
}
+
+////////////////////////// TEST /////////////////////////////////
+
+object testAuction {
+
+ val random = new java.util.Random();
+
+ val minBid = 100;
+ val closing = new Date(new Date().getTime() + 60000);
+
+ val seller = new Actor {
+ override def run() = {}
+ }
+ val auction = new Auction(seller, minBid, closing);
+
+ def client(i: int, increment: int, top: int) = new Actor {
+ val name = "Client " + i;
+ def log(msg: String) = System.out.println(name + ": " + msg);
+ var running = true;
+ var max: int = _;
+ var current: int = 0;
+ override def run() = {
+ log("started");
+ auction send Inquire(this);
+ receive {
+ case Status(maxBid, _) => {
+ log("status(" + maxBid + ")");
+ max = maxBid
+ }
+ }
+ while (running) {
+ if (max >= top)
+ log("too high for me")
+ else if (current < max) {
+ current = max + increment;
+ Thread.sleep(1 + random.nextInt(1000));
+ auction send Offer(current, this);
+ }
+ receive {
+ case BestOffer() => {
+ log("bestOffer(" + current + ")");
+ }
+ case BeatenOffer(maxBid) => {
+ log("beatenOffer(" + maxBid + ")");
+ max = maxBid;
+ }
+ case AuctionConcluded(seller, maxBidder) => {
+ log("auctionConcluded");
+ }
+ case AuctionOver() => {
+ running = false;
+ log("auctionOver");
+ }
+ }
+ }
+ }
+ }
+
+ def main(args: Array[String]) = {
+ seller.start();
+ auction.start();
+ client(1, 20, 200).start();
+ client(2, 10, 300).start();
+ }
+
+}
diff --git a/sources/scala/concurrent/Actor.scala b/sources/scala/concurrent/Actor.scala
index 9c1c5c2a57..dd4cee26d2 100644
--- a/sources/scala/concurrent/Actor.scala
+++ b/sources/scala/concurrent/Actor.scala
@@ -1,16 +1,18 @@
package scala.concurrent;
-abstract class Actor extends Thread {
+abstract class Actor extends Thread() {
type Message = AnyRef;
private val mb = new MailBox;
- def send(msg: Message): Unit =
+ def send(msg: Message): unit =
mb.send(msg);
+
def receive[a](f: PartialFunction[Message, a]): a =
mb.receive(f);
- def receiveWithin[a](msec: Long)(f: PartialFunction[Message, a]): a =
+
+ def receiveWithin[a](msec: long)(f: PartialFunction[Message, a]): a =
mb.receiveWithin(msec)(f);
}
diff --git a/sources/scala/concurrent/MailBox.scala b/sources/scala/concurrent/MailBox.scala
index e83829acbc..4bec0ee461 100644
--- a/sources/scala/concurrent/MailBox.scala
+++ b/sources/scala/concurrent/MailBox.scala
@@ -1,68 +1,160 @@
package scala.concurrent;
-import scala.collection.mutable.LinkedList;
-
-class MailBox with Monitor {
+//class MailBox with Monitor with LinkedListQueueCreator {
+class MailBox with Monitor with ListQueueCreator {
type Message = AnyRef;
- private abstract class Receiver extends Monitor {
- type t;
- val receiver: PartialFunction[Message, t];
- var msg: Message = _;
- def receive(): t = synchronized {
- if (msg != null) wait();
+ private abstract class PreReceiver with Monitor {
+ var msg: Message = null;
+ def isDefinedAt(msg: Message): boolean;
+ }
+
+ private class Receiver[a](receiver: PartialFunction[Message, a]) extends PreReceiver {
+
+ def isDefinedAt(msg: Message) = receiver.isDefinedAt(msg);
+
+ def receive(): a = synchronized {
+ if (msg == null) wait();
receiver(msg)
}
- def receiveWithin(msec: Long): t = synchronized {
- if (msg != null) wait(msec);
- receiver(if (msg != null) msg else TIMEOUT)
+
+ def receiveWithin(msec: long): a = synchronized {
+ if (msg == null) wait(msec);
+ receiver(if (msg != null) msg else TIMEOUT())
}
}
- // !!! is this new correct ?
- private val sent = new LinkedList[Message](null, null);
- private var lastSent = sent;
- // !!! is this new correct ?
- private var receivers = new LinkedList[Receiver](null, null);
- private var lastReceiver = receivers;
-
- def send(msg: Message): Unit = synchronized {
- var rs = receivers, rs1 = rs.next;
- // !!! does not compile
- // !!! while (rs1 != null && !rs1.elem.receiver.isDefinedAt(msg)) {
- // !!! rs = rs1; rs1 = rs1.next;
- // !!! }
- if (rs1 != null) {
- rs.next = rs1.next; rs1.elem.msg = msg; rs1.elem.notify();
- } else {
- // !!! does not compile
- // !!! lastSent = lastSent.append(msg)
+ private val messageQueue = queueCreate[Message];
+ private val receiverQueue = queueCreate[PreReceiver];
+
+ /** Unconsumed messages. */
+ private var sent = messageQueue.make;
+
+ /** Pending receivers. */
+ private var receivers = receiverQueue.make;
+
+ /**
+ * Check whether the receiver can be applied to an unconsumed message.
+ * If yes, the message is extracted and associated with the receiver.
+ * Otherwise the receiver is appended to the list of pending receivers.
+ */
+ private def scanSentMsgs[a](receiver: Receiver[a]): unit = synchronized {
+ messageQueue.extractFirst(sent, msg => receiver.isDefinedAt(msg)) match {
+ case None => receivers = receiverQueue.append(receivers, receiver)
+ case Some(Pair(msg, withoutMsg)) => {
+ sent = withoutMsg;
+ receiver.msg = msg
+ }
}
}
- def scanSentMsgs[a](r: Receiver { type t = a }): Unit = synchronized {
- var ss = sent, ss1 = ss.next;
- while (ss1 != null && !r.receiver.isDefinedAt(ss1.elem)) {
- ss = ss1; ss1 = ss1.next
- }
- if (ss1 != null) {
- ss.next = ss1.next; r.msg = ss1.elem;
- } else {
- // !!! does not compile
- // !!! lastReceiver = lastReceiver append r;
+ /**
+ * First check whether a pending receiver is applicable to the sent
+ * message. If yes, the receiver is notified. Otherwise the message
+ * is appended to the linked list of sent messages.
+ */
+ def send(msg: Message): unit = synchronized {
+ receiverQueue.extractFirst(receivers, r => r.isDefinedAt(msg)) match {
+ case None => sent = messageQueue.append(sent, msg)
+ case Some(Pair(receiver, withoutReceiver)) => {
+ receivers = withoutReceiver;
+ receiver.msg = msg;
+ receiver synchronized { receiver.notify() };
+ }
}
}
+ /**
+ * Block until there is a message in the mailbox for which the processor
+ * <code>f</code> is defined.
+ */
def receive[a](f: PartialFunction[Message, a]): a = {
- val r = new Receiver { type t = a; val receiver = f }
+ val r = new Receiver(f);
scanSentMsgs(r);
r.receive()
}
- def receiveWithin[a](msec: Long)(f: PartialFunction[Message, a]): a = {
- val r = new Receiver { type t = a; val receiver = f }
+ /**
+ * Block until there is a message in the mailbox for which the processor
+ * <code>f</code> is defined or the timeout is over.
+ */
+ def receiveWithin[a](msec: long)(f: PartialFunction[Message, a]): a = {
+ val r = new Receiver(f);
scanSentMsgs(r);
r.receiveWithin(msec)
}
+
+}
+
+/////////////////////////////////////////////////////////////////
+
+/**
+* Module for dealing with queues.
+*/
+trait QueueModule[a] {
+ /** Type of queues. */
+ type t;
+ /** Create an empty queue. */
+ def make: t;
+ /** Append an element to a queue. */
+ def append(l: t, x: a): t;
+ /** Extract an element satisfying a predicate from a queue. */
+ def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]];
+}
+
+/** Inefficient but simple queue module creator. */
+trait ListQueueCreator {
+ def queueCreate[a]: QueueModule[a] = new QueueModule[a] {
+ type t = List[a];
+ def make: t = Nil;
+ def append(l: t, x: a): t = l ::: List(x);
+ def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] =
+ l match {
+ case Nil => None
+ case head :: tail =>
+ if (p(head))
+ Some(Pair(head, tail))
+ else
+ extractFirst(tail, p) match {
+ case None => None
+ case Some(Pair(x, without_x)) => Some(Pair(x, head :: without_x))
+ }
+ }
+ }
+}
+
+/** Efficient queue module creator based on linked lists. */
+trait LinkedListQueueCreator {
+ import scala.collection.mutable.LinkedList;
+ def queueCreate[a <: AnyRef]: QueueModule[a] = new QueueModule[a] {
+ type t = Pair[LinkedList[a], LinkedList[a]]; // fst = the list, snd = last elem
+ def make: t = {
+ val l = new LinkedList[a](null, null);
+ Pair(l, l)
+ }
+ def append(l: t, x: a): t = {
+ val atTail = new LinkedList(x, null);
+ l._2 append atTail;
+ Pair(l._1, atTail)
+ }
+ def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] = {
+ var xs = l._1;
+ var xs1 = xs.next;
+ while (xs1 != null && !p(xs1.elem)) {
+ xs = xs1;
+ xs1 = xs1.next;
+ }
+ if (xs1 != null) {
+ xs.next = xs1.next;
+ if (xs.next == null)
+ Some(Pair(xs1.elem, Pair(l._1, xs)))
+ else
+ Some(Pair(xs1.elem, l))
+ }
+ else
+ None
+ }
+ }
}
+
diff --git a/sources/scala/concurrent/TIMEOUT.scala b/sources/scala/concurrent/TIMEOUT.scala
index 189ae0632f..5c26cf8611 100644
--- a/sources/scala/concurrent/TIMEOUT.scala
+++ b/sources/scala/concurrent/TIMEOUT.scala
@@ -1,3 +1,3 @@
package scala.concurrent;
-case class TIMEOUT;
+case class TIMEOUT();