summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-10-20 20:20:00 +0000
committerAleksandar Pokopec <aleksandar.prokopec@epfl.ch>2010-10-20 20:20:00 +0000
commite7ca142b45255f6b41582c25fe590a664d5fc8b9 (patch)
treea674b7cc69ad247330d444f4011a55d6a7ce61e2
parentd3d218e5ea77584489437f0dfa8148ee3764d6f7 (diff)
downloadscala-e7ca142b45255f6b41582c25fe590a664d5fc8b9.tar.gz
scala-e7ca142b45255f6b41582c25fe590a664d5fc8b9.tar.bz2
scala-e7ca142b45255f6b41582c25fe590a664d5fc8b9.zip
Some exception handling fixes in parallel colle...
Some exception handling fixes in parallel collections. Fixed some regressions. Fixed some tests. No review.
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala54
-rw-r--r--src/library/scala/collection/parallel/ParMapLike.scala2
-rw-r--r--src/library/scala/collection/parallel/ParSeqLike.scala4
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala22
-rw-r--r--src/library/scala/collection/parallel/immutable/ParHashMap.scala2
-rw-r--r--src/library/scala/collection/parallel/mutable/ParHashMap.scala32
-rw-r--r--src/library/scala/collection/parallel/mutable/ParMapLike.scala4
-rw-r--r--src/library/scala/collection/parallel/package.scala92
-rw-r--r--src/partest/scala/tools/partest/nest/Worker.scala4
-rw-r--r--test/files/scalacheck/parallel-collections/PairOperators.scala97
-rw-r--r--test/files/scalacheck/parallel-collections/PairValues.scala28
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala2
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala66
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala17
-rw-r--r--test/files/scalacheck/parallel-collections/pc.scala29
15 files changed, 359 insertions, 96 deletions
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 881ab80038..3d839119b0 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -227,15 +227,11 @@ self =>
def map(r: R): R1 = mapping(r)
}
- def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] {
- val ft = tsk
- val st = t2
+ def compose[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new SeqComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]](tsk, t2) {
def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
}
- def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]] {
- val ft = tsk
- val st = t2
+ def parallel[R3, R2, Tp2](t2: SSCTask[R2, Tp2])(resCombiner: (R, R2) => R3) = new ParComposite[R, R2, R3, SSCTask[R, Tp], SSCTask[R2, Tp2]](tsk, t2) {
def combineResults(fr: R, sr: R2): R3 = resCombiner(fr, sr)
}
}
@@ -460,7 +456,9 @@ self =>
othtask.compute
othtask.result
}
- val task = (copythis parallel copythat) { _ combine _ } mapResult { _.result }
+ val task = (copythis parallel copythat) { _ combine _ } mapResult {
+ _.result
+ }
executeAndWaitResult(task)
} else if (bf.isParallel) {
// println("case parallel builder, `that` not parallel")
@@ -687,37 +685,42 @@ self =>
protected[this] trait NonDivisible[R] extends NonDivisibleTask[R, NonDivisible[R]]
- protected[this] trait Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
+ protected[this] abstract class Composite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
+ (val ft: First, val st: Second)
extends NonDivisibleTask[R, Composite[FR, SR, R, First, Second]] {
- val ft: First
- val st: Second
def combineResults(fr: FR, sr: SR): R
var result: R = null.asInstanceOf[R]
private[parallel] override def signalAbort {
ft.signalAbort
st.signalAbort
}
+ protected def mergeSubtasks {
+ ft mergeThrowables st
+ if (throwable eq null) result = combineResults(ft.result, st.result)
+ }
override def requiresStrictSplitters = ft.requiresStrictSplitters || st.requiresStrictSplitters
}
/** Sequentially performs one task after another. */
- protected[this] trait SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
- extends Composite[FR, SR, R, First, Second] {
+ protected[this] abstract class SeqComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
+ (f: First, s: Second)
+ extends Composite[FR, SR, R, First, Second](f, s) {
def leaf(prevr: Option[R]) = {
- ft.compute
- st.compute
- result = combineResults(ft.result, st.result)
+ executeAndWaitResult(ft)
+ executeAndWaitResult(st)
+ mergeSubtasks
}
}
/** Performs two tasks in parallel, and waits for both to finish. */
- protected[this] trait ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
- extends Composite[FR, SR, R, First, Second] {
+ protected[this] abstract class ParComposite[FR, SR, R, First <: StrictSplitterCheckTask[FR, _], Second <: StrictSplitterCheckTask[SR, _]]
+ (f: First, s: Second)
+ extends Composite[FR, SR, R, First, Second](f, s) {
def leaf(prevr: Option[R]) = {
- st.start
- ft.compute
- st.sync
- result = combineResults(ft.result, st.result)
+ val ftfuture = execute(ft)
+ executeAndWaitResult(st)
+ ftfuture()
+ mergeSubtasks
}
}
@@ -727,7 +730,8 @@ self =>
def map(r: R): R1
def leaf(prevr: Option[R1]) = {
inner.compute
- result = map(inner.result)
+ throwable = inner.throwable
+ if (throwable eq null) result = map(inner.result)
}
private[parallel] override def signalAbort {
inner.signalAbort
@@ -756,7 +760,7 @@ self =>
protected[this] def newSubtask(p: ParIterableIterator[T]) = new Reduce(op, p)
override def merge(that: Reduce[U]) =
if (this.result == None) result = that.result
- else if (that.result != None) op(result.get, that.result.get)
+ else if (that.result != None) result = Some(op(result.get, that.result.get))
override def requiresStrictSplitters = true
}
@@ -890,7 +894,9 @@ self =>
protected[this] class Take[U >: T, This >: Repr](n: Int, cbf: () => Combiner[U, This], protected[this] val pit: ParIterableIterator[T])
extends Transformer[Combiner[U, This], Take[U, This]] {
var result: Combiner[U, This] = null
- def leaf(prev: Option[Combiner[U, This]]) = result = pit.take2combiner(n, reuse(prev, cbf()))
+ def leaf(prev: Option[Combiner[U, This]]) = {
+ result = pit.take2combiner(n, reuse(prev, cbf()))
+ }
protected[this] def newSubtask(p: ParIterableIterator[T]) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
diff --git a/src/library/scala/collection/parallel/ParMapLike.scala b/src/library/scala/collection/parallel/ParMapLike.scala
index e6944953b5..6eb621c76a 100644
--- a/src/library/scala/collection/parallel/ParMapLike.scala
+++ b/src/library/scala/collection/parallel/ParMapLike.scala
@@ -24,7 +24,7 @@ extends MapLike[K, V, Repr]
protected[this] override def newBuilder: Builder[(K, V), Repr] = newCombiner
- protected[this] override def newCombiner: Combiner[(K, V), Repr] = error("Must be implemented in concrete classes.")
+ protected[this] override def newCombiner: Combiner[(K, V), Repr] = unsupportedop("Must implement `newCombiner` in concrete collections.")
override def empty: Repr
diff --git a/src/library/scala/collection/parallel/ParSeqLike.scala b/src/library/scala/collection/parallel/ParSeqLike.scala
index d2e0f965d6..b3d0593ecd 100644
--- a/src/library/scala/collection/parallel/ParSeqLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqLike.scala
@@ -249,7 +249,9 @@ self =>
tsk.result
}
val copyend = new Copy[U, That](() => pbf(repr), pits(2))
- executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult { _.result })
+ executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
+ _.result
+ })
} else patch_sequential(from, patch, replaced)
private def patch_sequential[U >: T, That](from: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 915e02f787..c58bc8b734 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -59,8 +59,8 @@ trait Tasks {
protected[this] def merge(that: Tp) {}
// exception handling mechanism
- var exception: Exception = null
- def forwardException = if (exception != null) throw exception
+ var throwable: Throwable = null
+ def forwardThrowable = if (throwable != null) throw throwable
// tries to do the leaf computation, storing the possible exception
protected def tryLeaf(result: Option[R]) {
try {
@@ -70,15 +70,21 @@ trait Tasks {
signalAbort
}
} catch {
- case e: Exception =>
- exception = e
+ case thr: Throwable =>
+ throwable = thr
signalAbort
}
}
protected[this] def tryMerge(t: Tp) {
val that = t.asInstanceOf[Task[R, Tp]]
- if (this.exception == null && that.exception == null) merge(that.repr)
- else if (that.exception != null) this.exception = that.exception
+ if (this.throwable == null && that.throwable == null) merge(t)
+ mergeThrowables(that)
+ }
+ private[parallel] def mergeThrowables(that: Task[_, _]) {
+ if (this.throwable != null && that.throwable != null) {
+ // merge exceptions, since there were multiple exceptions
+ this.throwable = this.throwable alongWith that.throwable
+ } else if (that.throwable != null) this.throwable = that.throwable
}
// override in concrete task implementations to signal abort to other tasks
private[parallel] def signalAbort {}
@@ -206,7 +212,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
() => {
fjtask.join
- fjtask.forwardException
+ fjtask.forwardThrowable
fjtask.result
}
}
@@ -225,7 +231,7 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
forkJoinPool.execute(fjtask)
}
fjtask.join
- fjtask.forwardException
+ fjtask.forwardThrowable
fjtask.result
}
diff --git a/src/library/scala/collection/parallel/immutable/ParHashMap.scala b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
index 37b52b7a40..7cc0adbb4f 100644
--- a/src/library/scala/collection/parallel/immutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/immutable/ParHashMap.scala
@@ -40,6 +40,8 @@ self =>
override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
+ protected[this] override def newCombiner = HashMapCombiner[K, V]
+
def parallelIterator: ParIterableIterator[(K, V)] = new ParHashMapIterator(trie.iterator, trie.size) with SCPI
def seq = trie
diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
index fb4119bddc..3648945857 100644
--- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala
+++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala
@@ -28,6 +28,8 @@ self =>
override def empty: ParHashMap[K, V] = new ParHashMap[K, V]
+ protected[this] override def newCombiner = ParHashMapCombiner[K, V]
+
def seq = new collection.mutable.HashMap[K, V](hashTableContents)
def parallelIterator = new ParHashMapIterator(0, table.length, size, table(0).asInstanceOf[DefaultEntry[K, V]]) with SCPI
@@ -108,7 +110,9 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
// construct table
val table = new AddingHashTable(size, tableLoadFactor)
- executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks))
+ val insertcount = executeAndWaitResult(new FillBlocks(heads, table, 0, ParHashMapCombiner.numblocks))
+
+ // TODO compare insertcount and size to see if compression is needed
val c = table.hashTableContents
new ParHashMap(c)
@@ -118,7 +122,8 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
* it allocates the table of the required size when created.
*
* Entries are added using the `insertEntry` method. This method checks whether the element
- * exists and updates the size map.
+ * exists and updates the size map. It returns false if the key was already in the table,
+ * and true if the key was successfully inserted.
*/
class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] {
import HashTable._
@@ -127,7 +132,7 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
tableSize = 0
threshold = newThreshold(_loadFactor, table.length)
sizeMapInit(table.length)
- def insertEntry(e: DefaultEntry[K, V]) {
+ def insertEntry(e: DefaultEntry[K, V]) = {
var h = index(elemHashCode(e.key))
var olde = table(h).asInstanceOf[DefaultEntry[K, V]]
@@ -146,24 +151,27 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
table(h) = e
tableSize = tableSize + 1
nnSizeMapAdd(h)
- }
+ true
+ } else false
}
}
/* tasks */
class FillBlocks(buckets: Array[Unrolled[DefaultEntry[K, V]]], table: AddingHashTable, offset: Int, howmany: Int)
- extends super.Task[Unit, FillBlocks] {
- var result = ()
- def leaf(prev: Option[Unit]) = {
+ extends super.Task[Int, FillBlocks] {
+ var result = Int.MinValue
+ def leaf(prev: Option[Int]) = {
var i = offset
val until = offset + howmany
+ result = 0
while (i < until) {
- fillBlock(buckets(i))
+ result += fillBlock(buckets(i))
i += 1
}
}
- private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) {
+ private def fillBlock(elems: Unrolled[DefaultEntry[K, V]]) = {
+ var insertcount = 0
var unrolled = elems
var i = 0
val t = table
@@ -172,17 +180,21 @@ self: EnvironmentPassingCombiner[(K, V), ParHashMap[K, V]] =>
val chunksz = unrolled.size
while (i < chunksz) {
val elem = chunkarr(i)
- t.insertEntry(elem)
+ if (t.insertEntry(elem)) insertcount += 1
i += 1
}
i = 0
unrolled = unrolled.next
}
+ insertcount
}
def split = {
val fp = howmany / 2
List(new FillBlocks(buckets, table, offset, fp), new FillBlocks(buckets, table, offset + fp, howmany - fp))
}
+ override def merge(that: FillBlocks) {
+ this.result += that.result
+ }
def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(ParHashMapCombiner.numblocks, parallelismLevel)
}
diff --git a/src/library/scala/collection/parallel/mutable/ParMapLike.scala b/src/library/scala/collection/parallel/mutable/ParMapLike.scala
index 902186eb2d..25b5c3b5f7 100644
--- a/src/library/scala/collection/parallel/mutable/ParMapLike.scala
+++ b/src/library/scala/collection/parallel/mutable/ParMapLike.scala
@@ -12,6 +12,6 @@ trait ParMapLike[K,
V,
+Repr <: ParMapLike[K, V, Repr, Sequential] with ParMap[K, V],
+Sequential <: collection.mutable.Map[K, V] with collection.mutable.MapLike[K, V, Sequential]]
-extends collection.parallel.ParMapLike[K, V, Repr, Sequential]
- with collection.mutable.MapLike[K, V, Repr]
+extends collection.mutable.MapLike[K, V, Repr]
+ with collection.parallel.ParMapLike[K, V, Repr, Sequential]
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index a30d564039..bb53dcdaaa 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -31,12 +31,61 @@ package object parallel {
else sz
}
- private[parallel] def unsupported(msg: String) = throw new UnsupportedOperationException(msg)
-
private[parallel] def unsupported = throw new UnsupportedOperationException
+ private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg)
+
+ /* implicit conversions */
+
+ /** An implicit conversion providing arrays with a `par` method, which
+ * returns a parallel array.
+ *
+ * @tparam T type of the elements in the array, which is a subtype of AnyRef
+ * @param array the array to be parallelized
+ * @return a `Parallelizable` object with a `par` method=
+ */
+ implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] {
+ def par = mutable.ParArray.handoff[T](array)
+ }
+
+ implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new {
+ def isParallel = bf.isInstanceOf[Parallel]
+ def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
+ def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new {
+ def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
+ }
+ }
+
+ implicit def traversable2ops[T](t: TraversableOnce[T]) = new {
+ def isParallel = t.isInstanceOf[Parallel]
+ def isParIterable = t.isInstanceOf[ParIterable[_]]
+ def asParIterable = t.asInstanceOf[ParIterable[T]]
+ def isParSeq = t.isInstanceOf[ParSeq[_]]
+ def asParSeq = t.asInstanceOf[ParSeq[T]]
+ def ifParSeq[R](isbody: ParSeq[T] => R) = new {
+ def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody
+ }
+ def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else {
+ val it = t.toIterator
+ val cb = mutable.ParArrayCombiner[T]()
+ while (it.hasNext) cb += it.next
+ cb.result
+ }
+ }
+
+ implicit def throwable2ops(self: Throwable) = new {
+ def alongWith(that: Throwable) = self match {
+ case ct: CompositeThrowable => new CompositeThrowable(ct.throwables + that)
+ case _ => new CompositeThrowable(Set(self, that))
+ }
+ }
+
/* classes */
+ /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */
+ final class CompositeThrowable(val throwables: Set[Throwable])
+ extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.mkString(", "))
+
/** Unrolled list node.
*/
private[parallel] class Unrolled[T: ClassManifest] {
@@ -159,45 +208,6 @@ package object parallel {
} else this
}
-
- /* implicit conversions */
-
- /** An implicit conversion providing arrays with a `par` method, which
- * returns a parallel array.
- *
- * @tparam T type of the elements in the array, which is a subtype of AnyRef
- * @param array the array to be parallelized
- * @return a `Parallelizable` object with a `par` method=
- */
- implicit def array2ParArray[T <: AnyRef](array: Array[T]) = new Parallelizable[mutable.ParArray[T]] {
- def par = mutable.ParArray.handoff[T](array)
- }
-
- implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new {
- def isParallel = bf.isInstanceOf[Parallel]
- def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
- def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new {
- def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
- }
- }
-
- implicit def traversable2ops[T](t: TraversableOnce[T]) = new {
- def isParallel = t.isInstanceOf[Parallel]
- def isParIterable = t.isInstanceOf[ParIterable[_]]
- def asParIterable = t.asInstanceOf[ParIterable[T]]
- def isParSeq = t.isInstanceOf[ParSeq[_]]
- def asParSeq = t.asInstanceOf[ParSeq[T]]
- def ifParSeq[R](isbody: ParSeq[T] => R) = new {
- def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody
- }
- def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else {
- val it = t.toIterator
- val cb = mutable.ParArrayCombiner[T]()
- while (it.hasNext) cb += it.next
- cb.result
- }
- }
-
}
diff --git a/src/partest/scala/tools/partest/nest/Worker.scala b/src/partest/scala/tools/partest/nest/Worker.scala
index fba24394a5..6e8ccff723 100644
--- a/src/partest/scala/tools/partest/nest/Worker.scala
+++ b/src/partest/scala/tools/partest/nest/Worker.scala
@@ -538,8 +538,8 @@ class Worker(val fileManager: FileManager, params: TestRunParams) extends Actor
val lines = SFile(logFile).lines.filter(_.trim != "").toBuffer
succeeded = {
val failures = lines filter (_ startsWith "!")
- val passedok = lines filter (_ startsWith "+") forall (_ contains "OK")
- failures.isEmpty && passedok
+ //val passedok = lines filter (_ startsWith "+") forall (_ contains "OK") - OK may wrap!!
+ failures.isEmpty
}
if (!succeeded) {
NestUI.normal("ScalaCheck test failed. Output:\n")
diff --git a/test/files/scalacheck/parallel-collections/PairOperators.scala b/test/files/scalacheck/parallel-collections/PairOperators.scala
new file mode 100644
index 0000000000..48cbd136e5
--- /dev/null
+++ b/test/files/scalacheck/parallel-collections/PairOperators.scala
@@ -0,0 +1,97 @@
+package scala.collection.parallel.ops
+
+
+import scala.collection.parallel._
+
+
+trait PairOperators[K, V] extends Operators[(K, V)] {
+ def koperators: Operators[K]
+ def voperators: Operators[V]
+
+ private def zipPredicates(kps: List[K => Boolean], vps: List[V => Boolean]): List[((K, V)) => Boolean] = for {
+ (kp, vp) <- koperators.countPredicates zip voperators.countPredicates
+ } yield new Function1[(K, V), Boolean] {
+ def apply(kv: (K, V)) = kp(kv._1) && vp(kv._2)
+ }
+
+ /* operators */
+
+ def reduceOperators = for {
+ (kop, vop) <- koperators.reduceOperators zip voperators.reduceOperators
+ } yield new Function2[(K, V), (K, V), (K, V)] {
+ def apply(kv1: (K, V), kv2: (K, V)) = (kop(kv1._1, kv2._1), vop(kv1._2, kv2._2))
+ }
+
+ def countPredicates = zipPredicates(koperators.countPredicates, voperators.countPredicates)
+
+ def forallPredicates = zipPredicates(koperators.forallPredicates, voperators.forallPredicates)
+
+ def existsPredicates = zipPredicates(koperators.existsPredicates, voperators.existsPredicates)
+
+ def findPredicates = zipPredicates(koperators.findPredicates, voperators.findPredicates)
+
+ def mapFunctions = for {
+ (km, vm) <- koperators.mapFunctions zip voperators.mapFunctions
+ } yield new Function1[(K, V), (K, V)] {
+ def apply(kv: (K, V)) = (km(kv._1), vm(kv._2))
+ }
+
+ def partialMapFunctions = for {
+ (kpm, vpm) <- koperators.partialMapFunctions zip voperators.partialMapFunctions
+ } yield new PartialFunction[(K, V), (K, V)] {
+ def isDefinedAt(kv: (K, V)) = kpm.isDefinedAt(kv._1) && vpm.isDefinedAt(kv._2)
+ def apply(kv: (K, V)) = (kpm(kv._1), vpm(kv._2))
+ }
+
+ def flatMapFunctions = for {
+ (kfm, vfm) <- koperators.flatMapFunctions zip voperators.flatMapFunctions
+ } yield new Function1[(K, V), Traversable[(K, V)]] {
+ def apply(kv: (K, V)) = kfm(kv._1).toIterable zip vfm(kv._2).toIterable
+ }
+
+ def filterPredicates = zipPredicates(koperators.filterPredicates, voperators.existsPredicates)
+
+ def filterNotPredicates = filterPredicates
+
+ def partitionPredicates = filterPredicates
+
+ def takeWhilePredicates = zipPredicates(koperators.takeWhilePredicates, voperators.takeWhilePredicates)
+
+ def dropWhilePredicates = takeWhilePredicates
+
+ def spanPredicates = takeWhilePredicates
+
+ def foldArguments = for {
+ ((kinit, kop), (vinit, vop)) <- koperators.foldArguments zip voperators.foldArguments
+ } yield ((kinit, vinit), new Function2[(K, V), (K, V), (K, V)] {
+ def apply(kv1: (K, V), kv2: (K, V)) = (kop(kv1._1, kv2._1), vop(kv1._2, kv2._2))
+ })
+
+ def addAllTraversables = for {
+ (kt, vt) <- koperators.addAllTraversables zip voperators.addAllTraversables
+ } yield kt.toIterable zip vt.toIterable
+
+ def newArray(sz: Int) = new Array[(K, V)](sz)
+
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/files/scalacheck/parallel-collections/PairValues.scala b/test/files/scalacheck/parallel-collections/PairValues.scala
new file mode 100644
index 0000000000..864dad2425
--- /dev/null
+++ b/test/files/scalacheck/parallel-collections/PairValues.scala
@@ -0,0 +1,28 @@
+package scala.collection.parallel.ops
+
+
+
+
+
+import org.scalacheck._
+import org.scalacheck.Gen
+import org.scalacheck.Gen._
+import org.scalacheck.Prop._
+import org.scalacheck.Properties
+import org.scalacheck.Arbitrary._
+
+
+
+
+trait PairValues[K, V] {
+ def kvalues: Seq[Gen[K]]
+ def vvalues: Seq[Gen[V]]
+
+ def values = for {
+ kg <- kvalues
+ vg <- vvalues
+ } yield for {
+ k <- kg
+ v <- vg
+ } yield (k, v)
+}
diff --git a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala
index 9169890e98..394dc6b370 100644
--- a/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelArrayCheck.scala
@@ -14,7 +14,7 @@ import scala.collection._
import scala.collection.parallel.ops._
-abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParallelArray[" + tp + "]") {
+abstract class ParallelArrayCheck[T](tp: String) extends ParallelSeqCheck[T]("ParArray[" + tp + "]") {
ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
diff --git a/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
new file mode 100644
index 0000000000..1224ec8d4d
--- /dev/null
+++ b/test/files/scalacheck/parallel-collections/ParallelHashMapCheck.scala
@@ -0,0 +1,66 @@
+package scala.collection.parallel
+package immutable
+
+
+
+import org.scalacheck._
+import org.scalacheck.Gen
+import org.scalacheck.Gen._
+import org.scalacheck.Prop._
+import org.scalacheck.Properties
+import org.scalacheck.Arbitrary._
+
+import scala.collection._
+import scala.collection.parallel.ops._
+
+
+abstract class ParallelHashMapCheck[K, V](tp: String) extends ParallelIterableCheck[(K, V)]("immutable.ParHashMap[" + tp + "]") {
+ ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2)
+ ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2)
+
+ type CollType = ParHashMap[K, V]
+
+ def isCheckingViews = false
+
+ def instances(vals: Seq[Gen[(K, V)]]): Gen[Iterable[(K, V)]] = sized { sz =>
+ var hm = new immutable.HashMap[K, V]
+ val gen = vals(rnd.nextInt(vals.size))
+ for (i <- 0 until sz) hm += sample(gen)
+ hm
+ }
+
+ def fromTraversable(t: Traversable[(K, V)]) = {
+ var phm = new ParHashMap[K, V]
+ var i = 0
+ for (kv <- t.toList) {
+ phm += kv
+ i += 1
+ }
+ phm
+ }
+
+}
+
+
+object IntIntParallelHashMapCheck extends ParallelHashMapCheck[Int, Int]("Int, Int")
+with PairOperators[Int, Int]
+with PairValues[Int, Int]
+{
+ def intvalues = new IntValues {}
+ def kvalues = intvalues.values
+ def vvalues = intvalues.values
+
+ val intoperators = new IntOperators {}
+ def voperators = intoperators
+ def koperators = intoperators
+}
+
+
+
+
+
+
+
+
+
+
diff --git a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
index fd323ef82c..bc08947af4 100644
--- a/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelIterableCheck.scala
@@ -95,8 +95,18 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
}
property("mappings must be equal") = forAll(collectionPairs) { case (t, coll) =>
- val results = for ((f, ind) <- mapFunctions.zipWithIndex)
- yield ("op index: " + ind) |: t.map(f) == coll.map(f)
+ val results = for ((f, ind) <- mapFunctions.zipWithIndex) yield {
+ val ms = t.map(f)
+ val mp = coll.map(f)
+ if (ms != mp) {
+ println(t)
+ println(coll)
+ println("mapped to: ")
+ println(ms)
+ println(mp)
+ }
+ ("op index: " + ind) |: ms == mp
+ }
results.reduceLeft(_ && _)
}
@@ -107,7 +117,7 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
if (ps != pp) {
println(t)
println(coll)
- println("partially mapped to: ")
+ println("collected to: ")
println(ps)
println(pp)
}
@@ -166,7 +176,6 @@ abstract class ParallelIterableCheck[T](collName: String) extends Properties(col
println(tsl)
println(collsl)
println("as list: " + collsl.toList)
- println(tsl.asInstanceOf[Seq[T]].sameElements(collsl))
println(collsl.iterator.hasNext)
println(collsl.iterator.next)
println(collsl.iterator.hasNext)
diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala
index f77c6db435..04b7168286 100644
--- a/test/files/scalacheck/parallel-collections/pc.scala
+++ b/test/files/scalacheck/parallel-collections/pc.scala
@@ -3,21 +3,46 @@
import org.scalacheck._
+
import scala.collection.parallel._
class ParCollProperties extends Properties("Parallel collections") {
+ /* Collections */
+
// parallel arrays
//include(mutable.IntParallelArrayCheck)
// parallel ranges
//include(immutable.ParallelRangeCheck)
+
+ // parallel immutable hash maps (tries)
+ include(immutable.IntIntParallelHashMapCheck)
+
+ // parallel immutable hash sets (tries)
+
+ // parallel mutable hash maps (tables)
+
+
+ /* Views */
+
+ // parallel array views
+
+ // parallel immutable hash map views
+
+ // parallel mutable hash map views
}
object Test {
def main(args: Array[String]) {
- val results = org.scalacheck.Test.checkProperties(new ParCollProperties)
- if (!results.forall(_._2.passed)) println("Test results: " + results.mkString("\n"))
+ val pc = new ParCollProperties
+ org.scalacheck.Test.checkProperties(
+ org.scalacheck.Test.Params(
+ rng = new java.util.Random(5134L),
+ testCallback = new ConsoleReporter(0)
+ ),
+ pc
+ )
}
}