summaryrefslogtreecommitdiff
path: root/sources
diff options
context:
space:
mode:
authorcremet <cremet@epfl.ch>2003-10-03 11:47:13 +0000
committercremet <cremet@epfl.ch>2003-10-03 11:47:13 +0000
commitc12c3d3856adcd1ff72879bbe37e4b4c93da91b2 (patch)
treefdbbbb77fa3e6a1e26680208381af1ee630890bf /sources
parent169b9a7ebe91aab60a2f4d21b2eeb34e5e27dec2 (diff)
downloadscala-c12c3d3856adcd1ff72879bbe37e4b4c93da91b2.tar.gz
scala-c12c3d3856adcd1ff72879bbe37e4b4c93da91b2.tar.bz2
scala-c12c3d3856adcd1ff72879bbe37e4b4c93da91b2.zip
- Now the type of a guarded process contains th...
- Now the type of a guarded process contains the result type of its continuation. All guarded processes in a choice must have the same type. The result type of a choice is now the common type of the continuations of all its guarded processes.
Diffstat (limited to 'sources')
-rw-r--r--sources/scala/concurrent/pilib.scala208
1 files changed, 20 insertions, 188 deletions
diff --git a/sources/scala/concurrent/pilib.scala b/sources/scala/concurrent/pilib.scala
index 0cd215863c..2514c0b0d2 100644
--- a/sources/scala/concurrent/pilib.scala
+++ b/sources/scala/concurrent/pilib.scala
@@ -28,12 +28,17 @@ object pilib with Monitor {
type Name = AnyRef;
/** A guarded process.
- * @param a channel
- * @param direction input (true) or output (false)
+ * @param n channel name
+ * @param polarity input (true) or output (false)
* @param v transmitted value
* @param c continuation
*/
- case class GP(a: Name, direction: boolean, v: Any, c: Any => unit);
+ case class GP(n: Name, polarity: boolean, v: Any, c: Any => Any);
+
+ /** Typed guarded process. */
+ class TGP[a](n: Name, polarity: boolean, v: Any, c: Any => a) {
+ val untyped = GP(n, polarity, v, c);
+ }
////////////////////////// CHANNELS //////////////////////////////
@@ -44,12 +49,12 @@ object pilib with Monitor {
class Chan[a] with Function1[a, Product[a]] {
/** Creates an input guarded process. */
- def input(c: a => unit) =
- GP(this, true, (), v => c(v.asInstanceOf[a]));
+ def input[b](c: a => b) =
+ new TGP(this, true, (), x => c(x.asInstanceOf[a]));
/** Creates an input guarded process. */
- def output(v: a, c: () => unit) =
- GP(this, false, v, x => c());
+ def output[b](v: a, c: () => b) =
+ new TGP(this, false, v, x => c());
/** Blocking read. */
def read = {
@@ -63,7 +68,7 @@ object pilib with Monitor {
choice ( output(x, () => ()) );
/** Syntactic sugar for input. */
- def *(f: a => unit) =
+ def *[b](f: a => b) =
input(f);
/** Syntactic sugar for output. */
@@ -72,7 +77,7 @@ object pilib with Monitor {
}
class Product[a](c: Chan[a], v: a) {
- def *(def f: unit) = c.output(v, () => f);
+ def *[b](def f: b) = c.output(v, () => f);
}
//////////////////// SUM OF GUARDED PROCESSES //////////////////////
@@ -80,7 +85,7 @@ object pilib with Monitor {
case class Sum(gs: List[GP]) with Monitor {
/** Continuation of the sum. */
- var cont: () => unit = _;
+ var cont: () => Any = _;
var initialized = false;
@@ -94,7 +99,7 @@ object pilib with Monitor {
}
/** Set the values of parameters and awake the sleeping sum. */
- def set(f: () => unit) = synchronized {
+ def set(f: () => Any) = synchronized {
cont = f;
initialized = true;
notify()
@@ -106,7 +111,7 @@ object pilib with Monitor {
private var sums: List[Sum] = Nil;
/** Test if two lists of guarded processes can communicate. */
- private def matches(gs1: List[GP], gs2: List[GP]): Option[Pair[() => unit, () => unit]] =
+ private def matches(gs1: List[GP], gs2: List[GP]): Option[Pair[() => Any, () => Any]] =
Pair(gs1, gs2) match {
case Pair(Nil, _) => None
case Pair(_, Nil) => None
@@ -138,185 +143,12 @@ object pilib with Monitor {
}
/** Pi-calculus non-deterministic choice. */
- def choice(s: GP*): unit = {
- val sum = Sum(s.asInstanceOf[List[GP]]);
+ def choice[a](s: TGP[a]*): a = {
+ val sum = Sum(s.asInstanceOf[List[TGP[a]]] map { x => x.untyped });
synchronized { sums = compare(sum, sums) };
- sum.continue
- }
-
-
-}
-
-/* Former version.
-
-package scala.concurrent;
-
-/**
-* Library for using Pi-calculus concurrent primitives in Scala.
-*/
-object pilib with Monitor {
-
- /**
- * Run several processes in parallel using the following syntax:
- * spawn < p_1 | ... | p_n >
- */
- trait Spawn {
- def <(def p: unit): Spawn;
- def |(def p: unit): Spawn;
- def > : unit;
- }
- val spawn = new Spawn {
-// object spawn extends Spawn {
- def <(def p: unit): Spawn = { concurrent.ops.spawn(p); this }
- def |(def p: unit): Spawn = { concurrent.ops.spawn(p); this }
- def > : unit = ()
- }
-
- type Sum = List[GuardedProcess];
-
- /** List of pending choices. */
- private var sums: List[Sum] = Nil;
-
- /**
- * Look in the given sum for a branch guarded by an input on the
- * name a.
- */
- private def lookupIn(s: Sum, a: AbstractChan): Option[Any => unit] =
- s match {
- case Nil => None
- case InputGuardedProcess(b, p) :: rest => if (a == b) Some(p) else lookupIn(rest, a)
- case OutputGuardedProcess(_, _, _) :: rest => lookupIn(rest, a)
- };
-
- /**
- * Look in the given sum for a branch guarded by an output on the
- * name a.
- */
- private def lookupOut(sum: Sum, a: AbstractChan): Option[Pair[Any, () => unit]] =
- sum match {
- case Nil => None
- case OutputGuardedProcess(b, x, q) :: rest => if (a == b) Some(Pair(x, q)) else lookupOut(rest, a)
- case InputGuardedProcess(_, _) :: rest => lookupOut(rest, a)
- };
-
- /**
- * Check if the two given sums can communicate, returns the parameters of the
- * communication in this case.
- */
- private def canCom(s1: Sum, s2: Sum): Option[Triple[Any, Any => unit, () => unit]] =
- s1 match {
- case Nil => None
- case InputGuardedProcess(a, p) :: rest => lookupOut(s2, a) match {
- case None => canCom(rest, s2)
- case Some(Pair(x, q)) => Some(Triple(x, p, q))
- }
- case OutputGuardedProcess(b, x, q) :: rest => lookupIn(s2, b) match {
- case None => canCom(rest, s2)
- case Some(p) => Some(Triple(x, p, q))
- }
- };
-
- /**
- * Tries to find in the list of pending sums one which can performs a communication
- * with the given sum. In this case, removes the matching pending sum from the list and
- * does the communication. Otherwise adds the given sum at the end of the pending sums.
- */
- private def communicateWith(s: Sum): unit = {
- def comWith(hd: List[Sum], tl: List[Sum]): List[Sum] =
- tl match {
- case Nil => hd ::: List(s)
- case s1 :: rest => canCom(s, s1) match {
- case None => comWith(hd ::: List(s1), rest)
- case Some(Triple(x, p, q)) => {
- concurrent.ops.spawn(p(x));
- concurrent.ops.spawn(q());
- hd ::: rest
- }
- }
- };
-
- synchronized {
- sums = comWith(Nil, sums)
- }
- }
-
- /**
- * Represents a guarded process in a non-deterministic choice.
- */
- abstract class GuardedProcess;
-
- /**
- * Process guarded by an input prefix in a sum.
- */
- case class InputGuardedProcess(a: AbstractChan, p: Any => unit) extends GuardedProcess;
-
- /**
- * Process guarded by an output prefix in a sum.
- */
- case class OutputGuardedProcess(b: AbstractChan, x: Any, q: () => unit) extends GuardedProcess;
-
- abstract class AbstractChan;
-
- /**
- * Name on which one can emit, receive or that can be emitted or received
- * during a communication.
- */
- class Chan[a] extends AbstractChan with Function1[a, Product[a]] {
-
- /**
- * Creates an input guarded process.
- */
- def input(p: a => unit): GuardedProcess = InputGuardedProcess(this, (x => p(x.asInstanceOf[a])));
-
- /**
- * Creates an input guarded process.
- */
- def output(x: a, q: () => unit): GuardedProcess = OutputGuardedProcess(this, x, q);
-
- /**
- * Blocking read.
- */
- def read: a = {
- val res = new concurrent.SyncVar[a];
- choice ( input(x: a => res.set(x)) );
- res.get
- }
-
- /**
- * Blocking write.
- */
- def write(x: a): unit = choice ( output(x, () => ()) );
-
- /**
- * Syntactic sugar.
- */
- def *(f: a => unit) = input(f);
-
- def apply(x: a): Product[a] =
- new Product[a](this, x);
-
- }
- /**
- * Syntactic sugar
- */
- class Product[a](c: Chan[a], x: a) {
- def *(def f: unit) = c.output(x, (() => f));
+ (sum.continue).asInstanceOf[a]
}
- /**
- * Evaluates a choice (is non blocking).
- */
- def choice(s: GuardedProcess*): unit = {
- val done = new concurrent.Lock;
- done.acquire;
- val new_branches: Sum = (s.asInstanceOf[List[GuardedProcess]]) map {
- case InputGuardedProcess(a, p) => InputGuardedProcess(a, (x => { p(x); done.release }))
- case OutputGuardedProcess(b, x, q) => OutputGuardedProcess(b, x, (() => { q(); done.release }))
- };
- communicateWith(new_branches);
- done.acquire
- }
}
-*/