summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-01-26 08:37:23 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2011-01-26 08:37:23 +0000
commit78a48c46cf3727dd06179cb1360b2f9057647042 (patch)
tree01f418fab8ee1cd61c5a6a891bdce09cde142098
parent78007ac467c9d6e88ae183a9126772829072704c (diff)
downloadscala-78a48c46cf3727dd06179cb1360b2f9057647042.tar.gz
scala-78a48c46cf3727dd06179cb1360b2f9057647042.tar.bz2
scala-78a48c46cf3727dd06179cb1360b2f9057647042.zip
Merge branch 'work'
Conflicts: src/library/scala/concurrent/SyncVar.scala
-rw-r--r--src/library/scala/collection/Iterator.scala2
-rw-r--r--src/library/scala/collection/TraversableLike.scala65
-rw-r--r--src/library/scala/collection/TraversableOnce.scala22
-rw-r--r--src/library/scala/collection/TraversableViewLike.scala5
-rw-r--r--src/library/scala/collection/mutable/UnrolledBuffer.scala (renamed from src/library/scala/collection/parallel/UnrolledBuffer.scala)40
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala6
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala4
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashSet.scala4
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashMap.scala1
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashSet.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala4
-rw-r--r--src/library/scala/collection/parallel/package.scala1
-rw-r--r--src/library/scala/concurrent/SyncVar.scala40
-rw-r--r--src/library/scala/parallel/Future.scala35
-rw-r--r--src/library/scala/parallel/package.scala178
-rw-r--r--test/benchmarks/src/scala/collection/parallel/Benchmarking.scala5
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala3
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala4
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala28
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala18
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala3
-rw-r--r--test/files/jvm/serialization.scala4
-rw-r--r--test/files/run/UnrolledBuffer.scala2
-rw-r--r--test/files/run/scan.scala (renamed from test/files/pos/scan.scala)0
-rw-r--r--test/files/run/testblock.scala33
-rw-r--r--test/files/run/testpar.scala24
-rw-r--r--test/files/scalacheck/Unrolled.scala2
27 files changed, 470 insertions, 65 deletions
diff --git a/src/library/scala/collection/Iterator.scala b/src/library/scala/collection/Iterator.scala
index 5fae6b8730..58df4ff0d4 100644
--- a/src/library/scala/collection/Iterator.scala
+++ b/src/library/scala/collection/Iterator.scala
@@ -440,6 +440,8 @@ trait Iterator[+A] extends TraversableOnce[A] {
} else Iterator.empty.next()
}
+ def scanRight[B](z: B)(op: (A, B) => B): Iterator[B] = toBuffer.scanRight(z)(op).iterator
+
/** Takes longest prefix of values produced by this iterator that satisfy a predicate.
* @param p The predicate used to test elements.
* @return An iterator returning the values produced by this iterator, until
diff --git a/src/library/scala/collection/TraversableLike.scala b/src/library/scala/collection/TraversableLike.scala
index fd3c7adc3f..57191cdd9a 100644
--- a/src/library/scala/collection/TraversableLike.scala
+++ b/src/library/scala/collection/TraversableLike.scala
@@ -13,8 +13,10 @@ package scala.collection
import generic._
import mutable.{ Builder, ListBuffer }
import annotation.tailrec
+import annotation.migration
import annotation.unchecked.{ uncheckedVariance => uV }
+
/** A template trait for traversable collections of type `Traversable[A]`.
* $traversableInfo
* @define mutability
@@ -429,19 +431,18 @@ trait TraversableLike[+A, +Repr] extends HasNewBuilder[A, Repr]
result
}
- /**
- * Produces a collection containing cummulative results of applying the
- * operator going left to right.
+ /** Produces a collection containing cummulative results of applying the
+ * operator going left to right.
*
- * $willNotTerminateInf
- * $orderDependent
+ * $willNotTerminateInf
+ * $orderDependent
*
- * @tparam B the type of the elements in the resulting collection
- * @tparam That the actual type of the resulting collection
- * @param z the initial value
- * @param op the binary operator applied to the intermediate result and the element
- * @param bf $bfinfo
- * @return collection with intermediate results
+ * @tparam B the type of the elements in the resulting collection
+ * @tparam That the actual type of the resulting collection
+ * @param z the initial value
+ * @param op the binary operator applied to the intermediate result and the element
+ * @param bf $bfinfo
+ * @return collection with intermediate results
*/
def scanLeft[B, That](z: B)(op: (B, A) => B)(implicit bf: CanBuildFrom[Repr, B, That]): That = {
val b = bf(repr)
@@ -452,24 +453,36 @@ trait TraversableLike[+A, +Repr] extends HasNewBuilder[A, Repr]
b.result
}
- /**
- * Produces a collection containing cummulative results of applying the operator going right to left.
- * $willNotTerminateInf
- * $orderDependent
+ /** Produces a collection containing cummulative results of applying the operator going right to left.
+ * The head of the collection is the last cummulative result.
+ * $willNotTerminateInf
+ * $orderDependent
*
- * @tparam B the type of the elements in the resulting collection
- * @tparam That the actual type of the resulting collection
- * @param z the initial value
- * @param op the binary operator applied to the intermediate result and the element
- * @param bf $bfinfo
- * @return collection with intermediate results
- */
+ * Example:
+ * {{{
+ * List(1, 2, 3, 4).scanRight(0)(_ + _) == List(10, 9, 7, 4, 0)
+ * }}}
+ *
+ * @tparam B the type of the elements in the resulting collection
+ * @tparam That the actual type of the resulting collection
+ * @param z the initial value
+ * @param op the binary operator applied to the intermediate result and the element
+ * @param bf $bfinfo
+ * @return collection with intermediate results
+ */
+ @migration(2, 9,
+ "This scanRight definition has changed in 2.9.\n" +
+ "The previous behavior can be reproduced with scanRight.reverse."
+ )
def scanRight[B, That](z: B)(op: (A, B) => B)(implicit bf: CanBuildFrom[Repr, B, That]): That = {
- val b = bf(repr)
- b.sizeHint(this, 1)
+ var scanned = List(z)
var acc = z
- b += acc
- for (x <- reversed) { acc = op(x, acc); b += acc }
+ for (x <- reversed) {
+ acc = op(x, acc)
+ scanned ::= acc
+ }
+ val b = bf(repr)
+ for (elem <- scanned) b += elem
b.result
}
diff --git a/src/library/scala/collection/TraversableOnce.scala b/src/library/scala/collection/TraversableOnce.scala
index 472440be56..5b8abb3e5e 100644
--- a/src/library/scala/collection/TraversableOnce.scala
+++ b/src/library/scala/collection/TraversableOnce.scala
@@ -519,20 +519,42 @@ trait TraversableOnce[+A] {
* - immutable.ParMap overrides `toParMap` to `this`
*/
+ /** Converts this $coll to a parallel iterable.
+ * $willNotTerminateInf
+ * @return a parallel iterable containing all elements of this $coll.
+ */
def toParIterable: parallel.ParIterable[A] = toParSeq
+ /** Converts this $coll to a parallel sequence.
+ * $willNotTerminateInf
+ * @return a parallel sequence containing all elements of this $coll.
+ */
def toParSeq: parallel.ParSeq[A] = {
val cb = parallel.mutable.ParArray.newCombiner[A]
for (elem <- this) cb += elem
cb.result
}
+ /** Converts this $coll to a parallel set.
+ * $willNotTerminateInf
+ * @return a parallel set containing all elements of this $coll.
+ */
def toParSet[B >: A]: parallel.ParSet[B] = {
val cb = parallel.mutable.ParHashSet.newCombiner[B]
for (elem <- this) cb += elem
cb.result
}
+ /** Converts this $coll to a parallel map.
+ * $willNotTerminateInf
+ *
+ * This operation is only available on collections containing pairs of elements.
+ *
+ * @return a parallel map containing all elements of this $coll.
+ * @usecase def toParMap[T, U]: ParMap[T, U]
+ * @return a parallel map of type `parallel.ParMap[T, U]`
+ * containing all key/value pairs of type `(T, U)` of this $coll.
+ */
def toParMap[T, U](implicit ev: A <:< (T, U)): parallel.ParMap[T, U] = {
val cb = parallel.mutable.ParHashMap.newCombiner[T, U]
for (elem <- this) cb += elem
diff --git a/src/library/scala/collection/TraversableViewLike.scala b/src/library/scala/collection/TraversableViewLike.scala
index fb04146adb..0c01cb893e 100644
--- a/src/library/scala/collection/TraversableViewLike.scala
+++ b/src/library/scala/collection/TraversableViewLike.scala
@@ -13,6 +13,7 @@ package scala.collection
import generic._
import mutable.{Builder, ArrayBuffer}
import TraversableView.NoBuilder
+import annotation.migration
/** A template trait for non-strict views of traversable collections.
@@ -217,6 +218,10 @@ self =>
override def scanLeft[B, That](z: B)(op: (B, A) => B)(implicit bf: CanBuildFrom[This, B, That]): That =
newForced(thisSeq.scanLeft(z)(op)).asInstanceOf[That]
+ @migration(2, 9,
+ "This scanRight definition has changed in 2.9.\n" +
+ "The previous behavior can be reproduced with scanRight.reverse."
+ )
override def scanRight[B, That](z: B)(op: (A, B) => B)(implicit bf: CanBuildFrom[This, B, That]): That =
newForced(thisSeq.scanRight(z)(op)).asInstanceOf[That]
diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/mutable/UnrolledBuffer.scala
index 7f81cf779d..10a572408b 100644
--- a/src/library/scala/collection/parallel/UnrolledBuffer.scala
+++ b/src/library/scala/collection/mutable/UnrolledBuffer.scala
@@ -1,9 +1,8 @@
-package scala.collection.parallel
+package scala.collection.mutable
import collection.generic._
-import collection.mutable.Builder
import annotation.tailrec
@@ -52,11 +51,11 @@ extends collection.mutable.Buffer[T]
@transient private var lastptr = headptr
@transient private var sz = 0
- private[parallel] def headPtr = headptr
- private[parallel] def headPtr_=(head: Unrolled[T]) = headptr = head
- private[parallel] def lastPtr = lastptr
- private[parallel] def lastPtr_=(last: Unrolled[T]) = lastptr = last
- private[parallel] def size_=(s: Int) = sz = s
+ private[collection] def headPtr = headptr
+ private[collection] def headPtr_=(head: Unrolled[T]) = headptr = head
+ private[collection] def lastPtr = lastptr
+ private[collection] def lastPtr_=(last: Unrolled[T]) = lastptr = last
+ private[collection] def size_=(s: Int) = sz = s
protected[this] override def newBuilder = new UnrolledBuffer[T]
@@ -66,6 +65,13 @@ extends collection.mutable.Buffer[T]
def classManifestCompanion = UnrolledBuffer
+ /** Concatenates the targer unrolled buffer to this unrolled buffer.
+ *
+ * The specified buffer `that` is cleared after this operation. This is
+ * an O(1) operation.
+ *
+ * @param that the unrolled buffer whose elements are added to this buffer
+ */
def concat(that: UnrolledBuffer[T]) = {
// bind the two together
if (!lastptr.bind(that.headptr)) lastptr = that.lastPtr
@@ -124,17 +130,17 @@ extends collection.mutable.Buffer[T]
def apply(idx: Int) =
if (idx >= 0 && idx < sz) headptr(idx)
- else outofbounds(idx)
+ else throw new IndexOutOfBoundsException(idx.toString)
def update(idx: Int, newelem: T) =
if (idx >= 0 && idx < sz) headptr(idx) = newelem
- else outofbounds(idx)
+ else throw new IndexOutOfBoundsException(idx.toString)
def remove(idx: Int) =
if (idx >= 0 && idx < sz) {
sz -= 1
headptr.remove(idx, this)
- } else outofbounds(idx)
+ } else throw new IndexOutOfBoundsException(idx.toString)
def +=:(elem: T) = {
headptr = headptr.prepend(elem)
@@ -142,11 +148,11 @@ extends collection.mutable.Buffer[T]
this
}
- def insertAll(idx: Int, elems: Traversable[T]) =
+ def insertAll(idx: Int, elems: collection.Traversable[T]) =
if (idx >= 0 && idx <= sz) {
headptr.insertAll(idx, elems, this)
sz += elems.size
- } else outofbounds(idx)
+ } else throw new IndexOutOfBoundsException(idx.toString)
private def writeObject(out: java.io.ObjectOutputStream) {
out.defaultWriteObject
@@ -181,13 +187,13 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
val waterline = 50
val waterlineDelim = 100
- private[parallel] val unrolledlength = 32
+ private[collection] val unrolledlength = 32
/** Unrolled buffer node.
*/
- class Unrolled[T: ClassManifest] private[parallel] (var size: Int, var array: Array[T], var next: Unrolled[T], val buff: UnrolledBuffer[T] = null) {
- private[parallel] def this() = this(0, new Array[T](unrolledlength), null, null)
- private[parallel] def this(b: UnrolledBuffer[T]) = this(0, new Array[T](unrolledlength), null, b)
+ class Unrolled[T: ClassManifest] private[collection] (var size: Int, var array: Array[T], var next: Unrolled[T], val buff: UnrolledBuffer[T] = null) {
+ private[collection] def this() = this(0, new Array[T](unrolledlength), null, null)
+ private[collection] def this(b: UnrolledBuffer[T]) = this(0, new Array[T](unrolledlength), null, b)
private def nextlength = if (buff eq null) unrolledlength else buff.calcNextLength(array.length)
@@ -272,7 +278,7 @@ object UnrolledBuffer extends ClassManifestTraversableFactory[UnrolledBuffer] {
if (next eq null) true else false // checks if last node was thrown out
} else false
- @tailrec final def insertAll(idx: Int, t: Traversable[T], buffer: UnrolledBuffer[T]): Unit = if (idx < size) {
+ @tailrec final def insertAll(idx: Int, t: collection.Traversable[T], buffer: UnrolledBuffer[T]): Unit = if (idx < size) {
// divide this node at the appropriate position and insert all into head
// update new next
val newnextnode = new Unrolled[T](0, new Array(array.length), null, buff)
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index e88d5dd0e0..13a41eb7d4 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -417,9 +417,9 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
object ForkJoinTasks {
- val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool
- defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
- defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
+ val defaultForkJoinPool: ForkJoinPool = scala.parallel.forkjoinpool
+ // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
+ // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index d60c2d39e8..446d87348b 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -10,8 +10,8 @@ import scala.collection.parallel.ParMapLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
-import scala.collection.parallel.UnrolledBuffer.Unrolled
-import scala.collection.parallel.UnrolledBuffer
+import scala.collection.mutable.UnrolledBuffer.Unrolled
+import scala.collection.mutable.UnrolledBuffer
import scala.collection.generic.ParMapFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParMapTemplate
diff --git a/src/library/scala/collection/parallel/immutable/ParHashSet.scala b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
index 56d252f346..77dd947201 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashSet.scala
@@ -10,8 +10,8 @@ import scala.collection.parallel.ParSetLike
import scala.collection.parallel.Combiner
import scala.collection.parallel.ParIterableIterator
import scala.collection.parallel.EnvironmentPassingCombiner
-import scala.collection.parallel.UnrolledBuffer.Unrolled
-import scala.collection.parallel.UnrolledBuffer
+import scala.collection.mutable.UnrolledBuffer.Unrolled
+import scala.collection.mutable.UnrolledBuffer
import scala.collection.generic.ParSetFactory
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.GenericParTemplate
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index d0bffc10f6..4db3d89291 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -8,6 +8,7 @@ import collection.generic._
import collection.mutable.DefaultEntry
import collection.mutable.HashEntry
import collection.mutable.HashTable
+import collection.mutable.UnrolledBuffer
diff --git a/src/library/scala/collection/parallel/mutable/ParHashSet.scala b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
index d2e63100fa..4065baec06 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashSet.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashSet.scala
@@ -8,7 +8,7 @@ import collection.mutable.HashSet
import collection.mutable.FlatHashTable
import collection.parallel.Combiner
import collection.parallel.EnvironmentPassingCombiner
-import collection.parallel.UnrolledBuffer
+import collection.mutable.UnrolledBuffer
diff --git a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
index 339f827aef..9e27f6c58c 100644
--- a/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
+++ b/src/library/scala/collection/parallel/mutable/UnrolledParArrayCombiner.scala
@@ -7,11 +7,11 @@ package scala.collection.parallel.mutable
import scala.collection.generic.Sizing
import scala.collection.mutable.ArraySeq
import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.UnrolledBuffer
+import scala.collection.mutable.UnrolledBuffer.Unrolled
import scala.collection.parallel.TaskSupport
import scala.collection.parallel.EnvironmentPassingCombiner
import scala.collection.parallel.unsupportedop
-import scala.collection.parallel.UnrolledBuffer
-import scala.collection.parallel.UnrolledBuffer.Unrolled
import scala.collection.parallel.Combiner
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index acced246da..67b60ad129 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -6,6 +6,7 @@ import java.lang.Thread._
import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
+import scala.collection.mutable.UnrolledBuffer
import annotation.unchecked.uncheckedVariance
diff --git a/src/library/scala/concurrent/SyncVar.scala b/src/library/scala/concurrent/SyncVar.scala
index 61f8188ab1..bf39fef27c 100644
--- a/src/library/scala/concurrent/SyncVar.scala
+++ b/src/library/scala/concurrent/SyncVar.scala
@@ -36,16 +36,40 @@ class SyncVar[A] {
* by counting time elapsed directly. Loop required
* to deal with spurious wakeups.
*/
- var rest = timeout
- while (!isDefined && rest >= 0) {
- val elapsed = waitMeasuringElapsed(timeout)
- if (!isDefined && elapsed > 0)
- rest -= elapsed
- }
- if (isDefined) Some(value)
- else None
+ var rest = timeout
+ while (!isDefined && rest >= 0) {
+ val elapsed = waitMeasuringElapsed(timeout)
+ if (!isDefined && elapsed > 0)
+ rest -= elapsed
+ }
+ if (isDefined) Some(value)
+ else None
}
+ // /** Waits for this SyncVar to become defined at least for
+ // * `timeout` milliseconds (possibly more), and gets its
+ // * value.
+ // *
+ // * @param timeout the amount of milliseconds to wait
+ // * @return `None` if variable is undefined after `timeout`, `Some(value)` otherwise
+ // */
+ // def get(timeout: Long): Option[A] = synchronized {
+ // if (timeout == 0L) Some(get)
+ // else {
+ // val start = System.currentTimeMillis
+ // var left = timeout
+ // while (!isDefined && left > 0) {
+ // wait(left)
+ // if (!isDefined) {
+ // val elapsed = System.currentTimeMillis - start
+ // left = timeout - elapsed
+ // }
+ // }
+ // if (isDefined) Some(value)
+ // else None
+ // }
+ // }
+
def take() = synchronized {
try get
finally unset()
diff --git a/src/library/scala/parallel/Future.scala b/src/library/scala/parallel/Future.scala
new file mode 100644
index 0000000000..29faa06148
--- /dev/null
+++ b/src/library/scala/parallel/Future.scala
@@ -0,0 +1,35 @@
+package scala.parallel
+
+
+
+
+
+
+
+/** A future is a function without parameters that will block the caller if the
+ * parallel computation associated with the function is not completed.
+ *
+ * @since 2.9
+ */
+trait Future[@specialized +R] extends (() => R) {
+ /** Returns a result once the parallel computation completes. If the computation
+ * produced an exception, an exception is forwarded.
+ *
+ * '''Note:''' creating a circular dependency between futures by calling this method will
+ * result in a deadlock.
+ *
+ * @tparam R the type of the result
+ * @return the result
+ * @throws the exception that was thrown during a parallel computation
+ */
+ def apply(): R
+
+ /** Returns `true` if the parallel computation is completed.
+ *
+ * @return `true` if the parallel computation is completed, `false` otherwise
+ */
+ def isDone(): Boolean
+}
+
+
+
diff --git a/src/library/scala/parallel/package.scala b/src/library/scala/parallel/package.scala
new file mode 100644
index 0000000000..4cae1ad4b1
--- /dev/null
+++ b/src/library/scala/parallel/package.scala
@@ -0,0 +1,178 @@
+package scala
+
+
+
+import scala.concurrent.forkjoin._
+
+
+/** This package object contains various parallel operations.
+ *
+ * @define invokingPar
+ * Invoking a parallel computation creates a future which will
+ * hold the result of the computation once it completes. Querying
+ * the result of a future before its parallel computation has completed
+ * will block the caller. For all practical concerns, the dependency
+ * chain obtained by querying results of unfinished futures can have
+ * arbitrary lengths. However, care must be taken not to create a
+ * circular dependency, as this will result in a deadlock.
+ *
+ * Additionally, if the parallel computation performs a blocking call
+ * (e.g. an I/O operation or waiting for a lock) other than waiting for a future,
+ * it should do so by invoking the `block` method. This is another
+ * form of waiting that could potentially create a circular dependency,
+ * an the user should take care not to do this.
+ *
+ * Users should be aware that invoking a parallel computation has a
+ * certain overhead. Parallel computations should not be invoked for
+ * small computations, as this can lead to bad performance. A rule of the
+ * thumb is having parallel computations equivalent to a loop
+ * with 50000 arithmetic operations (at least). If a parallel computation
+ * is invoked within another parallel computation, then it should be
+ * computationally equivalent to a loop with 10000 arithmetic operations.
+ */
+package object parallel {
+
+ private[scala] val forkjoinpool = new ForkJoinPool()
+
+ private class Task[T](body: =>T) extends RecursiveTask[T] with Future[T] {
+ def compute = body
+ def apply() = join()
+ }
+
+ private final def newTask[T](body: =>T) = new Task[T](body)
+
+ private final def executeTask[T](task: RecursiveTask[T]) {
+ if (Thread.currentThread().isInstanceOf[ForkJoinWorkerThread]) task.fork
+ else forkjoinpool.execute(task)
+ }
+
+ /* public methods */
+
+ /** Performs a call which can potentially block execution.
+ *
+ * Example:
+ * {{{
+ * val lock = new ReentrantLock
+ *
+ * // ... do something ...
+ *
+ * blocking {
+ * if (!lock.hasLock) lock.lock()
+ * }
+ * }}}
+ *
+ * '''Note:''' calling methods that wait arbitrary amounts of time
+ * (e.g. for I/O operations or locks) may severely decrease performance
+ * or even result in deadlocks. This does not include waiting for
+ * results of futures.
+ */
+ def blocking[T](body: =>T): T = {
+ if (Thread.currentThread().isInstanceOf[ForkJoinWorkerThread]) {
+ val blocker = new ForkJoinPool.ManagedBlocker {
+ @volatile var done = false
+ @volatile var result: Any = _
+ def block() = {
+ result = body
+ done = true
+ true
+ }
+ def isReleasable() = done
+ }
+ ForkJoinPool.managedBlock(blocker, true)
+ blocker.result.asInstanceOf[T]
+ } else body
+ }
+
+ /** Starts a parallel computation and returns a future.
+ *
+ * $invokingPar
+ *
+ * @tparam T the type of the result of the parallel computation
+ * @param body the computation to be invoked in parallel
+ * @return a future with the result
+ */
+ def par[T](body: =>T): Future[T] = {
+ val task = newTask(body)
+ executeTask(task)
+ task
+ }
+
+ /** Starts 2 parallel computations and returns a future.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @return a tuple of futures corresponding to parallel computations
+ */
+ def par[T1, T2](b1: =>T1, b2: =>T2): (Future[T1], Future[T2]) = {
+ val t1 = newTask(b1)
+ executeTask(t1)
+ val t2 = newTask(b2)
+ executeTask(t2)
+ (t1, t2)
+ }
+
+ /** Starts 3 parallel computations and returns a future.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @tparam T3 the type of the result of 3rd the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param b3 the 3rd computation to be invoked in parallel
+ * @return a tuple of futures corresponding to parallel computations
+ */
+ def par[T1, T2, T3](b1: =>T1, b2: =>T2, b3: =>T3): (Future[T1], Future[T2], Future[T3]) = {
+ val t1 = newTask(b1)
+ executeTask(t1)
+ val t2 = newTask(b2)
+ executeTask(t2)
+ val t3 = newTask(b3)
+ executeTask(t3)
+ (t1, t2, t3)
+ }
+
+ /** Starts 4 parallel computations and returns a future.
+ *
+ * $invokingPar
+ *
+ * @tparam T1 the type of the result of 1st the parallel computation
+ * @tparam T2 the type of the result of 2nd the parallel computation
+ * @tparam T3 the type of the result of 3rd the parallel computation
+ * @tparam T4 the type of the result of 4th the parallel computation
+ * @param b1 the 1st computation to be invoked in parallel
+ * @param b2 the 2nd computation to be invoked in parallel
+ * @param b3 the 3rd computation to be invoked in parallel
+ * @param b4 the 4th computation to be invoked in parallel
+ * @return a tuple of futures corresponding to parallel computations
+ */
+ def par[T1, T2, T3, T4](b1: =>T1, b2: =>T2, b3: =>T3, b4: =>T4): (Future[T1], Future[T2], Future[T3], Future[T4]) = {
+ val t1 = newTask(b1)
+ executeTask(t1)
+ val t2 = newTask(b2)
+ executeTask(t2)
+ val t3 = newTask(b3)
+ executeTask(t3)
+ val t4 = newTask(b4)
+ executeTask(t4)
+ (t1, t2, t3, t4)
+ }
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
index e413fb537e..c3811601e3 100644
--- a/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
+++ b/test/benchmarks/src/scala/collection/parallel/Benchmarking.scala
@@ -119,6 +119,9 @@ trait BenchmarkRegister {
register(hashtables.RefParHashTableBenches.Map)
register(hashtables.RefParHashTableBenches.Map2)
register(hashtables.RefParHashTableBenches.HeavyMap)
+ register(hashtables.RefParHashTableBenches.Filter)
+ register(hashtables.RefParHashTableBenches.FlatMap)
+ register(hashtables.RefParHashTableBenches.FlatMap2)
// parallel hash table set benchmarks
register(hashtables.RefParHashTableSetBenches.Reduce)
@@ -128,6 +131,8 @@ trait BenchmarkRegister {
register(hashtables.RefParHashTableSetBenches.Map)
register(hashtables.RefParHashTableSetBenches.Map2)
register(hashtables.RefParHashTableSetBenches.HeavyMap)
+ register(hashtables.RefParHashTableSetBenches.Filter)
+ register(hashtables.RefParHashTableSetBenches.FlatMap)
// general examples
register(misc.Coder)
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala
index 7cb3641622..3b5308f8c2 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Dummy.scala
@@ -58,6 +58,9 @@ object DummyOperators extends Operators[Dummy] {
}
a
}
+ val flatmapper = (a: Dummy) => {
+ List(a, a, a, a, a)
+ }
val taker = (a: Dummy) => {
a.in >= 0
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala
index 84f69239f6..4fb76542e1 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/Operators.scala
@@ -14,6 +14,7 @@ trait Operators[T] {
def mapper: T => T
def mapper2: T => T = error("unsupported")
def heavymapper: T => T
+ def flatmapper: T => Seq[T]
def taker: T => Boolean
def eachFun: T => Unit
def eachPairFun: ((T, T)) => Unit = error("unsupported")
@@ -44,6 +45,9 @@ trait IntOperators extends Operators[Int] {
}
n + sum
}
+ val flatmapper: Int => Seq[Int] = (n: Int) => {
+ List(n, n, n, n, n)
+ }
val taker: Int => Boolean = _ < 10000
val eachFun: Int => Unit = { n =>
n % 2 == 0
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala
index c0545aff52..b14a0b3aab 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/generic/ParallelBenches.scala
@@ -117,6 +117,34 @@ trait StandardParIterableBenches[T, Coll <: ParIterable[T]] extends ParIterableB
def companion = Map
}
+ object Filter extends IterableBenchCompanion {
+ override def defaultSize = 5000
+ def benchName = "filter";
+ def apply(sz: Int, p: Int, w: String) = new Filter(sz, p, w)
+ }
+
+ class Filter(val size: Int, val parallelism: Int, val runWhat: String)
+ extends IterableBench {
+ def comparisonMap = collection.Map()
+ def runseq = this.seqcoll.filter(operators.filterer)
+ def runpar = this.parcoll.filter(operators.filterer)
+ def companion = Filter
+ }
+
+ object FlatMap extends IterableBenchCompanion {
+ override def defaultSize = 5000
+ def benchName = "flatmap";
+ def apply(sz: Int, p: Int, w: String) = new FlatMap(sz, p, w)
+ }
+
+ class FlatMap(val size: Int, val parallelism: Int, val runWhat: String)
+ extends IterableBench {
+ def comparisonMap = collection.Map()
+ def runseq = this.seqcoll.flatMap(operators.flatmapper)
+ def runpar = this.parcoll.flatMap(operators.flatmapper)
+ def companion = FlatMap
+ }
+
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala
index cf0e53e47d..bdb1dff56d 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtables/ParallelHashTables.scala
@@ -57,6 +57,21 @@ trait ParHashTableBenches[K, V] extends StandardParIterableBenches[(K, V), ParHa
}
}
+ object FlatMap2 extends IterableBenchCompanion {
+ override def defaultSize = 5000
+ def benchName = "flatmap2";
+ def apply(sz: Int, p: Int, w: String) = new FlatMap2(sz, p, w)
+ }
+
+ class FlatMap2(val size: Int, val parallelism: Int, val runWhat: String)
+ extends IterableBench {
+ def comparisonMap = collection.Map()
+ override def repetitionsPerRun = 25
+ def runseq = this.seqcoll.flatMap(operators.flatmapper)
+ def runpar = this.parcoll.flatMap(operators.flatmapper)
+ def companion = FlatMap2
+ }
+
object HeavyMap extends IterableBenchCompanion {
override def defaultSize = 5000
override def comparisons = List()
@@ -177,6 +192,9 @@ object RefParHashTableBenches extends ParHashTableBenches[Dummy, Dummy] {
a.num = a.in % 2
(a, p._2)
}
+ val flatmapper = (p: DPair) => {
+ for (i <- 0 until 20) yield p
+ }
override val mapper2 = (p: DPair) => {
val a = 1 //heavy(p._1.in)
(new Dummy(p._1.in * -2 + a), p._2)
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
index 81d4f095da..3976b72d1a 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/hashtries/ParallelHashTries.scala
@@ -135,6 +135,9 @@ object RefParHashTrieBenches extends ParHashTrieBenches[Dummy, Dummy] {
a.num = a.in % 2
(a, p._2)
}
+ val flatmapper = (p: DPair) => {
+ List(p, p, p, p, p)
+ }
override val mapper2 = (p: DPair) => {
val a = 1 //heavy(p._1.in)
(new Dummy(p._1.in * -2 + a), p._2)
diff --git a/test/files/jvm/serialization.scala b/test/files/jvm/serialization.scala
index 8c4df8fc37..6a4e37bda9 100644
--- a/test/files/jvm/serialization.scala
+++ b/test/files/jvm/serialization.scala
@@ -604,9 +604,9 @@ object Test9_parallel {
println()
// UnrolledBuffer
- val ub = new UnrolledBuffer[String]
+ val ub = new collection.mutable.UnrolledBuffer[String]
ub ++= List("one", "two")
- val _ub: UnrolledBuffer[String] = read(write(ub))
+ val _ub: collection.mutable.UnrolledBuffer[String] = read(write(ub))
check(ub, _ub)
// mutable.ParArray
diff --git a/test/files/run/UnrolledBuffer.scala b/test/files/run/UnrolledBuffer.scala
index 7e113c3e04..62a1f7d083 100644
--- a/test/files/run/UnrolledBuffer.scala
+++ b/test/files/run/UnrolledBuffer.scala
@@ -2,7 +2,7 @@
-import collection.parallel.UnrolledBuffer
+import collection.mutable.UnrolledBuffer
diff --git a/test/files/pos/scan.scala b/test/files/run/scan.scala
index 47e0a7d976..47e0a7d976 100644
--- a/test/files/pos/scan.scala
+++ b/test/files/run/scan.scala
diff --git a/test/files/run/testblock.scala b/test/files/run/testblock.scala
new file mode 100644
index 0000000000..a334b668fd
--- /dev/null
+++ b/test/files/run/testblock.scala
@@ -0,0 +1,33 @@
+
+
+
+
+import scala.parallel._
+
+
+
+
+object Test {
+
+ def main(args: Array[String]) {
+ if (util.Properties.isJavaAtLeast("1.6")) {
+ val vendor = util.Properties.javaVmVendor
+ if ((vendor contains "Sun") || (vendor contains "Apple")) blockcomp(10)
+ }
+ }
+
+ val lock = new java.util.concurrent.locks.ReentrantLock
+
+ def blockcomp(n: Int): Unit = if (n > 0) {
+ val (x, y) = par(blockcomp(n - 1), blockcomp(n - 1))
+ if (n == 8) blocking { // without this blocking block, deadlock occurs
+ lock.lock()
+ }
+ x()
+ y()
+ if (n == 8) {
+ lock.unlock()
+ }
+ }
+
+}
diff --git a/test/files/run/testpar.scala b/test/files/run/testpar.scala
new file mode 100644
index 0000000000..c4c813ee00
--- /dev/null
+++ b/test/files/run/testpar.scala
@@ -0,0 +1,24 @@
+
+
+
+import scala.parallel._
+
+
+
+
+
+object Test {
+
+ def main(args: Array[String]) {
+ if (util.Properties.isJavaAtLeast("1.6")) {
+ val vendor = util.Properties.javaVmVendor
+ if ((vendor contains "Sun") || (vendor contains "Apple")) assert(fib(40) == 102334155)
+ }
+ }
+
+ def fib(n: Int): Int = if (n < 3) 1 else if (n < 35) fib(n - 1) + fib(n - 2) else {
+ val (p, pp) = par(fib(n - 1), fib(n - 2))
+ p() + pp()
+ }
+
+}
diff --git a/test/files/scalacheck/Unrolled.scala b/test/files/scalacheck/Unrolled.scala
index d69e62dd01..34604b8667 100644
--- a/test/files/scalacheck/Unrolled.scala
+++ b/test/files/scalacheck/Unrolled.scala
@@ -2,7 +2,7 @@ import org.scalacheck._
import Prop._
import Gen._
-import collection.parallel.UnrolledBuffer
+import collection.mutable.UnrolledBuffer
object Test extends Properties("UnrolledBuffer") {