From c12c3d3856adcd1ff72879bbe37e4b4c93da91b2 Mon Sep 17 00:00:00 2001 From: cremet Date: Fri, 3 Oct 2003 11:47:13 +0000 Subject: - Now the type of a guarded process contains th... - Now the type of a guarded process contains the result type of its continuation. All guarded processes in a choice must have the same type. The result type of a choice is now the common type of the continuations of all its guarded processes. --- sources/scala/concurrent/pilib.scala | 208 ++++------------------------------- 1 file changed, 20 insertions(+), 188 deletions(-) (limited to 'sources') diff --git a/sources/scala/concurrent/pilib.scala b/sources/scala/concurrent/pilib.scala index 0cd215863c..2514c0b0d2 100644 --- a/sources/scala/concurrent/pilib.scala +++ b/sources/scala/concurrent/pilib.scala @@ -28,12 +28,17 @@ object pilib with Monitor { type Name = AnyRef; /** A guarded process. - * @param a channel - * @param direction input (true) or output (false) + * @param n channel name + * @param polarity input (true) or output (false) * @param v transmitted value * @param c continuation */ - case class GP(a: Name, direction: boolean, v: Any, c: Any => unit); + case class GP(n: Name, polarity: boolean, v: Any, c: Any => Any); + + /** Typed guarded process. */ + class TGP[a](n: Name, polarity: boolean, v: Any, c: Any => a) { + val untyped = GP(n, polarity, v, c); + } ////////////////////////// CHANNELS ////////////////////////////// @@ -44,12 +49,12 @@ object pilib with Monitor { class Chan[a] with Function1[a, Product[a]] { /** Creates an input guarded process. */ - def input(c: a => unit) = - GP(this, true, (), v => c(v.asInstanceOf[a])); + def input[b](c: a => b) = + new TGP(this, true, (), x => c(x.asInstanceOf[a])); /** Creates an input guarded process. */ - def output(v: a, c: () => unit) = - GP(this, false, v, x => c()); + def output[b](v: a, c: () => b) = + new TGP(this, false, v, x => c()); /** Blocking read. */ def read = { @@ -63,7 +68,7 @@ object pilib with Monitor { choice ( output(x, () => ()) ); /** Syntactic sugar for input. */ - def *(f: a => unit) = + def *[b](f: a => b) = input(f); /** Syntactic sugar for output. */ @@ -72,7 +77,7 @@ object pilib with Monitor { } class Product[a](c: Chan[a], v: a) { - def *(def f: unit) = c.output(v, () => f); + def *[b](def f: b) = c.output(v, () => f); } //////////////////// SUM OF GUARDED PROCESSES ////////////////////// @@ -80,7 +85,7 @@ object pilib with Monitor { case class Sum(gs: List[GP]) with Monitor { /** Continuation of the sum. */ - var cont: () => unit = _; + var cont: () => Any = _; var initialized = false; @@ -94,7 +99,7 @@ object pilib with Monitor { } /** Set the values of parameters and awake the sleeping sum. */ - def set(f: () => unit) = synchronized { + def set(f: () => Any) = synchronized { cont = f; initialized = true; notify() @@ -106,7 +111,7 @@ object pilib with Monitor { private var sums: List[Sum] = Nil; /** Test if two lists of guarded processes can communicate. */ - private def matches(gs1: List[GP], gs2: List[GP]): Option[Pair[() => unit, () => unit]] = + private def matches(gs1: List[GP], gs2: List[GP]): Option[Pair[() => Any, () => Any]] = Pair(gs1, gs2) match { case Pair(Nil, _) => None case Pair(_, Nil) => None @@ -138,185 +143,12 @@ object pilib with Monitor { } /** Pi-calculus non-deterministic choice. */ - def choice(s: GP*): unit = { - val sum = Sum(s.asInstanceOf[List[GP]]); + def choice[a](s: TGP[a]*): a = { + val sum = Sum(s.asInstanceOf[List[TGP[a]]] map { x => x.untyped }); synchronized { sums = compare(sum, sums) }; - sum.continue - } - - -} - -/* Former version. - -package scala.concurrent; - -/** -* Library for using Pi-calculus concurrent primitives in Scala. -*/ -object pilib with Monitor { - - /** - * Run several processes in parallel using the following syntax: - * spawn < p_1 | ... | p_n > - */ - trait Spawn { - def <(def p: unit): Spawn; - def |(def p: unit): Spawn; - def > : unit; - } - val spawn = new Spawn { -// object spawn extends Spawn { - def <(def p: unit): Spawn = { concurrent.ops.spawn(p); this } - def |(def p: unit): Spawn = { concurrent.ops.spawn(p); this } - def > : unit = () - } - - type Sum = List[GuardedProcess]; - - /** List of pending choices. */ - private var sums: List[Sum] = Nil; - - /** - * Look in the given sum for a branch guarded by an input on the - * name a. - */ - private def lookupIn(s: Sum, a: AbstractChan): Option[Any => unit] = - s match { - case Nil => None - case InputGuardedProcess(b, p) :: rest => if (a == b) Some(p) else lookupIn(rest, a) - case OutputGuardedProcess(_, _, _) :: rest => lookupIn(rest, a) - }; - - /** - * Look in the given sum for a branch guarded by an output on the - * name a. - */ - private def lookupOut(sum: Sum, a: AbstractChan): Option[Pair[Any, () => unit]] = - sum match { - case Nil => None - case OutputGuardedProcess(b, x, q) :: rest => if (a == b) Some(Pair(x, q)) else lookupOut(rest, a) - case InputGuardedProcess(_, _) :: rest => lookupOut(rest, a) - }; - - /** - * Check if the two given sums can communicate, returns the parameters of the - * communication in this case. - */ - private def canCom(s1: Sum, s2: Sum): Option[Triple[Any, Any => unit, () => unit]] = - s1 match { - case Nil => None - case InputGuardedProcess(a, p) :: rest => lookupOut(s2, a) match { - case None => canCom(rest, s2) - case Some(Pair(x, q)) => Some(Triple(x, p, q)) - } - case OutputGuardedProcess(b, x, q) :: rest => lookupIn(s2, b) match { - case None => canCom(rest, s2) - case Some(p) => Some(Triple(x, p, q)) - } - }; - - /** - * Tries to find in the list of pending sums one which can performs a communication - * with the given sum. In this case, removes the matching pending sum from the list and - * does the communication. Otherwise adds the given sum at the end of the pending sums. - */ - private def communicateWith(s: Sum): unit = { - def comWith(hd: List[Sum], tl: List[Sum]): List[Sum] = - tl match { - case Nil => hd ::: List(s) - case s1 :: rest => canCom(s, s1) match { - case None => comWith(hd ::: List(s1), rest) - case Some(Triple(x, p, q)) => { - concurrent.ops.spawn(p(x)); - concurrent.ops.spawn(q()); - hd ::: rest - } - } - }; - - synchronized { - sums = comWith(Nil, sums) - } - } - - /** - * Represents a guarded process in a non-deterministic choice. - */ - abstract class GuardedProcess; - - /** - * Process guarded by an input prefix in a sum. - */ - case class InputGuardedProcess(a: AbstractChan, p: Any => unit) extends GuardedProcess; - - /** - * Process guarded by an output prefix in a sum. - */ - case class OutputGuardedProcess(b: AbstractChan, x: Any, q: () => unit) extends GuardedProcess; - - abstract class AbstractChan; - - /** - * Name on which one can emit, receive or that can be emitted or received - * during a communication. - */ - class Chan[a] extends AbstractChan with Function1[a, Product[a]] { - - /** - * Creates an input guarded process. - */ - def input(p: a => unit): GuardedProcess = InputGuardedProcess(this, (x => p(x.asInstanceOf[a]))); - - /** - * Creates an input guarded process. - */ - def output(x: a, q: () => unit): GuardedProcess = OutputGuardedProcess(this, x, q); - - /** - * Blocking read. - */ - def read: a = { - val res = new concurrent.SyncVar[a]; - choice ( input(x: a => res.set(x)) ); - res.get - } - - /** - * Blocking write. - */ - def write(x: a): unit = choice ( output(x, () => ()) ); - - /** - * Syntactic sugar. - */ - def *(f: a => unit) = input(f); - - def apply(x: a): Product[a] = - new Product[a](this, x); - - } - /** - * Syntactic sugar - */ - class Product[a](c: Chan[a], x: a) { - def *(def f: unit) = c.output(x, (() => f)); + (sum.continue).asInstanceOf[a] } - /** - * Evaluates a choice (is non blocking). - */ - def choice(s: GuardedProcess*): unit = { - val done = new concurrent.Lock; - done.acquire; - val new_branches: Sum = (s.asInstanceOf[List[GuardedProcess]]) map { - case InputGuardedProcess(a, p) => InputGuardedProcess(a, (x => { p(x); done.release })) - case OutputGuardedProcess(b, x, q) => OutputGuardedProcess(b, x, (() => { q(); done.release })) - }; - communicateWith(new_branches); - done.acquire - } } -*/ -- cgit v1.2.3