summaryrefslogtreecommitdiff
path: root/sources/scala/concurrent/pilib.scala
blob: 6a90c4606862d44311ddcbad2486702c231fe2bb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package scala.concurrent;

/**
* Library for using Pi-calculus concurrent primitives in Scala.
*/
object pilib with Monitor {

  /**
  * Run several processes in parallel using the following syntax:
  * spawn < p_1 | ... | p_n >
  */
  trait Spawn {
    def <(def p: unit): Spawn;
    def |(def p: unit): Spawn;
    def > : unit;
  }
  val spawn = new Spawn {
//  object spawn extends Spawn {
    def <(def p: unit): Spawn = { concurrent.ops.spawn(p); this }
    def |(def p: unit): Spawn = { concurrent.ops.spawn(p); this }
    def > : unit = ()
  }

  type Sum = List[GuardedProcess];

  /** List of pending choices. */
  private var sums: List[Sum] = Nil;

  /**
  * Look in the given sum for a branch guarded by an input on the
  * name a.
  */
  private def lookupIn(s: Sum, a: AbstractChan): Option[Any => unit] =
    s match {
      case Nil => None
      case InputGuardedProcess(b, p) :: rest => if (a == b) Some(p) else lookupIn(rest, a)
      case OutputGuardedProcess(_, _, _) :: rest => lookupIn(rest, a)
    };

  /**
  * Look in the given sum for a branch guarded by an output on the
  * name a.
  */
  private def lookupOut(sum: Sum, a: AbstractChan): Option[Pair[Any, () => unit]] =
    sum match {
      case Nil => None
      case OutputGuardedProcess(b, x, q) :: rest => if (a == b) Some(Pair(x, q)) else lookupOut(rest, a)
      case InputGuardedProcess(_, _) :: rest => lookupOut(rest, a)
    };

  /**
  * Check if the two given sums can communicate, returns the parameters of the
  * communication in this case.
  */
  private def canCom(s1: Sum, s2: Sum): Option[Triple[Any, Any => unit, () => unit]] =
    s1 match {
      case Nil => None
      case InputGuardedProcess(a, p) :: rest => lookupOut(s2, a) match {
	case None => canCom(rest, s2)
	case Some(Pair(x, q)) => Some(Triple(x, p, q))
      }
      case OutputGuardedProcess(b, x, q) :: rest => lookupIn(s2, b) match {
	case None => canCom(rest, s2)
	case Some(p) => Some(Triple(x, p, q))
      }
    };

  /**
  * Tries to find in the list of pending sums one which can performs a communication
  * with the given sum. In this case, removes the matching pending sum from the list and
  * does the communication. Otherwise adds the given sum at the end of the pending sums.
  */
  private def communicateWith(s: Sum): unit = {
    def comWith(hd: List[Sum], tl: List[Sum]): List[Sum] =
      tl match {
	case Nil => hd ::: List(s)
	case s1 :: rest => canCom(s, s1) match {
	  case None => comWith(hd ::: List(s1), rest)
	  case Some(Triple(x, p, q)) => {
	    concurrent.ops.spawn(p(x));
	    concurrent.ops.spawn(q());
	    hd ::: rest
	  }
	}
      };

    synchronized {
      sums = comWith(Nil, sums)
    }
  }

  /**
  * Represents a guarded process in a non-deterministic choice.
  */
  abstract class GuardedProcess;

  /**
  * Process guarded by an input prefix in a sum.
  */
  case class InputGuardedProcess(a: AbstractChan, p: Any => unit) extends GuardedProcess;

  /**
  * Process guarded by an output prefix in a sum.
  */
  case class OutputGuardedProcess(b: AbstractChan, x: Any, q: () => unit) extends GuardedProcess;

  abstract class AbstractChan;

  /**
  * Name on which one can emit, receive or that can be emitted or received
  * during a communication.
  */
  class Chan[a] extends AbstractChan with Function1[a, Product[a]] {

    /**
    * Creates an input guarded process.
    */
    def input(p: a => unit): GuardedProcess = InputGuardedProcess(this, (x => p(x.asInstanceOf[a])));

    /**
    * Creates an input guarded process.
    */
    def output(x: a, q: () => unit): GuardedProcess = OutputGuardedProcess(this, x, q);

    /**
    * Blocking read.
    */
    def read: a = {
      val res = new concurrent.SyncVar[a];
      choice ( input(x: a => res.set(x)) );
      res.get
    }

    /**
    * Blocking write.
    */
    def write(x: a): unit = choice ( output(x, () => ()) );

    /**
    * Syntactic sugar.
    */
    def *(f: a => unit) = input(f);

    def apply(x: a): Product[a] =
      new Product[a](this, x);

  }
  /**
  * Syntactic sugar
  */
  class Product[a](c: Chan[a], x: a) {
    def *(def f: unit) = c.output(x, (() => f));
  }

  /**
  * Evaluates a choice (is non blocking).
  */
  def choice(s: GuardedProcess*): unit = {
    val done = new concurrent.Lock;
    done.acquire;
    val new_branches: Sum = (s.asInstanceOf[List[GuardedProcess]]) map {
      case InputGuardedProcess(a, p) => InputGuardedProcess(a, (x => { p(x); done.release }))
      case OutputGuardedProcess(b, x, q) => OutputGuardedProcess(b, x, (() => { q(); done.release }))
    };
    communicateWith(new_branches);
    done.acquire
  }

}