summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/pilib.scala
blob: 506b0c2d2e9b7eb3a569fd380a684488d9ad67cd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2006, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |                                         **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id$


package scala.concurrent


/**
 * Library for using Pi-calculus concurrent primitives in Scala. As an
 * example, the definition of a two-place buffer using the <code>pilib</code>
 * library looks like:
 * <pre>
 * <b>def</b> Buffer[a](put: Chan[a], get: Chan[a]): unit = {
 *   <b>def</b> B0: unit = choice ( put * { x => B1(x) } );
 *   <b>def</b> B1(x: a): unit = choice ( get(x) * B0, put * { y => B2(x, y) } )
 *   <b>def</b> B2(x: a, y: a): unit = choice ( get(x) * B1(y) )
 *   B0
 * }
 * </pre>
 *
 * @see <a href="http://scala.epfl.ch/docu/related.html">PiLib: A Hosted Language for Pi-Calculus Style Concurrency</a>
 * @author  Vincent Cremet, Martin Odersky
 * @version 1.0
 */
object pilib {

  /////////////////////////// SPAWN //////////////////////////////

  /**
   * Run several processes in parallel using the following syntax:
   * <code>spawn &lt; p_1 | ... | p_n &gt;</code>
   */
  abstract class Spawn {
    def <(p: => unit): Spawn
    def |(p: => unit): Spawn
    def > : unit
  }
  val spawn = new Spawn {
  //object spawn extends Spawn { // BUG !
    def <(p: => unit): Spawn = { scala.concurrent.ops.spawn(p); this }
    def |(p: => unit): Spawn = { scala.concurrent.ops.spawn(p); this }
    def > : unit = ()
  }

  //////////////////////// GUARDED PROCESSES /////////////////////////

  /** Untyped channel. */
  class UChan {
    /** Default log function. */
    var log = (x: Any) => ()
  }

  /** An untyped guarded process.
   *
   *  @param n         channel name
   *  @param polarity  input (true) or output (false)
   *  @param v         transmitted value
   *  @param c         continuation
   */
  case class UGP(n: UChan, polarity: boolean, v: Any, c: Any => Any)

  /** Typed guarded process. */
  class GP[a](n: UChan, polarity: boolean, v: Any, c: Any => a) {
    val untyped = UGP(n, polarity, v, c)
  }

  ////////////////////////// CHANNELS //////////////////////////////

  /**
   * Name on which one can emit, receive or that can be emitted or received
   * during a communication.
   */
  class Chan[a] extends UChan with Function1[a, Product[a]] {

    var defaultValue: a = _

    /** Creates an input guarded process. */
    def input[b](c: a => b) =
      new GP(this, true, (), x => c(x.asInstanceOf[a]))

    /** Creates an input guarded process. */
    def output[b](v: a, c: () => b) =
      new GP(this, false, v, x => c())

    /** Blocking read. */
    def read = {
      var res: a = defaultValue
      choice ( input(x => res = x) )
      res
    }

    /** Blocking write. */
    def write(x: a) =
      choice ( output(x, () => ()) )

    /** Syntactic sugar for input. */
    def *[b](f: a => b) =
      input(f);

    /** Syntactic sugar for output. */
    def apply(v: a) =
      new Product(this, v)

    /** Attach a function to be evaluated at each communication event
     *  on this channel. Replace previous attached function.
     */
    def attach(f: a => unit) =
      log = x => f(x.asInstanceOf[a])
  }

  class Product[a](c: Chan[a], v: a) {
    def *[b](f: => b) = c.output(v, () => f)
  }

  //////////////////// SUM OF GUARDED PROCESSES //////////////////////

  case class Sum(gs: List[UGP]) {

    /** Continuation of the sum. */
    var cont: () => Any = _

    var initialized = false

    /** Block if not initialized otherwise continue with the
     *  continuation.
     */
    def continue = synchronized {
      if (!initialized) wait()
      cont()
    }

    /** Set the values of parameters and awake the sleeping sum.
     *
     *  @param f ...
     */
    def set(f: () => Any) = synchronized {
      cont = f
      initialized = true
      notify()
    }
  }

  /////////////////////////// COMMUNICATION  //////////////////////////

  private var sums: List[Sum] = Nil

  /** Test if two lists of guarded processes can communicate.
   *
   *  @param gs1 ...
   *  @param gs2 ...
   *  @return    ...
   */
  private def matches(gs1: List[UGP], gs2: List[UGP]): Option[Triple[() => unit, () => Any, () => Any]] =
    Pair(gs1, gs2) match {
      case Pair(Nil, _) => None
      case Pair(_, Nil) => None
      case Pair(UGP(a1, d1, v1, c1) :: rest1, UGP(a2, d2, v2, c2) :: rest2) =>
        if (a1 == a2 && d1 == !d2)
          Some(Triple(() => if (d1) a1.log(v2) else a1.log(v1), () => c1(v2), () => c2(v1)))
        else matches(gs1, rest2) match {
          case None => matches(rest1, gs2)
          case Some(t) => Some(t)
        }
    }

  /** Test if the given sum can react with one of the pending sums.
   *  If yes then do the reaction otherwise append the sum at the end
   *  of the pending sums.
   *
   *  @param s1 ...
   *  @param ss ...
   *  @return   ...
   */
  private def compare(s1: Sum, ss: List[Sum]): List[Sum] =
    ss match {
      case Nil => ss ::: List(s1)
      case s2 :: rest => matches(s1.gs, s2.gs) match {
        case None => s2 :: compare(s1, rest)
        case Some(Triple(log, c1, c2)) => {
          log()
          s1.set(c1)
          s2.set(c2)
          rest
        }
      }
    }

  /** Pi-calculus non-deterministic choice.
   *
   *  @param s ...
   *  @return  ...
   */
  def choice[a](s: GP[a]*): a = {
    val sum = Sum(s.toList map { x => x.untyped })
    synchronized { sums = compare(sum, sums) }
    (sum.continue).asInstanceOf[a]
  }

}