summaryrefslogtreecommitdiff
path: root/sources
diff options
context:
space:
mode:
authorcremet <cremet@epfl.ch>2003-09-25 15:49:41 +0000
committercremet <cremet@epfl.ch>2003-09-25 15:49:41 +0000
commit50fc9d84a0390a7e6282bd2b4ce398fe122f607c (patch)
treeeb4f540a25a9028f00e3be324332bd0616d59e0e /sources
parent2b811578d49380002d2c39fde8ed250c9aa00792 (diff)
downloadscala-50fc9d84a0390a7e6282bd2b4ce398fe122f607c.tar.gz
scala-50fc9d84a0390a7e6282bd2b4ce398fe122f607c.tar.bz2
scala-50fc9d84a0390a7e6282bd2b4ce398fe122f607c.zip
- Better implementation of Pilib with fewer thr...
- Better implementation of Pilib with fewer thread creations.
Diffstat (limited to 'sources')
-rw-r--r--sources/scala/concurrent/pilib.scala156
1 files changed, 156 insertions, 0 deletions
diff --git a/sources/scala/concurrent/pilib.scala b/sources/scala/concurrent/pilib.scala
index 6a90c46068..c3cf55bd2d 100644
--- a/sources/scala/concurrent/pilib.scala
+++ b/sources/scala/concurrent/pilib.scala
@@ -5,6 +5,161 @@ package scala.concurrent;
*/
object pilib with Monitor {
+ /////////////////////////// SPAWN //////////////////////////////
+
+ /**
+ * Run several processes in parallel using the following syntax:
+ * spawn < p_1 | ... | p_n >
+ */
+ trait Spawn {
+ def <(def p: unit): Spawn;
+ def |(def p: unit): Spawn;
+ def > : unit;
+ }
+ val spawn = new Spawn {
+ // object spawn extends Spawn { BUG !
+ def <(def p: unit): Spawn = { ops.spawn(p); this }
+ def |(def p: unit): Spawn = { ops.spawn(p); this }
+ def > : unit = ()
+ }
+
+ //////////////////////// GUARDED PROCESSES /////////////////////////
+
+ type Name = AnyRef;
+
+ /** A guarded process.
+ * @param a channel
+ * @param direction input (true) or output (false)
+ * @param v transmitted value
+ * @param c continuation
+ */
+ case class GP(a: Name, direction: boolean, v: Any, c: Any => unit);
+
+ ////////////////////////// CHANNELS //////////////////////////////
+
+ /**
+ * Name on which one can emit, receive or that can be emitted or received
+ * during a communication.
+ */
+ class Chan[a] with Function1[a, Product[a]] {
+
+ /** Creates an input guarded process. */
+ def input(c: a => unit) =
+ GP(this, true, (), v => c(v.asInstanceOf[a]));
+
+ /** Creates an input guarded process. */
+ def output(v: a, c: () => unit) =
+ GP(this, false, v, x => c());
+
+ /** Blocking read. */
+ def read = {
+ var res: a = _;
+ choice ( input(x => res = x) );
+ res
+ }
+
+ /** Blocking write. */
+ def write(x: a) =
+ choice ( output(x, () => ()) );
+
+ /** Syntactic sugar for input. */
+ def *(f: a => unit) =
+ input(f);
+
+ /** Syntactic sugar for output. */
+ def apply(v: a) =
+ new Product(this, v);
+ }
+
+ class Product[a](c: Chan[a], v: a) {
+ def *(def f: unit) = c.output(v, () => f);
+ }
+
+ //////////////////// SUM OF GUARDED PROCESSES //////////////////////
+
+ case class Sum(gs: List[GP]) with Monitor {
+
+ /** Continuation of the sum. */
+ var fun: Any => unit = _;
+ /** Argument of the continuation of the sum. */
+ var arg: Any = _;
+
+ var initialized = false;
+
+ /**
+ * Block if not initialized otherwise continue with the
+ * continuation.
+ */
+ def continue = synchronized {
+ if (!initialized) wait();
+ fun(arg)
+ }
+
+ /** Set the values of parameters and awake the sleeping sum. */
+ def set(f: Any => unit, x: Any) = synchronized {
+ fun = f;
+ arg = x;
+ initialized = true;
+ notify()
+ }
+ }
+
+ /////////////////////////// COMMUNICATION //////////////////////////
+
+ private var sums: List[Sum] = Nil;
+
+ /** Test if two lists of guarded processes can communicate. */
+ private def matches(gs1: List[GP], gs2: List[GP]):
+ Option[Tuple4[Any, Any => unit, Any, Any => unit]] =
+ Pair(gs1, gs2) match {
+ case Pair(Nil, _) => None
+ case Pair(_, Nil) => None
+ case Pair(GP(a1, d1, v1, c1) :: rest1, GP(a2, d2, v2, c2) :: rest2) =>
+ if (a1 == a2 && d1 == !d2)
+ Some(Tuple4(v1, c1, v2, c2))
+ else matches(gs1, rest2) match {
+ case None => matches(rest1, gs2)
+ case Some(t) => Some(t)
+ }
+ }
+
+ /**
+ * Test if the given sum can react with one of the pending sums.
+ * If yes then do the reaction otherwise append the sum at the end
+ * of the pending sums.
+ */
+ private def compare(s1: Sum, ss: List[Sum]): List[Sum] =
+ ss match {
+ case Nil => ss ::: List(s1)
+ case s2 :: rest => matches(s1.gs, s2.gs) match {
+ case None => s2 :: compare(s1, rest)
+ case Some(Tuple4(v1, c1, v2, c2)) => {
+ s1.set(c1, v2);
+ s2.set(c2, v1);
+ rest
+ }
+ }
+ }
+
+ /** Pi-calculus non-deterministic choice. */
+ def choice(s: GP*): unit = {
+ val sum = Sum(s.asInstanceOf[List[GP]]);
+ synchronized { sums = compare(sum, sums) };
+ sum.continue
+ }
+
+
+}
+
+/* Former version.
+
+package scala.concurrent;
+
+/**
+* Library for using Pi-calculus concurrent primitives in Scala.
+*/
+object pilib with Monitor {
+
/**
* Run several processes in parallel using the following syntax:
* spawn < p_1 | ... | p_n >
@@ -168,3 +323,4 @@ object pilib with Monitor {
}
+*/