diff options
Diffstat (limited to 'doc/reference/ScalaByExample.tex')
-rw-r--r-- | doc/reference/ScalaByExample.tex | 117 |
1 files changed, 71 insertions, 46 deletions
diff --git a/doc/reference/ScalaByExample.tex b/doc/reference/ScalaByExample.tex index a73946f0f0..4885f7457f 100644 --- a/doc/reference/ScalaByExample.tex +++ b/doc/reference/ScalaByExample.tex @@ -6384,9 +6384,11 @@ A fundamental way of interprocess communication is the asynchronous channel. Its implementation makes use the following class for linked lists: \begin{lstlisting} -class LinkedList[a](x: a) { - val elem: a = x; - var next: LinkedList[a] = null; +package scala.collection.mutable; +class LinkedList[A](head: A, tail: LinkedList[A]) + extends SingleLinkedList[A, LinkedList[A]] { + elem = head; + next = tail; } \end{lstlisting} To facilitate insertion and deletion of elements into linked lists, @@ -6395,22 +6397,27 @@ the node which conceptually forms the top of the list. Empty linked lists start with a dummy node, whose successor is \code{null}. The channel class uses a linked list to store data that has been sent -but not read yet. In the opposite direction, a signal \code{moreData} is -used to wake up reader threads that wait for data. +but not read yet. In the opposite direction, a threads that +wish to read from an empty channel, register their presence by +incrementing the \code{nreaders} field and waiting to be notified. \begin{lstlisting} -class Channel[a] { - private val written = new LinkedList[a](null); +package scala.concurrent; +class Channel[a] with Monitor { + var dummy: a = _; + private var written = new LinkedList[a](dummy, null); private var lastWritten = written; - private val moreData = new Signal; + private var nreaders = 0; - def write(x: a) = { - lastWritten.next = new LinkedList(x); + def write(x: a) = synchronized { + lastWritten.next = new LinkedList(x, null); lastWritten = lastWritten.next; - moreData.notify; + if (nreaders > 0) notify(); } - def read: a = { - if (written.next == null) moreData.wait; + def read: a = synchronized { + if (written.next == null) { + nreaders = nreaders + 1; wait(); nreaders = nreaders - 1; + } written = written.next; written.elem; } @@ -6424,23 +6431,29 @@ a message blocks until that message has been received. Synchronous channels only need a single variable to store messages in transit, but three signals are used to coordinate reader and writer processes. \begin{lstlisting} -class SyncChannel[a] { - val data = new SyncVar[a]; - - def write(x: a): unit = synchronized { - val empty = new Signal, full = new Signal, idle = new Signal; - if (data.isSet) idle.wait; - data.put(x); - full.send; - empty.wait; - data.unset; - idle.send; +package scala.concurrent; + +class SyncChannel[a] with Monitor { + private var data: a = _; + private var reading = false; + private var writing = false; + + def write(x: a) = synchronized { + await(!writing); + data = x; + writing = true; + if (reading) notifyAll(); + else await(reading) } def read: a = synchronized { - if (!(data.isSet)) full.wait; - x = data.get; - empty.send; + await(!reading); + reading = true; + await(writing); + val x = data; + writing = false; + reading = false; + notifyAll(); x } } @@ -6458,39 +6471,40 @@ the overhead inherent in context-switching several threads on a single processor. \begin{lstlisting} -class ComputeServer(n: int) { - private abstract class Job { - abstract type t; +import scala.concurrent._, scala.concurrent.ops._; + +class ComputeServer(n: Int) { + + private trait Job { + type t; def task: t; - def return(x: t): unit; + def ret(x: t): Unit; } - private val openJobs = new Channel[Job] + private val openJobs = new Channel[Job](); - private def processor: unit = { + private def processor(i: Int): Unit = { while (true) { val job = openJobs.read; - job.return(job.task) + job.ret(job.task) } } -\end{lstlisting} -\begin{lstlisting} + def future[a](def p: a): () => a = { - val reply = new SyncVar[a]; - openJobs.write( - new Job { - type t = a; - def task = p; - def return(x: a) = reply.set(x); + val reply = new SyncVar[a](); + openJobs.write{ + new Job { + type t = a; + def task = p; + def ret(x: a) = reply.set(x); } - ) - (=> reply.get) + } + () => reply.get } - replicate(n){processor}; + spawn(replicate(0, n) { processor }) } \end{lstlisting} - Expressions to be computed (i.e. arguments to calls of \code{future}) are written to the \code{openJobs} channel. A {\em job} is an object with @@ -6521,6 +6535,17 @@ between different jobs. Without abstract types it would be impossible to implement the same class to the user in a statically type-safe way, without relying on dynamic type tests and type casts. + +Here is some code which uses the compute server to evaluate +the expression \code{41 + 1}. +\begin{lstlisting} +object Test with Executable { + val server = new ComputeServer(1); + val f = server.future(41 + 1); + Console.println(f()) +} +\end{lstlisting} + \section{Mailboxes} \label{sec:mailbox} |