diff options
Diffstat (limited to 'sources')
-rw-r--r-- | sources/examples/auction.scala | 111 | ||||
-rw-r--r-- | sources/scala/concurrent/Actor.scala | 8 | ||||
-rw-r--r-- | sources/scala/concurrent/MailBox.scala | 178 | ||||
-rw-r--r-- | sources/scala/concurrent/TIMEOUT.scala | 2 |
4 files changed, 232 insertions, 67 deletions
diff --git a/sources/examples/auction.scala b/sources/examples/auction.scala index 102892b4c6..8e32831942 100644 --- a/sources/examples/auction.scala +++ b/sources/examples/auction.scala @@ -5,54 +5,125 @@ import scala.concurrent._; trait AuctionMessage; case class - Offer(bid: Int, client: Actor), // make a bid + Offer(bid: int, client: Actor), // make a bid Inquire(client: Actor) extends AuctionMessage; // inquire status trait AuctionReply; case class - Status(asked: Int, expiration: Date), // asked sum, expiration date - BestOffer, // yours is the best offer - BeatenOffer(maxBid: Int), // offer beaten by maxBid + Status(asked: int, expiration: Date), // asked sum, expiration date + BestOffer(), // yours is the best offer + BeatenOffer(maxBid: int), // offer beaten by maxBid AuctionConcluded(seller: Actor, client: Actor), // auction concluded - AuctionFailed, // failed with no bids - AuctionOver extends AuctionReply; // bidding is closed + AuctionFailed(), // failed with no bids + AuctionOver() extends AuctionReply; // bidding is closed -class Auction(seller: Actor, minBid: Int, closing: Date) extends Actor { +class Auction(seller: Actor, minBid: int, closing: Date) extends Actor { - val timeToShutdown = 36000000; // msec + val timeToShutdown = 3600000; // msec val bidIncrement = 10; + override def run() = { var maxBid = minBid - bidIncrement; - var maxBidder: Actor = _; + var maxBidder: Actor = null; var running = true; + while (running) { - receiveWithin ((closing.getTime() - new Date().getTime())) { + receiveWithin (closing.getTime() - new Date().getTime()) { + case Offer(bid, client) => if (bid >= maxBid + bidIncrement) { - if (maxBid >= minBid) maxBidder send BeatenOffer(bid); + if (maxBid >= minBid) + maxBidder send BeatenOffer(bid); maxBid = bid; maxBidder = client; - client send BestOffer; + client send BestOffer() } else { - client send BeatenOffer(maxBid); - } + client send BeatenOffer(maxBid) + } case Inquire(client) => - client send Status(maxBid, closing); + client send Status(maxBid, closing) - case TIMEOUT => + case TIMEOUT() => if (maxBid >= minBid) { val reply = AuctionConcluded(seller, maxBidder); maxBidder send reply; - seller send reply; + seller send reply } else { - seller send AuctionFailed; + seller send AuctionFailed() } receiveWithin(timeToShutdown) { - case Offer(_, client) => client send AuctionOver - case TIMEOUT => running = false; + case Offer(_, client) => client send AuctionOver() + case TIMEOUT() => running = false } + } } } } + +////////////////////////// TEST ///////////////////////////////// + +object testAuction { + + val random = new java.util.Random(); + + val minBid = 100; + val closing = new Date(new Date().getTime() + 60000); + + val seller = new Actor { + override def run() = {} + } + val auction = new Auction(seller, minBid, closing); + + def client(i: int, increment: int, top: int) = new Actor { + val name = "Client " + i; + def log(msg: String) = System.out.println(name + ": " + msg); + var running = true; + var max: int = _; + var current: int = 0; + override def run() = { + log("started"); + auction send Inquire(this); + receive { + case Status(maxBid, _) => { + log("status(" + maxBid + ")"); + max = maxBid + } + } + while (running) { + if (max >= top) + log("too high for me") + else if (current < max) { + current = max + increment; + Thread.sleep(1 + random.nextInt(1000)); + auction send Offer(current, this); + } + receive { + case BestOffer() => { + log("bestOffer(" + current + ")"); + } + case BeatenOffer(maxBid) => { + log("beatenOffer(" + maxBid + ")"); + max = maxBid; + } + case AuctionConcluded(seller, maxBidder) => { + log("auctionConcluded"); + } + case AuctionOver() => { + running = false; + log("auctionOver"); + } + } + } + } + } + + def main(args: Array[String]) = { + seller.start(); + auction.start(); + client(1, 20, 200).start(); + client(2, 10, 300).start(); + } + +} diff --git a/sources/scala/concurrent/Actor.scala b/sources/scala/concurrent/Actor.scala index 9c1c5c2a57..dd4cee26d2 100644 --- a/sources/scala/concurrent/Actor.scala +++ b/sources/scala/concurrent/Actor.scala @@ -1,16 +1,18 @@ package scala.concurrent; -abstract class Actor extends Thread { +abstract class Actor extends Thread() { type Message = AnyRef; private val mb = new MailBox; - def send(msg: Message): Unit = + def send(msg: Message): unit = mb.send(msg); + def receive[a](f: PartialFunction[Message, a]): a = mb.receive(f); - def receiveWithin[a](msec: Long)(f: PartialFunction[Message, a]): a = + + def receiveWithin[a](msec: long)(f: PartialFunction[Message, a]): a = mb.receiveWithin(msec)(f); } diff --git a/sources/scala/concurrent/MailBox.scala b/sources/scala/concurrent/MailBox.scala index e83829acbc..4bec0ee461 100644 --- a/sources/scala/concurrent/MailBox.scala +++ b/sources/scala/concurrent/MailBox.scala @@ -1,68 +1,160 @@ package scala.concurrent; -import scala.collection.mutable.LinkedList; - -class MailBox with Monitor { +//class MailBox with Monitor with LinkedListQueueCreator { +class MailBox with Monitor with ListQueueCreator { type Message = AnyRef; - private abstract class Receiver extends Monitor { - type t; - val receiver: PartialFunction[Message, t]; - var msg: Message = _; - def receive(): t = synchronized { - if (msg != null) wait(); + private abstract class PreReceiver with Monitor { + var msg: Message = null; + def isDefinedAt(msg: Message): boolean; + } + + private class Receiver[a](receiver: PartialFunction[Message, a]) extends PreReceiver { + + def isDefinedAt(msg: Message) = receiver.isDefinedAt(msg); + + def receive(): a = synchronized { + if (msg == null) wait(); receiver(msg) } - def receiveWithin(msec: Long): t = synchronized { - if (msg != null) wait(msec); - receiver(if (msg != null) msg else TIMEOUT) + + def receiveWithin(msec: long): a = synchronized { + if (msg == null) wait(msec); + receiver(if (msg != null) msg else TIMEOUT()) } } - // !!! is this new correct ? - private val sent = new LinkedList[Message](null, null); - private var lastSent = sent; - // !!! is this new correct ? - private var receivers = new LinkedList[Receiver](null, null); - private var lastReceiver = receivers; - - def send(msg: Message): Unit = synchronized { - var rs = receivers, rs1 = rs.next; - // !!! does not compile - // !!! while (rs1 != null && !rs1.elem.receiver.isDefinedAt(msg)) { - // !!! rs = rs1; rs1 = rs1.next; - // !!! } - if (rs1 != null) { - rs.next = rs1.next; rs1.elem.msg = msg; rs1.elem.notify(); - } else { - // !!! does not compile - // !!! lastSent = lastSent.append(msg) + private val messageQueue = queueCreate[Message]; + private val receiverQueue = queueCreate[PreReceiver]; + + /** Unconsumed messages. */ + private var sent = messageQueue.make; + + /** Pending receivers. */ + private var receivers = receiverQueue.make; + + /** + * Check whether the receiver can be applied to an unconsumed message. + * If yes, the message is extracted and associated with the receiver. + * Otherwise the receiver is appended to the list of pending receivers. + */ + private def scanSentMsgs[a](receiver: Receiver[a]): unit = synchronized { + messageQueue.extractFirst(sent, msg => receiver.isDefinedAt(msg)) match { + case None => receivers = receiverQueue.append(receivers, receiver) + case Some(Pair(msg, withoutMsg)) => { + sent = withoutMsg; + receiver.msg = msg + } } } - def scanSentMsgs[a](r: Receiver { type t = a }): Unit = synchronized { - var ss = sent, ss1 = ss.next; - while (ss1 != null && !r.receiver.isDefinedAt(ss1.elem)) { - ss = ss1; ss1 = ss1.next - } - if (ss1 != null) { - ss.next = ss1.next; r.msg = ss1.elem; - } else { - // !!! does not compile - // !!! lastReceiver = lastReceiver append r; + /** + * First check whether a pending receiver is applicable to the sent + * message. If yes, the receiver is notified. Otherwise the message + * is appended to the linked list of sent messages. + */ + def send(msg: Message): unit = synchronized { + receiverQueue.extractFirst(receivers, r => r.isDefinedAt(msg)) match { + case None => sent = messageQueue.append(sent, msg) + case Some(Pair(receiver, withoutReceiver)) => { + receivers = withoutReceiver; + receiver.msg = msg; + receiver synchronized { receiver.notify() }; + } } } + /** + * Block until there is a message in the mailbox for which the processor + * <code>f</code> is defined. + */ def receive[a](f: PartialFunction[Message, a]): a = { - val r = new Receiver { type t = a; val receiver = f } + val r = new Receiver(f); scanSentMsgs(r); r.receive() } - def receiveWithin[a](msec: Long)(f: PartialFunction[Message, a]): a = { - val r = new Receiver { type t = a; val receiver = f } + /** + * Block until there is a message in the mailbox for which the processor + * <code>f</code> is defined or the timeout is over. + */ + def receiveWithin[a](msec: long)(f: PartialFunction[Message, a]): a = { + val r = new Receiver(f); scanSentMsgs(r); r.receiveWithin(msec) } + +} + +///////////////////////////////////////////////////////////////// + +/** +* Module for dealing with queues. +*/ +trait QueueModule[a] { + /** Type of queues. */ + type t; + /** Create an empty queue. */ + def make: t; + /** Append an element to a queue. */ + def append(l: t, x: a): t; + /** Extract an element satisfying a predicate from a queue. */ + def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]]; +} + +/** Inefficient but simple queue module creator. */ +trait ListQueueCreator { + def queueCreate[a]: QueueModule[a] = new QueueModule[a] { + type t = List[a]; + def make: t = Nil; + def append(l: t, x: a): t = l ::: List(x); + def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] = + l match { + case Nil => None + case head :: tail => + if (p(head)) + Some(Pair(head, tail)) + else + extractFirst(tail, p) match { + case None => None + case Some(Pair(x, without_x)) => Some(Pair(x, head :: without_x)) + } + } + } +} + +/** Efficient queue module creator based on linked lists. */ +trait LinkedListQueueCreator { + import scala.collection.mutable.LinkedList; + def queueCreate[a <: AnyRef]: QueueModule[a] = new QueueModule[a] { + type t = Pair[LinkedList[a], LinkedList[a]]; // fst = the list, snd = last elem + def make: t = { + val l = new LinkedList[a](null, null); + Pair(l, l) + } + def append(l: t, x: a): t = { + val atTail = new LinkedList(x, null); + l._2 append atTail; + Pair(l._1, atTail) + } + def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] = { + var xs = l._1; + var xs1 = xs.next; + while (xs1 != null && !p(xs1.elem)) { + xs = xs1; + xs1 = xs1.next; + } + if (xs1 != null) { + xs.next = xs1.next; + if (xs.next == null) + Some(Pair(xs1.elem, Pair(l._1, xs))) + else + Some(Pair(xs1.elem, l)) + } + else + None + } + } } + diff --git a/sources/scala/concurrent/TIMEOUT.scala b/sources/scala/concurrent/TIMEOUT.scala index 189ae0632f..5c26cf8611 100644 --- a/sources/scala/concurrent/TIMEOUT.scala +++ b/sources/scala/concurrent/TIMEOUT.scala @@ -1,3 +1,3 @@ package scala.concurrent; -case class TIMEOUT; +case class TIMEOUT(); |