summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/Tuple2.scala2
-rw-r--r--src/library/scala/collection/immutable/RedBlack.scala7
-rw-r--r--src/library/scala/collection/immutable/RedBlackTree.scala485
-rw-r--r--src/library/scala/collection/immutable/TreeMap.scala101
-rw-r--r--src/library/scala/collection/immutable/TreeSet.scala91
-rw-r--r--src/library/scala/collection/mutable/BasicNode.java2
-rw-r--r--src/library/scala/collection/mutable/CNodeBase.java35
-rw-r--r--src/library/scala/collection/mutable/Ctrie.scala110
-rw-r--r--src/library/scala/collection/mutable/HashTable.scala32
-rw-r--r--src/library/scala/collection/mutable/MainNode.java4
-rw-r--r--src/library/scala/collection/parallel/mutable/ParCtrie.scala57
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashMap.scala10
-rw-r--r--src/library/scala/concurrent/Awaitable.scala24
-rw-r--r--src/library/scala/concurrent/Channel.scala5
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala103
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala11
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala132
-rw-r--r--src/library/scala/concurrent/Future.scala492
-rw-r--r--src/library/scala/concurrent/FutureTaskRunner.scala1
-rw-r--r--src/library/scala/concurrent/JavaConversions.scala7
-rw-r--r--src/library/scala/concurrent/ManagedBlocker.scala1
-rw-r--r--src/library/scala/concurrent/Promise.scala132
-rw-r--r--src/library/scala/concurrent/Scheduler.scala54
-rw-r--r--src/library/scala/concurrent/Task.scala13
-rw-r--r--src/library/scala/concurrent/TaskRunner.scala1
-rw-r--r--src/library/scala/concurrent/TaskRunners.scala1
-rw-r--r--src/library/scala/concurrent/ThreadPoolRunner.scala1
-rw-r--r--src/library/scala/concurrent/default/SchedulerImpl.scala.disabled44
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala.disabled313
-rw-r--r--src/library/scala/concurrent/impl/AbstractPromise.java21
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala134
-rw-r--r--src/library/scala/concurrent/impl/Future.scala89
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala252
-rw-r--r--src/library/scala/concurrent/ops.scala1
-rw-r--r--src/library/scala/concurrent/package.scala57
-rw-r--r--src/library/scala/concurrent/package.scala.disabled108
-rw-r--r--src/library/scala/package.scala6
-rw-r--r--src/library/scala/reflect/api/Trees.scala5
-rw-r--r--src/library/scala/sys/process/BasicIO.scala128
-rw-r--r--src/library/scala/sys/process/Process.scala67
-rw-r--r--src/library/scala/sys/process/ProcessBuilder.scala306
-rw-r--r--src/library/scala/sys/process/ProcessIO.scala49
-rw-r--r--src/library/scala/sys/process/ProcessLogger.scala26
-rw-r--r--src/library/scala/sys/process/package.scala212
-rw-r--r--src/library/scala/util/Duration.scala485
-rw-r--r--src/library/scala/util/Timeout.scala33
-rw-r--r--src/library/scala/util/Try.scala165
47 files changed, 4035 insertions, 380 deletions
diff --git a/src/library/scala/Tuple2.scala b/src/library/scala/Tuple2.scala
index ad3f7df697..b1befca4fa 100644
--- a/src/library/scala/Tuple2.scala
+++ b/src/library/scala/Tuple2.scala
@@ -19,7 +19,7 @@ import scala.collection.generic.{ CanBuildFrom => CBF }
* @param _1 Element 1 of this Tuple2
* @param _2 Element 2 of this Tuple2
*/
-case class Tuple2[@specialized(Int, Long, Double) +T1, @specialized(Int, Long, Double) +T2](_1: T1, _2: T2)
+case class Tuple2[@specialized(Int, Long, Double, Char, Boolean, AnyRef) +T1, @specialized(Int, Long, Double, Char, Boolean, AnyRef) +T2](_1: T1, _2: T2)
extends Product2[T1, T2]
{
override def toString() = "(" + _1 + "," + _2 + ")"
diff --git a/src/library/scala/collection/immutable/RedBlack.scala b/src/library/scala/collection/immutable/RedBlack.scala
index 9906c9896e..83eeaa45ee 100644
--- a/src/library/scala/collection/immutable/RedBlack.scala
+++ b/src/library/scala/collection/immutable/RedBlack.scala
@@ -11,10 +11,13 @@
package scala.collection
package immutable
-/** A base class containing the implementations for `TreeMaps` and `TreeSets`.
+/** Old base class that was used by previous implementations of `TreeMaps` and `TreeSets`.
+ *
+ * Deprecated due to various performance bugs (see [[https://issues.scala-lang.org/browse/SI-5331 SI-5331]] for more information).
*
* @since 2.3
*/
+@deprecated("use `TreeMap` or `TreeSet` instead", "2.10")
@SerialVersionUID(8691885935445612921L)
abstract class RedBlack[A] extends Serializable {
@@ -287,5 +290,3 @@ abstract class RedBlack[A] extends Serializable {
def isBlack = true
}
}
-
-
diff --git a/src/library/scala/collection/immutable/RedBlackTree.scala b/src/library/scala/collection/immutable/RedBlackTree.scala
new file mode 100644
index 0000000000..0f28c4997b
--- /dev/null
+++ b/src/library/scala/collection/immutable/RedBlackTree.scala
@@ -0,0 +1,485 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+
+
+package scala.collection
+package immutable
+
+import annotation.tailrec
+import annotation.meta.getter
+
+/** An object containing the RedBlack tree implementation used by for `TreeMaps` and `TreeSets`.
+ *
+ * Implementation note: since efficiency is important for data structures this implementation
+ * uses <code>null</code> to represent empty trees. This also means pattern matching cannot
+ * easily be used. The API represented by the RedBlackTree object tries to hide these
+ * optimizations behind a reasonably clean API.
+ *
+ * @since 2.10
+ */
+private[immutable]
+object RedBlackTree {
+
+ def isEmpty(tree: Tree[_, _]): Boolean = tree eq null
+
+ def contains[A](tree: Tree[A, _], x: A)(implicit ordering: Ordering[A]): Boolean = lookup(tree, x) ne null
+ def get[A, B](tree: Tree[A, B], x: A)(implicit ordering: Ordering[A]): Option[B] = lookup(tree, x) match {
+ case null => None
+ case tree => Some(tree.value)
+ }
+
+ @tailrec
+ def lookup[A, B](tree: Tree[A, B], x: A)(implicit ordering: Ordering[A]): Tree[A, B] = if (tree eq null) null else {
+ val cmp = ordering.compare(x, tree.key)
+ if (cmp < 0) lookup(tree.left, x)
+ else if (cmp > 0) lookup(tree.right, x)
+ else tree
+ }
+
+ def count(tree: Tree[_, _]) = if (tree eq null) 0 else tree.count
+ def update[A, B, B1 >: B](tree: Tree[A, B], k: A, v: B1)(implicit ordering: Ordering[A]): Tree[A, B1] = blacken(upd(tree, k, v))
+ def delete[A, B](tree: Tree[A, B], k: A)(implicit ordering: Ordering[A]): Tree[A, B] = blacken(del(tree, k))
+ def rangeImpl[A: Ordering, B](tree: Tree[A, B], from: Option[A], until: Option[A]): Tree[A, B] = (from, until) match {
+ case (Some(from), Some(until)) => this.range(tree, from, until)
+ case (Some(from), None) => this.from(tree, from)
+ case (None, Some(until)) => this.until(tree, until)
+ case (None, None) => tree
+ }
+ def range[A: Ordering, B](tree: Tree[A, B], from: A, until: A): Tree[A, B] = blacken(doRange(tree, from, until))
+ def from[A: Ordering, B](tree: Tree[A, B], from: A): Tree[A, B] = blacken(doFrom(tree, from))
+ def to[A: Ordering, B](tree: Tree[A, B], to: A): Tree[A, B] = blacken(doTo(tree, to))
+ def until[A: Ordering, B](tree: Tree[A, B], key: A): Tree[A, B] = blacken(doUntil(tree, key))
+
+ def drop[A: Ordering, B](tree: Tree[A, B], n: Int): Tree[A, B] = blacken(doDrop(tree, n))
+ def take[A: Ordering, B](tree: Tree[A, B], n: Int): Tree[A, B] = blacken(doTake(tree, n))
+ def slice[A: Ordering, B](tree: Tree[A, B], from: Int, until: Int): Tree[A, B] = blacken(doSlice(tree, from, until))
+
+ def smallest[A, B](tree: Tree[A, B]): Tree[A, B] = {
+ if (tree eq null) throw new NoSuchElementException("empty map")
+ var result = tree
+ while (result.left ne null) result = result.left
+ result
+ }
+ def greatest[A, B](tree: Tree[A, B]): Tree[A, B] = {
+ if (tree eq null) throw new NoSuchElementException("empty map")
+ var result = tree
+ while (result.right ne null) result = result.right
+ result
+ }
+
+ def foreach[A, B, U](tree: Tree[A, B], f: ((A, B)) => U): Unit = if (tree ne null) {
+ if (tree.left ne null) foreach(tree.left, f)
+ f((tree.key, tree.value))
+ if (tree.right ne null) foreach(tree.right, f)
+ }
+ def foreachKey[A, U](tree: Tree[A, _], f: A => U): Unit = if (tree ne null) {
+ if (tree.left ne null) foreachKey(tree.left, f)
+ f(tree.key)
+ if (tree.right ne null) foreachKey(tree.right, f)
+ }
+
+ def iterator[A, B](tree: Tree[A, B]): Iterator[(A, B)] = new EntriesIterator(tree)
+ def keysIterator[A, _](tree: Tree[A, _]): Iterator[A] = new KeysIterator(tree)
+ def valuesIterator[_, B](tree: Tree[_, B]): Iterator[B] = new ValuesIterator(tree)
+
+ @tailrec
+ def nth[A, B](tree: Tree[A, B], n: Int): Tree[A, B] = {
+ val count = this.count(tree.left)
+ if (n < count) nth(tree.left, n)
+ else if (n > count) nth(tree.right, n - count - 1)
+ else tree
+ }
+
+ def isBlack(tree: Tree[_, _]) = (tree eq null) || isBlackTree(tree)
+
+ private[this] def isRedTree(tree: Tree[_, _]) = tree.isInstanceOf[RedTree[_, _]]
+ private[this] def isBlackTree(tree: Tree[_, _]) = tree.isInstanceOf[BlackTree[_, _]]
+
+ private[this] def blacken[A, B](t: Tree[A, B]): Tree[A, B] = if (t eq null) null else t.black
+
+ private[this] def mkTree[A, B](isBlack: Boolean, k: A, v: B, l: Tree[A, B], r: Tree[A, B]) =
+ if (isBlack) BlackTree(k, v, l, r) else RedTree(k, v, l, r)
+
+ private[this] def balanceLeft[A, B, B1 >: B](isBlack: Boolean, z: A, zv: B, l: Tree[A, B1], d: Tree[A, B1]): Tree[A, B1] = {
+ if (isRedTree(l) && isRedTree(l.left))
+ RedTree(l.key, l.value, BlackTree(l.left.key, l.left.value, l.left.left, l.left.right), BlackTree(z, zv, l.right, d))
+ else if (isRedTree(l) && isRedTree(l.right))
+ RedTree(l.right.key, l.right.value, BlackTree(l.key, l.value, l.left, l.right.left), BlackTree(z, zv, l.right.right, d))
+ else
+ mkTree(isBlack, z, zv, l, d)
+ }
+ private[this] def balanceRight[A, B, B1 >: B](isBlack: Boolean, x: A, xv: B, a: Tree[A, B1], r: Tree[A, B1]): Tree[A, B1] = {
+ if (isRedTree(r) && isRedTree(r.left))
+ RedTree(r.left.key, r.left.value, BlackTree(x, xv, a, r.left.left), BlackTree(r.key, r.value, r.left.right, r.right))
+ else if (isRedTree(r) && isRedTree(r.right))
+ RedTree(r.key, r.value, BlackTree(x, xv, a, r.left), BlackTree(r.right.key, r.right.value, r.right.left, r.right.right))
+ else
+ mkTree(isBlack, x, xv, a, r)
+ }
+ private[this] def upd[A, B, B1 >: B](tree: Tree[A, B], k: A, v: B1)(implicit ordering: Ordering[A]): Tree[A, B1] = if (tree eq null) {
+ RedTree(k, v, null, null)
+ } else {
+ val cmp = ordering.compare(k, tree.key)
+ if (cmp < 0) balanceLeft(isBlackTree(tree), tree.key, tree.value, upd(tree.left, k, v), tree.right)
+ else if (cmp > 0) balanceRight(isBlackTree(tree), tree.key, tree.value, tree.left, upd(tree.right, k, v))
+ else mkTree(isBlackTree(tree), k, v, tree.left, tree.right)
+ }
+
+ // Based on Stefan Kahrs' Haskell version of Okasaki's Red&Black Trees
+ // http://www.cse.unsw.edu.au/~dons/data/RedBlackTree.html
+ private[this] def del[A, B](tree: Tree[A, B], k: A)(implicit ordering: Ordering[A]): Tree[A, B] = if (tree eq null) null else {
+ def balance(x: A, xv: B, tl: Tree[A, B], tr: Tree[A, B]) = if (isRedTree(tl)) {
+ if (isRedTree(tr)) {
+ RedTree(x, xv, tl.black, tr.black)
+ } else if (isRedTree(tl.left)) {
+ RedTree(tl.key, tl.value, tl.left.black, BlackTree(x, xv, tl.right, tr))
+ } else if (isRedTree(tl.right)) {
+ RedTree(tl.right.key, tl.right.value, BlackTree(tl.key, tl.value, tl.left, tl.right.left), BlackTree(x, xv, tl.right.right, tr))
+ } else {
+ BlackTree(x, xv, tl, tr)
+ }
+ } else if (isRedTree(tr)) {
+ if (isRedTree(tr.right)) {
+ RedTree(tr.key, tr.value, BlackTree(x, xv, tl, tr.left), tr.right.black)
+ } else if (isRedTree(tr.left)) {
+ RedTree(tr.left.key, tr.left.value, BlackTree(x, xv, tl, tr.left.left), BlackTree(tr.key, tr.value, tr.left.right, tr.right))
+ } else {
+ BlackTree(x, xv, tl, tr)
+ }
+ } else {
+ BlackTree(x, xv, tl, tr)
+ }
+ def subl(t: Tree[A, B]) =
+ if (t.isInstanceOf[BlackTree[_, _]]) t.red
+ else sys.error("Defect: invariance violation; expected black, got "+t)
+
+ def balLeft(x: A, xv: B, tl: Tree[A, B], tr: Tree[A, B]) = if (isRedTree(tl)) {
+ RedTree(x, xv, tl.black, tr)
+ } else if (isBlackTree(tr)) {
+ balance(x, xv, tl, tr.red)
+ } else if (isRedTree(tr) && isBlackTree(tr.left)) {
+ RedTree(tr.left.key, tr.left.value, BlackTree(x, xv, tl, tr.left.left), balance(tr.key, tr.value, tr.left.right, subl(tr.right)))
+ } else {
+ sys.error("Defect: invariance violation")
+ }
+ def balRight(x: A, xv: B, tl: Tree[A, B], tr: Tree[A, B]) = if (isRedTree(tr)) {
+ RedTree(x, xv, tl, tr.black)
+ } else if (isBlackTree(tl)) {
+ balance(x, xv, tl.red, tr)
+ } else if (isRedTree(tl) && isBlackTree(tl.right)) {
+ RedTree(tl.right.key, tl.right.value, balance(tl.key, tl.value, subl(tl.left), tl.right.left), BlackTree(x, xv, tl.right.right, tr))
+ } else {
+ sys.error("Defect: invariance violation")
+ }
+ def delLeft = if (isBlackTree(tree.left)) balLeft(tree.key, tree.value, del(tree.left, k), tree.right) else RedTree(tree.key, tree.value, del(tree.left, k), tree.right)
+ def delRight = if (isBlackTree(tree.right)) balRight(tree.key, tree.value, tree.left, del(tree.right, k)) else RedTree(tree.key, tree.value, tree.left, del(tree.right, k))
+ def append(tl: Tree[A, B], tr: Tree[A, B]): Tree[A, B] = if (tl eq null) {
+ tr
+ } else if (tr eq null) {
+ tl
+ } else if (isRedTree(tl) && isRedTree(tr)) {
+ val bc = append(tl.right, tr.left)
+ if (isRedTree(bc)) {
+ RedTree(bc.key, bc.value, RedTree(tl.key, tl.value, tl.left, bc.left), RedTree(tr.key, tr.value, bc.right, tr.right))
+ } else {
+ RedTree(tl.key, tl.value, tl.left, RedTree(tr.key, tr.value, bc, tr.right))
+ }
+ } else if (isBlackTree(tl) && isBlackTree(tr)) {
+ val bc = append(tl.right, tr.left)
+ if (isRedTree(bc)) {
+ RedTree(bc.key, bc.value, BlackTree(tl.key, tl.value, tl.left, bc.left), BlackTree(tr.key, tr.value, bc.right, tr.right))
+ } else {
+ balLeft(tl.key, tl.value, tl.left, BlackTree(tr.key, tr.value, bc, tr.right))
+ }
+ } else if (isRedTree(tr)) {
+ RedTree(tr.key, tr.value, append(tl, tr.left), tr.right)
+ } else if (isRedTree(tl)) {
+ RedTree(tl.key, tl.value, tl.left, append(tl.right, tr))
+ } else {
+ sys.error("unmatched tree on append: " + tl + ", " + tr)
+ }
+
+ val cmp = ordering.compare(k, tree.key)
+ if (cmp < 0) delLeft
+ else if (cmp > 0) delRight
+ else append(tree.left, tree.right)
+ }
+
+ private[this] def doFrom[A, B](tree: Tree[A, B], from: A)(implicit ordering: Ordering[A]): Tree[A, B] = {
+ if (tree eq null) return null
+ if (ordering.lt(tree.key, from)) return doFrom(tree.right, from)
+ val newLeft = doFrom(tree.left, from)
+ if (newLeft eq tree.left) tree
+ else if (newLeft eq null) upd(tree.right, tree.key, tree.value)
+ else rebalance(tree, newLeft, tree.right)
+ }
+ private[this] def doTo[A, B](tree: Tree[A, B], to: A)(implicit ordering: Ordering[A]): Tree[A, B] = {
+ if (tree eq null) return null
+ if (ordering.lt(to, tree.key)) return doTo(tree.left, to)
+ val newRight = doTo(tree.right, to)
+ if (newRight eq tree.right) tree
+ else if (newRight eq null) upd(tree.left, tree.key, tree.value)
+ else rebalance(tree, tree.left, newRight)
+ }
+ private[this] def doUntil[A, B](tree: Tree[A, B], until: A)(implicit ordering: Ordering[A]): Tree[A, B] = {
+ if (tree eq null) return null
+ if (ordering.lteq(until, tree.key)) return doUntil(tree.left, until)
+ val newRight = doUntil(tree.right, until)
+ if (newRight eq tree.right) tree
+ else if (newRight eq null) upd(tree.left, tree.key, tree.value)
+ else rebalance(tree, tree.left, newRight)
+ }
+ private[this] def doRange[A, B](tree: Tree[A, B], from: A, until: A)(implicit ordering: Ordering[A]): Tree[A, B] = {
+ if (tree eq null) return null
+ if (ordering.lt(tree.key, from)) return doRange(tree.right, from, until);
+ if (ordering.lteq(until, tree.key)) return doRange(tree.left, from, until);
+ val newLeft = doFrom(tree.left, from)
+ val newRight = doUntil(tree.right, until)
+ if ((newLeft eq tree.left) && (newRight eq tree.right)) tree
+ else if (newLeft eq null) upd(newRight, tree.key, tree.value);
+ else if (newRight eq null) upd(newLeft, tree.key, tree.value);
+ else rebalance(tree, newLeft, newRight)
+ }
+
+ private[this] def doDrop[A: Ordering, B](tree: Tree[A, B], n: Int): Tree[A, B] = {
+ if (n <= 0) return tree
+ if (n >= this.count(tree)) return null
+ val count = this.count(tree.left)
+ if (n > count) return doDrop(tree.right, n - count - 1)
+ val newLeft = doDrop(tree.left, n)
+ if (newLeft eq tree.left) tree
+ else if (newLeft eq null) upd(tree.right, tree.key, tree.value)
+ else rebalance(tree, newLeft, tree.right)
+ }
+ private[this] def doTake[A: Ordering, B](tree: Tree[A, B], n: Int): Tree[A, B] = {
+ if (n <= 0) return null
+ if (n >= this.count(tree)) return tree
+ val count = this.count(tree.left)
+ if (n <= count) return doTake(tree.left, n)
+ val newRight = doTake(tree.right, n - count - 1)
+ if (newRight eq tree.right) tree
+ else if (newRight eq null) upd(tree.left, tree.key, tree.value)
+ else rebalance(tree, tree.left, newRight)
+ }
+ private[this] def doSlice[A: Ordering, B](tree: Tree[A, B], from: Int, until: Int): Tree[A, B] = {
+ if (tree eq null) return null
+ val count = this.count(tree.left)
+ if (from > count) return doSlice(tree.right, from - count - 1, until - count - 1)
+ if (until <= count) return doSlice(tree.left, from, until)
+ val newLeft = doDrop(tree.left, from)
+ val newRight = doTake(tree.right, until - count - 1)
+ if ((newLeft eq tree.left) && (newRight eq tree.right)) tree
+ else if (newLeft eq null) upd(newRight, tree.key, tree.value)
+ else if (newRight eq null) upd(newLeft, tree.key, tree.value)
+ else rebalance(tree, newLeft, newRight)
+ }
+
+ // The zipper returned might have been traversed left-most (always the left child)
+ // or right-most (always the right child). Left trees are traversed right-most,
+ // and right trees are traversed leftmost.
+
+ // Returns the zipper for the side with deepest black nodes depth, a flag
+ // indicating whether the trees were unbalanced at all, and a flag indicating
+ // whether the zipper was traversed left-most or right-most.
+
+ // If the trees were balanced, returns an empty zipper
+ private[this] def compareDepth[A, B](left: Tree[A, B], right: Tree[A, B]): (List[Tree[A, B]], Boolean, Boolean, Int) = {
+ // Once a side is found to be deeper, unzip it to the bottom
+ def unzip(zipper: List[Tree[A, B]], leftMost: Boolean): List[Tree[A, B]] = {
+ val next = if (leftMost) zipper.head.left else zipper.head.right
+ next match {
+ case null => zipper
+ case node => unzip(node :: zipper, leftMost)
+ }
+ }
+
+ // Unzip left tree on the rightmost side and right tree on the leftmost side until one is
+ // found to be deeper, or the bottom is reached
+ def unzipBoth(left: Tree[A, B],
+ right: Tree[A, B],
+ leftZipper: List[Tree[A, B]],
+ rightZipper: List[Tree[A, B]],
+ smallerDepth: Int): (List[Tree[A, B]], Boolean, Boolean, Int) = {
+ if (isBlackTree(left) && isBlackTree(right)) {
+ unzipBoth(left.right, right.left, left :: leftZipper, right :: rightZipper, smallerDepth + 1)
+ } else if (isRedTree(left) && isRedTree(right)) {
+ unzipBoth(left.right, right.left, left :: leftZipper, right :: rightZipper, smallerDepth)
+ } else if (isRedTree(right)) {
+ unzipBoth(left, right.left, leftZipper, right :: rightZipper, smallerDepth)
+ } else if (isRedTree(left)) {
+ unzipBoth(left.right, right, left :: leftZipper, rightZipper, smallerDepth)
+ } else if ((left eq null) && (right eq null)) {
+ (Nil, true, false, smallerDepth)
+ } else if ((left eq null) && isBlackTree(right)) {
+ val leftMost = true
+ (unzip(right :: rightZipper, leftMost), false, leftMost, smallerDepth)
+ } else if (isBlackTree(left) && (right eq null)) {
+ val leftMost = false
+ (unzip(left :: leftZipper, leftMost), false, leftMost, smallerDepth)
+ } else {
+ sys.error("unmatched trees in unzip: " + left + ", " + right)
+ }
+ }
+ unzipBoth(left, right, Nil, Nil, 0)
+ }
+
+ private[this] def rebalance[A, B](tree: Tree[A, B], newLeft: Tree[A, B], newRight: Tree[A, B]) = {
+ // This is like drop(n-1), but only counting black nodes
+ def findDepth(zipper: List[Tree[A, B]], depth: Int): List[Tree[A, B]] = zipper match {
+ case head :: tail if isBlackTree(head) =>
+ if (depth == 1) zipper else findDepth(tail, depth - 1)
+ case _ :: tail => findDepth(tail, depth)
+ case Nil => sys.error("Defect: unexpected empty zipper while computing range")
+ }
+
+ // Blackening the smaller tree avoids balancing problems on union;
+ // this can't be done later, though, or it would change the result of compareDepth
+ val blkNewLeft = blacken(newLeft)
+ val blkNewRight = blacken(newRight)
+ val (zipper, levelled, leftMost, smallerDepth) = compareDepth(blkNewLeft, blkNewRight)
+
+ if (levelled) {
+ BlackTree(tree.key, tree.value, blkNewLeft, blkNewRight)
+ } else {
+ val zipFrom = findDepth(zipper, smallerDepth)
+ val union = if (leftMost) {
+ RedTree(tree.key, tree.value, blkNewLeft, zipFrom.head)
+ } else {
+ RedTree(tree.key, tree.value, zipFrom.head, blkNewRight)
+ }
+ val zippedTree = zipFrom.tail.foldLeft(union: Tree[A, B]) { (tree, node) =>
+ if (leftMost)
+ balanceLeft(isBlackTree(node), node.key, node.value, tree, node.right)
+ else
+ balanceRight(isBlackTree(node), node.key, node.value, node.left, tree)
+ }
+ zippedTree
+ }
+ }
+
+ /*
+ * Forcing direct fields access using the @inline annotation helps speed up
+ * various operations (especially smallest/greatest and update/delete).
+ *
+ * Unfortunately the direct field access is not guaranteed to work (but
+ * works on the current implementation of the Scala compiler).
+ *
+ * An alternative is to implement the these classes using plain old Java code...
+ */
+ sealed abstract class Tree[A, +B](
+ @(inline @getter) final val key: A,
+ @(inline @getter) final val value: B,
+ @(inline @getter) final val left: Tree[A, B],
+ @(inline @getter) final val right: Tree[A, B])
+ extends Serializable {
+ final val count: Int = 1 + RedBlackTree.count(left) + RedBlackTree.count(right)
+ def black: Tree[A, B]
+ def red: Tree[A, B]
+ }
+ final class RedTree[A, +B](key: A,
+ value: B,
+ left: Tree[A, B],
+ right: Tree[A, B]) extends Tree[A, B](key, value, left, right) {
+ override def black: Tree[A, B] = BlackTree(key, value, left, right)
+ override def red: Tree[A, B] = this
+ override def toString: String = "RedTree(" + key + ", " + value + ", " + left + ", " + right + ")"
+ }
+ final class BlackTree[A, +B](key: A,
+ value: B,
+ left: Tree[A, B],
+ right: Tree[A, B]) extends Tree[A, B](key, value, left, right) {
+ override def black: Tree[A, B] = this
+ override def red: Tree[A, B] = RedTree(key, value, left, right)
+ override def toString: String = "BlackTree(" + key + ", " + value + ", " + left + ", " + right + ")"
+ }
+
+ object RedTree {
+ @inline def apply[A, B](key: A, value: B, left: Tree[A, B], right: Tree[A, B]) = new RedTree(key, value, left, right)
+ def unapply[A, B](t: RedTree[A, B]) = Some((t.key, t.value, t.left, t.right))
+ }
+ object BlackTree {
+ @inline def apply[A, B](key: A, value: B, left: Tree[A, B], right: Tree[A, B]) = new BlackTree(key, value, left, right)
+ def unapply[A, B](t: BlackTree[A, B]) = Some((t.key, t.value, t.left, t.right))
+ }
+
+ private[this] abstract class TreeIterator[A, B, R](tree: Tree[A, B]) extends Iterator[R] {
+ protected[this] def nextResult(tree: Tree[A, B]): R
+
+ override def hasNext: Boolean = next ne null
+
+ override def next: R = next match {
+ case null =>
+ throw new NoSuchElementException("next on empty iterator")
+ case tree =>
+ next = findNext(tree.right)
+ nextResult(tree)
+ }
+
+ @tailrec
+ private[this] def findNext(tree: Tree[A, B]): Tree[A, B] = {
+ if (tree eq null) popPath()
+ else if (tree.left eq null) tree
+ else {
+ pushPath(tree)
+ findNext(tree.left)
+ }
+ }
+
+ private[this] def pushPath(tree: Tree[A, B]) {
+ try {
+ path(index) = tree
+ index += 1
+ } catch {
+ case _: ArrayIndexOutOfBoundsException =>
+ /*
+ * Either the tree became unbalanced or we calculated the maximum height incorrectly.
+ * To avoid crashing the iterator we expand the path array. Obviously this should never
+ * happen...
+ *
+ * An exception handler is used instead of an if-condition to optimize the normal path.
+ * This makes a large difference in iteration speed!
+ */
+ assert(index >= path.length)
+ path :+= null
+ pushPath(tree)
+ }
+ }
+ private[this] def popPath(): Tree[A, B] = if (index == 0) null else {
+ index -= 1
+ path(index)
+ }
+
+ private[this] var path = if (tree eq null) null else {
+ /*
+ * According to "Ralf Hinze. Constructing red-black trees" [http://www.cs.ox.ac.uk/ralf.hinze/publications/#P5]
+ * the maximum height of a red-black tree is 2*log_2(n + 2) - 2.
+ *
+ * According to {@see Integer#numberOfLeadingZeros} ceil(log_2(n)) = (32 - Integer.numberOfLeadingZeros(n - 1))
+ *
+ * We also don't store the deepest nodes in the path so the maximum path length is further reduced by one.
+ */
+ val maximumHeight = 2 * (32 - Integer.numberOfLeadingZeros(tree.count + 2 - 1)) - 2 - 1
+ new Array[Tree[A, B]](maximumHeight)
+ }
+ private[this] var index = 0
+ private[this] var next: Tree[A, B] = findNext(tree)
+ }
+
+ private[this] class EntriesIterator[A, B](tree: Tree[A, B]) extends TreeIterator[A, B, (A, B)](tree) {
+ override def nextResult(tree: Tree[A, B]) = (tree.key, tree.value)
+ }
+
+ private[this] class KeysIterator[A, B](tree: Tree[A, B]) extends TreeIterator[A, B, A](tree) {
+ override def nextResult(tree: Tree[A, B]) = tree.key
+ }
+
+ private[this] class ValuesIterator[A, B](tree: Tree[A, B]) extends TreeIterator[A, B, B](tree) {
+ override def nextResult(tree: Tree[A, B]) = tree.value
+ }
+}
diff --git a/src/library/scala/collection/immutable/TreeMap.scala b/src/library/scala/collection/immutable/TreeMap.scala
index ef0eac3701..dc4f79be35 100644
--- a/src/library/scala/collection/immutable/TreeMap.scala
+++ b/src/library/scala/collection/immutable/TreeMap.scala
@@ -12,6 +12,7 @@ package scala.collection
package immutable
import generic._
+import immutable.{RedBlackTree => RB}
import mutable.Builder
import annotation.bridge
@@ -23,7 +24,6 @@ object TreeMap extends ImmutableSortedMapFactory[TreeMap] {
def empty[A, B](implicit ord: Ordering[A]) = new TreeMap[A, B]()(ord)
/** $sortedMapCanBuildFromInfo */
implicit def canBuildFrom[A, B](implicit ord: Ordering[A]): CanBuildFrom[Coll, (A, B), TreeMap[A, B]] = new SortedMapCanBuildFrom[A, B]
- private def make[A, B](s: Int, t: RedBlack[A]#Tree[B])(implicit ord: Ordering[A]) = new TreeMap[A, B](s, t)(ord)
}
/** This class implements immutable maps using a tree.
@@ -46,31 +46,79 @@ object TreeMap extends ImmutableSortedMapFactory[TreeMap] {
* @define mayNotTerminateInf
* @define willNotTerminateInf
*/
-class TreeMap[A, +B](override val size: Int, t: RedBlack[A]#Tree[B])(implicit val ordering: Ordering[A])
- extends RedBlack[A]
- with SortedMap[A, B]
+class TreeMap[A, +B] private (tree: RB.Tree[A, B])(implicit val ordering: Ordering[A])
+ extends SortedMap[A, B]
with SortedMapLike[A, B, TreeMap[A, B]]
with MapLike[A, B, TreeMap[A, B]]
with Serializable {
+ @deprecated("use `ordering.lt` instead", "2.10")
def isSmaller(x: A, y: A) = ordering.lt(x, y)
override protected[this] def newBuilder : Builder[(A, B), TreeMap[A, B]] =
TreeMap.newBuilder[A, B]
- def this()(implicit ordering: Ordering[A]) = this(0, null)(ordering)
+ override def size = RB.count(tree)
- protected val tree: RedBlack[A]#Tree[B] = if (size == 0) Empty else t
+ def this()(implicit ordering: Ordering[A]) = this(null)(ordering)
- override def rangeImpl(from : Option[A], until : Option[A]): TreeMap[A,B] = {
- val ntree = tree.range(from,until)
- new TreeMap[A,B](ntree.count, ntree)
- }
+ override def rangeImpl(from: Option[A], until: Option[A]): TreeMap[A, B] = new TreeMap[A, B](RB.rangeImpl(tree, from, until))
+ override def range(from: A, until: A): TreeMap[A, B] = new TreeMap[A, B](RB.range(tree, from, until))
+ override def from(from: A): TreeMap[A, B] = new TreeMap[A, B](RB.from(tree, from))
+ override def to(to: A): TreeMap[A, B] = new TreeMap[A, B](RB.to(tree, to))
+ override def until(until: A): TreeMap[A, B] = new TreeMap[A, B](RB.until(tree, until))
- override def firstKey = t.first
- override def lastKey = t.last
+ override def firstKey = RB.smallest(tree).key
+ override def lastKey = RB.greatest(tree).key
override def compare(k0: A, k1: A): Int = ordering.compare(k0, k1)
+ override def head = {
+ val smallest = RB.smallest(tree)
+ (smallest.key, smallest.value)
+ }
+ override def headOption = if (RB.isEmpty(tree)) None else Some(head)
+ override def last = {
+ val greatest = RB.greatest(tree)
+ (greatest.key, greatest.value)
+ }
+ override def lastOption = if (RB.isEmpty(tree)) None else Some(last)
+
+ override def tail = new TreeMap(RB.delete(tree, firstKey))
+ override def init = new TreeMap(RB.delete(tree, lastKey))
+
+ override def drop(n: Int) = {
+ if (n <= 0) this
+ else if (n >= size) empty
+ else new TreeMap(RB.drop(tree, n))
+ }
+
+ override def take(n: Int) = {
+ if (n <= 0) empty
+ else if (n >= size) this
+ else new TreeMap(RB.take(tree, n))
+ }
+
+ override def slice(from: Int, until: Int) = {
+ if (until <= from) empty
+ else if (from <= 0) take(until)
+ else if (until >= size) drop(from)
+ else new TreeMap(RB.slice(tree, from, until))
+ }
+
+ override def dropRight(n: Int) = take(size - n)
+ override def takeRight(n: Int) = drop(size - n)
+ override def splitAt(n: Int) = (take(n), drop(n))
+
+ private[this] def countWhile(p: ((A, B)) => Boolean): Int = {
+ var result = 0
+ val it = iterator
+ while (it.hasNext && p(it.next)) result += 1
+ result
+ }
+ override def dropWhile(p: ((A, B)) => Boolean) = drop(countWhile(p))
+ override def takeWhile(p: ((A, B)) => Boolean) = take(countWhile(p))
+ override def span(p: ((A, B)) => Boolean) = splitAt(countWhile(p))
+
/** A factory to create empty maps of the same type of keys.
*/
override def empty: TreeMap[A, B] = TreeMap.empty[A, B](ordering)
@@ -84,10 +132,7 @@ class TreeMap[A, +B](override val size: Int, t: RedBlack[A]#Tree[B])(implicit va
* @param value the value to be associated with `key`
* @return a new $coll with the updated binding
*/
- override def updated [B1 >: B](key: A, value: B1): TreeMap[A, B1] = {
- val newsize = if (tree.lookup(key).isEmpty) size + 1 else size
- TreeMap.make(newsize, tree.update(key, value))
- }
+ override def updated [B1 >: B](key: A, value: B1): TreeMap[A, B1] = new TreeMap(RB.update(tree, key, value))
/** Add a key/value pair to this map.
* @tparam B1 type of the value of the new binding, a supertype of `B`
@@ -128,14 +173,13 @@ class TreeMap[A, +B](override val size: Int, t: RedBlack[A]#Tree[B])(implicit va
* @return a new $coll with the inserted binding, if it wasn't present in the map
*/
def insert [B1 >: B](key: A, value: B1): TreeMap[A, B1] = {
- assert(tree.lookup(key).isEmpty)
- TreeMap.make(size + 1, tree.update(key, value))
+ assert(!RB.contains(tree, key))
+ new TreeMap(RB.update(tree, key, value))
}
def - (key:A): TreeMap[A, B] =
- if (tree.lookup(key).isEmpty) this
- else if (size == 1) empty
- else TreeMap.make(size - 1, tree.delete(key))
+ if (!RB.contains(tree, key)) this
+ else new TreeMap(RB.delete(tree, key))
/** Check if this map maps `key` to a value and return the
* value if it exists.
@@ -143,21 +187,22 @@ class TreeMap[A, +B](override val size: Int, t: RedBlack[A]#Tree[B])(implicit va
* @param key the key of the mapping of interest
* @return the value of the mapping, if it exists
*/
- override def get(key: A): Option[B] = tree.lookup(key) match {
- case n: NonEmpty[b] => Some(n.value)
- case _ => None
- }
+ override def get(key: A): Option[B] = RB.get(tree, key)
/** Creates a new iterator over all elements contained in this
* object.
*
* @return the new iterator
*/
- def iterator: Iterator[(A, B)] = tree.toStream.iterator
+ override def iterator: Iterator[(A, B)] = RB.iterator(tree)
+
+ override def keysIterator: Iterator[A] = RB.keysIterator(tree)
+ override def valuesIterator: Iterator[B] = RB.valuesIterator(tree)
- override def toStream: Stream[(A, B)] = tree.toStream
+ override def contains(key: A): Boolean = RB.contains(tree, key)
+ override def isDefinedAt(key: A): Boolean = RB.contains(tree, key)
- override def foreach[U](f : ((A,B)) => U) = tree foreach { case (x, y) => f(x, y) }
+ override def foreach[U](f : ((A,B)) => U) = RB.foreach(tree, f)
}
diff --git a/src/library/scala/collection/immutable/TreeSet.scala b/src/library/scala/collection/immutable/TreeSet.scala
index 8b90ece143..1b3d72ceb7 100644
--- a/src/library/scala/collection/immutable/TreeSet.scala
+++ b/src/library/scala/collection/immutable/TreeSet.scala
@@ -12,6 +12,7 @@ package scala.collection
package immutable
import generic._
+import immutable.{RedBlackTree => RB}
import mutable.{ Builder, SetBuilder }
/** $factoryInfo
@@ -46,20 +47,61 @@ object TreeSet extends ImmutableSortedSetFactory[TreeSet] {
* @define mayNotTerminateInf
* @define willNotTerminateInf
*/
-@SerialVersionUID(-234066569443569402L)
-class TreeSet[A](override val size: Int, t: RedBlack[A]#Tree[Unit])
- (implicit val ordering: Ordering[A])
- extends RedBlack[A] with SortedSet[A] with SortedSetLike[A, TreeSet[A]] with Serializable {
+@SerialVersionUID(-5685982407650748405L)
+class TreeSet[A] private (tree: RB.Tree[A, Unit])(implicit val ordering: Ordering[A])
+ extends SortedSet[A] with SortedSetLike[A, TreeSet[A]] with Serializable {
override def stringPrefix = "TreeSet"
- def isSmaller(x: A, y: A) = compare(x,y) < 0
+ override def size = RB.count(tree)
+
+ override def head = RB.smallest(tree).key
+ override def headOption = if (RB.isEmpty(tree)) None else Some(head)
+ override def last = RB.greatest(tree).key
+ override def lastOption = if (RB.isEmpty(tree)) None else Some(last)
+
+ override def tail = new TreeSet(RB.delete(tree, firstKey))
+ override def init = new TreeSet(RB.delete(tree, lastKey))
+
+ override def drop(n: Int) = {
+ if (n <= 0) this
+ else if (n >= size) empty
+ else newSet(RB.drop(tree, n))
+ }
- def this()(implicit ordering: Ordering[A]) = this(0, null)(ordering)
+ override def take(n: Int) = {
+ if (n <= 0) empty
+ else if (n >= size) this
+ else newSet(RB.take(tree, n))
+ }
- protected val tree: RedBlack[A]#Tree[Unit] = if (size == 0) Empty else t
+ override def slice(from: Int, until: Int) = {
+ if (until <= from) empty
+ else if (from <= 0) take(until)
+ else if (until >= size) drop(from)
+ else newSet(RB.slice(tree, from, until))
+ }
- private def newSet(s: Int, t: RedBlack[A]#Tree[Unit]) = new TreeSet[A](s, t)
+ override def dropRight(n: Int) = take(size - n)
+ override def takeRight(n: Int) = drop(size - n)
+ override def splitAt(n: Int) = (take(n), drop(n))
+
+ private[this] def countWhile(p: A => Boolean): Int = {
+ var result = 0
+ val it = iterator
+ while (it.hasNext && p(it.next)) result += 1
+ result
+ }
+ override def dropWhile(p: A => Boolean) = drop(countWhile(p))
+ override def takeWhile(p: A => Boolean) = take(countWhile(p))
+ override def span(p: A => Boolean) = splitAt(countWhile(p))
+
+ @deprecated("use `ordering.lt` instead", "2.10")
+ def isSmaller(x: A, y: A) = compare(x,y) < 0
+
+ def this()(implicit ordering: Ordering[A]) = this(null)(ordering)
+
+ private def newSet(t: RB.Tree[A, Unit]) = new TreeSet[A](t)
/** A factory to create empty sets of the same type of keys.
*/
@@ -70,10 +112,7 @@ class TreeSet[A](override val size: Int, t: RedBlack[A]#Tree[Unit])
* @param elem a new element to add.
* @return a new $coll containing `elem` and all the elements of this $coll.
*/
- def + (elem: A): TreeSet[A] = {
- val newsize = if (tree.lookup(elem).isEmpty) size + 1 else size
- newSet(newsize, tree.update(elem, ()))
- }
+ def + (elem: A): TreeSet[A] = newSet(RB.update(tree, elem, ()))
/** A new `TreeSet` with the entry added is returned,
* assuming that elem is <em>not</em> in the TreeSet.
@@ -82,8 +121,8 @@ class TreeSet[A](override val size: Int, t: RedBlack[A]#Tree[Unit])
* @return a new $coll containing `elem` and all the elements of this $coll.
*/
def insert(elem: A): TreeSet[A] = {
- assert(tree.lookup(elem).isEmpty)
- newSet(size + 1, tree.update(elem, ()))
+ assert(!RB.contains(tree, elem))
+ newSet(RB.update(tree, elem, ()))
}
/** Creates a new `TreeSet` with the entry removed.
@@ -92,31 +131,31 @@ class TreeSet[A](override val size: Int, t: RedBlack[A]#Tree[Unit])
* @return a new $coll containing all the elements of this $coll except `elem`.
*/
def - (elem:A): TreeSet[A] =
- if (tree.lookup(elem).isEmpty) this
- else newSet(size - 1, tree delete elem)
+ if (!RB.contains(tree, elem)) this
+ else newSet(RB.delete(tree, elem))
/** Checks if this set contains element `elem`.
*
* @param elem the element to check for membership.
* @return true, iff `elem` is contained in this set.
*/
- def contains(elem: A): Boolean = !tree.lookup(elem).isEmpty
+ def contains(elem: A): Boolean = RB.contains(tree, elem)
/** Creates a new iterator over all elements contained in this
* object.
*
* @return the new iterator
*/
- def iterator: Iterator[A] = tree.toStream.iterator map (_._1)
+ def iterator: Iterator[A] = RB.keysIterator(tree)
- override def toStream: Stream[A] = tree.toStream map (_._1)
+ override def foreach[U](f: A => U) = RB.foreachKey(tree, f)
- override def foreach[U](f: A => U) = tree foreach { (x, y) => f(x) }
+ override def rangeImpl(from: Option[A], until: Option[A]): TreeSet[A] = newSet(RB.rangeImpl(tree, from, until))
+ override def range(from: A, until: A): TreeSet[A] = newSet(RB.range(tree, from, until))
+ override def from(from: A): TreeSet[A] = newSet(RB.from(tree, from))
+ override def to(to: A): TreeSet[A] = newSet(RB.to(tree, to))
+ override def until(until: A): TreeSet[A] = newSet(RB.until(tree, until))
- override def rangeImpl(from: Option[A], until: Option[A]): TreeSet[A] = {
- val tree = this.tree.range(from, until)
- newSet(tree.count, tree)
- }
- override def firstKey = tree.first
- override def lastKey = tree.last
+ override def firstKey = head
+ override def lastKey = last
}
diff --git a/src/library/scala/collection/mutable/BasicNode.java b/src/library/scala/collection/mutable/BasicNode.java
index b934aed24f..c05009470a 100644
--- a/src/library/scala/collection/mutable/BasicNode.java
+++ b/src/library/scala/collection/mutable/BasicNode.java
@@ -13,7 +13,7 @@ package scala.collection.mutable;
-abstract class BasicNode {
+public abstract class BasicNode {
public abstract String string(int lev);
diff --git a/src/library/scala/collection/mutable/CNodeBase.java b/src/library/scala/collection/mutable/CNodeBase.java
new file mode 100644
index 0000000000..4374943b8d
--- /dev/null
+++ b/src/library/scala/collection/mutable/CNodeBase.java
@@ -0,0 +1,35 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2012, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.collection.mutable;
+
+
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+
+
+abstract class CNodeBase<K, V> extends MainNode<K, V> {
+
+ public static final AtomicIntegerFieldUpdater<CNodeBase> updater = AtomicIntegerFieldUpdater.newUpdater(CNodeBase.class, "csize");
+
+ public volatile int csize = -1;
+
+ public boolean CAS_SIZE(int oldval, int nval) {
+ return updater.compareAndSet(this, oldval, nval);
+ }
+
+ public void WRITE_SIZE(int nval) {
+ updater.set(this, nval);
+ }
+
+ public int READ_SIZE() {
+ return updater.get(this);
+ }
+
+} \ No newline at end of file
diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala
index 6ed3a516c4..699b96b87c 100644
--- a/src/library/scala/collection/mutable/Ctrie.scala
+++ b/src/library/scala/collection/mutable/Ctrie.scala
@@ -20,7 +20,7 @@ import annotation.switch
-private[mutable] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INodeBase[K, V](g) {
+private[collection] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INodeBase[K, V](g) {
import INodeBase._
WRITE(bn)
@@ -31,6 +31,8 @@ private[mutable] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INo
@inline final def CAS(old: MainNode[K, V], n: MainNode[K, V]) = INodeBase.updater.compareAndSet(this, old, n)
+ final def gcasRead(ct: Ctrie[K, V]): MainNode[K, V] = GCAS_READ(ct)
+
@inline final def GCAS_READ(ct: Ctrie[K, V]): MainNode[K, V] = {
val m = /*READ*/mainnode
val prevval = /*READ*/m.prev
@@ -41,7 +43,7 @@ private[mutable] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INo
@tailrec private def GCAS_Complete(m: MainNode[K, V], ct: Ctrie[K, V]): MainNode[K, V] = if (m eq null) null else {
// complete the GCAS
val prev = /*READ*/m.prev
- val ctr = ct.RDCSS_READ_ROOT(true)
+ val ctr = ct.readRoot(true)
prev match {
case null =>
@@ -84,7 +86,7 @@ private[mutable] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INo
nin
}
- @inline final def copyToGen(ngen: Gen, ct: Ctrie[K, V]) = {
+ final def copyToGen(ngen: Gen, ct: Ctrie[K, V]) = {
val nin = new INode[K, V](ngen)
val main = GCAS_READ(ct)
nin.WRITE(main)
@@ -317,7 +319,7 @@ private[mutable] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INo
case tn: TNode[K, V] =>
val ncn = cn.updatedAt(pos, tn.copyUntombed, gen).toContracted(lev - 5)
if (!parent.GCAS(cn, ncn, ct))
- if (ct.RDCSS_READ_ROOT().gen == startgen) cleanParent(nonlive)
+ if (ct.readRoot().gen == startgen) cleanParent(nonlive)
}
}
case _ => // parent is no longer a cnode, we're done
@@ -360,6 +362,11 @@ private[mutable] final class INode[K, V](bn: MainNode[K, V], g: Gen) extends INo
final def isNullInode(ct: Ctrie[K, V]) = GCAS_READ(ct) eq null
+ final def cachedSize(ct: Ctrie[K, V]): Int = {
+ val m = GCAS_READ(ct)
+ m.cachedSize(ct)
+ }
+
/* this is a quiescent method! */
def string(lev: Int) = "%sINode -> %s".format(" " * lev, mainnode match {
case null => "<null>"
@@ -389,6 +396,8 @@ private[mutable] final class FailedNode[K, V](p: MainNode[K, V]) extends MainNod
def string(lev: Int) = throw new UnsupportedOperationException
+ def cachedSize(ct: AnyRef): Int = throw new UnsupportedOperationException
+
override def toString = "FailedNode(%s)".format(p)
}
@@ -398,7 +407,7 @@ private[mutable] trait KVNode[K, V] {
}
-private[mutable] final class SNode[K, V](final val k: K, final val v: V, final val hc: Int)
+private[collection] final class SNode[K, V](final val k: K, final val v: V, final val hc: Int)
extends BasicNode with KVNode[K, V] {
final def copy = new SNode(k, v, hc)
final def copyTombed = new TNode(k, v, hc)
@@ -408,17 +417,18 @@ extends BasicNode with KVNode[K, V] {
}
-private[mutable] final class TNode[K, V](final val k: K, final val v: V, final val hc: Int)
+private[collection] final class TNode[K, V](final val k: K, final val v: V, final val hc: Int)
extends MainNode[K, V] with KVNode[K, V] {
final def copy = new TNode(k, v, hc)
final def copyTombed = new TNode(k, v, hc)
final def copyUntombed = new SNode(k, v, hc)
final def kvPair = (k, v)
+ final def cachedSize(ct: AnyRef): Int = 1
final def string(lev: Int) = (" " * lev) + "TNode(%s, %s, %x, !)".format(k, v, hc)
}
-private[mutable] final class LNode[K, V](final val listmap: ImmutableListMap[K, V])
+private[collection] final class LNode[K, V](final val listmap: ImmutableListMap[K, V])
extends MainNode[K, V] {
def this(k: K, v: V) = this(ImmutableListMap(k -> v))
def this(k1: K, v1: V, k2: K, v2: V) = this(ImmutableListMap(k1 -> v1, k2 -> v2))
@@ -432,12 +442,44 @@ extends MainNode[K, V] {
}
}
def get(k: K) = listmap.get(k)
+ def cachedSize(ct: AnyRef): Int = listmap.size
def string(lev: Int) = (" " * lev) + "LNode(%s)".format(listmap.mkString(", "))
}
-private[mutable] final class CNode[K, V](final val bitmap: Int, final val array: Array[BasicNode], final val gen: Gen)
-extends MainNode[K, V] {
+private[collection] final class CNode[K, V](final val bitmap: Int, final val array: Array[BasicNode], final val gen: Gen)
+extends CNodeBase[K, V] {
+
+ // this should only be called from within read-only snapshots
+ final def cachedSize(ct: AnyRef) = {
+ val currsz = READ_SIZE()
+ if (currsz != -1) currsz
+ else {
+ val sz = computeSize(ct.asInstanceOf[Ctrie[K, V]])
+ while (READ_SIZE() == -1) CAS_SIZE(-1, sz)
+ READ_SIZE()
+ }
+ }
+
+ // lends itself towards being parallelizable by choosing
+ // a random starting offset in the array
+ // => if there are concurrent size computations, they start
+ // at different positions, so they are more likely to
+ // to be independent
+ private def computeSize(ct: Ctrie[K, V]): Int = {
+ var i = 0
+ var sz = 0
+ val offset = math.abs(util.Random.nextInt()) % array.length
+ while (i < array.length) {
+ val pos = (i + offset) % array.length
+ array(pos) match {
+ case sn: SNode[_, _] => sz += 1
+ case in: INode[K, V] => sz += in.cachedSize(ct)
+ }
+ i += 1
+ }
+ sz
+ }
final def updatedAt(pos: Int, nn: BasicNode, gen: Gen) = {
val len = array.length
@@ -509,7 +551,7 @@ extends MainNode[K, V] {
val sub = arr(i)
sub match {
case in: INode[K, V] =>
- val inodemain = in.GCAS_READ(ct)
+ val inodemain = in.gcasRead(ct)
assert(inodemain ne null)
tmparray(i) = resurrect(in, inodemain)
case sn: SNode[K, V] =>
@@ -630,6 +672,8 @@ extends ConcurrentMap[K, V]
@inline final def CAS_ROOT(ov: AnyRef, nv: AnyRef) = rootupdater.compareAndSet(this, ov, nv)
+ final def readRoot(abort: Boolean = false): INode[K, V] = RDCSS_READ_ROOT(abort)
+
@inline final def RDCSS_READ_ROOT(abort: Boolean = false): INode[K, V] = {
val r = /*READ*/root
r match {
@@ -648,7 +692,7 @@ extends ConcurrentMap[K, V]
if (CAS_ROOT(desc, ov)) ov
else RDCSS_Complete(abort)
} else {
- val oldmain = ov.GCAS_READ(this)
+ val oldmain = ov.gcasRead(this)
if (oldmain eq exp) {
if (CAS_ROOT(desc, nv)) {
desc.committed = true
@@ -720,9 +764,9 @@ extends ConcurrentMap[K, V]
override def empty: Ctrie[K, V] = new Ctrie[K, V]
- @inline final def isReadOnly = rootupdater eq null
+ final def isReadOnly = rootupdater eq null
- @inline final def nonReadOnly = rootupdater ne null
+ final def nonReadOnly = rootupdater ne null
/** Returns a snapshot of this Ctrie.
* This operation is lock-free and linearizable.
@@ -735,7 +779,7 @@ extends ConcurrentMap[K, V]
*/
@tailrec final def snapshot(): Ctrie[K, V] = {
val r = RDCSS_READ_ROOT()
- val expmain = r.GCAS_READ(this)
+ val expmain = r.gcasRead(this)
if (RDCSS_ROOT(r, expmain, r.copyToGen(new Gen, this))) new Ctrie(r.copyToGen(new Gen, this), rootupdater)
else snapshot()
}
@@ -754,14 +798,14 @@ extends ConcurrentMap[K, V]
*/
@tailrec final def readOnlySnapshot(): collection.Map[K, V] = {
val r = RDCSS_READ_ROOT()
- val expmain = r.GCAS_READ(this)
+ val expmain = r.gcasRead(this)
if (RDCSS_ROOT(r, expmain, r.copyToGen(new Gen, this))) new Ctrie(r, null)
else readOnlySnapshot()
}
@tailrec final override def clear() {
val r = RDCSS_READ_ROOT()
- if (!RDCSS_ROOT(r, r.GCAS_READ(this), INode.newRootNode[K, V])) clear()
+ if (!RDCSS_ROOT(r, r.gcasRead(this), INode.newRootNode[K, V])) clear()
}
final def lookup(k: K): V = {
@@ -830,6 +874,15 @@ extends ConcurrentMap[K, V]
if (nonReadOnly) readOnlySnapshot().iterator
else new CtrieIterator(0, this)
+ private def cachedSize() = {
+ val r = RDCSS_READ_ROOT()
+ r.cachedSize(this)
+ }
+
+ override def size: Int =
+ if (nonReadOnly) readOnlySnapshot().size
+ else cachedSize()
+
override def stringPrefix = "Ctrie"
}
@@ -852,7 +905,7 @@ object Ctrie extends MutableMapFactory[Ctrie] {
}
-private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
+private[collection] class CtrieIterator[K, V](var level: Int, private var ct: Ctrie[K, V], mustInit: Boolean = true) extends Iterator[(K, V)] {
var stack = new Array[Array[BasicNode]](7)
var stackpos = new Array[Int](7)
var depth = -1
@@ -875,7 +928,7 @@ private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], m
r
} else Iterator.empty.next()
- private def readin(in: INode[K, V]) = in.GCAS_READ(ct) match {
+ private def readin(in: INode[K, V]) = in.gcasRead(ct) match {
case cn: CNode[K, V] =>
depth += 1
stack(depth) = cn.array
@@ -920,6 +973,25 @@ private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], m
protected def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new CtrieIterator[K, V](_lev, _ct, _mustInit)
+ protected def dupTo(it: CtrieIterator[K, V]) = {
+ it.level = this.level
+ it.ct = this.ct
+ it.depth = this.depth
+ it.current = this.current
+
+ // these need a deep copy
+ Array.copy(this.stack, 0, it.stack, 0, 7)
+ Array.copy(this.stackpos, 0, it.stackpos, 0, 7)
+
+ // this one needs to be evaluated
+ if (this.subiter == null) it.subiter = null
+ else {
+ val lst = this.subiter.toList
+ this.subiter = lst.iterator
+ it.subiter = lst.iterator
+ }
+ }
+
/** Returns a sequence of iterators over subsets of this iterator.
* It's used to ease the implementation of splitters for a parallel version of the Ctrie.
*/
@@ -955,7 +1027,7 @@ private[collection] class CtrieIterator[K, V](var level: Int, ct: Ctrie[K, V], m
Seq(this)
}
- private def print {
+ def printDebug {
println("ctrie iterator")
println(stackpos.mkString(","))
println("depth: " + depth)
diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala
index cdf1b78f29..5b3e07b826 100644
--- a/src/library/scala/collection/mutable/HashTable.scala
+++ b/src/library/scala/collection/mutable/HashTable.scala
@@ -52,6 +52,10 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
*/
@transient protected var sizemap: Array[Int] = null
+ @transient var seedvalue: Int = tableSizeSeed
+
+ protected def tableSizeSeed = Integer.bitCount(table.length - 1)
+
protected def initialSize: Int = HashTable.initialSize
private def lastPopulatedIndex = {
@@ -70,14 +74,16 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
private[collection] def init[B](in: java.io.ObjectInputStream, f: (A, B) => Entry) {
in.defaultReadObject
- _loadFactor = in.readInt
+ _loadFactor = in.readInt()
assert(_loadFactor > 0)
- val size = in.readInt
+ val size = in.readInt()
tableSize = 0
assert(size >= 0)
-
- val smDefined = in.readBoolean
+
+ seedvalue = in.readInt()
+
+ val smDefined = in.readBoolean()
table = new Array(capacity(sizeForThreshold(_loadFactor, size)))
threshold = newThreshold(_loadFactor, table.size)
@@ -86,7 +92,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
var index = 0
while (index < size) {
- addEntry(f(in.readObject.asInstanceOf[A], in.readObject.asInstanceOf[B]))
+ addEntry(f(in.readObject().asInstanceOf[A], in.readObject().asInstanceOf[B]))
index += 1
}
}
@@ -103,6 +109,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
out.defaultWriteObject
out.writeInt(_loadFactor)
out.writeInt(tableSize)
+ out.writeInt(seedvalue)
out.writeBoolean(isSizeMapDefined)
foreachEntry { entry =>
out.writeObject(entry.key)
@@ -314,7 +321,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
// this is of crucial importance when populating the table in parallel
protected final def index(hcode: Int) = {
val ones = table.length - 1
- val improved = improve(hcode)
+ val improved = improve(hcode, seedvalue)
val shifted = (improved >> (32 - java.lang.Integer.bitCount(ones))) & ones
shifted
}
@@ -325,6 +332,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
table = c.table
tableSize = c.tableSize
threshold = c.threshold
+ seedvalue = c.seedvalue
sizemap = c.sizemap
}
if (alwaysInitSizeMap && sizemap == null) sizeMapInitAndRebuild
@@ -335,6 +343,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU
table,
tableSize,
threshold,
+ seedvalue,
sizemap
)
}
@@ -368,7 +377,7 @@ private[collection] object HashTable {
protected def elemHashCode(key: KeyType) = key.##
- protected final def improve(hcode: Int) = {
+ protected final def improve(hcode: Int, seed: Int) = {
/* Murmur hash
* m = 0x5bd1e995
* r = 24
@@ -396,7 +405,7 @@ private[collection] object HashTable {
* */
var i = hcode * 0x9e3775cd
i = java.lang.Integer.reverseBytes(i)
- i * 0x9e3775cd
+ i = i * 0x9e3775cd
// a slower alternative for byte reversal:
// i = (i << 16) | (i >> 16)
// i = ((i >> 8) & 0x00ff00ff) | ((i << 8) & 0xff00ff00)
@@ -420,6 +429,11 @@ private[collection] object HashTable {
// h = h ^ (h >>> 14)
// h = h + (h << 4)
// h ^ (h >>> 10)
+
+ // the rest of the computation is due to SI-5293
+ val rotation = seed % 32
+ val rotated = (i >>> rotation) | (i << (32 - rotation))
+ rotated
}
}
@@ -442,6 +456,7 @@ private[collection] object HashTable {
val table: Array[HashEntry[A, Entry]],
val tableSize: Int,
val threshold: Int,
+ val seedvalue: Int,
val sizemap: Array[Int]
) {
import collection.DebugUtils._
@@ -452,6 +467,7 @@ private[collection] object HashTable {
append("Table: [" + arrayString(table, 0, table.length) + "]")
append("Table size: " + tableSize)
append("Load factor: " + loadFactor)
+ append("Seedvalue: " + seedvalue)
append("Threshold: " + threshold)
append("Sizemap: [" + arrayString(sizemap, 0, sizemap.length) + "]")
}
diff --git a/src/library/scala/collection/mutable/MainNode.java b/src/library/scala/collection/mutable/MainNode.java
index 09bc858edc..0578de676d 100644
--- a/src/library/scala/collection/mutable/MainNode.java
+++ b/src/library/scala/collection/mutable/MainNode.java
@@ -20,6 +20,8 @@ abstract class MainNode<K, V> extends BasicNode {
public volatile MainNode<K, V> prev = null;
+ public abstract int cachedSize(Object ct);
+
public boolean CAS_PREV(MainNode<K, V> oldval, MainNode<K, V> nval) {
return updater.compareAndSet(this, oldval, nval);
}
@@ -29,6 +31,8 @@ abstract class MainNode<K, V> extends BasicNode {
}
// do we need this? unclear in the javadocs...
+ // apparently not - volatile reads are supposed to be safe
+ // irregardless of whether there are concurrent ARFU updates
public MainNode<K, V> READ_PREV() {
return updater.get(this);
}
diff --git a/src/library/scala/collection/parallel/mutable/ParCtrie.scala b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
index 86624500fd..cec2e6886d 100644
--- a/src/library/scala/collection/parallel/mutable/ParCtrie.scala
+++ b/src/library/scala/collection/parallel/mutable/ParCtrie.scala
@@ -13,6 +13,12 @@ package scala.collection.parallel.mutable
import scala.collection.generic._
import scala.collection.parallel.Combiner
import scala.collection.parallel.IterableSplitter
+import scala.collection.mutable.BasicNode
+import scala.collection.mutable.TNode
+import scala.collection.mutable.LNode
+import scala.collection.mutable.CNode
+import scala.collection.mutable.SNode
+import scala.collection.mutable.INode
import scala.collection.mutable.Ctrie
import scala.collection.mutable.CtrieIterator
@@ -34,6 +40,7 @@ extends ParMap[K, V]
with ParCtrieCombiner[K, V]
with Serializable
{
+ import collection.parallel.tasksupport._
def this() = this(new Ctrie)
@@ -47,8 +54,6 @@ extends ParMap[K, V]
def splitter = new ParCtrieSplitter(0, ctrie.readOnlySnapshot().asInstanceOf[Ctrie[K, V]], true)
- override def size = ctrie.size
-
override def clear() = ctrie.clear()
def result = this
@@ -71,8 +76,46 @@ extends ParMap[K, V]
this
}
+ override def size = {
+ val in = ctrie.readRoot()
+ val r = in.gcasRead(ctrie)
+ r match {
+ case tn: TNode[_, _] => tn.cachedSize(ctrie)
+ case ln: LNode[_, _] => ln.cachedSize(ctrie)
+ case cn: CNode[_, _] =>
+ executeAndWaitResult(new Size(0, cn.array.length, cn.array))
+ cn.cachedSize(ctrie)
+ }
+ }
+
override def stringPrefix = "ParCtrie"
+ /* tasks */
+
+ /** Computes Ctrie size in parallel. */
+ class Size(offset: Int, howmany: Int, array: Array[BasicNode]) extends Task[Int, Size] {
+ var result = -1
+ def leaf(prev: Option[Int]) = {
+ var sz = 0
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ array(i) match {
+ case sn: SNode[_, _] => sz += 1
+ case in: INode[K, V] => sz += in.cachedSize(ctrie)
+ }
+ i += 1
+ }
+ result = sz
+ }
+ def split = {
+ val fp = howmany / 2
+ Seq(new Size(offset, fp, array), new Size(offset + fp, howmany - fp, array))
+ }
+ def shouldSplitFurther = howmany > 1
+ override def merge(that: Size) = result = result + that.result
+ }
+
}
@@ -81,8 +124,7 @@ extends CtrieIterator[K, V](lev, ct, mustInit)
with IterableSplitter[(K, V)]
{
// only evaluated if `remaining` is invoked (which is not used by most tasks)
- //lazy val totalsize = ct.iterator.size /* TODO improve to lazily compute sizes */
- def totalsize: Int = throw new UnsupportedOperationException
+ lazy val totalsize = ct.par.size
var iterated = 0
protected override def newIterator(_lev: Int, _ct: Ctrie[K, V], _mustInit: Boolean) = new ParCtrieSplitter[K, V](_lev, _ct, _mustInit)
@@ -92,7 +134,12 @@ extends CtrieIterator[K, V](lev, ct, mustInit)
level < maxsplits
}
- def dup = null // TODO necessary for views
+ def dup = {
+ val it = newIterator(0, ct, false)
+ dupTo(it)
+ it.iterated = this.iterated
+ it
+ }
override def next() = {
iterated += 1
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index 15ffd3fdd2..21a5b05749 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -160,10 +160,11 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr
import collection.parallel.tasksupport._
private var mask = ParHashMapCombiner.discriminantmask
private var nonmasklen = ParHashMapCombiner.nonmasklength
+ private var seedvalue = 27
def +=(elem: (K, V)) = {
sz += 1
- val hc = improve(elemHashCode(elem._1))
+ val hc = improve(elemHashCode(elem._1), seedvalue)
val pos = (hc >>> nonmasklen)
if (buckets(pos) eq null) {
// initialize bucket
@@ -176,7 +177,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr
def result: ParHashMap[K, V] = if (size >= (ParHashMapCombiner.numblocks * sizeMapBucketSize)) { // 1024
// construct table
- val table = new AddingHashTable(size, tableLoadFactor)
+ val table = new AddingHashTable(size, tableLoadFactor, seedvalue)
val bucks = buckets.map(b => if (b ne null) b.headPtr else null)
val insertcount = executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length))
table.setSize(insertcount)
@@ -210,11 +211,12 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr
* and true if the key was successfully inserted. It does not update the number of elements
* in the table.
*/
- private[ParHashMapCombiner] class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] {
+ private[ParHashMapCombiner] class AddingHashTable(numelems: Int, lf: Int, _seedvalue: Int) extends HashTable[K, DefaultEntry[K, V]] {
import HashTable._
_loadFactor = lf
table = new Array[HashEntry[K, DefaultEntry[K, V]]](capacity(sizeForThreshold(_loadFactor, numelems)))
tableSize = 0
+ seedvalue = _seedvalue
threshold = newThreshold(_loadFactor, table.length)
sizeMapInit(table.length)
def setSize(sz: Int) = tableSize = sz
@@ -285,7 +287,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr
insertcount
}
private def assertCorrectBlock(block: Int, k: K) {
- val hc = improve(elemHashCode(k))
+ val hc = improve(elemHashCode(k), seedvalue)
if ((hc >>> nonmasklen) != block) {
println(hc + " goes to " + (hc >>> nonmasklen) + ", while expected block is " + block)
assert((hc >>> nonmasklen) == block)
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
new file mode 100644
index 0000000000..c38e668f30
--- /dev/null
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -0,0 +1,24 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import scala.annotation.implicitNotFound
+import scala.util.Duration
+
+
+
+trait Awaitable[+T] {
+ @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.")
+ def await(atMost: Duration)(implicit canawait: CanAwait): T
+}
+
+
+
diff --git a/src/library/scala/concurrent/Channel.scala b/src/library/scala/concurrent/Channel.scala
index 43d684641e..e79f76430f 100644
--- a/src/library/scala/concurrent/Channel.scala
+++ b/src/library/scala/concurrent/Channel.scala
@@ -23,7 +23,7 @@ class Channel[A] {
private var written = new LinkedList[A] // FIFO buffer, realized through
private var lastWritten = written // aliasing of a linked list
private var nreaders = 0
-
+
/**
* @param x ...
*/
@@ -33,7 +33,7 @@ class Channel[A] {
lastWritten = lastWritten.next
if (nreaders > 0) notify()
}
-
+
def read: A = synchronized {
while (written.next == null) {
try {
@@ -46,4 +46,5 @@ class Channel[A] {
written = written.next
x
}
+
}
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
new file mode 100644
index 0000000000..6aacd53de2
--- /dev/null
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -0,0 +1,103 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import scala.util.{ Duration, Try, Success, Failure }
+import ConcurrentPackageObject._
+
+/** This package object contains primitives for concurrent and parallel programming.
+ */
+abstract class ConcurrentPackageObject {
+ /** A global execution environment for executing lightweight tasks.
+ */
+ lazy val executionContext =
+ new impl.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool())
+
+ /** A global service for scheduling tasks for execution.
+ */
+ // lazy val scheduler =
+ // new default.SchedulerImpl
+
+ val handledFutureException: PartialFunction[Throwable, Throwable] = {
+ case t: Throwable if isFutureThrowable(t) => t
+ }
+
+ // TODO rename appropriately and make public
+ private[concurrent] def isFutureThrowable(t: Throwable) = t match {
+ case e: Error => false
+ case t: scala.util.control.ControlThrowable => false
+ case i: InterruptedException => false
+ case _ => true
+ }
+
+ private[concurrent] def resolve[T](source: Try[T]): Try[T] = source match {
+ case Failure(t: scala.runtime.NonLocalReturnControl[_]) => Success(t.value.asInstanceOf[T])
+ case Failure(t: scala.util.control.ControlThrowable) => Failure(new ExecutionException("Boxed ControlThrowable", t))
+ case Failure(t: InterruptedException) => Failure(new ExecutionException("Boxed InterruptedException", t))
+ case Failure(e: Error) => Failure(new ExecutionException("Boxed Error", e))
+ case _ => source
+ }
+
+ private[concurrent] def resolver[T] =
+ resolverFunction.asInstanceOf[PartialFunction[Throwable, Try[T]]]
+
+ /* concurrency constructs */
+
+ def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =
+ execCtx future body
+
+ def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] =
+ execCtx promise
+
+ /** Wraps a block of code into an awaitable object. */
+ def body2awaitable[T](body: =>T) = new Awaitable[T] {
+ def await(atMost: Duration)(implicit cb: CanAwait) = body
+ }
+
+ /** Used to block on a piece of code which potentially blocks.
+ *
+ * @param body A piece of code which contains potentially blocking or long running calls.
+ *
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext): T =
+ executionContext.blocking(atMost)(body)
+
+ /** Blocks on an awaitable object.
+ *
+ * @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
+ *
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration)(implicit execCtx: ExecutionContext = executionContext): T =
+ executionContext.blocking(awaitable, atMost)
+
+ @inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
+}
+
+private[concurrent] object ConcurrentPackageObject {
+ // TODO, docs, return type
+ // Note that having this in the package object led to failures when
+ // compiling a subset of sources; it seems that the wildcard is not
+ // properly handled, and you get messages like "type _$1 defined twice".
+ // This is consistent with other package object breakdowns.
+ private val resolverFunction: PartialFunction[Throwable, Try[_]] = {
+ case t: scala.runtime.NonLocalReturnControl[_] => Success(t.value)
+ case t: scala.util.control.ControlThrowable => Failure(new ExecutionException("Boxed ControlThrowable", t))
+ case t: InterruptedException => Failure(new ExecutionException("Boxed InterruptedException", t))
+ case e: Error => Failure(new ExecutionException("Boxed Error", e))
+ case t => Failure(t)
+ }
+}
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index e308c3b5a6..0b7f54a27a 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -8,7 +8,6 @@
package scala.concurrent
-import ops.future
/** A `DelayedLazyVal` is a wrapper for lengthy computations which have a
* valid partially computed result.
@@ -27,21 +26,23 @@ import ops.future
class DelayedLazyVal[T](f: () => T, body: => Unit) {
@volatile private[this] var _isDone = false
private[this] lazy val complete = f()
-
+
/** Whether the computation is complete.
*
* @return true if the computation is complete.
*/
def isDone = _isDone
-
+
/** The current result of f(), or the final result if complete.
*
* @return the current value
*/
def apply(): T = if (isDone) complete else f()
-
- future {
+
+ // TODO replace with scala.concurrent.future { ... }
+ ops.future {
body
_isDone = true
}
+
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
new file mode 100644
index 0000000000..99cd264ac5
--- /dev/null
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -0,0 +1,132 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import java.util.concurrent.atomic.{ AtomicInteger }
+import java.util.concurrent.{ Executors, Future => JFuture, Callable }
+import scala.util.Duration
+import scala.util.{ Try, Success, Failure }
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+import scala.collection.generic.CanBuildFrom
+import collection._
+
+
+
+trait ExecutionContext {
+
+ protected implicit object CanAwaitEvidence extends CanAwait
+
+ def execute(runnable: Runnable): Unit
+
+ def execute[U](body: () => U): Unit
+
+ def promise[T]: Promise[T]
+
+ def future[T](body: Callable[T]): Future[T] = future(body.call())
+
+ def future[T](body: => T): Future[T]
+
+ def blocking[T](atMost: Duration)(body: =>T): T
+
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
+
+ def reportFailure(t: Throwable): Unit
+
+ /* implementations follow */
+
+ private implicit val executionContext = this
+
+ def keptPromise[T](result: T): Promise[T] = {
+ val p = promise[T]
+ p success result
+ }
+
+ def brokenPromise[T](t: Throwable): Promise[T] = {
+ val p = promise[T]
+ p failure t
+ }
+
+ /** TODO some docs
+ *
+ */
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
+ import nondeterministic._
+ val buffer = new mutable.ArrayBuffer[T]
+ val counter = new AtomicInteger(1) // how else could we do this?
+ val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
+ var idx = 0
+
+ def tryFinish() = if (counter.decrementAndGet() == 0) {
+ val builder = cbf(futures)
+ builder ++= buffer
+ p success builder.result
+ }
+
+ for (f <- futures) {
+ val currentIndex = idx
+ buffer += null.asInstanceOf[T]
+ counter.incrementAndGet()
+ f onComplete {
+ case Failure(t) =>
+ p tryFailure t
+ case Success(v) =>
+ buffer(currentIndex) = v
+ tryFinish()
+ }
+ idx += 1
+ }
+
+ tryFinish()
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def any[T](futures: Traversable[Future[T]]): Future[T] = {
+ val p = promise[T]
+ val completeFirst: Try[T] => Unit = elem => p tryComplete elem
+
+ futures foreach (_ onComplete completeFirst)
+
+ p.future
+ }
+
+ /** TODO some docs
+ *
+ */
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
+ if (futures.isEmpty) Promise.kept[Option[T]](None).future
+ else {
+ val result = promise[Option[T]]
+ val count = new AtomicInteger(futures.size)
+ val search: Try[T] => Unit = {
+ v => v match {
+ case Success(r) => if (predicate(r)) result trySuccess Some(r)
+ case _ =>
+ }
+ if (count.decrementAndGet() == 0) result trySuccess None
+ }
+
+ futures.foreach(_ onComplete search)
+
+ result.future
+ }
+ }
+
+}
+
+
+sealed trait CanAwait
+
+
+
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
new file mode 100644
index 0000000000..73f76bbbfb
--- /dev/null
+++ b/src/library/scala/concurrent/Future.scala
@@ -0,0 +1,492 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+
+
+import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
+import java.lang.{ Iterable => JIterable }
+import java.util.{ LinkedList => JLinkedList }
+import java.{ lang => jl }
+import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
+
+import scala.util.{ Timeout, Duration, Try, Success, Failure }
+import scala.Option
+
+import scala.annotation.tailrec
+import scala.collection.mutable.Stack
+import scala.collection.mutable.Builder
+import scala.collection.generic.CanBuildFrom
+
+
+
+/** The trait that represents futures.
+ *
+ * Asynchronous computations that yield futures are created with the `future` call:
+ *
+ * {{{
+ * val s = "Hello"
+ * val f: Future[String] = future {
+ * s + " future!"
+ * }
+ * f onSuccess {
+ * case msg => println(msg)
+ * }
+ * }}}
+ *
+ * @author Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
+ *
+ * @define multipleCallbacks
+ * Multiple callbacks may be registered; there is no guarantee that they will be
+ * executed in a particular order.
+ *
+ * @define caughtThrowables
+ * The future may contain a throwable object and this means that the future failed.
+ * Futures obtained through combinators have the same exception as the future they were obtained from.
+ * The following throwable objects are not contained in the future:
+ * - `Error` - errors are not contained within futures
+ * - `InterruptedException` - not contained within futures
+ * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
+ *
+ * Instead, the future is completed with a ExecutionException with one of the exceptions above
+ * as the cause.
+ * If a future is failed with a `scala.runtime.NonLocalReturnControl`,
+ * it is completed with a value instead from that throwable instead instead.
+ *
+ * @define nonDeterministic
+ * Note: using this method yields nondeterministic dataflow programs.
+ *
+ * @define forComprehensionExamples
+ * Example:
+ *
+ * {{{
+ * val f = future { 5 }
+ * val g = future { 3 }
+ * val h = for {
+ * x: Int <- f // returns Future(5)
+ * y: Int <- g // returns Future(5)
+ * } yield x + y
+ * }}}
+ *
+ * is translated to:
+ *
+ * {{{
+ * f flatMap { (x: Int) => g map { (y: Int) => x + y } }
+ * }}}
+ */
+trait Future[+T] extends Awaitable[T] {
+self =>
+
+ /* Callbacks */
+
+ /** When this future is completed successfully (i.e. with a value),
+ * apply the provided partial function to the value if the partial function
+ * is defined at that value.
+ *
+ * If the future has already been completed with a value,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onSuccess[U](pf: PartialFunction[T, U]): this.type = onComplete {
+ case Failure(t) => // do nothing
+ case Success(v) => if (pf isDefinedAt v) pf(v) else { /*do nothing*/ }
+ }
+
+ /** When this future is completed with a failure (i.e. with a throwable),
+ * apply the provided callback to the throwable.
+ *
+ * $caughtThrowables
+ *
+ * If the future has already been completed with a failure,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * Will not be called in case that the future is completed with a value.
+ *
+ * $multipleCallbacks
+ */
+ def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
+ case Failure(t) => if (isFutureThrowable(t) && callback.isDefinedAt(t)) callback(t) else { /*do nothing*/ }
+ case Success(v) => // do nothing
+ }
+
+ /** When this future is completed, either through an exception, a timeout, or a value,
+ * apply the provided function.
+ *
+ * If the future has already been completed,
+ * this will either be applied immediately or be scheduled asynchronously.
+ *
+ * $multipleCallbacks
+ */
+ def onComplete[U](func: Try[T] => U): this.type
+
+
+ /* Miscellaneous */
+
+ /** Creates a new promise.
+ */
+ def newPromise[S]: Promise[S]
+
+
+ /* Projections */
+
+ /** Returns a failed projection of this future.
+ *
+ * The failed projection is a future holding a value of type `Throwable`.
+ *
+ * It is completed with a value which is the throwable of the original future
+ * in case the original future is failed.
+ *
+ * It is failed with a `NoSuchElementException` if the original future is completed successfully.
+ *
+ * Blocking on this future returns a value if the original future is completed with an exception
+ * and throws a corresponding exception if the original future fails.
+ */
+ def failed: Future[Throwable] = {
+ def noSuchElem(v: T) =
+ new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)
+
+ val p = newPromise[Throwable]
+
+ onComplete {
+ case Failure(t) => p success t
+ case Success(v) => p failure noSuchElem(v)
+ }
+
+ p.future
+ }
+
+
+ /* Monadic operations */
+
+ /** Asynchronously processes the value in the future once the value becomes available.
+ *
+ * Will not be called if the future fails.
+ */
+ def foreach[U](f: T => U): Unit = onComplete {
+ case Success(r) => f(r)
+ case Failure(_) => // do nothing
+ }
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future. If this future is completed with an exception then the new
+ * future will also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def map[S](f: T => S): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Failure(t) => p failure t
+ case Success(v) =>
+ try p success f(v)
+ catch {
+ case t => p complete resolver(t)
+ }
+ }
+
+ p.future
+ }
+
+ /** Creates a new future by applying a function to the successful result of
+ * this future, and returns the result of the function as the new future.
+ * If this future is completed with an exception then the new future will
+ * also contain this exception.
+ *
+ * $forComprehensionExample
+ */
+ def flatMap[S](f: T => Future[S]): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Failure(t) => p failure t
+ case Success(v) =>
+ try {
+ f(v) onComplete {
+ case Failure(t) => p failure t
+ case Success(v) => p success v
+ }
+ } catch {
+ case t: Throwable => p complete resolver(t)
+ }
+ }
+
+ p.future
+ }
+
+ /** Creates a new future by filtering the value of the current future with a predicate.
+ *
+ * If the current future contains a value which satisfies the predicate, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails or times out, the resulting future also fails or times out, respectively.
+ *
+ * Example:
+ * {{{
+ * val f = future { 5 }
+ * val g = f filter { _ % 2 == 1 }
+ * val h = f filter { _ % 2 == 0 }
+ * await(0) g // evaluates to 5
+ * await(0) h // throw a NoSuchElementException
+ * }}}
+ */
+ def filter(pred: T => Boolean): Future[T] = {
+ val p = newPromise[T]
+
+ onComplete {
+ case Failure(t) => p failure t
+ case Success(v) =>
+ try {
+ if (pred(v)) p success v
+ else p failure new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
+ } catch {
+ case t: Throwable => p complete resolver(t)
+ }
+ }
+
+ p.future
+ }
+
+ /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value.
+ *
+ * If the current future contains a value for which the partial function is defined, the new future will also hold that value.
+ * Otherwise, the resulting future will fail with a `NoSuchElementException`.
+ *
+ * If the current future fails or times out, the resulting future also fails or times out, respectively.
+ *
+ * Example:
+ * {{{
+ * val f = future { -5 }
+ * val g = f collect {
+ * case x if x < 0 => -x
+ * }
+ * val h = f collect {
+ * case x if x > 0 => x * 2
+ * }
+ * await(0) g // evaluates to 5
+ * await(0) h // throw a NoSuchElementException
+ * }}}
+ */
+ def collect[S](pf: PartialFunction[T, S]): Future[S] = {
+ val p = newPromise[S]
+
+ onComplete {
+ case Failure(t) => p failure t
+ case Success(v) =>
+ try {
+ if (pf.isDefinedAt(v)) p success pf(v)
+ else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v)
+ } catch {
+ case t: Throwable => p complete resolver(t)
+ }
+ }
+
+ p.future
+ }
+
+ /** Creates a new future that will handle any matching throwable that this
+ * future might contain. If there is no match, or if this future contains
+ * a valid result then the new future will contain the same.
+ *
+ * Example:
+ *
+ * {{{
+ * future (6 / 0) recover { case e: ArithmeticException ⇒ 0 } // result: 0
+ * future (6 / 0) recover { case e: NotFoundException ⇒ 0 } // result: exception
+ * future (6 / 2) recover { case e: ArithmeticException ⇒ 0 } // result: 3
+ * }}}
+ */
+ def recover[U >: T](pf: PartialFunction[Throwable, U]): Future[U] = {
+ val p = newPromise[U]
+
+ onComplete {
+ case Failure(t) if pf isDefinedAt t =>
+ try { p success pf(t) }
+ catch { case t: Throwable => p complete resolver(t) }
+ case otherwise => p complete otherwise
+ }
+
+ p.future
+ }
+
+ /** Creates a new future that will handle any matching throwable that this
+ * future might contain by assigning it a value of another future.
+ *
+ * If there is no match, or if this future contains
+ * a valid result then the new future will contain the same result.
+ *
+ * Example:
+ *
+ * {{{
+ * val f = future { Int.MaxValue }
+ * future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
+ * }}}
+ */
+ def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
+ val p = newPromise[U]
+
+ onComplete {
+ case Failure(t) if pf isDefinedAt t =>
+ try {
+ p completeWith pf(t)
+ } catch {
+ case t: Throwable => p complete resolver(t)
+ }
+ case otherwise => p complete otherwise
+ }
+
+ p.future
+ }
+
+ /** Zips the values of `this` and `that` future, and creates
+ * a new future holding the tuple of their results.
+ *
+ * If `this` future fails, the resulting future is failed
+ * with the throwable stored in `this`.
+ * Otherwise, if `that` future fails, the resulting future is failed
+ * with the throwable stored in `that`.
+ */
+ def zip[U](that: Future[U]): Future[(T, U)] = {
+ val p = newPromise[(T, U)]
+
+ this onComplete {
+ case Failure(t) => p failure t
+ case Success(r) => that onSuccess {
+ case r2 => p success ((r, r2))
+ }
+ }
+
+ that onFailure {
+ case f => p failure f
+ }
+
+ p.future
+ }
+
+ /** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
+ * the result of the `that` future if `that` is completed successfully.
+ * If both futures are failed, the resulting future holds the throwable object of the first future.
+ *
+ * Using this method will not cause concurrent programs to become nondeterministic.
+ *
+ * Example:
+ * {{{
+ * val f = future { sys.error("failed") }
+ * val g = future { 5 }
+ * val h = f orElse g
+ * await(0) h // evaluates to 5
+ * }}}
+ */
+ def fallbackTo[U >: T](that: Future[U]): Future[U] = {
+ val p = newPromise[U]
+
+ onComplete {
+ case Failure(t) => that onComplete {
+ case Failure(_) => p failure t
+ case Success(v) => p success v
+ }
+ case Success(v) => p success v
+ }
+
+ p.future
+ }
+
+ /** Applies the side-effecting function to the result of this future, and returns
+ * a new future with the result of this future.
+ *
+ * This method allows one to enforce that the callbacks are executed in a
+ * specified order.
+ *
+ * Note that if one of the chained `andThen` callbacks throws
+ * an exception, that exception is not propagated to the subsequent `andThen`
+ * callbacks. Instead, the subsequent `andThen` callbacks are given the original
+ * value of this future.
+ *
+ * The following example prints out `5`:
+ *
+ * {{{
+ * val f = future { 5 }
+ * f andThen {
+ * case r => sys.error("runtime exception")
+ * } andThen {
+ * case Failure(t) => println(t)
+ * case Success(v) => println(v)
+ * }
+ * }}}
+ */
+ def andThen[U](pf: PartialFunction[Try[T], U]): Future[T] = {
+ val p = newPromise[T]
+
+ onComplete {
+ case r =>
+ try if (pf isDefinedAt r) pf(r)
+ finally p complete r
+ }
+
+ p.future
+ }
+
+ /** Creates a new future which holds the result of either this future or `that` future, depending on
+ * which future was completed first.
+ *
+ * $nonDeterministic
+ *
+ * Example:
+ * {{{
+ * val f = future { sys.error("failed") }
+ * val g = future { 5 }
+ * val h = f either g
+ * await(0) h // evaluates to either 5 or throws a runtime exception
+ * }}}
+ */
+ def either[U >: T](that: Future[U]): Future[U] = {
+ val p = self.newPromise[U]
+
+ val completePromise: PartialFunction[Try[U], _] = {
+ case Failure(t) => p tryFailure t
+ case Success(v) => p trySuccess v
+ }
+
+ self onComplete completePromise
+ that onComplete completePromise
+
+ p.future
+ }
+
+}
+
+
+
+/** TODO some docs
+ *
+ * @define nonDeterministic
+ * Note: using this method yields nondeterministic dataflow programs.
+ */
+object Future {
+
+ // TODO make more modular by encoding all other helper methods within the execution context
+ /** TODO some docs
+ */
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] =
+ ec.all[T, Coll](futures)
+
+ // move this to future companion object
+ @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body)
+
+ def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures)
+
+ def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate)
+
+}
+
+
+
+
diff --git a/src/library/scala/concurrent/FutureTaskRunner.scala b/src/library/scala/concurrent/FutureTaskRunner.scala
index c5fcde2d19..75e6299ad9 100644
--- a/src/library/scala/concurrent/FutureTaskRunner.scala
+++ b/src/library/scala/concurrent/FutureTaskRunner.scala
@@ -13,6 +13,7 @@ package scala.concurrent
*
* @author Philipp Haller
*/
+@deprecated("Use `ExecutionContext`s instead.", "2.10.0")
trait FutureTaskRunner extends TaskRunner {
/** The type of the futures that the underlying task runner supports.
diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala
index db3c490882..bac9d4f558 100644
--- a/src/library/scala/concurrent/JavaConversions.scala
+++ b/src/library/scala/concurrent/JavaConversions.scala
@@ -17,6 +17,7 @@ import java.util.concurrent.{ExecutorService, Executor}
*/
object JavaConversions {
+ @deprecated("Use `asExecutionContext` instead.", "2.10.0")
implicit def asTaskRunner(exec: ExecutorService): FutureTaskRunner =
new ThreadPoolRunner {
override protected def executor =
@@ -26,6 +27,7 @@ object JavaConversions {
exec.shutdown()
}
+ @deprecated("Use `asExecutionContext` instead.", "2.10.0")
implicit def asTaskRunner(exec: Executor): TaskRunner =
new TaskRunner {
type Task[T] = Runnable
@@ -46,4 +48,9 @@ object JavaConversions {
// do nothing
}
}
+
+ implicit def asExecutionContext(exec: ExecutorService): ExecutionContext = null // TODO
+
+ implicit def asExecutionContext(exec: Executor): ExecutionContext = null // TODO
+
}
diff --git a/src/library/scala/concurrent/ManagedBlocker.scala b/src/library/scala/concurrent/ManagedBlocker.scala
index 9c6f4d51d6..0b6d82e76f 100644
--- a/src/library/scala/concurrent/ManagedBlocker.scala
+++ b/src/library/scala/concurrent/ManagedBlocker.scala
@@ -12,6 +12,7 @@ package scala.concurrent
*
* @author Philipp Haller
*/
+@deprecated("Not used.", "2.10.0")
trait ManagedBlocker {
/**
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
new file mode 100644
index 0000000000..f26deb77ab
--- /dev/null
+++ b/src/library/scala/concurrent/Promise.scala
@@ -0,0 +1,132 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import scala.util.{ Try, Success, Failure }
+
+
+
+
+/** Promise is an object which can be completed with a value or failed
+ * with an exception.
+ *
+ * @define promiseCompletion
+ * If the promise has already been fulfilled, failed or has timed out,
+ * calling this method will throw an IllegalStateException.
+ *
+ * @define allowedThrowables
+ * If the throwable used to fail this promise is an error, a control exception
+ * or an interrupted exception, it will be wrapped as a cause within an
+ * `ExecutionException` which will fail the promise.
+ *
+ * @define nonDeterministic
+ * Note: Using this method may result in non-deterministic concurrent programs.
+ */
+trait Promise[T] {
+
+ import nondeterministic._
+
+ /** Future containing the value of this promise.
+ */
+ def future: Future[T]
+
+ /** Completes the promise with either an exception or a value.
+ *
+ * @param result Either the value or the exception to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def complete(result:Try[T]): this.type = if (tryComplete(result)) this else throwCompleted
+
+ /** Tries to complete the promise with either a value or the exception.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def tryComplete(result: Try[T]): Boolean
+
+ /** Completes this promise with the specified future, once that future is completed.
+ *
+ * @return This promise
+ */
+ final def completeWith(other: Future[T]): this.type = {
+ other onComplete {
+ this complete _
+ }
+ this
+ }
+
+ /** Completes the promise with a value.
+ *
+ * @param value The value to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def success(v: T): this.type = if (trySuccess(v)) this else throwCompleted
+
+ /** Tries to complete the promise with a value.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def trySuccess(value: T): Boolean = tryComplete(Success(value))
+
+ /** Completes the promise with an exception.
+ *
+ * @param t The throwable to complete the promise with.
+ *
+ * $allowedThrowables
+ *
+ * $promiseCompletion
+ */
+ def failure(t: Throwable): this.type = if (tryFailure(t)) this else throwCompleted
+
+ /** Tries to complete the promise with an exception.
+ *
+ * $nonDeterministic
+ *
+ * @return If the promise has already been completed returns `false`, or `true` otherwise.
+ */
+ def tryFailure(t: Throwable): Boolean = tryComplete(Failure(t))
+
+ /** Wraps a `Throwable` in an `ExecutionException` if necessary. TODO replace with `resolver` from scala.concurrent
+ *
+ * $allowedThrowables
+ */
+ protected def wrap(t: Throwable): Throwable = t match {
+ case t: Throwable if isFutureThrowable(t) => t
+ case _ => new ExecutionException(t)
+ }
+
+ private def throwCompleted = throw new IllegalStateException("Promise already completed.")
+
+}
+
+
+
+object Promise {
+
+ def kept[T](result: T)(implicit execctx: ExecutionContext): Promise[T] =
+ execctx keptPromise result
+
+ def broken[T](t: Throwable)(implicit execctx: ExecutionContext): Promise[T] =
+ execctx brokenPromise t
+
+}
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/Scheduler.scala b/src/library/scala/concurrent/Scheduler.scala
new file mode 100644
index 0000000000..39d798e6b4
--- /dev/null
+++ b/src/library/scala/concurrent/Scheduler.scala
@@ -0,0 +1,54 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+
+import scala.util.Duration
+
+/** A service for scheduling tasks and thunks for one-time, or periodic execution.
+ */
+trait Scheduler {
+
+ /** Schedules a thunk for repeated execution with an initial delay and a frequency.
+ *
+ * @param delay the initial delay after which the thunk should be executed
+ * the first time
+ * @param frequency the frequency with which the thunk should be executed,
+ * as a time period between subsequent executions
+ */
+ def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable
+
+ /** Schedules a task for execution after a given delay.
+ *
+ * @param delay the duration after which the task should be executed
+ * @param task the task that is scheduled for execution
+ * @return a `Cancellable` that may be used to cancel the execution
+ * of the task
+ */
+ def scheduleOnce(delay: Duration, task: Runnable): Cancellable
+
+ /** Schedules a thunk for execution after a given delay.
+ *
+ * @param delay the duration after which the thunk should be executed
+ * @param thunk the thunk that is scheduled for execution
+ * @return a `Cancellable` that may be used to cancel the execution
+ * of the thunk
+ */
+ def scheduleOnce(delay: Duration)(task: => Unit): Cancellable
+
+}
+
+
+
+trait Cancellable {
+
+ /** Cancels the underlying task.
+ */
+ def cancel(): Unit
+
+}
diff --git a/src/library/scala/concurrent/Task.scala b/src/library/scala/concurrent/Task.scala
new file mode 100644
index 0000000000..d6f86bac31
--- /dev/null
+++ b/src/library/scala/concurrent/Task.scala
@@ -0,0 +1,13 @@
+package scala.concurrent
+
+
+
+trait Task[+T] {
+
+ def start(): Unit
+
+ def future: Future[T]
+
+}
+
+
diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala
index 64e62adfd3..500d79e07f 100644
--- a/src/library/scala/concurrent/TaskRunner.scala
+++ b/src/library/scala/concurrent/TaskRunner.scala
@@ -12,6 +12,7 @@ package scala.concurrent
*
* @author Philipp Haller
*/
+@deprecated("Use `ExecutionContext`s instead.", "2.10.0")
trait TaskRunner {
type Task[T]
diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala
index 588073dc5e..7994255b25 100644
--- a/src/library/scala/concurrent/TaskRunners.scala
+++ b/src/library/scala/concurrent/TaskRunners.scala
@@ -14,6 +14,7 @@ import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit}
*
* @author Philipp Haller
*/
+@deprecated("Use `ExecutionContext`s instead.", "2.10.0")
object TaskRunners {
implicit val threadRunner: FutureTaskRunner =
diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala
index 27d8f2cc32..a3e0253634 100644
--- a/src/library/scala/concurrent/ThreadPoolRunner.scala
+++ b/src/library/scala/concurrent/ThreadPoolRunner.scala
@@ -15,6 +15,7 @@ import java.util.concurrent.{ExecutorService, Callable, TimeUnit}
*
* @author Philipp Haller
*/
+@deprecated("Use `ExecutionContext`s instead.", "2.10.0")
trait ThreadPoolRunner extends FutureTaskRunner {
type Task[T] = Callable[T] with Runnable
diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled
new file mode 100644
index 0000000000..745d2d1a15
--- /dev/null
+++ b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled
@@ -0,0 +1,44 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent
+package default
+
+import scala.util.Duration
+
+private[concurrent] final class SchedulerImpl extends Scheduler {
+ private val timer =
+ new java.util.Timer(true) // the associated thread runs as a daemon
+
+ def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ???
+
+ def scheduleOnce(delay: Duration, task: Runnable): Cancellable = {
+ val timerTask = new java.util.TimerTask {
+ def run(): Unit =
+ task.run()
+ }
+ timer.schedule(timerTask, delay.toMillis)
+ new Cancellable {
+ def cancel(): Unit =
+ timerTask.cancel()
+ }
+ }
+
+ def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = {
+ val timerTask = new java.util.TimerTask {
+ def run(): Unit =
+ task
+ }
+ timer.schedule(timerTask, delay.toMillis)
+ new Cancellable {
+ def cancel(): Unit =
+ timerTask.cancel()
+ }
+ }
+
+}
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled
new file mode 100644
index 0000000000..94e54cb372
--- /dev/null
+++ b/src/library/scala/concurrent/default/TaskImpl.scala.disabled
@@ -0,0 +1,313 @@
+package scala.concurrent
+package default
+
+
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
+import scala.util.Try
+import scala.util
+import scala.util.Duration
+import scala.annotation.tailrec
+
+
+
+private[concurrent] trait Completable[T] {
+self: Future[T] =>
+
+ val executor: ExecutionContextImpl
+
+ def newPromise[S]: Promise[S] = executor promise
+
+ type Callback = Try[T] => Any
+
+ def getState: State[T]
+
+ def casState(oldv: State[T], newv: State[T]): Boolean
+
+ protected def dispatch[U](r: Runnable) = executionContext execute r
+
+ protected def processCallbacks(cbs: List[Callback], r: Try[T]) =
+ for (cb <- cbs) dispatch(new Runnable {
+ override def run() = cb(r)
+ })
+
+ def future: Future[T] = self
+
+ def onComplete[U](callback: Try[T] => U): this.type = {
+ @tailrec def tryAddCallback(): Try[T] = {
+ getState match {
+ case p @ Pending(lst) =>
+ val pt = p.asInstanceOf[Pending[T]]
+ if (casState(pt, Pending(callback :: pt.callbacks))) null
+ else tryAddCallback()
+ case Success(res) => util.Success(res)
+ case Failure(t) => util.Failure(t)
+ }
+ }
+
+ val res = tryAddCallback()
+ if (res != null) dispatch(new Runnable {
+ override def run() =
+ try callback(res)
+ catch handledFutureException andThen {
+ t => Console.err.println(t)
+ }
+ })
+
+ this
+ }
+
+ def isTimedout: Boolean = getState match {
+ case Failure(ft: FutureTimeoutException) => true
+ case _ => false
+ }
+
+}
+
+private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
+extends Promise[T] with Future[T] with Completable[T] {
+
+ val executor: scala.concurrent.default.ExecutionContextImpl = context
+
+ @volatile private var state: State[T] = _
+
+ val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state")
+
+ updater.set(this, Pending(List()))
+
+ def casState(oldv: State[T], newv: State[T]): Boolean = {
+ updater.compareAndSet(this, oldv, newv)
+ }
+
+ def getState: State[T] = {
+ updater.get(this)
+ }
+
+ @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
+ case _ => null
+ }
+
+ def tryComplete(r: Try[T]) = r match {
+ case util.Failure(t) => tryFailure(t)
+ case util.Success(v) => trySuccess(v)
+ }
+
+ override def trySuccess(value: T): Boolean = {
+ val cbs = tryCompleteState(Success(value))
+ if (cbs == null)
+ false
+ else {
+ processCallbacks(cbs, util.Success(value))
+ this.synchronized {
+ this.notifyAll()
+ }
+ true
+ }
+ }
+
+ override def tryFailure(t: Throwable): Boolean = {
+ val wrapped = wrap(t)
+ val cbs = tryCompleteState(Failure(wrapped))
+ if (cbs == null)
+ false
+ else {
+ processCallbacks(cbs, util.Failure(wrapped))
+ this.synchronized {
+ this.notifyAll()
+ }
+ true
+ }
+ }
+
+ def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match {
+ case Success(res) => res
+ case Failure(t) => throw t
+ case _ =>
+ this.synchronized {
+ while (true)
+ getState match {
+ case Pending(_) => this.wait()
+ case Success(res) => return res
+ case Failure(t) => throw t
+ }
+ }
+ sys.error("unreachable")
+ }
+
+}
+
+private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
+extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
+
+ val executor: ExecutionContextImpl = context
+
+ @volatile private var state: State[T] = _
+
+ val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state")
+
+ updater.set(this, Pending(List()))
+
+ def casState(oldv: State[T], newv: State[T]): Boolean = {
+ updater.compareAndSet(this, oldv, newv)
+ }
+
+ def getState: State[T] = {
+ updater.get(this)
+ }
+
+ @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
+ }
+
+ def compute(): Unit = {
+ var cbs: List[Callback] = null
+ try {
+ val res = body
+ processCallbacks(tryCompleteState(Success(res)), util.Success(res))
+ } catch {
+ case t if isFutureThrowable(t) =>
+ processCallbacks(tryCompleteState(Failure(t)), util.Failure(t))
+ case t =>
+ val ee = new ExecutionException(t)
+ processCallbacks(tryCompleteState(Failure(ee)), util.Failure(ee))
+ throw t
+ }
+ }
+
+ def start(): Unit = {
+ Thread.currentThread match {
+ case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork()
+ case _ => executor.pool.execute(this)
+ }
+ }
+
+ // TODO FIXME: handle timeouts
+ def await(atMost: Duration): this.type =
+ await
+
+ def await: this.type = {
+ this.join()
+ this
+ }
+
+ def tryCancel(): Unit =
+ tryUnfork()
+
+ def await(atMost: Duration)(implicit canawait: CanAwait): T = {
+ join() // TODO handle timeout also
+ (updater.get(this): @unchecked) match {
+ case Success(r) => r
+ case Failure(t) => throw t
+ }
+ }
+
+}
+
+
+private[concurrent] sealed abstract class State[T]
+
+
+case class Pending[T](callbacks: List[Try[T] => Any]) extends State[T]
+
+
+case class Success[T](result: T) extends State[T]
+
+
+case class Failure[T](throwable: Throwable) extends State[T]
+
+
+private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
+ import ExecutionContextImpl._
+
+ val pool = {
+ val p = new ForkJoinPool
+ p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
+ def uncaughtException(t: Thread, throwable: Throwable) {
+ Console.err.println(throwable.getMessage)
+ throwable.printStackTrace(Console.err)
+ }
+ })
+ p
+ }
+
+ @inline
+ private def executeTask(task: RecursiveAction) {
+ if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
+ task.fork()
+ else
+ pool execute task
+ }
+
+ def execute(task: Runnable) {
+ val action = new RecursiveAction { def compute() { task.run() } }
+ executeTask(action)
+ }
+
+ def execute[U](body: () => U) {
+ val action = new RecursiveAction { def compute() { body() } }
+ executeTask(action)
+ }
+
+ def task[T](body: => T): Task[T] = {
+ new TaskImpl(this, body)
+ }
+
+ def future[T](body: => T): Future[T] = {
+ val t = task(body)
+ t.start()
+ t.future
+ }
+
+ def promise[T]: Promise[T] =
+ new PromiseImpl[T](this)
+
+ def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ currentExecutionContext.get match {
+ case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
+ case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor
+ case x => x.blocking(awaitable, atMost)
+ }
+ }
+
+ private def blockingCall[T](b: Awaitable[T]): T = b match {
+ case fj: TaskImpl[_] if fj.executor.pool eq pool =>
+ fj.await(Duration.fromNanos(0))
+ case _ =>
+ var res: T = null.asInstanceOf[T]
+ @volatile var blockingDone = false
+ // TODO add exception handling here!
+ val mb = new ForkJoinPool.ManagedBlocker {
+ def block() = {
+ res = b.await(Duration.fromNanos(0))(CanAwaitEvidence)
+ blockingDone = true
+ true
+ }
+ def isReleasable = blockingDone
+ }
+ ForkJoinPool.managedBlock(mb, true)
+ res
+ }
+
+ def reportFailure(t: Throwable): Unit = {}
+
+}
+
+
+object ExecutionContextImpl {
+
+ private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
+ override protected def initialValue = null
+ }
+
+}
+
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java
new file mode 100644
index 0000000000..5280d67854
--- /dev/null
+++ b/src/library/scala/concurrent/impl/AbstractPromise.java
@@ -0,0 +1,21 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl;
+
+
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+
+
+abstract class AbstractPromise {
+ private volatile Object _ref = null;
+ protected final static AtomicReferenceFieldUpdater<AbstractPromise, Object> updater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref");
+}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
new file mode 100644
index 0000000000..af0eb66292
--- /dev/null
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -0,0 +1,134 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl
+
+
+
+import java.util.concurrent.{Callable, ExecutorService}
+import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
+import scala.util.{ Duration, Try, Success, Failure }
+import scala.collection.mutable.Stack
+
+
+
+class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext {
+ import ExecutionContextImpl._
+
+ def execute(runnable: Runnable): Unit = executorService match {
+ // case fj: ForkJoinPool =>
+ // TODO fork if more applicable
+ // executorService execute runnable
+ case _ =>
+ executorService execute runnable
+ }
+
+ def execute[U](body: () => U): Unit = execute(new Runnable {
+ def run() = body()
+ })
+
+ def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this)
+
+ def future[T](body: =>T): Future[T] = {
+ val p = promise[T]
+
+ dispatchFuture {
+ () =>
+ p complete {
+ try {
+ Success(body)
+ } catch {
+ case e => resolver(e)
+ }
+ }
+ }
+
+ p.future
+ }
+
+ def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ currentExecutionContext.get match {
+ case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(awaitable) // inside an execution context thread
+ }
+ }
+
+ def reportFailure(t: Throwable) = t match {
+ case e: Error => throw e // rethrow serious errors
+ case t => t.printStackTrace()
+ }
+
+ /** Only callable from the tasks running on the same execution context. */
+ private def blockingCall[T](body: Awaitable[T]): T = {
+ releaseStack()
+
+ // TODO see what to do with timeout
+ body.await(Duration.fromNanos(0))(CanAwaitEvidence)
+ }
+
+ // an optimization for batching futures
+ // TODO we should replace this with a public queue,
+ // so that it can be stolen from
+ // OR: a push to the local task queue should be so cheap that this is
+ // not even needed, but stealing is still possible
+ private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private def releaseStack(): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && stack.nonEmpty =>
+ val tasks = stack.elems
+ stack.clear()
+ _taskStack.remove()
+ dispatchFuture(() => _taskStack.get.elems = tasks, true)
+ case null =>
+ // do nothing - there is no local batching stack anymore
+ case _ =>
+ _taskStack.remove()
+ }
+
+ private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && !force => stack push task
+ case _ => this.execute(
+ new Runnable {
+ def run() {
+ try {
+ val taskStack = Stack[() => Unit](task)
+ _taskStack set taskStack
+ while (taskStack.nonEmpty) {
+ val next = taskStack.pop()
+ try {
+ next.apply()
+ } catch {
+ case e =>
+ // TODO catching all and continue isn't good for OOME
+ reportFailure(e)
+ }
+ }
+ } finally {
+ _taskStack.remove()
+ }
+ }
+ }
+ )
+ }
+
+}
+
+
+object ExecutionContextImpl {
+
+ private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContextImpl] = new ThreadLocal[ExecutionContextImpl] {
+ override protected def initialValue = null
+ }
+
+}
+
+
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
new file mode 100644
index 0000000000..24d0258cc8
--- /dev/null
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -0,0 +1,89 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl
+
+import scala.concurrent.{Awaitable, ExecutionContext}
+import scala.util.{ Try, Success, Failure }
+//import scala.util.continuations._
+
+trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
+
+ implicit def executor: ExecutionContextImpl
+
+ /** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
+ *
+ * Returns the result of this Future without blocking, by suspending execution and storing it as a
+ * continuation until the result is available.
+ */
+ //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any]))
+
+ /** Tests whether this Future has been completed.
+ */
+ final def isCompleted: Boolean = value.isDefined
+
+ /** The contained value of this Future. Before this Future is completed
+ * the value will be None. After completion the value will be Some(Right(t))
+ * if it contains a valid result, or Some(Left(error)) if it contains
+ * an exception.
+ */
+ def value: Option[Try[T]]
+
+ def onComplete[U](func: Try[T] => U): this.type
+
+ /** Creates a new Future[A] which is completed with this Future's result if
+ * that conforms to A's erased type or a ClassCastException otherwise.
+ */
+ final def mapTo[T](implicit m: Manifest[T]) = {
+ val p = executor.promise[T]
+
+ onComplete {
+ case f @ Failure(t) => p complete f.asInstanceOf[Try[T]]
+ case Success(v) =>
+ p complete (try {
+ Success(Future.boxedType(m.erasure).cast(v).asInstanceOf[T])
+ } catch {
+ case e: ClassCastException ⇒ Failure(e)
+ })
+ }
+
+ p.future
+ }
+
+ /** Used by for-comprehensions.
+ */
+ final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p)
+
+ final class FutureWithFilter[+A](self: Future[A], p: A => Boolean) {
+ def foreach(f: A => Unit): Unit = self filter p foreach f
+ def map[B](f: A => B) = self filter p map f
+ def flatMap[B](f: A => Future[B]) = self filter p flatMap f
+ def withFilter(q: A => Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x))
+ }
+
+}
+
+object Future {
+ import java.{ lang => jl }
+
+ private val toBoxed = Map[Class[_], Class[_]](
+ classOf[Boolean] -> classOf[jl.Boolean],
+ classOf[Byte] -> classOf[jl.Byte],
+ classOf[Char] -> classOf[jl.Character],
+ classOf[Short] -> classOf[jl.Short],
+ classOf[Int] -> classOf[jl.Integer],
+ classOf[Long] -> classOf[jl.Long],
+ classOf[Float] -> classOf[jl.Float],
+ classOf[Double] -> classOf[jl.Double],
+ classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
+ )
+
+ def boxedType(c: Class[_]): Class[_] = {
+ if (c.isPrimitive) toBoxed(c) else c
+ }
+}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
new file mode 100644
index 0000000000..7ef76e1501
--- /dev/null
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -0,0 +1,252 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.concurrent.impl
+
+
+
+import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException}
+//import scala.util.continuations._
+import scala.util.Duration
+import scala.util.Try
+import scala.util
+import scala.annotation.tailrec
+//import scala.concurrent.NonDeterministic
+
+
+
+trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
+
+ def future = this
+
+ def newPromise[S]: Promise[S] = executor promise
+
+ // TODO refine answer and return types here from Any to type parameters
+ // then move this up in the hierarchy
+ /*
+ final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
+ cont: (Future[T] => Future[Any]) =>
+ cont(complete(Right(value)))
+ }
+
+ final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift {
+ cont: (Future[T] => Future[Any]) =>
+ val p = executor.promise[Any]
+ val thisPromise = this
+
+ thisPromise completeWith other
+ thisPromise onComplete { v =>
+ try {
+ p completeWith cont(thisPromise)
+ } catch {
+ case e => p complete resolver(e)
+ }
+ }
+
+ p.future
+ }
+ */
+ // TODO finish this once we introduce something like dataflow streams
+
+ /*
+ final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) =>
+ val fr = executor.promise[Any]
+ val f = stream.dequeue(this)
+ f.onComplete { _ =>
+ try {
+ fr completeWith cont(f)
+ } catch {
+ case e =>
+ fr failure e
+ }
+ }
+ fr
+ }
+ */
+
+}
+
+
+object Promise {
+ def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue
+
+ def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
+
+ /** Represents the internal state.
+ */
+ sealed trait FState[+T] { def value: Option[Try[T]] }
+
+ case class Pending[T](listeners: List[Try[T] => Any] = Nil) extends FState[T] {
+ def value: Option[Try[T]] = None
+ }
+
+ case class Success[T](value: Option[util.Success[T]] = None) extends FState[T] {
+ def result: T = value.get.get
+ }
+
+ case class Failure[T](value: Option[util.Failure[T]] = None) extends FState[T] {
+ def exception: Throwable = value.get.exception
+ }
+
+ private val emptyPendingValue = Pending[Nothing](Nil)
+
+ /** Default promise implementation.
+ */
+ class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] {
+ self =>
+
+ updater.set(this, Promise.EmptyPending())
+
+ protected final def tryAwait(atMost: Duration): Boolean = {
+ @tailrec
+ def awaitUnsafe(waitTimeNanos: Long): Boolean = {
+ if (value.isEmpty && waitTimeNanos > 0) {
+ val ms = NANOSECONDS.toMillis(waitTimeNanos)
+ val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec
+ val start = System.nanoTime()
+ try {
+ synchronized {
+ while (value.isEmpty) wait(ms, ns)
+ }
+ } catch {
+ case e: InterruptedException =>
+ }
+
+ awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
+ } else
+ value.isDefined
+ }
+
+ executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0))
+ }
+
+ private def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
+ if (value.isDefined || tryAwait(atMost)) this
+ else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
+
+ def await(atMost: Duration)(implicit permit: CanAwait): T =
+ ready(atMost).value.get match {
+ case util.Failure(e) => throw e
+ case util.Success(r) => r
+ }
+
+ def value: Option[Try[T]] = getState.value
+
+ @inline
+ private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
+
+ @inline
+ protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
+
+ @inline
+ protected final def getState: FState[T] = updater.get(this)
+
+ def tryComplete(value: Try[T]): Boolean = {
+ val callbacks: List[Try[T] => Any] = {
+ try {
+ @tailrec
+ def tryComplete(v: Try[T]): List[Try[T] => Any] = {
+ getState match {
+ case cur @ Pending(listeners) =>
+ if (updateState(cur, if (v.isFailure) Failure(Some(v.asInstanceOf[util.Failure[T]])) else Success(Some(v.asInstanceOf[util.Success[T]])))) listeners
+ else tryComplete(v)
+ case _ => null
+ }
+ }
+ tryComplete(resolve(value))
+ } finally {
+ synchronized { notifyAll() } // notify any blockers from `tryAwait`
+ }
+ }
+
+ callbacks match {
+ case null => false
+ case cs if cs.isEmpty => true
+ case cs =>
+ executor dispatchFuture {
+ () => cs.foreach(f => notifyCompleted(f, value))
+ }
+ true
+ }
+ }
+
+ def onComplete[U](func: Try[T] => U): this.type = {
+ @tailrec // Returns whether the future has already been completed or not
+ def tryAddCallback(): Boolean = {
+ val cur = getState
+ cur match {
+ case _: Success[_] | _: Failure[_] => true
+ case p: Pending[_] =>
+ val pt = p.asInstanceOf[Pending[T]]
+ if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
+ }
+ }
+
+ if (tryAddCallback()) {
+ val result = value.get
+ executor dispatchFuture {
+ () => notifyCompleted(func, result)
+ }
+ }
+
+ this
+ }
+
+ private final def notifyCompleted(func: Try[T] => Any, result: Try[T]) {
+ try {
+ func(result)
+ } catch {
+ case e => executor.reportFailure(e)
+ }
+ }
+ }
+
+ /** An already completed Future is given its result at creation.
+ *
+ * Useful in Future-composition when a value to contribute is already available.
+ */
+ final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] {
+ val value = Some(resolve(suppliedValue))
+
+ def tryComplete(value: Try[T]): Boolean = false
+
+ def onComplete[U](func: Try[T] => U): this.type = {
+ val completedAs = value.get
+ executor dispatchFuture {
+ () => func(completedAs)
+ }
+ this
+ }
+
+ private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
+
+ def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
+ case util.Failure(e) => throw e
+ case util.Success(r) => r
+ }
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala
index 92220a8313..2cea29aefe 100644
--- a/src/library/scala/concurrent/ops.scala
+++ b/src/library/scala/concurrent/ops.scala
@@ -15,6 +15,7 @@ import scala.util.control.Exception.allCatch
*
* @author Martin Odersky, Stepan Koltsov, Philipp Haller
*/
+@deprecated("Use `future` instead.", "2.10.0")
object ops
{
val defaultRunner: FutureTaskRunner = TaskRunners.threadRunner
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
new file mode 100644
index 0000000000..6a98fd50c2
--- /dev/null
+++ b/src/library/scala/concurrent/package.scala
@@ -0,0 +1,57 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala
+
+import scala.util.{ Duration, Try, Success, Failure }
+
+/** This package object contains primitives for concurrent and parallel programming.
+ */
+package object concurrent extends scala.concurrent.ConcurrentPackageObject {
+ type ExecutionException = java.util.concurrent.ExecutionException
+ type CancellationException = java.util.concurrent.CancellationException
+ type TimeoutException = java.util.concurrent.TimeoutException
+}
+
+package concurrent {
+ object await {
+ def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = {
+ try blocking(awaitable, atMost)
+ catch { case _ => }
+ awaitable
+ }
+
+ def result[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = {
+ blocking(awaitable, atMost)
+ }
+ }
+
+ /** Importing this object allows using some concurrency primitives
+ * on futures and promises that can yield nondeterministic programs.
+ *
+ * While program determinism is broken when using these primitives,
+ * some programs cannot be written without them (e.g. multiple client threads
+ * cannot send requests to a server thread through regular promises and futures).
+ */
+ object nondeterministic { }
+
+ /** A timeout exception.
+ *
+ * Futures are failed with a timeout exception when their timeout expires.
+ *
+ * Each timeout exception contains an origin future which originally timed out.
+ */
+ class FutureTimeoutException(origin: Future[_], message: String) extends TimeoutException(message) {
+ def this(origin: Future[_]) = this(origin, "Future timed out.")
+ }
+
+ final class DurationOps private[concurrent] (x: Int) {
+ // TODO ADD OTHERS
+ def ns = util.Duration.fromNanos(0)
+ }
+}
diff --git a/src/library/scala/concurrent/package.scala.disabled b/src/library/scala/concurrent/package.scala.disabled
deleted file mode 100644
index 42b4bf954c..0000000000
--- a/src/library/scala/concurrent/package.scala.disabled
+++ /dev/null
@@ -1,108 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-
-
-package scala
-
-
-
-
-/** This package object contains primitives for parallel programming.
- */
-package object concurrent {
-
- /** Performs a call which can potentially block execution.
- *
- * Example:
- * {{{
- * val lock = new ReentrantLock
- *
- * // ... do something ...
- *
- * blocking {
- * if (!lock.hasLock) lock.lock()
- * }
- * }}}
- *
- * '''Note:''' calling methods that wait arbitrary amounts of time
- * (e.g. for I/O operations or locks) may severely decrease performance
- * or even result in deadlocks. This does not include waiting for
- * results of futures.
- *
- * @tparam T the result type of the blocking operation
- * @param body the blocking operation
- * @param runner the runner used for parallel computations
- * @return the result of the potentially blocking operation
- */
- def blocking[T](body: =>T)(implicit runner: TaskRunner): T = {
- null.asInstanceOf[T]
- }
-
- /** Invokes a computation asynchronously. Does not wait for the computation
- * to finish.
- *
- * @tparam U the result type of the operation
- * @param p the computation to be invoked asynchronously
- * @param runner the runner used for parallel computations
- */
- def spawn[U](p: =>U)(implicit runner: TaskRunner): Unit = {
- }
-
- /** Starts 2 parallel computations and returns once they are completed.
- *
- * $invokingPar
- *
- * @tparam T1 the type of the result of 1st the parallel computation
- * @tparam T2 the type of the result of 2nd the parallel computation
- * @param b1 the 1st computation to be invoked in parallel
- * @param b2 the 2nd computation to be invoked in parallel
- * @param runner the runner used for parallel computations
- * @return a tuple of results corresponding to parallel computations
- */
- def par[T1, T2](b1: =>T1)(b2: =>T2)(implicit runner: TaskRunner): (T1, T2) = {
- null
- }
-
- /** Starts 3 parallel computations and returns once they are completed.
- *
- * $invokingPar
- *
- * @tparam T1 the type of the result of 1st the parallel computation
- * @tparam T2 the type of the result of 2nd the parallel computation
- * @tparam T3 the type of the result of 3rd the parallel computation
- * @param b1 the 1st computation to be invoked in parallel
- * @param b2 the 2nd computation to be invoked in parallel
- * @param b3 the 3rd computation to be invoked in parallel
- * @param runner the runner used for parallel computations
- * @return a tuple of results corresponding to parallel computations
- */
- def par[T1, T2, T3](b1: =>T1)(b2: =>T2)(b3: =>T3)(implicit runner: TaskRunner): (T1, T2, T3) = {
- null
- }
-
- /** Starts 4 parallel computations and returns once they are completed.
- *
- * $invokingPar
- *
- * @tparam T1 the type of the result of 1st the parallel computation
- * @tparam T2 the type of the result of 2nd the parallel computation
- * @tparam T3 the type of the result of 3rd the parallel computation
- * @tparam T4 the type of the result of 4th the parallel computation
- * @param b1 the 1st computation to be invoked in parallel
- * @param b2 the 2nd computation to be invoked in parallel
- * @param b3 the 3rd computation to be invoked in parallel
- * @param b4 the 4th computation to be invoked in parallel
- * @param runner the runner used for parallel computations
- * @return a tuple of results corresponding to parallel computations
- */
- def par[T1, T2, T3, T4](b1: =>T1)(b2: =>T2)(b3: =>T3)(b4: =>T4)(implicit runner: TaskRunner): (T1, T2, T3, T4) = {
- null
- }
-
-}
diff --git a/src/library/scala/package.scala b/src/library/scala/package.scala
index 9425eba232..366af34ee9 100644
--- a/src/library/scala/package.scala
+++ b/src/library/scala/package.scala
@@ -27,12 +27,10 @@ package object scala {
type NoSuchElementException = java.util.NoSuchElementException
type NumberFormatException = java.lang.NumberFormatException
type AbstractMethodError = java.lang.AbstractMethodError
+ type InterruptedException = java.lang.InterruptedException
// A dummy used by the specialization annotation.
- // Normally it's bad juju to place objects inside package objects,
- // but there's no choice here as we'd have to be AnyRef's companion
- // and defined in the same file - except there is no such file.
- object AnyRef extends Specializable {
+ val AnyRef = new Specializable {
override def toString = "object AnyRef"
}
diff --git a/src/library/scala/reflect/api/Trees.scala b/src/library/scala/reflect/api/Trees.scala
index 2c960392ec..1472866460 100644
--- a/src/library/scala/reflect/api/Trees.scala
+++ b/src/library/scala/reflect/api/Trees.scala
@@ -486,11 +486,10 @@ trait Trees { self: Universe =>
* A `New(t, as)` is expanded to: `(new t).<init>(as)`
*/
def New(tpt: Tree, argss: List[List[Tree]]): Tree = {
- assert(!argss.isEmpty)
// todo. we need to expose names in scala.reflect.api
-// val superRef: Tree = Select(New(tpt), nme.CONSTRUCTOR)
val superRef: Tree = Select(New(tpt), nme.CONSTRUCTOR)
- (superRef /: argss) (Apply)
+ if (argss.isEmpty) Apply(superRef, Nil)
+ else (superRef /: argss) (Apply)
}
/** Type annotation, eliminated by explicit outer */
diff --git a/src/library/scala/sys/process/BasicIO.scala b/src/library/scala/sys/process/BasicIO.scala
index 44e573896f..edc60a1bb5 100644
--- a/src/library/scala/sys/process/BasicIO.scala
+++ b/src/library/scala/sys/process/BasicIO.scala
@@ -13,15 +13,25 @@ import processInternal._
import java.io.{ BufferedReader, InputStreamReader, FilterInputStream, FilterOutputStream }
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.immutable.Stream
+import scala.annotation.tailrec
/**
* This object contains factories for [[scala.sys.process.ProcessIO]],
* which can be used to control the I/O of a [[scala.sys.process.Process]]
* when a [[scala.sys.process.ProcessBuilder]] is started with the `run`
* command.
+ *
+ * It also contains some helper methods that can be used to in the creation of
+ * `ProcessIO`.
+ *
+ * It is used by other classes in the package in the implementation of various
+ * features, but can also be used by client code.
*/
object BasicIO {
+ /** Size of the buffer used in all the functions that copy data */
final val BufferSize = 8192
+
+ /** Used to separate lines in the `processFully` function that takes `Appendable`. */
final val Newline = props("line.separator")
private[process] final class Streamed[T](
@@ -52,15 +62,70 @@ object BasicIO {
def protect(out: OutputStream): OutputStream = if ((out eq stdout) || (out eq stderr)) Uncloseable(out) else out
}
+ /** Creates a `ProcessIO` from a function `String => Unit`. It can attach the
+ * process input to stdin, and it will either send the error stream to
+ * stderr, or to a `ProcessLogger`.
+ *
+ * For example, the `ProcessIO` created below will print all normal output
+ * while ignoring all error output. No input will be provided.
+ * {{{
+ * import scala.sys.process.BasicIO
+ * val errToDevNull = BasicIO(false, println(_), None)
+ * }}}
+ *
+ * @param withIn True if the process input should be attached to stdin.
+ * @param output A function that will be called with the process output.
+ * @param log An optional `ProcessLogger` to which the output should be
+ * sent. If `None`, output will be sent to stderr.
+ * @return A `ProcessIO` with the characteristics above.
+ */
def apply(withIn: Boolean, output: String => Unit, log: Option[ProcessLogger]) =
new ProcessIO(input(withIn), processFully(output), getErr(log))
+ /** Creates a `ProcessIO` that appends its output to a `StringBuffer`. It can
+ * attach the process input to stdin, and it will either send the error
+ * stream to stderr, or to a `ProcessLogger`.
+ *
+ * For example, the `ProcessIO` created by the function below will store the
+ * normal output on the buffer provided, and print all error on stderr. The
+ * input will be read from stdin.
+ * {{{
+ * import scala.sys.process.{BasicIO, ProcessLogger}
+ * val printer = ProcessLogger(println(_))
+ * def appendToBuffer(b: StringBuffer) = BasicIO(true, b, Some(printer))
+ * }}}
+ *
+ * @param withIn True if the process input should be attached to stdin.
+ * @param buffer A `StringBuffer` which will receive the process normal
+ * output.
+ * @param log An optional `ProcessLogger` to which the output should be
+ * sent. If `None`, output will be sent to stderr.
+ * @return A `ProcessIO` with the characteristics above.
+ */
def apply(withIn: Boolean, buffer: StringBuffer, log: Option[ProcessLogger]) =
new ProcessIO(input(withIn), processFully(buffer), getErr(log))
+ /** Creates a `ProcessIO` from a `ProcessLogger` . It can attach the
+ * process input to stdin.
+ *
+ * @param withIn True if the process input should be attached to stdin.
+ * @param log A `ProcessLogger` to receive all output, normal and error.
+ * @return A `ProcessIO` with the characteristics above.
+ */
def apply(withIn: Boolean, log: ProcessLogger) =
new ProcessIO(input(withIn), processOutFully(log), processErrFully(log))
+ /** Returns a function `InputStream => Unit` given an optional
+ * `ProcessLogger`. If no logger is passed, the function will send the output
+ * to stderr. This function can be used to create a
+ * [[scala.sys.process.ProcessIO]].
+ *
+ * @param log An optional `ProcessLogger` to which the contents of
+ * the `InputStream` will be sent.
+ * @return A function `InputStream => Unit` (used by
+ * [[scala.sys.process.ProcessIO]]) which will send the data to
+ * either the provided `ProcessLogger` or, if `None`, to stderr.
+ */
def getErr(log: Option[ProcessLogger]) = log match {
case Some(lg) => processErrFully(lg)
case None => toStdErr
@@ -69,13 +134,40 @@ object BasicIO {
private def processErrFully(log: ProcessLogger) = processFully(log err _)
private def processOutFully(log: ProcessLogger) = processFully(log out _)
+ /** Closes a `Closeable` without throwing an exception */
def close(c: Closeable) = try c.close() catch { case _: IOException => () }
+
+ /** Returns a function `InputStream => Unit` that appends all data read to the
+ * provided `Appendable`. This function can be used to create a
+ * [[scala.sys.process.ProcessIO]]. The buffer will be appended line by line.
+ *
+ * @param buffer An `Appendable` such as `StringBuilder` or `StringBuffer`.
+ * @return A function `InputStream => Unit` (used by
+ * [[scala.sys.process.ProcessIO]] which will append all data read
+ * from the stream to the buffer.
+ */
def processFully(buffer: Appendable): InputStream => Unit = processFully(appendLine(buffer))
+
+ /** Returns a function `InputStream => Unit` that will call the passed
+ * function with all data read. This function can be used to create a
+ * [[scala.sys.process.ProcessIO]]. The `processLine` function will be called
+ * with each line read, and `Newline` will be appended after each line.
+ *
+ * @param processLine A function that will be called with all data read from
+ * the stream.
+ * @return A function `InputStream => Unit` (used by
+ * [[scala.sys.process.ProcessIO]] which will call `processLine`
+ * with all data read from the stream.
+ */
def processFully(processLine: String => Unit): InputStream => Unit = in => {
val reader = new BufferedReader(new InputStreamReader(in))
processLinesFully(processLine)(reader.readLine)
+ reader.close()
}
+ /** Calls `processLine` with the result of `readLine` until the latter returns
+ * `null`.
+ */
def processLinesFully(processLine: String => Unit)(readLine: () => String) {
def readFully() {
val line = readLine()
@@ -86,14 +178,38 @@ object BasicIO {
}
readFully()
}
- def connectToIn(o: OutputStream): Unit = transferFully(stdin, o)
- def input(connect: Boolean): OutputStream => Unit = if (connect) connectToIn else _ => ()
+
+ /** Copy contents of stdin to the `OutputStream`. */
+ def connectToIn(o: OutputStream): Unit = transferFully(Uncloseable protect stdin, o)
+
+ /** Returns a function `OutputStream => Unit` that either reads the content
+ * from stdin or does nothing. This function can be used by
+ * [[scala.sys.process.ProcessIO]].
+ */
+ def input(connect: Boolean): OutputStream => Unit = { outputToProcess =>
+ if (connect) connectToIn(outputToProcess)
+ outputToProcess.close()
+ }
+
+ /** Returns a `ProcessIO` connected to stdout and stderr, and, optionally, stdin. */
def standard(connectInput: Boolean): ProcessIO = standard(input(connectInput))
+
+ /** Retruns a `ProcessIO` connected to stdout, stderr and the provided `in` */
def standard(in: OutputStream => Unit): ProcessIO = new ProcessIO(in, toStdOut, toStdErr)
+ /** Send all the input from the stream to stderr, and closes the input stream
+ * afterwards.
+ */
def toStdErr = (in: InputStream) => transferFully(in, stderr)
+
+ /** Send all the input from the stream to stdout, and closes the input stream
+ * afterwards.
+ */
def toStdOut = (in: InputStream) => transferFully(in, stdout)
+ /** Copy all input from the input stream to the output stream. Closes the
+ * input stream once it's all read.
+ */
def transferFully(in: InputStream, out: OutputStream): Unit =
try transferFullyImpl(in, out)
catch onInterrupt(())
@@ -105,14 +221,16 @@ object BasicIO {
private[this] def transferFullyImpl(in: InputStream, out: OutputStream) {
val buffer = new Array[Byte](BufferSize)
- def loop() {
+ @tailrec def loop() {
val byteCount = in.read(buffer)
if (byteCount > 0) {
out.write(buffer, 0, byteCount)
- out.flush()
- loop()
+ // flush() will throw an exception once the process has terminated
+ val available = try { out.flush(); true } catch { case _: IOException => false }
+ if (available) loop()
}
}
loop()
+ in.close()
}
}
diff --git a/src/library/scala/sys/process/Process.scala b/src/library/scala/sys/process/Process.scala
index b8765aa615..c2a61af936 100644
--- a/src/library/scala/sys/process/Process.scala
+++ b/src/library/scala/sys/process/Process.scala
@@ -13,7 +13,7 @@ import processInternal._
import ProcessBuilder._
/** Represents a process that is running or has finished running.
- * It may be a compound process with several underlying native processes (such as 'a #&& b`).
+ * It may be a compound process with several underlying native processes (such as `a #&& b`).
*
* This trait is often not used directly, though its companion object contains
* factories for [[scala.sys.process.ProcessBuilder]], the main component of this
@@ -42,28 +42,28 @@ object Process extends ProcessImpl with ProcessCreation { }
* found on and used through [[scala.sys.process.Process]]'s companion object.
*/
trait ProcessCreation {
- /** Create a [[scala.sys.process.ProcessBuilder]] from a `String`, including the
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a `String`, including the
* parameters.
*
* @example {{{ apply("cat file.txt") }}}
*/
def apply(command: String): ProcessBuilder = apply(command, None)
- /** Create a [[scala.sys.process.ProcessBuilder]] from a sequence of `String`,
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a sequence of `String`,
* where the head is the command and each element of the tail is a parameter.
*
* @example {{{ apply("cat" :: files) }}}
*/
def apply(command: Seq[String]): ProcessBuilder = apply(command, None)
- /** Create a [[scala.sys.process.ProcessBuilder]] from a command represented by a `String`,
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a command represented by a `String`,
* and a sequence of `String` representing the arguments.
*
* @example {{{ apply("cat", files) }}}
*/
def apply(command: String, arguments: Seq[String]): ProcessBuilder = apply(command +: arguments, None)
- /** Create a [[scala.sys.process.ProcessBuilder]] with working dir set to `File` and extra
+ /** Creates a [[scala.sys.process.ProcessBuilder]] with working dir set to `File` and extra
* environment variables.
*
* @example {{{ apply("java", new java.ioFile("/opt/app"), "CLASSPATH" -> "library.jar") }}}
@@ -71,7 +71,7 @@ trait ProcessCreation {
def apply(command: String, cwd: File, extraEnv: (String, String)*): ProcessBuilder =
apply(command, Some(cwd), extraEnv: _*)
- /** Create a [[scala.sys.process.ProcessBuilder]] with working dir set to `File` and extra
+ /** Creates a [[scala.sys.process.ProcessBuilder]] with working dir set to `File` and extra
* environment variables.
*
* @example {{{ apply("java" :: javaArgs, new java.ioFile("/opt/app"), "CLASSPATH" -> "library.jar") }}}
@@ -79,7 +79,7 @@ trait ProcessCreation {
def apply(command: Seq[String], cwd: File, extraEnv: (String, String)*): ProcessBuilder =
apply(command, Some(cwd), extraEnv: _*)
- /** Create a [[scala.sys.process.ProcessBuilder]] with working dir optionally set to
+ /** Creates a [[scala.sys.process.ProcessBuilder]] with working dir optionally set to
* `File` and extra environment variables.
*
* @example {{{ apply("java", params.get("cwd"), "CLASSPATH" -> "library.jar") }}}
@@ -93,7 +93,7 @@ trait ProcessCreation {
}*/
}
- /** Create a [[scala.sys.process.ProcessBuilder]] with working dir optionally set to
+ /** Creates a [[scala.sys.process.ProcessBuilder]] with working dir optionally set to
* `File` and extra environment variables.
*
* @example {{{ apply("java" :: javaArgs, params.get("cwd"), "CLASSPATH" -> "library.jar") }}}
@@ -105,7 +105,7 @@ trait ProcessCreation {
apply(jpb)
}
- /** create a [[scala.sys.process.ProcessBuilder]] from a `java.lang.ProcessBuilder`.
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a `java.lang.ProcessBuilder`.
*
* @example {{{
* apply((new java.lang.ProcessBuilder("ls", "-l")) directory new java.io.File(System.getProperty("user.home")))
@@ -113,19 +113,19 @@ trait ProcessCreation {
*/
def apply(builder: JProcessBuilder): ProcessBuilder = new Simple(builder)
- /** create a [[scala.sys.process.ProcessBuilder]] from a `java.io.File`. This
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a `java.io.File`. This
* `ProcessBuilder` can then be used as a `Source` or a `Sink`, so one can
* pipe things from and to it.
*/
def apply(file: File): FileBuilder = new FileImpl(file)
- /** Create a [[scala.sys.process.ProcessBuilder]] from a `java.net.URL`. This
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a `java.net.URL`. This
* `ProcessBuilder` can then be used as a `Source`, so that one can pipe things
* from it.
*/
def apply(url: URL): URLBuilder = new URLImpl(url)
- /** Create a [[scala.sys.process.ProcessBuilder]] from a Scala XML Element.
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a Scala XML Element.
* This can be used as a way to template strings.
*
* @example {{{
@@ -134,23 +134,23 @@ trait ProcessCreation {
*/
def apply(command: scala.xml.Elem): ProcessBuilder = apply(command.text.trim)
- /** Create a [[scala.sys.process.ProcessBuilder]] from a `Boolean`. This can be
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a `Boolean`. This can be
* to force an exit value.
*/
def apply(value: Boolean): ProcessBuilder = apply(value.toString, if (value) 0 else 1)
- /** Create a [[scala.sys.process.ProcessBuilder]] from a `String` name and a
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a `String` name and a
* `Boolean`. This can be used to force an exit value, with the name being
* used for `toString`.
*/
def apply(name: String, exitValue: => Int): ProcessBuilder = new Dummy(name, exitValue)
- /** Create a sequence of [[scala.sys.process.ProcessBuilder.Source]] from a sequence of
+ /** Creates a sequence of [[scala.sys.process.ProcessBuilder.Source]] from a sequence of
* something else for which there's an implicit conversion to `Source`.
*/
def applySeq[T](builders: Seq[T])(implicit convert: T => Source): Seq[Source] = builders.map(convert)
- /** Create a [[scala.sys.process.ProcessBuilder]] from one or more
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from one or more
* [[scala.sys.process.ProcessBuilder.Source]], which can then be
* piped to something else.
*
@@ -170,7 +170,7 @@ trait ProcessCreation {
*/
def cat(file: Source, files: Source*): ProcessBuilder = cat(file +: files)
- /** Create a [[scala.sys.process.ProcessBuilder]] from a non-empty sequence
+ /** Creates a [[scala.sys.process.ProcessBuilder]] from a non-empty sequence
* of [[scala.sys.process.ProcessBuilder.Source]], which can then be
* piped to something else.
*
@@ -198,18 +198,41 @@ trait ProcessImplicits {
/** Implicitly convert a `java.lang.ProcessBuilder` into a Scala one. */
implicit def builderToProcess(builder: JProcessBuilder): ProcessBuilder = apply(builder)
- /** Implicitly convert a `java.io.File` into a [[scala.sys.process.ProcessBuilder]] */
+ /** Implicitly convert a `java.io.File` into a
+ * [[scala.sys.process.ProcessBuilder.FileBuilder]], which can be used as
+ * either input or output of a process. For example:
+ * {{{
+ * import scala.sys.process._
+ * "ls" #> new java.io.File("dirContents.txt") !
+ * }}}
+ */
implicit def fileToProcess(file: File): FileBuilder = apply(file)
- /** Implicitly convert a `java.net.URL` into a [[scala.sys.process.ProcessBuilder]] */
+ /** Implicitly convert a `java.net.URL` into a
+ * [[scala.sys.process.ProcessBuilder.URLBuilder]] , which can be used as
+ * input to a process. For example:
+ * {{{
+ * import scala.sys.process._
+ * Seq("xmllint", "--html", "-") #< new java.net.URL("http://www.scala-lang.org") #> new java.io.File("fixed.html") !
+ * }}}
+ */
implicit def urlToProcess(url: URL): URLBuilder = apply(url)
- /** Implicitly convert a [[scala.xml.Elem]] into a [[scala.sys.process.ProcessBuilder]] */
+ /** Implicitly convert a [[scala.xml.Elem]] into a
+ * [[scala.sys.process.ProcessBuilder]]. This is done by obtaining the text
+ * elements of the element, trimming spaces, and then converting the result
+ * from string to a process. Importantly, tags are completely ignored, so
+ * they cannot be used to separate parameters.
+ */
implicit def xmlToProcess(command: scala.xml.Elem): ProcessBuilder = apply(command)
- /** Implicitly convert a `String` into a [[scala.sys.process.ProcessBuilder]] */
+ /** Implicitly convert a `String` into a [[scala.sys.process.ProcessBuilder]]. */
implicit def stringToProcess(command: String): ProcessBuilder = apply(command)
- /** Implicitly convert a sequence of `String` into a [[scala.sys.process.ProcessBuilder]] */
+ /** Implicitly convert a sequence of `String` into a
+ * [[scala.sys.process.ProcessBuilder]]. The first argument will be taken to
+ * be the command to be executed, and the remaining will be its arguments.
+ * When using this, arguments may contain spaces.
+ */
implicit def stringSeqToProcess(command: Seq[String]): ProcessBuilder = apply(command)
}
diff --git a/src/library/scala/sys/process/ProcessBuilder.scala b/src/library/scala/sys/process/ProcessBuilder.scala
index 214d908012..20270d423f 100644
--- a/src/library/scala/sys/process/ProcessBuilder.scala
+++ b/src/library/scala/sys/process/ProcessBuilder.scala
@@ -12,133 +12,265 @@ package process
import processInternal._
import ProcessBuilder._
-/** Represents a runnable process.
+/** Represents a sequence of one or more external processes that can be
+ * executed. A `ProcessBuilder` can be a single external process, or a
+ * combination of other `ProcessBuilder`. One can control where a
+ * the output of an external process will go to, and where its input will come
+ * from, or leave that decision to whoever starts it.
*
- * This is the main component of this package. A `ProcessBuilder` may be composed with
- * others, either concatenating their outputs or piping them from one to the next, and
- * possibly with conditional execution depending on the last process exit value.
+ * One creates a `ProcessBuilder` through factories provided in
+ * [[scala.sys.process.Process]]'s companion object, or implicit conversions
+ * based on these factories made available in the package object
+ * [[scala.sys.process]]. Here are some examples:
+ * {{{
+ * import.scala.sys.process._
*
- * Once executed, one can retrieve the output or redirect it to a
- * [[scala.sys.process.ProcessLogger]], or one can get the exit value, discarding or
- * redirecting the output.
+ * // Executes "ls" and sends output to stdout
+ * "ls".!
*
- * One creates a `ProcessBuilder` through factories provided in [[scala.sys.process.Process]]'s
- * companion object, or implicit conversions based on these factories made available in the
- * package object [[scala.sys.process]].
+ * // Execute "ls" and assign a `Stream[String]` of its output to "contents".
+ * // Because [[scala.Predef]] already defines a `lines` method for `String`,
+ * // we use [[scala.sys.process.Process]]'s object companion to create it.
+ * val contents = Process("ls").lines
*
- * Let's examine in detail one example of usage:
+ * // Here we use a `Seq` to make the parameter whitespace-safe
+ * def contentsOf(dir: String): String = Seq("ls", dir).!!
+ * }}}
+ *
+ * The methods of `ProcessBuilder` are divided in three categories: the ones that
+ * combine two `ProcessBuilder` to create a third, the ones that redirect input
+ * or output of a `ProcessBuilder`, and the ones that execute
+ * the external processes associated with it.
+ *
+ * ==Combining `ProcessBuilder`==
+ *
+ * Two existing `ProcessBuilder` can be combined in the following ways:
+ *
+ * * They can be executed in parallel, with the output of the first being fed
+ * as input to the second, like Unix pipes. This is achieved with the `#|`
+ * method.
+ * * They can be executed in sequence, with the second starting as soon as
+ * the first ends. This is done by the `###` method.
+ * * The execution of the second one can be conditioned by the return code
+ * (exit status) of the first, either only when it's zero, or only when it's
+ * not zero. The methods `#&&` and `#||` accomplish these tasks.
+ *
+ * ==Redirecting Input/Output==
+ *
+ * Though control of input and output can be done when executing the process,
+ * there's a few methods that create a new `ProcessBuilder` with a
+ * pre-configured input or output. They are `#<`, `#>` and `#>>`, and may take
+ * as input either another `ProcessBuilder` (like the pipe described above), or
+ * something else such as a `java.io.File` or a `java.lang.InputStream`.
+ * For example:
+ * {{{
+ * new URL("http://databinder.net/dispatch/About") #> "grep JSON" #>> new File("About_JSON") !
+ * }}}
+ *
+ * ==Starting Processes==
+ *
+ * To execute all external commands associated with a `ProcessBuilder`, one
+ * may use one of four groups of methods. Each of these methods have various
+ * overloads and variations to enable further control over the I/O. These
+ * methods are:
+ *
+ * * `run`: the most general method, it returns a
+ * [[scala.sys.process.Process]] immediately, and the external command
+ * executes concurrently.
+ * * `!`: blocks until all external commands exit, and returns the exit code
+ * of the last one in the chain of execution.
+ * * `!!`: blocks until all external commands exit, and returns a `String`
+ * with the output generated.
+ * * `lines`: returns immediately like `run`, and the output being generared
+ * is provided through a `Stream[String]`. Getting the next element of that
+ * `Stream` may block until it becomes available. This method will throw an
+ * exception if the return code is different than zero -- if this is not
+ * desired, use the `lines_!` method.
+ *
+ * ==Handling Input and Output==
+ *
+ * If not specified, the input of the external commands executed with `run` or
+ * `!` will not be tied to anything, and the output will be redirected to the
+ * stdout and stderr of the Scala process. For the methods `!!` and `lines`, no
+ * input will be provided, and the output will be directed according to the
+ * semantics of these methods.
*
+ * Some methods will cause stdin to be used as input. Output can be controlled
+ * with a [[scala.sys.process.ProcessLogger]] -- `!!` and `lines` will only
+ * redirect error output when passed a `ProcessLogger`. If one desires full
+ * control over input and output, then a [[scala.sys.process.ProcessIO]] can be
+ * used with `run`.
+ *
+ * For example, we could silence the error output from `lines_!` like this:
+ * {{{
+ * val etcFiles = "find /etc" lines_! ProcessLogger(line => ())
+ * }}}
+ *
+ * ==Extended Example==
+ *
+ * Let's examine in detail one example of usage:
* {{{
* import scala.sys.process._
* "find src -name *.scala -exec grep null {} ;" #| "xargs test -z" #&& "echo null-free" #|| "echo null detected" !
* }}}
- *
* Note that every `String` is implicitly converted into a `ProcessBuilder`
* through the implicits imported from [[scala.sys.process]]. These `ProcessBuilder` are then
* combined in three different ways.
*
* 1. `#|` pipes the output of the first command into the input of the second command. It
- * mirrors a shell pipe (`|`).
- * 2. `#&&` conditionally executes the second command if the previous one finished with
- * exit value 0. It mirrors shell's `&&`.
- * 3. `#||` conditionally executes the third command if the exit value of the previous
- * command is different than zero. It mirrors shell's `&&`.
- *
- * Not shown here, the equivalent of a shell's `;` would be `###`. The reason for this name is
- * that `;` is a reserved token in Scala.
- *
- * Finally, `!` at the end executes the commands, and returns the exit value. If the output
- * was desired instead, one could run that with `!!` instead.
- *
- * If one wishes to execute the commands in background, one can either call `run`, which
- * returns a [[scala.sys.process.Process]] from which the exit value can be obtained, or
- * `lines`, which returns a [scala.collection.immutable.Stream] of output lines. This throws
- * an exception at the end of the `Stream` is the exit value is non-zero. To avoid exceptions,
- * one can use `lines_!` instead.
- *
- * One can also start the commands in specific ways to further control their I/O. Using `!<` to
- * start the commands will use the stdin from the current process for them. All methods can
- * be used passing a [[scala.sys.process.ProcessLogger]] to capture the output, both stderr and
- * stdout. And, when using `run`, one can pass a [[scala.sys.process.ProcessIO]] to control
- * stdin, stdout and stderr.
- *
- * The stdin of a command can be redirected from a `java.io.InputStream`, a `java.io.File`, a
- * `java.net.URL` or another `ProcessBuilder` through the method `#<`. Likewise, the stdout
- * can be sent to a `java.io.OutputStream`, a `java.io.File` or another `ProcessBuilder` with
- * the method `#>`. The method `#>>` can be used to append the output to a `java.io.File`.
- * For example:
+ * mirrors a shell pipe (`|`).
+ * 1. `#&&` conditionally executes the second command if the previous one finished with
+ * exit value 0. It mirrors shell's `&&`.
+ * 1. `#||` conditionally executes the third command if the exit value of the previous
+ * command is different than zero. It mirrors shell's `&&`.
*
- * {{{
- * new URL("http://databinder.net/dispatch/About") #> "grep JSON" #>> new File("About_JSON") !
- * }}}
+ * Finally, `!` at the end executes the commands, and returns the exit value.
+ * Whatever is printed will be sent to the Scala process standard output. If
+ * we wanted to caputre it, we could run that with `!!` instead.
+ *
+ * Note: though it is not shown above, the equivalent of a shell's `;` would be
+ * `###`. The reason for this name is that `;` is a reserved token in Scala.
*/
trait ProcessBuilder extends Source with Sink {
- /** Starts the process represented by this builder, blocks until it exits, and returns the output as a String. Standard error is
- * sent to the console. If the exit code is non-zero, an exception is thrown.*/
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the output as a String. Standard error is sent to the console. If
+ * the exit code is non-zero, an exception is thrown.
+ */
def !! : String
- /** Starts the process represented by this builder, blocks until it exits, and returns the output as a String. Standard error is
- * sent to the provided ProcessLogger. If the exit code is non-zero, an exception is thrown.*/
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the output as a String. Standard error is sent to the provided
+ * ProcessLogger. If the exit code is non-zero, an exception is thrown.
+ */
def !!(log: ProcessLogger): String
- /** Starts the process represented by this builder. The output is returned as a Stream that blocks when lines are not available
- * but the process has not completed. Standard error is sent to the console. If the process exits with a non-zero value,
- * the Stream will provide all lines up to termination and then throw an exception. */
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the output as a String. Standard error is sent to the console. If
+ * the exit code is non-zero, an exception is thrown. The newly started
+ * process reads from standard input of the current process.
+ */
+ def !!< : String
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the output as a String. Standard error is sent to the provided
+ * ProcessLogger. If the exit code is non-zero, an exception is thrown. The
+ * newly started process reads from standard input of the current process.
+ */
+ def !!<(log: ProcessLogger): String
+
+ /** Starts the process represented by this builder. The output is returned as
+ * a Stream that blocks when lines are not available but the process has not
+ * completed. Standard error is sent to the console. If the process exits
+ * with a non-zero value, the Stream will provide all lines up to termination
+ * and then throw an exception.
+ */
def lines: Stream[String]
- /** Starts the process represented by this builder. The output is returned as a Stream that blocks when lines are not available
- * but the process has not completed. Standard error is sent to the provided ProcessLogger. If the process exits with a non-zero value,
- * the Stream will provide all lines up to termination but will not throw an exception. */
+
+ /** Starts the process represented by this builder. The output is returned as
+ * a Stream that blocks when lines are not available but the process has not
+ * completed. Standard error is sent to the provided ProcessLogger. If the
+ * process exits with a non-zero value, the Stream will provide all lines up
+ * to termination but will not throw an exception.
+ */
def lines(log: ProcessLogger): Stream[String]
- /** Starts the process represented by this builder. The output is returned as a Stream that blocks when lines are not available
- * but the process has not completed. Standard error is sent to the console. If the process exits with a non-zero value,
- * the Stream will provide all lines up to termination but will not throw an exception. */
+
+ /** Starts the process represented by this builder. The output is returned as
+ * a Stream that blocks when lines are not available but the process has not
+ * completed. Standard error is sent to the console. If the process exits
+ * with a non-zero value, the Stream will provide all lines up to termination
+ * but will not throw an exception.
+ */
def lines_! : Stream[String]
- /** Starts the process represented by this builder. The output is returned as a Stream that blocks when lines are not available
- * but the process has not completed. Standard error is sent to the provided ProcessLogger. If the process exits with a non-zero value,
- * the Stream will provide all lines up to termination but will not throw an exception. */
+
+ /** Starts the process represented by this builder. The output is returned as
+ * a Stream that blocks when lines are not available but the process has not
+ * completed. Standard error is sent to the provided ProcessLogger. If the
+ * process exits with a non-zero value, the Stream will provide all lines up
+ * to termination but will not throw an exception.
+ */
def lines_!(log: ProcessLogger): Stream[String]
- /** Starts the process represented by this builder, blocks until it exits, and returns the exit code. Standard output and error are
- * sent to the console.*/
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the exit code. Standard output and error are sent to the console.
+ */
def ! : Int
- /** Starts the process represented by this builder, blocks until it exits, and returns the exit code. Standard output and error are
- * sent to the given ProcessLogger.*/
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the exit code. Standard output and error are sent to the given
+ * ProcessLogger.
+ */
def !(log: ProcessLogger): Int
- /** Starts the process represented by this builder, blocks until it exits, and returns the exit code. Standard output and error are
- * sent to the console. The newly started process reads from standard input of the current process.*/
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the exit code. Standard output and error are sent to the console.
+ * The newly started process reads from standard input of the current process.
+ */
def !< : Int
- /** Starts the process represented by this builder, blocks until it exits, and returns the exit code. Standard output and error are
- * sent to the given ProcessLogger. The newly started process reads from standard input of the current process.*/
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the exit code. Standard output and error are sent to the given
+ * ProcessLogger. The newly started process reads from standard input of the
+ * current process.
+ */
def !<(log: ProcessLogger): Int
- /** Starts the process represented by this builder. Standard output and error are sent to the console.*/
+
+ /** Starts the process represented by this builder. Standard output and error
+ * are sent to the console.*/
def run(): Process
- /** Starts the process represented by this builder. Standard output and error are sent to the given ProcessLogger.*/
+
+ /** Starts the process represented by this builder. Standard output and error
+ * are sent to the given ProcessLogger.
+ */
def run(log: ProcessLogger): Process
- /** Starts the process represented by this builder. I/O is handled by the given ProcessIO instance.*/
+
+ /** Starts the process represented by this builder. I/O is handled by the
+ * given ProcessIO instance.
+ */
def run(io: ProcessIO): Process
- /** Starts the process represented by this builder. Standard output and error are sent to the console.
- * The newly started process reads from standard input of the current process if `connectInput` is true.*/
+
+ /** Starts the process represented by this builder. Standard output and error
+ * are sent to the console. The newly started process reads from standard
+ * input of the current process if `connectInput` is true.
+ */
def run(connectInput: Boolean): Process
- /** Starts the process represented by this builder, blocks until it exits, and returns the exit code. Standard output and error are
- * sent to the given ProcessLogger.
- * The newly started process reads from standard input of the current process if `connectInput` is true.*/
+
+ /** Starts the process represented by this builder, blocks until it exits, and
+ * returns the exit code. Standard output and error are sent to the given
+ * ProcessLogger. The newly started process reads from standard input of the
+ * current process if `connectInput` is true.
+ */
def run(log: ProcessLogger, connectInput: Boolean): Process
- /** Constructs a command that runs this command first and then `other` if this command succeeds.*/
+ /** Constructs a command that runs this command first and then `other` if this
+ * command succeeds.
+ */
def #&& (other: ProcessBuilder): ProcessBuilder
- /** Constructs a command that runs this command first and then `other` if this command does not succeed.*/
+
+ /** Constructs a command that runs this command first and then `other` if this
+ * command does not succeed.
+ */
def #|| (other: ProcessBuilder): ProcessBuilder
- /** Constructs a command that will run this command and pipes the output to `other`. `other` must be a simple command.*/
+
+ /** Constructs a command that will run this command and pipes the output to
+ * `other`. `other` must be a simple command.
+ */
def #| (other: ProcessBuilder): ProcessBuilder
- /** Constructs a command that will run this command and then `other`. The exit code will be the exit code of `other`.*/
+
+ /** Constructs a command that will run this command and then `other`. The
+ * exit code will be the exit code of `other`.
+ */
def ### (other: ProcessBuilder): ProcessBuilder
- /** True if this command can be the target of a pipe.
- */
+
+ /** True if this command can be the target of a pipe. */
def canPipeTo: Boolean
- /** True if this command has an exit code which should be propagated to the user.
- * Given a pipe between A and B, if B.hasExitValue is true then the exit code will
- * be the one from B; if it is false, the one from A. This exists to prevent output
- * redirections (implemented as pipes) from masking useful process error codes.
- */
+ /** True if this command has an exit code which should be propagated to the
+ * user. Given a pipe between A and B, if B.hasExitValue is true then the
+ * exit code will be the one from B; if it is false, the one from A. This
+ * exists to prevent output redirections (implemented as pipes) from masking
+ * useful process error codes.
+ */
def hasExitValue: Boolean
}
diff --git a/src/library/scala/sys/process/ProcessIO.scala b/src/library/scala/sys/process/ProcessIO.scala
index 261e837a4d..fa0674670f 100644
--- a/src/library/scala/sys/process/ProcessIO.scala
+++ b/src/library/scala/sys/process/ProcessIO.scala
@@ -11,14 +11,40 @@ package process
import processInternal._
-/** This class is used to control the I/O of every [[scala.sys.process.ProcessBuilder]].
- * Most of the time, there is no need to interact with `ProcessIO` directly. However, if
- * fine control over the I/O of a `ProcessBuilder` is desired, one can use the factories
- * on [[scala.sys.process.BasicIO]] stand-alone object to create one.
- *
- * Each method will be called in a separate thread.
- * If daemonizeThreads is true, they will all be marked daemon threads.
- */
+/** This class is used to control the I/O of every
+ * [[scala.sys.process.Process]]. The functions used to create it will be
+ * called with the process streams once it has been started. It might not be
+ * necessary to use `ProcessIO` directly --
+ * [[scala.sys.process.ProcessBuilder]] can return the process output to the
+ * caller, or use a [[scala.sys.process.ProcessLogger]] which avoids direct
+ * interaction with a stream. One can even use the factories at `BasicIO` to
+ * create a `ProcessIO`, or use its helper methods when creating one's own
+ * `ProcessIO`.
+ *
+ * When creating a `ProcessIO`, it is important to ''close all streams'' when
+ * finished, since the JVM might use system resources to capture the process
+ * input and output, and will not release them unless the streams are
+ * explicitly closed.
+ *
+ * `ProcessBuilder` will call `writeInput`, `processOutput` and `processError`
+ * in separate threads, and if daemonizeThreads is true, they will all be
+ * marked as daemon threads.
+ *
+ * @param writeInput Function that will be called with the `OutputStream` to
+ * which all input to the process must be written. This will
+ * be called in a newly spawned thread.
+ * @param processOutput Function that will be called with the `InputStream`
+ * from which all normal output of the process must be
+ * read from. This will be called in a newly spawned
+ * thread.
+ * @param processError Function that will be called with the `InputStream` from
+ * which all error output of the process must be read from.
+ * This will be called in a newly spawned thread.
+ * @param daemonizeThreads Indicates whether the newly spawned threads that
+ * will run `processOutput`, `processError` and
+ * `writeInput` should be marked as daemon threads.
+ * @note Failure to close the passed streams may result in resource leakage.
+ */
final class ProcessIO(
val writeInput: OutputStream => Unit,
val processOutput: InputStream => Unit,
@@ -27,8 +53,15 @@ final class ProcessIO(
) {
def this(in: OutputStream => Unit, out: InputStream => Unit, err: InputStream => Unit) = this(in, out, err, false)
+ /** Creates a new `ProcessIO` with a different handler for the process input. */
def withInput(write: OutputStream => Unit): ProcessIO = new ProcessIO(write, processOutput, processError, daemonizeThreads)
+
+ /** Creates a new `ProcessIO` with a different handler for the normal output. */
def withOutput(process: InputStream => Unit): ProcessIO = new ProcessIO(writeInput, process, processError, daemonizeThreads)
+
+ /** Creates a new `ProcessIO` with a different handler for the error output. */
def withError(process: InputStream => Unit): ProcessIO = new ProcessIO(writeInput, processOutput, process, daemonizeThreads)
+
+ /** Creates a new `ProcessIO`, with `daemonizeThreads` true. */
def daemonized(): ProcessIO = new ProcessIO(writeInput, processOutput, processError, true)
}
diff --git a/src/library/scala/sys/process/ProcessLogger.scala b/src/library/scala/sys/process/ProcessLogger.scala
index 67146dd70e..a8241db53c 100644
--- a/src/library/scala/sys/process/ProcessLogger.scala
+++ b/src/library/scala/sys/process/ProcessLogger.scala
@@ -11,12 +11,26 @@ package process
import java.io._
-/** Encapsulates the output and error streams of a running process.
- * Many of the methods of `ProcessBuilder` accept a `ProcessLogger` as
- * an argument.
- *
- * @see [[scala.sys.process.ProcessBuilder]]
- */
+/** Encapsulates the output and error streams of a running process. This is used
+ * by [[scala.sys.process.ProcessBuilder]] when starting a process, as an
+ * alternative to [[scala.sys.process.ProcessIO]], which can be more difficult
+ * to use. Note that a `ProcessLogger` will be used to create a `ProcessIO`
+ * anyway. The object `BasicIO` has some functions to do that.
+ *
+ * Here is an example that counts the number of lines in the normal and error
+ * output of a process:
+ * {{{
+ * import scala.sys.process._
+ *
+ * var normalLines = 0
+ * var errorLines = 0
+ * val countLogger = ProcessLogger(line => normalLines += 1,
+ * line => errorLines += 1)
+ * "find /etc" ! countLogger
+ * }}}
+ *
+ * @see [[scala.sys.process.ProcessBuilder]]
+ */
trait ProcessLogger {
/** Will be called with each line read from the process output stream.
*/
diff --git a/src/library/scala/sys/process/package.scala b/src/library/scala/sys/process/package.scala
index 3eb0e5bb89..c1bf470831 100644
--- a/src/library/scala/sys/process/package.scala
+++ b/src/library/scala/sys/process/package.scala
@@ -11,40 +11,175 @@
// for process debugging output.
//
package scala.sys {
- /**
- * This package is used to create process pipelines, similar to Unix command pipelines.
+ /** This package handles the execution of external processes. The contents of
+ * this package can be divided in three groups, according to their
+ * responsibilities:
*
- * The key concept is that one builds a [[scala.sys.process.Process]] that will run and return an exit
- * value. This `Process` is usually composed of one or more [[scala.sys.process.ProcessBuilder]], fed by a
- * [[scala.sys.process.ProcessBuilder.Source]] and feeding a [[scala.sys.process.ProcessBuilder.Sink]]. A
- * `ProcessBuilder` itself is both a `Source` and a `Sink`.
+ * - Indicating what to run and how to run it.
+ * - Handling a process input and output.
+ * - Running the process.
*
- * As `ProcessBuilder`, `Sink` and `Source` are abstract, one usually creates them with `apply` methods on
- * the companion object of [[scala.sys.process.Process]], or through implicit conversions available in this
- * package object from `String` and other types. The pipe is composed through unix-like pipeline and I/O
- * redirection operators available on [[scala.sys.process.ProcessBuilder]].
+ * For simple uses, the only group that matters is the first one. Running an
+ * external command can be as simple as `"ls".!`, or as complex as building a
+ * pipeline of commands such as this:
*
- * The example below shows how to build and combine such commands. It searches for `null` uses in the `src`
- * directory, printing a message indicating whether they were found or not. The first command pipes its
- * output to the second command, whose exit value is then used to choose between the third or fourth
- * commands. This same example is explained in greater detail on [[scala.sys.process.ProcessBuilder]].
+ * {{{
+ * import scala.sys.process._
+ * "ls" #| "grep .scala" #&& "scalac *.scala" #|| "echo nothing found" lines
+ * }}}
+ *
+ * We describe below the general concepts and architecture of the package,
+ * and then take a closer look at each of the categories mentioned above.
+ *
+ * ==Concepts and Architecture==
+ *
+ * The underlying basis for the whole package is Java's `Process` and
+ * `ProcessBuilder` classes. While there's no need to use these Java classes,
+ * they impose boundaries on what is possible. One cannot, for instance,
+ * retrieve a ''process id'' for whatever is executing.
+ *
+ * When executing an external process, one can provide a command's name,
+ * arguments to it, the directory in which it will be executed and what
+ * environment variables will be set. For each executing process, one can
+ * feed its standard input through a `java.io.OutputStream`, and read from
+ * its standard output and standard error through a pair of
+ * `java.io.InputStream`. One can wait until a process finishes execution and
+ * then retrieve its return value, or one can kill an executing process.
+ * Everything else must be built on those features.
+ *
+ * This package provides a DSL for running and chaining such processes,
+ * mimicking Unix shells ability to pipe output from one process to the input
+ * of another, or control the execution of further processes based on the
+ * return status of the previous one.
+ *
+ * In addition to this DSL, this package also provides a few ways of
+ * controlling input and output of these processes, going from simple and
+ * easy to use to complex and flexible.
*
+ * When processes are composed, a new `ProcessBuilder` is created which, when
+ * run, will execute the `ProcessBuilder` instances it is composed of
+ * according to the manner of the composition. If piping one process to
+ * another, they'll be executed simultaneously, and each will be passed a
+ * `ProcessIO` that will copy the output of one to the input of the other.
+ *
+ * ==What to Run and How==
+ *
+ * The central component of the process execution DSL is the
+ * [[scala.sys.process.ProcessBuilder]] trait. It is `ProcessBuilder` that
+ * implements the process execution DSL, that creates the
+ * [[scala.sys.process.Process]] that will handle the execution, and return
+ * the results of such execution to the caller. We can see that DSL in the
+ * introductory example: `#|`, `#&&` and `#!!` are methods on
+ * `ProcessBuilder` used to create a new `ProcessBuilder` through
+ * composition.
+ *
+ * One creates a `ProcessBuilder` either through factories on the
+ * [[scala.sys.process.Process]]'s companion object, or through implicit
+ * conversions available in this package object itself. Implicitly, each
+ * process is created either out of a `String`, with arguments separated by
+ * spaces -- no escaping of spaces is possible -- or out of a
+ * [[scala.collection.Seq]], where the first element represents the command
+ * name, and the remaining elements are arguments to it. In this latter case,
+ * arguments may contain spaces. One can also implicitly convert
+ * [[scala.xml.Elem]] and `java.lang.ProcessBuilder` into a `ProcessBuilder`.
+ * In the introductory example, the strings were converted into
+ * `ProcessBuilder` implicitly.
+ *
+ * To further control what how the process will be run, such as specifying
+ * the directory in which it will be run, see the factories on
+ * [[scala.sys.process.Process]]'s object companion.
+ *
+ * Once the desired `ProcessBuilder` is available, it can be executed in
+ * different ways, depending on how one desires to control its I/O, and what
+ * kind of result one wishes for:
+ *
+ * - Return status of the process (`!` methods)
+ * - Output of the process as a `String` (`!!` methods)
+ * - Continuous output of the process as a `Stream[String]` (`lines` methods)
+ * - The `Process` representing it (`run` methods)
+ *
+ * Some simple examples of these methods:
* {{{
* import scala.sys.process._
- * (
- * "find src -name *.scala -exec grep null {} ;"
- * #| "xargs test -z"
- * #&& "echo null-free" #|| "echo null detected"
- * ) !
+ *
+ * // This uses ! to get the exit code
+ * def fileExists(name: String) = Seq("test", "-f", name).! == 0
+ *
+ * // This uses !! to get the whole result as a string
+ * val dirContents = "ls".!!
+ *
+ * // This "fire-and-forgets" the method, which can be lazily read through
+ * // a Stream[String]
+ * def sourceFilesAt(baseDir: String): Stream[String] = {
+ * val cmd = Seq("find", baseDir, "-name", "*.scala", "-type", "f")
+ * cmd.lines
+ * }
* }}}
*
- * Other implicits available here are for [[scala.sys.process.ProcessBuilder.FileBuilder]], which extends
- * both `Sink` and `Source`, and for [[scala.sys.process.ProcessBuilder.URLBuilder]], which extends
- * `Source` alone.
+ * We'll see more details about controlling I/O of the process in the next
+ * section.
+ *
+ * ==Handling Input and Output==
+ *
+ * In the underlying Java model, once a `Process` has been started, one can
+ * get `java.io.InputStream` and `java.io.OutpuStream` representing its
+ * output and input respectively. That is, what one writes to an
+ * `OutputStream` is turned into input to the process, and the output of a
+ * process can be read from an `InputStream` -- of which there are two, one
+ * representing normal output, and the other representing error output.
+ *
+ * This model creates a difficulty, which is that the code responsible for
+ * actually running the external processes is the one that has to take
+ * decisions about how to handle its I/O.
+ *
+ * This package presents an alternative model: the I/O of a running process
+ * is controlled by a [[scala.sys.process.ProcessIO]] object, which can be
+ * passed _to_ the code that runs the external process. A `ProcessIO` will
+ * have direct access to the java streams associated with the process I/O. It
+ * must, however, close these streams afterwards.
+ *
+ * Simpler abstractions are available, however. The components of this
+ * package that handle I/O are:
+ *
+ * - [[scala.sys.process.ProcessIO]]: provides the low level abstraction.
+ * - [[scala.sys.process.ProcessLogger]]: provides a higher level abstraction
+ * for output, and can be created through its object companion
+ * - [[scala.sys.process.BasicIO]]: a library of helper methods for the
+ * creation of `ProcessIO`.
+ * - This package object itself, with a few implicit conversions.
*
- * One can even create a `Process` solely out of these, without running any command. For example, this will
- * download from a URL to a file:
+ * Some examples of I/O handling:
+ * {{{
+ * import scala.sys.process._
+ *
+ * // An overly complex way of computing size of a compressed file
+ * def gzFileSize(name: String) = {
+ * val cat = Seq("zcat", "name")
+ * var count = 0
+ * def byteCounter(input: java.io.InputStream) = {
+ * while(input.read() != -1) count += 1
+ * input.close()
+ * }
+ * cat ! new ProcessIO(_.close(), byteCounter, _.close())
+ * count
+ * }
+ *
+ * // This "fire-and-forgets" the method, which can be lazily read through
+ * // a Stream[String], and accumulates all errors on a StringBuffer
+ * def sourceFilesAt(baseDir: String): (Stream[String], StringBuffer) = {
+ * val buffer = new StringBuffer()
+ * val cmd = Seq("find", baseDir, "-name", "*.scala", "-type", "f")
+ * val lines = cmd lines_! ProcessLogger(buffer append _)
+ * (lines, buffer)
+ * }
+ * }}}
*
+ * Instances of the java classes `java.io.File` and `java.net.URL` can both
+ * be used directly as input to other processes, and `java.io.File` can be
+ * used as output as well. One can even pipe one to the other directly
+ * without any intervening process, though that's not a design goal or
+ * recommended usage. For example, the following code will copy a web page to
+ * a file:
* {{{
* import java.io.File
* import java.net.URL
@@ -52,26 +187,33 @@ package scala.sys {
* new URL("http://www.scala-lang.org/") #> new File("scala-lang.html") !
* }}}
*
- * One may use a `Process` directly through `ProcessBuilder`'s `run` method, which starts the process in
- * the background, and returns a `Process`. If background execution is not desired, one can get a
- * `ProcessBuilder` to execute through a method such as `!`, `lines`, `run` or variations thereof. That
- * will create the `Process` to execute the commands, and return either the exit value or the output, maybe
- * throwing an exception.
- *
- * Finally, when executing a `ProcessBuilder`, one may pass a [[scala.sys.process.ProcessLogger]] to
- * capture stdout and stderr of the executing processes. A `ProcessLogger` may be created through its
- * companion object from functions of type `(String) => Unit`, or one might redirect it to a file, using
- * [[scala.sys.process.FileProcessLogger]], which can also be created through `ProcessLogger`'s object
- * companion.
+ * More information about the other ways of controlling I/O can be looked at
+ * in the scaladoc for the associated objects, traits and classes.
+ *
+ * ==Running the Process==
+ *
+ * Paradoxically, this is the simplest component of all, and the one least
+ * likely to be interacted with. It consists solely of
+ * [[scala.sys.process.Process]], and it provides only two methods:
+ *
+ * - `exitValue()`: blocks until the process exit, and then returns the exit
+ * value. This is what happens when one uses the `!` method of
+ * `ProcessBuilder`.
+ * - `destroy()`: this will kill the external process and close the streams
+ * associated with it.
*/
package object process extends ProcessImplicits {
+ /** The arguments passed to `java` when creating this process */
def javaVmArguments: List[String] = {
import collection.JavaConversions._
java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toList
}
+ /** The input stream of this process */
def stdin = java.lang.System.in
+ /** The output stream of this process */
def stdout = java.lang.System.out
+ /** The error stream of this process */
def stderr = java.lang.System.err
}
// private val shell: String => Array[String] =
diff --git a/src/library/scala/util/Duration.scala b/src/library/scala/util/Duration.scala
new file mode 100644
index 0000000000..4c118f8b3b
--- /dev/null
+++ b/src/library/scala/util/Duration.scala
@@ -0,0 +1,485 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+
+package scala.util
+
+import java.util.concurrent.TimeUnit
+import TimeUnit._
+import java.lang.{ Long ⇒ JLong, Double ⇒ JDouble }
+//import akka.actor.ActorSystem (commented methods)
+
+class TimerException(message: String) extends RuntimeException(message)
+
+/**
+ * Simple timer class.
+ * Usage:
+ * <pre>
+ * import akka.util.duration._
+ * import akka.util.Timer
+ *
+ * val timer = Timer(30.seconds)
+ * while (timer.isTicking) { ... }
+ * </pre>
+ */
+case class Timer(duration: Duration, throwExceptionOnTimeout: Boolean = false) {
+ val startTimeInMillis = System.currentTimeMillis
+ val timeoutInMillis = duration.toMillis
+
+ /**
+ * Returns true while the timer is ticking. After that it either throws and exception or
+ * returns false. Depending on if the 'throwExceptionOnTimeout' argument is true or false.
+ */
+ def isTicking: Boolean = {
+ if (!(timeoutInMillis > (System.currentTimeMillis - startTimeInMillis))) {
+ if (throwExceptionOnTimeout) throw new TimerException("Time out after " + duration)
+ else false
+ } else true
+ }
+}
+
+object Duration {
+ def apply(length: Long, unit: TimeUnit): Duration = new FiniteDuration(length, unit)
+ def apply(length: Double, unit: TimeUnit): Duration = fromNanos(unit.toNanos(1) * length)
+ def apply(length: Long, unit: String): Duration = new FiniteDuration(length, timeUnit(unit))
+
+ def fromNanos(nanos: Long): Duration = {
+ if (nanos % 86400000000000L == 0) {
+ Duration(nanos / 86400000000000L, DAYS)
+ } else if (nanos % 3600000000000L == 0) {
+ Duration(nanos / 3600000000000L, HOURS)
+ } else if (nanos % 60000000000L == 0) {
+ Duration(nanos / 60000000000L, MINUTES)
+ } else if (nanos % 1000000000L == 0) {
+ Duration(nanos / 1000000000L, SECONDS)
+ } else if (nanos % 1000000L == 0) {
+ Duration(nanos / 1000000L, MILLISECONDS)
+ } else if (nanos % 1000L == 0) {
+ Duration(nanos / 1000L, MICROSECONDS)
+ } else {
+ Duration(nanos, NANOSECONDS)
+ }
+ }
+
+ def fromNanos(nanos: Double): Duration = fromNanos((nanos + 0.5).asInstanceOf[Long])
+
+ /**
+ * Construct a Duration by parsing a String. In case of a format error, a
+ * RuntimeException is thrown. See `unapply(String)` for more information.
+ */
+ def apply(s: String): Duration = unapply(s) getOrElse sys.error("format error")
+
+ /**
+ * Deconstruct a Duration into length and unit if it is finite.
+ */
+ def unapply(d: Duration): Option[(Long, TimeUnit)] = {
+ if (d.finite_?) {
+ Some((d.length, d.unit))
+ } else {
+ None
+ }
+ }
+
+ private val RE = ("""^\s*(\d+(?:\.\d+)?)\s*""" + // length part
+ "(?:" + // units are distinguished in separate match groups
+ "(d|day|days)|" +
+ "(h|hour|hours)|" +
+ "(min|minute|minutes)|" +
+ "(s|sec|second|seconds)|" +
+ "(ms|milli|millis|millisecond|milliseconds)|" +
+ "(µs|micro|micros|microsecond|microseconds)|" +
+ "(ns|nano|nanos|nanosecond|nanoseconds)" +
+ """)\s*$""").r // close the non-capturing group
+ private val REinf = """^\s*Inf\s*$""".r
+ private val REminf = """^\s*(?:-\s*|Minus)Inf\s*""".r
+
+ /**
+ * Parse String, return None if no match. Format is `"<length><unit>"`, where
+ * whitespace is allowed before, between and after the parts. Infinities are
+ * designated by `"Inf"` and `"-Inf"` or `"MinusInf"`.
+ */
+ def unapply(s: String): Option[Duration] = s match {
+ case RE(length, d, h, m, s, ms, mus, ns) ⇒
+ if (d ne null) Some(Duration(JDouble.parseDouble(length), DAYS)) else if (h ne null) Some(Duration(JDouble.parseDouble(length), HOURS)) else if (m ne null) Some(Duration(JDouble.parseDouble(length), MINUTES)) else if (s ne null) Some(Duration(JDouble.parseDouble(length), SECONDS)) else if (ms ne null) Some(Duration(JDouble.parseDouble(length), MILLISECONDS)) else if (mus ne null) Some(Duration(JDouble.parseDouble(length), MICROSECONDS)) else if (ns ne null) Some(Duration(JDouble.parseDouble(length), NANOSECONDS)) else
+ sys.error("made some error in regex (should not be possible)")
+ case REinf() ⇒ Some(Inf)
+ case REminf() ⇒ Some(MinusInf)
+ case _ ⇒ None
+ }
+
+ /**
+ * Parse TimeUnit from string representation.
+ */
+ def timeUnit(unit: String) = unit.toLowerCase match {
+ case "d" | "day" | "days" ⇒ DAYS
+ case "h" | "hour" | "hours" ⇒ HOURS
+ case "min" | "minute" | "minutes" ⇒ MINUTES
+ case "s" | "sec" | "second" | "seconds" ⇒ SECONDS
+ case "ms" | "milli" | "millis" | "millisecond" | "milliseconds" ⇒ MILLISECONDS
+ case "µs" | "micro" | "micros" | "microsecond" | "microseconds" ⇒ MICROSECONDS
+ case "ns" | "nano" | "nanos" | "nanosecond" | "nanoseconds" ⇒ NANOSECONDS
+ }
+
+ val Zero: Duration = new FiniteDuration(0, NANOSECONDS)
+ val Undefined: Duration = new Duration with Infinite {
+ override def toString = "Duration.Undefined"
+ override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this
+ override def +(other: Duration): Duration = throw new IllegalArgumentException("cannot add Undefined duration")
+ override def -(other: Duration): Duration = throw new IllegalArgumentException("cannot subtract Undefined duration")
+ override def *(factor: Double): Duration = throw new IllegalArgumentException("cannot multiply Undefined duration")
+ override def /(factor: Double): Duration = throw new IllegalArgumentException("cannot divide Undefined duration")
+ override def /(other: Duration): Double = throw new IllegalArgumentException("cannot divide Undefined duration")
+ def >(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def >=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def <=(other: Duration) = throw new IllegalArgumentException("cannot compare Undefined duration")
+ def unary_- : Duration = throw new IllegalArgumentException("cannot negate Undefined duration")
+ }
+
+ trait Infinite {
+ this: Duration ⇒
+
+ override def equals(other: Any) = false
+
+ def +(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ this
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal addition of infinities")
+ case _ ⇒ this
+ }
+ def -(other: Duration): Duration =
+ other match {
+ case _: this.type ⇒ throw new IllegalArgumentException("illegal subtraction of infinities")
+ case _ ⇒ this
+ }
+ def *(factor: Double): Duration = this
+ def /(factor: Double): Duration = this
+ def /(other: Duration): Double =
+ other match {
+ case _: Infinite ⇒ throw new IllegalArgumentException("illegal division of infinities")
+ // maybe questionable but pragmatic: Inf / 0 => Inf
+ case x ⇒ Double.PositiveInfinity * (if ((this > Zero) ^ (other >= Zero)) -1 else 1)
+ }
+
+ def finite_? = false
+
+ def length: Long = throw new IllegalArgumentException("length not allowed on infinite Durations")
+ def unit: TimeUnit = throw new IllegalArgumentException("unit not allowed on infinite Durations")
+ def toNanos: Long = throw new IllegalArgumentException("toNanos not allowed on infinite Durations")
+ def toMicros: Long = throw new IllegalArgumentException("toMicros not allowed on infinite Durations")
+ def toMillis: Long = throw new IllegalArgumentException("toMillis not allowed on infinite Durations")
+ def toSeconds: Long = throw new IllegalArgumentException("toSeconds not allowed on infinite Durations")
+ def toMinutes: Long = throw new IllegalArgumentException("toMinutes not allowed on infinite Durations")
+ def toHours: Long = throw new IllegalArgumentException("toHours not allowed on infinite Durations")
+ def toDays: Long = throw new IllegalArgumentException("toDays not allowed on infinite Durations")
+ def toUnit(unit: TimeUnit): Double = throw new IllegalArgumentException("toUnit not allowed on infinite Durations")
+
+ def printHMS = toString
+ }
+
+ /**
+ * Infinite duration: greater than any other and not equal to any other,
+ * including itself.
+ */
+ val Inf: Duration = new Duration with Infinite {
+ override def toString = "Duration.Inf"
+ def >(other: Duration) = true
+ def >=(other: Duration) = true
+ def <(other: Duration) = false
+ def <=(other: Duration) = false
+ def unary_- : Duration = MinusInf
+ }
+
+ /**
+ * Infinite negative duration: lesser than any other and not equal to any other,
+ * including itself.
+ */
+ val MinusInf: Duration = new Duration with Infinite {
+ override def toString = "Duration.MinusInf"
+ def >(other: Duration) = false
+ def >=(other: Duration) = false
+ def <(other: Duration) = true
+ def <=(other: Duration) = true
+ def unary_- : Duration = Inf
+ }
+
+ // Java Factories
+ def create(length: Long, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Double, unit: TimeUnit): Duration = apply(length, unit)
+ def create(length: Long, unit: String): Duration = apply(length, unit)
+ def parse(s: String): Duration = unapply(s).get
+}
+
+/**
+ * Utility for working with java.util.concurrent.TimeUnit durations.
+ *
+ * <p/>
+ * Examples of usage from Java:
+ * <pre>
+ * import akka.util.FiniteDuration;
+ * import java.util.concurrent.TimeUnit;
+ *
+ * Duration duration = new FiniteDuration(100, MILLISECONDS);
+ * Duration duration = new FiniteDuration(5, "seconds");
+ *
+ * duration.toNanos();
+ * </pre>
+ *
+ * <p/>
+ * Examples of usage from Scala:
+ * <pre>
+ * import akka.util.Duration
+ * import java.util.concurrent.TimeUnit
+ *
+ * val duration = Duration(100, MILLISECONDS)
+ * val duration = Duration(100, "millis")
+ *
+ * duration.toNanos
+ * duration < 1.second
+ * duration <= Duration.Inf
+ * </pre>
+ *
+ * <p/>
+ * Implicits are also provided for Int, Long and Double. Example usage:
+ * <pre>
+ * import akka.util.duration._
+ *
+ * val duration = 100 millis
+ * </pre>
+ *
+ * Extractors, parsing and arithmetic are also included:
+ * <pre>
+ * val d = Duration("1.2 µs")
+ * val Duration(length, unit) = 5 millis
+ * val d2 = d * 2.5
+ * val d3 = d2 + 1.millisecond
+ * </pre>
+ */
+abstract class Duration extends Serializable {
+ def length: Long
+ def unit: TimeUnit
+ def toNanos: Long
+ def toMicros: Long
+ def toMillis: Long
+ def toSeconds: Long
+ def toMinutes: Long
+ def toHours: Long
+ def toDays: Long
+ def toUnit(unit: TimeUnit): Double
+ def printHMS: String
+ def <(other: Duration): Boolean
+ def <=(other: Duration): Boolean
+ def >(other: Duration): Boolean
+ def >=(other: Duration): Boolean
+ def +(other: Duration): Duration
+ def -(other: Duration): Duration
+ def *(factor: Double): Duration
+ def /(factor: Double): Duration
+ def /(other: Duration): Double
+ def unary_- : Duration
+ def finite_? : Boolean
+// def dilated(implicit system: ActorSystem): Duration = this * system.settings.TestTimeFactor
+ def min(other: Duration): Duration = if (this < other) this else other
+ def max(other: Duration): Duration = if (this > other) this else other
+ def sleep(): Unit = Thread.sleep(toMillis)
+
+ // Java API
+ def lt(other: Duration) = this < other
+ def lteq(other: Duration) = this <= other
+ def gt(other: Duration) = this > other
+ def gteq(other: Duration) = this >= other
+ def plus(other: Duration) = this + other
+ def minus(other: Duration) = this - other
+ def mul(factor: Double) = this * factor
+ def div(factor: Double) = this / factor
+ def div(other: Duration) = this / other
+ def neg() = -this
+ def isFinite() = finite_?
+}
+
+class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration {
+ import Duration._
+
+ def this(length: Long, unit: String) = this(length, Duration.timeUnit(unit))
+
+ def toNanos = unit.toNanos(length)
+ def toMicros = unit.toMicros(length)
+ def toMillis = unit.toMillis(length)
+ def toSeconds = unit.toSeconds(length)
+ def toMinutes = unit.toMinutes(length)
+ def toHours = unit.toHours(length)
+ def toDays = unit.toDays(length)
+ def toUnit(u: TimeUnit) = long2double(toNanos) / NANOSECONDS.convert(1, u)
+
+ override def toString = this match {
+ case Duration(1, DAYS) ⇒ "1 day"
+ case Duration(x, DAYS) ⇒ x + " days"
+ case Duration(1, HOURS) ⇒ "1 hour"
+ case Duration(x, HOURS) ⇒ x + " hours"
+ case Duration(1, MINUTES) ⇒ "1 minute"
+ case Duration(x, MINUTES) ⇒ x + " minutes"
+ case Duration(1, SECONDS) ⇒ "1 second"
+ case Duration(x, SECONDS) ⇒ x + " seconds"
+ case Duration(1, MILLISECONDS) ⇒ "1 millisecond"
+ case Duration(x, MILLISECONDS) ⇒ x + " milliseconds"
+ case Duration(1, MICROSECONDS) ⇒ "1 microsecond"
+ case Duration(x, MICROSECONDS) ⇒ x + " microseconds"
+ case Duration(1, NANOSECONDS) ⇒ "1 nanosecond"
+ case Duration(x, NANOSECONDS) ⇒ x + " nanoseconds"
+ }
+
+ def printHMS = "%02d:%02d:%06.3f".format(toHours, toMinutes % 60, toMillis / 1000.0 % 60)
+
+ def <(other: Duration) = {
+ if (other.finite_?) {
+ toNanos < other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other > this
+ }
+ }
+
+ def <=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos <= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other >= this
+ }
+ }
+
+ def >(other: Duration) = {
+ if (other.finite_?) {
+ toNanos > other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other < this
+ }
+ }
+
+ def >=(other: Duration) = {
+ if (other.finite_?) {
+ toNanos >= other.asInstanceOf[FiniteDuration].toNanos
+ } else {
+ other <= this
+ }
+ }
+
+ def +(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos + other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def -(other: Duration) = {
+ if (!other.finite_?) {
+ other
+ } else {
+ val nanos = toNanos - other.asInstanceOf[FiniteDuration].toNanos
+ fromNanos(nanos)
+ }
+ }
+
+ def *(factor: Double) = fromNanos(long2double(toNanos) * factor)
+
+ def /(factor: Double) = fromNanos(long2double(toNanos) / factor)
+
+ def /(other: Duration) = if (other.finite_?) long2double(toNanos) / other.toNanos else 0
+
+ def unary_- = Duration(-length, unit)
+
+ def finite_? = true
+
+ override def equals(other: Any) =
+ other.isInstanceOf[FiniteDuration] &&
+ toNanos == other.asInstanceOf[FiniteDuration].toNanos
+
+ override def hashCode = toNanos.asInstanceOf[Int]
+}
+
+class DurationInt(n: Int) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationLong(n: Long) {
+ def nanoseconds = Duration(n, NANOSECONDS)
+ def nanos = Duration(n, NANOSECONDS)
+ def nanosecond = Duration(n, NANOSECONDS)
+ def nano = Duration(n, NANOSECONDS)
+
+ def microseconds = Duration(n, MICROSECONDS)
+ def micros = Duration(n, MICROSECONDS)
+ def microsecond = Duration(n, MICROSECONDS)
+ def micro = Duration(n, MICROSECONDS)
+
+ def milliseconds = Duration(n, MILLISECONDS)
+ def millis = Duration(n, MILLISECONDS)
+ def millisecond = Duration(n, MILLISECONDS)
+ def milli = Duration(n, MILLISECONDS)
+
+ def seconds = Duration(n, SECONDS)
+ def second = Duration(n, SECONDS)
+
+ def minutes = Duration(n, MINUTES)
+ def minute = Duration(n, MINUTES)
+
+ def hours = Duration(n, HOURS)
+ def hour = Duration(n, HOURS)
+
+ def days = Duration(n, DAYS)
+ def day = Duration(n, DAYS)
+}
+
+class DurationDouble(d: Double) {
+ def nanoseconds = Duration(d, NANOSECONDS)
+ def nanos = Duration(d, NANOSECONDS)
+ def nanosecond = Duration(d, NANOSECONDS)
+ def nano = Duration(d, NANOSECONDS)
+
+ def microseconds = Duration(d, MICROSECONDS)
+ def micros = Duration(d, MICROSECONDS)
+ def microsecond = Duration(d, MICROSECONDS)
+ def micro = Duration(d, MICROSECONDS)
+
+ def milliseconds = Duration(d, MILLISECONDS)
+ def millis = Duration(d, MILLISECONDS)
+ def millisecond = Duration(d, MILLISECONDS)
+ def milli = Duration(d, MILLISECONDS)
+
+ def seconds = Duration(d, SECONDS)
+ def second = Duration(d, SECONDS)
+
+ def minutes = Duration(d, MINUTES)
+ def minute = Duration(d, MINUTES)
+
+ def hours = Duration(d, HOURS)
+ def hour = Duration(d, HOURS)
+
+ def days = Duration(d, DAYS)
+ def day = Duration(d, DAYS)
+}
diff --git a/src/library/scala/util/Timeout.scala b/src/library/scala/util/Timeout.scala
new file mode 100644
index 0000000000..0190675344
--- /dev/null
+++ b/src/library/scala/util/Timeout.scala
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
+ */
+package scala.util
+
+import java.util.concurrent.TimeUnit
+
+case class Timeout(duration: Duration) {
+ def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
+ def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
+}
+
+object Timeout {
+ /**
+ * A timeout with zero duration, will cause most requests to always timeout.
+ */
+ val zero = new Timeout(Duration.Zero)
+
+ /**
+ * A Timeout with infinite duration. Will never timeout. Use extreme caution with this
+ * as it may cause memory leaks, blocked threads, or may not even be supported by
+ * the receiver, which would result in an exception.
+ */
+ val never = new Timeout(Duration.Inf)
+
+ def apply(timeout: Long) = new Timeout(timeout)
+ def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
+
+ implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
+ implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
+ implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
+ //implicit def defaultTimeout(implicit system: ActorSystem) = system.settings.ActorTimeout (have to introduce this in ActorSystem)
+}
diff --git a/src/library/scala/util/Try.scala b/src/library/scala/util/Try.scala
new file mode 100644
index 0000000000..a05a75e0b7
--- /dev/null
+++ b/src/library/scala/util/Try.scala
@@ -0,0 +1,165 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2008-2011, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+package scala.util
+
+
+
+import collection.Seq
+
+
+
+/**
+ * The `Try` type represents a computation that may either result in an exception,
+ * or return a success value. It's analagous to the `Either` type.
+ */
+sealed abstract class Try[+T] {
+ /**
+ * Returns true if the `Try` is a `Failure`, false otherwise.
+ */
+ def isFailure: Boolean
+
+ /**
+ * Returns true if the `Try` is a `Success`, false otherwise.
+ */
+ def isSuccess: Boolean
+
+ /**
+ * Returns the value from this `Success` or the given argument if this is a `Failure`.
+ */
+ def getOrElse[U >: T](default: => U) = if (isSuccess) get else default
+
+ /**
+ * Returns the value from this `Success` or throws the exception if this is a `Failure`.
+ */
+ def get: T
+
+ /**
+ * Applies the given function f if this is a Result.
+ */
+ def foreach[U](f: T => U): Unit
+
+ /**
+ * Returns the given function applied to the value from this `Success` or returns this if this is a `Failure`.
+ */
+ def flatMap[U](f: T => Try[U]): Try[U]
+
+ /**
+ * Maps the given function to the value from this `Success` or returns this if this is a `Failure`.
+ */
+ def map[U](f: T => U): Try[U]
+
+ def collect[U](pf: PartialFunction[T, U]): Try[U]
+
+ def exists(p: T => Boolean): Boolean
+
+ /**
+ * Converts this to a `Failure` if the predicate is not satisfied.
+ */
+ def filter(p: T => Boolean): Try[T]
+
+ /**
+ * Converts this to a `Failure` if the predicate is not satisfied.
+ */
+ def filterNot(p: T => Boolean): Try[T] = filter(x => !p(x))
+
+ /**
+ * Calls the exceptionHandler with the exception if this is a `Failure`. This is like `flatMap` for the exception.
+ */
+ def rescue[U >: T](rescueException: PartialFunction[Throwable, Try[U]]): Try[U]
+
+ /**
+ * Calls the exceptionHandler with the exception if this is a `Failure`. This is like map for the exception.
+ */
+ def recover[U >: T](rescueException: PartialFunction[Throwable, U]): Try[U]
+
+ /**
+ * Returns `None` if this is a `Failure` or a `Some` containing the value if this is a `Success`.
+ */
+ def toOption = if (isSuccess) Some(get) else None
+
+ def toSeq = if (isSuccess) Seq(get) else Seq()
+
+ /**
+ * Returns the given function applied to the value from this Success or returns this if this is a `Failure`.
+ * Alias for `flatMap`.
+ */
+ def andThen[U](f: T => Try[U]): Try[U] = flatMap(f)
+
+ /**
+ * Transforms a nested `Try`, i.e., a `Try` of type `Try[Try[T]]`,
+ * into an un-nested `Try`, i.e., a `Try` of type `Try[T]`.
+ */
+ def flatten[U](implicit ev: T <:< Try[U]): Try[U]
+
+ def failed: Try[Throwable]
+}
+
+
+final case class Failure[+T](val exception: Throwable) extends Try[T] {
+ def isFailure = true
+ def isSuccess = false
+ def rescue[U >: T](rescueException: PartialFunction[Throwable, Try[U]]): Try[U] = {
+ try {
+ if (rescueException.isDefinedAt(exception)) rescueException(exception) else this
+ } catch {
+ case e2 => Failure(e2)
+ }
+ }
+ def get: T = throw exception
+ def flatMap[U](f: T => Try[U]): Try[U] = Failure[U](exception)
+ def flatten[U](implicit ev: T <:< Try[U]): Try[U] = Failure[U](exception)
+ def foreach[U](f: T => U): Unit = {}
+ def map[U](f: T => U): Try[U] = Failure[U](exception)
+ def collect[U](pf: PartialFunction[T, U]): Try[U] = Failure[U](exception)
+ def filter(p: T => Boolean): Try[T] = this
+ def recover[U >: T](rescueException: PartialFunction[Throwable, U]): Try[U] =
+ if (rescueException.isDefinedAt(exception)) {
+ Try(rescueException(exception))
+ } else {
+ this
+ }
+ def exists(p: T => Boolean): Boolean = false
+ def failed: Try[Throwable] = Success(exception)
+}
+
+
+final case class Success[+T](r: T) extends Try[T] {
+ def isFailure = false
+ def isSuccess = true
+ def rescue[U >: T](rescueException: PartialFunction[Throwable, Try[U]]): Try[U] = Success(r)
+ def get = r
+ def flatMap[U](f: T => Try[U]): Try[U] =
+ try f(r)
+ catch {
+ case e => Failure(e)
+ }
+ def flatten[U](implicit ev: T <:< Try[U]): Try[U] = r
+ def foreach[U](f: T => U): Unit = f(r)
+ def map[U](f: T => U): Try[U] = Try[U](f(r))
+ def collect[U](pf: PartialFunction[T, U]): Try[U] =
+ if (pf isDefinedAt r) Success(pf(r))
+ else Failure[U](new NoSuchElementException("Partial function not defined at " + r))
+ def filter(p: T => Boolean): Try[T] =
+ if (p(r)) this
+ else Failure(new NoSuchElementException("Predicate does not hold for " + r))
+ def recover[U >: T](rescueException: PartialFunction[Throwable, U]): Try[U] = this
+ def exists(p: T => Boolean): Boolean = p(r)
+ def failed: Try[Throwable] = Failure(new UnsupportedOperationException("Success.failed"))
+}
+
+
+object Try {
+
+ def apply[T](r: => T): Try[T] = {
+ try { Success(r) } catch {
+ case e => Failure(e)
+ }
+ }
+
+}