From 50fc9d84a0390a7e6282bd2b4ce398fe122f607c Mon Sep 17 00:00:00 2001 From: cremet Date: Thu, 25 Sep 2003 15:49:41 +0000 Subject: - Better implementation of Pilib with fewer thr... - Better implementation of Pilib with fewer thread creations. --- sources/scala/concurrent/pilib.scala | 156 +++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/sources/scala/concurrent/pilib.scala b/sources/scala/concurrent/pilib.scala index 6a90c46068..c3cf55bd2d 100644 --- a/sources/scala/concurrent/pilib.scala +++ b/sources/scala/concurrent/pilib.scala @@ -1,5 +1,160 @@ package scala.concurrent; +/** +* Library for using Pi-calculus concurrent primitives in Scala. +*/ +object pilib with Monitor { + + /////////////////////////// SPAWN ////////////////////////////// + + /** + * Run several processes in parallel using the following syntax: + * spawn < p_1 | ... | p_n > + */ + trait Spawn { + def <(def p: unit): Spawn; + def |(def p: unit): Spawn; + def > : unit; + } + val spawn = new Spawn { + // object spawn extends Spawn { BUG ! + def <(def p: unit): Spawn = { ops.spawn(p); this } + def |(def p: unit): Spawn = { ops.spawn(p); this } + def > : unit = () + } + + //////////////////////// GUARDED PROCESSES ///////////////////////// + + type Name = AnyRef; + + /** A guarded process. + * @param a channel + * @param direction input (true) or output (false) + * @param v transmitted value + * @param c continuation + */ + case class GP(a: Name, direction: boolean, v: Any, c: Any => unit); + + ////////////////////////// CHANNELS ////////////////////////////// + + /** + * Name on which one can emit, receive or that can be emitted or received + * during a communication. + */ + class Chan[a] with Function1[a, Product[a]] { + + /** Creates an input guarded process. */ + def input(c: a => unit) = + GP(this, true, (), v => c(v.asInstanceOf[a])); + + /** Creates an input guarded process. */ + def output(v: a, c: () => unit) = + GP(this, false, v, x => c()); + + /** Blocking read. */ + def read = { + var res: a = _; + choice ( input(x => res = x) ); + res + } + + /** Blocking write. */ + def write(x: a) = + choice ( output(x, () => ()) ); + + /** Syntactic sugar for input. */ + def *(f: a => unit) = + input(f); + + /** Syntactic sugar for output. */ + def apply(v: a) = + new Product(this, v); + } + + class Product[a](c: Chan[a], v: a) { + def *(def f: unit) = c.output(v, () => f); + } + + //////////////////// SUM OF GUARDED PROCESSES ////////////////////// + + case class Sum(gs: List[GP]) with Monitor { + + /** Continuation of the sum. */ + var fun: Any => unit = _; + /** Argument of the continuation of the sum. */ + var arg: Any = _; + + var initialized = false; + + /** + * Block if not initialized otherwise continue with the + * continuation. + */ + def continue = synchronized { + if (!initialized) wait(); + fun(arg) + } + + /** Set the values of parameters and awake the sleeping sum. */ + def set(f: Any => unit, x: Any) = synchronized { + fun = f; + arg = x; + initialized = true; + notify() + } + } + + /////////////////////////// COMMUNICATION ////////////////////////// + + private var sums: List[Sum] = Nil; + + /** Test if two lists of guarded processes can communicate. */ + private def matches(gs1: List[GP], gs2: List[GP]): + Option[Tuple4[Any, Any => unit, Any, Any => unit]] = + Pair(gs1, gs2) match { + case Pair(Nil, _) => None + case Pair(_, Nil) => None + case Pair(GP(a1, d1, v1, c1) :: rest1, GP(a2, d2, v2, c2) :: rest2) => + if (a1 == a2 && d1 == !d2) + Some(Tuple4(v1, c1, v2, c2)) + else matches(gs1, rest2) match { + case None => matches(rest1, gs2) + case Some(t) => Some(t) + } + } + + /** + * Test if the given sum can react with one of the pending sums. + * If yes then do the reaction otherwise append the sum at the end + * of the pending sums. + */ + private def compare(s1: Sum, ss: List[Sum]): List[Sum] = + ss match { + case Nil => ss ::: List(s1) + case s2 :: rest => matches(s1.gs, s2.gs) match { + case None => s2 :: compare(s1, rest) + case Some(Tuple4(v1, c1, v2, c2)) => { + s1.set(c1, v2); + s2.set(c2, v1); + rest + } + } + } + + /** Pi-calculus non-deterministic choice. */ + def choice(s: GP*): unit = { + val sum = Sum(s.asInstanceOf[List[GP]]); + synchronized { sums = compare(sum, sums) }; + sum.continue + } + + +} + +/* Former version. + +package scala.concurrent; + /** * Library for using Pi-calculus concurrent primitives in Scala. */ @@ -168,3 +323,4 @@ object pilib with Monitor { } +*/ -- cgit v1.2.3